Pipeline: Morsel and OlapScanOperator (3)

The flow of a single ScanOperator::pull_chunk reading data from the storage layer is shown in the figure:

(Figure: Pipeline-OlapScanOperator-3)

Every ScanOperator needs to read data from a data source; for example, the subclass OlapScanOperator reads from the storage layer, which involves I/O tasks. StarRocks splits one ScanOperator into _io_tasks_per_scan_operator io-tasks (the default value of _io_tasks_per_scan_operator is 4). In addition, a ScanOperator stores the data it reads into OlapScanContext::_chunk_buffer. This ChunkBuffer is shared by all of the pipeline's ScanOperators, so it is a concurrently written data structure. Since each PipelineDriver has a unique driver_sequence, that sequence can serve as the key of the ChunkBuffer; that is, the internal mapping of the ChunkBuffer is {driver_sequence, SubBuffer}.

The ChunkSource in the figure works much like a TabletReader, except that the granularity it processes is a Morsel. As described in the previous posts, a Morsel may cover a whole Tablet, or only part of the rows of a single Segment. One io-task corresponds to one ChunkSource, which reads data from the storage layer; the data read by all ChunkSources is buffered in ChunkBuffer[driver_seq] of the current ScanOperator.

An io-task is an asynchronous task that internally calls a ChunkSource to read data; it is executed by the ScanExecutor. Fetching data via ChunkBuffer::try_get is non-blocking: it does not wait for an io-task to finish reading, so ScanOperator::pull_chunk can return immediately.

class ScanOperator : public SourceOperator {
    //...
private:
    workgroup::ScanExecutor* _scan_executor = nullptr;

    mutable std::shared_mutex _task_mutex; // Protects the chunk-sources from concurrent close and read
    std::vector<std::atomic<bool>> _is_io_task_running;
    std::vector<ChunkSourcePtr> _chunk_sources;
    int32_t _chunk_source_idx = -1;

    mutable SpinLock _scan_status_mutex;
    Status _scan_status;
    //...
};

ScanOperator::pull_chunk

Let's look directly at the execution flow of the ScanOperator::pull_chunk function:

  • All io_tasks fetch data from the storage layer asynchronously. If any io-task fails, it calls _set_scan_status to set _scan_status to an error state so that the query can terminate as soon as possible.

  • _try_to_trigger_next_scan: fetch a Morsel from the MorselQueue, asynchronously read data (chunks) from the storage layer, and buffer them in the ChunkBuffer

  • get_chunk_from_buffer: return the data read by the io-tasks to the upper layer

    The chunk obtained is then filtered by a runtime_bloom_filter. This bloom filter is not an index created by the user but one generated at runtime. It is typically used in hash-join scenarios: the build-side table generates a bloom filter at runtime and sends it to the probe side to filter probe-side data, reducing both the amount of data to probe and the network overhead.

The code of pull_chunk follows.

StatusOr<vectorized::ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
    // 1.
    RETURN_IF_ERROR(_get_scan_status());

    // record the peak buffer size
    _peak_buffer_size_counter->set(buffer_size());

    // 2.
    RETURN_IF_ERROR(_try_to_trigger_next_scan(state));

    // 3.
    vectorized::ChunkPtr res = get_chunk_from_buffer();
    if (res != nullptr) {
        begin_pull_chunk(res);
        // for the query cache mechanism,
        // we should emit an EOS chunk when we receive the last chunk.
        auto [tablet_id, is_eos] = _should_emit_eos(res);
        eval_runtime_bloom_filters(res.get());
        res->owner_info().set_owner_id(tablet_id, is_eos);
    }
    return res;
}

_try_to_trigger_next_scan

The semantics of the _try_to_trigger_next_scan function are:

  • Try to keep reading from the storage file pointed to by the Morsel of _chunk_sources[_chunk_source_idx + 1]. If a chunk source still has data to read, continue reading from it; this is what the _trigger_next_scan function does.
  • A _chunk_sources[idx] with no data left to read (i.e. the data of its Morsel has been fully consumed) is not left idle either: fetch the next Morsel from the MorselQueue, create a new _chunk_sources[idx], and submit a new io-task to the ScanExecutor; this is what the _pickup_morsel function does.

Some relevant fields:

  • _num_running_io_tasks: the number of currently running io-tasks, which must not exceed the upper bound _io_tasks_per_scan_operator
  • _is_io_task_running[i]: whether _chunk_sources[i] is currently performing a read
  • _chunk_source_idx: the index of the chunk source visited last, used as the round-robin starting point

Since there are _io_tasks_per_scan_operator io-tasks in total, the function polls in round-robin order to find which _chunk_sources[i] can continue reading, skipping chunk sources that are currently reading, and recording in the to_sched array the chunk sources that have no data left:

  • _trigger_next_scan: run the io-task asynchronously and buffer the data it reads into the ChunkBuffer
  • _pickup_morsel: fetch the next morsel from the MorselQueue, then call _trigger_next_scan

Therefore, the _try_to_trigger_next_scan function never blocks. The overall logic is as follows.

Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
    int total_cnt = _io_tasks_per_scan_operator;
    if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
        return Status::OK();
    }

    // Avoid uneven distribution when io tasks execute very fast, so we start
    // traversing the chunk_source array from the last visited idx
    int cnt = _io_tasks_per_scan_operator;
    int to_sched[_io_tasks_per_scan_operator];
    int size = 0;

    // pick up already started chunk sources.
    while (--cnt >= 0) {
        _chunk_source_idx = (_chunk_source_idx + 1) % _io_tasks_per_scan_operator;
        int i = _chunk_source_idx;
        if (_is_io_task_running[i]) {
            total_cnt -= 1;
            continue;
        }
        if (_chunk_sources[i] != nullptr && _chunk_sources[i]->has_next_chunk()) {
            // this chunk source still has data to read: trigger an io-task
            RETURN_IF_ERROR(_trigger_next_scan(state, i));
            total_cnt -= 1;
        } else {
            // otherwise record it as schedulable
            to_sched[size++] = i;
        }
    }

    size = std::min(size, total_cnt);
    // fetch the next morsels and create new chunk sources
    for (int i = 0; i < size; i++) {
        int idx = to_sched[i];
        RETURN_IF_ERROR(_pickup_morsel(state, idx));
    }

    return Status::OK();
}

_pickup_morsel

OlapScanPrepareOperator and ScanOperator share one _morsel_queue. Once OlapScanPrepareOperator::pull_chunk has finished setting up the MorselQueue, _pickup_morsel can obtain a Morsel via MorselQueue::try_get, create an OlapChunkSource from it, and assign it to _chunk_sources[chunk_source_index].

With the Morsel obtained and the ChunkSource created, we can enter the _trigger_next_scan function to read data from the storage layer.

Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index) {
    DCHECK(_morsel_queue != nullptr);
    _close_chunk_source(state, chunk_source_index);

    // NOTE: attach an active source before really creating it, to avoid the race condition
    bool need_detach = true;
    attach_chunk_source(chunk_source_index);
    DeferOp defer([&]() {
        if (need_detach) {
            detach_chunk_source(chunk_source_index);
        }
    });

    // fetch a Morsel
    ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());

    //... ignore query cache code

    if (morsel != nullptr) {
        COUNTER_UPDATE(_morsels_counter, 1);
        // create a chunk source from the Morsel
        _chunk_sources[chunk_source_index] = create_chunk_source(std::move(morsel), chunk_source_index);
        auto status = _chunk_sources[chunk_source_index]->prepare(state);
        if (!status.ok()) {
            _chunk_sources[chunk_source_index] = nullptr;
            set_finishing(state);
            return status;
        }
        // read from the storage layer
        need_detach = false;
        RETURN_IF_ERROR(_trigger_next_scan(state, chunk_source_index));
    }

    return Status::OK();
}

create_chunk_source

Each ChunkSource needs: 1) a Morsel to specify the range of data to read this time; and 2) a chunk_buffer to store the data obtained from the storage layer.

From the constructor chain below, ChunkSource::_chunk_buffer is exactly OlapScanContext::_chunk_buffer. And from OlapScanContextFactory::get_or_create, this _chunk_buffer is shared among the pipeline_dop ScanOperators of the pipeline, i.e. it holds the data read from the storage layer by all pipeline_dop ScanOperators.

Therefore, the internal mapping of this chunk_buffer is {driver_sequence, buffer}. The code follows.

// 0.
ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel,
                                                     int32_t chunk_source_index) {
    auto* olap_scan_node = down_cast<vectorized::OlapScanNode*>(_scan_node);
    return std::make_shared<OlapChunkSource>(
            this, _chunk_source_profiles[chunk_source_index].get(),
            std::move(morsel), olap_scan_node, _ctx.get());
}

// 1.
OlapChunkSource::OlapChunkSource(ScanOperator* op,
                                 RuntimeProfile* runtime_profile,
                                 MorselPtr&& morsel,
                                 vectorized::OlapScanNode* scan_node,
                                 OlapScanContext* scan_ctx)
        : ChunkSource(op,
                      runtime_profile,
                      std::move(morsel),
                      scan_ctx->get_chunk_buffer()), // obtain the shared ChunkBuffer
          _scan_node(scan_node),
          _scan_ctx(scan_ctx),
          _limit(scan_node->limit()),
          _scan_range(down_cast<ScanMorsel*>(_morsel.get())->get_olap_scan_range()) {}

ChunkSource::ChunkSource(ScanOperator* scan_op,
                         RuntimeProfile* runtime_profile,
                         MorselPtr&& morsel,
                         BalancedChunkBuffer& chunk_buffer)
        : _scan_op(scan_op),
          _scan_operator_seq(scan_op->get_driver_sequence()),
          _runtime_profile(runtime_profile),
          _morsel(std::move(morsel)),
          _chunk_buffer(chunk_buffer), // all ChunkSources share one ChunkBuffer
          _chunk_token(nullptr) {}

// 2.
OlapScanContextPtr OlapScanContextFactory::get_or_create(int32_t driver_sequence) {
    DCHECK_LT(driver_sequence, _dop);
    // ScanOperators sharing one morsel queue use the same context.
    int32_t idx = _shared_morsel_queue ? 0 : driver_sequence;
    DCHECK_LT(idx, _contexts.size());
    // _chunk_buffer is shared by the pipeline_dop ScanOperators
    if (_contexts[idx] == nullptr) {
        _contexts[idx] = std::make_shared<OlapScanContext>(
                _scan_node, _dop, _shared_scan, _chunk_buffer);
    }
    return _contexts[idx];
}

_trigger_next_scan

create_chunk_source completes the creation of the ChunkSource; next the data must be read from the storage layer asynchronously. The _trigger_next_scan function wraps a ScanTask object and submits this io-task to _scan_executor; the task is ultimately executed by workgroup::ScanExecutor.

_trigger_next_scan contains a lot of statistics bookkeeping; below we focus only on its core function: calling ChunkSource::buffer_next_batch_chunks_blocking to read data from the storage layer. _finish_chunk_source_task updates the statistics after the read completes.

Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_index) {
    ChunkBufferTokenPtr buffer_token;
    if (buffer_token = pin_chunk(1); buffer_token == nullptr) {
        return Status::OK();
    }

    // 0. update state
    _chunk_sources[chunk_source_index]->pin_chunk_token(std::move(buffer_token));
    _num_running_io_tasks++;
    _is_io_task_running[chunk_source_index] = true;

    // 1. wrap the io-task
    workgroup::ScanTask task;
    task.workgroup = _workgroup.get();
    task.priority =
            vectorized::OlapScanNode::compute_priority(_submit_task_counter->value());
    const auto io_task_start_nano = MonotonicNanos();
    task.work_function = [wp = _query_ctx, this, state, chunk_source_index,
                          query_trace_ctx, driver_id,
                          io_task_start_nano]() {
        if (auto sp = wp.lock()) {
            SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(
                    state->instance_mem_tracker());

            auto& chunk_source = _chunk_sources[chunk_source_index];

            //... other statistic info
            int64_t prev_cpu_time = chunk_source->get_cpu_time_spent();
            int64_t prev_scan_rows = chunk_source->get_scan_rows();
            int64_t prev_scan_bytes = chunk_source->get_scan_bytes();

            // 1. read data, blocking
            auto status = chunk_source->buffer_next_batch_chunks_blocking(
                    state, kIOTaskBatchSize, _workgroup.get());
            if (!status.ok() && !status.is_end_of_file()) {
                _set_scan_status(status);
            }

            int64_t delta_cpu_time = chunk_source->get_cpu_time_spent() - prev_cpu_time;
            // 2. update the statistics
            _finish_chunk_source_task(
                    state, chunk_source_index, delta_cpu_time,
                    chunk_source->get_scan_rows() - prev_scan_rows,
                    chunk_source->get_scan_bytes() - prev_scan_bytes);
        }
    };

    // 2. submit the io-task
    if (_scan_executor->submit(std::move(task))) {
        _io_task_retry_cnt = 0;
    } else {
        // 3. on failure, roll back the state
        _chunk_sources[chunk_source_index]->unpin_chunk_token();
        _num_running_io_tasks--;
        _is_io_task_running[chunk_source_index] = false;

        // cap on consecutive failures
        if (++_io_task_retry_cnt > 100) {
            return Status::RuntimeError("ScanOperator failed to offer io task due to thread pool overload");
        }
    }

    return Status::OK();
}

buffer_next_batch_chunks_blocking

buffer_next_batch_chunks_blocking has two main steps:

  • _read_chunk: read data from the storage layer; implemented by subclasses
  • insert the data read into _chunk_buffer, where ChunkSource::_scan_operator_seq is the _driver_sequence
Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_t batch_size,
                                                      const workgroup::WorkGroup* running_wg) {
    using namespace vectorized;

    if (!_status.ok()) {
        return _status;
    }

    int64_t time_spent_ns = 0;
    auto [tablet_id, version] = _morsel->get_lane_owner_and_version();
    for (size_t i = 0; i < batch_size && !state->is_cancelled(); ++i) {
        {
            SCOPED_RAW_TIMER(&time_spent_ns);
            if (_chunk_token == nullptr && (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) {
                break;
            }

            // read a chunk from the storage layer
            ChunkPtr chunk;
            _status = _read_chunk(state, &chunk);
            if (chunk == nullptr) {
                chunk = std::make_shared<vectorized::Chunk>();
            }
            //.. ignore failure handling
            chunk->owner_info().set_owner_id(tablet_id, false);
            // store the chunk into _chunk_buffer
            _chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
        }

        if (time_spent_ns >= YIELD_MAX_TIME_SPENT) {
            break;
        }

        if (running_wg != nullptr && time_spent_ns >= YIELD_PREEMPT_MAX_TIME_SPENT &&
            _scan_sched_entity(running_wg)->in_queue()->should_yield(running_wg, time_spent_ns)) {
            break;
        }
    }

    return _status;
}

get_chunk_from_buffer

After the earlier steps of ScanOperator::pull_chunk have triggered the reads, the final step is to fetch the data read by the io-tasks, which is done by the get_chunk_from_buffer function.

As get_chunk_from_buffer shows, a ScanOperator fetches only the data read by its own pipeline, i.e. it only needs the io-tasks of its own pipeline to have produced data.

BalancedChunkBuffer::try_get here is a non-blocking operation. If no data is available, it returns NULL to the caller; the upper layer then treats this operator's pipeline as being in the NOT-READY state, puts its driver among the blocked PipelineDrivers, and waits for the Poller to wake it up.

ChunkPtr OlapScanOperator::get_chunk_from_buffer() {
    vectorized::ChunkPtr chunk = nullptr;
    if (_ctx->get_chunk_buffer().try_get(_driver_sequence, &chunk)) {
        return chunk;
    }
    return nullptr;
}

This concludes the basic flow of ScanOperator::pull_chunk. How data is actually read from the storage layer will be covered in a later post; for now the focus is on the Pipeline engine.