The previous posts covered the designs of PipelineDriver, DriverQueue, and PipelinePoller; this post looks at how GlobalDriverExecutor ties them together.
Since _driver_queue and _blocked_driver_poller are shared by all workers in _thread_pool, and a query's FragmentInstance is split into dop PipelineDrivers dispatched to n workers, a driver is not pinned to any particular worker: driver0 may first be assigned to worker0, get added to the blocked_driver_poller during execution, then be taken out by worker1, so driver0 ends up running on worker1.
Could this be optimized? Split it into two queues, LocalReadyQueue and RemoteReadyQueue: only when a worker's local_ready_queue runs empty does it steal PipelineDrivers from other workers and place them into its remote_ready_queue, reducing cross-core traffic.
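Below is a minimal sketch of that two-queue idea. This is not StarRocks code: the class and member names (WorkerQueues, _local, _remote, steal_batch) and the batch size of 4 are invented, and the locking is deliberately coarse; it only illustrates "drain the local queue first, pay the cross-core stealing cost only as a last resort".

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

struct PipelineDriver;  // placeholder; only pointers are used in this sketch

// Hypothetical per-worker queue pair; none of these names exist in StarRocks.
class WorkerQueues {
public:
    // Drivers dispatched (or put back) to this worker land in its local queue.
    void push_local(PipelineDriver* driver) {
        std::lock_guard<std::mutex> guard(_mutex);
        _local.push_back(driver);
    }

    // The owner drains its own queues first; only when both are empty does it
    // pay the cross-core cost of stealing a small batch from another worker.
    PipelineDriver* take(const std::vector<WorkerQueues*>& all_workers) {
        {
            std::lock_guard<std::mutex> guard(_mutex);
            if (!_local.empty()) {
                PipelineDriver* d = _local.front();
                _local.pop_front();
                return d;
            }
            if (!_remote.empty()) {
                PipelineDriver* d = _remote.front();
                _remote.pop_front();
                return d;
            }
        }
        for (WorkerQueues* victim : all_workers) {
            if (victim == this) continue;
            std::deque<PipelineDriver*> stolen = victim->steal_batch(4);
            if (stolen.empty()) continue;
            PipelineDriver* first = stolen.front();
            stolen.pop_front();
            std::lock_guard<std::mutex> guard(_mutex);
            _remote.insert(_remote.end(), stolen.begin(), stolen.end());
            return first;
        }
        return nullptr;  // a real executor would block on a condition variable here
    }

    // Victim side of the steal: hand out up to `max` drivers from the tail,
    // so the stealing worker interferes with the owner's head as little as possible.
    std::deque<PipelineDriver*> steal_batch(size_t max) {
        std::lock_guard<std::mutex> guard(_mutex);
        std::deque<PipelineDriver*> out;
        while (!_local.empty() && out.size() < max) {
            out.push_back(_local.back());
            _local.pop_back();
        }
        return out;
    }

private:
    std::mutex _mutex;
    std::deque<PipelineDriver*> _local;   // drivers assigned to this worker
    std::deque<PipelineDriver*> _remote;  // drivers stolen from other workers
};
```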
worker
All of the Executor's workers block in DriverQueue::take; whenever a new ready PipelineDriver is added to the DriverQueue, one worker is woken up.
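As a minimal sketch of that blocking take() semantics, assuming a much simpler queue than the real DriverQueue described in the earlier post (no priorities, no statistics), an idle worker simply parks on a condition variable until put() hands it a ready driver:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

struct PipelineDriver;  // placeholder type for the sketch

// Not the actual StarRocks DriverQueue; this only shows the block/wake pattern.
class BlockingDriverQueue {
public:
    void put(PipelineDriver* driver) {
        {
            std::lock_guard<std::mutex> guard(_mutex);
            _queue.push_back(driver);
        }
        _cv.notify_one();  // wake exactly one parked worker
    }

    PipelineDriver* take() {
        std::unique_lock<std::mutex> lock(_mutex);
        // Workers sleep here until put() adds a ready driver.
        _cv.wait(lock, [this] { return !_queue.empty(); });
        PipelineDriver* driver = _queue.front();
        _queue.pop_front();
        return driver;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<PipelineDriver*> _queue;
};
```

notify_one is enough here because each newly ready driver only needs a single worker to pick it up.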
- For each ready_driver taken out, the worker first checks its state and filters out drivers already in a terminal state (CANCELED, FINISH, INTERNAL_ERROR), calling GlobalDriverExecutor::_finalize_driver on them. If it is the last PipelineDriver of its FragmentInstance, the query statistics of that FragmentInstance (i.e., the profile) are generated and reported to the Frontend.
- For a driver in the normal READY/RUNNING state, the worker calls PipelineDriver::process to advance the pipeline state machine.
- If the returned maybe_state is not OK, QueryContext::cancel marks all FragmentInstances of this query on the current Backend as canceled. Afterwards, every driver of this query, whether sitting in the Poller or about to run in the Executor, checks fragment_ctx::is_canceled before executing; if it returns true, the execution is aborted and the driver goes through step (1).
- If maybe_state is OK, then depending on the driver's state at this point it is either put back into the driver_queue, handed to the blocked_driver_poller, or finalized because it has finished.
During execution, the worker sets and updates the statistics of the Driver and the DriverQueue, which lets the DriverQueue schedule more accurately.
```cpp
void GlobalDriverExecutor::_worker_thread() {
    auto current_thread = Thread::current_thread();
    const int worker_id = _next_id++;
    while (true) {
        if (_num_threads_setter.should_shrink()) {
            break;
        }

        auto maybe_driver = this->_driver_queue->take();
        auto driver = maybe_driver.value();
        DCHECK(driver != nullptr);

        auto* query_ctx = driver->query_ctx();
        auto* fragment_ctx = driver->fragment_ctx();

        driver->increment_schedule_times();
        _schedule_count++;

        auto runtime_state_ptr = fragment_ctx->runtime_state_ptr();
        auto* runtime_state = runtime_state_ptr.get();
        {
            SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(runtime_state->instance_mem_tracker());

            if (fragment_ctx->is_canceled()) {
                driver->cancel_operators(runtime_state);
                if (driver->is_still_pending_finish()) {
                    driver->set_driver_state(DriverState::PENDING_FINISH);
                    _blocked_driver_poller->add_blocked_driver(driver);
                } else {
                    _finalize_driver(driver, runtime_state, DriverState::CANCELED);
                }
                continue;
            }
            if (driver->is_finished()) {
                _finalize_driver(driver, runtime_state, driver->driver_state());
                continue;
            }

            int64_t start_time = driver->get_active_time();

            StatusOr<DriverState> maybe_state;
            TRY_CATCH_ALL(maybe_state, driver->process(runtime_state, worker_id));

            Status status = maybe_state.status();
            this->_driver_queue->update_statistics(driver);
            int64_t end_time = driver->get_active_time();
            _driver_execution_ns += end_time - start_time;

            if (!status.ok()) {
                query_ctx->cancel(status);
                driver->cancel_operators(runtime_state);
                if (driver->is_still_pending_finish()) {
                    driver->set_driver_state(DriverState::PENDING_FINISH);
                    _blocked_driver_poller->add_blocked_driver(driver);
                } else {
                    _finalize_driver(driver, runtime_state, DriverState::INTERNAL_ERROR);
                }
                continue;
            }
            auto driver_state = maybe_state.value();
            switch (driver_state) {
            case READY:
            case RUNNING: {
                this->_driver_queue->put_back_from_executor(driver);
                break;
            }
            case FINISH:
            case CANCELED:
            case INTERNAL_ERROR: {
                _finalize_driver(driver, runtime_state, driver_state);
                break;
            }
            case INPUT_EMPTY:
            case OUTPUT_FULL:
            case PENDING_FINISH:
            case PRECONDITION_BLOCK: {
                _blocked_driver_poller->add_blocked_driver(driver);
                break;
            }
            default:
                DCHECK(false);
            }
        }
    }
}
```
Coroutine
So where does the coroutine model show up?
When the should_yield variable in PipelineDriver::process is set to true:
- either the driver is blocked in its current state, in which case it is handed to the _blocked_driver_poller,
- or its time slice is used up, in which case it is put back into the _driver_queue to be rescheduled, giving other drivers a chance to run.
As long as there are drivers in the READY/RUNNING state, the Executor's workers never block.
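To make the yield concrete, here is a hypothetical sketch of the time-slice check inside a driver's processing loop. The helper functions and the 100 ms quantum are invented for illustration; only the pattern of returning a DriverState instead of blocking, so the worker can re-queue or park the driver, reflects what PipelineDriver::process does.

```cpp
#include <chrono>

enum class DriverState { READY, RUNNING, INPUT_EMPTY, OUTPUT_FULL, FINISH };

// Stand-ins for "one unit of work" and the blocking conditions; a real driver
// would query its operator chain instead. All three are placeholders.
static bool all_operators_finished() { return false; }
static bool sink_is_full() { return false; }
static bool pull_and_push_one_chunk() { return true; }

// Arbitrary quantum chosen for this sketch, not a StarRocks constant.
constexpr auto kTimeSlice = std::chrono::milliseconds(100);

DriverState process_sketch() {
    const auto deadline = std::chrono::steady_clock::now() + kTimeSlice;
    while (true) {
        if (all_operators_finished()) return DriverState::FINISH;
        if (sink_is_full()) return DriverState::OUTPUT_FULL;              // parked in the poller
        if (!pull_and_push_one_chunk()) return DriverState::INPUT_EMPTY;  // parked in the poller

        // Time slice used up: yield voluntarily. The worker puts the driver
        // back into the DriverQueue so other ready drivers get a turn.
        if (std::chrono::steady_clock::now() >= deadline) {
            return DriverState::READY;
        }
    }
}
```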