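FsmScheduler

The FsmScheduler trait has two concrete implementations:

NormalScheduler: handles Fsms of type FsmTypes::Normal; what is passed in is actually a Box<N>.
ControlScheduler: handles Fsms of type FsmTypes::Control; what is passed in is a Box<C>.

The two structs are identical internally and differ only in the Fsm type they handle: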
```rust
pub enum FsmTypes<N, C> {
    Normal(Box<N>),
    Control(Box<C>),
    // Sent by `shutdown` as a stop marker.
    Empty,
}

impl_sched!(NormalScheduler, FsmTypes::Normal, Fsm = N);
impl_sched!(ControlScheduler, FsmTypes::Control, Fsm = C);
```
The structure that the macro generates is described next.
impl_sched

Each FsmScheduler holds two sending ends:

sender: used for Fsms with Priority::Normal.
low_sender: used for Fsms with Priority::Low.

The text below does not distinguish between the two and refers to both simply as sender. FsmScheduler::schedule(fsm) pushes the fsm into the channel through the sender. The matching receiving end, fsm_receiver, picks it up in Poller::fetch_fsm, and the fsm is then processed in Poller::poll.

```rust
macro_rules! impl_sched {
    ($name:ident, $ty:path, Fsm = $fsm:tt) => {
        pub struct $name<N, C> {
            sender: channel::Sender<FsmTypes<N, C>>,
            low_sender: channel::Sender<FsmTypes<N, C>>,
        }

        impl<N, C> Clone for $name<N, C> {
            #[inline]
            fn clone(&self) -> $name<N, C> {
                $name {
                    sender: self.sender.clone(),
                    low_sender: self.low_sender.clone(),
                }
            }
        }

        impl<N, C> FsmScheduler for $name<N, C>
        where
            $fsm: Fsm,
        {
            type Fsm = $fsm;

            #[inline]
            fn schedule(&self, fsm: Box<Self::Fsm>) {
                // Pick the channel that matches the Fsm's priority.
                let sender = match fsm.get_priority() {
                    Priority::Normal => &self.sender,
                    Priority::Low => &self.low_sender,
                };

                match sender.send($ty(fsm)) {
                    Ok(()) => {}
                    Err(SendError($ty(fsm))) => warn!("failed to schedule fsm {:p}", fsm),
                    _ => unreachable!(),
                }
            }

            fn shutdown(&self) {
                // Push Empty markers into both channels.
                for _ in 0..256 {
                    let _ = self.sender.send(FsmTypes::Empty);
                    let _ = self.low_sender.send(FsmTypes::Empty);
                }
            }
        }
    };
}
```
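To make the priority routing easier to see in isolation, here is a minimal sketch built on std::sync::mpsc instead of the channel type used above. DemoFsm and DemoScheduler are hypothetical names invented for this illustration; only the "one sender per priority" idea comes from the macro.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Scheduling priority, mirroring the two levels named in the text.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Priority {
    Normal,
    Low,
}

/// Hypothetical stand-in for a boxed Fsm: just an id and a priority.
#[derive(Debug, PartialEq)]
pub struct DemoFsm {
    pub id: u64,
    pub priority: Priority,
}

/// Simplified scheduler with one sender per priority level.
pub struct DemoScheduler {
    sender: Sender<Box<DemoFsm>>,
    low_sender: Sender<Box<DemoFsm>>,
}

impl DemoScheduler {
    /// Returns the scheduler and the two receiving ends a poller would read.
    pub fn new() -> (Self, Receiver<Box<DemoFsm>>, Receiver<Box<DemoFsm>>) {
        let (sender, rx) = channel();
        let (low_sender, low_rx) = channel();
        (DemoScheduler { sender, low_sender }, rx, low_rx)
    }

    /// Routes the fsm to the channel that matches its priority.
    pub fn schedule(&self, fsm: Box<DemoFsm>) {
        let sender = match fsm.priority {
            Priority::Normal => &self.sender,
            Priority::Low => &self.low_sender,
        };
        if let Err(e) = sender.send(fsm) {
            // The receiver is gone; the fsm comes back inside the error.
            eprintln!("failed to schedule fsm {}", e.0.id);
        }
    }
}
```

A poller in this sketch would simply read from both receivers; how the real Poller balances the two channels is not covered here.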
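FsmState

FsmState records the state of an Fsm together with the Fsm object itself.

status takes one of three values:

NOTIFYSTATE_NOTIFIED: the Fsm has been notified to process messages.
NOTIFYSTATE_IDLE: the Fsm is currently idle.
NOTIFYSTATE_DROP: the Fsm needs to be dropped.

data: stores the Fsm object.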
```rust
pub struct FsmState<N> {
    status: AtomicUsize,
    data: AtomicPtr<N>,
    state_cnt: Arc<AtomicUsize>,
}

impl<N: Fsm> FsmState<N> {
    pub fn new(data: Box<N>, state_cnt: Arc<AtomicUsize>) -> FsmState<N> {
        state_cnt.fetch_add(1, Ordering::Relaxed);
        FsmState {
            status: AtomicUsize::new(NOTIFYSTATE_IDLE),
            // The Box is turned into a raw pointer so it can live in an AtomicPtr.
            data: AtomicPtr::new(Box::into_raw(data)),
            state_cnt,
        }
    }
}
```
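take_fsm

FsmState::take_fsm extracts the Fsm from FsmState::data. It succeeds only when status can be switched from NOTIFYSTATE_IDLE to NOTIFYSTATE_NOTIFIED; if the Fsm has already been taken, it returns None.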
```rust
pub fn take_fsm(&self) -> Option<Box<N>> {
    // Only the caller that flips IDLE -> NOTIFIED may take the Fsm.
    let res = self.status.compare_exchange(
        NOTIFYSTATE_IDLE,
        NOTIFYSTATE_NOTIFIED,
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    if res.is_err() {
        return None;
    }

    let p = self.data.swap(ptr::null_mut(), Ordering::AcqRel);
    if !p.is_null() {
        Some(unsafe { Box::from_raw(p) })
    } else {
        panic!("inconsistent status and data, something should be wrong.");
    }
}
```
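The "take at most once" behavior can be tried out with a small self-contained sketch. IdleCell, its two status constants, and the release helper are assumptions made for this example; they imitate the compare_exchange plus pointer swap shown above and are not the actual FsmState API.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

const IDLE: usize = 0;
const NOTIFIED: usize = 1;

/// Simplified holder: a status word plus an owning raw pointer.
pub struct IdleCell<T> {
    status: AtomicUsize,
    data: AtomicPtr<T>,
}

impl<T> IdleCell<T> {
    pub fn new(value: Box<T>) -> Self {
        IdleCell {
            status: AtomicUsize::new(IDLE),
            data: AtomicPtr::new(Box::into_raw(value)),
        }
    }

    /// Only the caller that flips IDLE -> NOTIFIED receives the value.
    pub fn take(&self) -> Option<Box<T>> {
        self.status
            .compare_exchange(IDLE, NOTIFIED, Ordering::AcqRel, Ordering::Acquire)
            .ok()?;
        let p = self.data.swap(ptr::null_mut(), Ordering::AcqRel);
        assert!(!p.is_null(), "inconsistent status and data");
        // SAFETY: `p` came from `Box::into_raw` and was swapped out exactly once.
        Some(unsafe { Box::from_raw(p) })
    }

    /// Hypothetical helper: hands the value back and marks the cell idle again.
    pub fn release(&self, value: Box<T>) {
        let old = self.data.swap(Box::into_raw(value), Ordering::AcqRel);
        assert!(old.is_null(), "value released twice");
        self.status.store(IDLE, Ordering::Release);
    }
}

impl<T> Drop for IdleCell<T> {
    fn drop(&mut self) {
        let p = self.data.swap(ptr::null_mut(), Ordering::AcqRel);
        if !p.is_null() {
            // SAFETY: a non-null pointer here is still owned by this cell.
            drop(unsafe { Box::from_raw(p) });
        }
    }
}
```

In this sketch a second take returns None until release puts the value back, which is the property that keeps one Fsm from being scheduled twice at the same time.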
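notify

Notifies the Fsm stored in FsmState::data to process messages:

Take the Fsm out of FsmState::data.
Schedule it with the FsmScheduler that was passed in, that is, send it through the scheduler's sender to the Poller so that Poller::poll runs the Fsm in the background.

The code is as follows.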
```rust
pub fn notify<S: FsmScheduler<Fsm = N>>(
    &self,
    scheduler: &S,
    mailbox: Cow<'_, BasicMailbox<N>>,
) {
    match self.take_fsm() {
        // Already notified (or dropped): nothing to schedule.
        None => {}
        Some(mut n) => {
            n.set_mailbox(mailbox);
            scheduler.schedule(n);
        }
    }
}
```
clear

Clears the status and the data.
```rust
#[inline]
pub fn clear(&self) {
    match self.status.swap(NOTIFYSTATE_DROP, Ordering::AcqRel) {
        // The Fsm is currently taken, or clear has already run.
        NOTIFYSTATE_NOTIFIED | NOTIFYSTATE_DROP => return,
        _ => {}
    }

    let ptr = self.data.swap(ptr::null_mut(), Ordering::SeqCst);
    if !ptr.is_null() {
        unsafe {
            // Rebuild the Box so that the Fsm is dropped here.
            Box::from_raw(ptr);
        }
    }
}
```
BasicMailbox

sender: the sender here carries Fsm::Message values.
state: holds the Fsm.
```rust
pub struct BasicMailbox<Owner: Fsm> {
    sender: mpsc::LooseBoundedSender<Owner::Message>,
    state: Arc<FsmState<Owner>>,
}

impl<Owner: Fsm> BasicMailbox<Owner> {
    #[inline]
    pub fn new(
        sender: mpsc::LooseBoundedSender<Owner::Message>,
        fsm: Box<Owner>,
        state_cnt: Arc<AtomicUsize>,
    ) -> BasicMailbox<Owner> {
        BasicMailbox {
            sender,
            state: Arc::new(FsmState::new(fsm, state_cnt)),
        }
    }
}
```
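send

The send logic (BasicMailbox::try_send) has two steps:

self.sender.try_send(msg) delivers the message to the corresponding Fsm, because the receiving end of self.sender lives inside the Fsm.
self.state.notify(scheduler, Cow::Borrowed(self)) wakes up the corresponding Fsm (which one is determined by the Scheduler type) so that its FsmDelegate handles the msg.

In other words, the Poller thread first fetches the Fsm that was woken up, then pulls the msg out in Fsm::handle_msg and processes it. The code is as follows.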
```rust
#[inline]
pub fn try_send<S: FsmScheduler<Fsm = Owner>>(
    &self,
    msg: Owner::Message,
    scheduler: &S,
) -> Result<(), TrySendError<Owner::Message>> {
    // Queue the message first, then make sure the Fsm gets scheduled.
    self.sender.try_send(msg)?;
    self.state.notify(scheduler, Cow::Borrowed(self));
    Ok(())
}
```
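The "queue the message, then wake the owner only if it is idle" pattern can be sketched without any of the types above. WakeMailbox, its AtomicBool flag, and mark_idle are hypothetical simplifications: the flag plays the role of the NOTIFYSTATE_IDLE / NOTIFYSTATE_NOTIFIED status, and the wakeups channel plays the role of the scheduler.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Simplified mailbox: a message channel plus an "already scheduled" flag.
pub struct WakeMailbox {
    msg_tx: Sender<String>,
    scheduled: AtomicBool,
}

impl WakeMailbox {
    /// Returns the mailbox and the receiving end owned by the state machine.
    pub fn new() -> (Self, Receiver<String>) {
        let (msg_tx, msg_rx) = channel();
        let mailbox = WakeMailbox {
            msg_tx,
            scheduled: AtomicBool::new(false),
        };
        (mailbox, msg_rx)
    }

    /// Queues `msg`, then wakes the owner unless it is already scheduled.
    pub fn send(
        &self,
        msg: String,
        owner_id: u64,
        wakeups: &Sender<u64>,
    ) -> Result<(), SendError<String>> {
        self.msg_tx.send(msg)?;
        // `swap` returns the previous value: only the first sender sees `false`.
        if !self.scheduled.swap(true, Ordering::AcqRel) {
            let _ = wakeups.send(owner_id);
        }
        Ok(())
    }

    /// Hypothetical helper a poller would call after draining the messages.
    pub fn mark_idle(&self) {
        self.scheduled.store(false, Ordering::Release);
    }
}
```

With this shape, a burst of messages produces a single wakeup, and the owner drains the whole queue when it runs.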
close

Closes the sending end of the channel and clears the data held in state.
```rust
pub(crate) fn close(&self) {
    self.sender.close_sender();
    self.state.clear();
}
```
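Router

Router manages the sending ends of all these BasicMailboxes in one place by keeping a mapping from region_id to mailbox. With a Router, a message received by the RPC handlers Service::batch_raft or Service::raft can therefore be forwarded to the specified region inside the specified raftstore.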
```rust
pub struct Router<N: Fsm, C: Fsm, Ns, Cs> {
    normals: Arc<Mutex<NormalMailMap<N>>>,
    caches: Cell<LruCache<u64, BasicMailbox<N>>>,
    pub(super) control_box: BasicMailbox<C>,
    pub(crate) normal_scheduler: Ns,
    pub(crate) control_scheduler: Cs,
    state_cnt: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
}
```
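register

register and register_all add {region_id, mailbox} entries; the addr parameter is in fact the region_id. Whenever a new region is created, router.register must be called to establish the mapping. Likewise, when regions are merged, router.release is needed to remove the mapping of the region that no longer exists.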
```rust
pub fn register(&self, addr: u64, mailbox: BasicMailbox<N>) {
    let mut normals = self.normals.lock().unwrap();
    // A mailbox previously registered under the same addr is closed.
    if let Some(mailbox) = normals.map.insert(addr, mailbox) {
        mailbox.close();
    }
    normals
        .alive_cnt
        .store(normals.map.len(), Ordering::Relaxed);
}

pub fn register_all(&self, mailboxes: Vec<(u64, BasicMailbox<N>)>) {
    let mut normals = self.normals.lock().unwrap();
    normals.map.reserve(mailboxes.len());
    for (addr, mailbox) in mailboxes {
        if let Some(m) = normals.map.insert(addr, mailbox) {
            m.close();
        }
    }
    normals
        .alive_cnt
        .store(normals.map.len(), Ordering::Relaxed);
}
```
For example, when the system starts, RaftBatchSystem::start_system calls Router::register_all to register the {region, mailbox} mappings that already exist at startup.
```rust
self.router.register_all(mailboxes);
```
When a new region is created, Router::register is also called to register the mapping, for example in on_ready_split_region:
```rust
let mailbox = BasicMailbox::new(sender, new_peer, self.ctx.router.state_cnt().clone());
self.ctx.router.register(new_region_id, mailbox);
```
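One detail of register that is easy to miss is that re-registering an id closes the mailbox it replaces. The sketch below isolates that behavior; FlagMailbox and MapRouter are hypothetical types made up for the illustration, with a plain HashMap standing in for NormalMailMap.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical mailbox that only tracks whether it has been closed.
#[derive(Clone, Default)]
pub struct FlagMailbox {
    closed: Arc<AtomicBool>,
}

impl FlagMailbox {
    pub fn close(&self) {
        self.closed.store(true, Ordering::Release);
    }

    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
}

/// Simplified router: a locked map from region id to mailbox.
#[derive(Default)]
pub struct MapRouter {
    normals: Mutex<HashMap<u64, FlagMailbox>>,
}

impl MapRouter {
    /// Maps `region_id` to `mailbox`; a mailbox replaced under the same id is closed.
    pub fn register(&self, region_id: u64, mailbox: FlagMailbox) {
        let mut normals = self.normals.lock().unwrap();
        if let Some(old) = normals.insert(region_id, mailbox) {
            old.close();
        }
    }

    /// Number of mailboxes currently registered.
    pub fn alive_cnt(&self) -> usize {
        self.normals.lock().unwrap().len()
    }
}
```

Closing the replaced mailbox means senders that still hold a clone of it get a disconnect error instead of silently queueing messages nobody will read.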
check_do

```rust
#[inline]
fn check_do<F, R>(&self, addr: u64, mut f: F) -> CheckDoResult<R>
where
    F: FnMut(&BasicMailbox<N>) -> Option<R>,
{
    let caches = unsafe { &mut *self.caches.as_ptr() };
    let mut connected = true;
    // Fast path: try the cached mailbox without taking the lock.
    if let Some(mailbox) = caches.get(&addr) {
        match f(mailbox) {
            Some(r) => return CheckDoResult::Valid(r),
            None => {
                connected = false;
            }
        }
    }

    // Slow path: look the mailbox up in the shared map.
    let (cnt, mailbox) = {
        let mut boxes = self.normals.lock().unwrap();
        let cnt = boxes.map.len();
        let b = match boxes.map.get_mut(&addr) {
            Some(mailbox) => mailbox.clone(),
            None => {
                drop(boxes);
                if !connected {
                    caches.remove(&addr);
                }
                return CheckDoResult::NotExist;
            }
        };
        (cnt, b)
    };
    // Keep the cache capacity in line with the number of mailboxes.
    if cnt > caches.capacity() || cnt < caches.capacity() / 2 {
        caches.resize(cnt);
    }

    let res = f(&mailbox);
    match res {
        Some(r) => {
            caches.insert(addr, mailbox);
            CheckDoResult::Valid(r)
        }
        None => {
            if !connected {
                caches.remove(&addr);
            }
            CheckDoResult::Invalid
        }
    }
}
```
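The control flow of check_do is essentially a two-level lookup: a private cache that needs no lock, backed by the shared map behind a Mutex. Below is a reduced sketch of that idea. CachedMap and Lookup are hypothetical names, a HashMap replaces the LruCache, capacity resizing is left out, and a stale cache entry is evicted immediately, which is a simplification of the original bookkeeping.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Outcome of a lookup, mirroring the three cases of CheckDoResult.
#[derive(Debug, PartialEq)]
pub enum Lookup<R> {
    Valid(R),
    Invalid,
    NotExist,
}

/// Two-level lookup: a private cache in front of a shared, locked map.
pub struct CachedMap<V: Clone> {
    shared: Arc<Mutex<HashMap<u64, V>>>,
    cache: HashMap<u64, V>,
}

impl<V: Clone> CachedMap<V> {
    pub fn new(shared: Arc<Mutex<HashMap<u64, V>>>) -> Self {
        CachedMap {
            shared,
            cache: HashMap::new(),
        }
    }

    /// Runs `f` on the entry for `addr`; `f` returns `None` when the entry is stale.
    pub fn check_do<R>(&mut self, addr: u64, mut f: impl FnMut(&V) -> Option<R>) -> Lookup<R> {
        // Fast path: no lock is taken when the cached entry still works.
        let cached = self.cache.get(&addr).map(|v| f(v));
        match cached {
            Some(Some(r)) => return Lookup::Valid(r),
            Some(None) => {
                // The cached entry is stale, so evict it.
                self.cache.remove(&addr);
            }
            None => {}
        }

        // Slow path: clone the entry out so the lock is released before calling `f`.
        let entry = self.shared.lock().unwrap().get(&addr).cloned();
        let v = match entry {
            Some(v) => v,
            None => return Lookup::NotExist,
        };
        match f(&v) {
            Some(r) => {
                self.cache.insert(addr, v);
                Lookup::Valid(r)
            }
            None => Lookup::Invalid,
        }
    }
}
```

The three results map onto the cases in the code above: Valid when the closure succeeded, Invalid when an entry exists but the closure rejected it, and NotExist when the shared map has no entry for the address.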
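send

In the send path, BasicMailbox::try_send is given the NormalScheduler, which means the msg passed to this send function ends up being handled by PeerFsmDelegate. Router::check_do is used to:

First find the mailbox that corresponds to addr.
Then call that mailbox's BasicMailbox::try_send method.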
```rust
pub fn try_send(
    &self,
    addr: u64,
    msg: N::Message,
) -> Either<Result<(), TrySendError<N::Message>>, N::Message> {
    let mut msg = Some(msg);
    let res = self.check_do(addr, |mailbox| {
        let m = msg.take().unwrap();
        match mailbox.try_send(m, &self.normal_scheduler) {
            Ok(()) => Some(Ok(())),
            r @ Err(TrySendError::Full(_)) => {
                CHANNEL_FULL_COUNTER_VEC
                    .with_label_values(&["normal"])
                    .inc();
                Some(r)
            }
            Err(TrySendError::Disconnected(m)) => {
                // Keep the message so that check_do can retry or return it.
                msg = Some(m);
                None
            }
        }
    });
    match res {
        CheckDoResult::Valid(r) => Either::Left(r),
        CheckDoResult::Invalid => Either::Left(Err(TrySendError::Disconnected(msg.unwrap()))),
        CheckDoResult::NotExist => Either::Right(msg.unwrap()),
    }
}
```
send_control

A msg sent through send_control is ultimately handled by StoreFsmDelegate.
```rust
#[inline]
pub fn send_control(&self, msg: C::Message) -> Result<(), TrySendError<C::Message>> {
    match self.control_box.try_send(msg, &self.control_scheduler) {
        Ok(()) => Ok(()),
        r @ Err(TrySendError::Full(_)) => {
            CHANNEL_FULL_COUNTER_VEC
                .with_label_values(&["control"])
                .inc();
            r
        }
        r => r,
    }
}
```
broadcast_normal

Broadcasts a message, that is, notifies every PeerFsm. Each message is produced by calling msg_gen().
```rust
pub fn broadcast_normal(&self, mut msg_gen: impl FnMut() -> N::Message) {
    let mailboxes = self.normals.lock().unwrap();
    for mailbox in mailboxes.map.values() {
        // One fresh message per mailbox; send failures are ignored.
        let _ = mailbox.force_send(msg_gen(), &self.normal_scheduler);
    }
}
```
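Taking a generator closure rather than a single message means the message type does not have to be Clone (this is presumably the motivation; the source does not state it). A minimal sketch of the same pattern over std::sync::mpsc senders follows; the broadcast function and its return value are assumptions made for the example.

```rust
use std::sync::mpsc::Sender;

/// Sends one freshly generated message to every target and
/// returns how many sends succeeded.
pub fn broadcast<M>(targets: &[Sender<M>], mut msg_gen: impl FnMut() -> M) -> usize {
    let mut delivered = 0;
    for tx in targets {
        // `msg_gen` is called once per target, so `M` need not be `Clone`.
        if tx.send(msg_gen()).is_ok() {
            delivered += 1;
        }
    }
    delivered
}
```

Unlike broadcast_normal, which discards the result of each send, this sketch counts the successful deliveries so the effect is observable in a test.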