tikv: Router 设计详解

FsmScheduler

trait FsmScheduler 有两个具体的类:

  • NormalScheduler:针对 FsmTypes::Normal 类型的 Fsm,传入的实际上即 Box<N>
  • ControlScheduler:针对 FsmTypes::Control 类型的 Fsm,传入的即 Box<C>

这两个结构内部完全一致,只是针对不同的 Fsm 类型:

1
2
3
4
5
6
7
8
9
/// The payload sent over a scheduler channel.
///
/// `N` is the normal FSM type and `C` is the control FSM type.
pub enum FsmTypes<N, C> {
    /// A boxed normal FSM.
    Normal(Box<N>),
    /// A boxed control FSM.
    Control(Box<C>),
    /// Carries no FSM; used as a signal that the scheduler should be shut down.
    Empty,
}

// Instantiate the shared `impl_sched!` template twice:
// `NormalScheduler` schedules `N` and wraps it in `FsmTypes::Normal`,
// `ControlScheduler` schedules `C` and wraps it in `FsmTypes::Control`.
impl_sched!(NormalScheduler, FsmTypes::Normal, Fsm = N);
impl_sched!(ControlScheduler, FsmTypes::Control, Fsm = C);

结构如下。

tikv-router-1

impl_sched

每个 FsmScheduler 内部有两个发送端:

  • sender:负责 Priority::Normal 的 Fsm
  • low_sender:负责 Priority::Low 类型的 Fsm
    下面不加区分,统一称呼 sender。
    函数 FsmScheduler::schedule(fsm) 用于将 fsm 通过 sender 发送到 channel 中,对应的接收端 fsm_receiver 在 Poller::fetch_fsm 中接收后,在 Poller::poll 函数中处理。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    // A macro to introduce common definition of scheduler.
    //
    // `$name` is the name of the generated struct, `$ty` is the `FsmTypes`
    // variant used to wrap a scheduled FSM, and `$fsm` is the generic
    // parameter (`N` or `C`) that becomes the scheduler's `Fsm` type.
    macro_rules! impl_sched {
    ($name:ident, $ty:path, Fsm = $fsm:tt) => {
    // One channel sender per priority level.
    pub struct $name<N, C> {
    sender: channel::Sender<FsmTypes<N, C>>,
    low_sender: channel::Sender<FsmTypes<N, C>>,
    }

    // Cloning a scheduler clones both senders.
    impl<N, C> Clone for $name<N, C> {
    #[inline]
    fn clone(&self) -> $name<N, C> {
    $name {
    sender: self.sender.clone(),
    low_sender: self.low_sender.clone(),
    }
    }
    }

    impl<N, C> FsmScheduler for $name<N, C>
    where
    $fsm: Fsm,
    {
    type Fsm = $fsm;

    // Sends the FSM, wrapped in `$ty`, over the sender that matches its
    // priority. A failed send is only logged; the FSM handed back inside
    // the error is dropped at the end of the match arm.
    #[inline]
    fn schedule(&self, fsm: Box<Self::Fsm>) {
    let sender = match fsm.get_priority() {
    Priority::Normal => &self.sender,
    Priority::Low => &self.low_sender,
    };
    match sender.send($ty(fsm)) {
    Ok(()) => {}
    // TODO: use debug instead.
    Err(SendError($ty(fsm))) => warn!("failed to schedule fsm {:p}", fsm),
    // The error hands back the value that was just sent, which was
    // built with `$ty`, so no other variant can appear here.
    _ => unreachable!(),
    }
    }

    // Pushes `FsmTypes::Empty` into both channels repeatedly; send errors
    // are ignored.
    fn shutdown(&self) {
    // TODO: close it explicitly once it's supported.
    // Magic number, actually any number greater than poll pool size works.
    for _ in 0..256 {
    let _ = self.sender.send(FsmTypes::Empty);
    let _ = self.low_sender.send(FsmTypes::Empty);
    }
    }
    }
    };
    }

FsmState

FsmState记录的是 Fsm 的状态、内部数据

  • status 内部有三种状态值:
    • NOTIFYSTATE_NOTIFIED:已通知 Fsm 去处理消息
    • NOTIFYSTATE_IDLE:当前 Fsm 是空闲的
    • NOTIFYSTATE_DROP:需要删除这个 Fsm
  • data:存储 Fsm 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Shared state of one FSM: its notification status plus the FSM itself.
pub struct FsmState<N> {
    /// Holds one of the `NOTIFYSTATE_*` values.
    status: AtomicUsize,
    /// Raw pointer to the boxed FSM.
    data: AtomicPtr<N>,
    /// Counter shared with the creator of this state.
    state_cnt: Arc<AtomicUsize>,
}

impl<N: Fsm> FsmState<N> {
    /// Creates a state in the idle status that takes ownership of `data`.
    ///
    /// `state_cnt` is incremented by one and then stored in the new state.
    pub fn new(data: Box<N>, state_cnt: Arc<AtomicUsize>) -> FsmState<N> {
        // The box is turned into a raw pointer so it can live in an `AtomicPtr`.
        let raw = Box::into_raw(data);
        state_cnt.fetch_add(1, Ordering::Relaxed);
        FsmState {
            status: AtomicUsize::new(NOTIFYSTATE_IDLE),
            data: AtomicPtr::new(raw),
            state_cnt,
        }
    }
    //...
}

take_fsm

FsmState::take_fsm 函数用于从 FsmState::data 中提取出 Fsm。如果已经被提取了则返回None。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// Takes the FSM out of this state if the state is currently idle.
///
/// The status is switched from idle to notified; when that transition fails
/// (the status was anything other than idle) `None` is returned.
///
/// # Panics
///
/// Panics if the status was idle but no FSM pointer was stored.
pub fn take_fsm(&self) -> Option<Box<N>> {
    self.status
        .compare_exchange(
            NOTIFYSTATE_IDLE,
            NOTIFYSTATE_NOTIFIED,
            Ordering::AcqRel,
            Ordering::Acquire,
        )
        .ok()?;

    let raw = self.data.swap(ptr::null_mut(), Ordering::AcqRel);
    if raw.is_null() {
        panic!("inconsistent status and data, something should be wrong.");
    }
    // SAFETY: `raw` is non-null and was just swapped out, so no other caller
    // can obtain it from `data`. It is expected to originate from
    // `Box::into_raw` (as in `FsmState::new`).
    Some(unsafe { Box::from_raw(raw) })
}

notify

通知FsmState::data 中的 Fsm 去处理消息:

  • 从 FsmState::data 中提取出 Fsm
  • 基于传入的 FsmScheduler 对象进行调度,即使用 scheduler 中的 sender 发送到 Poller 中,让后台Poller::poll 去执行 Fsm。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
/// Hands the FSM to `scheduler` if it can be taken out of this state.
///
/// When `take_fsm` yields an FSM, `mailbox` is attached to it before it is
/// scheduled. When `take_fsm` yields `None`, nothing happens.
pub fn notify<S>(&self, scheduler: &S, mailbox: Cow<'_, BasicMailbox<N>>)
where
    S: FsmScheduler<Fsm = N>,
{
    if let Some(mut fsm) = self.take_fsm() {
        fsm.set_mailbox(mailbox);
        scheduler.schedule(fsm);
    }
}

Clear

清理状态和数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Marks the state as dropped and frees the FSM if it is still stored here.
///
/// If the previous status was already notified or dropped, the method returns
/// without touching `data`.
#[inline]
pub fn clear(&self) {
    match self.status.swap(NOTIFYSTATE_DROP, Ordering::AcqRel) {
        // Notified: `take_fsm` has already emptied `data`.
        // Drop: a previous `clear` already ran.
        NOTIFYSTATE_NOTIFIED | NOTIFYSTATE_DROP => return,
        _ => {}
    }

    let raw = self.data.swap(ptr::null_mut(), Ordering::SeqCst);
    if !raw.is_null() {
        // SAFETY: `raw` is non-null and was just swapped out, so no other
        // caller can obtain it from `data`. It is expected to originate from
        // `Box::into_raw` (as in `FsmState::new`).
        // Rebuild the box and drop it explicitly so the FSM is released.
        drop(unsafe { Box::from_raw(raw) });
    }
}

BasicMailbox

  • sender:这里面的 sender 传递的是 Fsm::Message。
  • state:用来保存 Fsm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/// A basic mailbox.
///
/// A mailbox has exactly one owner, and that owner receives every message
/// sent to the mailbox.
///
/// Sending a message checks whether the owner is idle. An idle owner is
/// scheduled through `FsmScheduler` right away, which drives the fsm to poll
/// for messages.
pub struct BasicMailbox<Owner>
where
    Owner: Fsm,
{
    /// Sending half of the channel that carries `Owner::Message` values.
    sender: mpsc::LooseBoundedSender<Owner::Message>,
    /// Shared state that stores the owner FSM.
    state: Arc<FsmState<Owner>>,
}

impl<Owner: Fsm> BasicMailbox<Owner> {
    /// Builds a mailbox from a message `sender` and the owner `fsm`.
    ///
    /// The FSM is wrapped in a new `FsmState`, which also receives
    /// `state_cnt`.
    #[inline]
    pub fn new(
        sender: mpsc::LooseBoundedSender<Owner::Message>,
        fsm: Box<Owner>,
        state_cnt: Arc<AtomicUsize>,
    ) -> BasicMailbox<Owner> {
        let state = Arc::new(FsmState::new(fsm, state_cnt));
        BasicMailbox { sender, state }
    }
}

send

send 函数的逻辑:

  • self.sender.try_send(msg) 发送消息给对应的 Fsm,因为 self.sender 对应的接收端在 Fsm 中;
  • self.state.notify(scheduler, Cow::Borrowed(self)) 则是唤醒对应的 Fsm(由 Scheduler 类型决定) ,让其 FsmDelegate 去处理该msg。
    即在 Poller线程中,先提取出唤醒的 Fsm,再在 Fsm::handle_msg 中提取出msg,处理该Msg。代码如下。
1
2
3
4
5
6
7
8
9
10
/// Tries to queue `msg` for the owner and, on success, notifies the owner.
///
/// # Errors
///
/// Returns the error of the underlying `try_send`; in that case the owner is
/// not notified.
#[inline]
pub fn try_send<S: FsmScheduler<Fsm = Owner>>(
    &self,
    msg: Owner::Message,
    scheduler: &S,
) -> Result<(), TrySendError<Owner::Message>> {
    // Queue the message first so that it is already in the channel when the
    // owner gets scheduled.
    self.sender.try_send(msg)?;
    let this_mailbox = Cow::Borrowed(self);
    self.state.notify(scheduler, this_mailbox);
    Ok(())
}

close

关闭 channel 的发送端并清理 state 中数据。

1
2
3
4
/// Closes the sending side of this mailbox's channel, then clears the FSM
/// state.
pub(crate) fn close(&self) {
self.sender.close_sender();
self.state.clear();
}

Router

Router 统一管理这些 BasicMailbox 的发送端,维护 { region_id, mailbox } 之间的映射。因此,基于 Router 就可以将从 RPC Service::batch_raft 或 Service::raft 接收到的消息发送给指定 raftstore 中指定的 region。

1
2
3
4
5
6
7
8
9
10
/// Routes messages to FSM mailboxes.
///
/// `N`/`C` are the normal and control FSM types; `Ns`/`Cs` are the scheduler
/// types used for them.
pub struct Router<N, C, Ns, Cs>
where
    N: Fsm,
    C: Fsm,
{
    /// Registered normal mailboxes, shared behind a mutex.
    normals: Arc<Mutex<NormalMailMap<N>>>,
    /// LRU cache of normal mailboxes, keyed by address.
    caches: Cell<LruCache<u64, BasicMailbox<N>>>,
    /// Mailbox of the control FSM.
    pub(super) control_box: BasicMailbox<C>,
    /// Scheduler passed along when sending to normal mailboxes.
    pub(crate) normal_scheduler: Ns,
    /// Scheduler passed along when sending to the control mailbox.
    pub(crate) control_scheduler: Cs,

    /// Shared counter of FSM states.
    state_cnt: Arc<AtomicUsize>,
    // NOTE(review): not used by the code visible here; presumably a flag set
    // when the router is shut down — confirm.
    shutdown: Arc<AtomicBool>,
}

register

register、register_all 函数用于添加 {region_id, mail_box} 映射,其中 addr 实际上就是 region_id。
因此,每当新生成 region 时,就需要调用 router.register 来建立映射关系。自然,当发生 RegionMerge 时,也需要调用 router.release 来释放即将不存在的映射关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/// Registers `mailbox` under `addr`.
///
/// A mailbox that was already registered under the same address is closed.
/// Afterwards the alive counter is set to the current number of mailboxes.
///
/// # Panics
///
/// Panics if the `normals` mutex is poisoned.
pub fn register(&self, addr: u64, mailbox: BasicMailbox<N>) {
    let mut normals = self.normals.lock().unwrap();
    let replaced = normals.map.insert(addr, mailbox);
    if let Some(stale) = replaced {
        // Close the old mailbox that was just displaced.
        stale.close();
    }
    let alive = normals.map.len();
    normals.alive_cnt.store(alive, Ordering::Relaxed);
}

/// Registers every `(addr, mailbox)` pair under a single lock acquisition.
///
/// Any mailbox displaced by an insert is closed. Afterwards the alive counter
/// is set to the current number of mailboxes.
///
/// # Panics
///
/// Panics if the `normals` mutex is poisoned.
pub fn register_all(&self, mailboxes: Vec<(u64, BasicMailbox<N>)>) {
    let mut normals = self.normals.lock().unwrap();
    let incoming = mailboxes.len();
    normals.map.reserve(incoming);
    for (addr, mailbox) in mailboxes {
        let replaced = normals.map.insert(addr, mailbox);
        if let Some(stale) = replaced {
            stale.close();
        }
    }
    let alive = normals.map.len();
    normals.alive_cnt.store(alive, Ordering::Relaxed);
}

比如,在启动系统时,RaftBatchSystem::start_system 函数中就调用了 Router::register_all 函数来注册启动时已存在的 {region, mailbox} 的映射关系。

1
// Register all mailboxes collected so far with a single call.
self.router.register_all(mailboxes);

在产生新的region时,也会调用 Router::register 函数来注册这个关系,比如在 on_ready_split_region 函数中:

1
2
// Wrap the new peer in a mailbox that shares the router's state counter,
// then register that mailbox under the new region id.
let mailbox = BasicMailbox::new(sender, new_peer, self.ctx.router.state_cnt().clone());
self.ctx.router.register(new_region_id, mailbox);

check_do

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/// Looks up the mailbox registered for `addr` and applies `f` to it.
///
/// The cache is tried first. On a cache miss, or when `f` returns `None` for
/// the cached mailbox, the mailbox is cloned out of the shared `normals` map
/// and `f` is applied to that clone, so `f` may be called up to two times.
///
/// Returns `Valid(r)` when `f` produced `Some(r)`, `NotExist` when `addr` is
/// not present in `normals`, and `Invalid` when `f` returned `None` for the
/// mailbox taken from `normals`.
#[inline]
fn check_do<F, R>(&self, addr: u64, mut f: F) -> CheckDoResult<R>
where
F: FnMut(&BasicMailbox<N>) -> Option<R>,
{
// NOTE(review): hands out `&mut` access to the `Cell` contents through a raw
// pointer; presumably sound because the cache is never accessed
// re-entrantly or from another thread — confirm.
let caches = unsafe { &mut *self.caches.as_ptr() };
// Becomes false once a cached mailbox has been tried and rejected by `f`.
let mut connected = true;
if let Some(mailbox) = caches.get(&addr) {
match f(mailbox) {
Some(r) => return CheckDoResult::Valid(r),
None => {
connected = false;
}
}
}

// Hold the lock only long enough to read the map size and clone the mailbox.
let (cnt, mailbox) = {
let mut boxes = self.normals.lock().unwrap();
let cnt = boxes.map.len();
let b = match boxes.map.get_mut(&addr) {
Some(mailbox) => mailbox.clone(),
None => {
// Release the lock before touching the cache.
drop(boxes);
if !connected {
// The cached entry was rejected and has no replacement: evict it.
caches.remove(&addr);
}
return CheckDoResult::NotExist;
}
};
(cnt, b)
};
// Resize the cache to the number of registered mailboxes when that number
// exceeds the capacity or falls below half of it.
if cnt > caches.capacity() || cnt < caches.capacity() / 2 {
caches.resize(cnt);
}

let res = f(&mailbox);
match res {
Some(r) => {
// Cache the mailbox that `f` accepted.
caches.insert(addr, mailbox);
CheckDoResult::Valid(r)
}
None => {
if !connected {
// Both the cached and the fresh mailbox were rejected: evict the entry.
caches.remove(&addr);
}
CheckDoResult::Invalid
}
}
}

send

send 方法中 BasicMailbox::try_send 传递的是 NormalScheduler,即触发 PeerFsmDelegate 去处理此处send函数中的 msg。
而 Router::check_do函数作用:

  • 先基于 addr 找到对应的 mailbox;
  • 然后调用对应的 BasicMailbox::try_send 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// Tries to send `msg` to the normal mailbox registered under `addr`.
///
/// Returns `Either::Left` with the send result when a mailbox was found;
/// a mailbox that turned out to be disconnected yields
/// `Err(TrySendError::Disconnected(msg))`. Returns `Either::Right(msg)` when
/// no mailbox is registered under `addr`.
///
/// A `Full` error also increments the "normal" label of
/// `CHANNEL_FULL_COUNTER_VEC`.
pub fn try_send(
    &self,
    addr: u64,
    msg: N::Message,
) -> Either<Result<(), TrySendError<N::Message>>, N::Message> {
    // `check_do` may invoke the closure more than once, so the message is
    // kept in a slot and put back whenever a send hands it back.
    let mut pending = Some(msg);
    let outcome = self.check_do(addr, |mailbox| {
        let attempt = mailbox.try_send(pending.take().unwrap(), &self.normal_scheduler);
        match attempt {
            Err(TrySendError::Disconnected(returned)) => {
                pending = Some(returned);
                None
            }
            Err(TrySendError::Full(returned)) => {
                CHANNEL_FULL_COUNTER_VEC
                    .with_label_values(&["normal"])
                    .inc();
                Some(Err(TrySendError::Full(returned)))
            }
            Ok(()) => Some(Ok(())),
        }
    });

    match outcome {
        CheckDoResult::NotExist => Either::Right(pending.unwrap()),
        CheckDoResult::Invalid => {
            Either::Left(Err(TrySendError::Disconnected(pending.unwrap())))
        }
        CheckDoResult::Valid(sent) => Either::Left(sent),
    }
}

send_control

send_control 函数发送的 msg,最终会触发 StoreFsmDelegate 来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
/// Tries to send `msg` to the control mailbox using the control scheduler.
///
/// The result of the mailbox send is returned unchanged. A `Full` error also
/// increments the "control" label of `CHANNEL_FULL_COUNTER_VEC`.
#[inline]
pub fn send_control(&self, msg: C::Message) -> Result<(), TrySendError<C::Message>> {
    let outcome = self.control_box.try_send(msg, &self.control_scheduler);
    if let Err(TrySendError::Full(_)) = &outcome {
        CHANNEL_FULL_COUNTER_VEC
            .with_label_values(&["control"])
            .inc();
    }
    outcome
}

broadcast_normal

广播,即通知所有的 PeerFsm,这里的消息是由 msg_gen() 函数生成的。

1
2
3
4
5
6
7
/// Try to notify all normal fsm a message.
///
/// `msg_gen` is called once per registered mailbox to produce the message for
/// it. The result of each `force_send` is ignored.
///
/// # Panics
///
/// Panics if the `normals` mutex is poisoned.
pub fn broadcast_normal(&self, mut msg_gen: impl FnMut() -> N::Message) {
    let registry = self.normals.lock().unwrap();
    registry.map.values().for_each(|mailbox| {
        let _ = mailbox.force_send(msg_gen(), &self.normal_scheduler);
    });
}