Pipeline: Operator State Machine (2)

Pipeline

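A Pipeline consists of a set of Operators that run serially inside the Pipeline. The first one is a SourceOperator, the last one is a SinkOperator, and data flows from Source to Sink. Data is pulled from the previous Operator (Operator::pull_chunk) and then pushed into the next Operator (Operator::push_chunk). This is the Pipeline's pull-push model.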

[Figure: PipelineDriver-2]
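To make the pull-push model concrete, here is a minimal, self-contained C++ sketch. It is not StarRocks code. ToyOperator, CounterSource, DoubleOp, CollectSink and run_pipeline are hypothetical names, a "chunk" is just a std::vector<int>, and error handling and scheduling are left out. The loop walks adjacent (curr, next) pairs. It pulls from curr only when curr has output and next needs input, and it propagates finishing downstream.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical, simplified model of the pull-push contract; not StarRocks code.
using Chunk = std::vector<int>;

struct ToyOperator {
    virtual ~ToyOperator() = default;
    virtual bool has_output() const { return false; }
    virtual bool need_input() const { return false; }
    virtual bool is_finished() const = 0;
    virtual std::optional<Chunk> pull_chunk() { return std::nullopt; }
    virtual void push_chunk(const Chunk&) {}
    virtual void set_finishing() {}
};

// Source: produces `n` one-row chunks, then reports finished.
struct CounterSource : ToyOperator {
    int next = 0;
    int n;
    explicit CounterSource(int count) : n(count) {}
    bool has_output() const override { return next < n; }
    bool is_finished() const override { return next >= n; }
    std::optional<Chunk> pull_chunk() override { return Chunk{next++}; }
};

// Intermediate operator: holds at most one chunk and doubles every value.
struct DoubleOp : ToyOperator {
    std::optional<Chunk> buf;
    bool finishing = false;
    bool has_output() const override { return buf.has_value(); }
    bool need_input() const override { return !finishing && !buf.has_value(); }
    bool is_finished() const override { return finishing && !buf.has_value(); }
    void push_chunk(const Chunk& c) override {
        Chunk out;
        for (int v : c) out.push_back(v * 2);
        buf = std::move(out);
    }
    std::optional<Chunk> pull_chunk() override {
        std::optional<Chunk> out = std::move(buf);
        buf.reset();  // a moved-from optional is still engaged, so clear it
        return out;
    }
    void set_finishing() override { finishing = true; }
};

// Sink: collects rows; it is finished as soon as no more input can arrive.
struct CollectSink : ToyOperator {
    std::vector<int> rows;
    bool finishing = false;
    bool need_input() const override { return !finishing; }
    bool is_finished() const override { return finishing; }
    void push_chunk(const Chunk& c) override { rows.insert(rows.end(), c.begin(), c.end()); }
    void set_finishing() override { finishing = true; }
};

// Walk adjacent (curr, next) pairs until the sink is finished.
void run_pipeline(const std::vector<ToyOperator*>& ops) {
    while (!ops.back()->is_finished()) {
        for (std::size_t i = 0; i + 1 < ops.size(); ++i) {
            ToyOperator* curr = ops[i];
            ToyOperator* next = ops[i + 1];
            if (curr->is_finished()) {  // next will get no more input
                next->set_finishing();
                continue;
            }
            if (!curr->has_output() || !next->need_input()) continue;
            if (std::optional<Chunk> chunk = curr->pull_chunk()) {  // pull ...
                next->push_chunk(*chunk);                            // ... then push
            }
        }
    }
}
```

With CounterSource(3), DoubleOp and CollectSink wired in that order, the sink ends up holding the rows 0, 2 and 4.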

OperatorStage

Each Operator also has a corresponding state, OperatorStage. It guarantees that the callbacks tied to a particular Operator state (Prepare, Close, etc.) are invoked only once.

```cpp
enum OperatorStage {
    INIT = 0,
    PREPARED = 1,
    PRECONDITION_NOT_READY = 2,
    PROCESSING = 3,
    FINISHING = 4,
    FINISHED = 5,
    CANCELLED = 6,
    CLOSED = 7,
};
```

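The OperatorStage transitions are shown in the figure below. In practice, blocking only happens at the SourceOperator/SinkOperator. The operators in between never block. They are only rescheduled when the time slice runs out, and they form a pure computation stream.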

[Figure: Pipeline-Operator-stage-1, OperatorStage transitions]

An Operator starts in OperatorStage::INIT when it is created. When the driver is submitted to GlobalDriverExecutor, PipelineDriver::prepare is called first and sets all Operators to OperatorStage::PREPARED. Then:

  • HashJoinProbeOperator is set to PRECONDITION_NOT_READY;
  • all other Operators are set to OperatorStage::PROCESSING together by PipelineDriver::submit_operators.
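The initialization order can be sketched as follows. This is a simplified illustration, not the real implementation. ToyOp, its has_precondition flag and the toy_* functions are hypothetical. The flag stands in for the HashJoinProbeOperator special case, and the functions only imitate what PipelineDriver::prepare, PipelineDriver::submit_operators and mark_precondition_ready are described as doing.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Same ordering as the OperatorStage enum above.
enum OperatorStage {
    INIT = 0,
    PREPARED = 1,
    PRECONDITION_NOT_READY = 2,
    PROCESSING = 3,
    FINISHING = 4,
    FINISHED = 5,
    CANCELLED = 6,
    CLOSED = 7,
};

// Hypothetical operator stand-in: a name, whether it must wait for a
// precondition (like a hash-join probe waiting for its build side), and
// its current stage.
struct ToyOp {
    std::string name;
    bool has_precondition = false;
    OperatorStage stage = INIT;
};

// Imitates prepare(): every operator becomes PREPARED.
void toy_prepare(std::vector<ToyOp>& ops) {
    for (auto& op : ops) op.stage = PREPARED;
}

// Imitates the split described above: operators with a precondition wait in
// PRECONDITION_NOT_READY, all others start PROCESSING.
void toy_submit(std::vector<ToyOp>& ops) {
    for (auto& op : ops) {
        op.stage = op.has_precondition ? PRECONDITION_NOT_READY : PROCESSING;
    }
}

// Imitates mark_precondition_ready(): afterwards every operator is PROCESSING.
void toy_precondition_ready(std::vector<ToyOp>& ops) {
    for (auto& op : ops) {
        if (op.stage == PRECONDITION_NOT_READY) op.stage = PROCESSING;
    }
}
```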

From then on, the PipelineDriver state machine takes over. Stages are changed by the PipelineDriver::_mark_operator_xxx family of functions, which guarantee that each Operator stage is entered only once:

```cpp
// class PipelineDriver

// Set all Operators to OperatorStage::PROCESSING
void mark_precondition_ready(RuntimeState* runtime_state);
// Set op to OperatorStage::FINISHING
Status _mark_operator_finishing(OperatorPtr& op, RuntimeState* runtime_state);
// Set op to OperatorStage::FINISHED
Status _mark_operator_finished(OperatorPtr& op, RuntimeState* runtime_state);
// Set op to OperatorStage::CANCELLED
Status _mark_operator_cancelled(OperatorPtr& op, RuntimeState* runtime_state);
// Set op to OperatorStage::CLOSED
Status _mark_operator_closed(OperatorPtr& op, RuntimeState* runtime_state);
```

They correspond to the following state-change functions on Operator:

```cpp
// class Operator
virtual void set_precondition_ready(RuntimeState* state);
virtual Status set_finishing(RuntimeState* state);
virtual Status set_finished(RuntimeState* state);
virtual Status set_cancelled(RuntimeState* state);
virtual void close(RuntimeState* state);
```
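The "each stage only once" guarantee comes from storing a stage per operator and comparing it against the ordered OperatorStage values before invoking a callback. Here is a minimal sketch of that guard. It uses a std::map keyed by a hypothetical operator id in place of _operator_stages, and a hypothetical RecordingOp that only logs its callbacks. The rule that FINISHED first goes through FINISHING follows _mark_operator_finished, quoted later in this article. Chaining CLOSED after FINISHED is an assumption of the sketch.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

enum OperatorStage {
    INIT = 0, PREPARED = 1, PRECONDITION_NOT_READY = 2, PROCESSING = 3,
    FINISHING = 4, FINISHED = 5, CANCELLED = 6, CLOSED = 7,
};

// Hypothetical operator that only records which callbacks were invoked.
struct RecordingOp {
    int id;
    std::vector<std::string> calls;
    void set_finishing() { calls.push_back("set_finishing"); }
    void set_finished() { calls.push_back("set_finished"); }
    void close() { calls.push_back("close"); }
};

// Driver-side guard: a callback fires only when the stage actually advances.
struct ToyDriver {
    std::map<int, OperatorStage> stages;  // missing ids value-initialize to INIT

    void mark_finishing(RecordingOp& op) {
        OperatorStage& s = stages[op.id];
        if (s >= FINISHING) return;  // already FINISHING or later: no-op
        s = FINISHING;
        op.set_finishing();
    }
    void mark_finished(RecordingOp& op) {
        mark_finishing(op);  // FINISHED is always preceded by FINISHING
        OperatorStage& s = stages[op.id];
        if (s >= FINISHED) return;
        s = FINISHED;
        op.set_finished();
    }
    void mark_closed(RecordingOp& op) {
        mark_finished(op);  // assumption of this sketch
        OperatorStage& s = stages[op.id];
        if (s >= CLOSED) return;
        s = CLOSED;
        op.close();
    }
};
```

Calling mark_finishing, mark_finished or mark_closed repeatedly on the same operator still produces exactly one set_finishing, one set_finished and one close.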

PipelineDriver::process

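PipelineDriver::process works through PipelineDriver::_operators in order, but it runs under a time slice. Once the slice is exceeded (YIELD_MAX_TIME_SPENT, or YIELD_PREEMPT_MAX_TIME_SPENT when the query runs in a WorkGroup; see Part3), the driver must voluntarily give up the CPU so that other drivers can run, and it is put back into the DriverQueue to wait for its next turn. If it is blocked instead, it is handed to the poller. process therefore also has to update statistics, which the DriverQueue uses to decide when to schedule this PipelineDriver next.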

PipelineDriver::_first_unfinished records the progress of the current Pipeline, i.e. the index of the Operator it has reached. Each call starts processing from _first_unfinished.
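The interplay between the time slice and _first_unfinished can be modelled with a fake clock. In this hypothetical sketch (not StarRocks code), pending[i] counts the chunks operator i still holds for its successor, moving one chunk costs one time unit, and `slice` stands in for the maximum time slice. process() returns false when it yields and true once every operator before the sink has finished. Because first_unfinished is a member, the next call resumes where the previous one stopped instead of revisiting finished operators. Unlike the real driver, this toy never blocks, so the "no chunk moved" case is not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model, not StarRocks code. pending[i] is the number of chunks
// operator i still holds for operator i + 1 (the last entry counts what the
// sink has received). Moving one chunk costs one unit of a fake clock.
struct ToyDriver {
    std::vector<int> pending;
    std::size_t first_unfinished = 0;  // survives across process() calls
    long long slice = 1;               // stand-in for the maximum time slice

    // An operator is finished once its upstream is finished and it holds nothing.
    bool op_finished(std::size_t i) const {
        return pending[i] == 0 && (i == 0 || op_finished(i - 1));
    }

    // Returns true when every operator before the sink has finished, or
    // false when the time slice ran out (the driver should be re-queued).
    bool process() {
        long long time_spent = 0;
        while (true) {
            std::size_t new_first_unfinished = first_unfinished;
            bool should_yield = false;
            for (std::size_t i = first_unfinished; i + 1 < pending.size(); ++i) {
                if (op_finished(i)) {
                    new_first_unfinished = i + 1;
                    continue;
                }
                if (pending[i] == 0) continue;  // curr has no output yet
                --pending[i];                   // "pull" from curr
                ++pending[i + 1];               // "push" into next
                if (++time_spent >= slice) {
                    should_yield = true;
                    break;
                }
            }
            first_unfinished = new_first_unfinished;  // skip finished prefix next time
            if (first_unfinished + 1 >= pending.size()) return true;
            if (should_yield) return false;
        }
    }
};
```

With pending = {3, 0, 0} and a slice of 2, the first three calls yield and the fourth call reports completion, with all three chunks at the sink.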

Part1: check

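Before data is passed between (curr_op, next_op), a few preliminary checks are needed:

  1. Has curr_op already finished?

    If curr_op has finished, _mark_operator_finishing calls Operator::set_finishing to move next_op (and, when i == 0, the source itself) into the FINISHING stage. new_first_unfinished records the index just past the finished Operators.

    Why does the i == 0 case also pass curr_op to _mark_operator_finishing?

    1. OperatorStage::FINISHING means an Operator will receive no further input. Once the previously pushed chunks have been processed, it enters OperatorStage::FINISHED. So when curr_op->is_finished() is true, next_op will receive no more input and must be set to OperatorStage::FINISHING.
    2. A SourceOperator takes no input at all. More precisely, no upstream Operator ever calls push_chunk on it. Once the SourceOperator has pulled all data from the network or local disk, curr_op->is_finished() also becomes true. The SourceOperator is then marked OperatorStage::FINISHING to indicate that the Source will not pull any more.
  2. Data can only be passed between (curr_op, next_op) if curr_op has output ready and next_op is ready to accept input.

    This is checked with Operator::has_output and Operator::need_input.

  3. The Pipeline only keeps running if the current FragmentInstance has not been cancelled.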
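The three checks can be condensed into a small decision function. This is a hypothetical helper written for illustration: PairState, PairAction, check_pair and ops_to_mark_finishing do not exist in StarRocks. It evaluates the checks in the same order as the loop below. It also encodes which operator indices are marked FINISHING when curr_op has finished, including the i == 0 special case for the source.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper types, not part of StarRocks.
enum class PairAction {
    PropagateFinish,  // curr_op finished: mark FINISHING downstream, skip the pair
    Skip,             // curr_op has no output or next_op wants no input
    Abort,            // the fragment instance was cancelled
    Move,             // safe to pull from curr_op and push into next_op
};

struct PairState {
    bool curr_finished;
    bool curr_has_output;
    bool next_need_input;
    bool fragment_cancelled;
};

// Evaluates the three checks in the order used by the loop.
PairAction check_pair(const PairState& s) {
    if (s.curr_finished) return PairAction::PropagateFinish;
    if (!s.curr_has_output || !s.next_need_input) return PairAction::Skip;
    if (s.fragment_cancelled) return PairAction::Abort;
    return PairAction::Move;
}

// Indices passed to _mark_operator_finishing when operator i has finished:
// the source (i == 0) marks itself as well; every pair marks next_op.
std::vector<std::size_t> ops_to_mark_finishing(std::size_t i) {
    if (i == 0) return {0, 1};
    return {i + 1};
}
```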

The time spent on each (curr_op, next_op) data transfer is accumulated into time_spent. The logic is as follows:

```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    {
        SCOPED_RAW_TIMER(&time_spent);
        auto& curr_op = _operators[i];
        auto& next_op = _operators[i + 1];

        // 1. curr_op has finished: propagate FINISHING downstream
        if (curr_op->is_finished()) {
            if (i == 0) {
                // For source operators
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
            }
            RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
            new_first_unfinished = i + 1;
            continue;
        }

        // 2. try successive operator pairs
        if (!curr_op->has_output() || !next_op->need_input()) {
            continue;
        }

        // 3. stop if the fragment instance has been cancelled
        if (_check_fragment_is_canceled(runtime_state)) {
            return _state;
        }
        //...
    }
    //...
}
```

Part2: pull-push

The second part of the for loop pulls data from curr_op and pushes it into next_op. If any step fails, the bad status is returned to GlobalDriverExecutor, which then cancels the whole query.

The code for this part is fairly simple:

```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    {
        //...above code

        // 1. pull chunk from current operator
        StatusOr<vectorized::ChunkPtr> maybe_chunk;
        {
            SCOPED_TIMER(curr_op->_pull_timer);
            maybe_chunk = curr_op->pull_chunk(runtime_state);
        }
        return_status = maybe_chunk.status();
        if (!return_status.ok() && !return_status.is_end_of_file()) {
            return return_status;
        }

        // Quick check whether the query has been cancelled
        if (_check_fragment_is_canceled(runtime_state)) {
            return _state;
        }

        // 2. push chunk
        if (return_status.ok()) {
            if (maybe_chunk.value() && maybe_chunk.value()->num_rows() > 0) {
                // Reject a chunk with more rows than the configured chunk size
                size_t row_num = maybe_chunk.value()->num_rows();
                if (UNLIKELY(row_num > runtime_state->chunk_size())) {
                    return Status::InternalError(fmt::format(
                            "Intermediate chunk size must not be greater than {} "
                            "actually {} after {}-th operator {} in {}",
                            runtime_state->chunk_size(), row_num,
                            i, curr_op->get_name(), to_readable_string()));
                }

                {
                    SCOPED_TIMER(next_op->_push_timer);
                    return_status = next_op->push_chunk(runtime_state, maybe_chunk.value());
                }

                if (!return_status.ok() && !return_status.is_end_of_file()) {
                    return return_status;
                }

                num_chunks_moved += 1;
            }
        }

        // 3. Check curr_op finished again
        if (curr_op->is_finished()) {
            if (i == 0) {
                // For source operators
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
            }
            RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
            new_first_unfinished = i + 1;
            continue;
        }
    }
    //...
}
```
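Here is a stripped-down version of the push half of this step. It assumes a chunk is a std::vector<int> and uses a std::optional to model a possibly null ChunkPtr. StepResult and push_step are hypothetical stand-ins for Status and the inlined logic above. Empty or null chunks are skipped without counting as a move. Oversized chunks are rejected with an error, and only successful pushes increment num_chunks_moved.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical simplification: a chunk is a vector of rows, and an empty
// std::optional stands for a null ChunkPtr.
using Chunk = std::vector<int>;

struct StepResult {
    bool ok = true;
    std::string error;  // only meaningful when ok == false
};

// One push step: skip null/empty chunks, reject oversized chunks, otherwise
// hand the chunk to next_op's input and count the move.
StepResult push_step(const std::optional<Chunk>& pulled, std::size_t chunk_size,
                     std::vector<Chunk>& next_input, std::size_t& num_chunks_moved) {
    if (!pulled || pulled->empty()) return {};
    if (pulled->size() > chunk_size) {
        return {false, "Intermediate chunk size must not be greater than " +
                           std::to_string(chunk_size) + ", actually " +
                           std::to_string(pulled->size())};
    }
    next_input.push_back(*pulled);
    ++num_chunks_moved;
    return {};
}
```

Keeping empty chunks out of num_chunks_moved matters because, as Part4 shows, a round with zero moved chunks is interpreted as the driver being blocked.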

Part3: time

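time_spent accumulates the time spent on {pull, push} operations between (curr_op, next_op) pairs. The maximum time slice for a PipelineDriver run is YIELD_MAX_TIME_SPENT. If the query belongs to a specific WorkGroup, the driver can also be preempted once time_spent reaches YIELD_PREEMPT_MAX_TIME_SPENT, provided the WorkGroup's driver queue agrees (should_yield). If either threshold is hit, the driver must give up the CPU so that other drivers get a chance to run: should_yield is set to true and the for loop is exited. Separately, runs that exceed OVERLOADED_MAX_TIME_SPEND_NS are counted in the pipe_driver_overloaded metric.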

```cpp
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
    //...above code

    if (time_spent >= OVERLOADED_MAX_TIME_SPEND_NS) {
        StarRocksMetrics::instance()->pipe_driver_overloaded.increment(1);
    }
    // yield when total chunks moved or time spent on-core for evaluation
    // exceed the designated thresholds.
    if (time_spent >= YIELD_MAX_TIME_SPENT) {
        should_yield = true;
        COUNTER_UPDATE(_yield_by_time_limit_counter, 1);
        break;
    }
    if (_workgroup != nullptr && time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT &&
        _workgroup->driver_sched_entity()->in_queue()->should_yield(this, time_spent)) {
        should_yield = true;
        COUNTER_UPDATE(_yield_by_preempt_counter, 1);
        break;
    }
}
```
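The yield decision can be isolated as a pure function. In this sketch, YieldPolicy, YieldReason and decide_yield are hypothetical names. The two thresholds are passed in rather than hard-coded, because the real values of YIELD_MAX_TIME_SPENT and YIELD_PREEMPT_MAX_TIME_SPENT are not reproduced here. queue_says_yield stands in for the result of _workgroup->driver_sched_entity()->in_queue()->should_yield(...).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names; the thresholds mirror YIELD_MAX_TIME_SPENT and
// YIELD_PREEMPT_MAX_TIME_SPENT, but their real values are not assumed.
struct YieldPolicy {
    std::int64_t max_time_spent;          // hard limit for every driver
    std::int64_t preempt_max_time_spent;  // preemption limit for workgroup drivers
};

enum class YieldReason { None, TimeLimit, Preempt };

// queue_says_yield stands in for the workgroup queue's own should_yield().
YieldReason decide_yield(const YieldPolicy& p, std::int64_t time_spent,
                         bool has_workgroup, bool queue_says_yield) {
    if (time_spent >= p.max_time_spent) return YieldReason::TimeLimit;
    if (has_workgroup && time_spent >= p.preempt_max_time_spent && queue_says_yield) {
        return YieldReason::Preempt;
    }
    return YieldReason::None;
}
```

Passing the policy in keeps the function easy to test with arbitrary numbers. For example, with limits of 100 and 10 units, a workgroup driver at 20 units yields only if its queue agrees.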

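Part4: finish or yield

When one pass of the for loop ends, there are three possibilities. The Pipeline may have finished. This round's time slice may be used up, or no chunk moved at all (the driver is blocked). Otherwise, while (true) simply starts another pass.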

  • new_first_unfinished marks the index just past the Operators that finished in this pass.

    First, the Operators in [_first_unfinished, new_first_unfinished) are marked as finished via _mark_operator_finished, and then _first_unfinished is updated to new_first_unfinished.

  • SinkOperator::is_finished returning true means all Operators of the current Pipeline have finished.

    finish_operators iterates over all Operators and calls _mark_operator_finished on each one.

    ```cpp
    void PipelineDriver::finish_operators(RuntimeState* runtime_state) {
        for (auto& op : _operators) {
            _mark_operator_finished(op, runtime_state);
        }
    }
    ```

    _mark_operator_finished calls Operator::set_finished, so every Operator ends up in OperatorStage::FINISHED.

    ```cpp
    Status PipelineDriver::_mark_operator_finished(OperatorPtr& op, RuntimeState* state) {
        RETURN_IF_ERROR(_mark_operator_finishing(op, state));
        auto& op_state = _operator_stages[op->get_id()];
        RETURN_IF(op_state >= OperatorStage::FINISHED, Status::OK());

        {
            SCOPED_TIMER(op->_finished_timer);
            op_state = OperatorStage::FINISHED;
            return op->set_finished(state);
        }
    }
    ```

    Once finish_operators returns, the PipelineDriver state is updated, putting the driver into DriverState::PENDING_FINISH or DriverState::FINISH.

    At the normal end of an Operator's lifecycle, the sequence is:

    • Source::is_finished –> Source::set_finishing –> Source::set_finished
    • PrevOperator::is_finished –> Operator::set_finishing –> Operator::set_finished
    • PrevOperator::is_finished –> Sink::set_finishing –> Sink::is_finished –> Sink::set_finished –> Operators::close
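  • Otherwise, either no chunk moved in this round (the driver is blocked) or the time slice is used up. The state of the current PipelineDriver is updated accordingly, and the Executor uses it to decide whether to put the driver into blocked_driver_poller or back into _driver_queue.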
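The bookkeeping for finished operators can be sketched as follows. ToyDriver, close_prefix and the finished_calls counter are hypothetical. The prefix [first_unfinished, new_first_unfinished) is finished exactly once per pass. finish_operators can safely revisit every operator, because the stage guard turns repeated calls into no-ops. For brevity, the sketch skips the FINISHING step that _mark_operator_finished performs first.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Only the stages needed here; values match the OperatorStage enum above.
enum Stage { PROCESSING = 3, FINISHING = 4, FINISHED = 5 };

// Hypothetical driver bookkeeping, not StarRocks code.
struct ToyDriver {
    std::vector<Stage> stages;
    std::vector<int> finished_calls;  // how many times "set_finished" fired per operator
    std::size_t first_unfinished = 0;

    void mark_finished(std::size_t i) {
        if (stages[i] >= FINISHED) return;  // guard: fire at most once
        stages[i] = FINISHED;
        ++finished_calls[i];
    }
    // End of a pass: finish [first_unfinished, new_first_unfinished), then advance.
    void close_prefix(std::size_t new_first_unfinished) {
        for (std::size_t i = first_unfinished; i < new_first_unfinished; ++i) mark_finished(i);
        first_unfinished = new_first_unfinished;
    }
    // Sink finished: visit every operator; already-finished ones are no-ops.
    void finish_operators() {
        for (std::size_t i = 0; i < stages.size(); ++i) mark_finished(i);
    }
};
```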

The corresponding code in PipelineDriver::process is as follows:

```cpp
while (true) {
    RETURN_IF_LIMIT_EXCEEDED(runtime_state, "Pipeline");

    size_t num_chunks_moved = 0;
    bool should_yield = false;
    size_t num_operators = _operators.size();
    size_t new_first_unfinished = _first_unfinished;

    for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
        /** above code **/
    }

    // close finished operators and update _first_unfinished index
    for (auto i = _first_unfinished; i < new_first_unfinished; ++i) {
        RETURN_IF_ERROR(return_status = _mark_operator_finished(_operators[i], runtime_state));
    }
    _first_unfinished = new_first_unfinished;

    if (sink_operator()->is_finished()) {
        finish_operators(runtime_state);
        set_driver_state(is_still_pending_finish() ? DriverState::PENDING_FINISH
                                                   : DriverState::FINISH);
        return _state;
    }

    // no chunk moved in current round means that the driver is blocked.
    // should yield means that the driver has occupied the CPU core for a
    // very long time, so it should switch off the core and give another
    // ready driver a chance to run.
    if (num_chunks_moved == 0 || should_yield) {
        if (is_precondition_block()) {
            set_driver_state(DriverState::PRECONDITION_BLOCK);
            COUNTER_UPDATE(_block_by_precondition_counter, 1);
        } else if (!sink_operator()->is_finished() && !sink_operator()->need_input()) {
            set_driver_state(DriverState::OUTPUT_FULL);
            COUNTER_UPDATE(_block_by_output_full_counter, 1);
        } else if (!source_operator()->is_finished() && !source_operator()->has_output()) {
            set_driver_state(DriverState::INPUT_EMPTY);
            COUNTER_UPDATE(_block_by_input_empty_counter, 1);
        } else {
            set_driver_state(DriverState::READY);
        }
        return _state;
    }
}
```
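The decision made at the end of each pass can be restated as a pure function over a snapshot of the predicates it reads. PassSnapshot and next_state are hypothetical. An empty std::optional means "no state change; run another pass of while (true)". Because the sink-finished case returns first, the !sink_operator()->is_finished() part of the OUTPUT_FULL condition is redundant in the sketch, so it is dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

enum class DriverState { READY, PRECONDITION_BLOCK, OUTPUT_FULL, INPUT_EMPTY, PENDING_FINISH, FINISH };

// Hypothetical snapshot of the predicates read after one pass.
struct PassSnapshot {
    bool sink_finished;
    bool still_pending_finish;
    std::size_t num_chunks_moved;
    bool should_yield;
    bool precondition_block;
    bool sink_need_input;
    bool source_finished;
    bool source_has_output;
};

// Empty optional: no state change, run another pass of while (true).
std::optional<DriverState> next_state(const PassSnapshot& s) {
    if (s.sink_finished) {
        return s.still_pending_finish ? DriverState::PENDING_FINISH : DriverState::FINISH;
    }
    if (s.num_chunks_moved > 0 && !s.should_yield) return std::nullopt;
    if (s.precondition_block) return DriverState::PRECONDITION_BLOCK;
    // The sink is known not to be finished here, so only need_input matters.
    if (!s.sink_need_input) return DriverState::OUTPUT_FULL;
    if (!s.source_finished && !s.source_has_output) return DriverState::INPUT_EMPTY;
    return DriverState::READY;
}
```

The order of the branches encodes priority. A precondition block is reported before a full output, and a full output before an empty input. A driver that yielded while still able to make progress ends up READY, so it goes back to the ready queue rather than the poller.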