tikv: RaftStore propose 过程

PollHandler::handle_normal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn handle_normal(&mut self, peer: &mut impl DerefMut<Target = PeerFsm<EK, ER>>) -> HandleResult {
let mut handle_result = HandleResult::KeepProcessing;

while self.peer_msg_buf.len() < self.messages_per_tick {
match peer.receiver.try_recv() {
Ok(msg) => {
self.peer_msg_buf.push(msg);
}
Err(TryRecvError::Empty) => {
handle_result = HandleResult::stop_at(0, false);
break;
}
Err(TryRecvError::Disconnected) => {
peer.stop();
handle_result = HandleResult::stop_at(0, false);
break;
}
}
}

let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx);
delegate.handle_msgs(&mut self.peer_msg_buf);
// No readiness is generated and using sync write, skipping calling ready and release early.
if !delegate.collect_ready() && self.poll_ctx.sync_write_worker.is_some() {
if let HandleResult::StopAt { skip_end, .. } = &mut handle_result {
*skip_end = true;
}
}

handle_result
}

Propose

这里有两类 propose 方法,

  1. 是积累 batch_propose,即积累了一定熟练的 proposal,再统一提交到状态机

    目前只是支持对 put/delete 进行 batch 操作,然后调用 BatchRaftCmdRequestBuilder::add 方法将 put/Delete command 缓存起来,当达到阈值BatchRaftCmdRequestBuilder::should_finish 返回 true,表示可以发送给 raft。

    Question: 如果没有发送出去就宕机了呢???

  2. 非 Put/Delete 指令,或者没有开启 batch 操作,就直接 propose

下面以 propose_batch_raft_command 操作为例进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
//...
PeerMsg::RaftCommand(cmd) => {
if let Some(Err(e)) = cmd.extra_opts.deadline.map(|deadline| deadline.check()) {
cmd.callback.invoke_with_response(new_error(e.into()));
continue;
}

let req_size = cmd.request.compute_size();
if self.ctx.cfg.cmd_batch
&& self.fsm.batch_req_builder.can_batch(&self.ctx.cfg, &cmd.request, req_size)
// Avoid to merge requests with different `DiskFullOpt`s into one,
// so that normal writes can be rejected when proposing if the
// store's disk is full.
&& ((self.ctx.self_disk_usage == DiskUsage::Normal
&& !self.fsm.peer.disk_full_peers.majority())
|| cmd.extra_opts.disk_full_opt == DiskFullOpt::NotAllowedOnFull)
{
self.fsm.batch_req_builder.add(cmd, req_size);
if self.fsm.batch_req_builder.should_finish(&self.ctx.cfg) {
self.propose_batch_raft_command(true);
}
} else {
self.propose_raft_command(
cmd.request,
cmd.callback,
cmd.extra_opts.disk_full_opt,
)
}
}
// ...
}

PeerFsmDelegate::propose_batch_raft_command

propose_batch_raft_command 第二个参数 force 用来表示是否强制执行 propose 操作,

  • 可能为 true 的情况:
    • 当前积累的 proposals,已到达 batch 的条件;
    • 在同步写入 kv-rocksdb 情况下也需要每次都 propose。
  • 如果 force 为 false,并且当前等持久化的 proposals 个数已经达到阈值 cmd_batch_concurrent_ready_max_count,则也拒绝当前的 propose 请求。

如果上述条件 check 通过,则调用 BatchRaftCmdRequestBuilder::build 构建出 {request, callback},调用 propose_raft_command_internal 进入 propose 状态(没有开启 batch 时,也是进入 propose_raft_command_internal 函数)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn propose_batch_raft_command(&mut self, force: bool) {
if self.fsm.batch_req_builder.request.is_none() {
return;
}

if !force
&& self.ctx.cfg.cmd_batch_concurrent_ready_max_count != 0
&& self.fsm.peer.unpersisted_ready_len()
>= self.ctx.cfg.cmd_batch_concurrent_ready_max_count
{
return;
}
fail_point!("propose_batch_raft_command", !force, |_| {});
let (request, callback) = self
.fsm
.batch_req_builder
.build(&mut self.ctx.raft_metrics)
.unwrap();
self.propose_raft_command_internal(request, callback, DiskFullOpt::NotAllowedOnFull)
}

PeerFsmDelegate::propose_raft_command_internal

propose_raft_command_internal 函数中,主要是做一些校验工作,通过之后才能完将 proposal 提交到 raft 中进行处理。

  • 当前 region 不能处于待删除的状态,即 pending_remove 不能为 false;
  • pre_propose_raft_command 函数中进一步 check

如果成功提交到 raft 中,则返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn propose_raft_command_internal(
&mut self,
mut msg: RaftCmdRequest,
cb: Callback<EK::Snapshot>,
diskfullopt: DiskFullOpt,
) {
if self.fsm.peer.pending_remove {
apply::notify_req_region_removed(self.region_id(), cb); // 回复客户端
return;
}

match self.pre_propose_raft_command(&msg) {
Ok(Some(resp)) => {
cb.invoke_with_response(resp);
return;
}
Err(e) => {
debug!(
"failed to propose";
"region_id" => self.region_id(),
"peer_id" => self.fsm.peer_id(),
"message" => ?msg,
"err" => %e,
);
cb.invoke_with_response(new_error(e));
return;
}
_ => (),
}
//...
}

Ready

Peer::handle_raft_ready_append(before ready)

根据 raft 状态机处理机制,propose 之后需要收集已 ready 的状态和数据并对其进行处理。调用链是 PeerFsmDelegate::collect_ready => Peer::handle_raft_ready_append

Peer::handle_raft_ready_append 其实就是个大杂烩,里面会对各种情况进行处理,比如 leader 切换、replicate、Snapshot 处理以及 apply 已经 commit 的数据、持久化等。,因此下面的执行流程并不是表示在一次 handle_raft_ready_append 调用中都同时发生的,即只有处于 ready 状态,才会被执行。

在 处理 ready 之前,先处理 snapshot 之前的相关状态:

  1. 先检测当前是否存在 snapshot,如果当前 region 正在 apply snapshot,则需要等待这个过程结束,此时 self.check_snap_status 返回 false;3

  2. 当前 region 是否存在未 committed 的 snapshot

    注意,这一步检测的是 uncommitted 的 snapshot,数据的流程是从 commit ==> apply

    1
    2
    3
    4
    pub fn ready_to_handle_pending_snap(&self) -> bool {
    self.last_applying_idx == self.get_store().applied_index()
    && self.pending_request_snapshot_count.load(Ordering::SeqCst) == 0
    }

    destroy_regions 如果当前没有待处理的 snapshot,再检测是否有待删除的 region,可能是 split/merge 过程导致的需要删除的 region.

  3. 如果 raft 当前没有 ready,则检测是否存在待生成 snapshot 的任务,如果有,则调用 ApplyFsm 异步去生成 snapshot

那么就有个问题,在什么情况下 self.mut_store().take_gen_snap_task() 的返回值不是 None 呢?

这个 snapshot 不是由外部产生的,是 Raft 状态机在 replication 的过程自动产生的:比如新加入一个 follower,或者一个 follower 滞后的 log 较多,此时 leader 就会调用 PeerStore::snapshot 函数生成包含最新 commit 数据的 snapshot 将其发送给 follower:

  • leader 需要生成 snapshot
  • follower 在接受到 snapshot 之后需要 apply snapshot

只要等 leader && follower 这个流程都执行完,Raft-group 才能处于执行 ready 状态

下面代码是去除注释和非必要啊部分后的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
pub fn handle_raft_ready_append<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
) -> Option<ReadyResult> {
// region 是否已删除 or 当前是否有 snapshot 正在处理
if self.pending_remove && !self.check_snap_status(ctx) {
return None;
}

let mut destroy_regions = vec![];
if self.has_pending_snapshot() {
if !self.ready_to_handle_pending_snap() && !self.unpersisted_readies.is_empty() {
return None;
}

let meta = ctx.store_meta.lock().unwrap();
if let Some(wait_destroy_regions) = meta.atomic_snap_regions.get(&self.region_id) {
for (source_region_id, is_ready) in wait_destroy_regions {
if !is_ready {
return None;
}
destroy_regions.push(meta.regions[source_region_id].clone());
}
}
}

if !self.raft_group.has_ready() {
// Generating snapshot task won't set ready for raft group.
if let Some(gen_task) = self.mut_store().take_gen_snap_task() {
self.pending_request_snapshot_count.fetch_add(1, Ordering::SeqCst);
// 让 ApplyFsm 去异步生成 snapshot
ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
}
return None;
}
//...
}

Peer::handle_raft_ready_append(after ready)

下面就是 raft ready 才能处理的部分,关于 snapshot 的部分,后面再说。

  1. leader 是否发生切换、leader 状态是否更改、还需要对 leader 进行 renew_leader_lease 等一些列操作;

  2. ready.messages() 是前面用户 propose 的消息,需要将其 replicate 给其他 TiKV follower;

    send_raft_messages 中发送消息使用的就是 Transport::send,内部实际上封装的 raft_client

  3. 如果 TiKV 中已存在 committed 数据,那么下一步就是将 committed 的数据 apply 到 kv-rocksdb 中

  4. 经过第 3 步,此时最新的 committed 的数据都已经 applied。如果此时新来了 gen_snaps_task 并异步调用 PeerStorage::snapshot,这样可以保证生成的 snapshot 包含最新的数据;

  5. 最后一步,需要将 applied 的数据写入到 kv-rocksdb 中,具体是 sync_write 还是 async_write 取决于配置

代码简化后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
pub fn handle_raft_ready_append<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
) -> Option<ReadyResult> {
//...
let mut ready = self.raft_group.ready();
// 1. 节点的角色是否发生了变化
self.on_role_changed(ctx, &ready);

if let Some(hs) = ready.hs() {
let pre_commit_index = self.get_store().commit_index();
assert!(hs.get_commit() >= pre_commit_index);
if self.is_leader() {
self.on_leader_commit_idx_changed(pre_commit_index, hs.get_commit());
}
}

// 2. 将 proposal 发送消息到其他 node
if !ready.messages().is_empty() {
assert!(self.is_leader());
let raft_msgs = self.build_raft_messages(ctx, ready.take_messages());
self.send_raft_messages(ctx, raft_msgs);
}

self.apply_reads(ctx, &ready);

// 3. 发送到 ApplyBatchSystem 进行持久化 ==> 应用到状态机
if !ready.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
}
// 4. 检测是否有个 snapshot task,需要发送到 apply system
if let Some(gen_task) = self.mut_store().take_gen_snap_task() {
self.pending_request_snapshot_count.fetch_add(1, Ordering::SeqCst);
ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
}

// 5. handle_raft_ready,将 raft 相关数据写入到 raft_rocksdb
let (res, mut task) = match self
.mut_store()
.handle_raft_ready(&mut ready, destroy_regions)
{
Ok(r) => r,
Err(e) => {
// We may have written something to writebatch and it can't be reverted, so has
// to panic here.
panic!("{} failed to handle raft ready: {:?}", self.tag, e)
}
};

let ready_number = ready.number();
let persisted_msgs = ready.take_persisted_messages();
let mut has_write_ready = false;
match &res {
// 根据配置,决定是同步 or 异步写入 rocksdb
//...
}

//...
Some(ReadyResult {
state_role,
has_new_entries,
has_write_ready,
})
}

下面主要讲解 Peer::handle_raft_ready_append 函数中和 replication 有关的部分,关于 leader 切换后面单独说。

PeerStorage::snapshot

PeerStorage::snapshot 函数是个异步任务,内部有个 {sender, recevier} 连接着生产消费者两端,本质上是用于创建生成 snapshot 的任务(gen_snap_task),而 gen_snap_task 会异步执行去生成 snapshot:

  1. 先检测当前是否已经有正在生成 snapshot 的任务;

    如果有,则先检测上一个异步 gen_snap_task 是否已经完成:即 receiver.try_recv() 返回的 snap,如果经过 PeerStore::validate_snap 函数校验,确认这个 snap 有效则直接返回,不会产生新的异步 gen_snap_task。

    然后,这个 snap 就会被 leader 发送给 follower。

  2. 如果当前没 gen_snap_task 任务,或者上一个异步 gen_snap_task 尚未完成,或者 snap 校验没有通过,则生成新的 gen_snap_task。

PeerStorage::snapshot 设置了 gen_snap_task 之后,在 Peer::handle_raft_ready_append 中检测到之后,就会基于 apply_router 将该异步任务发送给 ApplyFsm,让 ApplyFsm 真正创建 snpahot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
pub fn snapshot(&self, request_index: u64, to: u64) -> raft::Result<Snapshot> {
let mut snap_state = self.snap_state.borrow_mut();
let mut tried_cnt = self.snap_tried_cnt.borrow_mut();

let (mut tried, mut last_canceled, mut snap) = (false, false, None);
if let SnapState::Generating {
ref canceled,
ref receiver,
..
} = *snap_state
{
tried = true;
last_canceled = canceled.load(Ordering::SeqCst);
match receiver.try_recv() {
Err(TryRecvError::Empty) => {
let e = raft::StorageError::SnapshotTemporarilyUnavailable;
return Err(raft::Error::Store(e));
}
Ok(s) if !last_canceled => snap = Some(s),
Err(TryRecvError::Disconnected) | Ok(_) => {}
}
}

if tried {
*snap_state = SnapState::Relax;
match snap {
Some(s) => {
*tried_cnt = 0;
if self.validate_snap(&s, request_index) {
return Ok(s);
}
}
None => { /* warn_log */}
}
}

if SnapState::Relax != *snap_state {
panic!("{} unexpected state: {:?}", self.tag, *snap_state);
}

// 2. 创建 gen_snap_task

let (sender, receiver) = mpsc::sync_channel(1);
let canceled = Arc::new(AtomicBool::new(false));
let index = Arc::new(AtomicU64::new(0));
*snap_state = SnapState::Generating {
canceled: canceled.clone(),
index: index.clone(),
receiver,
};
let mut to_store_id = 0; // follower 的 store_id
if let Some(peer) = self.region().get_peers().iter().find(|p| p.id == to) {
to_store_id = peer.store_id;
}
// 创建异步任务
let task = GenSnapTask::new(self.region.get_id(), index, canceled, sender, to_store_id);

let mut gen_snap_task = self.gen_snap_task.borrow_mut();
assert!(gen_snap_task.is_none());
*gen_snap_task = Some(task);
Err(raft::Error::Store(
raft::StorageError::SnapshotTemporarilyUnavailable,
))
}
ApplyFsm::handle_snapshot

ApplyFsm 在接受到 Msg::Snapshot 类型的消息后,就会进入 ApplyFsm::handle_snapshot 处理该 gen_snap_task。

由于生成 snapshot 是个 io 任务,比较耗时,自然也不会同步阻塞执行。

因此,handle_snapshot 函数内部调用 GenSnapTask::generate_and_schedule_snapshot 异步执行。

不过,在执行前,需要将这个 region 中 unpersisted 的数据给 flush 到 kv-rocksdb 中,确保生成的 snapshot 是最新的。这个 ApplyContext::flush 流程后面再说,目前先关注 gen_snap_task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fn handle_snapshot(&mut self, apply_ctx: &mut ApplyContext<EK>, snap_task: GenSnapTask) {
if self.delegate.pending_remove || self.delegate.stopped {
return;
}
let applied_index = self.delegate.apply_state.get_applied_index();
let mut need_sync = apply_ctx
.apply_res
.iter()
.any(|res| res.region_id == self.delegate.region_id())
&& self.delegate.last_flush_applied_index != applied_index;

if need_sync {
self.delegate.write_apply_state(apply_ctx.kv_wb_mut());
apply_ctx.flush();
self.delegate.last_flush_applied_index = applied_index;
}

if let Err(e) = snap_task.generate_and_schedule_snapshot::<EK>(
apply_ctx.engine.snapshot(),
self.delegate.applied_index_term,
self.delegate.apply_state.clone(),
&apply_ctx.region_scheduler,
) { /* error!(...); */ }

self.delegate.pending_request_snapshot_count.fetch_sub(1, Ordering::SeqCst);
}
SnapContext::generate_snap

最终异步调用到 SnapContext::generate_snap 函数中,基于 store::do_snapshot 函数生成 snapshot:

  • notifier

    snapshot 创建后,notify 通知 gen_snap_task 的消费端,即 PeerStore::snapshot() 中的 receiver.try_recv() 返回 snap,这样就可以让 raft-group 的 ready.snapshot() 不为空;

    这样就可以在下一次调用 Peer::handle_raft_ready_append 函数时,继续处理已经 ready 的 snapshot,即将其发送到指定 follower,(这一部分在 Peer::handle_raft_ready 中)。

  • rotuer

    这里的 router 的消费端是 PeerDelegate,此处则是用于通知 leader 去处理已经生成的 snapshot。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fn generate_snap(
&self,
region_id: u64,
last_applied_index_term: u64,
last_applied_state: RaftApplyState,
kv_snap: EK::Snapshot,
notifier: SyncSender<RaftSnapshot>,
for_balance: bool,
allow_multi_files_snapshot: bool,
) -> Result<()> {
// 基于 PeerStore 创建 snapshot
let snap = box_try!(store::do_snapshot::<EK>(
self.mgr.clone(),
&self.engine,
kv_snap,
region_id,
last_applied_index_term,
last_applied_state,
for_balance,
allow_multi_files_snapshot,
));

// 将生成的 snapshot 数据发送给消费端,
if let Err(e) = notifier.try_send(snap) {
// info!(...);
}
// 再通知消息类型
self.router.send(region_id, CasualMessage::SnapshotGenerated);
Ok(())
}
store::do_snapshot

生成 snapshot,则需要获得当前 kv-rocksdb 的最新数据和元信息,主要包括三部分:

  • RaftApplyState

    • apply_index 表示当前 apply raft log 的进度。如果 apply raft log 在 persist 前发生宕机,那么重启后就会从较老的 apply index 开始重放日志。所以 apply raftlog 是需要支持幂等的,对于一些特殊的不支持幂等的指令,就需要 apply 完立刻 persist 并 fsync。
    • 因为日志不可能无限增长,所以 TiKV 会定期做 CompactLog 来 gc raft log。truncated_state 表示上次做完 CompactLog 后现有日志的头部。
  • RegionLocalState

    • RegionLocalState 主要包含 Region 的 range、epoch、各个 peer 以及当前 Region 状态,其中 RegionEpoch 会在 ConfChange,以及 Split 和 Merge 的时候发生变化,在处理 Raft 消息时,我们会校验 RegionEpoch 并拒绝掉过期的消息
    • PeerState 状态比如 Normal、Applying、Merging、Tombstone等。
  • RaftSnapshotData

数据都是保存在 CF_RAFT column family 中。 代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
pub fn do_snapshot<E>(
mgr: SnapManager,
engine: &E,
kv_snap: E::Snapshot,
region_id: u64,
last_applied_index_term: u64,
last_applied_state: RaftApplyState,
for_balance: bool,
allow_multi_files_snapshot: bool,
) -> raft::Result<Snapshot>
where
E: KvEngine,
{
debug!( "begin to generate a snapshot"; "region_id" => region_id);

// 获得最新的 apply_state
let msg = kv_snap
.get_msg_cf(CF_RAFT, &keys::apply_state_key(region_id))
.map_err(into_other::<_, raft::Error>)?;
let apply_state: RaftApplyState = match msg {
None => { return Err(/**/); }
Some(state) => state,
};
assert_eq!(apply_state, last_applied_state);

let key = SnapKey::new(
region_id,
last_applied_index_term,
apply_state.get_applied_index(),
);

mgr.register(key.clone(), SnapEntry::Generating);
defer!(mgr.deregister(&key, &SnapEntry::Generating));

// 获得最新的 local_state
let state: RegionLocalState = kv_snap
.get_msg_cf(CF_RAFT, &keys::region_state_key(key.region_id))
.and_then(|res| match res {
None => Err(box_err!("region {} could not find region info", region_id)),
Some(state) => Ok(state),
})
.map_err(into_other::<_, raft::Error>)?;

if state.get_state() != PeerState::Normal {
return Err(storage_error(format!(
"snap job for {} seems stale, skip.",
region_id
)));
}

let mut snapshot = Snapshot::default();

// Set snapshot metadata.
snapshot.mut_metadata().set_index(key.idx);
snapshot.mut_metadata().set_term(key.term);

let conf_state = util::conf_state_from_region(state.get_region());
snapshot.mut_metadata().set_conf_state(conf_state);

// Set snapshot data.
let mut snap_data = RaftSnapshotData::default();
snap_data.set_region(state.get_region().clone());
let mut s = mgr.get_snapshot_for_building(&key)?;
let mut stat = SnapshotStatistics::new();
s.build(
engine,
&kv_snap,
state.get_region(),
&mut snap_data, // 获得 snapshot 数据
&mut stat,
allow_multi_files_snapshot,
)?;
snap_data.mut_meta().set_for_balance(for_balance);
let v = snap_data.write_to_bytes()?;
snapshot.set_data(v.into());

Ok(snapshot)
}

ApplyFsm::handle_apply

Peer::handle_raft_committed_entries

此函数用于处理已经 committed 数据:

  • 对于 leader 来说,即接受到超过半数 follower 回应的 proposals;
  • 对于 follower 来说,即已经 Append 到 raft-log 的 proposals。

Peer::handle_raft_committed_entries 函数中构建的 ApplyTask,最终在 ApplyFsm::handle_apply 中使用

  • 更新当前 ApplyDelegate 的状态及元数据;
  • 添加 proposals
  • handle_raft_committed_entries

代码简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fn handle_apply(&mut self, apply_ctx: &mut ApplyContext<EK>, mut apply: Apply<EK::Snapshot>) {
if self.delegate.pending_remove || self.delegate.stopped {
return;
}

let mut entries = Vec::new();

let mut dangle_size = 0;
for cached_entries in apply.entries {
let (e, sz) = cached_entries.take_entries();
dangle_size += sz;
if e.is_empty() {
let rid = self.delegate.region_id();
let StdRange { start, end } = cached_entries.range;
self.delegate
.raft_engine
.fetch_entries_to(rid, start, end, None, &mut entries)
.unwrap();
} else if entries.is_empty() {
entries = e;
} else {
entries.extend(e);
}
}
self.delegate.term = apply.term;
let cur_state = (apply.commit_index, apply.commit_term);
self.delegate.apply_state.set_commit_index(cur_state.0);
self.delegate.apply_state.set_commit_term(cur_state.1);

// 添加 proposal
self.append_proposal(apply.cbs.drain(..));
// If there is any apply task, we change this fsm to normal-priority.
// When it meets a ingest-request or a delete-range request, it will change to
// low-priority.
self.delegate.priority = Priority::Normal;
self.delegate
.handle_raft_committed_entries(apply_ctx, entries.drain(..));
fail_point!("post_handle_apply_1003", self.delegate.id() == 1003, |_| {});
}
ApplyFsm::append_proposal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/// Handles proposals, and appends the commands to the apply delegate.
fn append_proposal(&mut self, props_drainer: Drain<'_, Proposal<EK::Snapshot>>) {
let (region_id, peer_id) = (self.delegate.region_id(), self.delegate.id());
if self.delegate.stopped {
for p in props_drainer {
let cmd = PendingCmd::<EK::Snapshot>::new(p.index, p.term, p.cb);
notify_stale_command(region_id, peer_id, self.delegate.term, cmd);
}
return;
}

let propose_num = props_drainer.len();
for p in props_drainer {
let cmd = PendingCmd::new(p.index, p.term, p.cb);
if p.is_conf_change {
if let Some(cmd) = self.delegate.pending_cmds.take_conf_change() {
notify_stale_command(region_id, peer_id, self.delegate.term, cmd);
}
self.delegate.pending_cmds.set_conf_change(cmd);
} else {
self.delegate.pending_cmds.append_normal(cmd);
}
}
}
ApplyDelegate::handle_raft_committed_entries

这里的处理,即对 committed entries 进行 apply。

函数有个 ApplyContext 参数,用来控制一些调用:

1
ApplyContext::prepare_for -> ApplyContext::commit [-> ApplyContext::commit ...] -> ApplyContext::finish_for.

从这个函数处理流程可以看出也是符合这个流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/// Handles all the committed_entries, namely, applies the committed entries.
fn handle_raft_committed_entries(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
mut committed_entries_drainer: Drain<'_, Entry>,
) {
if committed_entries_drainer.len() == 0 {
return;
}
apply_ctx.prepare_for(self);
// If we send multiple ConfChange commands, only first one will be proposed correctly,
// others will be saved as a normal entry with no data, so we must re-propose these
// commands again.
apply_ctx.committed_count += committed_entries_drainer.len();
let mut results = VecDeque::new();
while let Some(entry) = committed_entries_drainer.next() {
if self.pending_remove {
break;
}

let res = match entry.get_entry_type() {
EntryType::EntryNormal => self.handle_raft_entry_normal(apply_ctx, &entry),
EntryType::EntryConfChange | EntryType::EntryConfChangeV2 => {
self.handle_raft_entry_conf_change(apply_ctx, &entry)
}
};

match res {
ApplyResult::None => {}
ApplyResult::Res(res) => results.push_back(res),
ApplyResult::Yield | ApplyResult::WaitMergeSource(_) => {
// Both cancel and merge will yield current processing.
apply_ctx.committed_count -= committed_entries_drainer.len() + 1;
let mut pending_entries =
Vec::with_capacity(committed_entries_drainer.len() + 1);
// Note that current entry is skipped when yield.
pending_entries.push(entry);
pending_entries.extend(committed_entries_drainer);
apply_ctx.finish_for(self, results);
self.yield_state = Some(YieldState {
pending_entries,
pending_msgs: Vec::default(),
heap_size: None,
});
if let ApplyResult::WaitMergeSource(logs_up_to_date) = res {
self.wait_merge_state = Some(WaitSourceMergeState { logs_up_to_date });
}
return;
}
}
}
apply_ctx.finish_for(self, results);

if self.pending_remove {
self.destroy(apply_ctx);
}
}
ApplyContext::prepare_for
ApplyContext::commit

在 commit 函数中先将 RaftApplyState 写入 kv_wb,这样在 commit_opt 函数中就可以一起将最新的 delegate.apply_state 和 kv 数据一起持久化到 rocksdb 中,并在持久化完成后,更新 ApplyDelegate::last_flush_applied_index 字段。

因此, commit 函数返回后,所有 unflushed 的数据都持久化到 rocksdb 中了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub fn commit(&mut self, delegate: &mut ApplyDelegate<EK>) {
// 写入 {apply_state_key, RaftApplyState} 最新的状态
if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index() {
delegate.write_apply_state(self.kv_wb_mut());
}
// 再写入数据到 write_batch
self.commit_opt(delegate, true);
}

fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
if persistent {
self.write_to_db();
self.prepare_for(delegate);
delegate.last_flush_applied_index = delegate.apply_state.get_applied_index()
}
self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
self.kv_wb_last_keys = self.kv_wb().count() as u64;
}
ApplyContext::write_to_db

write_to_db 主要就是将 pendig_sst、kv、delete_sst 数据导入到 rocksdb 中,没有什么特别的操作。持久化完成,则调用 callback 回复 client。

简化后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
// There may be put and delete requests after ingest request in the same fsm.
// To guarantee the correct order, we must ingest the pending_sst first, and
// then persist the kv write batch to engine.
if !self.pending_ssts.is_empty() {
let tag = self.tag.clone();
self.importer
.ingest(&self.pending_ssts, &self.engine)
.unwrap_or_else(|e| {
panic!(
"{} failed to ingest ssts {:?}: {:?}",
tag, self.pending_ssts, e
);
});
self.pending_ssts = vec![];
}
if !self.kv_wb_mut().is_empty() {
let mut write_opts = engine_traits::WriteOptions::new();
write_opts.set_sync(need_sync);
// 写入 rocksdb,是否需要 sync_log
self.kv_wb().write_opt(&write_opts).unwrap_or_else(|e| {
panic!("failed to write to engine: {:?}", e);
});
self.sync_log_hint = false;
let data_size = self.kv_wb().data_size();
if data_size > APPLY_WB_SHRINK_SIZE {
// 内存太大,不释放会有影响,因此重新生成 write_batch
self.kv_wb = self.engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
} else {
// 内存不大,那就不用释放了,直接清除原来的数据即可
self.kv_wb_mut().clear();
}
self.kv_wb_last_bytes = 0;
self.kv_wb_last_keys = 0;
}
if !self.delete_ssts.is_empty() {
let tag = self.tag.clone();
for sst in self.delete_ssts.drain(..) {
self.importer.delete(&sst.meta).unwrap_or_else(|e| {
panic!("{} cleanup ingested file {:?}: {:?}", tag, sst, e);
});
}
}
// Take the applied commands and their callback
let ApplyCallbackBatch {
cmd_batch,
batch_max_level,
mut cb_batch,
} = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
// Invoke callbacks
for (cb, resp) in cb_batch.drain(..) {
cb.invoke_with_response(resp); // 回复客户端
}
need_sync
}
ApplyContext::finish_for

finish_for 函数和 commit 函数功能类似,区别在于

  • commit_opt(delegate, false),即不会持久化数据到 rocksdb,仅是更新统计数字。
  • 保存 ApplyRes

留在 ApplyPoller::end 中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub fn finish_for(
&mut self,
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
exec_res: results,
metrics: delegate.metrics.clone(),
applied_index_term: delegate.applied_index_term,
bucket_stat: delegate.buckets.clone().map(Box::new),
});
}
ApplyDelegate::handle_raft_entry_normal

ApplyDelegate::process_raft_cmd 函数真正处理本次 proposals 之前,需要先保证之前的数据都已经持久化到 rocksdb 中,即 [last_flush_applied_index, apply_state.applied_index) 区间没有数据。

process_raft_cmd 中处理完 proposals,就更新 apply_state.applied_index,即增大了 [last_flush_applied_index, apply_state.applied_index) 区间,如果是有多个 proposals,那么处理下一个 proposal 就会先持久化数据,最后一个 proposal 则在 handle_destory 处理。

TODO: 这里有个时间片 && 优先级的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
fn handle_raft_entry_normal(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();

if !data.is_empty() {
let cmd = util::parse_data_at(data, index, &self.tag);
// 调整优先级
if apply_ctx.yield_high_latency_operation && has_high_latency_operation(&cmd) {
self.priority = Priority::Low;
}
// 是否存在 unflushed 的数据
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine()
{
// 这是将数据写入 rocksdb
apply_ctx.commit(self);
// 是否耗时过长,切出时间片
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
}
}
has_unflushed_data = false;
}
// 可能是上面切出的情况
if self.priority != apply_ctx.priority {
if has_unflushed_data {
apply_ctx.commit(self);
}
return ApplyResult::Yield;
}

// 处理真正的指令
return self.process_raft_cmd(apply_ctx, index, term, cmd);
}

self.apply_state.set_applied_index(index);
self.applied_index_term = term;
assert!(term > 0);

// 1. When a peer become leader, it will send an empty entry.
// 2. When a leader tries to read index during transferring leader,
// it will also propose an empty entry. But that entry will not contain
// any associated callback. So no need to clear callback.
// 清理所有的过时的 normal proposal
while let Some(mut cmd) = self.pending_cmds.pop_normal(u64::MAX, term - 1) {
if let Some(cb) = cmd.cb.take() {
apply_ctx
.applied_batch
.push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term));
}
}
ApplyResult::None
}
ApplyDelegate::end

执行流程是: ApplyDelegate::handle_normal –> ApplyDelegate::end;

上面的都是在 ApplyDelegate::handle_normal 中执行,最后在 ApplyDelegate::end 做收尾工作,这里就是向 peer 返回执行结果。

1
2
3
4
5
6
fn end(&mut self, fsms: &mut [Option<impl DerefMut<Target = ApplyFsm<EK>>>]) {
self.apply_ctx.flush(); // 向 peer 返回执行结果
for fsm in fsms.iter_mut().flatten() {
fsm.delegate.last_flush_applied_index = fsm.delegate.apply_state.get_applied_index();
}
}
ApplyContext::flush

主要做两个事:

  1. 将 unflushed 数据写入到 rocksdb 中,
  2. 并将 ApplyRes 返回给 PeerFsm,更新 RaftLocalState、region 等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pub fn flush(&mut self) -> bool {
// Write to engine
// raftstore.sync-log = true means we need prevent data loss when power failure.
// take raft log gc for example, we write kv WAL first, then write raft WAL,
// if power failure happen, raft WAL may synced to disk, but kv WAL may not.
// so we use sync-log flag here.
let is_synced = self.write_to_db();

if !self.apply_res.is_empty() {
let apply_res = mem::take(&mut self.apply_res);
self.notifier.notify(apply_res); // 通知 PeerFsm 本次的 ApplyRes
}

slow_log!(
elapsed,
"{} handle ready {} committed entries",
self.tag,
self.committed_count
);
self.committed_count = 0;
is_synced
}

PeerStorage::handle_raft_ready

raft_ready:

  • leader: 可以处理新增的 entries
  • follower: 可以处理新增的 snapshot、entries

reft_ready,则可以将新增数据写入自己的 raft-rocksdb 中,这里是以 rocksdb 的 write_batch 为单位,即将一个 ready 中的数据写入到 write_batch,然后一起 commit 到 raft-rocksdb 中。

因此 handle_raft_ready 函数返回,如果存在待写的数据,则都在返回的 write_task 中,用 HandleReadyResult 来标志是否有数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
pub fn handle_raft_ready(
&mut self,
ready: &mut Ready,
destroy_regions: Vec<metapb::Region>,
) -> Result<(HandleReadyResult, WriteTask<EK, ER>)> {
let region_id = self.get_region_id();
let prev_raft_state = self.raft_state.clone();

let mut write_task = WriteTask::new(region_id, self.peer_id, ready.number());

let mut res = HandleReadyResult::SendIOTask; // 有数据,但是没有 snapshot
// 1. 处理 snapshot
if !ready.snapshot().is_empty() {
fail_point!("raft_before_apply_snap");
let last_first_index = self.first_index();
let snap_region =
self.apply_snapshot(ready.snapshot(), &mut write_task, &destroy_regions)?; // apply 的是元数据

res = HandleReadyResult::Snapshot {
msgs: ready.take_persisted_messages(),
snap_region,
destroy_regions,
last_first_index,
};
fail_point!("raft_after_apply_snap");
};

// 2. 处理新增的 entries
if !ready.entries().is_empty() {
self.append(ready.take_entries(), &mut write_task);
}

// Last index is 0 means the peer is created from raft message
// and has not applied snapshot yet, so skip persistent hard state.
if self.raft_state.get_last_index() > 0 {
if let Some(hs) = ready.hs() {
self.raft_state.set_hard_state(hs.clone());
}
}

// Save raft state if it has changed or there is a snapshot.
if prev_raft_state != self.raft_state || !ready.snapshot().is_empty() {
write_task.raft_state = Some(self.raft_state.clone());
}

if !ready.snapshot().is_empty() {
// In case of restart happens when we just write region state to Applying,
// but not write raft_local_state to raft db in time.
// We write raft state to kv db, with last index set to snap index,
// in case of recv raft log after snapshot.
self.save_snapshot_raft_state_to(
ready.snapshot().get_metadata().get_index(),
write_task.kv_wb.as_mut().unwrap(),
)?;
self.save_apply_state_to(write_task.kv_wb.as_mut().unwrap())?;
}

if !write_task.has_data() {
res = HandleReadyResult::NoIOTask;
}

Ok((res, write_task))
}

然后 Peer::handle_raft_reafy_append 中再基于 handle_raft_ready 的返回结果,进行处理。

每次 raft ready,都会有个 ready_number,这个用于异步确认当前一些过时的消息。

如果有新增的 entries or snapshot,在 PeerStorage::handle_raft_ready 中更新 meta 信息后,下面就需要将真实的 raft 数据写入到 raft-rocksdb。这里有两种方案:

  • 同步写:此时 PollContext::sync_write_worker 字段的值不为 None,直接调用 handle_write_task 完成写;

  • 异步写:将 write_task 发送到 StoreWriters 的线程池中,在线程池中调用 handle_write_task 完成写。

    async_write 问题在于线程吃中写操作完成后怎么通知 PeerFsm 写操作完成了以及哪次的 raft-reay 完成了写?

    • 这在 StoreWriters 内部包含了一个 PeerFsm 的 router 的 sender,通过他就能通知 PeerFsm;
    • 第二个问题是靠 ready_number 建立联系的

    StoreWriters 完成后发的消息是

    1
    2
    3
    4
    PeerMsg::Persisted {
    peer_id, // 哪个 peer 完成了
    ready_number, // 哪次 raft-ready 完成了写
    } => self.on_persisted_msg(peer_id, ready_number),

    PeerFsm 接受到 PeerMsg::Persisted 后就会进行后续的处理。

    异步写时,没有完成的操作都会缓存在 self.unpersisted_readies 中,也依赖 ready_number 来建立联系。

Ready::persisted_message()Ready::messsage() 内部都是message,只是区别于leader、follower

  • leader:调用的是 ready.take_messages() 获得的 message 需要发送给 follower,follower 调用 ready.take_messages() 返回空的数组;
  • follower:调用的是 ready.take_persisted_messages() 获得的是返回给 leader 的response,leader 调用 ready.take_persisted_messages() 返回的也是空数组;

只要 ready.take_persisted_messages() 返回了非空数组:

  • 如果当前 write_tasks 非空:此时返回的 res 不是 HandleReadyResult::NoIOTask,无论是同步还是异步写,都会在 handle_write_task 中等待写完成后,将这个 msg 发送给 leader;
  • 如果当前 write_task 没有数据,返回的 res 是 HandleReadyResult::NoIOTask:
    • 如果之前有未完成的写,则将其添加到 unpersisted_readies.raft_msgs 中,等写完成后,回应 leader
    • 如果当前没有未完成的写,则直接将其发送出去,回应 leader

关于 advance 下一节再说。 其余的主要逻辑大致如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
let ready_number = ready.number();
let persisted_msgs = ready.take_persisted_messages();
let mut has_write_ready = false;
match &res {
HandleReadyResult::SendIOTask | HandleReadyResult::Snapshot { .. } => {
if !persisted_msgs.is_empty() {
task.messages = self.build_raft_messages(ctx, persisted_msgs);
}

if !request_times.is_empty() {
task.request_times = request_times;
}

/// 根据配置,决定是同步还是异步写入 rocksdb
if let Some(write_worker) = &mut ctx.sync_write_worker {
// 同步写
write_worker.handle_write_task(task);

assert_eq!(self.unpersisted_ready, None);
self.unpersisted_ready = Some(ready);
has_write_ready = true;
} else {
// 异步写
// 将写任务发送到 StoreWriters 的线程池中
// 在线程池中调用 handle_write_task 函数
self.write_router.send_write_msg(
ctx,
self.unpersisted_readies.back().map(|r| r.number),
WriteMsg::WriteTask(task),
);

// 缓存队列,记录未完成的写操作
self.unpersisted_readies.push_back(UnpersistedReady {
number: ready_number,
max_empty_number: ready_number,
raft_msgs: vec![],
});

// 清空本轮 ready 数据,尚未持久化部分数据
self.raft_group.advance_append_async(ready);
}
}
HandleReadyResult::NoIOTask => {
/// 1. 之前存在未持久化的 raft_log
if let Some(last) = self.unpersisted_readies.back_mut() {
if ready_number <= last.max_empty_number {
panic!(
"{} ready number is not monotonically increaing, {} <= {}",
self.tag, ready_number, last.max_empty_number
);
}
last.max_empty_number = ready_number;

// 等待 raft_log 持久化之后,在 callback 中执行
// 即 on_persist_ready 函数中执行
if !persisted_msgs.is_empty() {
self.unpersisted_message_count += persisted_msgs.capacity();
last.raft_msgs.push(persisted_msgs);
}
} else {
/// 2. 当前没有任何未完成的 raft_log 未持久化完成的任务, 则 follower 直接回应 leader
self.persisted_number = ready_number;

if !persisted_msgs.is_empty() {
let msgs = self.build_raft_messages(ctx, persisted_msgs);
self.send_raft_messages(ctx, msgs);
}
// advance_append 表示 follower 的 append 完成
let mut light_rd = self.raft_group.advance_append(ready);

// The committed entries may not be empty when the size is too large to
// be fetched in the previous ready.
if !light_rd.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, light_rd.take_committed_entries());
}
}
}
}

Advance

大的流程是 propose ==> ready ==> advance,上面已经讲解了 propose ==> ready 的流程,下面就是 advance。

advance 主要是在 ready、apply 之后进行一些元数据信息的更改,而同步和异步的区别也在于 advance 流程的拆分。

  • 同步的 advance 在消息处理状态机快结束的时候处理的,即在 RaftPoller::end 函数中执行了 Peer::handle_raft_ready_advance
  • 异步的 advance 将同步的逻辑整体划分为
    • raft_log 持久化前

      先将已经 ready 的数据取出,异步发送到 StorageWriter 中的线程池去持久化,然后调用 RawNode::advance_append_async 在内存中更新 commit 相关状态,raft 中的 ready 的数据(entries、snapshot)重置为None。

    • raft-log 持久化后

      调用 on_persist_ready 更新相关操作。

PeerFsmDelegate::on_raft_message

发出的的消息的处理。

step