Pipeline: Operator 状态机(2)
Pipeline
Pipeline 由一组 Operators 组成,一个 Pipeline 内部 Operator 是串行执行的,第一个是 SourceOperatos,最后一个是 SinkOperator,数据流从 Source 流向 Sink:从前一个 Operator 拉取数据(Operator::pull_chunk),再将该数据推到下一个Operator(Operator::push_chunk),这就是 Pipeline 的 pull-push 模型。
OperatorStage
每个 Operator 也有对应的状态 OperatorStage,用于保证与 Operator 特定状态相关的 Callback(Preapre、Close 等函数) 只调用一次。
1 | enum OperatorStage { |
OperatorStage 的状态流如下图,实际上阻塞只会发生在 SourceOperator/SinkOperator,中间的 PipelineJob 是不会阻塞(只会因为时间片用完重新调度)的,只是个计算流。
Operator 创建时状态为 OperatorStage::INIT,提交给 GloablDriverExecutor 时,先先会调用 PipelineDriver::prepare 函数将所有的 Operators 设置为 OperatorStage::PREPARED 状态,
- HashJoinProbeOperator 设置为 PRECONDITION_NOT_READY,
- 其他的通过 PipelineDriver::submit_operators 函数统一设置为 OperatorStage::PROCESSING 状态
后续就由 PipelineDriver 状态机推动了,并由 PipelineDriver::_mark_operator_xxx 系列函数更改状态,这些函数可以保证 Operator 每个状态都只被调用一次:
1 | // class PipelineDriver |
对应着 Operator 的不同状态更改函数
1 | // class Operator |
PipelineDriver:process
PipelineDriver:process 函数会依次处理 PipelineDriver::_operators,但是 process 函数有时间片,超过最大时间片 YIELD_PREEMPT_MAX_TIME_SPENT,就需要主动让出 CPU,让其他线程执行,将当前 Driver 放回到 DriverQueue 等待下次调度,如果发生阻塞则放到 poller 中。因此也需要更新统计信息,基于这些统计信息 DriverQueue 决定下次何时调度次 PipelineDriver。
PipelineDriver::_first_unfinished 记录着当前 Pipelien 的执行进度,即处理到哪一个 Operator,每次都是从 _first_unfinished 开始处理。
Part1: check
在 (curr_op, next_op) 传递数据之前,需要先做一些前置校验工作:
curr_op 是否已经处理结束
对于已经处理结束的,需要通过 _mark_operator_finishing 操作调用 Operator::set_finishing 函数来更改当前 Operator 的状态。通过 new_first_unfinished 记录已经完成的 Operator 的下标索引。
为什么 i == 0 时,_mark_operator_finishing 传入的参数包括 curr_op ?
- OperatorStage::FINISHING 表示 Operator 不会再有输入,等之前 pushed chunk 处理完,则进入 OperatorStage::FINISHED 状态。因此当 curr_op->is_finished() 为 true 时,即表示 next_op 不会再有输入,需要将 next_op 设置为 OperatorStage::FINISHING 状态
- SourceOperator 本身就不需要输入(更准确地说,是没有上游 Operator push_chunk 给 SourceOperator),等 SoureOperator 从网络/本地磁盘 pull 所有数据,curr_op->is_finished() 也会为 true,此时将 SourceOperator 标记为 OperatorStage::FINISHING,来表示 Source 不会再 pull。
要能在 (curr_op, next_op) 之间完成数据传递,前提是 curr_op 有就绪的数据输出,且 next_op 准备好接受数据
这一步通过 Operator::has_output 和 Operator::need_input 判断
当前 FragmentInstance 没有取消,这个 Pipeline 才需要继续执行
(curr_op, next_op) 一次数据流处理时间记录在 time_spent。这部分逻辑如下。
1 | for (size_t i = _first_unfinished; i < num_operators - 1; ++i) { |
Part2: pull-push
for 循环的第二部分,就是从 curr_op 拉取数据流推向 next_op,任何一步失败,都会将 bad status 返回给 GlobalDriverExecutor,进而取消整个 query。
第二部分代码是比较简单的,如下。
1 | for (size_t i = _first_unfinished; i < num_operators - 1; ++i) { |
part3: time
time_spent 记录了 (curr_op, next_op) 之间一次 {pull, push} 操作耗时。而每次 PipelineDriver 执行的最大时间片是 YIELD_MAX_TIME_SPENT,如果 query 设置了具体的 WorkGroup,最大的时间片是 YIELD_PREEMPT_MAX_TIME_SPENT,如果 time_spent 超过了这两个阈值的其中一个,则需要主动让出 CPU 给予其他线程机会,此时标记 should_yield 为 true,跳出 for-loop 循环。
1 | for (size_t i = _first_unfinished; i < num_operators - 1; ++i) { |
Part4:
一个 for-loop 完成,要么是本轮时间片用完要么是是当前 Pipeline 执行完。
new_first_unfinished 标记着本轮已经完成 Operator 的下标索引。
首先要 [_first_unfinished, new_first_unfinished) 区间的 Operator 通过 _mark_operator_finished 函数设置标记为完成, 再用 new_first_unfinished 更新 _first_unfinished。
SinkOperator::is_finished 返回 true 表示当前 Pipeline 所有 Operators 执行完毕,
finish_operators 函数遍历所有的 Operators,并对所有的 Operator 调用 _mark_operator_finished 函数。
1
2
3
4
5void PipelineDriver::finish_operators(RuntimeState* runtime_state) {
for (auto& op : _operators) {
_mark_operator_finished(op, runtime_state);
}
}_mark_operator_finished 会调用 Operator::set_finished 函数,使得所有的 Operators 都进入 OperatorStage::FINISHED 状态。
1
2
3
4
5
6
7
8
9
10
11
12Status PipelineDriver::_mark_operator_finished(OperatorPtr& op,
RuntimeState* state) {
RETURN_IF_ERROR(_mark_operator_finishing(op, state));
auto& op_state = _operator_stages[op->get_id()];
RETURN_IF(op_state >= OperatorStage::FINISHED, Status::OK());
{
SCOPED_TIMER(op->_finished_timer);
op_state = OperatorStage::FINISHED;
return op->set_finished(state);
}
}finish_operators 函数结束,再更改 PipelineDrvier 状态,使 drvier 进入 DriverState::PENDING_FINISH 或者 DriverState::FINISH 状态。
正常一个 Operator 生命周期结束时行为:
- Source::is_finished –> Source::set_finishing –> Source::set_finished
- PrevOperator::is_finished –> Operator::set_finishing –> Operator::set_finished
- PrevOperator::is_finished –> Sink::set_finishing –> Sink::is_finished –> Sink::set_finished –> Operators::close
否则, 就说明是本轮时间片用完,会更改当前 PipelineDriver 的状态,让 Executor 根据此状态判断是否加入 blocked_driver_poller 还是放回 _drvier_queue。
代码如下。
1 | while (true) { |