Engine Trait 当 TiKV 的 Service 层收到请求之后,会根据请求的类型把这些请求转发到不同的模块进行处理。
对于从 TiDB 下推的读请求,比如 sum,avg 操作,会转发到 Coprocessor 模块进行处理,
对于 KV 请求会直接转发到 Storage 进行处理。
TiKV 把底层 KV 存储引擎抽象成一个 Engine trait,定义见 components/tikv_kv/src/lib.rs
。
Engine trait 主要提供了读和写两个接口,分别为 async_snapshot 和 async_write 。
调用者把要写的内容交给 async_write , async_write 通过回调的方式告诉调用者写操作成功完成了或者遇到错误了。
同样的,async_snapshot 通过回调的方式把数据库的快照返回给调用者,供调用者读,或者把遇到的错误返回给调用者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 pub trait Engine : Send + Clone + 'static { type Snap : Snapshot; type Local : LocalEngine; fn kv_engine (&self ) -> Self ::Local; fn snapshot_on_kv_engine (&self , start_key: &[u8 ], end_key: &[u8 ]) -> Result <Self ::Snap>; fn modify_on_kv_engine (&self , modifies: Vec <Modify>) -> Result <()>; fn async_snapshot (&self , ctx: SnapContext<'_ >, cb: Callback<Self ::Snap>) -> Result <()>; fn async_write (&self , ctx: &Context, batch: WriteData, write_cb: Callback<()>) -> Result <()>; fn write (&self , ctx: &Context, batch: WriteData) -> Result <()> { let timeout = Duration::from_secs (DEFAULT_TIMEOUT_SECS); wait_op!(|cb| self .async_write (ctx, batch, cb), timeout) .unwrap_or_else (|| Err (Error::from (ErrorInner::Timeout (timeout)))) } fn snapshot (&self , ctx: SnapContext<'_ >) -> Result <Self ::Snap> { let timeout = Duration::from_secs (DEFAULT_TIMEOUT_SECS); wait_op!(|cb| self .async_snapshot (ctx, cb), timeout) .unwrap_or_else (|| Err (Error::from (ErrorInner::Timeout (timeout)))) } fn put (&self , ctx: &Context, key: Key, value: Value) -> Result <()> { self .put_cf (ctx, CF_DEFAULT, key, value) } fn delete (&self , ctx: &Context, key: Key) -> Result <()> { self .delete_cf (ctx, CF_DEFAULT, key) } }
Engine 的实例是 RaftKV ,当调用 RaftKV 的 async_write 进行写入操作时,如果 async_write 通过回调方式成功返回了,说明写入操作已经通过 raft 复制给了大多数副本,并且在 leader 节点(调用者所在 TiKV)完成写入了,后续 leader 节点上的读就能够看到之前写入的内容
1 2 3 4 5 6 7 8 9 pub struct RaftKv <E, S>where E: KvEngine, S: RaftStoreRouter<E> + LocalReadRouter<E> + 'static , { router: S, engine: E, txn_extra_scheduler: Option <Arc<dyn TxnExtraScheduler>>, }
Storage Storage 定义在 storage/mod.rs文件中,下面我们介绍下 Storage 几个重要的成员:
engine:代表的是底层的 KV 存储引擎,实际上就是 RaftKV。
sched:事务调度器,负责并发事务请求的调度工作。
read_pool:读取线程池,所有只读 KV 请求,包括事务的非事务的,如 raw get、txn kv get 等最终都会在这个线程池内执行。由于只读请求不需要获取 latches,所以为其分配一个独立的线程池直接执行,而不是与非只读事务共用事务调度器。
gc_worker:从 3.0 版本开始,TiKV 支持分布式 GC,每个 TiKV 有一个 gc_worker 线程负责定期从 PD 更新 safepoint,然后进行 GC 工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ub struct Storage <E: Engine, L: LockManager, F: KvFormat> { engine: E, sched: TxnScheduler<E, L>, read_pool: ReadPoolHandle, concurrency_manager: ConcurrencyManager, refs: Arc<atomic::AtomicUsize>, max_key_size: usize , resource_tag_factory: ResourceTagFactory, api_version: ApiVersion, quota_limiter: Arc<QuotaLimiter>, _phantom: PhantomData<F>, }
对于只读请求,包括 txn get 和 txn scan,Storage 调用 engine 的 async_snapshot 获取数据库快照之后交给 read_pool 线程池进行处理。写入请求,包括 prewrite、commit、rollback 等,直接交给 Scheduler 进行处理。Scheduler 的定义在 src/storage/txn/scheduler.rs
中。
create_raft_storage 由函数 create_raft_storage 创建一个 基于 RaftKV 引擎的 Storage 对象,位于 src/server/node.rs
创建一个Storage对象的调用栈:
1 main ==> run_tikv ==> run_impl ==> TiKvServer::init_servers ==> create_raft_storage ==> Server::new
TiKVServer 一个完整的 TiKv Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 struct TiKvServer <ER: RaftEngine> { engines: Option <TiKvEngines<RocksEngine, ER>>, servers: Option <Servers<RocksEngine, ER>>, } struct TiKvEngines <EK: KvEngine, ER: RaftEngine> { engines: Engines<EK, ER>, store_meta: Arc<Mutex<StoreMeta>>, engine: RaftKv<EK, ServerRaftStoreRouter<EK, ER>>, } struct Servers <EK: KvEngine, ER: RaftEngine> { lock_mgr: LockManager, server: LocalServer<EK, ER>, node: Node<RpcClient, EK, ER>, importer: Arc<SstImporter>, cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>, cdc_memory_quota: MemoryQuota, rsmeter_pubsub_service: resource_metering::PubSubService, }