DriverQueue 的继承派生关系如下图所示。QuerySharedDriverQueue 是用于没有设置 ResuorceGroup 的 query,即 default ResourceGroup,而 WorkGroupDriverQueue 是针对设置了 ResourceGroup 的 query。
本文专注于 QuerySharedDriverQueue 的实现,ResuorceGroup 后文单独讲解。下文 DriverQueue 一般指 QuerySharedDriverQueue。
DriverQueue 本质上是个调度 PipelineDrivers 的数据结构,这样 DrvierQueue 本身就不需要占据一个线程,它可以在 GloablEexecuotr::work_thread 所在的核上运行。
DriverAcct DriverQueue 实现调度,需要依赖 PipelineDriver 运行时的统计信息。比如,在前文我们说过了 PipelineDriver 每执行一次 PipelineDriver:process 函数都会更新统计信息。
统计信息由 DriverAcct 记录:
1 2 3 4 5 6 7 8 9 10 11 12 13 struct DriverAcct { int64_t get_last_time_spent () { return last_time_spent; } int64_t get_accumulated_time_spent () const { return accumulated_time_spent; } int64_t schedule_times{0 }; int64_t schedule_effective_times{0 }; int64_t last_time_spent{0 }; int64_t last_chunks_moved{0 }; int64_t accumulated_time_spent{0 }; int64_t accumulated_chunks_moved{0 }; int64_t accumulated_rows_moved{0 }; };
PipelineDriver 中统计信息如下。利用这些统计信息就能实现更为准确的调度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 DriverAcct& PipelineDriver::driver_acct () { return _driver_acct; } void PipelineDriver::_update_driver_acct(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) { driver_acct ().update_last_chunks_moved (total_chunks_moved); driver_acct ().update_accumulated_rows_moved (total_rows_moved); driver_acct ().update_last_time_spent (time_spent); } void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) { _update_driver_acct(total_chunks_moved, total_rows_moved, time_spent); if (ScanOperator* scan = source_scan_operator ()) { query_ctx ()->incr_cur_scan_rows_num ( scan->get_last_scan_rows_num ()); query_ctx ()->incr_cur_scan_bytes ( scan->get_last_scan_bytes ()); } int64_t accounted_cpu_cost = driver_acct ().get_last_time_spent () + source_operator ()->get_last_growth_cpu_time_ns () + sink_operator ()->get_last_growth_cpu_time_ns (); query_ctx ()->incr_cpu_cost (accounted_cpu_cost); }
SubQuerySharedDriverQueue QuerySharedDriverQueue 基于多级反馈队列实现,每一 level 的时间片不同,每一 level 的 PipelineDriver 是由 SubQuerySharedDriverQueue 来存储。
SubQuerySharedDriverQueue 有三个字段:
queue:用于存放 Pipelinerivers,如果 driver 已经 DriverState::CANCELED 状态则从头部压入,否则从尾部压入
pending_cancel_queue:用于存放正在取消的 driver
cancelled_set:用于记录已经取消的 drivers,如果 queue 中取出的 drivers 已经在 canceled_set 中,则忽略该 driver,从 queue 中重新取
结构如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class SubQuerySharedDriverQueue {public : void update_accu_time (const DriverRawPtr driver) { _accu_consume_time.fetch_add ( driver->driver_acct ().get_last_time_spent ()); } double accu_time_after_divisor () { return _accu_consume_time.load () / factor_for_normal; } void put (const DriverRawPtr driver) ; void cancel (const DriverRawPtr driver) ; DriverRawPtr take () ; std::deque<DriverRawPtr> queue; std::queue<DriverRawPtr> pending_cancel_queue; std::unordered_set<DriverRawPtr> cancelled_set; size_t num_drivers = 0 ; double factor_for_normal = 0 ; private : std::atomic<int64_t > _accu_consume_time = 0 ; };
put driver 如果已经处于 DriverState::CANCELED 状态,则从头部压入,否则从尾部压入。cancel 函数是直接压入 pending_cancel_queue 中。 queue 和 pending_cancel_queue 是可能存在重复的 driver,因此需要 cancelled_set 去重。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void SubQuerySharedDriverQueue::put (const DriverRawPtr driver) { if (driver->driver_state () == DriverState::CANCELED) { queue.emplace_front (driver); } else { queue.emplace_back (driver); } num_drivers++; } void SubQuerySharedDriverQueue::cancel (const DriverRawPtr driver) { if (cancelled_set.count (driver) == 0 ) { DCHECK (driver->is_in_ready_queue ()); pending_cancel_queue.emplace (driver); } }
take 从 SubQuerySharedDriverQueue 中获取 driver 时
如果 pending_cancel_queue 不为空,率先从 pending_cancel_queue 中取,并将获得的 driver 记录在 cancelld_set 中。
否则,再从 queue 中获取,如果 driver 已经在 cancelled_set 中记录,则忽略并重新从 queue 中获取。
代码也是比较简单的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 DriverRawPtr SubQuerySharedDriverQueue::take () { DCHECK (!empty ()); if (!pending_cancel_queue.empty ()) { DriverRawPtr driver = pending_cancel_queue.front (); pending_cancel_queue.pop (); cancelled_set.insert (driver); --num_drivers; return driver; } while (!queue.empty ()) { DriverRawPtr driver = queue.front (); queue.pop_front (); auto iter = cancelled_set.find (driver); if (iter != cancelled_set.end ()) { cancelled_set.erase (iter); } else { --num_drivers; return driver; } } return nullptr ; }
QuerySharedDriverQueue QuerySharedDriverQueue 一共有 QUEUE_SIZE(值是 8)个 SubQuerySharedDriverQueue,每个 SubQueue 对应的时间片依次是 {0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s} :
比如 SubQueue[0] 的时间片范围是 [0, 0.2) ,
超过 7.2s 的 Pipeline 都位于最后一个队列 SubQueue[7] 中,即 SubQueue[7] 的时间范围是 [5.6, +∞)。
每个 Driver 消耗的时间片记录在 DriverAcct::accumulated_time_spent
,PipelineDriver 每执行一次就会调用一次 _update_statistics 函数来更新 DriverAcct 中的统计值,当 accumulated_time_spent 超过当前 SubQueue 的时间片区间,driver 就进入下一个 SubQueue。
QuerySharedDriverQueue 的构造函数如下 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 QuerySharedDriverQueue::QuerySharedDriverQueue () { double factor = 1 ; for (int i = QUEUE_SIZE - 1 ; i >= 0 ; --i) { _queues[i].factor_for_normal = factor; factor *= RATIO_OF_ADJACENT_QUEUE; } int64_t time_slice = 0 ; for (int i = 0 ; i < QUEUE_SIZE; ++i) { time_slice += LEVEL_TIME_SLICE_BASE_NS * (i + 1 ); _level_time_slices[i] = time_slice; } }
_compute_driver_level 向 QuerySharedDriverQueue 添加 driver 时,从上一次所处的层 PipelineDriver::get_driver_queue_level
开始,再基于 accumulated_time_spent 计算 driver 本次将位于 DriverQueue 中的哪一层。
因此,一个长时间的运行的 driver 在 DriverQueue 中不断下沉。
1 2 3 4 5 6 7 8 9 int QuerySharedDriverQueue::_compute_driver_level(const DriverRawPtr driver) const { int time_spent = driver->driver_acct ().get_accumulated_time_spent (); for (int i = driver->get_driver_queue_level (); i < QUEUE_SIZE; ++i) { if (time_spent < _level_time_slices[i]) { return i; } } return QUEUE_SIZE - 1 ; }
put_back put_back 函数将 READY 的 driver 放入到 DriverQueue,
PipelineDriver::_driver_queue_level 记录每次在 DrvierQueue 中的层。
PipelineDriver::_in_ready_queue 标记是否被 put_back
到 DrvierQueue 中
PipelineDriver::_in_queue 即 driver 所属的 DriverQueue
插入一个 driver 后,通过 _cv 解除 take 函数处的阻塞,Executor 就可以继续通过 DriverQueue::take 函数获得新的可执行的 driver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void QuerySharedDriverQueue::put_back (const DriverRawPtr driver) { int level = _compute_driver_level(driver); driver->set_driver_queue_level (level); { std::lock_guard<std::mutex> lock (_global_mutex) ; _queues[level].put (driver); driver->set_in_ready_queue (true ); driver->set_in_queue (this ); _cv.notify_one (); ++_num_drivers; } } void QuerySharedDriverQueue::put_back_from_executor (const DriverRawPtr driver) { put_back (driver); }
take put_back 函数是通过 _compute_driver_level 函数来确定 SubQueue 的层数。take 函数则通过 SubQuerySharedDriverQueue::accu_time_after_divisor() 来确定。
_accu_consume_time 字段是该 SubQueue 的耗时
这个值实际上和 DriverAcct::accumulated_time_spent
大小一样:每次更新统计时都是累加的 DriverAcct::last_time_spent
。因此 put_back/take 的衡量标准是一样的。
1 2 3 4 5 6 7 8 void DriverAcct::update_last_time_spent (int64_t time_spent) { this ->last_time_spent = time_spent; this ->accumulated_time_spent += time_spent; } void SubQuerySharedDriverQueue::update_accu_time (const DriverRawPtr driver) { _accu_consume_time.fetch_add (driver->driver_acct ().get_last_time_spent ()); }
factor_for_normal 是在 QUEUE_SIZE 个 SubQueue 的正则系数
二者相处得到归一化的时间系数,使得每一层都有机会被访问到 。
1 2 3 double SubQuerySharedDriverQueue::accu_time_after_divisor () { return _accu_consume_time.load () / factor_for_normal; }
确定好 level 后,就可以直接从该 level 提取出 READY 状态的 PipelineDriver。如果整个 DriverQueue 都空的,则基于 _cv 阻塞等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 StatusOr<DriverRawPtr> QuerySharedDriverQueue::take () { int queue_idx = -1 ; double target_accu_time = 0 ; DriverRawPtr driver_ptr; { std::unique_lock<std::mutex> lock (_global_mutex) ; while (true ) { RETURN_IF (_is_closed, Status::Cancelled ("Shutdown" )); for (int i = 0 ; i < QUEUE_SIZE; ++i) { if (!_queues[i].empty ()) { double local_target_time = _queues[i].accu_time_after_divisor (); if (queue_idx < 0 || local_target_time < target_accu_time) { target_accu_time = local_target_time; queue_idx = i; } } } if (queue_idx >= 0 ) { break ; } _cv.wait (lock); } driver_ptr = _queues[queue_idx].take (); driver_ptr->set_in_ready_queue (false ); --_num_drivers; } return driver_ptr; }