Pipeline: PipelineDriver 状态机(1)

本篇来看看 PipelineDriver 的执行流程,以及如何与 PipelineDriverPoller 交互。

Pipeline DriverState

一个 PipelineDriver 的状态有如下 10 种:

1
2
3
4
5
6
7
8
9
10
11
12
enum DriverState : uint32_t {
NOT_READY = 0,
READY = 1,
RUNNING = 2,
INPUT_EMPTY = 3,
OUTPUT_FULL = 4,
PRECONDITION_BLOCK = 5,
FINISH = 6,
CANCELED = 7,
INTERNAL_ERROR = 8,
PENDING_FINISH = 9,
};

他们之间的状态变化如下:
PipelineaDriver-1

在此处不单独讲解每个状态,后面融到代码流程一起讲解。

GlobalDriverExecutor::submit

先回顾下 GlobalDrvierExecutor 整体设计:

pipeline-1

PipelineDriver 创建完,并通过 submit 接口传递给 DriverExecutor。Executor 会先根据该 drvier 当前状态来判断是进入 _driver_queue 还是 _blocked_driver_poller 中。

比如,在 Morsel 和 OlapScanOperator(2) 中提到的 OlapScanPrepareOperator 和 OlapScanOperator:需要等待 PrepareOperator::pull_chunk 执行完(因为需要先获取待访问的 tablets 和 rowsets),ScanOperator 才能执行。然而 Prepare 和 Scan 分属于两个 PipelineDriver。他们的依赖关系如下:

Pipeline-OlapScanOperator-1
因此,而 pipeline0 在被提交给 Executor 后会进入 _driver_queue,pipeline1 提交后则进入 _blocked_driver_poller。这里的初始判断条件:

  • HashJoin:Join 需要先等待 Build 阶段完成,才能进入 Probe 阶段。Build 阶段即 Probe 阶段的 Precondition,Probe 则会进入 submit 函数第一个分支,并添加到 _blocked_driver_poller
  • 其他,都是使用 SourceOperator::is_finished 函数和 SourceOperator::has_output 函数来判断进入哪个 driver_queue

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void GlobalDriverExecutor::submit(DriverRawPtr driver) {
driver->start_schedule(_schedule_count, _driver_execution_ns);

if (driver->is_precondition_block()) {
driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
driver->mark_precondition_not_ready();
this->_blocked_driver_poller->add_blocked_driver(driver);
} else {
driver->submit_operators();

// Try to add the driver to poller first.
if (!driver->source_operator()->is_finished()
&& !driver->source_operator()->has_output()) {
driver->set_driver_state(DriverState::INPUT_EMPTY);
this->_blocked_driver_poller->add_blocked_driver(driver);
} else {
this->_driver_queue->put_back(driver);
}
}
}

PipelineDriverPoller

PipelineDriverPoller 的主要字段解释如下:

  • _driver_queue: PipelineDriverPoller 和 GlobalDriverExecutor 共享 _driver_queue,存放着 READY 状态可执行的 PipelineDriver。

  • _blocked_drivers: 存储当前 Backends 中所有暂时无法执行的 blocked PipelineDrivers,

    _global_mutex 是用于保护 _blocked_drivers,在 poll_thread 和 PipelineDriverPoller::add_blocked_driver 函数之间形成互斥。

    _cond 则是配合 _gloabl_mutex,上层调用 add_blocked_driver 函数添加 blocked driver 时,唤醒正在阻塞等待的 poller_thread。

  • _local_blocked_drivers: 表征的是当前 poller_thread 中正在处理 blocked PipelineDrivers。

    但是我觉得没必要引入一个 _local_mutex,SpinLock 就能满足要求。详见 poller_thread 分析。

类 PipelineDriverPoller 主要字段及其构造函数如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class PipelineDriverPoller {
public:
explicit PipelineDriverPoller(DriverQueue* driver_queue)
: _driver_queue(driver_queue),
_polling_thread(nullptr),
_is_polling_thread_initialized(false),
_is_shutdown(false) { }

using DriverList = std::list<DriverRawPtr>;
//... other methods

private:
mutable std::mutex _global_mutex;
std::condition_variable _cond;
DriverList _blocked_drivers;

mutable std::shared_mutex _local_mutex;
DriverList _local_blocked_drivers;

DriverQueue* _driver_queue;
//...
};

// PipelineDriverPoller 在 Executor 构造
GlobalDriverExecutor::GlobalDriverExecutor(std::string name,
std::unique_ptr<ThreadPool> pool,
bool enable_resource_group)
: Base(std::move(name)),
// driver_queue 在 Executor 中构造
_driver_queue(enable_resource_group
? std::unique_ptr<DriverQueue>(new WorkGroupDriverQueue())
: std::make_unique<QuerySharedDriverQueue>()),
_thread_pool(std::move(pool)),
// 传递 _driver_queue 指针给 poller
_blocked_driver_poller(new PipelineDriverPoller(_driver_queue.get())),
_exec_state_reporter(new ExecStateReporter()) {
}

add_blocked_driver

add_blocked_driver 函数是 Executor 向 Poller 添加 blocked drivers 的函数接口,这里需要使用 _global_mutex 来与 poller_thread 形成互斥。

_pending_timer_sw 用于统计每次 driver 处于阻塞状态的时间,即加入 poller 时重置,从 poller 中删除后计算本次 blocked 耗时。多次阻塞总耗时更新到 PipelineDriver::_pending_timer 中,查询结束可以在 query profile 中搜索 ‘PendingTime’ 字段查看。

1
2
3
4
5
6
void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
driver->_pending_timer_sw->reset();
_cond.notify_one();
}

run_internal

下面是 Poller 的重点:poller_thread。可以将 poller_thread 分为三个部分:

Part1

等待 Executor 将 blocked drivers 加入到 Poller 中。

在一个正常部署的 StarRocks 集群中,任意一个时刻都应该有查询存在,且生成的 PipelineDriver 会很多。因此,add_blocked_driver 函数与 poller_thread 之间的 race condition 应该比较激烈。poller_thread 在局部作用域里使用 std::mutex + std::condition_variable ,来缩小 _gloab_mutex 的范围。

如果有新 blocked drivers 加入,则将其从 _blocked_drivers 中移到 tmp_blocked_drivers,局部作用域结束就释放了 _gloabl_mutex。poller_thread 后续就直接对 poller_thread 进行操作,也就不会阻塞 add_blocked_driver 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void PipelineDriverPoller::run_internal() {
_is_polling_thread_initialized.store(true, std::memory_order_release);

DriverList tmp_blocked_drivers;
while (!_is_shutdown.load(std::memory_order_acquire)) {
// part1 start...
// 1. 局部作用域
{
std::unique_lock<std::mutex> lock(_global_mutex);
// 将 _blocked_drivers 中的元素移到 tmp_blocked_drivers
tmp_blocked_drivers.splice(
tmp_blocked_drivers.end(), _blocked_drivers);

if (_local_blocked_drivers.empty() && tmp_blocked_drivers.empty()
&& _blocked_drivers.empty()) {
// 超时等待
std::cv_status cv_status = std::cv_status::no_timeout;
while (!_is_shutdown.load(std::memory_order_acquire)
&& _blocked_drivers.empty()) {
cv_status = _cond.wait_for(lock, 10ms);
}
if (cv_status == std::cv_status::timeout) {
continue;
}

if (_is_shutdown.load(std::memory_order_acquire)) {
break;
}
// 获得本轮的 blocked drivers
tmp_blocked_drivers.splice(
tmp_blocked_drivers.end(), _blocked_drivers);
}
}
//...
}
}

Part2

得到本轮 tmp_blocked_drivers 后,将其加入到 _local_blocked_drivers 中。后续对 _local_blocked_drivers 中的 drivers 当前状态进行判断。因为有个 iterate_immutable_driver 函数需要读取 _local_blocked_drivers,故而也用 _local_mutex 对其进行互斥保护。

但是,iterate_immutable_driver 是来源于 HTTP 请求,通常情况下请求几乎不会被执行,可能开发人员会在调试时使用下,因次这里几乎没有必要使用 std::mutex,更好的选择显然是使用 SpinLock。

需要对 _local_blocked_drivers 中的每个 driver 进行遍历,检测是否 READY 或者处于终止状态

  • case1: QueryContext::is_query_expired

    每个 query 都有个 query_timeout(默认值 300s),超过这个时间就会 cancel 该 query。这时会主动调用 FragmentContext::cancel 函数,标记这个 query。由于 FragmentInstance 的所有 PipelineDrievrs 共享一个 FragmentContext 对象,因此其他 PipelineDriver 就会可以通过检测 FragmentContext::is_canceled 来查看当前 query 是否仍处于取消状态

    实际上这个状态会上报,再下发取消其他节点的 FragementInstances?

  • case2: FragmentContext::is_canceled

    触发 FragmentContext::cancel 操作的场景很多,除了上述的查询超时,还有在查询过程中 Operators::pull_chunk、Operators::push_chunk 执行遇到某些问题,此时在 GloablDriverExecutor 会调用 QueryContex::cancel 将该 query 的所有 FragementInstance 都进行 cancel。如此,该 query 下所有的 FragmentInstances 的所有 PipelineDriver 都不会再被执行。

  • case3: DriverState::PENDING_FINISH

    从代码中,不难发现只有当 PipelineDriver::pending_finish 为 false 时,这个 blocked_driver 才会调用 remove_blocked_driver 函数将其从 _local_blocked_drivers 中删除。

    1
    2
    3
    4
    5
    6
    7
    void PipelineDriverPoller::remove_blocked_driver(
    DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
    auto& driver = *driver_it;
    driver->_pending_timer->update(
    driver->_pending_timer_sw->elapsed_time());
    local_blocked_drivers.erase(driver_it++);
    }

    转为 DriverState::PENDING_FINISH 状态的前提是 is_still_pending_finish 函数返回 true:

    1
    2
    3
    4
     bool PipelineDriver::is_still_pending_finish() {
    return source_operator()->pending_finish()
    || sink_operator()->pending_finish();
    }

    实际上 SourceOperator 中,只有 ScanOperator 实现了这个函数,且返回值为 false

    1
    2
    3
    bool ScanOperator::pending_finish() const {
    return false;
    }

    也就是说, is_still_pending_finish 函数的返回值取决于 SinkOperator::pending_finish 了。

    这也是这个阶段存在的原因

    实际上就是为了等待 SinkOperator 中的数据处理完,比如 ExchangeSinkOperator 中的 buffer 消耗完才能进入 DriverState::FINISH 阶段。因此 is_still_pending_finish 返回 true 时,blocked drvier 并没有加入 ready drivers 中,而是被忽略了,等待下一次 poll。

  • case4: PipelineDriver::is_finished
    query 终态有三种:

    1
    2
    3
    4
    5
    bool is_finished() {
    return _state == DriverState::FINISH
    || _state == DriverState::CANCELED
    || _state == DriverState::INTERNAL_ERROR;
    }

    已经处于终态的可以直接从 _local_blocked_drivers 中删除,并加入 ready_drivers 中。

  • case5: PipelineDriver::is_not_blocked
    这个分支才是用于检测 PipelineDriver 是否解除阻塞,可以继续执行了。比如 OlapScanPrepareOperator::pull_chunk 执行完毕,OlapScanOperator 的 PipelineDrvier::is_not_block 函数才会返回 true,从 _local_blocked_drivers 中转入 ready_drivers。

  • case6

    PipelineDriver 仍无法 Ready

Part2 代码如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
void PipelineDriverPoller::run_internal() {
std::vector<DriverRawPtr> ready_drivers;
while (!_is_shutdown.load(std::memory_order_acquire)) {
// above part1 code ...

// part2 start ...
{
std::unique_lock write_lock(_local_mutex);

if (!tmp_blocked_drivers.empty()) {
_local_blocked_drivers.splice(
_local_blocked_drivers.end(), tmp_blocked_drivers);
}

auto driver_it = _local_blocked_drivers.begin();
while (driver_it != _local_blocked_drivers.end()) {
auto* driver = *driver_it;
auto* fragment_ctx = driver->fragment_ctx();

if (driver->query_ctx()->is_query_expired()) {
// case1: 查询超时
fragment_ctx->cancel(Status::TimedOut(fmt::format(
"Query exceeded time limit of {} seconds",
driver->query_ctx()->get_query_expire_seconds())));

driver->cancel_operators(fragment_ctx->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
++driver_it;
} else {
driver->set_driver_state(DriverState::FINISH);
remove_blocked_driver(_local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (fragment_ctx->is_canceled()) {
//case2: query 已经被 cancel
driver->cancel_operators(fragment_ctx->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
++driver_it;
} else {
driver->set_driver_state(DriverState::CANCELED);
remove_blocked_driver(_local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (driver->pending_finish()) {
// case3: 处于即将完成的状态
if (driver->is_still_pending_finish()) {
++driver_it;
} else {
driver->set_driver_state(
fragment_ctx->is_canceled()
? DriverState::CANCELED
: DriverState::FINISH);
remove_blocked_driver(_local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (driver->is_finished()) {
// case4: 已经完成
remove_blocked_driver(_local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
} else if (driver->is_not_blocked()) {
// case5: 解除阻塞
driver->set_driver_state(DriverState::READY);
remove_blocked_driver(_local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
} else {
// case6: NOT_READY
++driver_it;
}
}
}

//...
} // while-loop
}

Part3

第三部分比较简单,就是将 ready 加入 DriverQueue,让 Executor 就会从该 driver_queue 取出可执行的 PipelineDrivers。这部分使用了 spin-loop 的优化,这个优化详情分析可见 RocksDB 系列的 WriteThread 如何自适应优化线程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void PipelineDriverPoller::run_internal() {
int spin_count = 0;
while (!_is_shutdown.load(std::memory_order_acquire)) {
//...above code

// part3 start...

if (ready_drivers.empty()) {
spin_count += 1;
} else {
spin_count = 0;

_driver_queue->put_back(ready_drivers);
ready_drivers.clear();
}

if (spin_count != 0 && spin_count % 64 == 0) {
#ifdef __x86_64__
_mm_pause();
#else
// TODO: Maybe there's a better intrinsic like _mm_pause on non-x86_64 architecture.
sched_yield();
#endif
}
if (spin_count == 640) {
spin_count = 0;
sched_yield();
}
}
}

PipelineDriverPoller::run_internal 完整的代码可见 PipelineDriverPoller::run_internal