QuerySharedDriverQueue: 多级反馈队列

DriverQueue 的继承派生关系如下图所示。QuerySharedDriverQueue 是用于没有设置 ResuorceGroup 的 query,即 default ResourceGroup,而 WorkGroupDriverQueue 是针对设置了 ResourceGroup 的 query。

本文专注于 QuerySharedDriverQueue 的实现,ResuorceGroup 后文单独讲解。下文 DriverQueue 一般指 QuerySharedDriverQueue。

DriverQueue 本质上是个调度 PipelineDrivers 的数据结构,这样 DrvierQueue 本身就不需要占据一个线程,它可以在 GloablEexecuotr::work_thread 所在的核上运行。

pipeline-driver-queue-1

DriverAcct

DriverQueue 实现调度,需要依赖 PipelineDriver 运行时的统计信息。比如,在前文我们说过了 PipelineDriver 每执行一次 PipelineDriver:process 函数都会更新统计信息。

统计信息由 DriverAcct 记录:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct DriverAcct {
//...
int64_t get_last_time_spent() { return last_time_spent; }
int64_t get_accumulated_time_spent() const { return accumulated_time_spent; }

int64_t schedule_times{0};
int64_t schedule_effective_times{0};
int64_t last_time_spent{0};
int64_t last_chunks_moved{0};
int64_t accumulated_time_spent{0};
int64_t accumulated_chunks_moved{0};
int64_t accumulated_rows_moved{0};
};

PipelineDriver 中统计信息如下。利用这些统计信息就能实现更为准确的调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
DriverAcct& PipelineDriver::driver_acct() { 
return _driver_acct;
}

void PipelineDriver::_update_driver_acct(size_t total_chunks_moved,
size_t total_rows_moved,
size_t time_spent) {
driver_acct().update_last_chunks_moved(total_chunks_moved);
driver_acct().update_accumulated_rows_moved(total_rows_moved);
driver_acct().update_last_time_spent(time_spent);
}

void PipelineDriver::_update_statistics(size_t total_chunks_moved,
size_t total_rows_moved,
size_t time_spent) {
_update_driver_acct(total_chunks_moved,
total_rows_moved,
time_spent);

// Update statistics of scan operator
if (ScanOperator* scan = source_scan_operator()) {
query_ctx()->incr_cur_scan_rows_num(
scan->get_last_scan_rows_num());
query_ctx()->incr_cur_scan_bytes(
scan->get_last_scan_bytes());
}

// Update cpu cost of this query
int64_t accounted_cpu_cost
= driver_acct().get_last_time_spent()
+ source_operator()->get_last_growth_cpu_time_ns()
+ sink_operator()->get_last_growth_cpu_time_ns();
query_ctx()->incr_cpu_cost(accounted_cpu_cost);
}

SubQuerySharedDriverQueue

QuerySharedDriverQueue 基于多级反馈队列实现,每一 level 的时间片不同,每一 level 的 PipelineDriver 是由 SubQuerySharedDriverQueue 来存储。

SubQuerySharedDriverQueue 有三个字段:

  • queue:用于存放 Pipelinerivers,如果 driver 已经 DriverState::CANCELED 状态则从头部压入,否则从尾部压入
  • pending_cancel_queue:用于存放正在取消的 driver
  • cancelled_set:用于记录已经取消的 drivers,如果 queue 中取出的 drivers 已经在 canceled_set 中,则忽略该 driver,从 queue 中重新取

结构如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class SubQuerySharedDriverQueue {
public:
void update_accu_time(const DriverRawPtr driver) {
_accu_consume_time.fetch_add(
driver->driver_acct().get_last_time_spent());
}

double accu_time_after_divisor() {
return _accu_consume_time.load() / factor_for_normal;
}

void put(const DriverRawPtr driver);
void cancel(const DriverRawPtr driver);
DriverRawPtr take();

std::deque<DriverRawPtr> queue;
std::queue<DriverRawPtr> pending_cancel_queue;
std::unordered_set<DriverRawPtr> cancelled_set;
size_t num_drivers = 0;

// factor for normalization
double factor_for_normal = 0;

private:
std::atomic<int64_t> _accu_consume_time = 0;
};

put

driver 如果已经处于 DriverState::CANCELED 状态,则从头部压入,否则从尾部压入。cancel 函数是直接压入 pending_cancel_queue 中。 queue 和 pending_cancel_queue 是可能存在重复的 driver,因此需要 cancelled_set 去重。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void SubQuerySharedDriverQueue::put(const DriverRawPtr driver) {
if (driver->driver_state() == DriverState::CANCELED) {
queue.emplace_front(driver);
} else {
queue.emplace_back(driver);
}
num_drivers++;
}

void SubQuerySharedDriverQueue::cancel(const DriverRawPtr driver) {
if (cancelled_set.count(driver) == 0) {
DCHECK(driver->is_in_ready_queue());
pending_cancel_queue.emplace(driver);
}
}

take

从 SubQuerySharedDriverQueue 中获取 driver 时

  • 如果 pending_cancel_queue 不为空,率先从 pending_cancel_queue 中取,并将获得的 driver 记录在 cancelld_set 中。
  • 否则,再从 queue 中获取,如果 driver 已经在 cancelled_set 中记录,则忽略并重新从 queue 中获取。

代码也是比较简单的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DriverRawPtr SubQuerySharedDriverQueue::take() {
DCHECK(!empty());
if (!pending_cancel_queue.empty()) {
DriverRawPtr driver = pending_cancel_queue.front();
pending_cancel_queue.pop();
cancelled_set.insert(driver);
--num_drivers;
return driver;
}

while (!queue.empty()) {
DriverRawPtr driver = queue.front();
queue.pop_front();
auto iter = cancelled_set.find(driver);
if (iter != cancelled_set.end()) {
cancelled_set.erase(iter);
} else {
--num_drivers;
return driver;
}
}
return nullptr;
}

QuerySharedDriverQueue

QuerySharedDriverQueue 一共有 QUEUE_SIZE(值是 8)个 SubQuerySharedDriverQueue,每个 SubQueue 对应的时间片依次是 {0.2s, 0.6s, 1.2s, 2s, 3s, 4.2s, 5.6s, 7.2s}:

  • 比如 SubQueue[0] 的时间片范围是 [0, 0.2),
  • 超过 7.2s 的 Pipeline 都位于最后一个队列 SubQueue[7] 中,即 SubQueue[7] 的时间范围是 [5.6, +∞)。

每个 Driver 消耗的时间片记录在 DriverAcct::accumulated_time_spent,PipelineDriver 每执行一次就会调用一次 _update_statistics 函数来更新 DriverAcct 中的统计值,当 accumulated_time_spent 超过当前 SubQueue 的时间片区间,driver 就进入下一个 SubQueue。

QuerySharedDriverQueue 的构造函数如下 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
QuerySharedDriverQueue::QuerySharedDriverQueue() {
double factor = 1;
for (int i = QUEUE_SIZE - 1; i >= 0; --i) {
// initialize factor for every sub queue,
// Higher priority queues have more execution time,
// so they have a larger factor.
_queues[i].factor_for_normal = factor;
factor *= RATIO_OF_ADJACENT_QUEUE;
}

int64_t time_slice = 0;
for (int i = 0; i < QUEUE_SIZE; ++i) {
time_slice += LEVEL_TIME_SLICE_BASE_NS * (i + 1);
_level_time_slices[i] = time_slice;
}
}

_compute_driver_level

向 QuerySharedDriverQueue 添加 driver 时,从上一次所处的层 PipelineDriver::get_driver_queue_level 开始,再基于 accumulated_time_spent 计算 driver 本次将位于 DriverQueue 中的哪一层。

因此,一个长时间的运行的 driver 在 DriverQueue 中不断下沉。

1
2
3
4
5
6
7
8
9
int QuerySharedDriverQueue::_compute_driver_level(const DriverRawPtr driver) const {
int time_spent = driver->driver_acct().get_accumulated_time_spent();
for (int i = driver->get_driver_queue_level(); i < QUEUE_SIZE; ++i) {
if (time_spent < _level_time_slices[i]) {
return i;
}
}
return QUEUE_SIZE - 1;
}

put_back

put_back 函数将 READY 的 driver 放入到 DriverQueue,

  • PipelineDriver::_driver_queue_level 记录每次在 DrvierQueue 中的层。
  • PipelineDriver::_in_ready_queue 标记是否被 put_back 到 DrvierQueue 中
  • PipelineDriver::_in_queue 即 driver 所属的 DriverQueue

插入一个 driver 后,通过 _cv 解除 take 函数处的阻塞,Executor 就可以继续通过 DriverQueue::take 函数获得新的可执行的 driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) {
int level = _compute_driver_level(driver);
driver->set_driver_queue_level(level);
{
std::lock_guard<std::mutex> lock(_global_mutex);
_queues[level].put(driver);
driver->set_in_ready_queue(true);
driver->set_in_queue(this);
_cv.notify_one();
++_num_drivers;
}
}

// QuerySharedDriverQueue::put_back_from_executor is identical to put_back.
void QuerySharedDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
put_back(driver);
}

take

put_back 函数是通过 _compute_driver_level 函数来确定 SubQueue 的层数。take 函数则通过 SubQuerySharedDriverQueue::accu_time_after_divisor() 来确定。

  • _accu_consume_time 字段是该 SubQueue 的耗时

    这个值实际上和 DriverAcct::accumulated_time_spent 大小一样:每次更新统计时都是累加的 DriverAcct::last_time_spent。因此 put_back/take 的衡量标准是一样的。

    1
    2
    3
    4
    5
    6
    7
    8
    void DriverAcct::update_last_time_spent(int64_t time_spent) {
    this->last_time_spent = time_spent;
    this->accumulated_time_spent += time_spent;
    }

    void SubQuerySharedDriverQueue::update_accu_time(const DriverRawPtr driver) {
    _accu_consume_time.fetch_add(driver->driver_acct().get_last_time_spent());
    }
  • factor_for_normal 是在 QUEUE_SIZE 个 SubQueue 的正则系数

二者相处得到归一化的时间系数,使得每一层都有机会被访问到

1
2
3
double SubQuerySharedDriverQueue::accu_time_after_divisor() {
return _accu_consume_time.load() / factor_for_normal;
}

确定好 level 后,就可以直接从该 level 提取出 READY 状态的 PipelineDriver。如果整个 DriverQueue 都空的,则基于 _cv 阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
StatusOr<DriverRawPtr> QuerySharedDriverQueue::take() {
// -1 means no candidates; else has candidate.
int queue_idx = -1;
double target_accu_time = 0;
DriverRawPtr driver_ptr;

{
std::unique_lock<std::mutex> lock(_global_mutex);
while (true) {
RETURN_IF(_is_closed, Status::Cancelled("Shutdown"));
// Find the queue with the smallest execution time.
for (int i = 0; i < QUEUE_SIZE; ++i) {
// we just search for queue has element
if (!_queues[i].empty()) {
double local_target_time = _queues[i].accu_time_after_divisor();
if (queue_idx < 0 || local_target_time < target_accu_time) {
target_accu_time = local_target_time;
queue_idx = i;
}
}
}

if (queue_idx >= 0) {
break;
}
_cv.wait(lock);
}
// record queue's index to accumulate time for it.
driver_ptr = _queues[queue_idx].take();
driver_ptr->set_in_ready_queue(false);

--_num_drivers;
}

// next pipeline driver to execute.
return driver_ptr;
}