tikv: BatchSystem Design in Detail

RaftBatchSystem covers the synchronization machinery of the whole MultiRaft system. Let's trace the overall relationships starting from create_system, the function that creates the BatchSystem.

[figure: tikv-batchsystem-1]

create_system

The function fn create_system creates the BatchRouter and the BatchSystem:

  • router: of type BatchRouter; its eventual wrapper is ServerRaftStoreRouter, which forwards received raft messages to the corresponding region in raftstore.
  • system: of type BatchSystem, the background worker threads that process messages according to the type sent over by the router.

router and batch_system exchange messages over channels: what travels between rx/tx and rx2/tx2 below is FsmTypes, which in practice means PeerFsm and StoreFsm objects. The Poller::fetch_fsm function later confirms this.
pub enum FsmTypes<N, C> {
    Normal(Box<N>),
    Control(Box<C>),
    // Used as a signal that scheduler should be shutdown.
    Empty,
}

router

Now let's look at the implementation of batch_system::create_system.

  • sender: the store_tx passed in, i.e. the sending end for messages addressed to the StoreFsm object, used to communicate with the StoreFsm;
  • controller: the store_fsm passed in, i.e. the StoreFsm object itself.

Thus control_box is the sending end for control messages and is used to build the router. The NormalScheduler and ControlScheduler stored in the router likewise hold the channels' sending ends: as soon as NormalScheduler::schedule or ControlScheduler::schedule sends, the corresponding receiver, ultimately fsm_receiver, receives the event. A condensed sketch of the schedule path is shown below.
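
A condensed sketch of NormalScheduler::schedule (simplified from the batch-system source; error handling and the rest of the FsmScheduler impl are elided):

fn schedule(&self, fsm: Box<N>) {
    // Pick the channel by FSM priority; low-priority FSMs go over the
    // low_sender / low_receiver pair consumed by the low-priority pool.
    let sender = match fsm.get_priority() {
        Priority::Normal => &self.sender,
        Priority::Low => &self.low_sender,
    };
    // Wrap the FSM in FsmTypes::Normal and push it onto the channel that
    // the Poller threads drain through fsm_receiver.
    let _ = sender.send(FsmTypes::Normal(fsm));
}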

system

BatchSystem mainly holds the receiving ends of the channels; the router passed in is only used to reschedule some messages.

/// Create a batch system with the given thread name prefix and pool size.
///
/// `sender` and `controller` should be paired.
pub fn create_system<N: Fsm, C: Fsm>(
    cfg: &Config,
    sender: mpsc::LooseBoundedSender<C::Message>,
    controller: Box<C>,
) -> (BatchRouter<N, C>, BatchSystem<N, C>) {
    let state_cnt = Arc::new(AtomicUsize::new(0));
    let control_box = BasicMailbox::new(sender, controller, state_cnt.clone());
    let (tx, rx) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();
    let normal_scheduler = NormalScheduler {
        sender: tx.clone(),
        low_sender: tx2.clone(),
    };
    let control_scheduler = ControlScheduler {
        sender: tx.clone(),
        low_sender: tx2,
    };
    let pool_state_builder = PoolStateBuilder {
        max_batch_size: cfg.max_batch_size(),
        reschedule_duration: cfg.reschedule_duration.0,
        fsm_receiver: rx.clone(),
        fsm_sender: tx,
        pool_size: cfg.pool_size,
    };
    let router = Router::new(control_box, normal_scheduler, control_scheduler, state_cnt);
    let system = BatchSystem {
        name_prefix: None,
        router: router.clone(),
        receiver: rx,
        low_receiver: rx2,
        pool_size: cfg.pool_size,
        max_batch_size: cfg.max_batch_size(),
        workers: Arc::new(Mutex::new(Vec::new())),
        joinable_workers: Arc::new(Mutex::new(Vec::new())),
        reschedule_duration: cfg.reschedule_duration.0,
        low_priority_pool_size: cfg.low_priority_pool_size,
        pool_state_builder: Some(pool_state_builder),
    };
    (router, system)
}

Batch

Batch is the data structure that stores the Fsms to process; normals and control are not populated at the same time:

  • normals: the pending PeerFsm<EK, ER> objects
  • control: the pending StoreFsm object

The structure looks like this:

/// A basic struct for a round of polling.
#[allow(clippy::vec_box)]
pub struct Batch<N, C> {
    normals: Vec<Option<NormalFsm<N>>>,
    control: Option<Box<C>>,
}

The function headings below use the C++-style Batch:: prefix for ease of description.

Batch::push

The Fsm passed in can only be either Normal or Control; there may be multiple normals, but at most one control.

fn push(&mut self, fsm: FsmTypes<N, C>) -> bool {
    match fsm {
        FsmTypes::Normal(n) => {
            self.normals.push(Some(NormalFsm::new(n)));
        }
        FsmTypes::Control(c) => {
            assert!(self.control.is_none());
            self.control = Some(c);
        }
        FsmTypes::Empty => return false,
    }
    true
}
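
A minimal usage sketch (variable names are illustrative; the pattern matches fetch_fsm and poll below): keep pushing until the channel is drained, and treat a false return, i.e. FsmTypes::Empty, as the shutdown signal.

let mut batch = Batch::with_capacity(max_batch_size);
while let Ok(fsm) = fsm_receiver.try_recv() {
    // push returns false only for FsmTypes::Empty, which the scheduler
    // sends to tell the poll threads to shut down.
    if !batch.push(fsm) {
        return;
    }
}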

Poller

Batch is, after all, just a data structure; Poller is what actually processes the messages held by the objects in a Batch.

pub struct Poller<N: Fsm, C: Fsm, Handler> {
    pub router: Router<N, C, NormalScheduler<N, C>, ControlScheduler<N, C>>,
    pub fsm_receiver: channel::Receiver<FsmTypes<N, C>>,
    pub handler: Handler,
    pub max_batch_size: usize,
    pub reschedule_duration: Duration,
    pub joinable_workers: Option<Arc<Mutex<Vec<ThreadId>>>>,
}

  • router: the message-producing end, later wrapped into ServerRaftStoreRouter; messages to regions are sent through this router.
  • fsm_receiver: receives messages
  • handler: a PollHandler object that processes the received messages

Poller::fetch_fsm

The fsm_receiver field is the receiving end rx of the channel created in fn create_system: when the sending end tx sends a message, calling recv on fsm_receiver pulls the message out; its type is an Fsm.
So once the upstream is ready and pushes a message into the channel via tx, fetch_fsm extracts it from fsm_receiver and places it into the batch, to be run later in Poller::poll.
The fsm extracted from fsm_receiver is a PeerFsm<EK, ER> or StoreFsm object. Note the order below: a non-blocking try_recv is attempted first, and the poller only blocks on recv (after calling handler.pause()) when the batch is empty and there is nothing else to do.

fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool {
    if batch.control.is_some() {
        return true;
    }

    if let Ok(fsm) = self.fsm_receiver.try_recv() {
        return batch.push(fsm);
    }

    if batch.is_empty() {
        self.handler.pause();
        if let Ok(fsm) = self.fsm_receiver.recv() {
            return batch.push(fsm);
        }
    }
    !batch.is_empty()
}

Poller::poll

poll runs the Fsms extracted from self.fsm_receiver, calling handle_normal or handle_control depending on whether each one is Normal or Control.
Based on the returned result it decides whether an FSM should be removed, released, or rescheduled; the decision is recorded as a ReschedulePolicy, shown below.
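
For reference, the policy enum, per the batch-system source:

// What to do with a NormalFsm once this round of handling is done.
enum ReschedulePolicy {
    Release(usize), // release the FSM back to its mailbox, recording handled progress
    Remove,         // the FSM is stopped; remove it and close its mailbox
    Schedule,       // push the FSM back onto the channel to be picked up again
}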

impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
    pub fn poll(&mut self) {
        fail_point!("poll");
        let mut batch = Batch::with_capacity(self.max_batch_size);
        let mut reschedule_fsms = Vec::with_capacity(self.max_batch_size);
        let mut to_skip_end = Vec::with_capacity(self.max_batch_size);

        // Fetch batch after every round is finished. It's helpful to protect regions
        // from becoming hungry if some regions are hot points. Since we fetch new fsm every time
        // calling `poll`, we do not need to configure a large value for `self.max_batch_size`.
        let mut run = true;
        while run && self.fetch_fsm(&mut batch) {
            let mut max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());
            {
                let batch_size = &mut self.max_batch_size;
                self.handler.begin(max_batch_size, |cfg| {
                    *batch_size = cfg.max_batch_size();
                });
            }
            max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());
            // 1. handle_control
            if batch.control.is_some() {
                let len = self.handler.handle_control(batch.control.as_mut().unwrap());
                if batch.control.as_ref().unwrap().is_stopped() {
                    batch.remove_control(&self.router.control_box);
                } else if let Some(len) = len {
                    batch.release_control(&self.router.control_box, len);
                }
            }
            // 2. handle_normal
            let mut hot_fsm_count = 0;
            for (i, p) in batch.normals.iter_mut().enumerate() {
                let p = p.as_mut().unwrap();
                // process the normal FSM
                let res = self.handler.handle_normal(p);
                if p.is_stopped() {
                    p.policy = Some(ReschedulePolicy::Remove);
                    reschedule_fsms.push(i);
                } else if p.get_priority() != self.handler.get_priority() {
                    p.policy = Some(ReschedulePolicy::Schedule);
                    reschedule_fsms.push(i);
                } else {
                    if p.timer.saturating_elapsed() >= self.reschedule_duration {
                        hot_fsm_count += 1;
                        if hot_fsm_count % 2 == 0 {
                            p.policy = Some(ReschedulePolicy::Schedule);
                            reschedule_fsms.push(i);
                            continue;
                        }
                    }
                    if let HandleResult::StopAt { progress, skip_end } = res {
                        p.policy = Some(ReschedulePolicy::Release(progress));
                        reschedule_fsms.push(i);
                        if skip_end {
                            to_skip_end.push(i);
                        }
                    }
                }
            }
            // 3. keep fetching FSMs to fill the batch; stop once a ControlFsm arrives
            let mut fsm_cnt = batch.normals.len();
            while batch.normals.len() < max_batch_size {
                if let Ok(fsm) = self.fsm_receiver.try_recv() {
                    run = batch.push(fsm);
                }
                // If we receive a ControlFsm, break this cycle and call `end`. Because ControlFsm
                // may change state of the handler, we shall deal with it immediately after
                // calling `begin` of `Handler`.
                if !run || fsm_cnt >= batch.normals.len() {
                    break;
                }
                let p = batch.normals[fsm_cnt].as_mut().unwrap();
                let res = self.handler.handle_normal(p);
                if p.is_stopped() {
                    p.policy = Some(ReschedulePolicy::Remove);
                    reschedule_fsms.push(fsm_cnt);
                } else if let HandleResult::StopAt { progress, skip_end } = res {
                    p.policy = Some(ReschedulePolicy::Release(progress));
                    reschedule_fsms.push(fsm_cnt);
                    if skip_end {
                        to_skip_end.push(fsm_cnt);
                    }
                }
                fsm_cnt += 1;
            }
            self.handler.light_end(&mut batch.normals);
            for offset in &to_skip_end {
                batch.schedule(&self.router, *offset, true);
            }
            to_skip_end.clear();
            self.handler.end(&mut batch.normals);

            // Because release use `swap_remove` internally, so using pop here
            // to remove the correct FSM.
            while let Some(r) = reschedule_fsms.pop() {
                batch.schedule(&self.router, r, false);
            }
        } // end of the while loop
        if let Some(fsm) = batch.control.take() {
            self.router.control_scheduler.schedule(fsm);
            info!("poller will exit, release the left ControlFsm");
        }
        let left_fsm_cnt = batch.normals.len();
        if left_fsm_cnt > 0 {
            info!(
                "poller will exit, schedule {} left NormalFsms",
                left_fsm_cnt
            );
            for i in 0..left_fsm_cnt {
                let to_schedule = match batch.normals[i].take() {
                    Some(f) => f,
                    None => continue,
                };
                self.router.normal_scheduler.schedule(to_schedule.fsm);
            }
        }
        batch.clear();
    }
}

PollHandler

The handler field in Poller is a trait object implementing PollHandler; the concrete implementations are RaftPoller and ApplyPoller.

[figure: tikv-batchsystem-2]
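
The trait's shape mirrors the calls made in Poller::poll above. A condensed sketch, simplified from the batch-system source (exact generic bounds and default methods may differ by version):

pub trait PollHandler<N, C>: Send + 'static {
    /// Called at the start of each round; the closure lets the handler feed
    /// the latest config (e.g. max_batch_size) back to the poller.
    fn begin<F>(&mut self, batch_size: usize, update_cfg: F)
    where
        for<'a> F: FnOnce(&'a BatchSystemConfig);

    /// Process the control FSM (StoreFsm in the raft batch system).
    fn handle_control(&mut self, control: &mut C) -> Option<usize>;

    /// Process a normal FSM (PeerFsm in the raft batch system).
    fn handle_normal(&mut self, normal: &mut impl DerefMut<Target = N>) -> HandleResult;

    /// Called after a round of handling and before `end`.
    fn light_end(&mut self, _batch: &mut [Option<impl DerefMut<Target = N>>]) {}
    /// Called at the end of each round.
    fn end(&mut self, batch: &mut [Option<impl DerefMut<Target = N>>]);

    /// Called before the poller blocks on an empty channel (see fetch_fsm).
    fn pause(&mut self) {}

    fn get_priority(&self) -> Priority {
        Priority::Normal
    }
}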

RaftPoller

pub struct RaftPoller<EK: KvEngine + 'static, ER: RaftEngine + 'static, T: 'static> {
    tag: String,
    store_msg_buf: Vec<StoreMsg<EK>>,
    peer_msg_buf: Vec<PeerMsg<EK>>,
    previous_metrics: RaftReadyMetrics,
    timer: TiInstant,
    poll_ctx: PollContext<EK, ER, T>,
    messages_per_tick: usize,
    cfg_tracker: Tracker<Config>,
    trace_event: TraceEvent,
    last_flush_time: TiInstant,
    need_flush_events: bool,
}

RaftPoller processes the messages:

  • RaftPoller::handle_normal: extracts from the PeerFsm's receiver field the messages sent over by the mailbox, then processes them
  • RaftPoller::handle_control: extracts from the StoreFsm's receiver field the messages sent over by the mailbox, then processes them

handle_control

handle_control drains the messages from the StoreFsm into self.store_msg_buf, then handles them uniformly in StoreFsmDelegate::handle_msgs.
The sending end tx paired with store.receiver is the one returned by StoreFsm::new inside create_raft_batch_system; it ends up in the Router, and messages sent through it are what store.receiver receives here (see the sketch after the code).

fn handle_control(&mut self, store: &mut StoreFsm<EK>) -> Option<usize> {
    let mut expected_msg_count = None;
    while self.store_msg_buf.len() < self.messages_per_tick {
        // sender side: the store_tx returned by StoreFsm::new
        // (see create_raft_batch_system)
        match store.receiver.try_recv() {
            Ok(msg) => self.store_msg_buf.push(msg),
            Err(TryRecvError::Empty) => {
                expected_msg_count = Some(0);
                break;
            }
            Err(TryRecvError::Disconnected) => {
                store.store.stopped = true;
                expected_msg_count = Some(0);
                break;
            }
        }
    }
    let mut delegate = StoreFsmDelegate {
        fsm: store,
        ctx: &mut self.poll_ctx,
    };
    delegate.handle_msgs(&mut self.store_msg_buf);
    expected_msg_count
}
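
For context, a simplified sketch of StoreFsm::new (field initialization condensed; the real code fills in the store metadata): it creates the loose-bounded channel whose receiving end becomes store.receiver above, and whose sending end is the store_tx that create_raft_batch_system later hands to batch_system::create_system as sender.

pub fn new(cfg: &Config) -> (LooseBoundedSender<StoreMsg<EK>>, Box<StoreFsm<EK>>) {
    let (tx, rx) = mpsc::loose_bounded(cfg.notify_capacity);
    let fsm = Box::new(StoreFsm {
        store: Store::default(), // simplified; see the TiKV source for the real fields
        receiver: rx,
    });
    (tx, fsm)
}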

handle_normal

handle_normal is similar: it first drains messages into self.peer_msg_buf, holding at most messages_per_tick of them, and once extraction finishes it hands the whole buffer to PeerFsmDelegate::handle_msgs.
Likewise, the sending end of peer.receiver is the tx returned by PeerFsm::create, which is generally tied to region lifecycle; searching for the functions that call PeerFsm::create shows where this happens. A condensed sketch of that wiring follows the code below.

fn handle_normal(
    &mut self,
    peer: &mut impl DerefMut<Target = PeerFsm<EK, ER>>,
) -> HandleResult {
    let mut handle_result = HandleResult::KeepProcessing;

    while self.peer_msg_buf.len() < self.messages_per_tick {
        match peer.receiver.try_recv() {
            Ok(msg) => {
                self.peer_msg_buf.push(msg);
            }
            Err(TryRecvError::Empty) => {
                handle_result = HandleResult::stop_at(0, false);
                break;
            }
            Err(TryRecvError::Disconnected) => {
                peer.stop();
                handle_result = HandleResult::stop_at(0, false);
                break;
            }
        }
    }

    let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx);
    delegate.handle_msgs(&mut self.peer_msg_buf);
    if !delegate.collect_ready() && self.poll_ctx.sync_write_worker.is_some() {
        if let HandleResult::StopAt { skip_end, .. } = &mut handle_result {
            *skip_end = true;
        }
    }

    handle_result
}
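
A condensed sketch of the peer-side wiring, following the pattern in raftstore's region-creation paths (names follow the TiKV source, but the surrounding code is trimmed for illustration): PeerFsm::create returns a (sender, fsm) pair, the sender is wrapped into a BasicMailbox, and the mailbox is registered with the router under the region id.

let (tx, peer_fsm) = PeerFsm::create(store_id, &cfg, sched, engines, &region)?;
let mailbox = BasicMailbox::new(tx, peer_fsm, router.state_cnt().clone());
router.register(region.get_id(), mailbox);
// From now on, router.send(region_id, msg) delivers PeerMsg to this peer's
// receiver, which handle_normal drains above.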

BatchSystem

Now let's take a closer look at BatchSystem. It is essentially a thread pool, with workers recording the spawned threads:

  • pool_size: number of threads handling normal-priority tasks
  • low_priority_pool_size: number of threads handling low-priority tasks
  • workers: stores the JoinHandle of every thread

pub struct BatchSystem<N: Fsm, C: Fsm> {
    name_prefix: Option<String>,
    router: BatchRouter<N, C>,
    receiver: channel::Receiver<FsmTypes<N, C>>,
    low_receiver: channel::Receiver<FsmTypes<N, C>>,
    pool_size: usize,
    max_batch_size: usize,
    workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
    joinable_workers: Arc<Mutex<Vec<ThreadId>>>,
    reschedule_duration: Duration,
    low_priority_pool_size: usize,
    pool_state_builder: Option<PoolStateBuilder<N, C>>,
}

spawn

BatchSystem::spawn creates the thread pools, one pool for each of the two priorities. A usage sketch follows the code.

/// Start the batch system.
pub fn spawn<B>(&mut self, name_prefix: String, mut builder: B)
where
    B: HandlerBuilder<N, C>,
    B::Handler: Send + 'static,
{
    for i in 0..self.pool_size {
        self.start_poller(
            thd_name!(format!("{}-{}", name_prefix, i)),
            Priority::Normal,
            &mut builder,
        );
    }
    for i in 0..self.low_priority_pool_size {
        self.start_poller(
            thd_name!(format!("{}-low-{}", name_prefix, i)),
            Priority::Low,
            &mut builder,
        );
    }
    self.name_prefix = Some(name_prefix);
}
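
Usage is a one-liner once a HandlerBuilder is available. In raftstore the builder is RaftPollerBuilder, whose build() produces the RaftPoller shown earlier; the call below is illustrative rather than a verbatim excerpt:

// Spawns cfg.pool_size normal threads ("raftstore-0", "raftstore-1", ...)
// plus cfg.low_priority_pool_size low-priority threads ("raftstore-low-0", ...).
system.spawn("raftstore".to_owned(), raft_poller_builder);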

start_poller

start_poller starts a single thread and runs Poller::poll as the thread's entry function, so it can keep processing the messages sent over from the Router.

fn start_poller<B>(&mut self, name: String, priority: Priority, builder: &mut B)
where
    B: HandlerBuilder<N, C>,
    B::Handler: Send + 'static,
{
    let handler = builder.build(priority);
    let receiver = match priority {
        Priority::Normal => self.receiver.clone(),
        Priority::Low => self.low_receiver.clone(),
    };
    let mut poller = Poller {
        router: self.router.clone(),
        fsm_receiver: receiver,
        handler,
        max_batch_size: self.max_batch_size,
        reschedule_duration: self.reschedule_duration,
        joinable_workers: if priority == Priority::Normal {
            Some(Arc::clone(&self.joinable_workers))
        } else {
            None
        },
    };
    let props = tikv_util::thread_group::current_properties();
    let t = thread::Builder::new()
        .name(name)
        .spawn(move || {
            tikv_util::thread_group::set_properties(props);
            set_io_type(IOType::ForegroundWrite);
            poller.poll();
        })
        .unwrap();
    self.workers.lock().unwrap().push(t);
}

Router

The Router is the message-producing end.

pub struct Router<N: Fsm, C: Fsm, Ns, Cs> {
    normals: Arc<Mutex<NormalMailMap<N>>>,
    caches: Cell<LruCache<u64, BasicMailbox<N>>>,
    pub(super) control_box: BasicMailbox<C>,
    // TODO: These two schedulers should be unified as single one. However
    // it's not possible to write FsmScheduler<Fsm=C> + FsmScheduler<Fsm=N>
    // for now.
    pub(crate) normal_scheduler: Ns,
    pub(crate) control_scheduler: Cs,

    // Count of Mailboxes that is not destroyed.
    // Added when a Mailbox created, and subtracted it when a Mailbox destroyed.
    state_cnt: Arc<AtomicUsize>,
    // Indicates whether the router is shut down or not.
    shutdown: Arc<AtomicBool>,
}

[figure: tikv-batchsystem-3]

[figure: tikv-batchsystem-4]

Router::check_do is the lookup helper behind message delivery: it first consults the per-router-instance LRU cache of mailboxes, then falls back to the normals map under its lock, resizing the cache according to the number of registered mailboxes and evicting cache entries whose mailboxes have disconnected:

#[inline]
fn check_do<F, R>(&self, addr: u64, mut f: F) -> CheckDoResult<R>
where
    F: FnMut(&BasicMailbox<N>) -> Option<R>,
{
    let caches = unsafe { &mut *self.caches.as_ptr() };
    let mut connected = true;
    if let Some(mailbox) = caches.get(&addr) {
        match f(mailbox) {
            Some(r) => return CheckDoResult::Valid(r),
            None => {
                connected = false;
            }
        }
    }

    let (cnt, mailbox) = {
        let mut boxes = self.normals.lock().unwrap();
        let cnt = boxes.map.len();
        let b = match boxes.map.get_mut(&addr) {
            Some(mailbox) => mailbox.clone(),
            None => {
                drop(boxes);
                if !connected {
                    caches.remove(&addr);
                }
                return CheckDoResult::NotExist;
            }
        };
        (cnt, b)
    };
    if cnt > caches.capacity() || cnt < caches.capacity() / 2 {
        caches.resize(cnt);
    }

    let res = f(&mailbox);
    match res {
        Some(r) => {
            caches.insert(addr, mailbox);
            CheckDoResult::Valid(r)
        }
        None => {
            if !connected {
                caches.remove(&addr);
            }
            CheckDoResult::Invalid
        }
    }
}

ServerRaftStoreRouter

The relationship between Router and ServerRaftStoreRouter is as follows.

pub type BatchRouter<N, C> = Router<N, C, NormalScheduler<N, C>, ControlScheduler<N, C>>;

pub struct RaftRouter<EK, ER>
where
    EK: KvEngine,
    ER: RaftEngine,
{
    pub router: BatchRouter<PeerFsm<EK, ER>, StoreFsm<EK>>,
}

pub struct ServerRaftStoreRouter<EK: KvEngine, ER: RaftEngine> {
    router: RaftRouter<EK, ER>,
    local_reader: RefCell<LocalReader<RaftRouter<EK, ER>, EK>>,
}

Router::send

The try_send function sends a msg toward the Poller through tx:

  • first, look up the mailbox corresponding to addr (via check_do above);
  • then, through the mailbox, deliver the message so that the Poller's fsm_receiver gets notified.

A condensed sketch of Router::try_send is shown below; the send_control function that follows it does the same for the control FSM.
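
A condensed sketch of Router::try_send, simplified from the batch-system source (metrics and some details trimmed):

pub fn try_send(
    &self,
    addr: u64,
    msg: N::Message,
) -> Either<Result<(), TrySendError<N::Message>>, N::Message> {
    let mut msg = Some(msg);
    let res = self.check_do(addr, |mailbox| {
        // Enqueue into the mailbox; if the FSM was idle, the mailbox also
        // hands the FSM to the NormalScheduler, i.e. pushes it onto the
        // channel the Poller threads drain.
        match mailbox.try_send(msg.take().unwrap(), &self.normal_scheduler) {
            Ok(()) => Some(Ok(())),
            r @ Err(TrySendError::Full(_)) => Some(r),
            Err(TrySendError::Disconnected(m)) => {
                msg = Some(m);
                None
            }
        }
    });
    match res {
        CheckDoResult::Valid(r) => Either::Left(r),
        CheckDoResult::Invalid => {
            Either::Left(Err(TrySendError::Disconnected(msg.take().unwrap())))
        }
        CheckDoResult::NotExist => Either::Right(msg.take().unwrap()),
    }
}
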
/// Force sending message to control fsm.
#[inline]
pub fn send_control(&self, msg: C::Message) -> Result<(), TrySendError<C::Message>> {
    match self.control_box.try_send(msg, &self.control_scheduler) {
        Ok(()) => Ok(()),
        r @ Err(TrySendError::Full(_)) => {
            CHANNEL_FULL_COUNTER_VEC
                .with_label_values(&["control"])
                .inc();
            r
        }
        r => r,
    }
}

When ServerRaftStoreRouter::send is called, the fsm_receiver of a corresponding poll thread is triggered and receives a Control(StoreFsm).

impl<EK: KvEngine, ER: RaftEngine> StoreRouter<EK> for ServerRaftStoreRouter<EK, ER> {
    fn send(&self, msg: StoreMsg<EK>) -> RaftStoreResult<()> {
        StoreRouter::send(&self.router, msg)
    }
}

impl<EK, ER> StoreRouter<EK> for RaftRouter<EK, ER>
where
    EK: KvEngine,
    ER: RaftEngine,
{
    #[inline]
    fn send(&self, msg: StoreMsg<EK>) -> Result<()> {
        match self.send_control(msg) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => Err(Error::Transport(DiscardReason::Full)),
            Err(TrySendError::Disconnected(_)) => {
                Err(Error::Transport(DiscardReason::Disconnected))
            }
        }
    }
}

create_raft_batch_system

Finally, create_raft_batch_system ties everything together: StoreFsm::new produces the control FSM plus its store_tx, both handed to batch_system::create_system, and an apply batch system is created alongside.

pub fn create_raft_batch_system<EK: KvEngine, ER: RaftEngine>(
    cfg: &Config,
) -> (RaftRouter<EK, ER>, RaftBatchSystem<EK, ER>) {
    let (store_tx, store_fsm) = StoreFsm::new(cfg);
    let (apply_router, apply_system) = create_apply_batch_system(cfg);
    let (router, system) =
        batch_system::create_system(&cfg.store_batch_system, store_tx, store_fsm);
    let raft_router = RaftRouter { router };
    let system = RaftBatchSystem {
        system,
        workers: None,
        apply_router,
        apply_system,
        router: raft_router.clone(),
        store_writers: StoreWriters::new(),
    };
    (raft_router, system)
}
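
To close the loop, a hedged sketch of how these pieces come together at startup (condensed from TiKV's server bootstrap; RaftPollerBuilder and the engine types are named per the source, but the snippet is illustrative, not a verbatim excerpt):

// Create the router/system pair for the raft batch system ...
let (raft_router, mut raft_system) =
    create_raft_batch_system::<RocksEngine, RocksEngine>(&cfg.raft_store);
// ... and later, during node startup, RaftBatchSystem::spawn builds a
// RaftPollerBuilder and calls BatchSystem::spawn, which starts one thread
// per pool slot with Poller::poll as the entry function. From then on,
// anything sent through raft_router (wrapped as ServerRaftStoreRouter)
// is drained and handled by those poll threads.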