Pipeline: PipelineDriver 状态机(1)
本篇来看看 PipelineDriver 的执行流程,以及如何与 PipelineDriverPoller 交互。
Pipeline DriverState
一个 PipelineDriver 的状态有如下 10 种:
1 | enum DriverState : uint32_t { |
他们之间的状态变化如下:
在此处不单独讲解每个状态,后面融到代码流程一起讲解。
GlobalDriverExecutor::submit
先回顾下 GlobalDrvierExecutor 整体设计:
PipelineDriver 创建完,并通过 submit 接口传递给 DriverExecutor。Executor 会先根据该 drvier 当前状态来判断是进入 _driver_queue 还是 _blocked_driver_poller 中。
比如,在 Morsel 和 OlapScanOperator(2) 中提到的 OlapScanPrepareOperator 和 OlapScanOperator:需要等待 PrepareOperator::pull_chunk 执行完(因为需要先获取待访问的 tablets 和 rowsets),ScanOperator 才能执行。然而 Prepare 和 Scan 分属于两个 PipelineDriver。他们的依赖关系如下:
因此,而 pipeline0 在被提交给 Executor 后会进入 _driver_queue,pipeline1 提交后则进入 _blocked_driver_poller。这里的初始判断条件:
- HashJoin:Join 需要先等待 Build 阶段完成,才能进入 Probe 阶段。Build 阶段即 Probe 阶段的 Precondition,Probe 则会进入 submit 函数第一个分支,并添加到 _blocked_driver_poller 中
- 其他,都是使用 SourceOperator::is_finished 函数和 SourceOperator::has_output 函数来判断进入哪个 driver_queue
代码如下。
1 | void GlobalDriverExecutor::submit(DriverRawPtr driver) { |
PipelineDriverPoller
PipelineDriverPoller 的主要字段解释如下:
_driver_queue: PipelineDriverPoller 和 GlobalDriverExecutor 共享 _driver_queue,存放着 READY 状态可执行的 PipelineDriver。
_blocked_drivers: 存储当前 Backends 中所有暂时无法执行的 blocked PipelineDrivers,
_global_mutex 是用于保护 _blocked_drivers,在 poll_thread 和 PipelineDriverPoller::add_blocked_driver 函数之间形成互斥。
_cond 则是配合 _gloabl_mutex,上层调用 add_blocked_driver 函数添加 blocked driver 时,唤醒正在阻塞等待的 poller_thread。
_local_blocked_drivers: 表征的是当前 poller_thread 中正在处理 blocked PipelineDrivers。
但是我觉得没必要引入一个 _local_mutex,SpinLock 就能满足要求。详见 poller_thread 分析。
类 PipelineDriverPoller 主要字段及其构造函数如下。
1 | class PipelineDriverPoller { |
add_blocked_driver
add_blocked_driver 函数是 Executor 向 Poller 添加 blocked drivers 的函数接口,这里需要使用 _global_mutex 来与 poller_thread 形成互斥。
_pending_timer_sw 用于统计每次 driver 处于阻塞状态的时间,即加入 poller 时重置,从 poller 中删除后计算本次 blocked 耗时。多次阻塞总耗时更新到 PipelineDriver::_pending_timer 中,查询结束可以在 query profile 中搜索 ‘PendingTime’ 字段查看。
1 | void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) { |
run_internal
下面是 Poller 的重点:poller_thread。可以将 poller_thread 分为三个部分:
Part1
等待 Executor 将 blocked drivers 加入到 Poller 中。
在一个正常部署的 StarRocks 集群中,任意一个时刻都应该有查询存在,且生成的 PipelineDriver 会很多。因此,add_blocked_driver 函数与 poller_thread 之间的 race condition 应该比较激烈。poller_thread 在局部作用域里使用 std::mutex + std::condition_variable ,来缩小 _gloab_mutex 的范围。
如果有新 blocked drivers 加入,则将其从 _blocked_drivers 中移到 tmp_blocked_drivers,局部作用域结束就释放了 _gloabl_mutex。poller_thread 后续就直接对 poller_thread 进行操作,也就不会阻塞 add_blocked_driver 函数。
1 | void PipelineDriverPoller::run_internal() { |
Part2
得到本轮 tmp_blocked_drivers 后,将其加入到 _local_blocked_drivers 中。后续对 _local_blocked_drivers 中的 drivers 当前状态进行判断。因为有个 iterate_immutable_driver 函数需要读取 _local_blocked_drivers,故而也用 _local_mutex 对其进行互斥保护。
但是,iterate_immutable_driver 是来源于 HTTP 请求,通常情况下请求几乎不会被执行,可能开发人员会在调试时使用下,因次这里几乎没有必要使用 std::mutex,更好的选择显然是使用 SpinLock。
需要对 _local_blocked_drivers 中的每个 driver 进行遍历,检测是否 READY 或者处于终止状态
case1: QueryContext::is_query_expired
每个 query 都有个 query_timeout(默认值 300s),超过这个时间就会 cancel 该 query。这时会主动调用 FragmentContext::cancel 函数,标记这个 query。由于 FragmentInstance 的所有 PipelineDrievrs 共享一个 FragmentContext 对象,因此其他 PipelineDriver 就会可以通过检测 FragmentContext::is_canceled 来查看当前 query 是否仍处于取消状态
实际上这个状态会上报,再下发取消其他节点的 FragementInstances?
case2: FragmentContext::is_canceled
触发 FragmentContext::cancel 操作的场景很多,除了上述的查询超时,还有在查询过程中 Operators::pull_chunk、Operators::push_chunk 执行遇到某些问题,此时在 GloablDriverExecutor 会调用 QueryContex::cancel 将该 query 的所有 FragementInstance 都进行 cancel。如此,该 query 下所有的 FragmentInstances 的所有 PipelineDriver 都不会再被执行。
case3: DriverState::PENDING_FINISH
从代码中,不难发现只有当 PipelineDriver::pending_finish 为 false 时,这个 blocked_driver 才会调用 remove_blocked_driver 函数将其从 _local_blocked_drivers 中删除。
1
2
3
4
5
6
7void PipelineDriverPoller::remove_blocked_driver(
DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
auto& driver = *driver_it;
driver->_pending_timer->update(
driver->_pending_timer_sw->elapsed_time());
local_blocked_drivers.erase(driver_it++);
}转为 DriverState::PENDING_FINISH 状态的前提是 is_still_pending_finish 函数返回 true:
1
2
3
4bool PipelineDriver::is_still_pending_finish() {
return source_operator()->pending_finish()
|| sink_operator()->pending_finish();
}实际上 SourceOperator 中,只有 ScanOperator 实现了这个函数,且返回值为 false
1
2
3bool ScanOperator::pending_finish() const {
return false;
}也就是说, is_still_pending_finish 函数的返回值取决于 SinkOperator::pending_finish 了。
这也是这个阶段存在的原因
实际上就是为了等待 SinkOperator 中的数据处理完,比如 ExchangeSinkOperator 中的 buffer 消耗完才能进入 DriverState::FINISH 阶段。因此 is_still_pending_finish 返回 true 时,blocked drvier 并没有加入 ready drivers 中,而是被忽略了,等待下一次 poll。
case4: PipelineDriver::is_finished
query 终态有三种:1
2
3
4
5bool is_finished() {
return _state == DriverState::FINISH
|| _state == DriverState::CANCELED
|| _state == DriverState::INTERNAL_ERROR;
}已经处于终态的可以直接从 _local_blocked_drivers 中删除,并加入 ready_drivers 中。
case5: PipelineDriver::is_not_blocked
这个分支才是用于检测 PipelineDriver 是否解除阻塞,可以继续执行了。比如 OlapScanPrepareOperator::pull_chunk 执行完毕,OlapScanOperator 的 PipelineDrvier::is_not_block 函数才会返回 true,从 _local_blocked_drivers 中转入 ready_drivers。case6
PipelineDriver 仍无法 Ready
Part2 代码如下.
1 | void PipelineDriverPoller::run_internal() { |
Part3
第三部分比较简单,就是将 ready 加入 DriverQueue,让 Executor 就会从该 driver_queue 取出可执行的 PipelineDrivers。这部分使用了 spin-loop 的优化,这个优化详情分析可见 RocksDB 系列的 WriteThread 如何自适应优化线程同步。
1 | void PipelineDriverPoller::run_internal() { |
PipelineDriverPoller::run_internal 完整的代码可见 PipelineDriverPoller::run_internal。