raft_rs 这边博客主要是从源码角度分析下 raft-rs 的日志复制和选举过程。
Entry 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // The entry is a type of change that needs to be applied. It contains two data fields. // While the fields are built into the model; their usage is determined by the entry_type. // // For normal entries, the data field should contain the data change that should be applied. // The context field can be used for any contextual data that might be relevant to the // application of the data. // // For configuration changes, the data will contain the ConfChange message and the // context will provide anything needed to assist the configuration change. The context // if for the user to set and use in this case. message Entry { EntryType entry_type = 1; uint64 term = 2; uint64 index = 3; bytes data = 4; bytes context = 6; }
index:当前这个entry在整个raft日志中的位置索引。有了Term和Index之后,一个log entry就能被唯一标识。
data:一个被序列化后的byte数组,代表当前entry真正要执行的操作,比方说如果上面的Type是EntryNormal,那这里的Data就可能是具体要更改的key-value pair,如果Type是EntryConfChange,那Data就是具体的配置更改项ConfChange。raft算法本身并不关心这个数据是什么,它只是把这段数据当做log同步过程中的payload来处理,具体对这个数据的解析则有上层应用来完成。
Message TiKV 接受用户发送过来的 command,并将其封装成 entry,然后调用 RawNode::propose 方法,将数据进一步封装。
Message 由 proto 定义,在 raft 之间传输数据的消息格式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 message Message { MessageType msg_type = 1; uint64 to = 2; uint64 from = 3; uint64 term = 4; uint64 log_term = 5; uint64 index = 6; repeated Entry entries = 7; uint64 commit = 8; Snapshot snapshot = 9; uint64 request_snapshot = 13; bool reject = 10; uint64 reject_hint = 11; bytes context = 12; uint64 priority = 14; }
Raft 协议相关的,包括心跳 MsgHeartbeat、日志 MsgApp、投票消息MsgVote等。
to, from:分别代表了这个消息的接受者和发送者。
index:日志索引号。如果当前消息是MsgVote的话,代表这个 candidate 最后一条日志的索引号,它跟上面的 log_term 一起代表这个 candidate 所拥有的最新日志信息,这样别人就可以比较自己的日志是不是比 candidata 的日志要新,从而决定是否投票。
snapshot:一般跟 MsgSnap 合用,用来放置具体的 Snapshot 值。
propose RawNode::propose 函数,将传入的数据 data
封装成 message
格式,再交给 Raft::step
注意,propose 的目的是将传入的数据 data
封装成可以发送到其他的 TiKV 的消息格式,在 RawNode::propose 函数中只是简单处理了下,下面进一步的操作,会进入 Raft::step 函数。
1 2 3 4 5 6 7 8 9 10 11 pub fn propose (&mut self , context: Vec <u8 >, data: Vec <u8 >) -> Result <()> { let mut m = Message::default (); m.set_msg_type (MessageType::MsgPropose); m.from = self .raft.id; let mut e = Entry::default (); e.data = data.into (); e.context = context.into (); m.set_entries (vec! [e].into ()); self .raft.step (m) }
RawNode::propose 封装完的 m 的 m.term 并没初始化,默认值是 0, 即本地消息,因此在 Raft::step 函数中会进入 Raft::step_leader
函数,最终会进入 MessageType::MsgPropose
如果待处理的 m.entries 是空的,不处理;
当集群的配置发生更改,而这个 leader 已经被标记为删除。即便这个 leader 仍然 alive,也就不能接受 proposal;
正在迁移 leader 也不能处理 proposal
在 Raft::append_entry 中处理 m.entries,传入的类型是可变类型,是因为在 Raft::append_entry 需要丰富 m.entries 中每个 entry 的信息,比如 term, index。
Raft::bcast_append 广播给其他的 tikv
这些过程走完,其实消息并没有发送出去,只是缓存在 RaftLog 中 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 fn step_leader (&mut self , mut m: Message) -> Result <()> { match m.get_msg_type () { MessageType::MsgPropose => { if m.entries.is_empty () { fatal!(self .logger, "stepped empty MsgProp" ); } if !self .prs.progress ().contains_key (&self .id) { return Err (Error::ProposalDropped); } if self .lead_transferee.is_some () { debug!( self .logger, "[term {term}] transfer leadership to {lead_transferee} is in progress; dropping \ proposal" , term = self .term, lead_transferee = self .lead_transferee.unwrap (); ); return Err (Error::ProposalDropped); } if !self .append_entry (m.mut_entries ()) { debug!( self .logger, "entries are dropped due to overlimit of max uncommitted size, uncommitted_size: {}" , self .uncommitted_size () ); return Err (Error::ProposalDropped); } self .bcast_append (); return Ok (()); } } }
Raft::append_entry 当前 raft 的已经本地保存的 entry 的 index 是 self.raft_log.last_index(),新加入的 entries 的 index 就需要在 last_index 之后连续递增,同时每个 entry 的 term 都是当前 raft leader 的 term。
1 2 3 4 5 6 7 8 9 10 11 message Entry { EntryType entry_type = 1; uint64 term = 2; uint64 index = 3; bytes data = 4; bytes context = 6; // Deprecated! It is kept for backward compatibility. // TODO: remove it in the next major release. bool sync_log = 5; }
填充完 entries,则调用 RaftLog::append
将其 append 到 raft_log 中,这样在 leader 节点上就不会丢失。
1 2 3 4 5 6 7 8 9 10 11 12 13 pub fn append_entry (&mut self , es: &mut [Entry]) -> bool { if !self .maybe_increase_uncommitted_size (es) { return false ; } let li = self .raft_log.last_index (); for (i, e) in es.iter_mut ().enumerate () { e.term = self .term; e.index = li + 1 + i as u64 ; } self .raft_log.append (es); true }
Raft::bcast_append 1 2 3 4 5 6 7 8 9 10 11 pub fn bcast_append (&mut self ) { let self_id = self .id; let core = &mut self .r; let msgs = &mut self .msgs; self .prs .iter_mut () .filter (|&(id, _)| *id != self_id) .for_each (|(id, pr)| core.send_append (*id, pr, msgs)); }
函数,将待发送的消息,封装成 Message
RaftLog::entries RaftCore::maybe_send_append 函数需要将发送给 to 的消息封装起来。那么问题来了,该函数的入参并没传入 propose 中封装的 m,那又怎么得到该 m.entries。
依赖的是 RaftLog::entries
: 由于上面 Raft::append_entry
将 entries
idx: 是我们需要读取的 entries 的起始位置,实际上传入的 pr.next_idx
self.last_index() 记录的是 raft_log 最后一个 entry 的位置,
那么,[idx, last_index + 1) 即刚在 Raft::append_entry
函数中写入 raft_log 的 entries 数据,那么调用 RaftLog::slice 函数,即可获得所需 entries 的视图,即 slice。
1 2 3 4 5 6 7 8 9 10 11 12 13 pub fn entries ( &self , idx: u64 , max_size: impl Into <Option <u64 >>, context: GetEntriesContext, ) -> Result <Vec <Entry>> { let max_size = max_size.into (); let last = self .last_index (); if idx > last { return Ok (Vec ::new ()); } self .slice (idx, last + 1 , max_size, context) }
RaftCore::prepare_send_entries 在 RaftCore::prepare_send_entries 函数中,将待发送的消息封装成 MessageType::MsgAppend 类型。在 Raft::append_entry
中已填充了 m.entries 的{term, index}
,当 m 需要发送给其他 TiKV 节点,则需要填充 leader 中记录的 follower 的 {index, log_term, commit}
p.next_idx:复制给 pr 对应的 raft-node 下一条数据的位置,next_index -1 则是上次的位置。
log_term: 上次和 followrt 同步的 term
commit:当前 raft_log 已经 committed 的数据位置
m.entries.last().unwrap().index 记录的是 append 到 raft_log 的最后一条 entry 的位置,
用这个来更新对应的 pr 的记录,
同时更新和 follower 发送消息的滑动窗口
1 2 3 4 5 6 7 8 9 10 11 12 13 pub fn update_state (&mut self , last: u64 ) { match self .state { ProgressState::Replicate => { self .optimistic_update (last); self .ins.add (last); } ProgressState::Probe => self .pause (), ProgressState::Snapshot => panic! ( "updating progress state in unhandled state {:?}" , self .state ), } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 fn prepare_send_entries ( &mut self , m: &mut Message, pr: &mut Progress, term: u64 , ents: Vec <Entry>, ) { m.set_msg_type (MessageType::MsgAppend); m.index = pr.next_idx - 1 ; m.log_term = term; m.set_entries (ents.into ()); m.commit = self .raft_log.committed; if !m.entries.is_empty () { let last = m.entries.last ().unwrap ().index; pr.update_state (last); } }
RaftCore::maybe_send_append 暂时忽略 snanshot 部分的逻辑,仅看 entries 的逻辑如下:
先基于 RaftLog::entries 提取出 propose 的 entries;
再基于 RaftLog::term 提取出 m 对应的 log_term ,log_term 需要是连续的?
基于 RaftCore::prepare_send_entries 和上面提取到的 {entries, term, to} 将 m 封装成待发送给 to 的 MessageType::MsgAppend 类型的消息。
如果是 m 已经封装好了,则直接走 try_batching,
RaftCore::send 将 逻辑大致如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 fn maybe_send_append ( &mut self , to: u64 , pr: &mut Progress, allow_empty: bool , msgs: &mut Vec <Message>, ) -> bool { let mut m = Message::default (); m.to = to; if pr.pending_request_snapshot != INVALID_INDEX { } else { let ents = self .raft_log.entries ( pr.next_idx, self .max_msg_size, GetEntriesContext (GetEntriesFor::SendAppend { to, term: self .term, aggressively: !allow_empty, }), ); if !allow_empty && ents.as_ref ().ok ().map_or (true , |e| e.is_empty ()) { return false ; } let term = self .raft_log.term (pr.next_idx - 1 ); match (term, ents) { (Ok (term), Ok (mut ents)) => { if self .batch_append && self .try_batching (to, msgs, pr, &mut ents) { return true ; } self .prepare_send_entries (&mut m, pr, term, ents) } } } self .send (m, msgs); true }
RaftCore::send RaftCore::send 中给 MessageType::MsgAppend 类型的消息 m 添加赋值 leader 的 term,然后将数据添加到消息队列 msgs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 fn send (&mut self , mut m: Message, msgs: &mut Vec <Message>) { if m.from == INVALID_ID { m.from = self .id; } if m.get_msg_type () == MessageType::MsgRequestVote || m.get_msg_type () == MessageType::MsgRequestPreVote || m.get_msg_type () == MessageType::MsgRequestVoteResponse || m.get_msg_type () == MessageType::MsgRequestPreVoteResponse { assert_eq! (m.term, 0 ); } else { assert_ne! (m.term, 0 ); if m.get_msg_type () != MessageType::MsgPropose && m.get_msg_type () != MessageType::MsgReadIndex { m.term = self .term; } } if m.get_msg_type () == MessageType::MsgRequestVote || m.get_msg_type () == MessageType::MsgRequestPreVote { m.priority = self .priority; } msgs.push (m); }
Raft::handle_append_entries 下面来看看 follower 怎么处理接受到的 MsgAppend 消息。
如果 follower 当前正在向 leader 请求 snapshot,则拒绝本次来自 leader 的 Append 请求。
1 2 3 4 5 6 7 8 9 10 fn send_request_snapshot (&mut self ) { let mut m = Message::default (); m.set_msg_type (MessageType::MsgAppendResponse); m.index = self .raft_log.committed; m.reject = true ; m.reject_hint = self .raft_log.last_index (); m.to = self .leader_id; m.request_snapshot = self .pending_request_snapshot; self .r.send (m, &mut self .msgs); }
检测接受到的消息 m 是否是过时的消息。m.index 是 follower 在 leader 端记录的上次 committed 的位置,如果 m.index < self.raft.log.committed
则说明接受到的是过时的消息,此时回应 leader 自己真实的 committed 位置(leader 端不会处理这种情况);
不是过时的消息,则调用 RaftLog::maybe_append
尝试 append 到 follower 的 raft_log 中
返回 Some,则表示本次 append 成功,并将返回的 last_index
返回给 leader,用于更新 leader 侧关于此 follower 的 pr.next_idx
返回 None:就要拒绝本次 Append 请求。
返回 None
,说明:follower 中的 m.index
对应的 term 和 m.log_term 不匹配,则说明 follower 中存储着的是旧值,
通过 RaftLog::find_conflict_by_term
找到 raft_log 中第一个 {hint_index, hint_term}
,并满足 hint_term <= log_term
的 hint_index。
拒绝本次请求,再将相关参数设置,让 leader 重新发送一次 Append 请求。
设置 follower 的 committed 位置
发现一个问题,在 rust 里,proto 定义的类成员变量都是 pub 的,因此直接对成员变量复制和使用 set_xxx 效果是一致的。比如
1 2 to_send.commit = self .raft_log.committed; to_send.set_commit (self .raft_log.committed);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 pub fn handle_append_entries (&mut self , m: &Message) { if self .pending_request_snapshot != INVALID_INDEX { self .send_request_snapshot (); return ; } if m.index < self .raft_log.committed { let mut to_send = Message::default (); to_send.set_msg_type (MessageType::MsgAppendResponse); to_send.to = m.from; to_send.index = self .raft_log.committed; to_send.commit = self .raft_log.committed; self .r.send (to_send, &mut self .msgs); return ; } let mut to_send = Message::default (); to_send.to = m.from; to_send.set_msg_type (MessageType::MsgAppendResponse); if let Some ((_, last_idx)) = self .raft_log .maybe_append (m.index, m.log_term, m.commit, &m.entries) { to_send.index = last_idx; } else { let hint_index = cmp::min (m.index, self .raft_log.last_index ()); let (hint_index, hint_term) = self .raft_log.find_conflict_by_term (hint_index, m.log_term); if hint_term.is_none () { fatal!( self .logger, "term({index}) must be valid" , index = hint_index ) } to_send.index = m.index; to_send.reject = true ; to_send.reject_hint = hint_index; to_send.log_term = hint_term.unwrap (); } to_send.commit = self .raft_log.committed self .r.send (to_send, &mut self .msgs); }
下面来阐述下 RaftLog::maybe_append 及相关冲突检测细节。
RaftLog::find_conflict_by_term 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 pub fn find_conflict_by_term (&self , index: u64 , term: u64 ) -> (u64 , Option <u64 >) { let mut conflict_index = index; let last_index = self .last_index (); if index > last_index { warn!( self .unstable.logger, "index({}) is out of range [0, last_index({})] in find_conflict_by_term" , index, last_index, ); return (index, None ); } loop { match self .term (conflict_index) { Ok (t) => { if t > term { conflict_index -= 1 } else { return (conflict_index, Some (t)); } } Err (_) => return (conflict_index, None ), } } }
RaftLog::find_conflict 所谓冲突,即 index 相同而 term 不一致。那么我们就对 leaoder 发送过来的 entries 中每个 entry 进行冲突检测。 RaftLog::term
函数根据指定的 index 返回本地 raft_log 该 index 对应的 term,RaftLog::match_term
在此基础上判断指定 index 返回的 term 是否是指定的 term。
函数返回 false,则表示 e.index 对应的 term 不匹配(不存在视为不匹配的一种case)。由 ents 是当前 leader 发送过来消息,那么如果 follower 的 raft_log 中即便之前存在 {e.index, old_term}
,那么将新的 {e.index, e.term}
组合对应的数据写入 follower 的 raft_log,来覆盖旧的数据。当然,如果 raft_log 中不存在 e.index 对应的 term,那么说明当前 entry 是新的,也是可以直接 append。
函数全部返回 true,即 RaftLog::find_conflict
返回 0,则表示 ents
已经都包含在 follower 的 raft_log 中。
1 2 3 4 5 6 7 8 9 10 11 12 pub fn match_term (&self , idx: u64 , term: u64 ) -> bool { self .term (idx).map (|t| t == term).unwrap_or (false ) } pub fn find_conflict (&self , ents: &[Entry]) -> u64 { for e in ents { if !self .match_term (e.index, e.term) { return e.index; } } 0 }
RaftLog::maybe_append RaftLog::maybe_append 函数传入参数中的 {idx, term, committed}
分别是 {m.index, m.log_term, m.committed}
,由 Raft::bcast_append
idx: leader 中记录的 follower 已经复制的 entry 的最后一个位置
term: 该 idx 对应的 term
committed: 复制时 leader 的 committed 位置
因此,这个 {idx, term}
肯定需要在 follower 中能找到对应的匹配值,否则这个 follower 就落后 leader 太多,需要接受 leader 的 snapshot。
返回 false
表示 leader 中关于这个 follower 记录的 {idx, term}
有误,此时 RaftLpg::maybe_append
返回 None, 再由上层的 Raft::handle_append_entries
拒绝本次的 append 请求;
self.match(idx, term)
返回 true
说明 leader 记录的关于这个 follower 的 {idx, term}
pair 在 follower 的 raft_log 中是存在的, 但该 pair 不一定是最新的。比如 当前 leader 曾经同步过 {idx, term}
pair 给这个 follower,但是之后 leader 宕机过,后来又重新选上 leader,那么此时关于这个 follower 的 laster 最近的同步记录可能就是这个 {idx, term}
基于 RaftLog::find_conflict
判断新加入的 ents 和 follower 的 raft_log 是否有冲突:
conflict_idx == 0: 则表示 ents 已全部都在 follower 的 raft_log 中;
conglict_idx < self.committed : 已经 committed 之前的 entry 不可能存在冲突,这种情况下则fatal,让 follower 直接 crash,重启同步 leader 的 snapshot;
conflict_idx >= self.committed :如果 ents 和 follower 的 raft_log 存在冲突,即 (idx, conflict_idx) 区间已经存在于 follower 的 raft_log 中,则需要将 ents 中前 confict_idx - idx - 1
个元素去掉,再 append 到 follower 的 raft_log 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 pub fn maybe_append ( &mut self , idx: u64 , term: u64 , committed: u64 , ents: &[Entry],) -> Option <(u64 , u64 )> { if self .match_term (idx, term) { let conflict_idx = self .find_conflict (ents); if conflict_idx == 0 { } else if conflict_idx <= self .committed { fatal!( self .unstable.logger, "entry {} conflict with committed entry {}" , conflict_idx, self .committed ) } else { let start = (conflict_idx - (idx + 1 )) as usize ; self .append (&ents[start..]); if self .persisted > conflict_idx - 1 { self .persisted = conflict_idx - 1 ; } } let last_new_index = idx + ents.len () as u64 ; self .commit_to (cmp::min (committed, last_new_index)); return Some ((conflict_idx, last_new_index)); } None }
RaftLog::handle_append_response 存在冲突 下面 leader 开始处理 follower 返回的 MsgAppend 响应。
1. 先来考虑 follower 拒绝了leader 发出的 MsgAppend 请求。
如果 follower 的 raft_log 的尾部存在没有 committed 的部分,那么 leader 需要知道 follower 当前 committed 的具体位置,再向 follower 同步自 committed 之后数据。
最朴素的实现方式即逐个 entry 探测。
1 2 3 4 5 6 For example, if the leader has: idx 1 2 3 4 5 6 7 8 9 ----------------- term (L) 1 3 3 3 5 5 5 5 5 term (F) 1 1 1 1 2 2
leader 发送了一个 {idx=9, term=5}
的 MsgAppend 请求,收到的回复是 {reject_hint = 6, log_term = 2}
,那么 leader 将会再次尝试发送 {idx=6, term=5}
,follower 返回的是 {reject_hint = 5, term = 2}
leader 的 log_term 总是大于 follower 返回的 reject_term
2. 根据 m.from 取出 leader 中对应的 Process:
更新 follower 的状态为 active,说明 follower 是活跃的;
更新 pr 中该 follower 的 committed 位置。
3. 如果 m.reject 则再探测一次4. 更新 pr 和 leader 同步的位置; - Probe: 变成 replicate 模式 - replicate 模式,变成滑动窗口; -
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 fn handle_append_response (&mut self , m: &Message) { let mut next_probe_index : u64 = m.reject_hint; if m.reject && m.log_term > 0 { next_probe_index = self .raft_log .find_conflict_by_term (m.reject_hint, m.log_term) .0 ; } let pr = match self .prs.get_mut (m.from) { Some (pr) => pr, None => return ; }; pr.recent_active = true ; pr.update_committed (m.commit); if m.reject { if pr.maybe_decr_to (m.index, next_probe_index, m.request_snapshot) { if pr.state == ProgressState::Replicate { pr.become_probe (); } self .send_append (m.from); } return ; }
不存在冲突 如果不村子冲突,正常情况下,代码逻辑比较简单。
leader 在对应的 pr 中更新 follower 的 commited、append 位置;
如果接受到半数以上的回应,则 commit
leader 更换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 fn handle_append_response (&mut self , m: &Message) { let pr = match self .prs.get_mut (m.from) { Some (pr) => pr, None => return ; }; pr.recent_active = true ; pr.update_committed (m.commit); let old_paused = pr.is_paused (); if !pr.maybe_update (m.index) { return ; } match pr.state { ProgressState::Probe => pr.become_replicate (), ProgressState::Snapshot => { if pr.is_snapshot_caught_up () { pr.become_probe (); } } ProgressState::Replicate => pr.ins.free_to (m.get_index ()), } if self .maybe_commit () { if self .should_bcast_commit () { self .bcast_append () } } else if old_paused { self .send_append (m.from); } self .send_append_aggressively (m.from); if Some (m.from) == self .r.lead_transferee { let last_index = self .r.raft_log.last_index (); let pr = self .prs.get_mut (m.from).unwrap (); if pr.matched == last_index { info!( self .logger, "sent MsgTimeoutNow to {from} after received MsgAppResp" , from = m.from; ); self .send_timeout_now (m.from); } } }
消息通信就到此位置,下面从 MessageType::MsgTimeoutNow 触发 follower 进入选举状态,对 candidate 无效。