tikv: 创建 TiKvServer 流程

Engine Trait

当 TiKV 的 Service 层收到请求之后,会根据请求的类型把这些请求转发到不同的模块进行处理。

  • 对于从 TiDB 下推的读请求,比如 sum,avg 操作,会转发到 Coprocessor 模块进行处理,
  • 对于 KV 请求会直接转发到 Storage 进行处理。

TiKV 把底层 KV 存储引擎抽象成一个 Engine trait,定义见 components/tikv_kv/src/lib.rs

Engine trait 主要提供了读和写两个接口,分别为 async_snapshot 和 async_write 。

  • 调用者把要写的内容交给 async_write , async_write 通过回调的方式告诉调用者写操作成功完成了或者遇到错误了。
  • 同样的,async_snapshot 通过回调的方式把数据库的快照返回给调用者,供调用者读,或者把遇到的错误返回给调用者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/// Engine defines the common behaviour for a storage engine type.
pub trait Engine: Send + Clone + 'static {
type Snap: Snapshot;
type Local: LocalEngine;

/// Local storage engine.
fn kv_engine(&self) -> Self::Local;

fn snapshot_on_kv_engine(&self, start_key: &[u8], end_key: &[u8]) -> Result<Self::Snap>;

/// Write modifications into internal local engine directly.
fn modify_on_kv_engine(&self, modifies: Vec<Modify>) -> Result<()>;

fn async_snapshot(&self, ctx: SnapContext<'_>, cb: Callback<Self::Snap>) -> Result<()>;

fn async_write(&self, ctx: &Context, batch: WriteData, write_cb: Callback<()>) -> Result<()>;

fn write(&self, ctx: &Context, batch: WriteData) -> Result<()> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
wait_op!(|cb| self.async_write(ctx, batch, cb), timeout)
.unwrap_or_else(|| Err(Error::from(ErrorInner::Timeout(timeout))))
}

fn snapshot(&self, ctx: SnapContext<'_>) -> Result<Self::Snap> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
wait_op!(|cb| self.async_snapshot(ctx, cb), timeout)
.unwrap_or_else(|| Err(Error::from(ErrorInner::Timeout(timeout))))
}

fn put(&self, ctx: &Context, key: Key, value: Value) -> Result<()> {
self.put_cf(ctx, CF_DEFAULT, key, value)
}

fn delete(&self, ctx: &Context, key: Key) -> Result<()> {
self.delete_cf(ctx, CF_DEFAULT, key)
}
}

Engine 的实例是 RaftKV ,当调用 RaftKV 的 async_write 进行写入操作时,如果 async_write 通过回调方式成功返回了,说明写入操作已经通过 raft 复制给了大多数副本,并且在 leader 节点(调用者所在 TiKV)完成写入了,后续 leader 节点上的读就能够看到之前写入的内容

1
2
3
4
5
6
7
8
9
pub struct RaftKv<E, S>
where
E: KvEngine,
S: RaftStoreRouter<E> + LocalReadRouter<E> + 'static,
{
router: S,
engine: E,
txn_extra_scheduler: Option<Arc<dyn TxnExtraScheduler>>,
}

Storage

Storage 定义在 storage/mod.rs文件中,下面我们介绍下 Storage 几个重要的成员:

  • engine:代表的是底层的 KV 存储引擎,实际上就是 RaftKV。
  • sched:事务调度器,负责并发事务请求的调度工作。
  • read_pool:读取线程池,所有只读 KV 请求,包括事务的非事务的,如 raw get、txn kv get 等最终都会在这个线程池内执行。由于只读请求不需要获取 latches,所以为其分配一个独立的线程池直接执行,而不是与非只读事务共用事务调度器。
  • gc_worker:从 3.0 版本开始,TiKV 支持分布式 GC,每个 TiKV 有一个 gc_worker 线程负责定期从 PD 更新 safepoint,然后进行 GC 工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ub struct Storage<E: Engine, L: LockManager, F: KvFormat> {
// TODO: Too many Arcs, would be slow when clone.
engine: E,

sched: TxnScheduler<E, L>,

/// The thread pool used to run most read operations.
read_pool: ReadPoolHandle,

concurrency_manager: ConcurrencyManager,

/// How many strong references. Thread pool and workers will be stopped
/// once there are no more references.
// TODO: This should be implemented in thread pool and worker.
refs: Arc<atomic::AtomicUsize>,

// Fields below are storage configurations.
max_key_size: usize,

resource_tag_factory: ResourceTagFactory,

api_version: ApiVersion, // TODO: remove this. Use `Api` instead.

quota_limiter: Arc<QuotaLimiter>,

_phantom: PhantomData<F>,
}

对于只读请求,包括 txn get 和 txn scan,Storage 调用 engine 的 async_snapshot 获取数据库快照之后交给 read_pool 线程池进行处理。写入请求,包括 prewrite、commit、rollback 等,直接交给 Scheduler 进行处理。Scheduler 的定义在 src/storage/txn/scheduler.rs 中。

create_raft_storage

由函数 create_raft_storage 创建一个 基于 RaftKV 引擎的 Storage 对象,位于 src/server/node.rs

创建一个Storage对象的调用栈:

1
main ==> run_tikv ==> run_impl ==> TiKvServer::init_servers ==> create_raft_storage ==> Server::new

TiKVServer

一个完整的 TiKv Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct TiKvServer<ER: RaftEngine> {
//...
engines: Option<TiKvEngines<RocksEngine, ER>>,
servers: Option<Servers<RocksEngine, ER>>,
//...
}

struct TiKvEngines<EK: KvEngine, ER: RaftEngine> {
engines: Engines<EK, ER>,
store_meta: Arc<Mutex<StoreMeta>>,
engine: RaftKv<EK, ServerRaftStoreRouter<EK, ER>>,
}

struct Servers<EK: KvEngine, ER: RaftEngine> {
lock_mgr: LockManager,
server: LocalServer<EK, ER>,
node: Node<RpcClient, EK, ER>,
importer: Arc<SstImporter>,
cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>,
cdc_memory_quota: MemoryQuota,
rsmeter_pubsub_service: resource_metering::PubSubService,
}