raft: replication

raft_rs

这边博客主要是从源码角度分析下 raft-rs 的日志复制和选举过程。

Entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// The entry is a type of change that needs to be applied. It contains two data fields.
// While the fields are built into the model; their usage is determined by the entry_type.
//
// For normal entries, the data field should contain the data change that should be applied.
// The context field can be used for any contextual data that might be relevant to the
// application of the data.
//
// For configuration changes, the data will contain the ConfChange message and the
// context will provide anything needed to assist the configuration change. The context
// if for the user to set and use in this case.
message Entry {
EntryType entry_type = 1;
uint64 term = 2;
uint64 index = 3;
bytes data = 4;
bytes context = 6;
}
  • term:选举任期,每次选举之后递增1。它的主要作用是标记信息的时效性,比方说当一个节点发出来的消息中携带的term是2,而另一个节点携带的term是3,那我们就认为第一个节点的信息过时了。
  • index:当前这个entry在整个raft日志中的位置索引。有了Term和Index之后,一个log entry就能被唯一标识。
  • type:当前entry的类型,目前etcd支持两种类型:EntryNormal和EntryConfChange,EntryNormal代表当前Entry是对状态机的操作,EntryConfChange则代表对当前集群配置进行更改的操作,比如增加或者减少节点。
  • data:一个被序列化后的byte数组,代表当前entry真正要执行的操作,比方说如果上面的Type是EntryNormal,那这里的Data就可能是具体要更改的key-value pair,如果Type是EntryConfChange,那Data就是具体的配置更改项ConfChange。raft算法本身并不关心这个数据是什么,它只是把这段数据当做log同步过程中的payload来处理,具体对这个数据的解析则有上层应用来完成。

Message

TiKV 接受用户发送过来的 command,并将其封装成 entry,然后调用 RawNode::propose 方法,将数据进一步封装。

Message 由 proto 定义,在 raft 之间传输数据的消息格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
message Message {
MessageType msg_type = 1;
uint64 to = 2;
uint64 from = 3;
uint64 term = 4;
uint64 log_term = 5;
uint64 index = 6;
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
uint64 priority = 14;
}
  • Type:当前传递的消息类型,它的取值有很多个,但大致可以分成两类:
    • Raft 协议相关的,包括心跳 MsgHeartbeat、日志 MsgApp、投票消息MsgVote等。
    • 上层应用触发的(没错,上层应用并不是通过api与raft库交互的,而是通过发消息),比如应用对数据更改的消息MsgPropose

不同类型的消息会用到下面不同的字段:

  • to, from:分别代表了这个消息的接受者和发送者。
  • term:这个消息发出时整个集群所处的任期。
  • log_term:消息发出者所保存的日志中最后一条的任期号,一般MsgVote会用到这个字段。
  • index:日志索引号。如果当前消息是MsgVote的话,代表这个 candidate 最后一条日志的索引号,它跟上面的 log_term 一起代表这个 candidate 所拥有的最新日志信息,这样别人就可以比较自己的日志是不是比 candidata 的日志要新,从而决定是否投票。
  • entries:需要存储的日志。
  • commit:已经提交的日志的索引值,用来向别人同步日志的提交信息。
  • snapshot:一般跟 MsgSnap 合用,用来放置具体的 Snapshot 值。
  • reject,reject_hint:代表对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)

propose

RawNode::propose 函数,将传入的数据 data 封装成 message 格式,再交给 Raft::step 函数处理。

注意,propose 的目的是将传入的数据 data 封装成可以发送到其他的 TiKV 的消息格式,在 RawNode::propose 函数中只是简单处理了下,下面进一步的操作,会进入 Raft::step 函数。

1
2
3
4
5
6
7
8
9
10
11
/// Propose proposes data be appended to the raft log.
pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgPropose);
m.from = self.raft.id;
let mut e = Entry::default();
e.data = data.into();
e.context = context.into();
m.set_entries(vec![e].into());
self.raft.step(m)
}

RawNode::propose 封装完的 m 的 m.term 并没初始化,默认值是 0, 即本地消息,因此在 Raft::step 函数中会进入 Raft::step_leader 函数,最终会进入 MessageType::MsgPropose 处理分支。处理逻辑如下:

  1. 如果待处理的 m.entries 是空的,不处理;
  2. 当集群的配置发生更改,而这个 leader 已经被标记为删除。即便这个 leader 仍然 alive,也就不能接受 proposal;
  3. 正在迁移 leader 也不能处理 proposal
  4. 在 Raft::append_entry 中处理 m.entries,传入的类型是可变类型,是因为在 Raft::append_entry 需要丰富 m.entries 中每个 entry 的信息,比如 term, index。
  5. Raft::bcast_append 广播给其他的 tikv

这些过程走完,其实消息并没有发送出去,只是缓存在 RaftLog 中 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fn step_leader(&mut self, mut m: Message) -> Result<()> {
match m.get_msg_type() {
MessageType::MsgPropose => {
if m.entries.is_empty() {
fatal!(self.logger, "stepped empty MsgProp");
}
if !self.prs.progress().contains_key(&self.id) {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return Err(Error::ProposalDropped);
}
if self.lead_transferee.is_some() {
debug!(
self.logger,
"[term {term}] transfer leadership to {lead_transferee} is in progress; dropping \
proposal",
term = self.term,
lead_transferee = self.lead_transferee.unwrap();
);
return Err(Error::ProposalDropped);
}

//...
if !self.append_entry(m.mut_entries()) {
// return ProposalDropped when uncommitted size limit is reached
debug!(
self.logger,
"entries are dropped due to overlimit of max uncommitted size, uncommitted_size: {}",
self.uncommitted_size()
);
return Err(Error::ProposalDropped);
}
self.bcast_append();
return Ok(());
}
//...
}
}

Raft::append_entry

当前 raft 的已经本地保存的 entry 的 index 是 self.raft_log.last_index(),新加入的 entries 的 index 就需要在 last_index 之后连续递增,同时每个 entry 的 term 都是当前 raft leader 的 term。

1
2
3
4
5
6
7
8
9
10
11
message Entry {
EntryType entry_type = 1;
uint64 term = 2;
uint64 index = 3;
bytes data = 4;
bytes context = 6;

// Deprecated! It is kept for backward compatibility.
// TODO: remove it in the next major release.
bool sync_log = 5;
}

填充完 entries,则调用 RaftLog::append 将其 append 到 raft_log 中,这样在 leader 节点上就不会丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
pub fn append_entry(&mut self, es: &mut [Entry]) -> bool {
if !self.maybe_increase_uncommitted_size(es) {
return false;
}

let li = self.raft_log.last_index();
for (i, e) in es.iter_mut().enumerate() {
e.term = self.term;
e.index = li + 1 + i as u64;
}
self.raft_log.append(es);
true
}

Raft::bcast_append

1
2
3
4
5
6
7
8
9
10
11
/// Sends RPC, with entries to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
let self_id = self.id;
let core = &mut self.r;
let msgs = &mut self.msgs;
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
}

RaftCore::maybe_send_append 函数,将待发送的消息,封装成 Message

RaftLog::entries

RaftCore::maybe_send_append 函数需要将发送给 to 的消息封装起来。那么问题来了,该函数的入参并没传入 propose 中封装的 m,那又怎么得到该 m.entries。

依赖的是 RaftLog::entries: 由于上面 Raft::append_entry 将 entries

  • idx: 是我们需要读取的 entries 的起始位置,实际上传入的 pr.next_idx
  • self.last_index() 记录的是 raft_log 最后一个 entry 的位置,

那么,[idx, last_index + 1) 即刚在 Raft::append_entry 函数中写入 raft_log 的 entries 数据,那么调用 RaftLog::slice 函数,即可获得所需 entries 的视图,即 slice。

1
2
3
4
5
6
7
8
9
10
11
12
13
pub fn entries(
&self,
idx: u64,
max_size: impl Into<Option<u64>>,
context: GetEntriesContext,
) -> Result<Vec<Entry>> {
let max_size = max_size.into();
let last = self.last_index();
if idx > last {
return Ok(Vec::new());
}
self.slice(idx, last + 1, max_size, context)
}

RaftCore::prepare_send_entries

在 RaftCore::prepare_send_entries 函数中,将待发送的消息封装成 MessageType::MsgAppend 类型。在 Raft::append_entry 中已填充了 m.entries 的
{term, index},当 m 需要发送给其他 TiKV 节点,则需要填充 leader 中记录的 follower 的 {index, log_term, commit}

  • p.next_idx:复制给 pr 对应的 raft-node 下一条数据的位置,next_index -1 则是上次的位置。
  • log_term: 上次和 followrt 同步的 term
  • commit:当前 raft_log 已经 committed 的数据位置

m.entries.last().unwrap().index 记录的是 append 到 raft_log 的最后一条 entry 的位置,

  • 用这个来更新对应的 pr 的记录,
  • 同时更新和 follower 发送消息的滑动窗口
1
2
3
4
5
6
7
8
9
10
11
12
13
pub fn update_state(&mut self, last: u64) {
match self.state {
ProgressState::Replicate => {
self.optimistic_update(last);
self.ins.add(last); // 右移滑动窗口
}
ProgressState::Probe => self.pause(),
ProgressState::Snapshot => panic!(
"updating progress state in unhandled state {:?}",
self.state
),
}
}

完整的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fn prepare_send_entries(
&mut self,
m: &mut Message,
pr: &mut Progress,
term: u64,
ents: Vec<Entry>,
) {
m.set_msg_type(MessageType::MsgAppend);
m.index = pr.next_idx - 1; // 对端 committed 的位置
m.log_term = term; // 对端 committed 的 term
m.set_entries(ents.into());
m.commit = self.raft_log.committed; // 本地commit的记录
if !m.entries.is_empty() {
let last = m.entries.last().unwrap().index;
pr.update_state(last);
}
}

RaftCore::maybe_send_append

暂时忽略 snanshot 部分的逻辑,仅看 entries 的逻辑如下:

  1. 先基于 RaftLog::entries 提取出 propose 的 entries;

  2. 再基于 RaftLog::term 提取出 m 对应的 log_term,log_term 需要是连续的?

  3. 基于 RaftCore::prepare_send_entries 和上面提取到的 {entries, term, to} 将 m 封装成待发送给 to 的 MessageType::MsgAppend 类型的消息。

    如果是 m 已经封装好了,则直接走 try_batching,

    1

  4. RaftCore::send 将
    逻辑大致如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
fn maybe_send_append(
&mut self,
to: u64,
pr: &mut Progress,
allow_empty: bool,
msgs: &mut Vec<Message>,
) -> bool {
//...
let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
//...
} else {
let ents = self.raft_log.entries(
pr.next_idx,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to,
term: self.term,
aggressively: !allow_empty,
}),
);
if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
return false;
}
let term = self.raft_log.term(pr.next_idx - 1); // 上一个 entry 的 term
match (term, ents) {
(Ok(term), Ok(mut ents)) => {
// 上次已经封装好,则不需要再次封装,直接走 try_batching
if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
return true;
}
self.prepare_send_entries(&mut m, pr, term, ents)
}
//...
}
}
self.send(m, msgs);
true
}

RaftCore::send

RaftCore::send 中给 MessageType::MsgAppend 类型的消息 m 添加赋值 leader 的 term,然后将数据添加到消息队列 msgs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
if m.from == INVALID_ID {
m.from = self.id;
}
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
|| m.get_msg_type() == MessageType::MsgRequestVoteResponse
|| m.get_msg_type() == MessageType::MsgRequestPreVoteResponse
{
assert_eq!(m.term, 0);
} else {
assert_ne!(m.term, 0);
if m.get_msg_type() != MessageType::MsgPropose
&& m.get_msg_type() != MessageType::MsgReadIndex
{
m.term = self.term;
}
}
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
{
m.priority = self.priority;
}
msgs.push(m);
}

Raft::handle_append_entries

下面来看看 follower 怎么处理接受到的 MsgAppend 消息。

  1. 如果 follower 当前正在向 leader 请求 snapshot,则拒绝本次来自 leader 的 Append 请求。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
     fn send_request_snapshot(&mut self) {
    let mut m = Message::default();
    m.set_msg_type(MessageType::MsgAppendResponse);
    m.index = self.raft_log.committed;
    m.reject = true;
    m.reject_hint = self.raft_log.last_index();
    m.to = self.leader_id;
    m.request_snapshot = self.pending_request_snapshot;
    self.r.send(m, &mut self.msgs);
    }
  2. 检测接受到的消息 m 是否是过时的消息。m.index 是 follower 在 leader 端记录的上次 committed 的位置,如果 m.index < self.raft.log.committed 则说明接受到的是过时的消息,此时回应 leader 自己真实的 committed 位置(leader 端不会处理这种情况);

  3. 不是过时的消息,则调用 RaftLog::maybe_append 尝试 append 到 follower 的 raft_log 中

    • RaftLog::maybe_append 返回 Some,则表示本次 append 成功,并将返回的 last_index 返回给 leader,用于更新 leader 侧关于此 follower 的 pr.next_idx

    • RaftLog::maybe_append 返回 None:就要拒绝本次 Append 请求。

      maybe_append 返回 None ,说明:follower 中的 m.index 对应的 term 和 m.log_term 不匹配,则说明 follower 中存储着的是旧值,

      通过 RaftLog::find_conflict_by_term 找到 raft_log 中第一个 {hint_index, hint_term},并满足 hint_term <= log_term 的 hint_index。

      拒绝本次请求,再将相关参数设置,让 leader 重新发送一次 Append 请求。

  4. 设置 follower 的 committed 位置

发现一个问题,在 rust 里,proto 定义的类成员变量都是 pub 的,因此直接对成员变量复制和使用 set_xxx 效果是一致的。比如

1
2
to_send.commit = self.raft_log.committed;
to_send.set_commit(self.raft_log.committed);

完整的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
// 当前已经存在等待 snapshot, 则不接受新的 entry
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
// 旧消息
if m.index < self.raft_log.committed {
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.to = m.from;
to_send.index = self.raft_log.committed;
to_send.commit = self.raft_log.committed;
self.r.send(to_send, &mut self.msgs);
return;
}

let mut to_send = Message::default();
to_send.to = m.from;
to_send.set_msg_type(MessageType::MsgAppendResponse);

if let Some((_, last_idx)) = self
.raft_log
.maybe_append(m.index, m.log_term, m.commit, &m.entries)
{
to_send.index = last_idx;
} else {
// {m.index, m.log_term} 不存在 follower 的 raft_log
// 拒绝请求
let hint_index = cmp::min(m.index, self.raft_log.last_index());
let (hint_index, hint_term) =
self.raft_log.find_conflict_by_term(hint_index, m.log_term);

if hint_term.is_none() {
fatal!(
self.logger,
"term({index}) must be valid",
index = hint_index
)
}

to_send.index = m.index;
to_send.reject = true;
to_send.reject_hint = hint_index;
to_send.log_term = hint_term.unwrap();
}
to_send.commit = self.raft_log.committed
self.r.send(to_send, &mut self.msgs);
}

下面来阐述下 RaftLog::maybe_append 及相关冲突检测细节。

RaftLog::find_conflict_by_term

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
pub fn find_conflict_by_term(&self, index: u64, term: u64) -> (u64, Option<u64>) {
let mut conflict_index = index;

let last_index = self.last_index();
if index > last_index {
warn!(
self.unstable.logger,
"index({}) is out of range [0, last_index({})] in find_conflict_by_term",
index,
last_index,
);
return (index, None);
}

loop {
match self.term(conflict_index) {
Ok(t) => {
// 直到找到 log_term <= term
if t > term {
conflict_index -= 1
} else {
return (conflict_index, Some(t));
}
}
Err(_) => return (conflict_index, None),
}
}
}

RaftLog::find_conflict

所谓冲突,即 index 相同而 term 不一致。那么我们就对 leaoder 发送过来的 entries 中每个 entry 进行冲突检测。 RaftLog::term 函数根据指定的 index 返回本地 raft_log 该 index 对应的 term,RaftLog::match_term 在此基础上判断指定 index 返回的 term 是否是指定的 term。

  • RaftLog::match_term 函数返回 false,则表示 e.index 对应的 term 不匹配(不存在视为不匹配的一种case)。由 ents 是当前 leader 发送过来消息,那么如果 follower 的 raft_log 中即便之前存在 {e.index, old_term},那么将新的 {e.index, e.term} 组合对应的数据写入 follower 的 raft_log,来覆盖旧的数据。当然,如果 raft_log 中不存在 e.index 对应的 term,那么说明当前 entry 是新的,也是可以直接 append。

  • RaftLog::match_term 函数全部返回 true,即 RaftLog::find_conflict 返回 0,则表示 ents 已经都包含在 follower 的 raft_log 中。

1
2
3
4
5
6
7
8
9
10
11
12
pub fn match_term(&self, idx: u64, term: u64) -> bool {
self.term(idx).map(|t| t == term).unwrap_or(false)
}

pub fn find_conflict(&self, ents: &[Entry]) -> u64 {
for e in ents {
if !self.match_term(e.index, e.term) {
return e.index;
}
}
0
}

RaftLog::maybe_append

RaftLog::maybe_append 函数传入参数中的 {idx, term, committed} 分别是 {m.index, m.log_term, m.committed},由 Raft::bcast_append 函数中设置的参数可知,分别表示:

  • idx: leader 中记录的 follower 已经复制的 entry 的最后一个位置
  • term: 该 idx 对应的 term
  • committed: 复制时 leader 的 committed 位置

因此,这个 {idx, term} 肯定需要在 follower 中能找到对应的匹配值,否则这个 follower 就落后 leader 太多,需要接受 leader 的 snapshot。

  • RaftLog::match_term 返回 false

    表示 leader 中关于这个 follower 记录的 {idx, term} 有误,此时 RaftLpg::maybe_append 返回 None, 再由上层的 Raft::handle_append_entries 拒绝本次的 append 请求;

  • self.match(idx, term) 返回 true

    说明 leader 记录的关于这个 follower 的 {idx, term} pair 在 follower 的 raft_log 中是存在的, 但该 pair 不一定是最新的。比如 当前 leader 曾经同步过 {idx, term} pair 给这个 follower,但是之后 leader 宕机过,后来又重新选上 leader,那么此时关于这个 follower 的 laster 最近的同步记录可能就是这个 {idx, term} pair。(分析不一定对,后续看论文再仔细check)

    基于 RaftLog::find_conflict 判断新加入的 ents 和 follower 的 raft_log 是否有冲突:

    • conflict_idx == 0: 则表示 ents 已全部都在 follower 的 raft_log 中;
    • conglict_idx < self.committed: 已经 committed 之前的 entry 不可能存在冲突,这种情况下则fatal,让 follower 直接 crash,重启同步 leader 的 snapshot;
    • conflict_idx >= self.committed:如果 ents 和 follower 的 raft_log 存在冲突,即 (idx, conflict_idx) 区间已经存在于 follower 的 raft_log 中,则需要将 ents 中前 confict_idx - idx - 1 个元素去掉,再 append 到 follower 的 raft_log 中。

整体代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
pub fn maybe_append( &mut self, idx: u64, term: u64, committed: u64, ents: &[Entry],) -> Option<(u64, u64)> {
if self.match_term(idx, term) {
let conflict_idx = self.find_conflict(ents);
if conflict_idx == 0 {
} else if conflict_idx <= self.committed {
fatal!(
self.unstable.logger,
"entry {} conflict with committed entry {}",
conflict_idx,
self.committed
)
} else {
let start = (conflict_idx - (idx + 1)) as usize;
self.append(&ents[start..]);
if self.persisted > conflict_idx - 1 {
self.persisted = conflict_idx - 1;
}
}
let last_new_index = idx + ents.len() as u64;
self.commit_to(cmp::min(committed, last_new_index));
return Some((conflict_idx, last_new_index));
}
None
}

RaftLog::handle_append_response

存在冲突

下面 leader 开始处理 follower 返回的 MsgAppend 响应。

1. 先来考虑 follower 拒绝了leader 发出的 MsgAppend 请求。

如果 follower 的 raft_log 的尾部存在没有 committed 的部分,那么 leader 需要知道 follower 当前 committed 的具体位置,再向 follower 同步自 committed 之后数据。

最朴素的实现方式即逐个 entry 探测。

1
2
3
4
5
6
For example, if the leader has:

idx 1 2 3 4 5 6 7 8 9
-----------------
term (L) 1 3 3 3 5 5 5 5 5
term (F) 1 1 1 1 2 2

leader 发送了一个 {idx=9, term=5} 的 MsgAppend 请求,收到的回复是 {reject_hint = 6, log_term = 2},那么 leader 将会再次尝试发送 {idx=6, term=5},follower 返回的是 {reject_hint = 5, term = 2},逐个尝试。

leader 的 log_term 总是大于 follower 返回的 reject_term

2. 根据 m.from 取出 leader 中对应的 Process:

  • 更新 follower 的状态为 active,说明 follower 是活跃的;
  • 更新 pr 中该 follower 的 committed 位置。

3. 如果 m.reject 则再探测一次
4. 更新 pr 和 leader 同步的位置;
- Probe: 变成 replicate 模式
- replicate 模式,变成滑动窗口;
-

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fn handle_append_response(&mut self, m: &Message) {
let mut next_probe_index: u64 = m.reject_hint;
if m.reject && m.log_term > 0 {
next_probe_index = self
.raft_log
.find_conflict_by_term(m.reject_hint, m.log_term)
.0;
}
let pr = match self.prs.get_mut(m.from) {
Some(pr) => pr,
None => return;
};
pr.recent_active = true;

// update followers committed index via append response
pr.update_committed(m.commit);

if m.reject {
if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
self.send_append(m.from);
}
return;
}

不存在冲突

如果不村子冲突,正常情况下,代码逻辑比较简单。

  • leader 在对应的 pr 中更新 follower 的 commited、append 位置;
  • 如果接受到半数以上的回应,则 commit
  • leader 更换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
fn handle_append_response(&mut self, m: &Message) {
// 确保 follower 还在集群中
let pr = match self.prs.get_mut(m.from) {
Some(pr) => pr,
None => return;
};
pr.recent_active = true; // 更新状态为活跃

// 基于发送的回应消息,更新 follower 当前 committed 的位置
// 这个无论是否发生冲突,follower 都要回应正确自己的 committed
pr.update_committed(m.commit);

// 这个是更新的 follower 的 append 的位置,
// - 这个记录会记录在 pr 中的 matched 字段,
// - pr.next_idx
let old_paused = pr.is_paused();
if !pr.maybe_update(m.index) {
return;
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if pr.is_snapshot_caught_up() {
pr.become_probe();
}
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()), // 成功append,则释放已经发送消息
}

// leader 自己是先将 proposal 发送出去的
// 如果等待了半数以上的回应,则 commit proposal
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append()
}
} else if old_paused {
// 如果从 probe -> replicated 模式
// 则先发送消息
self.send_append(m.from);
}

// 尝试发送可能堵塞的消息
self.send_append_aggressively(m.from);

// Transfer leadership is in progress.
// 如果正在更换 leader, 并且 followr 已经和 leader 保持完全同步
// 则发送 MsgTimeout 请求
if Some(m.from) == self.r.lead_transferee {
let last_index = self.r.raft_log.last_index();
let pr = self.prs.get_mut(m.from).unwrap();
if pr.matched == last_index {
info!(
self.logger,
"sent MsgTimeoutNow to {from} after received MsgAppResp",
from = m.from;
);
self.send_timeout_now(m.from);
}
}
}

消息通信就到此位置,下面从 MessageType::MsgTimeoutNow 触发 follower 进入选举状态,对 candidate 无效。