Election
本节主要说如何触发 election,以及怎么 election。
Raft::tick
tick 用于推动 raft 内部时钟。
1 2 3 4 5 6 7 8
| pub fn tick(&mut self) -> bool { match self.state { StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => { self.tick_election() } StateRole::Leader => self.tick_heartbeat(), } }
|
Raft::tick_election
非 leader 节点会调用 Raft::tick_election 来驱动内部数据,监控是否需要 election。
pass_election_timeout()
函数用来判断选举是否超时;
promotable
字段表示 follower 是否还在集群中
如果一个 alive 的 follower,election 超时,则会让自己进入 MessageType::MsgHup 状态,触发 election。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| pub fn pass_election_timeout(&self) -> bool { self.election_elapsed >= self.randomized_election_timeout }
pub fn tick_election(&mut self) -> bool { self.election_elapsed += 1; if !self.pass_election_timeout() || !self.promotable { return false; }
self.election_elapsed = 0; let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id)); let _ = self.step(m); true }
|
Raft::hup
先看当前 (applied, committed]
区间的数据是否包含 confChange:如果包含则当前不能选举(待解释)。
如果 (applied, committed]
没有 confChange 类型的数据,则进入 Raft::campaign
函数开始选举。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| fn hup(&mut self, transfer_leader: bool) { if self.state == StateRole::Leader { return; }
let first_index = match self.raft_log.unstable.maybe_first_index() { Some(idx) => idx, None => self.raft_log.applied + 1, };
let ents = self.raft_log.slice( first_index, self.raft_log.committed + 1, None, GetEntriesContext(GetEntriesFor::TransferLeader), ) .unwrap_or_else(|e| { fatal!(); });
if self.num_pending_conf(&ents) != 0 { return; }; }
|
Raft::campaign
Raft::campaign
函数,开启新一轮的 election,竞争成为 leader。
开启 pre_vote,则 follower 先变成 pre_candidate 状态
pre_candiate 状态的 raft_node,先尝试收集半数以上的心跳,如果能收集到半数以上的心跳,则转变为 candidate 状态。
否则,follower 直接变成 candidate 状态
在 candidate 状态上的节点,需要自增 term,向其他node发送消息,让其他 ndoe 给自己投票,并且强迫其他 node 进入新一轮 election。
在 campaign 函数中,poll
函数中判断自己是否能成为 leader。如果能成为 leader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { self.become_pre_candidate(); (MessageType::MsgRequestPreVote, self.term + 1) } else { self.become_candidate(); (MessageType::MsgRequestVote, self.term) };
let self_id = self.id; if VoteResult::Won == self.poll(self_id, vote_msg, true) { return; }
let (commit, commit_term) = self.raft_log.commit_info(); let mut voters = [0; 7];
for id in self.prs.conf().voters().ids().iter() { if id == self_id { continue; }
voters[voter_cnt] = id; let mut m = new_message(id, vote_msg, None); m.term = term; m.index = self.raft_log.last_index(); m.log_term = self.raft_log.last_term(); m.commit = commit; m.commit_term = commit_term; if campaign_type == CAMPAIGN_TRANSFER { m.context = campaign_type.into(); } self.r.send(m, &mut self.msgs); } }
|
Raft::poll
Raft::poll
函数的参数 {from, vote}
:vote
为 true,则表示接受到了 from
的投票;反之,from
拒绝给自己选票。
- 先将这个投票结果
{from, vote}
记录在 ProgressTracker::votes 字段中;
- 再基于 ProgressTracker::votes 中的结果,统计自己是否赢得本轮 election。
根据 ProgressTracker::tally_votes
的结果:
- VoteResult::Won:获得多数票数的,即主动变成 leader (如果开启了 pre_vote,当前状态是 StateRole::PreCandidate, 则变成 Candidate);
- VoteResult::Lost:失败的节点(即半数以上的节点拒绝给该节点投票)则主动变成 follower(但是此时不知道谁是 leader);
- VoteResult::Pending:即没成功也没失败的节点,如果有其他节点成为 leader后,会广播心跳信息,让自己变成 follower。或者,等待 election_timeout,进入下一轮选举。
代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult { self.prs.record_vote(from, vote); let (gr, rj, res) = self.prs.tally_votes();
match res { VoteResult::Won => { if self.state == StateRole::PreCandidate { self.campaign(CAMPAIGN_ELECTION); } else { self.become_leader(); self.bcast_append(); } } VoteResult::Lost => { let term = self.term; self.become_follower(term, INVALID_ID); } VoteResult::Pending => (), } res }
|
Raft::step
每次接受到消息时, 会先进入 Raft::step
函数中进行处理,再根据具体的消息类型进行特殊处理。
- 本节点消息:对于来自本节点的消息
m
,他的 m.term
是 0,直接根据其消息类型 m.get_message_type()
进行处理。 比如 leader 节点处理 proposal 时。
- 其他节点消息:对于来自己其他节点的消息
m
,从 raft 论文可知,要区分 m.term < self.term
和 m.term > self.term
和 m.term == self.term
三种case进行处理
m.term > self.term
这种情况下,如果消息类型还是:
MessageType::MsgAppend
MessageType::MsgHeartbeat
MessageType::MsgSnapshot
那么说明当前集群已经产生了一个新 leader,此时接收到的消息是用来广播,告知其他节点成为 m.from
的 follower。
如果接受到的消息类型是:
MessageType::MsgRequestPreVote
MessageType::MsgRequestVote
说明集群有一个节点 m.from
率先发起了新一轮 election。但是接受到这个请求的一端 m.to
需要判断是否扰动,不能盲目的给 m.from
投票:如果当前 leader 仍存在并且在 lease 之中,那么接受到此请求的节点就忽略本次的投票请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| pub fn step(&mut self, m: Message) -> Result<()> { if m.term == 0 { } else if m.term > self.term { if m.get_msg_type() == MessageType::MsgRequestVote || m.get_msg_type() == MessageType::MsgRequestPreVote { let force = m.context == CAMPAIGN_TRANSFER; let in_lease = self.check_quorum && self.leader_id != INVALID_ID && self.election_elapsed < self.election_timeout; if !force && in_lease { return Ok(()); } } if m.get_msg_type() == MessageType::MsgRequestPreVote || (m.get_msg_type() == MessageType::MsgRequestPreVoteResponse && !m.reject) { } else { if m.get_msg_type() == MessageType::MsgAppend || m.get_msg_type() == MessageType::MsgHeartbeat || m.get_msg_type() == MessageType::MsgSnapshot { self.become_follower(m.term, m.from); } else { self.become_follower(m.term, INVALID_ID); } } } }
|
m.term < self.term
如果消息类型是 MessageType::MsgHeartbeat
or MessageType::MsgAppend
todo
如果消息类型是 MessageType::MsgRequestPreVote
,则直接向缓存区添加一个拒接的提示。
其他的消息类型都直接忽略。
return
上述三种case,只有 m.term < self.term
会直接return。其他的两种不会,仍然会根据具体消息类型继续处理。MessageType::MsgAppend
类型的已经讲解,本次主要是讲解投票。
接受到 MsgVote 请求后, 需要先判断自己还是否有投票权:
- 上次已经投递给这个节点,或者
- 当前没有投票,且没有所属的 leader,比如调用的
self.become_follower(m.term, INVALID_ID)
就会这样,或者
- 请求是 MessageType::MsgRequestPreVote 并且 m.term > self.term
那么说明当前节点具有投票的能力,如果在此基础上,仍然同时满足以下两个条件:
- 当前节点的 raft_log 和 m 的一样新,或者对方选举时的优先级更高,那么就会把票投给 m,
- 否则就会拒绝
代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| match m.get_msg_type() { MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { let can_vote = (self.vote == m.from) || (self.vote == INVALID_ID && self.leader_id == INVALID_ID) || (m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term); if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) && (m.index > self.raft_log.last_index() || m.priority >= self.priority) { let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = false; to_send.term = m.term; self.r.send(to_send, &mut self.msgs); if m.get_msg_type() == MessageType::MsgRequestVote { self.election_elapsed = 0; self.vote = m.from; } } else { let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = true; to_send.term = self.term; let (commit, commit_term) = self.raft_log.commit_info(); to_send.commit = commit; to_send.commit_term = commit_term; self.r.send(to_send, &mut self.msgs); self.maybe_commit_by_vote(&m); } } }
|
发送端接受到回应
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| MessageType::MsgRequestPreVoteResponse | MessageType::MsgRequestVoteResponse => { if (self.state == StateRole::PreCandidate && m.get_msg_type() != MessageType::MsgRequestPreVoteResponse) || (self.state == StateRole::Candidate && m.get_msg_type() != MessageType::MsgRequestVoteResponse) { return Ok(()); }
self.poll(m.from, m.get_msg_type(), !m.reject); self.maybe_commit_by_vote(&m); }
|
maybe_commit_by_vote
节点接受到对方的 MsgVote 信息之后,会尝试提取出里面的 commit 信息来提交自己的 raft_log。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| fn maybe_commit_by_vote(&mut self, m: &Message) { if m.commit == 0 || m.commit_term == 0 { return; } let last_commit = self.raft_log.committed; if m.commit <= last_commit || self.state == StateRole::Leader { return; } if !self.raft_log.maybe_commit(m.commit, m.commit_term) { return; }
if self.state != StateRole::Candidate && self.state != StateRole::PreCandidate { return; }
let ents = self.raft_log.slice( last_commit + 1, self.raft_log.committed + 1, None, GetEntriesContext(GetEntriesFor::CommitByVote), ).unwrap_or_else(|e| { fatal!(); }); if self.num_pending_conf(&ents) != 0 { let term = self.term; self.become_follower(term, INVALID_ID); } }
|