```rust
/// Create a batch system with the given thread name prefix and pool size.
///
/// `sender` and `controller` should be paired.
pub fn create_system<N: Fsm, C: Fsm>(
    cfg: &Config,
    sender: mpsc::LooseBoundedSender<C::Message>,
    controller: Box<C>,
) -> (BatchRouter<N, C>, BatchSystem<N, C>) {
    let state_cnt = Arc::new(AtomicUsize::new(0));
    let control_box = BasicMailbox::new(sender, controller, state_cnt.clone());
    let (tx, rx) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();
    let normal_scheduler = NormalScheduler {
        sender: tx.clone(),
        low_sender: tx2.clone(),
    };
    let control_scheduler = ControlScheduler {
        sender: tx.clone(),
        low_sender: tx2,
    };
    let pool_state_builder = PoolStateBuilder {
        max_batch_size: cfg.max_batch_size(),
        reschedule_duration: cfg.reschedule_duration.0,
        fsm_receiver: rx.clone(),
        fsm_sender: tx,
        pool_size: cfg.pool_size,
    };
    let router = Router::new(control_box, normal_scheduler, control_scheduler, state_cnt);
    let system = BatchSystem {
        name_prefix: None,
        router: router.clone(),
        receiver: rx,
        low_receiver: rx2,
        pool_size: cfg.pool_size,
        max_batch_size: cfg.max_batch_size(),
        workers: Arc::new(Mutex::new(Vec::new())),
        joinable_workers: Arc::new(Mutex::new(Vec::new())),
        reschedule_duration: cfg.reschedule_duration.0,
        low_priority_pool_size: cfg.low_priority_pool_size,
        pool_state_builder: Some(pool_state_builder),
    };
    (router, system)
}
```
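`create_system` wires up two unbounded crossbeam channels: `tx`/`rx` carries normal-priority Fsms and `tx2`/`rx2` carries low-priority ones, and both schedulers keep a sender for each. The sketch below shows roughly how `NormalScheduler` picks a channel when an Fsm is scheduled; it is simplified from the batch-system crate (shutdown and send-error handling omitted), so treat it as a sketch rather than the exact implementation:

```rust
// Simplified sketch: how NormalScheduler chooses between the normal and the
// low-priority channel created in create_system.
impl<N: Fsm, C: Fsm> FsmScheduler for NormalScheduler<N, C> {
    type Fsm = N;

    fn schedule(&self, fsm: Box<N>) {
        // Fsm::get_priority defaults to Priority::Normal.
        let sender = match fsm.get_priority() {
            Priority::Normal => &self.sender,
            Priority::Low => &self.low_sender,
        };
        // Wrap the Fsm so normal and control Fsms can share the same channel type.
        let _ = sender.send(FsmTypes::Normal(fsm));
    }
}
```

`ControlScheduler` works the same way, except that it wraps the Fsm in `FsmTypes::Control`.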
Batch
`Batch` is the data structure that holds the Fsms waiting to be processed in one round; a given Fsm lands either in `normals` or in `control`:
- normals: the pending `PeerFsm<EK, ER>` objects
- control: the pending `StoreFsm` object (at most one)
Its definition is as follows:
```rust
/// A basic struct for a round of polling.
#[allow(clippy::vec_box)]
pub struct Batch<N, C> {
    normals: Vec<Option<NormalFsm<N>>>,
    control: Option<Box<C>>,
}
```
For ease of description, the function headings below use the C++-style `Batch::` prefix.
Batch::push
The Fsm passed in can only be either a Normal or a Control Fsm; a batch can hold multiple normals but at most one control.
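For reference, a simplified sketch of what `push` does (based on the `FsmTypes` enum that the channels carry; details may differ slightly across versions of the batch-system crate):

```rust
impl<N: Fsm, C: Fsm> Batch<N, C> {
    // Returns false when an Empty marker is received, which tells the poller
    // to stop fetching and, eventually, to exit.
    fn push(&mut self, fsm: FsmTypes<N, C>) -> bool {
        match fsm {
            FsmTypes::Normal(n) => self.normals.push(Some(NormalFsm::new(n))),
            FsmTypes::Control(c) => {
                // At most one ControlFsm per round.
                assert!(self.control.is_none());
                self.control = Some(c);
            }
            FsmTypes::Empty => return false,
        }
        true
    }
}
```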
The main loop of the poller's `poll` routine drives each round; the local variables (`batch`, `reschedule_fsms`, `to_skip_end`) are declared earlier in the function and omitted here:

```rust
        // Fetch batch after every round is finished. It's helpful to protect regions
        // from becoming hungry if some regions are hot points. Since we fetch new fsm every time
        // calling `poll`, we do not need to configure a large value for `self.max_batch_size`.
        let mut run = true;
        while run && self.fetch_fsm(&mut batch) {
            let mut max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());
            {
                let batch_size = &mut self.max_batch_size;
                self.handler.begin(max_batch_size, |cfg| {
                    *batch_size = cfg.max_batch_size();
                });
            }
            max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());

            // 1. handle_control
            if batch.control.is_some() {
                let len = self.handler.handle_control(batch.control.as_mut().unwrap());
                if batch.control.as_ref().unwrap().is_stopped() {
                    batch.remove_control(&self.router.control_box);
                } else if let Some(len) = len {
                    batch.release_control(&self.router.control_box, len);
                }
            }

            // 2. handle_normal
            let mut hot_fsm_count = 0;
            for (i, p) in batch.normals.iter_mut().enumerate() {
                let p = p.as_mut().unwrap();
                // Handle this normal Fsm.
                let res = self.handler.handle_normal(p);
                if p.is_stopped() {
                    p.policy = Some(ReschedulePolicy::Remove);
                    reschedule_fsms.push(i);
                } else if p.get_priority() != self.handler.get_priority() {
                    p.policy = Some(ReschedulePolicy::Schedule);
                    reschedule_fsms.push(i);
                } else {
                    if p.timer.saturating_elapsed() >= self.reschedule_duration {
                        hot_fsm_count += 1;
                        if hot_fsm_count % 2 == 0 {
                            p.policy = Some(ReschedulePolicy::Schedule);
                            reschedule_fsms.push(i);
                            continue;
                        }
                    }
                    if let HandleResult::StopAt { progress, skip_end } = res {
                        p.policy = Some(ReschedulePolicy::Release(progress));
                        reschedule_fsms.push(i);
                        if skip_end {
                            to_skip_end.push(i);
                        }
                    }
                }
            }

            // 3. Try to fetch more Fsms (possibly a ControlFsm) to fill the batch.
            let mut fsm_cnt = batch.normals.len();
            while batch.normals.len() < max_batch_size {
                if let Ok(fsm) = self.fsm_receiver.try_recv() {
                    run = batch.push(fsm);
                }
                // If we receive a ControlFsm, break this cycle and call `end`. Because ControlFsm
                // may change state of the handler, we shall deal with it immediately after
                // calling `begin` of `Handler`.
                if !run || fsm_cnt >= batch.normals.len() {
                    break;
                }
                let p = batch.normals[fsm_cnt].as_mut().unwrap();
                let res = self.handler.handle_normal(p);
                if p.is_stopped() {
                    p.policy = Some(ReschedulePolicy::Remove);
                    reschedule_fsms.push(fsm_cnt);
                } else if let HandleResult::StopAt { progress, skip_end } = res {
                    p.policy = Some(ReschedulePolicy::Release(progress));
                    reschedule_fsms.push(fsm_cnt);
                    if skip_end {
                        to_skip_end.push(fsm_cnt);
                    }
                }
                fsm_cnt += 1;
            }
            self.handler.light_end(&mut batch.normals);
            for offset in &to_skip_end {
                batch.schedule(&self.router, *offset, true);
            }
            to_skip_end.clear();
            self.handler.end(&mut batch.normals);

            // Because release use `swap_remove` internally, so using pop here
            // to remove the correct FSM.
            while let Some(r) = reschedule_fsms.pop() {
                batch.schedule(&self.router, r, false);
            }
        } // while-end

        if let Some(fsm) = batch.control.take() {
            self.router.control_scheduler.schedule(fsm);
            info!("poller will exit, release the left ControlFsm");
        }
        let left_fsm_cnt = batch.normals.len();
        if left_fsm_cnt > 0 {
            info!(
                "poller will exit, schedule {} left NormalFsms",
                left_fsm_cnt
            );
            for i in 0..left_fsm_cnt {
                let to_schedule = match batch.normals[i].take() {
                    Some(f) => f,
                    None => continue,
                };
                self.router.normal_scheduler.schedule(to_schedule.fsm);
            }
        }
        batch.clear();
    }
}
```
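Each normal Fsm handled in a round is tagged with a `ReschedulePolicy`, and `batch.schedule` applies that policy after `end`. The enum below is how the three policies look in the batch-system crate; the comments are my reading of what each one means and are not quoted from the source:

```rust
enum ReschedulePolicy {
    // handle_normal stopped after consuming `usize` messages: release the Fsm
    // back to its mailbox so a new message can wake it up again.
    Release(usize),
    // The Fsm reported is_stopped(): close its mailbox and drop the Fsm.
    Remove,
    // Push the Fsm back onto the scheduler channel, e.g. because it has been
    // running "hot" for longer than reschedule_duration or its priority no
    // longer matches this poller.
    Schedule,
}
```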
For reference, the `Router` used by `create_system` and the poller above is defined as:

```rust
pub struct Router<N: Fsm, C: Fsm, Ns, Cs> {
    normals: Arc<Mutex<NormalMailMap<N>>>,
    caches: Cell<LruCache<u64, BasicMailbox<N>>>,
    pub(super) control_box: BasicMailbox<C>,
    // TODO: These two schedulers should be unified as single one. However
    // it's not possible to write FsmScheduler<Fsm=C> + FsmScheduler<Fsm=N>
    // for now.
    pub(crate) normal_scheduler: Ns,
    pub(crate) control_scheduler: Cs,

    // Count of Mailboxes that is not destroyed.
    // Added when a Mailbox created, and subtracted it when a Mailbox destroyed.
    state_cnt: Arc<AtomicUsize>,
    // Indicates the router is shutdown down or not.
    shutdown: Arc<AtomicBool>,
}
```
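The `caches` field keeps a per-handle LRU of mailboxes so the hot send path does not have to lock `normals` every time. Below is a minimal, self-contained sketch of that idea only; the types are simplified stand-ins, not the real `BasicMailbox`/`NormalMailMap`, and the real `Router` also handles closed mailboxes and cache eviction:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for BasicMailbox<N>; the real one wraps a LooseBoundedSender.
struct Mailbox;

struct RouterSketch {
    // Shared map of all live mailboxes (NormalMailMap in the real Router).
    normals: Arc<Mutex<HashMap<u64, Arc<Mailbox>>>>,
}

impl RouterSketch {
    // `cache` plays the role of the per-handle LruCache in `caches`.
    fn mailbox(&self, cache: &mut HashMap<u64, Arc<Mailbox>>, addr: u64) -> Option<Arc<Mailbox>> {
        // Fast path: no lock, just the local cache.
        if let Some(mb) = cache.get(&addr) {
            return Some(mb.clone());
        }
        // Slow path: lock the shared map, clone the mailbox out, refresh the cache.
        let mb = self.normals.lock().unwrap().get(&addr)?.clone();
        cache.insert(addr, mb.clone());
        Some(mb)
    }
}
```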