raft: Election

Election

本节主要说如何触发 election,以及怎么 election。

Raft::tick

tick 用于推动 raft 内部时钟。

1
2
3
4
5
6
7
8
pub fn tick(&mut self) -> bool {
match self.state {
StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
self.tick_election()
}
StateRole::Leader => self.tick_heartbeat(),
}
}

Raft::tick_election

非 leader 节点会调用 Raft::tick_election 来驱动内部数据,监控是否需要 election。

  • pass_election_timeout() 函数用来判断选举是否超时;
  • promotable 字段表示 follower 是否还在集群中

如果一个 alive 的 follower,election 超时,则会让自己进入 MessageType::MsgHup 状态,触发 election。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pub fn pass_election_timeout(&self) -> bool {
self.election_elapsed >= self.randomized_election_timeout
}

pub fn tick_election(&mut self) -> bool {
self.election_elapsed += 1;
if !self.pass_election_timeout() || !self.promotable {
return false;
}

self.election_elapsed = 0;
let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
let _ = self.step(m);
true
}

Raft::hup

先看当前 (applied, committed] 区间的数据是否包含 confChange:如果包含则当前不能选举(待解释)。

如果 (applied, committed] 没有 confChange 类型的数据,则进入 Raft::campaign 函数开始选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fn hup(&mut self, transfer_leader: bool) {
if self.state == StateRole::Leader {
return;
}

let first_index = match self.raft_log.unstable.maybe_first_index() {
Some(idx) => idx,
None => self.raft_log.applied + 1,
};

// 取出已 committd 但是尚未 applied
let ents = self.raft_log.slice(
first_index,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::TransferLeader),
)
.unwrap_or_else(|e| { fatal!(/* ... */); });

// 不能包含 confChange
if self.num_pending_conf(&ents) != 0 {
return;
};
//...
}

Raft::campaign

Raft::campaign 函数,开启新一轮的 election,竞争成为 leader。

  • 开启 pre_vote,则 follower 先变成 pre_candidate 状态

    pre_candiate 状态的 raft_node,先尝试收集半数以上的心跳,如果能收集到半数以上的心跳,则转变为 candidate 状态。

  • 否则,follower 直接变成 candidate 状态

    在 candidate 状态上的节点,需要自增 term,向其他node发送消息,让其他 ndoe 给自己投票,并且强迫其他 node 进入新一轮 election。

在 campaign 函数中,poll 函数中判断自己是否能成为 leader。如果能成为 leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
pub fn campaign(&mut self, campaign_type: &'static [u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
self.become_pre_candidate();
// 让其他 node 率先进入 candiate 阶段
(MessageType::MsgRequestPreVote, self.term + 1)
} else {
self.become_candidate();
(MessageType::MsgRequestVote, self.term)
};

// 如果给自己投票后能变成 leader,说明集群只有一个 node
let self_id = self.id;
if VoteResult::Won == self.poll(self_id, vote_msg, true) {
return;
}

let (commit, commit_term) = self.raft_log.commit_info(); // 当前自己的 commit 信息
let mut voters = [0; 7];

// 向所有其他 node 发送投票请求
for id in self.prs.conf().voters().ids().iter() {
if id == self_id {
continue;
}

voters[voter_cnt] = id;
let mut m = new_message(id, vote_msg, None);
m.term = term;
m.index = self.raft_log.last_index();
m.log_term = self.raft_log.last_term();
m.commit = commit;
m.commit_term = commit_term;
if campaign_type == CAMPAIGN_TRANSFER {
m.context = campaign_type.into();
}
self.r.send(m, &mut self.msgs);
}
}

Raft::poll

Raft::poll 函数的参数 {from, vote}vote 为 true,则表示接受到了 from 的投票;反之,from 拒绝给自己选票。

  • 先将这个投票结果 {from, vote} 记录在 ProgressTracker::votes 字段中;
  • 再基于 ProgressTracker::votes 中的结果,统计自己是否赢得本轮 election。

根据 ProgressTracker::tally_votes 的结果:

  • VoteResult::Won:获得多数票数的,即主动变成 leader (如果开启了 pre_vote,当前状态是 StateRole::PreCandidate, 则变成 Candidate);
  • VoteResult::Lost:失败的节点(即半数以上的节点拒绝给该节点投票)则主动变成 follower(但是此时不知道谁是 leader);
  • VoteResult::Pending:即没成功也没失败的节点,如果有其他节点成为 leader后,会广播心跳信息,让自己变成 follower。或者,等待 election_timeout,进入下一轮选举。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
self.prs.record_vote(from, vote);
let (gr, rj, res) = self.prs.tally_votes();

match res {
VoteResult::Won => {
if self.state == StateRole::PreCandidate {
self.campaign(CAMPAIGN_ELECTION);
} else {
self.become_leader(); // 先变成 leader
self.bcast_append(); // 再给其他节点广播空消息
}
}
VoteResult::Lost => {
let term = self.term;
self.become_follower(term, INVALID_ID);
}
VoteResult::Pending => (),
}
res
}

Raft::step

每次接受到消息时, 会先进入 Raft::step 函数中进行处理,再根据具体的消息类型进行特殊处理。

  • 本节点消息:对于来自本节点的消息m,他的 m.term 是 0,直接根据其消息类型 m.get_message_type() 进行处理。 比如 leader 节点处理 proposal 时。
  • 其他节点消息:对于来自己其他节点的消息m,从 raft 论文可知,要区分 m.term < self.termm.term > self.termm.term == self.term 三种case进行处理

m.term > self.term

这种情况下,如果消息类型还是:

  • MessageType::MsgAppend
  • MessageType::MsgHeartbeat
  • MessageType::MsgSnapshot

那么说明当前集群已经产生了一个新 leader,此时接收到的消息是用来广播,告知其他节点成为 m.from 的 follower。

如果接受到的消息类型是:

  • MessageType::MsgRequestPreVote
  • MessageType::MsgRequestVote

说明集群有一个节点 m.from 率先发起了新一轮 election。但是接受到这个请求的一端 m.to 需要判断是否扰动,不能盲目的给 m.from 投票:如果当前 leader 仍存在并且在 lease 之中,那么接受到此请求的节点就忽略本次的投票请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
pub fn step(&mut self, m: Message) -> Result<()> {
if m.term == 0 {
// local message
} else if m.term > self.term {
if m.get_msg_type() == MessageType::MsgRequestVote
|| m.get_msg_type() == MessageType::MsgRequestPreVote
{
let force = m.context == CAMPAIGN_TRANSFER; // 不是集群强制切换 leader
let in_lease = self.check_quorum
&& self.leader_id != INVALID_ID
&& self.election_elapsed < self.election_timeout;
// 如果是扰动,则忽略本次投票请求
if !force && in_lease {
return Ok(());
}
}
if m.get_msg_type() == MessageType::MsgRequestPreVote
|| (m.get_msg_type() == MessageType::MsgRequestPreVoteResponse && !m.reject)
{
// 这种情况无需操作
} else {
if m.get_msg_type() == MessageType::MsgAppend
|| m.get_msg_type() == MessageType::MsgHeartbeat
|| m.get_msg_type() == MessageType::MsgSnapshot
{
// 成为 m.from 的 follower
self.become_follower(m.term, m.from);
} else {
// 不知道成为谁的 follower
self.become_follower(m.term, INVALID_ID);
}
}
}
//...
}

m.term < self.term

如果消息类型是 MessageType::MsgHeartbeat or MessageType::MsgAppend

todo

如果消息类型是 MessageType::MsgRequestPreVote,则直接向缓存区添加一个拒接的提示。

其他的消息类型都直接忽略。

return

上述三种case,只有 m.term < self.term 会直接return。其他的两种不会,仍然会根据具体消息类型继续处理。MessageType::MsgAppend 类型的已经讲解,本次主要是讲解投票。

接受到 MsgVote 请求后, 需要先判断自己还是否有投票权:

  • 上次已经投递给这个节点,或者
  • 当前没有投票,且没有所属的 leader,比如调用的 self.become_follower(m.term, INVALID_ID) 就会这样,或者
  • 请求是 MessageType::MsgRequestPreVote 并且 m.term > self.term

那么说明当前节点具有投票的能力,如果在此基础上,仍然同时满足以下两个条件:

  • 当前节点的 raft_log 和 m 的一样新,或者对方选举时的优先级更高,那么就会把票投给 m,
  • 否则就会拒绝

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
match m.get_msg_type() {
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
let can_vote = (self.vote == m.from) ||
(self.vote == INVALID_ID && self.leader_id == INVALID_ID) ||
(m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term);

if can_vote
&& self.raft_log.is_up_to_date(m.index, m.log_term)
&& (m.index > self.raft_log.last_index() || m.priority >= self.priority)
{
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = false;
to_send.term = m.term;
self.r.send(to_send, &mut self.msgs);
if m.get_msg_type() == MessageType::MsgRequestVote {
// Only record real votes.
self.election_elapsed = 0;
self.vote = m.from; // 记录投票
}
} else {
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = true;
to_send.term = self.term;
let (commit, commit_term) = self.raft_log.commit_info();
to_send.commit = commit;
to_send.commit_term = commit_term;
self.r.send(to_send, &mut self.msgs);
self.maybe_commit_by_vote(&m);
}
}
//...
}

发送端接受到回应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MessageType::MsgRequestPreVoteResponse | MessageType::MsgRequestVoteResponse => {
// 类型不匹配,则忽略
if (self.state == StateRole::PreCandidate
&& m.get_msg_type() != MessageType::MsgRequestPreVoteResponse)
|| (self.state == StateRole::Candidate
&& m.get_msg_type() != MessageType::MsgRequestVoteResponse)
{
return Ok(());
}

// 接送到对端的投票,来看自己能成为什么角色
self.poll(m.from, m.get_msg_type(), !m.reject);
self.maybe_commit_by_vote(&m);
}

maybe_commit_by_vote

节点接受到对方的 MsgVote 信息之后,会尝试提取出里面的 commit 信息来提交自己的 raft_log。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// Commits the logs using commit info in vote message.
fn maybe_commit_by_vote(&mut self, m: &Message) {
if m.commit == 0 || m.commit_term == 0 {
return;
}
let last_commit = self.raft_log.committed;
if m.commit <= last_commit || self.state == StateRole::Leader {
return;
}
// commit
if !self.raft_log.maybe_commit(m.commit, m.commit_term) {
return;
}

if self.state != StateRole::Candidate && self.state != StateRole::PreCandidate {
return;
}

// 取出刚才 committed 的数据
let ents = self.raft_log.slice(
last_commit + 1,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::CommitByVote),
).unwrap_or_else(|e| { fatal!(); });

// 转变为 follower
if self.num_pending_conf(&ents) != 0 {
let term = self.term;
self.become_follower(term, INVALID_ID);
}
}