GlobalDriverExecutor: An Executor Implementing a Coroutine Model

Previous posts in this series covered the designs of PipelineDriver, DriverQueue, and PipelinePoller; this one looks at how GlobalDriverExecutor composes them.

(Figure: pipeline-1)

Both _driver_queue and _blocked_driver_poller are shared by all workers in _thread_pool. A query's FragmentInstance is divided into dop PipelineDrivers that are dispatched to n workers for execution. As a result, driver0 may initially be assigned to worker0, get added to blocked_driver_poller during execution, and later be taken out by worker1, so driver0 ultimately finishes executing on worker1.

Could this be optimized? Split the queue into two per-worker queues, LocalReadyQueue and RemoteReadyQueue: only when a worker's local_ready_queue runs dry does it steal PipelineDrivers from other workers into its remote_ready_queue, reducing cross-core communication.
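The stealing idea above can be sketched minimally. This is a hypothetical illustration (Driver, WorkerQueue, and take are names invented here, not StarRocks APIs): each worker pops from the front of its own queue, and thieves take from the back of a victim's queue to limit contention with the owner.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <vector>

struct Driver { int id; };  // stand-in for a PipelineDriver

class WorkerQueue {
public:
    void push(Driver d) {
        std::lock_guard<std::mutex> g(mu_);
        q_.push_back(d);
    }
    // The owning worker pops from the front (FIFO for fairness).
    std::optional<Driver> pop_local() {
        std::lock_guard<std::mutex> g(mu_);
        if (q_.empty()) return std::nullopt;
        Driver d = q_.front();
        q_.pop_front();
        return d;
    }
    // Thieves steal from the back, away from where the owner works.
    std::optional<Driver> steal() {
        std::lock_guard<std::mutex> g(mu_);
        if (q_.empty()) return std::nullopt;
        Driver d = q_.back();
        q_.pop_back();
        return d;
    }
private:
    std::mutex mu_;
    std::deque<Driver> q_;
};

// A worker drains its own queue first; only when it is empty does it
// pay the cross-core cost of scanning other workers' queues.
std::optional<Driver> take(std::vector<WorkerQueue>& queues, int self) {
    if (auto d = queues[self].pop_local()) return d;
    for (size_t i = 0; i < queues.size(); ++i) {
        if (static_cast<int>(i) == self) continue;
        if (auto d = queues[i].steal()) return d;
    }
    return std::nullopt;
}
```

In the common case a driver stays on the core that produced it; stealing only kicks in under load imbalance, which is exactly when the cross-core traffic is worth paying.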

Worker

All of the Executor's workers block on DriverQueue::take; whenever a new ready PipelineDriver is added to the DriverQueue, one worker is woken up.
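A minimal sketch of this blocking behavior, assuming nothing about the actual StarRocks DriverQueue beyond what the text states: a mutex plus condition variable, where put wakes exactly one blocked worker via notify_one.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical BlockingQueue, not the real DriverQueue: take() blocks
// the calling worker until an item is available; put() wakes one waiter.
template <typename T>
class BlockingQueue {
public:
    void put(T item) {
        {
            std::lock_guard<std::mutex> g(mu_);
            q_.push_back(std::move(item));
        }
        cv_.notify_one();  // unblock exactly one waiting worker
    }
    T take() {
        std::unique_lock<std::mutex> lk(mu_);
        // Guard against spurious wakeups: recheck the predicate.
        cv_.wait(lk, [this] { return !q_.empty(); });
        T item = std::move(q_.front());
        q_.pop_front();
        return item;
    }
private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<T> q_;
};
```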

  1. For each ready_driver it takes out, the worker first checks its state and filters out drivers already in a terminal state (CANCELED, FINISH, INTERNAL_ERROR) by calling GlobalDriverExecutor::_finalize_driver. If this is the last PipelineDriver of its FragmentInstance, that call also generates the FragmentInstance's query statistics (i.e. its profile) and reports them to the Frontend.
  2. For a driver in a normal READY/RUNNING state, the worker calls PipelineDriver::process to advance the pipeline state machine.
  • If the returned maybe_state.is_not_ok, QueryContext::cancel marks all FragmentInstances of this query on the current Backend as canceled. From then on, every driver of this query, whether sitting in the Poller or running in the Executor, checks fragment_ctx::is_canceled before executing; if it returns true, the execution is aborted and the driver goes through step (1).
  • If maybe_state.is_ok, the worker decides, based on the driver's resulting state, whether to put it back into driver_queue, hand it to blocked_driver_poller, or finalize it as done.

During execution the worker also sets and updates statistics on the Driver and the DriverQueue, so that the DriverQueue can schedule more accurately.

void GlobalDriverExecutor::_worker_thread() {
    auto current_thread = Thread::current_thread();
    const int worker_id = _next_id++;
    while (true) {
        if (_num_threads_setter.should_shrink()) {
            break;
        }

        // 0. Take a READY driver from the queue
        auto maybe_driver = this->_driver_queue->take();
        auto driver = maybe_driver.value();
        DCHECK(driver != nullptr);

        auto* query_ctx = driver->query_ctx();
        auto* fragment_ctx = driver->fragment_ctx();

        // Scheduling statistics
        driver->increment_schedule_times();
        _schedule_count++;

        auto runtime_state_ptr = fragment_ctx->runtime_state_ptr();
        auto* runtime_state = runtime_state_ptr.get();
        {
            SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(
                    runtime_state->instance_mem_tracker());

            // 1.1 Terminal-state check: has the fragment been canceled?
            if (fragment_ctx->is_canceled()) {
                driver->cancel_operators(runtime_state);
                if (driver->is_still_pending_finish()) {
                    driver->set_driver_state(DriverState::PENDING_FINISH);
                    _blocked_driver_poller->add_blocked_driver(driver);
                } else {
                    _finalize_driver(driver,
                                     runtime_state,
                                     DriverState::CANCELED);
                }
                continue;
            }

            // 1.2 Terminal-state check: is the driver itself already finished?
            if (driver->is_finished()) {
                _finalize_driver(driver, runtime_state, driver->driver_state());
                continue;
            }

            // Execution-time statistics
            int64_t start_time = driver->get_active_time();

            // 2. Drive the PipelineDriver forward
            StatusOr<DriverState> maybe_state;
            TRY_CATCH_ALL(maybe_state, driver->process(runtime_state, worker_id));

            Status status = maybe_state.status();
            this->_driver_queue->update_statistics(driver);
            int64_t end_time = driver->get_active_time();
            _driver_execution_ns += end_time - start_time;

            // 2.1 The PipelineDriver hit an error while executing
            if (!status.ok()) {
                // Cancel the whole query
                query_ctx->cancel(status);
                driver->cancel_operators(runtime_state);
                if (driver->is_still_pending_finish()) {
                    driver->set_driver_state(DriverState::PENDING_FINISH);
                    _blocked_driver_poller->add_blocked_driver(driver);
                } else {
                    _finalize_driver(driver,
                                     runtime_state,
                                     DriverState::INTERNAL_ERROR);
                }
                continue;
            }

            // 2.2 No error: dispatch on the new state, putting the driver
            //     back into driver_queue or handing it to the poller
            auto driver_state = maybe_state.value();
            switch (driver_state) {
            // Still runnable: back into the ready queue
            case READY:
            case RUNNING: {
                this->_driver_queue->put_back_from_executor(driver);
                break;
            }
            // Terminal states
            case FINISH:
            case CANCELED:
            case INTERNAL_ERROR: {
                _finalize_driver(driver, runtime_state, driver_state);
                break;
            }
            // Blocked states
            case INPUT_EMPTY:
            case OUTPUT_FULL:
            case PENDING_FINISH:
            case PRECONDITION_BLOCK: {
                _blocked_driver_poller->add_blocked_driver(driver);
                break;
            }
            default:
                DCHECK(false);
            }
        }
    }
}

Coroutine

So where does the coroutine model come in?

When the should_yield variable in PipelineDriver::process is set to true,

  • either the driver is blocked in its current state, in which case it is put into _blocked_driver_poller,
  • or its time slice is used up, in which case it is put back into _driver_queue to be rescheduled, giving other drivers a chance to run.

As long as any driver is in the READY/RUNNING state, the Executor's workers never sit idle.
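The cooperative time-slice behavior can be illustrated with a toy sketch. FakeDriver, chunks_left, and slice_ns are invented names for illustration only; the real PipelineDriver::process tracks its budget differently, but the core idea is the same: do work in small chunks and yield voluntarily at a deadline rather than being preempted by the OS.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

enum class State { READY, FINISH };

struct FakeDriver {
    int chunks_left;   // pretend workload: chunks still to process
    int64_t slice_ns;  // time budget for one schedule round

    // Process until finished or until the time slice expires; in the
    // latter case the driver "yields" by returning READY so the worker
    // puts it back into the ready queue.
    State process() {
        auto deadline = std::chrono::steady_clock::now() +
                        std::chrono::nanoseconds(slice_ns);
        while (chunks_left > 0) {
            --chunks_left;  // stand-in for pulling one chunk through the pipeline
            if (std::chrono::steady_clock::now() >= deadline) {
                return State::READY;  // should_yield: slice used up
            }
        }
        return State::FINISH;
    }
};
```

Because yielding is a return from process rather than a thread switch, one OS thread can multiplex many drivers, which is what makes this a coroutine-style model.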