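Pipeline
A Pipeline consists of a sequence of Operators, and the Operators inside a Pipeline run serially. The first one is the SourceOperator, the last one is the SinkOperator, and data flows from Source to Sink. Data is pulled from the previous Operator (Operator::pull_chunk) and then pushed to the next Operator (Operator::push_chunk). This is the pull-push model of the Pipeline.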
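The pull-push model can be illustrated with a minimal, self-contained sketch. It is not StarRocks code: Chunk, Source, Doubler, Sink, run_round and demo_pull_push are hypothetical names, a chunk is just a std::vector<int>, and there are no statuses, timers or finishing states. It only shows how walking the adjacent (curr, next) pairs in order lets a chunk travel from the Source to the Sink within a single pass.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

using Chunk = std::vector<int>;  // hypothetical stand-in for a real column chunk

struct Operator {
    virtual ~Operator() = default;
    virtual bool has_output() const = 0;
    virtual bool need_input() const = 0;
    virtual std::optional<Chunk> pull_chunk() = 0;
    virtual void push_chunk(Chunk chunk) = 0;
};

// Source: only produces chunks, never accepts input.
struct Source : Operator {
    std::vector<Chunk> data;
    size_t next = 0;
    explicit Source(std::vector<Chunk> d) : data(std::move(d)) {}
    bool has_output() const override { return next < data.size(); }
    bool need_input() const override { return false; }
    std::optional<Chunk> pull_chunk() override {
        if (!has_output()) return std::nullopt;
        return data[next++];
    }
    void push_chunk(Chunk) override {}
};

// Intermediate operator: buffers one chunk and doubles every value when pulled.
struct Doubler : Operator {
    std::optional<Chunk> buffered;
    bool has_output() const override { return buffered.has_value(); }
    bool need_input() const override { return !buffered.has_value(); }
    std::optional<Chunk> pull_chunk() override {
        if (!buffered) return std::nullopt;
        Chunk out = std::move(*buffered);
        buffered.reset();
        for (int& v : out) v *= 2;
        return out;
    }
    void push_chunk(Chunk chunk) override { buffered = std::move(chunk); }
};

// Sink: only consumes chunks and collects their rows.
struct Sink : Operator {
    std::vector<int> rows;
    bool has_output() const override { return false; }
    bool need_input() const override { return true; }
    std::optional<Chunk> pull_chunk() override { return std::nullopt; }
    void push_chunk(Chunk chunk) override { rows.insert(rows.end(), chunk.begin(), chunk.end()); }
};

// One pass over adjacent (curr, next) pairs: pull from curr, push into next.
size_t run_round(const std::vector<Operator*>& ops) {
    size_t moved = 0;
    for (size_t i = 0; i + 1 < ops.size(); ++i) {
        Operator* curr = ops[i];
        Operator* next = ops[i + 1];
        if (!curr->has_output() || !next->need_input()) continue;
        if (auto chunk = curr->pull_chunk()) {
            next->push_chunk(std::move(*chunk));
            ++moved;
        }
    }
    return moved;
}

// Runs rounds until nothing moves and returns what reached the sink.
std::vector<int> demo_pull_push() {
    Source src(std::vector<Chunk>{{1, 2}, {3}});
    Doubler dbl;
    Sink sink;
    std::vector<Operator*> ops{&src, &dbl, &sink};
    while (run_round(ops) > 0) {
    }
    return sink.rows;
}
```

demo_pull_push() returns {2, 4, 6}: each round carries one source chunk all the way to the sink, and the loop stops at the first round that moves nothing.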
OperatorStage
Each Operator also has a corresponding stage, OperatorStage, which ensures that the callbacks tied to a particular Operator stage (Prepare, Close and so on) are invoked only once.
```cpp
enum OperatorStage {
    INIT = 0,
    PREPARED = 1,
    PRECONDITION_NOT_READY = 2,
    PROCESSING = 3,
    FINISHING = 4,
    FINISHED = 5,
    CANCELLED = 6,
    CLOSED = 7,
};
```
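The OperatorStage transitions are shown in the figure below. In practice, blocking only happens at the SourceOperator/SinkOperator. The Operators in between never block (they are only rescheduled when the time slice runs out), because they are pure computation.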
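An Operator is created in OperatorStage::INIT. When the driver is submitted to the GlobalDriverExecutor, PipelineDriver::prepare is called first and sets all Operators to OperatorStage::PREPARED.
HashJoinProbeOperator is set to PRECONDITION_NOT_READY, because it cannot run until its precondition (the hash table built by the build-side pipeline) is ready.
All other Operators are set to OperatorStage::PROCESSING in one go by PipelineDriver::submit_operators.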
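The initialization sequence above can be sketched as a simplified model. It is not the real PipelineDriver: OpInfo and its waits_for_precondition flag are hypothetical stand-ins for "this operator needs a precondition" (as HashJoinProbeOperator does), the stages live in a plain vector instead of the driver's own bookkeeping, and the exact condition used by submit_operators is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stage values copied from the OperatorStage enum above.
enum OperatorStage {
    INIT = 0,
    PREPARED = 1,
    PRECONDITION_NOT_READY = 2,
    PROCESSING = 3,
    FINISHING = 4,
    FINISHED = 5,
    CANCELLED = 6,
    CLOSED = 7,
};

// Hypothetical per-operator descriptor: waits_for_precondition marks operators
// that, like HashJoinProbeOperator, depend on another pipeline.
struct OpInfo {
    bool waits_for_precondition;
};

// Roughly what prepare does to the stages: INIT -> PREPARED for everyone, then
// precondition-dependent operators are parked in PRECONDITION_NOT_READY.
std::vector<OperatorStage> prepare(const std::vector<OpInfo>& ops) {
    std::vector<OperatorStage> stages(ops.size(), INIT);
    for (size_t i = 0; i < ops.size(); ++i) {
        stages[i] = PREPARED;
        if (ops[i].waits_for_precondition) stages[i] = PRECONDITION_NOT_READY;
    }
    return stages;
}

// Assumed behaviour of submit_operators: everything still PREPARED starts PROCESSING.
void submit_operators(std::vector<OperatorStage>& stages) {
    for (auto& stage : stages) {
        if (stage == PREPARED) stage = PROCESSING;
    }
}
```

For a three-operator pipeline whose middle operator waits for a precondition, the stages end up as PROCESSING, PRECONDITION_NOT_READY, PROCESSING.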
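From then on, the PipelineDriver state machine drives execution, and stages are changed through the PipelineDriver::_mark_operator_xxx family of functions, which guarantee that each stage callback of an Operator is invoked only once: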
```cpp
void mark_precondition_ready(RuntimeState* runtime_state);
Status _mark_operator_finishing(OperatorPtr& op, RuntimeState* runtime_state);
Status _mark_operator_finished(OperatorPtr& op, RuntimeState* runtime_state);
Status _mark_operator_cancelled(OperatorPtr& op, RuntimeState* runtime_state);
Status _mark_operator_closed(OperatorPtr& op, RuntimeState* runtime_state);
```
These correspond to the Operator's stage-change functions:
```cpp
virtual void set_precondition_ready(RuntimeState* state);
virtual Status set_finishing(RuntimeState* state);
virtual Status set_finished(RuntimeState* state);
virtual Status set_cancelled(RuntimeState* state);
virtual void close(RuntimeState* state);
```
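The "only once" guarantee comes from the driver checking the stage before calling into the Operator, the same pattern as the RETURN_IF(op_state >= OperatorStage::FINISHED, ...) guard in _mark_operator_finished shown later. A minimal sketch of that pattern follows; CountingOperator, mark_finishing and mark_finished are hypothetical names, and Status and RuntimeState are left out.

```cpp
#include <cassert>

// Same ordering as OperatorStage, as a scoped enum.
enum class OperatorStage {
    INIT, PREPARED, PRECONDITION_NOT_READY, PROCESSING,
    FINISHING, FINISHED, CANCELLED, CLOSED
};

// Hypothetical operator that only counts how often each callback fires.
struct CountingOperator {
    int finishing_calls = 0;
    int finished_calls = 0;
    void set_finishing() { ++finishing_calls; }
    void set_finished() { ++finished_calls; }
};

// Guarded transition: the callback runs only if the stage has not been reached yet.
void mark_finishing(CountingOperator& op, OperatorStage& stage) {
    if (stage >= OperatorStage::FINISHING) return;
    stage = OperatorStage::FINISHING;
    op.set_finishing();
}

// FINISHED implies FINISHING, so that transition is (idempotently) applied first.
void mark_finished(CountingOperator& op, OperatorStage& stage) {
    mark_finishing(op, stage);
    if (stage >= OperatorStage::FINISHED) return;
    stage = OperatorStage::FINISHED;
    op.set_finished();
}
```

Calling mark_finishing and mark_finished twice each still fires every callback exactly once, because the stage is compared before the Operator is touched.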
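PipelineDriver::process
PipelineDriver::process works through PipelineDriver::_operators in order, but it runs under a time slice. Once the time spent exceeds the limit (YIELD_MAX_TIME_SPENT, or the preemption threshold YIELD_PREEMPT_MAX_TIME_SPENT when the query belongs to a WorkGroup; see Part3), the driver must voluntarily give up the CPU so that other drivers can run: the current driver is put back into the DriverQueue to wait for its next turn, or into the poller if it is blocked. For this reason the driver also updates its statistics, which the DriverQueue uses to decide when to schedule this PipelineDriver next.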
PipelineDriver::_first_unfinished records the progress of the current Pipeline, i.e. which Operator processing has reached. Each round starts from _first_unfinished.
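Part1: check
Before data is passed between (curr_op, next_op), some preliminary checks are needed:
Whether curr_op has already finished
If it has, _mark_operator_finishing is used to call Operator::set_finishing on next_op (and, when i == 0, on curr_op as well) to change its stage. new_first_unfinished is then set to i + 1, so every Operator before it is known to have finished.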
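Why is curr_op also passed to _mark_operator_finishing when i == 0?
OperatorStage::FINISHING means the Operator will receive no more input; once the chunks already pushed into it have been processed, it moves on to OperatorStage::FINISHED. So when curr_op->is_finished() returns true, next_op will receive no more input and has to be set to OperatorStage::FINISHING.
A SourceOperator needs no input in the first place (more precisely, no upstream Operator ever calls push_chunk on it), so no upstream step would ever mark it. Once the SourceOperator has pulled all data from the network or local disk, curr_op->is_finished() returns true as well, and at that point the SourceOperator itself is marked OperatorStage::FINISHING to indicate that the Source will pull no more.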
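Data can only be passed between (curr_op, next_op) if curr_op has output ready and next_op is ready to accept input.
This is checked with Operator::has_output and Operator::need_input.
The Pipeline only needs to keep running if the current FragmentInstance has not been cancelled.
The time spent on each (curr_op, next_op) step is added to time_spent; SCOPED_RAW_TIMER accumulates into the counter, so time_spent is a running total rather than the cost of a single step. The logic is as follows.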
```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    {
        SCOPED_RAW_TIMER(&time_spent);
        auto& curr_op = _operators[i];
        auto& next_op = _operators[i + 1];

        // curr_op is done, so next_op will get no more input
        if (curr_op->is_finished()) {
            if (i == 0) {
                // the source has no upstream step to mark it, so mark it here
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
            }
            RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
            new_first_unfinished = i + 1;
            continue;
        }

        // data can only move if curr_op has output and next_op wants input
        if (!curr_op->has_output() || !next_op->need_input()) {
            continue;
        }

        if (_check_fragment_is_canceled(runtime_state)) {
            return _state;
        }
    }
}
```
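The finishing propagation in this loop, including the i == 0 special case, can be modelled with plain flags. In this hypothetical sketch, finished[i] plays the role of _operators[i]->is_finished() (held constant for the round, unlike the real code), and propagate_finishing returns the new value of _first_unfinished; has_output/need_input, cancellation and timing are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Stage { PROCESSING, FINISHING };

// Simplified Part1: for every finished curr_op, mark next_op FINISHING
// (and the source itself when i == 0), then advance the cursor past curr_op.
size_t propagate_finishing(const std::vector<bool>& finished, std::vector<Stage>& stages,
                           size_t first_unfinished) {
    size_t new_first_unfinished = first_unfinished;
    for (size_t i = first_unfinished; i + 1 < finished.size(); ++i) {
        if (!finished[i]) continue;
        if (i == 0) stages[i] = Stage::FINISHING;  // nobody upstream marks the source
        stages[i + 1] = Stage::FINISHING;          // next_op will get no more input
        new_first_unfinished = i + 1;
    }
    return new_first_unfinished;
}
```

With only the source finished, the source and its direct downstream become FINISHING and the cursor moves to 1; the sink stays PROCESSING until the middle operator reports is_finished() as well.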
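Part2: pull-push
The second part of the for loop pulls data from curr_op and pushes it to next_op. If any step fails, the bad status is returned to the GlobalDriverExecutor, which then cancels the whole query.
The code for this part is fairly simple: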
```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    {
        // ... Part1: timer, curr_op / next_op, checks
        StatusOr<vectorized::ChunkPtr> maybe_chunk;
        {
            SCOPED_TIMER(curr_op->_pull_timer);
            maybe_chunk = curr_op->pull_chunk(runtime_state);
        }
        return_status = maybe_chunk.status();
        // EOF is tolerated; any other error aborts the driver
        if (!return_status.ok() && !return_status.is_end_of_file()) {
            return return_status;
        }

        if (_check_fragment_is_canceled(runtime_state)) {
            return _state;
        }

        if (return_status.ok()) {
            if (maybe_chunk.value() && maybe_chunk.value()->num_rows() > 0) {
                size_t row_num = maybe_chunk.value()->num_rows();
                // an intermediate chunk must not exceed the configured chunk_size
                if (UNLIKELY(row_num > runtime_state->chunk_size())) {
                    return Status::InternalError(fmt::format(
                            "Intermediate chunk size must not be greater than {}, "
                            "actually {} after {}-th operator {} in {}",
                            runtime_state->chunk_size(), row_num, i, curr_op->get_name(),
                            to_readable_string()));
                }
                {
                    SCOPED_TIMER(next_op->_push_timer);
                    return_status = next_op->push_chunk(runtime_state, maybe_chunk.value());
                }
                if (!return_status.ok() && !return_status.is_end_of_file()) {
                    return return_status;
                }
                num_chunks_moved += 1;
            }
        }

        // curr_op may have finished during this pull
        if (curr_op->is_finished()) {
            if (i == 0) {
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
            }
            RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
            new_first_unfinished = i + 1;
            continue;
        }
    }
}
```
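The control flow of one pull-push step (which statuses abort, which are tolerated, when a chunk counts as moved) can be isolated into a small function. This is a hypothetical sketch: Code is a toy enum in place of the real Status, a chunk is a std::vector<int>, next_op_input stands in for next_op->push_chunk, and the cancellation check and timers are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Toy status codes standing in for the real Status.
enum class Code { OK, END_OF_FILE, INTERNAL_ERROR };

struct PullResult {
    Code code;
    std::optional<std::vector<int>> chunk;  // only meaningful when code == OK
};

Code pull_push_step(const PullResult& pulled, size_t chunk_size,
                    std::vector<std::vector<int>>& next_op_input, size_t& num_chunks_moved) {
    // Any status other than OK / END_OF_FILE is handed back to the caller.
    if (pulled.code != Code::OK && pulled.code != Code::END_OF_FILE) return pulled.code;
    // Only an OK pull with a non-empty chunk moves data downstream.
    if (pulled.code == Code::OK && pulled.chunk && !pulled.chunk->empty()) {
        // An oversized intermediate chunk is an internal error.
        if (pulled.chunk->size() > chunk_size) return Code::INTERNAL_ERROR;
        next_op_input.push_back(*pulled.chunk);
        num_chunks_moved += 1;
    }
    // OK or END_OF_FILE: the driver keeps going.
    return pulled.code;
}
```

An empty chunk or an EOF pull leaves num_chunks_moved unchanged, which is what later lets Part4 notice that a round made no progress.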
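Part3: time
time_spent accumulates the time spent on the {pull, push} operations between each (curr_op, next_op) pair. Each run of a PipelineDriver has a maximum time slice of YIELD_MAX_TIME_SPENT. If the query is assigned to a specific WorkGroup, a second threshold, YIELD_PREEMPT_MAX_TIME_SPENT, also applies: once time_spent reaches it, the driver yields if the WorkGroup's driver queue (via should_yield) decides that it should. When either condition is met, the driver must voluntarily give up the CPU so that other drivers get a chance to run: should_yield is set to true and the for loop is exited. Independently of yielding, once time_spent reaches OVERLOADED_MAX_TIME_SPEND_NS the pipe_driver_overloaded metric is incremented.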
```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    // ... Part1 and Part2, timed into time_spent
    if (time_spent >= OVERLOADED_MAX_TIME_SPEND_NS) {
        StarRocksMetrics::instance()->pipe_driver_overloaded.increment(1);
    }

    // hard time-slice limit
    if (time_spent >= YIELD_MAX_TIME_SPENT) {
        should_yield = true;
        COUNTER_UPDATE(_yield_by_time_limit_counter, 1);
        break;
    }

    // WorkGroup preemption: yield only if the group's queue agrees
    if (_workgroup != nullptr && time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT &&
        _workgroup->driver_sched_entity()->in_queue()->should_yield(this, time_spent)) {
        should_yield = true;
        COUNTER_UPDATE(_yield_by_preempt_counter, 1);
        break;
    }
}
```
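Part3 depends on two details: time_spent keeps growing across iterations, and the preemption threshold only matters together with the WorkGroup queue's verdict. The sketch below models both. It is not StarRocks code: ScopedRawTimer is a hypothetical RAII stand-in for the SCOPED_RAW_TIMER macro, assuming (as the yield checks imply) that the macro adds elapsed time to the counter rather than overwriting it; the threshold values are assumptions chosen for illustration; queue_says_yield replaces the call to the WorkGroup queue's should_yield; the metric and counters are omitted.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical RAII timer: on scope exit it ADDS the elapsed nanoseconds
// to a caller-owned counter and never resets it.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* result)
            : _result(result), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        *_result += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    }
    ScopedRawTimer(const ScopedRawTimer&) = delete;
    ScopedRawTimer& operator=(const ScopedRawTimer&) = delete;

private:
    int64_t* _result;
    std::chrono::steady_clock::time_point _start;
};

// Assumed thresholds in ns, for illustration only; they play the roles of
// YIELD_MAX_TIME_SPENT and YIELD_PREEMPT_MAX_TIME_SPENT, whose real values may differ.
constexpr int64_t kYieldMaxTimeSpentNs = 100'000'000;        // assumed 100 ms
constexpr int64_t kYieldPreemptMaxTimeSpentNs = 20'000'000;  // assumed 20 ms

enum class YieldReason { NONE, TIME_LIMIT, PREEMPT };

// The two checks of Part3, in the same order as the loop above.
YieldReason check_yield(int64_t time_spent, bool has_workgroup, bool queue_says_yield) {
    if (time_spent >= kYieldMaxTimeSpentNs) return YieldReason::TIME_LIMIT;
    if (has_workgroup && time_spent >= kYieldPreemptMaxTimeSpentNs && queue_says_yield) {
        return YieldReason::PREEMPT;
    }
    return YieldReason::NONE;
}

// Runs `rounds` timed sections of about 1 ms each; the counter keeps growing.
int64_t accumulate_demo(int rounds) {
    int64_t time_spent = 0;
    for (int i = 0; i < rounds; ++i) {
        ScopedRawTimer timer(&time_spent);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return time_spent;
}
```

Because the counter is cumulative, many short pull-push steps eventually add up to a yield even if no single step is slow; without a WorkGroup, only the hard limit can trigger one.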
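Part4: finish
When the for loop ends, either this round's time slice has been used up (or no more data could be moved), or the current Pipeline has finished.
new_first_unfinished is the index of the first Operator that is still unfinished after this round; all Operators before it have finished.
First, the Operators in [_first_unfinished, new_first_unfinished) are marked finished via _mark_operator_finished, and then _first_unfinished is updated to new_first_unfinished.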
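A minimal sketch of this bookkeeping step, using a hypothetical Stage enum and a plain vector in place of the driver's per-operator stages. The real code calls _mark_operator_finished, which also runs the Operator callbacks; this sketch only updates the stages.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Stage { PROCESSING, FINISHING, FINISHED };

// Operators in [first_unfinished, new_first_unfinished) will never run again:
// mark them FINISHED (idempotently) and return the new cursor value.
size_t commit_finished(std::vector<Stage>& stages, size_t first_unfinished,
                       size_t new_first_unfinished) {
    for (size_t i = first_unfinished; i < new_first_unfinished; ++i) {
        if (stages[i] < Stage::FINISHED) stages[i] = Stage::FINISHED;
    }
    return new_first_unfinished;
}
```

On the next call to process the for loop starts at the returned index, so finished Operators are never visited again, while an Operator that is only FINISHING keeps draining the chunks already pushed into it.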
If SinkOperator::is_finished returns true, every Operator of the current Pipeline has finished executing.
In that case, finish_operators iterates over all Operators and calls _mark_operator_finished on each of them.
```cpp
void PipelineDriver::finish_operators(RuntimeState* runtime_state) {
    for (auto& op : _operators) {
        _mark_operator_finished(op, runtime_state);
    }
}
```
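_mark_operator_finished first calls _mark_operator_finishing (so an Operator always passes through FINISHING) and then Operator::set_finished, so that every Operator ends up in OperatorStage::FINISHED. The stage check makes repeated calls harmless.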
```cpp
Status PipelineDriver::_mark_operator_finished(OperatorPtr& op, RuntimeState* state) {
    RETURN_IF_ERROR(_mark_operator_finishing(op, state));
    auto& op_state = _operator_stages[op->get_id()];
    RETURN_IF(op_state >= OperatorStage::FINISHED, Status::OK());

    {
        SCOPED_TIMER(op->_finished_timer);
        op_state = OperatorStage::FINISHED;
        return op->set_finished(state);
    }
}
```
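Once finish_operators returns, the PipelineDriver's state is updated: the driver enters DriverState::PENDING_FINISH if is_still_pending_finish() is true, and DriverState::FINISH otherwise.
In the normal case, an Operator's lifecycle ends with the following sequence of calls: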
Source::is_finished –> Source::set_finishing –> Source::set_finished
PrevOperator::is_finished –> Operator::set_finishing –> Operator::set_finished
PrevOperator::is_finished –> Sink::set_finishing –> Sink::is_finished –> Sink::set_finished –> Operator::close (on every Operator)
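Otherwise, if no chunk was moved in this round (num_chunks_moved == 0) or the time slice ran out (should_yield), the driver records why it stopped by updating its state, and the executor uses that state to decide whether to put the driver into blocked_driver_poller or back into _driver_queue. If neither condition holds, the while loop simply starts another round.
The code is as follows.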
```cpp
while (true) {
    RETURN_IF_LIMIT_EXCEEDED(runtime_state, "Pipeline");

    size_t num_chunks_moved = 0;
    bool should_yield = false;
    size_t num_operators = _operators.size();
    size_t new_first_unfinished = _first_unfinished;
    for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
        // Part1 ~ Part3
    }

    // Operators in [_first_unfinished, new_first_unfinished) are done for good
    for (auto i = _first_unfinished; i < new_first_unfinished; ++i) {
        RETURN_IF_ERROR(return_status = _mark_operator_finished(_operators[i], runtime_state));
    }
    _first_unfinished = new_first_unfinished;

    // the whole Pipeline is done
    if (sink_operator()->is_finished()) {
        finish_operators(runtime_state);
        set_driver_state(is_still_pending_finish() ? DriverState::PENDING_FINISH : DriverState::FINISH);
        return _state;
    }

    // no progress, or time slice used up: record why and hand back to the executor
    if (num_chunks_moved == 0 || should_yield) {
        if (is_precondition_block()) {
            set_driver_state(DriverState::PRECONDITION_BLOCK);
            COUNTER_UPDATE(_block_by_precondition_counter, 1);
        } else if (!sink_operator()->is_finished() && !sink_operator()->need_input()) {
            set_driver_state(DriverState::OUTPUT_FULL);
            COUNTER_UPDATE(_block_by_output_full_counter, 1);
        } else if (!source_operator()->is_finished() && !source_operator()->has_output()) {
            set_driver_state(DriverState::INPUT_EMPTY);
            COUNTER_UPDATE(_block_by_input_empty_counter, 1);
        } else {
            set_driver_state(DriverState::READY);
        }
        return _state;
    }
}
```
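The decision at the end of a round can be expressed as a pure function of a few flags. This is a hypothetical sketch, not StarRocks code: RoundSnapshot, next_state and goes_to_poller are illustrative names, the flags stand in for the calls made on sink_operator() and source_operator(), and std::nullopt means the while (true) loop simply runs another round. goes_to_poller only covers the four states set in the no-progress/yield branch, following the rule that blocked drivers go to the poller and the rest go back to the queue.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

enum class DriverState { READY, PRECONDITION_BLOCK, OUTPUT_FULL, INPUT_EMPTY, PENDING_FINISH, FINISH };

// Hypothetical snapshot of what the driver inspects at the end of one round.
struct RoundSnapshot {
    bool sink_finished = false;         // sink_operator()->is_finished()
    bool still_pending_finish = false;  // is_still_pending_finish()
    size_t num_chunks_moved = 0;
    bool should_yield = false;
    bool precondition_block = false;    // is_precondition_block()
    bool sink_need_input = false;       // sink_operator()->need_input()
    bool source_finished = false;       // source_operator()->is_finished()
    bool source_has_output = false;     // source_operator()->has_output()
};

// Same priority order as the while-loop epilogue above.
std::optional<DriverState> next_state(const RoundSnapshot& s) {
    if (s.sink_finished) {
        return s.still_pending_finish ? DriverState::PENDING_FINISH : DriverState::FINISH;
    }
    if (s.num_chunks_moved != 0 && !s.should_yield) return std::nullopt;  // keep looping
    if (s.precondition_block) return DriverState::PRECONDITION_BLOCK;
    if (!s.sink_need_input) return DriverState::OUTPUT_FULL;  // sink is not finished here
    if (!s.source_finished && !s.source_has_output) return DriverState::INPUT_EMPTY;
    return DriverState::READY;
}

// Blocked states are handed to the poller; READY goes back to the driver queue.
bool goes_to_poller(DriverState state) {
    return state == DriverState::PRECONDITION_BLOCK || state == DriverState::OUTPUT_FULL ||
           state == DriverState::INPUT_EMPTY;
}
```

A driver that yielded purely because of its time slice, with data still available on both ends, comes back as READY and is requeued rather than parked in the poller.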