The figure shows how a single call to ScanOperator::pull_chunk reads data from the storage layer:
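Every ScanOperator reads data from a data source. For example, the subclass OlapScanOperator reads from the storage layer, which involves I/O tasks. StarRocks splits the work of a ScanOperator into _io_tasks_per_scan_operator io-tasks (4 by default). The ScanOperator stores the data it reads in OlapScanContext::_chunk_buffer. This ChunkBuffer is shared by all Pipeline ScanOperators, so it must support concurrent writes. Because each Pipeline has a unique driver_sequence, the driver_sequence can serve as the ChunkBuffer key; that is, the ChunkBuffer internally maps {driver_sequence, SubBuffer}.
The ChunkSource in the figure works like TabletReader, except that the unit of data it handles is a Morsel. As described earlier, a Morsel may cover a whole Tablet or only some of the rows of a Segment. Each io-task corresponds to one ChunkSource, which reads data from the storage layer; the data read by all ChunkSources of the current ScanOperator is buffered in that operator's ChunkBuffer[driver_seq].
An io-task is an asynchronous task executed by ScanExecutor; internally it calls the ChunkSource to read data. ChunkBuffer::try_get is non-blocking: it does not wait for an io-task to finish reading, so ScanOperator::pull_chunk can return immediately.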
Relevant members of ScanOperator (excerpt):
```cpp
class ScanOperator : public SourceOperator {
private:
    workgroup::ScanExecutor* _scan_executor = nullptr;

    mutable std::shared_mutex _task_mutex;
    std::vector<std::atomic<bool>> _is_io_task_running;
    std::vector<ChunkSourcePtr> _chunk_sources;
    int32_t _chunk_source_idx = -1;

    mutable SpinLock _scan_status_mutex;
    Status _scan_status;
};
```
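To make the {driver_sequence, SubBuffer} layout of the ChunkBuffer concrete, here is a minimal sketch of a buffer keyed by driver sequence with a non-blocking try_get. It is not StarRocks' BalancedChunkBuffer: ToyChunkBuffer, the int-vector Chunk type and the per-sub-buffer mutex are assumptions made purely for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for vectorized::Chunk: a batch of int values.
using Chunk = std::vector<int>;
using ChunkPtr = std::shared_ptr<Chunk>;

// Toy chunk buffer shared by all ScanOperators of one scan node.
// Each driver_sequence owns one sub-buffer with its own lock.
class ToyChunkBuffer {
public:
    explicit ToyChunkBuffer(size_t dop) : _sub_buffers(dop) {}

    // Called by io-tasks, possibly from several executor threads at once.
    void put(size_t driver_seq, ChunkPtr chunk) {
        assert(driver_seq < _sub_buffers.size());
        SubBuffer& sb = _sub_buffers[driver_seq];
        std::lock_guard<std::mutex> guard(sb.mutex);
        sb.chunks.push_back(std::move(chunk));
    }

    // Non-blocking: returns false right away if this driver has no chunk yet.
    bool try_get(size_t driver_seq, ChunkPtr* out) {
        assert(driver_seq < _sub_buffers.size());
        SubBuffer& sb = _sub_buffers[driver_seq];
        std::lock_guard<std::mutex> guard(sb.mutex);
        if (sb.chunks.empty()) {
            return false;
        }
        *out = std::move(sb.chunks.front());
        sb.chunks.pop_front();
        return true;
    }

private:
    struct SubBuffer {
        std::mutex mutex;
        std::deque<ChunkPtr> chunks;
    };
    std::vector<SubBuffer> _sub_buffers;
};
```

Giving each driver its own sub-buffer and lock means io-tasks feeding different pipelines do not contend with each other. The real buffer also bounds memory through chunk tokens, which this sketch leaves out.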
ScanOperator::pull_chunk
Let's go straight to the execution flow of ScanOperator::pull_chunk:
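All io-tasks fetch data from the storage layer asynchronously. If any io-task fails, it calls _set_scan_status to set _scan_status to an error, so that the query can terminate as early as possible.
_try_to_trigger_next_scan: takes Morsels from the MorselQueue, asynchronously reads data (chunks) from the storage layer, and buffers it in the ChunkBuffer.
get_chunk_from_buffer: returns the data read by the io-tasks to the upper layer.
Each chunk obtained is then filtered by the runtime bloom filters. These bloom filters are not indexes created by the user; they are generated at run time, typically for HashJoin: the build-side table produces a bloom filter at run time and sends it to the probe side, where it filters probe-side data, reducing both the number of rows to probe and the network overhead.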
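The runtime bloom filter idea can be sketched in a few lines: the build side inserts its join keys, and the probe side keeps only rows whose key might be present. This is a generic Bloom filter (double hashing over a splitmix64-style mix), not StarRocks' own runtime filter implementation; ToyBloomFilter and filter_probe_rows are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Generic Bloom filter over int64 join keys (illustrative only).
// It may report false positives but never false negatives.
class ToyBloomFilter {
public:
    ToyBloomFilter(size_t num_bits, int num_hashes) : _bits(num_bits, false), _num_hashes(num_hashes) {
        assert(num_bits > 0 && num_hashes > 0);
    }

    // Build side: record a join key.
    void insert(int64_t key) {
        for (int i = 0; i < _num_hashes; ++i) {
            _bits[bit_index(key, i)] = true;
        }
    }

    // Probe side: false means the key is definitely absent from the build side.
    bool may_contain(int64_t key) const {
        for (int i = 0; i < _num_hashes; ++i) {
            if (!_bits[bit_index(key, i)]) {
                return false;
            }
        }
        return true;
    }

private:
    // splitmix64 finalizer, used here as a cheap 64-bit mixing function.
    static uint64_t mix(uint64_t x) {
        x += 0x9e3779b97f4a7c15ULL;
        x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
        x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
        return x ^ (x >> 31);
    }

    // Double hashing: the i-th probe position is (h1 + i * h2) mod m.
    size_t bit_index(int64_t key, int i) const {
        uint64_t h1 = mix(static_cast<uint64_t>(key));
        uint64_t h2 = mix(h1) | 1;
        return static_cast<size_t>((h1 + static_cast<uint64_t>(i) * h2) % _bits.size());
    }

    std::vector<bool> _bits;
    int _num_hashes;
};

// Keeps only the probe-side keys that might join with the build side.
std::vector<int64_t> filter_probe_rows(const ToyBloomFilter& bf, const std::vector<int64_t>& probe_keys) {
    std::vector<int64_t> kept;
    for (int64_t key : probe_keys) {
        if (bf.may_contain(key)) {
            kept.push_back(key);
        }
    }
    return kept;
}
```

Because a Bloom filter has no false negatives, this filtering never drops a probe row that could match; it only lets a few non-matching rows through.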
The code of ScanOperator::pull_chunk is as follows:
```cpp
StatusOr<vectorized::ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
    // Fail fast if any io-task has reported an error.
    RETURN_IF_ERROR(_get_scan_status());

    _peak_buffer_size_counter->set(buffer_size());

    // Step 1: submit io-tasks asynchronously (never blocks).
    RETURN_IF_ERROR(_try_to_trigger_next_scan(state));

    // Step 2: non-blocking fetch from this driver's sub-buffer.
    vectorized::ChunkPtr res = get_chunk_from_buffer();
    if (res != nullptr) {
        begin_pull_chunk(res);
        auto [tablet_id, is_eos] = _should_emit_eos(res);
        // Filter rows with the runtime bloom filters.
        eval_runtime_bloom_filters(res.get());
        res->owner_info().set_owner_id(tablet_id, is_eos);
    }

    return res;
}
```
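The RETURN_IF_ERROR(_get_scan_status()) check at the top of pull_chunk pairs with _set_scan_status in the io-tasks. A minimal sketch of that handshake follows; ToyScanStatus is a hypothetical name, an empty string stands in for an OK Status, it assumes the first reported error is the one kept, and it uses std::mutex where the original uses a SpinLock.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Models _set_scan_status / _get_scan_status; an empty string means OK.
class ToyScanStatus {
public:
    // Called from io-tasks; keeps only the first error (assumption).
    void set_error(const std::string& msg) {
        std::lock_guard<std::mutex> guard(_mutex);
        if (_error.empty()) {
            _error = msg;
        }
    }

    std::string get() const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _error;
    }

private:
    mutable std::mutex _mutex;  // the original guards _scan_status with a SpinLock
    std::string _error;
};

// Mirrors RETURN_IF_ERROR(_get_scan_status()) at the top of pull_chunk.
bool toy_pull_chunk_should_abort(const ToyScanStatus& status) {
    return !status.get().empty();
}
```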
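_try_to_trigger_next_scan
The semantics of _try_to_trigger_next_scan are:
Try to continue reading from the storage files pointed to by the Morsel of the next chunk source in round-robin order, _chunk_sources[(_chunk_source_idx + 1) % _io_tasks_per_scan_operator]. If that chunk source still has data to read, keep reading from it; this is what _trigger_next_scan does.
A _chunk_sources[idx] with no more data to read (i.e., the data covered by its Morsel has been fully read) is not left idle: a new Morsel is fetched from the MorselQueue, a new _chunk_sources[idx] is created for it, and a new io-task is submitted to the ScanExecutor. This is what _pickup_morsel does.
The state variables involved are:
_num_running_io_tasks: the number of io-tasks currently running; it must not exceed _io_tasks_per_scan_operator.
_is_io_task_running[i]: whether _chunk_sources[i] is currently performing a read.
_chunk_source_idx: the index of the chunk_source currently being visited by the round-robin loop (initialized to -1).
Since there are _io_tasks_per_scan_operator io-tasks in total, the function polls the chunk sources in round-robin order to find which _chunk_sources[i] can continue reading, skips chunk sources that are already reading, and records in the to_sched array the chunk sources that have no data left:
_trigger_next_scan: runs an io-task asynchronously and buffers the data read into the ChunkBuffer.
_pickup_morsel: fetches the next Morsel from the MorselQueue and then calls _trigger_next_scan.
Therefore, _try_to_trigger_next_scan never blocks. The overall logic is as follows: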
```cpp
Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
    // Number of io-task slots still free in this round.
    int total_cnt = _io_tasks_per_scan_operator;
    if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
        return Status::OK();
    }

    int cnt = _io_tasks_per_scan_operator;
    int to_sched[_io_tasks_per_scan_operator];
    int size = 0;

    // Round-robin over the chunk sources, starting after the last visited index.
    while (--cnt >= 0) {
        _chunk_source_idx = (_chunk_source_idx + 1) % _io_tasks_per_scan_operator;
        int i = _chunk_source_idx;
        if (_is_io_task_running[i]) {
            total_cnt -= 1;
            continue;
        }
        if (_chunk_sources[i] != nullptr && _chunk_sources[i]->has_next_chunk()) {
            // The current Morsel still has data: keep reading it.
            RETURN_IF_ERROR(_trigger_next_scan(state, i));
            total_cnt -= 1;
        } else {
            // No chunk source yet, or its Morsel is exhausted: needs a new Morsel.
            to_sched[size++] = i;
        }
    }

    size = std::min(size, total_cnt);
    for (int i = 0; i < size; i++) {
        int idx = to_sched[i];
        RETURN_IF_ERROR(_pickup_morsel(state, idx));
    }

    return Status::OK();
}
```
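The round-robin decision in _try_to_trigger_next_scan can be separated from the I/O and checked on its own. The sketch below is a hypothetical, dependency-free model of one call: SlotState, SchedDecision and plan_next_scan are invented names, and instead of calling _trigger_next_scan or _pickup_morsel it only records which slot would get which call.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// What _try_to_trigger_next_scan would see for one slot.
struct SlotState {
    bool running = false;     // _is_io_task_running[i]
    bool has_source = false;  // _chunk_sources[i] != nullptr
    bool has_next = false;    // _chunk_sources[i]->has_next_chunk()
};

struct SchedDecision {
    std::vector<int> trigger;  // slots that would call _trigger_next_scan
    std::vector<int> pickup;   // slots that would call _pickup_morsel
};

// Models one call; *cursor plays the role of _chunk_source_idx.
SchedDecision plan_next_scan(const std::vector<SlotState>& slots, int num_running, int* cursor) {
    const int n = static_cast<int>(slots.size());
    assert(n > 0 && cursor != nullptr);
    SchedDecision d;
    if (num_running >= n) {
        return d;  // every io-task slot is busy
    }
    int total_cnt = n;
    std::vector<int> to_sched;
    for (int step = 0; step < n; ++step) {
        *cursor = (*cursor + 1) % n;  // round-robin
        const int i = *cursor;
        if (slots[i].running) {
            total_cnt -= 1;
            continue;
        }
        if (slots[i].has_source && slots[i].has_next) {
            d.trigger.push_back(i);
            total_cnt -= 1;
        } else {
            to_sched.push_back(i);
        }
    }
    const int size = std::min(static_cast<int>(to_sched.size()), total_cnt);
    d.pickup.assign(to_sched.begin(), to_sched.begin() + size);
    return d;
}
```

In the code as shown, every non-running slot is either triggered or appended to to_sched, so by the end of the loop size equals total_cnt and the std::min does not change the outcome; the sketch keeps it only to mirror the original.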
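_pickup_morsel
OlapScanPrepareOperator and ScanOperator share one _morsel_queue. Therefore, once OlapScanPrepareOperator::pull_chunk has finished setting up the MorselQueue, _pickup_morsel can obtain a Morsel through MorselQueue::try_get, create an OlapChunkSource from it, and assign it to _chunk_sources[chunk_source_index]. If try_get returns no Morsel, no io-task is submitted for that slot.
With the Morsel obtained and the ChunkSource created and prepared, _trigger_next_scan is called to read data from the storage layer.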
```cpp
Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index) {
    DCHECK(_morsel_queue != nullptr);
    _close_chunk_source(state, chunk_source_index);

    // Detach the slot on every exit path unless the new chunk source is ready.
    bool need_detach = true;
    attach_chunk_source(chunk_source_index);
    DeferOp defer([&]() {
        if (need_detach) {
            detach_chunk_source(chunk_source_index);
        }
    });

    ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());
    if (morsel != nullptr) {
        COUNTER_UPDATE(_morsels_counter, 1);

        _chunk_sources[chunk_source_index] = create_chunk_source(std::move(morsel), chunk_source_index);
        auto status = _chunk_sources[chunk_source_index]->prepare(state);
        if (!status.ok()) {
            _chunk_sources[chunk_source_index] = nullptr;
            set_finishing(state);
            return status;
        }
        need_detach = false;
        RETURN_IF_ERROR(_trigger_next_scan(state, chunk_source_index));
    }

    return Status::OK();
}
```
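Two ideas in _pickup_morsel can be illustrated in isolation: MorselQueue::try_get returns immediately (nullptr once the queue is drained), and DeferOp detaches the chunk-source slot on every exit path unless need_detach was cleared. The sketch below models both. ToyMorselQueue, ScopeGuard and toy_pickup are hypothetical names, and the prepare_ok flag stands in for the result of ChunkSource::prepare.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical morsel: rows [begin, end) of one tablet.
struct Morsel {
    int tablet_id;
    int begin;
    int end;
};
using MorselPtr = std::unique_ptr<Morsel>;

// Shared queue: try_get never blocks and yields nullptr once drained.
class ToyMorselQueue {
public:
    explicit ToyMorselQueue(std::vector<Morsel> morsels) : _morsels(std::move(morsels)) {}

    MorselPtr try_get() {
        std::lock_guard<std::mutex> guard(_mutex);
        if (_next >= _morsels.size()) {
            return nullptr;
        }
        return std::make_unique<Morsel>(_morsels[_next++]);
    }

private:
    std::mutex _mutex;
    std::vector<Morsel> _morsels;
    size_t _next = 0;
};

// Runs a callback when the enclosing scope exits, in the spirit of DeferOp.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() { _fn(); }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

private:
    std::function<void()> _fn;
};

// Same attach/detach shape as _pickup_morsel: the slot stays attached only if a
// morsel was obtained and "prepare" succeeded; every other exit detaches it.
bool toy_pickup(ToyMorselQueue& queue, std::vector<bool>& attached, size_t slot, bool prepare_ok,
                MorselPtr* out) {
    assert(slot < attached.size());
    bool need_detach = true;
    attached[slot] = true;  // attach_chunk_source
    ScopeGuard defer([&]() {
        if (need_detach) {
            attached[slot] = false;  // detach_chunk_source
        }
    });

    MorselPtr morsel = queue.try_get();
    if (morsel == nullptr || !prepare_ok) {
        return false;
    }
    *out = std::move(morsel);
    need_detach = false;
    return true;
}
```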
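create_chunk_source
Each ChunkSource needs: 1) a Morsel that specifies the range of data to read; and 2) a chunk_buffer to hold the data read from the storage layer.
Following the constructor arguments below, ChunkSource::_chunk_buffer is OlapScanContext::_chunk_buffer. And from OlapScanContextFactory::get_or_create, this _chunk_buffer is shared among the pipeline_dop Pipeline ScanOperators: every OlapScanContext the factory creates is constructed with the same factory-level _chunk_buffer, so the buffer holds the data read from storage by all pipeline_dop ScanOperators.
Therefore, the chunk_buffer internally maps {driver_sequence, buffer}. The code is as follows: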
```cpp
ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
    auto* olap_scan_node = down_cast<vectorized::OlapScanNode*>(_scan_node);
    return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
                                             std::move(morsel), olap_scan_node, _ctx.get());
}

// The chunk buffer handed to ChunkSource comes from the OlapScanContext.
OlapChunkSource::OlapChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
                                 vectorized::OlapScanNode* scan_node, OlapScanContext* scan_ctx)
        : ChunkSource(op, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()),
          _scan_node(scan_node),
          _scan_ctx(scan_ctx),
          _limit(scan_node->limit()),
          _scan_range(down_cast<ScanMorsel*>(_morsel.get())->get_olap_scan_range()) {}

ChunkSource::ChunkSource(ScanOperator* scan_op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
                         BalancedChunkBuffer& chunk_buffer)
        : _scan_op(scan_op),
          _scan_operator_seq(scan_op->get_driver_sequence()),
          _runtime_profile(runtime_profile),
          _morsel(std::move(morsel)),
          _chunk_buffer(chunk_buffer),
          _chunk_token(nullptr) {}

OlapScanContextPtr OlapScanContextFactory::get_or_create(int32_t driver_sequence) {
    DCHECK_LT(driver_sequence, _dop);
    // With a shared morsel queue, all drivers share context 0.
    int32_t idx = _shared_morsel_queue ? 0 : driver_sequence;
    DCHECK_LT(idx, _contexts.size());

    if (_contexts[idx] == nullptr) {
        // Every context is built with the same factory-level _chunk_buffer.
        _contexts[idx] = std::make_shared<OlapScanContext>(_scan_node, _dop, _shared_scan, _chunk_buffer);
    }
    return _contexts[idx];
}
```
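The sharing relationship reduces to a few lines: contexts may be per-driver or shared, but all of them refer to one chunk buffer owned by the factory. This is a hypothetical sketch (ToyScanContextFactory, ToyScanContext and ToySharedBuffer are invented names), and it assumes the context vector has one slot per driver.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Stand-in for the BalancedChunkBuffer owned by the factory.
struct ToySharedBuffer {};

struct ToyScanContext {
    explicit ToyScanContext(ToySharedBuffer& buffer) : chunk_buffer(buffer) {}
    ToySharedBuffer& chunk_buffer;  // a reference, never a copy
};

class ToyScanContextFactory {
public:
    ToyScanContextFactory(int dop, bool shared_morsel_queue)
            : _dop(dop), _shared_morsel_queue(shared_morsel_queue), _contexts(static_cast<size_t>(dop)) {}

    std::shared_ptr<ToyScanContext> get_or_create(int driver_sequence) {
        assert(driver_sequence < _dop);
        // With a shared morsel queue every driver gets context 0.
        size_t idx = _shared_morsel_queue ? 0 : static_cast<size_t>(driver_sequence);
        assert(idx < _contexts.size());
        if (_contexts[idx] == nullptr) {
            _contexts[idx] = std::make_shared<ToyScanContext>(_chunk_buffer);
        }
        return _contexts[idx];
    }

private:
    int _dop;
    bool _shared_morsel_queue;
    std::vector<std::shared_ptr<ToyScanContext>> _contexts;  // assumption: one slot per driver
    ToySharedBuffer _chunk_buffer;                           // the single buffer every context points to
};
```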
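_trigger_next_scan
create_chunk_source creates the ChunkSource; next, data has to be read from the storage layer asynchronously. _trigger_next_scan first pins a chunk-buffer token (if none is available, it returns without submitting anything), then wraps the work in a ScanTask object and submits this io-task to _scan_executor, where it is eventually executed by workgroup::ScanExecutor.
_trigger_next_scan contains a lot of statistics code; here we focus only on its function. Functionally, it calls ChunkSource::buffer_next_batch_chunks_blocking to read data from the storage layer, and _finish_chunk_source_task updates the statistics once the read is done. If the executor rejects the task, the pinned token and the running-task bookkeeping are rolled back; after more than 100 consecutive rejections the operator returns a RuntimeError.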
```cpp
Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_index) {
    // Reserve one slot in the chunk buffer; give up this round if it is full.
    ChunkBufferTokenPtr buffer_token;
    if (buffer_token = pin_chunk(1); buffer_token == nullptr) {
        return Status::OK();
    }

    _chunk_sources[chunk_source_index]->pin_chunk_token(std::move(buffer_token));
    _num_running_io_tasks++;
    _is_io_task_running[chunk_source_index] = true;

    workgroup::ScanTask task;
    task.workgroup = _workgroup.get();
    task.priority = vectorized::OlapScanNode::compute_priority(_submit_task_counter->value());
    const auto io_task_start_nano = MonotonicNanos();
    // query_trace_ctx and driver_id are defined in code elided from this excerpt.
    task.work_function = [wp = _query_ctx, this, state, chunk_source_index, query_trace_ctx, driver_id,
                          io_task_start_nano]() {
        if (auto sp = wp.lock()) {
            SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(state->instance_mem_tracker());

            auto& chunk_source = _chunk_sources[chunk_source_index];
            int64_t prev_cpu_time = chunk_source->get_cpu_time_spent();
            int64_t prev_scan_rows = chunk_source->get_scan_rows();
            int64_t prev_scan_bytes = chunk_source->get_scan_bytes();

            // Read up to kIOTaskBatchSize chunks into the shared chunk buffer.
            auto status = chunk_source->buffer_next_batch_chunks_blocking(state, kIOTaskBatchSize, _workgroup.get());
            if (!status.ok() && !status.is_end_of_file()) {
                _set_scan_status(status);
            }

            int64_t delta_cpu_time = chunk_source->get_cpu_time_spent() - prev_cpu_time;
            _finish_chunk_source_task(state, chunk_source_index, delta_cpu_time,
                                      chunk_source->get_scan_rows() - prev_scan_rows,
                                      chunk_source->get_scan_bytes() - prev_scan_bytes);
        }
    };

    if (_scan_executor->submit(std::move(task))) {
        _io_task_retry_cnt = 0;
    } else {
        // Submission failed: roll back so the slot can be retried on a later pull_chunk.
        _chunk_sources[chunk_source_index]->unpin_chunk_token();
        _num_running_io_tasks--;
        _is_io_task_running[chunk_source_index] = false;
        if (++_io_task_retry_cnt > 100) {
            return Status::RuntimeError("ScanOperator failed to offer io task due to thread pool overload");
        }
    }

    return Status::OK();
}
```
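The submit-or-roll-back pattern at the end of _trigger_next_scan can be sketched with a bounded executor whose submit fails when its queue is full. ToyExecutor and ToyScanOperator are hypothetical; an empty string replaces an OK Status, and letting the task body release its own slot is an assumption, since the excerpt does not show where the real code resets these counters after a successful run.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Bounded executor: submit() fails when the queue is full, mimicking
// ScanExecutor::submit returning false under overload.
class ToyExecutor {
public:
    explicit ToyExecutor(size_t capacity) : _capacity(capacity) {}

    bool submit(std::function<void()> task) {
        if (_queue.size() >= _capacity) {
            return false;
        }
        _queue.push_back(std::move(task));
        return true;
    }

    // Runs queued tasks on the calling thread (a real executor uses a thread pool).
    void run_all() {
        while (!_queue.empty()) {
            std::function<void()> task = std::move(_queue.front());
            _queue.pop_front();
            task();
        }
    }

private:
    size_t _capacity;
    std::deque<std::function<void()>> _queue;
};

struct ToyScanOperator {
    explicit ToyScanOperator(size_t num_slots) : running(num_slots, false) {}

    std::vector<bool> running;  // like _is_io_task_running
    int num_running = 0;        // like _num_running_io_tasks
    int retry_cnt = 0;          // like _io_task_retry_cnt
    int chunks_read = 0;

    // Returns an error message, or an empty string for OK.
    std::string trigger_next_scan(ToyExecutor& exec, size_t slot) {
        assert(slot < running.size());
        num_running++;
        running[slot] = true;
        bool submitted = exec.submit([this, slot]() {
            chunks_read++;  // stands in for buffer_next_batch_chunks_blocking
            // Assumption: the finished task releases its own slot.
            num_running--;
            running[slot] = false;
        });
        if (submitted) {
            retry_cnt = 0;
        } else {
            // Rejected: roll back so a later pull_chunk can retry this slot.
            num_running--;
            running[slot] = false;
            if (++retry_cnt > 100) {
                return "ScanOperator failed to offer io task due to thread pool overload";
            }
        }
        return "";
    }
};
```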
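buffer_next_batch_chunks_blocking
buffer_next_batch_chunks_blocking has two main steps:
_read_chunk: reads data from the storage layer; implemented by subclasses.
Puts the data read into _chunk_buffer under ChunkSource::_scan_operator_seq, which is the operator's _driver_sequence.
These steps repeat up to batch_size times, stopping early if the query is cancelled, no chunk-buffer token can be pinned, the time spent reaches YIELD_MAX_TIME_SPENT, or the workgroup scheduler asks the task to yield.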
```cpp
Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_t batch_size,
                                                       const workgroup::WorkGroup* running_wg) {
    using namespace vectorized;

    if (!_status.ok()) {
        return _status;
    }

    int64_t time_spent_ns = 0;
    auto [tablet_id, version] = _morsel->get_lane_owner_and_version();
    for (size_t i = 0; i < batch_size && !state->is_cancelled(); ++i) {
        {
            SCOPED_RAW_TIMER(&time_spent_ns);

            // Reserve room in the chunk buffer; stop if it is full.
            if (_chunk_token == nullptr && (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) {
                break;
            }

            // Step 1: read one chunk from the storage layer (implemented by subclasses).
            ChunkPtr chunk;
            _status = _read_chunk(state, &chunk);
            if (chunk == nullptr) {
                chunk = std::make_shared<vectorized::Chunk>();
            }
            chunk->owner_info().set_owner_id(tablet_id, false);
            // Step 2: put it into this driver's sub-buffer, keyed by _scan_operator_seq.
            _chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
        }

        // Yield the executor thread after a fixed time budget...
        if (time_spent_ns >= YIELD_MAX_TIME_SPENT) {
            break;
        }

        // ...or earlier, if the workgroup scheduler asks this task to yield.
        if (running_wg != nullptr && time_spent_ns >= YIELD_PREEMPT_MAX_TIME_SPENT &&
            _scan_sched_entity(running_wg)->in_queue()->should_yield(running_wg, time_spent_ns)) {
            break;
        }
    }

    return _status;
}
```
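The loop's stopping rules are easier to see with I/O replaced by a fixed per-chunk cost. In this hypothetical model (toy_buffer_batch, StopReason and ToyBatchResult are invented names), free_tokens plays the role of the chunk-buffer limiter and yield_max_time_ns the role of YIELD_MAX_TIME_SPENT; workgroup preemption and cancellation are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

enum class StopReason { kBatchDone, kNoBufferToken, kTimeSlice };

struct ToyBatchResult {
    int chunks_buffered = 0;
    int64_t time_spent_ns = 0;
};

// Reads up to batch_size chunks; cost_per_chunk_ns replaces real I/O time.
StopReason toy_buffer_batch(size_t batch_size, int* free_tokens, int64_t cost_per_chunk_ns,
                            int64_t yield_max_time_ns, ToyBatchResult* out) {
    assert(free_tokens != nullptr && out != nullptr);
    for (size_t i = 0; i < batch_size; ++i) {
        if (*free_tokens == 0) {
            return StopReason::kNoBufferToken;  // buffer full: let consumers drain it
        }
        --*free_tokens;                           // pin one token for this chunk
        out->time_spent_ns += cost_per_chunk_ns;  // step 1: "_read_chunk"
        out->chunks_buffered++;                   // step 2: "put" into this driver's sub-buffer
        if (out->time_spent_ns >= yield_max_time_ns) {
            return StopReason::kTimeSlice;  // hand the executor thread back
        }
    }
    return StopReason::kBatchDone;
}
```

Bounding each io-task by both a chunk count and a time slice keeps one scan from monopolizing an executor thread, while the token check applies back-pressure when the consumer side falls behind.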
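get_chunk_from_buffer
Once the first step of ScanOperator::pull_chunk has triggered the reads, the second step is to fetch data from the chunk buffer, which is done by get_chunk_from_buffer.
As get_chunk_from_buffer shows, a ScanOperator only retrieves data read by its own Pipeline (the lookup key is its _driver_sequence), so it only needs its own Pipeline's io-tasks to have produced data.
BalancedChunkBuffer::try_get is non-blocking here: if no data is available, NULL is returned to the upper layer, which then considers this operator's Pipeline NOT-READY, moves its driver into the set of blocked PipelineDrivers, and waits for the Poller to wake it up.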
```cpp
ChunkPtr OlapScanOperator::get_chunk_from_buffer() {
    vectorized::ChunkPtr chunk = nullptr;
    // Non-blocking: only this driver's own sub-buffer is consulted.
    if (_ctx->get_chunk_buffer().try_get(_driver_sequence, &chunk)) {
        return chunk;
    }
    return nullptr;
}
```
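The non-blocking handoff between a driver and the Poller can be modelled as below. This is a hypothetical sketch, not StarRocks' actual driver poller: ToyDriver, DriverState and poll_blocked are invented names, and a blocked driver is treated as ready again simply when its sub-buffer becomes non-empty.

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <vector>

enum class DriverState { READY, NOT_READY };

struct ToyDriver {
    int driver_sequence = 0;
    std::deque<int> buffered;  // this driver's sub-buffer, filled by io-tasks
    std::vector<int> output;   // chunks handed to the next operator

    // One scheduling step: take at most one chunk, never wait for one.
    DriverState process() {
        if (buffered.empty()) {
            return DriverState::NOT_READY;  // try_get found nothing
        }
        output.push_back(buffered.front());
        buffered.pop_front();
        return DriverState::READY;
    }
};

// Re-checks blocked drivers and removes those whose sub-buffer has data,
// returning them so they can go back to the ready queue.
std::vector<ToyDriver*> poll_blocked(std::set<ToyDriver*>& blocked) {
    std::vector<ToyDriver*> woken;
    for (auto it = blocked.begin(); it != blocked.end();) {
        if (!(*it)->buffered.empty()) {
            woken.push_back(*it);
            it = blocked.erase(it);
        } else {
            ++it;
        }
    }
    return woken;
}
```

Because the driver never waits inside process(), a worker thread can move on to other drivers while this one sits in the blocked set.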
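This concludes the basic flow of ScanOperator::pull_chunk. How data is actually read from the storage layer will be covered in a later update; for now, the focus remains on the Pipeline engine.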