PollHandler::handle_normal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 fn handle_normal (&mut self , peer: &mut impl DerefMut <Target = PeerFsm<EK, ER>>) -> HandleResult { let mut handle_result = HandleResult::KeepProcessing; while self .peer_msg_buf.len () < self .messages_per_tick { match peer.receiver.try_recv () { Ok (msg) => { self .peer_msg_buf.push (msg); } Err (TryRecvError::Empty) => { handle_result = HandleResult::stop_at (0 , false ); break ; } Err (TryRecvError::Disconnected) => { peer.stop (); handle_result = HandleResult::stop_at (0 , false ); break ; } } } let mut delegate = PeerFsmDelegate::new (peer, &mut self .poll_ctx); delegate.handle_msgs (&mut self .peer_msg_buf); if !delegate.collect_ready () && self .poll_ctx.sync_write_worker.is_some () { if let HandleResult ::StopAt { skip_end, .. } = &mut handle_result { *skip_end = true ; } } handle_result }
Propose 这里有两类 propose 方法,
是积累 batch_propose,即积累了一定熟练的 proposal,再统一提交到状态机
目前只是支持对 put/delete 进行 batch 操作,然后调用 BatchRaftCmdRequestBuilder::add
方法将 put/Delete command 缓存起来,当达到阈值BatchRaftCmdRequestBuilder::should_finish
返回 true,表示可以发送给 raft。
Question: 如果没有发送出去就宕机了呢???
非 Put/Delete 指令,或者没有开启 batch 操作,就直接 propose
下面以 propose_batch_raft_command
操作为例进行说明。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 pub fn handle_msgs (&mut self , msgs: &mut Vec <PeerMsg<EK>>) { PeerMsg::RaftCommand (cmd) => { if let Some (Err (e)) = cmd.extra_opts.deadline.map (|deadline| deadline.check ()) { cmd.callback.invoke_with_response (new_error (e.into ())); continue ; } let req_size = cmd.request.compute_size (); if self .ctx.cfg.cmd_batch && self .fsm.batch_req_builder.can_batch (&self .ctx.cfg, &cmd.request, req_size) && ((self .ctx.self_disk_usage == DiskUsage::Normal && !self .fsm.peer.disk_full_peers.majority ()) || cmd.extra_opts.disk_full_opt == DiskFullOpt::NotAllowedOnFull) { self .fsm.batch_req_builder.add (cmd, req_size); if self .fsm.batch_req_builder.should_finish (&self .ctx.cfg) { self .propose_batch_raft_command (true ); } } else { self .propose_raft_command ( cmd.request, cmd.callback, cmd.extra_opts.disk_full_opt, ) } } }
PeerFsmDelegate::propose_batch_raft_command propose_batch_raft_command
第二个参数 force
用来表示是否强制执行 propose 操作,
可能为 true 的情况:
当前积累的 proposals,已到达 batch 的条件;
在同步写入 kv-rocksdb 情况下也需要每次都 propose。
如果 force 为 false,并且当前等持久化的 proposals 个数已经达到阈值 cmd_batch_concurrent_ready_max_count
,则也拒绝当前的 propose 请求。
如果上述条件 check 通过,则调用 BatchRaftCmdRequestBuilder::build
构建出 {request, callback}
,调用 propose_raft_command_internal
进入 propose 状态(没有开启 batch 时,也是进入 propose_raft_command_internal
函数)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 fn propose_batch_raft_command (&mut self , force: bool ) { if self .fsm.batch_req_builder.request.is_none () { return ; } if !force && self .ctx.cfg.cmd_batch_concurrent_ready_max_count != 0 && self .fsm.peer.unpersisted_ready_len () >= self .ctx.cfg.cmd_batch_concurrent_ready_max_count { return ; } fail_point!("propose_batch_raft_command" , !force, |_| {}); let (request, callback) = self .fsm .batch_req_builder .build (&mut self .ctx.raft_metrics) .unwrap (); self .propose_raft_command_internal (request, callback, DiskFullOpt::NotAllowedOnFull) }
PeerFsmDelegate::propose_raft_command_internal propose_raft_command_internal
函数中,主要是做一些校验工作,通过之后才能完将 proposal 提交到 raft 中进行处理。
当前 region 不能处于待删除的状态,即 pending_remove
不能为 false;
在 pre_propose_raft_command
函数中进一步 check
如果成功提交到 raft 中,则返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 fn propose_raft_command_internal ( &mut self , mut msg: RaftCmdRequest, cb: Callback<EK::Snapshot>, diskfullopt: DiskFullOpt, ) { if self .fsm.peer.pending_remove { apply::notify_req_region_removed (self .region_id (), cb); return ; } match self .pre_propose_raft_command (&msg) { Ok (Some (resp)) => { cb.invoke_with_response (resp); return ; } Err (e) => { debug!( "failed to propose" ; "region_id" => self .region_id (), "peer_id" => self .fsm.peer_id (), "message" => ?msg, "err" => %e, ); cb.invoke_with_response (new_error (e)); return ; } _ => (), } }
Ready Peer::handle_raft_ready_append(before ready) 根据 raft 状态机处理机制,propose 之后需要收集已 ready 的状态和数据并对其进行处理。调用链是 PeerFsmDelegate::collect_ready
=> Peer::handle_raft_ready_append
Peer::handle_raft_ready_append
其实就是个大杂烩,里面会对各种情况进行处理,比如 leader 切换、replicate、Snapshot 处理以及 apply 已经 commit 的数据、持久化等。,因此下面的执行流程并不是表示在一次 handle_raft_ready_append
调用中都同时发生的,即只有处于 ready 状态,才会被执行。
在 处理 ready 之前,先处理 snapshot 之前的相关状态:
先检测当前是否存在 snapshot,如果当前 region 正在 apply snapshot,则需要等待这个过程结束,此时 self.check_snap_status
返回 false;3
当前 region 是否存在未 committed 的 snapshot
注意,这一步检测的是 uncommitted 的 snapshot,数据的流程是从 commit ==> apply
1 2 3 4 pub fn ready_to_handle_pending_snap (&self ) -> bool { self .last_applying_idx == self .get_store ().applied_index () && self .pending_request_snapshot_count.load (Ordering::SeqCst) == 0 }
destroy_regions
如果当前没有待处理的 snapshot,再检测是否有待删除的 region,可能是 split/merge 过程导致的需要删除的 region.
如果 raft 当前没有 ready,则检测是否存在待生成 snapshot 的任务,如果有,则调用 ApplyFsm 异步去生成 snapshot
那么就有个问题,在什么情况下 self.mut_store().take_gen_snap_task()
的返回值不是 None
呢?
这个 snapshot 不是由外部产生的,是 Raft 状态机在 replication 的过程自动产生的:比如新加入一个 follower,或者一个 follower 滞后的 log 较多,此时 leader 就会调用 PeerStore::snapshot 函数生成包含最新 commit 数据的 snapshot 将其发送给 follower:
leader 需要生成 snapshot
follower 在接受到 snapshot 之后需要 apply snapshot
只要等 leader && follower 这个流程都执行完,Raft-group 才能处于执行 ready 状态
下面代码是去除注释和非必要啊部分后的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 pub fn handle_raft_ready_append <T: Transport>( &mut self , ctx: &mut PollContext<EK, ER, T>, ) -> Option <ReadyResult> { if self .pending_remove && !self .check_snap_status (ctx) { return None ; } let mut destroy_regions = vec! []; if self .has_pending_snapshot () { if !self .ready_to_handle_pending_snap () && !self .unpersisted_readies.is_empty () { return None ; } let meta = ctx.store_meta.lock ().unwrap (); if let Some (wait_destroy_regions) = meta.atomic_snap_regions.get (&self .region_id) { for (source_region_id, is_ready) in wait_destroy_regions { if !is_ready { return None ; } destroy_regions.push (meta.regions[source_region_id].clone ()); } } } if !self .raft_group.has_ready () { if let Some (gen_task) = self .mut_store ().take_gen_snap_task () { self .pending_request_snapshot_count.fetch_add (1 , Ordering::SeqCst); ctx.apply_router.schedule_task (self .region_id, ApplyTask::Snapshot (gen_task)); } return None ; } }
Peer::handle_raft_ready_append(after ready) 下面就是 raft ready 才能处理的部分,关于 snapshot 的部分,后面再说。
leader 是否发生切换、leader 状态是否更改、还需要对 leader 进行 renew_leader_lease 等一些列操作;
ready.messages() 是前面用户 propose 的消息,需要将其 replicate 给其他 TiKV follower;
send_raft_messages
中发送消息使用的就是 Transport::send
,内部实际上封装的 raft_client
如果 TiKV 中已存在 committed 数据,那么下一步就是将 committed 的数据 apply 到 kv-rocksdb 中
经过第 3 步,此时最新的 committed 的数据都已经 applied。如果此时新来了 gen_snaps_task 并异步调用 PeerStorage::snapshot,这样可以保证生成的 snapshot 包含最新的数据;
最后一步,需要将 applied 的数据写入到 kv-rocksdb 中,具体是 sync_write 还是 async_write 取决于配置
代码简化后如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 pub fn handle_raft_ready_append <T: Transport>( &mut self , ctx: &mut PollContext<EK, ER, T>, ) -> Option <ReadyResult> { let mut ready = self .raft_group.ready (); self .on_role_changed (ctx, &ready); if let Some (hs) = ready.hs () { let pre_commit_index = self .get_store ().commit_index (); assert! (hs.get_commit () >= pre_commit_index); if self .is_leader () { self .on_leader_commit_idx_changed (pre_commit_index, hs.get_commit ()); } } if !ready.messages ().is_empty () { assert! (self .is_leader ()); let raft_msgs = self .build_raft_messages (ctx, ready.take_messages ()); self .send_raft_messages (ctx, raft_msgs); } self .apply_reads (ctx, &ready); if !ready.committed_entries ().is_empty () { self .handle_raft_committed_entries (ctx, ready.take_committed_entries ()); } if let Some (gen_task) = self .mut_store ().take_gen_snap_task () { self .pending_request_snapshot_count.fetch_add (1 , Ordering::SeqCst); ctx.apply_router.schedule_task (self .region_id, ApplyTask::Snapshot (gen_task)); } let (res, mut task) = match self .mut_store () .handle_raft_ready (&mut ready, destroy_regions) { Ok (r) => r, Err (e) => { panic! ("{} failed to handle raft ready: {:?}" , self .tag, e) } }; let ready_number = ready.number (); let persisted_msgs = ready.take_persisted_messages (); let mut has_write_ready = false ; match &res { } Some (ReadyResult { state_role, has_new_entries, has_write_ready, }) }
下面主要讲解 Peer::handle_raft_ready_append
函数中和 replication 有关的部分,关于 leader 切换后面单独说。
PeerStorage::snapshot PeerStorage::snapshot
函数是个异步任务,内部有个 {sender, recevier}
连接着生产消费者两端,本质上是用于创建生成 snapshot 的任务(gen_snap_task),而 gen_snap_task 会异步执行去生成 snapshot:
先检测当前是否已经有正在生成 snapshot 的任务;
如果有,则先检测上一个异步 gen_snap_task 是否已经完成:即 receiver.try_recv()
返回的 snap
,如果经过 PeerStore::validate_snap
函数校验,确认这个 snap 有效则直接返回,不会产生新的异步 gen_snap_task。
然后,这个 snap 就会被 leader 发送给 follower。
如果当前没 gen_snap_task 任务,或者上一个异步 gen_snap_task 尚未完成,或者 snap 校验没有通过,则生成新的 gen_snap_task。
在 PeerStorage::snapshot
设置了 gen_snap_task
之后,在 Peer::handle_raft_ready_append
中检测到之后,就会基于 apply_router
将该异步任务发送给 ApplyFsm
,让 ApplyFsm 真正创建 snpahot。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 pub fn snapshot (&self , request_index: u64 , to: u64 ) -> raft::Result <Snapshot> { let mut snap_state = self .snap_state.borrow_mut (); let mut tried_cnt = self .snap_tried_cnt.borrow_mut (); let (mut tried, mut last_canceled, mut snap) = (false , false , None ); if let SnapState ::Generating { ref canceled, ref receiver, .. } = *snap_state { tried = true ; last_canceled = canceled.load (Ordering::SeqCst); match receiver.try_recv () { Err (TryRecvError::Empty) => { let e = raft::StorageError::SnapshotTemporarilyUnavailable; return Err (raft::Error::Store (e)); } Ok (s) if !last_canceled => snap = Some (s), Err (TryRecvError::Disconnected) | Ok (_) => {} } } if tried { *snap_state = SnapState::Relax; match snap { Some (s) => { *tried_cnt = 0 ; if self .validate_snap (&s, request_index) { return Ok (s); } } None => { } } } if SnapState::Relax != *snap_state { panic! ("{} unexpected state: {:?}" , self .tag, *snap_state); } let (sender, receiver) = mpsc::sync_channel (1 ); let canceled = Arc::new (AtomicBool::new (false )); let index = Arc::new (AtomicU64::new (0 )); *snap_state = SnapState::Generating { canceled: canceled.clone (), index: index.clone (), receiver, }; let mut to_store_id = 0 ; if let Some (peer) = self .region ().get_peers ().iter ().find (|p| p.id == to) { to_store_id = peer.store_id; } let task = GenSnapTask::new (self .region.get_id (), index, canceled, sender, to_store_id); let mut gen_snap_task = self .gen_snap_task.borrow_mut (); assert! (gen_snap_task.is_none ()); *gen_snap_task = Some (task); Err (raft::Error::Store ( raft::StorageError::SnapshotTemporarilyUnavailable, )) }
ApplyFsm::handle_snapshot ApplyFsm 在接受到 Msg::Snapshot
类型的消息后,就会进入 ApplyFsm::handle_snapshot
处理该 gen_snap_task。
由于生成 snapshot 是个 io 任务,比较耗时,自然也不会同步阻塞执行。
因此,handle_snapshot
函数内部调用 GenSnapTask::generate_and_schedule_snapshot
异步执行。
不过,在执行前,需要将这个 region 中 unpersisted 的数据给 flush 到 kv-rocksdb 中,确保生成的 snapshot 是最新的。这个 ApplyContext::flush
流程后面再说,目前先关注 gen_snap_task
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fn handle_snapshot (&mut self , apply_ctx: &mut ApplyContext<EK>, snap_task: GenSnapTask) { if self .delegate.pending_remove || self .delegate.stopped { return ; } let applied_index = self .delegate.apply_state.get_applied_index (); let mut need_sync = apply_ctx .apply_res .iter () .any (|res| res.region_id == self .delegate.region_id ()) && self .delegate.last_flush_applied_index != applied_index; if need_sync { self .delegate.write_apply_state (apply_ctx.kv_wb_mut ()); apply_ctx.flush (); self .delegate.last_flush_applied_index = applied_index; } if let Err (e) = snap_task.generate_and_schedule_snapshot::<EK>( apply_ctx.engine.snapshot (), self .delegate.applied_index_term, self .delegate.apply_state.clone (), &apply_ctx.region_scheduler, ) { } self .delegate.pending_request_snapshot_count.fetch_sub (1 , Ordering::SeqCst); }
SnapContext::generate_snap 最终异步调用到 SnapContext::generate_snap
函数中,基于 store::do_snapshot
函数生成 snapshot:
notifier
snapshot 创建后,notify
通知 gen_snap_task
的消费端,即 PeerStore::snapshot()
中的 receiver.try_recv()
返回 snap,这样就可以让 raft-group 的 ready.snapshot()
不为空;
这样就可以在下一次调用 Peer::handle_raft_ready_append
函数时,继续处理已经 ready 的 snapshot,即将其发送到指定 follower,(这一部分在 Peer::handle_raft_ready
中)。
rotuer
这里的 router
的消费端是 PeerDelegate
,此处则是用于通知 leader 去处理已经生成的 snapshot。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 fn generate_snap ( &self , region_id: u64 , last_applied_index_term: u64 , last_applied_state: RaftApplyState, kv_snap: EK::Snapshot, notifier: SyncSender<RaftSnapshot>, for_balance: bool , allow_multi_files_snapshot: bool , ) -> Result <()> { let snap = box_try!(store::do_snapshot::<EK>( self .mgr.clone (), &self .engine, kv_snap, region_id, last_applied_index_term, last_applied_state, for_balance, allow_multi_files_snapshot, )); if let Err (e) = notifier.try_send (snap) { } self .router.send (region_id, CasualMessage::SnapshotGenerated); Ok (()) }
store::do_snapshot 生成 snapshot,则需要获得当前 kv-rocksdb 的最新数据和元信息,主要包括三部分:
RaftApplyState
apply_index 表示当前 apply raft log 的进度。如果 apply raft log 在 persist 前发生宕机,那么重启后就会从较老的 apply index 开始重放日志。所以 apply raftlog 是需要支持幂等的,对于一些特殊的不支持幂等的指令,就需要 apply 完立刻 persist 并 fsync。
因为日志不可能无限增长,所以 TiKV 会定期做 CompactLog 来 gc raft log。truncated_state
表示上次做完 CompactLog 后现有日志的头部。
RegionLocalState
RegionLocalState 主要包含 Region 的 range、epoch、各个 peer 以及当前 Region 状态,其中 RegionEpoch 会在 ConfChange,以及 Split 和 Merge 的时候发生变化,在处理 Raft 消息时,我们会校验 RegionEpoch 并拒绝掉过期的消息
PeerState 状态比如 Normal、Applying、Merging、Tombstone等。
RaftSnapshotData
数据都是保存在 CF_RAFT
column family 中。 代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 pub fn do_snapshot <E>( mgr: SnapManager, engine: &E, kv_snap: E::Snapshot, region_id: u64 , last_applied_index_term: u64 , last_applied_state: RaftApplyState, for_balance: bool , allow_multi_files_snapshot: bool , ) -> raft::Result <Snapshot> where E: KvEngine, { debug!( "begin to generate a snapshot" ; "region_id" => region_id); let msg = kv_snap .get_msg_cf (CF_RAFT, &keys::apply_state_key (region_id)) .map_err (into_other::<_, raft::Error>)?; let apply_state : RaftApplyState = match msg { None => { return Err (); } Some (state) => state, }; assert_eq! (apply_state, last_applied_state); let key = SnapKey::new ( region_id, last_applied_index_term, apply_state.get_applied_index (), ); mgr.register (key.clone (), SnapEntry::Generating); defer!(mgr.deregister (&key, &SnapEntry::Generating)); let state : RegionLocalState = kv_snap .get_msg_cf (CF_RAFT, &keys::region_state_key (key.region_id)) .and_then (|res| match res { None => Err (box_err!("region {} could not find region info" , region_id)), Some (state) => Ok (state), }) .map_err (into_other::<_, raft::Error>)?; if state.get_state () != PeerState::Normal { return Err (storage_error (format! ( "snap job for {} seems stale, skip." , region_id ))); } let mut snapshot = Snapshot::default (); snapshot.mut_metadata ().set_index (key.idx); snapshot.mut_metadata ().set_term (key.term); let conf_state = util::conf_state_from_region (state.get_region ()); snapshot.mut_metadata ().set_conf_state (conf_state); let mut snap_data = RaftSnapshotData::default (); snap_data.set_region (state.get_region ().clone ()); let mut s = mgr.get_snapshot_for_building (&key)?; let mut stat = SnapshotStatistics::new (); s.build ( engine, &kv_snap, state.get_region (), &mut snap_data, &mut stat, allow_multi_files_snapshot, )?; snap_data.mut_meta ().set_for_balance (for_balance); let v = snap_data.write_to_bytes ()?; snapshot.set_data (v.into ()); Ok (snapshot) }
ApplyFsm::handle_apply Peer::handle_raft_committed_entries
此函数用于处理已经 committed 数据:
对于 leader 来说,即接受到超过半数 follower 回应的 proposals;
对于 follower 来说,即已经 Append 到 raft-log 的 proposals。
在 Peer::handle_raft_committed_entries
函数中构建的 ApplyTask
,最终在 ApplyFsm::handle_apply
中使用
更新当前 ApplyDelegate 的状态及元数据;
添加 proposals
handle_raft_committed_entries
代码简化如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 fn handle_apply (&mut self , apply_ctx: &mut ApplyContext<EK>, mut apply: Apply<EK::Snapshot>) { if self .delegate.pending_remove || self .delegate.stopped { return ; } let mut entries = Vec ::new (); let mut dangle_size = 0 ; for cached_entries in apply.entries { let (e, sz) = cached_entries.take_entries (); dangle_size += sz; if e.is_empty () { let rid = self .delegate.region_id (); let StdRange { start, end } = cached_entries.range; self .delegate .raft_engine .fetch_entries_to (rid, start, end, None , &mut entries) .unwrap (); } else if entries.is_empty () { entries = e; } else { entries.extend (e); } } self .delegate.term = apply.term; let cur_state = (apply.commit_index, apply.commit_term); self .delegate.apply_state.set_commit_index (cur_state.0 ); self .delegate.apply_state.set_commit_term (cur_state.1 ); self .append_proposal (apply.cbs.drain (..)); self .delegate.priority = Priority::Normal; self .delegate .handle_raft_committed_entries (apply_ctx, entries.drain (..)); fail_point!("post_handle_apply_1003" , self .delegate.id () == 1003 , |_| {}); }
ApplyFsm::append_proposal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 fn append_proposal (&mut self , props_drainer: Drain<'_ , Proposal<EK::Snapshot>>) { let (region_id, peer_id) = (self .delegate.region_id (), self .delegate.id ()); if self .delegate.stopped { for p in props_drainer { let cmd = PendingCmd::<EK::Snapshot>::new (p.index, p.term, p.cb); notify_stale_command (region_id, peer_id, self .delegate.term, cmd); } return ; } let propose_num = props_drainer.len (); for p in props_drainer { let cmd = PendingCmd::new (p.index, p.term, p.cb); if p.is_conf_change { if let Some (cmd) = self .delegate.pending_cmds.take_conf_change () { notify_stale_command (region_id, peer_id, self .delegate.term, cmd); } self .delegate.pending_cmds.set_conf_change (cmd); } else { self .delegate.pending_cmds.append_normal (cmd); } } }
ApplyDelegate::handle_raft_committed_entries 这里的处理,即对 committed entries 进行 apply。
函数有个 ApplyContext 参数,用来控制一些调用:
1 ApplyContext::prepare_for -> ApplyContext::commit [-> ApplyContext::commit ...] -> ApplyContext::finish_for.
从这个函数处理流程可以看出也是符合这个流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 fn handle_raft_committed_entries ( &mut self , apply_ctx: &mut ApplyContext<EK>, mut committed_entries_drainer: Drain<'_ , Entry>, ) { if committed_entries_drainer.len () == 0 { return ; } apply_ctx.prepare_for (self ); apply_ctx.committed_count += committed_entries_drainer.len (); let mut results = VecDeque::new (); while let Some (entry) = committed_entries_drainer.next () { if self .pending_remove { break ; } let res = match entry.get_entry_type () { EntryType::EntryNormal => self .handle_raft_entry_normal (apply_ctx, &entry), EntryType::EntryConfChange | EntryType::EntryConfChangeV2 => { self .handle_raft_entry_conf_change (apply_ctx, &entry) } }; match res { ApplyResult::None => {} ApplyResult::Res (res) => results.push_back (res), ApplyResult::Yield | ApplyResult::WaitMergeSource (_) => { apply_ctx.committed_count -= committed_entries_drainer.len () + 1 ; let mut pending_entries = Vec ::with_capacity (committed_entries_drainer.len () + 1 ); pending_entries.push (entry); pending_entries.extend (committed_entries_drainer); apply_ctx.finish_for (self , results); self .yield_state = Some (YieldState { pending_entries, pending_msgs: Vec ::default (), heap_size: None , }); if let ApplyResult ::WaitMergeSource (logs_up_to_date) = res { self .wait_merge_state = Some (WaitSourceMergeState { logs_up_to_date }); } return ; } } } apply_ctx.finish_for (self , results); if self .pending_remove { self .destroy (apply_ctx); } }
ApplyContext::prepare_for ApplyContext::commit 在 commit 函数中先将 RaftApplyState 写入 kv_wb,这样在 commit_opt
函数中就可以一起将最新的 delegate.apply_state 和 kv 数据一起持久化到 rocksdb 中,并在持久化完成后,更新 ApplyDelegate::last_flush_applied_index 字段。
因此, commit 函数返回后,所有 unflushed 的数据都持久化到 rocksdb 中了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 pub fn commit (&mut self , delegate: &mut ApplyDelegate<EK>) { if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index () { delegate.write_apply_state (self .kv_wb_mut ()); } self .commit_opt (delegate, true ); } fn commit_opt (&mut self , delegate: &mut ApplyDelegate<EK>, persistent: bool ) { if persistent { self .write_to_db (); self .prepare_for (delegate); delegate.last_flush_applied_index = delegate.apply_state.get_applied_index () } self .kv_wb_last_bytes = self .kv_wb ().data_size () as u64 ; self .kv_wb_last_keys = self .kv_wb ().count () as u64 ; }
ApplyContext::write_to_db write_to_db
主要就是将 pendig_sst、kv、delete_sst 数据导入到 rocksdb 中,没有什么特别的操作。持久化完成,则调用 callback 回复 client。
简化后如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 pub fn write_to_db (&mut self ) -> bool { let need_sync = self .sync_log_hint; if !self .pending_ssts.is_empty () { let tag = self .tag.clone (); self .importer .ingest (&self .pending_ssts, &self .engine) .unwrap_or_else (|e| { panic! ( "{} failed to ingest ssts {:?}: {:?}" , tag, self .pending_ssts, e ); }); self .pending_ssts = vec! []; } if !self .kv_wb_mut ().is_empty () { let mut write_opts = engine_traits::WriteOptions::new (); write_opts.set_sync (need_sync); self .kv_wb ().write_opt (&write_opts).unwrap_or_else (|e| { panic! ("failed to write to engine: {:?}" , e); }); self .sync_log_hint = false ; let data_size = self .kv_wb ().data_size (); if data_size > APPLY_WB_SHRINK_SIZE { self .kv_wb = self .engine.write_batch_with_cap (DEFAULT_APPLY_WB_SIZE); } else { self .kv_wb_mut ().clear (); } self .kv_wb_last_bytes = 0 ; self .kv_wb_last_keys = 0 ; } if !self .delete_ssts.is_empty () { let tag = self .tag.clone (); for sst in self .delete_ssts.drain (..) { self .importer.delete (&sst.meta).unwrap_or_else (|e| { panic! ("{} cleanup ingested file {:?}: {:?}" , tag, sst, e); }); } } let ApplyCallbackBatch { cmd_batch, batch_max_level, mut cb_batch, } = mem::replace (&mut self .applied_batch, ApplyCallbackBatch::new ()); for (cb, resp) in cb_batch.drain (..) { cb.invoke_with_response (resp); } need_sync }
ApplyContext::finish_for finish_for
函数和 commit
函数功能类似,区别在于
commit_opt(delegate, false)
,即不会持久化数据到 rocksdb,仅是更新统计数字。
保存 ApplyRes
留在 ApplyPoller::end
中处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 pub fn finish_for ( &mut self , delegate: &mut ApplyDelegate<EK>, results: VecDeque<ExecResult<EK::Snapshot>>, ) { if !delegate.pending_remove { delegate.write_apply_state (self .kv_wb_mut ()); } self .commit_opt (delegate, false ); self .apply_res.push (ApplyRes { region_id: delegate.region_id (), apply_state: delegate.apply_state.clone (), exec_res: results, metrics: delegate.metrics.clone (), applied_index_term: delegate.applied_index_term, bucket_stat: delegate.buckets.clone ().map (Box ::new), }); }
ApplyDelegate::handle_raft_entry_normal 在 ApplyDelegate::process_raft_cmd
函数真正处理本次 proposals 之前,需要先保证之前的数据都已经持久化到 rocksdb 中,即 [last_flush_applied_index, apply_state.applied_index)
区间没有数据。
在 process_raft_cmd
中处理完 proposals,就更新 apply_state.applied_index
,即增大了 [last_flush_applied_index, apply_state.applied_index)
区间,如果是有多个 proposals,那么处理下一个 proposal 就会先持久化数据,最后一个 proposal 则在 handle_destory 处理。
TODO: 这里有个时间片 && 优先级的设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 fn handle_raft_entry_normal ( &mut self , apply_ctx: &mut ApplyContext<EK>, entry: &Entry, ) -> ApplyResult<EK::Snapshot> { let index = entry.get_index (); let term = entry.get_term (); let data = entry.get_data (); if !data.is_empty () { let cmd = util::parse_data_at (data, index, &self .tag); if apply_ctx.yield_high_latency_operation && has_high_latency_operation (&cmd) { self .priority = Priority::Low; } let mut has_unflushed_data = self .last_flush_applied_index != self .apply_state.get_applied_index (); if has_unflushed_data && should_write_to_engine (&cmd) || apply_ctx.kv_wb ().should_write_to_engine () { apply_ctx.commit (self ); if let Some (start) = self .handle_start.as_ref () { if start.saturating_elapsed () >= apply_ctx.yield_duration { return ApplyResult::Yield; } } has_unflushed_data = false ; } if self .priority != apply_ctx.priority { if has_unflushed_data { apply_ctx.commit (self ); } return ApplyResult::Yield; } return self .process_raft_cmd (apply_ctx, index, term, cmd); } self .apply_state.set_applied_index (index); self .applied_index_term = term; assert! (term > 0 ); while let Some (mut cmd) = self .pending_cmds.pop_normal (u64 ::MAX, term - 1 ) { if let Some (cb) = cmd.cb.take () { apply_ctx .applied_batch .push_cb (cb, cmd_resp::err_resp (Error::StaleCommand, term)); } } ApplyResult::None }
ApplyDelegate::end 执行流程是: ApplyDelegate::handle_normal –> ApplyDelegate::end;
上面的都是在 ApplyDelegate::handle_normal
中执行,最后在 ApplyDelegate::end
做收尾工作,这里就是向 peer 返回执行结果。
1 2 3 4 5 6 fn end (&mut self , fsms: &mut [Option <impl DerefMut <Target = ApplyFsm<EK>>>]) { self .apply_ctx.flush (); for fsm in fsms.iter_mut ().flatten () { fsm.delegate.last_flush_applied_index = fsm.delegate.apply_state.get_applied_index (); } }
ApplyContext::flush 主要做两个事:
将 unflushed 数据写入到 rocksdb 中,
并将 ApplyRes 返回给 PeerFsm,更新 RaftLocalState、region 等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 pub fn flush (&mut self ) -> bool { let is_synced = self .write_to_db (); if !self .apply_res.is_empty () { let apply_res = mem::take (&mut self .apply_res); self .notifier.notify (apply_res); } slow_log!( elapsed, "{} handle ready {} committed entries" , self .tag, self .committed_count ); self .committed_count = 0 ; is_synced }
PeerStorage::handle_raft_ready raft_ready:
leader: 可以处理新增的 entries
follower: 可以处理新增的 snapshot、entries
reft_ready,则可以将新增数据写入自己的 raft-rocksdb 中,这里是以 rocksdb 的 write_batch 为单位,即将一个 ready 中的数据写入到 write_batch,然后一起 commit 到 raft-rocksdb 中。
因此 handle_raft_ready
函数返回,如果存在待写的数据,则都在返回的 write_task 中,用 HandleReadyResult 来标志是否有数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 pub fn handle_raft_ready ( &mut self , ready: &mut Ready, destroy_regions: Vec <metapb::Region>, ) -> Result <(HandleReadyResult, WriteTask<EK, ER>)> { let region_id = self .get_region_id (); let prev_raft_state = self .raft_state.clone (); let mut write_task = WriteTask::new (region_id, self .peer_id, ready.number ()); let mut res = HandleReadyResult::SendIOTask; if !ready.snapshot ().is_empty () { fail_point!("raft_before_apply_snap" ); let last_first_index = self .first_index (); let snap_region = self .apply_snapshot (ready.snapshot (), &mut write_task, &destroy_regions)?; res = HandleReadyResult::Snapshot { msgs: ready.take_persisted_messages (), snap_region, destroy_regions, last_first_index, }; fail_point!("raft_after_apply_snap" ); }; if !ready.entries ().is_empty () { self .append (ready.take_entries (), &mut write_task); } if self .raft_state.get_last_index () > 0 { if let Some (hs) = ready.hs () { self .raft_state.set_hard_state (hs.clone ()); } } if prev_raft_state != self .raft_state || !ready.snapshot ().is_empty () { write_task.raft_state = Some (self .raft_state.clone ()); } if !ready.snapshot ().is_empty () { self .save_snapshot_raft_state_to ( ready.snapshot ().get_metadata ().get_index (), write_task.kv_wb.as_mut ().unwrap (), )?; self .save_apply_state_to (write_task.kv_wb.as_mut ().unwrap ())?; } if !write_task.has_data () { res = HandleReadyResult::NoIOTask; } Ok ((res, write_task)) }
然后 Peer::handle_raft_reafy_append
中再基于 handle_raft_ready
的返回结果,进行处理。
每次 raft ready,都会有个 ready_number,这个用于异步确认当前一些过时的消息。
如果有新增的 entries or snapshot,在 PeerStorage::handle_raft_ready
中更新 meta 信息后,下面就需要将真实的 raft 数据写入到 raft-rocksdb。这里有两种方案:
同步写:此时 PollContext::sync_write_worker
字段的值不为 None,直接调用 handle_write_task 完成写;
异步写:将 write_task 发送到 StoreWriters 的线程池中,在线程池中调用 handle_write_task 完成写。
async_write 问题在于线程吃中写操作完成后怎么通知 PeerFsm 写操作完成了以及哪次的 raft-reay 完成了写?
这在 StoreWriters 内部包含了一个 PeerFsm 的 router 的 sender,通过他就能通知 PeerFsm;
第二个问题是靠 ready_number 建立联系的
StoreWriters 完成后发的消息是 1 2 3 4 PeerMsg::Persisted { peer_id, ready_number, } => self .on_persisted_msg (peer_id, ready_number),
PeerFsm 接受到 PeerMsg::Persisted
后就会进行后续的处理。
异步写时,没有完成的操作都会缓存在 self.unpersisted_readies
中,也依赖 ready_number 来建立联系。
Ready::persisted_message()
和 Ready::messsage()
内部都是message,只是区别于leader、follower
leader:调用的是 ready.take_messages()
获得的 message 需要发送给 follower,follower 调用 ready.take_messages()
返回空的数组;
follower:调用的是 ready.take_persisted_messages()
获得的是返回给 leader 的response,leader 调用 ready.take_persisted_messages()
返回的也是空数组;
只要 ready.take_persisted_messages()
返回了非空数组:
如果当前 write_tasks 非空:此时返回的 res 不是 HandleReadyResult::NoIOTask,无论是同步还是异步写,都会在 handle_write_task
中等待写完成后,将这个 msg 发送给 leader;
如果当前 write_task 没有数据,返回的 res 是 HandleReadyResult::NoIOTask:
如果之前有未完成的写,则将其添加到 unpersisted_readies.raft_msgs
中,等写完成后,回应 leader
如果当前没有未完成的写,则直接将其发送出去,回应 leader
关于 advance 下一节再说。 其余的主要逻辑大致如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 let ready_number = ready.number ();let persisted_msgs = ready.take_persisted_messages ();let mut has_write_ready = false ;match &res { HandleReadyResult::SendIOTask | HandleReadyResult::Snapshot { .. } => { if !persisted_msgs.is_empty () { task.messages = self .build_raft_messages (ctx, persisted_msgs); } if !request_times.is_empty () { task.request_times = request_times; } if let Some (write_worker) = &mut ctx.sync_write_worker { write_worker.handle_write_task (task); assert_eq! (self .unpersisted_ready, None ); self .unpersisted_ready = Some (ready); has_write_ready = true ; } else { self .write_router.send_write_msg ( ctx, self .unpersisted_readies.back ().map (|r| r.number), WriteMsg::WriteTask (task), ); self .unpersisted_readies.push_back (UnpersistedReady { number: ready_number, max_empty_number: ready_number, raft_msgs: vec! [], }); self .raft_group.advance_append_async (ready); } } HandleReadyResult::NoIOTask => { if let Some (last) = self .unpersisted_readies.back_mut () { if ready_number <= last.max_empty_number { panic! ( "{} ready number is not monotonically increaing, {} <= {}" , self .tag, ready_number, last.max_empty_number ); } last.max_empty_number = ready_number; if !persisted_msgs.is_empty () { self .unpersisted_message_count += persisted_msgs.capacity (); last.raft_msgs.push (persisted_msgs); } } else { self .persisted_number = ready_number; if !persisted_msgs.is_empty () { let msgs = self .build_raft_messages (ctx, persisted_msgs); self .send_raft_messages (ctx, msgs); } let mut light_rd = self .raft_group.advance_append (ready); if !light_rd.committed_entries ().is_empty () { self .handle_raft_committed_entries (ctx, light_rd.take_committed_entries ()); } } } }
Advance 大的流程是 propose ==> ready ==> advance
,上面已经讲解了 propose ==> ready 的流程,下面就是 advance。
advance 主要是在 ready、apply 之后进行一些元数据信息的更改,而同步和异步的区别也在于 advance 流程的拆分。
同步的 advance 在消息处理状态机快结束的时候处理的,即在 RaftPoller::end
函数中执行了 Peer::handle_raft_ready_advance
异步的 advance 将同步的逻辑整体划分为
PeerFsmDelegate::on_raft_message 发出的的消息的处理。
step