Pipeline Morsel and OlapScanOperator (2)

The class inheritance hierarchy of the Morsel-related SourceOperators is as follows:
[figure: Pipeline-OlapScanOperator-2]

The execution flow from Morsel to OlapScanOperator is as follows:
[figure: Pipeline-OlapScanOperator-1]

OlapScanPrepareOperator and OlapScanOperator share one MorselQueue, and it is also through this MorselQueue that the dependency between them is established: only after OlapScanPrepareOperator::pull_chunk has finished executing can OlapScanOperator::pull_chunk take morsels from the MorselQueue and, in turn, read data from the storage layer.
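To make this dependency concrete, here is a minimal, self-contained sketch (not the actual StarRocks code; all names below are illustrative) of how a prepare step can gate a scan step through a shared queue plus an atomic "prepared" flag:

#include <atomic>
#include <deque>
#include <mutex>

struct Morsel {};

// Shared between the prepare side and the scan side, analogous to the shared
// MorselQueue plus OlapScanContext::_is_prepare_finished in the real code.
struct SharedQueue {
    std::mutex mu;
    std::deque<Morsel> morsels;
    std::atomic<bool> prepared{false};
};

struct PrepareSide {
    SharedQueue* q;
    void pull_chunk() {
        {
            std::lock_guard<std::mutex> l(q->mu);
            q->morsels.emplace_back();   // fill the queue with work
        }
        q->prepared.store(true);         // unblock the scan side
    }
};

struct ScanSide {
    SharedQueue* q;
    // The scan side reports output only after preparation is done.
    bool has_output() const { return q->prepared.load(); }
};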

The pipeline-building process assigns this same MorselQueue to the inputs of the two pipelines; the details are in the FragmentExecutor::_prepare_pipeline_driver function.

OlapScanNode::decompose_to_pipeline

The decompose_to_pipeline function decomposes the ExecNode into the operators in the figure above, producing two pipelines.

pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
    // Set the dop according to requested parallelism and number of morsels
    auto* morsel_queue_factory = context->morsel_queue_factory_of_source_operator(id());
    size_t dop = morsel_queue_factory->size();
    bool shared_morsel_queue = morsel_queue_factory->is_shared();

    size_t max_buffer_capacity = pipeline::ScanOperator::max_buffer_capacity() * dop;
    size_t default_buffer_capacity = std::min<size_t>(max_buffer_capacity, estimated_max_concurrent_chunks());
    pipeline::ChunkBufferLimiterPtr buffer_limiter = std::make_unique<pipeline::DynamicChunkBufferLimiter>(
            max_buffer_capacity, default_buffer_capacity, _mem_limit, runtime_state()->chunk_size());

    auto scan_ctx_factory = std::make_shared<pipeline::OlapScanContextFactory>(
            this, dop, shared_morsel_queue, _enable_shared_scan, std::move(buffer_limiter));

    auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));

    // scan_prepare_op.
    auto scan_prepare_op = std::make_shared<pipeline::OlapScanPrepareOperatorFactory>(context->next_operator_id(), id(),
                                                                                      this, scan_ctx_factory);
    scan_prepare_op->set_degree_of_parallelism(shared_morsel_queue ? 1 : dop);
    this->init_runtime_filter_for_operator(scan_prepare_op.get(), context, rc_rf_probe_collector);

    auto scan_prepare_pipeline = pipeline::OpFactories{
            std::move(scan_prepare_op),
            std::make_shared<pipeline::NoopSinkOperatorFactory>(context->next_operator_id(), id()),
    };
    context->add_pipeline(scan_prepare_pipeline);

    // scan_op.
    auto scan_op = std::make_shared<pipeline::OlapScanOperatorFactory>(context->next_operator_id(), this,
                                                                       std::move(scan_ctx_factory));
    this->init_runtime_filter_for_operator(scan_op.get(), context, rc_rf_probe_collector);

    return pipeline::decompose_scan_node_to_pipeline(scan_op, this, context);
}

Before this function executes, a MorselQueueFactory has already been assigned to each input by the convert_scan_range_to_morsel_queue_factory function.

Next, each OlapScanNode is decomposed into operators; afterwards, pipeline_dop PipelineDrivers are generated according to pipeline_dop. These pipeline_dop PipelineDrivers either share one MorselQueue, or each PipelineDriver gets a MorselQueue of its own. Which case applies is determined by the MorselQueueFactory::is_shared() function (a simplified sketch follows the list below):

  • SharedMorselQueueFactory::is_shared() returns true
  • IndividualMorselQueueFactory::is_shared() returns false
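The contrast between the two factories can be sketched as follows (a simplified model for illustration, not the StarRocks classes themselves):

#include <memory>
#include <vector>

struct MorselQueue {};

// Shared: every driver_sequence resolves to the same queue.
struct SharedMorselQueueFactorySketch {
    std::shared_ptr<MorselQueue> queue = std::make_shared<MorselQueue>();
    MorselQueue* create(int /*driver_sequence*/) { return queue.get(); }
    bool is_shared() const { return true; }
};

// Individual: each driver_sequence owns its own queue.
struct IndividualMorselQueueFactorySketch {
    explicit IndividualMorselQueueFactorySketch(int dop) : queues(dop) {
        for (auto& q : queues) q = std::make_shared<MorselQueue>();
    }
    std::vector<std::shared_ptr<MorselQueue>> queues;
    MorselQueue* create(int driver_sequence) { return queues[driver_sequence].get(); }
    bool is_shared() const { return false; }
};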

When shared_morsel_queue is true, the OlapScanOperators that share the same MorselQueue also need to share the same OlapScanContext. In that case, OlapScanContextFactory::get_or_create ignores its driver_sequence argument and creates only one OlapScanContext object; otherwise, it creates pipeline_dop OlapScanContexts. The logic is similar to that of the MorselQueueFactory::create function.

OlapScanContextPtr OlapScanContextFactory::get_or_create(int32_t driver_sequence) {
    DCHECK_LT(driver_sequence, _dop);
    // ScanOperators sharing one morsel queue use the same context.
    int32_t idx = _shared_morsel_queue ? 0 : driver_sequence;
    DCHECK_LT(idx, _contexts.size());

    if (_contexts[idx] == nullptr) {
        _contexts[idx] = std::make_shared<OlapScanContext>(_scan_node, _dop, _shared_scan, _chunk_buffer);
    }
    return _contexts[idx];
}

OlapScanContext

The OlapScanContextFactory constructor sizes the _contexts array appropriately according to the value of shared_morsel_queue.

OlapScanContextFactory(vectorized::OlapScanNode* const scan_node, int32_t dop,
                       bool shared_morsel_queue, bool shared_scan,
                       ChunkBufferLimiterPtr chunk_buffer_limiter)
        : _scan_node(scan_node),
          _dop(dop),
          _shared_morsel_queue(shared_morsel_queue),
          _shared_scan(shared_scan),
          _chunk_buffer(shared_scan ? BalanceStrategy::kRoundRobin : BalanceStrategy::kDirect,
                        dop, std::move(chunk_buffer_limiter)),
          _contexts(shared_morsel_queue ? 1 : dop) {}
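Two things are worth noting in this constructor: _contexts is sized to 1 when the morsel queue is shared (so every driver resolves to context index 0) and to dop otherwise; and, independently, shared_scan selects the BalancedChunkBuffer's strategy, kRoundRobin under shared scan (chunks are spread across all dop buffers) versus kDirect without it (each driver uses only its own buffer).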

The OlapScanContext fields fall roughly into the following three parts.

  • part1: metadata of the data to be read

    _conjunct_ctxs: the AND predicates following WHERE in this SQL query, plus predicates obtained by rewriting certain query conditions
    _not_push_down_conjuncts: predicates that cannot be pushed down to the storage layer
    _key_ranges: the key ranges of this query
    _dict_optimize_parser: used by the low-cardinality optimization to rewrite predicates

  • part2: used by the shared_scan mechanism

  • part3: used to prevent the data of this query from being deleted while the SQL is executing, by taking an extra reference count in advance

The OlapScanContext fields are as follows:

class OlapScanContext final : public ContextWithDependency {
public:
    // (member functions omitted)

private:
    // part1: meta data
    vectorized::OlapScanNode* _scan_node;

    std::vector<ExprContext*> _conjunct_ctxs;
    vectorized::OlapScanConjunctsManager _conjuncts_manager;
    // The conjuncts couldn't push down to storage engine
    std::vector<ExprContext*> _not_push_down_conjuncts;
    std::vector<std::unique_ptr<OlapScanRange>> _key_ranges;
    vectorized::DictOptimizeParser _dict_optimize_parser;
    ObjectPool _obj_pool;

    // part2: for the shared_scan mechanism
    using ActiveInputKey = std::pair<int32_t, int32_t>;
    using ActiveInputSet = phmap::parallel_flat_hash_set<
            ActiveInputKey, typename phmap::Hash<ActiveInputKey>, typename phmap::EqualTo<ActiveInputKey>,
            typename std::allocator<ActiveInputKey>, NUM_LOCK_SHARD_LOG, std::mutex, true>;
    BalancedChunkBuffer& _chunk_buffer;
    ActiveInputSet _active_inputs;
    bool _shared_scan;

    std::atomic<bool> _is_prepare_finished{false};

    // part3: avoid tablets/rowsets being deleted because of compactions;
    // their reference counts are increased in the prepare stage
    std::vector<TabletSharedPtr> _tablets;
    std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
};

capture_tablet_rowsets

In StarRocks, Operators/ExecNodes basically follow the execution flow prepare -> open -> get_next -> close.
OlapScanContext::capture_tablet_rowsets runs in OlapScanPrepareOperator::prepare to prevent tablets/rowsets from being deleted during SQL execution (that is, while get_next is reading data). The function itself is fairly simple:

  1. Obtain the tablet from the TabletManager based on scan_range->tablet_id
  2. Then, based on scan_range->version, obtain from the tablet the rowsets this query needs

The code is as follows:

Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges) {
    _tablet_rowsets.resize(olap_scan_ranges.size());
    _tablets.resize(olap_scan_ranges.size());
    for (int i = 0; i < olap_scan_ranges.size(); ++i) {
        auto* scan_range = olap_scan_ranges[i];
        int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
        // 1. Obtain the tablet.
        ASSIGN_OR_RETURN(TabletSharedPtr tablet, vectorized::OlapScanNode::get_tablet(scan_range));

        // Capture row sets of this version tablet.
        {
            // 2. Obtain the rowsets.
            std::shared_lock l(tablet->get_header_lock());
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
            Rowset::acquire_readers(_tablet_rowsets[i]);
        }

        _tablets[i] = std::move(tablet);
    }

    return Status::OK();
}
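acquire_readers pins the captured rowsets so that compaction cannot delete their files; the matching release has to happen once the query is done. A hedged sketch of that side (the helper name below is hypothetical; in the real code the release runs when the OlapScanContext is closed):

// Hypothetical helper; mirrors the capture loop above in reverse.
void release_captured_rowsets(std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets) {
    for (auto& rowsets : tablet_rowsets) {
        Rowset::release_readers(rowsets);  // drop the references taken by acquire_readers
    }
    tablet_rowsets.clear();
}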

OlapScanPrepareOperator

OlapScanPrepareOperator holds just a single OlapScanContext field:

class OlapScanPrepareOperator final : public SourceOperator {
public:
    OlapScanPrepareOperator(OperatorFactory* factory, int32_t id,
                            const string& name, int32_t plan_node_id,
                            int32_t driver_sequence, OlapScanContextPtr ctx)
            : SourceOperator(factory, id, name, plan_node_id, driver_sequence),
              _ctx(std::move(ctx)) {
        _ctx->ref();
    }

    ~OlapScanPrepareOperator() override {
        auto* state = runtime_state();
        if (state == nullptr) {
            return;
        }

        _ctx->unref(state);
    }

    Status prepare(RuntimeState* state) override;
    void close(RuntimeState* state) override;

    bool has_output() const override;
    bool is_finished() const override;

    StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
    OlapScanContextPtr _ctx;
};

pull_chunk

As the name suggests, OlapScanPrepareOperator carries out the preparation work: it checks whether the tablets/rowsets this query needs to read exist, and if they do, assigns them to each SourceOperator::_morsel_queue.

Once pull_chunk finishes, OlapScanPrepareOperator's lifecycle is over and execution moves to the next operator, NoopSinkOperator: when the PipelineDriver detects that the OlapScanPrepareOperator has completed, it pushes its result into the NoopSinkOperator.

Since OlapScanPrepareOperator and OlapScanOperator share one OlapScanContext, NoopSinkOperator::push_chunk, the operator after OlapScanPrepareOperator, performs no work at all: as soon as OlapScanPrepareOperator::pull_chunk finishes, OlapScanOperator's prerequisite is satisfied and it can start executing, i.e., reading data from the storage layer.

Status OlapScanPrepareOperator::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(SourceOperator::prepare(state));

    RETURN_IF_ERROR(_ctx->prepare(state));
    RETURN_IF_ERROR(_ctx->capture_tablet_rowsets(_morsel_queue->olap_scan_ranges()));

    return Status::OK();
}

StatusOr<vectorized::ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState* state) {
    Status status = _ctx->parse_conjuncts(state, runtime_in_filters(), runtime_bloom_filters());

    _morsel_queue->set_key_ranges(_ctx->key_ranges());
    _morsel_queue->set_tablets(_ctx->tablets());
    _morsel_queue->set_tablet_rowsets(_ctx->tablet_rowsets());

    _ctx->set_prepare_finished();
    if (!status.ok()) {
        _ctx->set_finished();
        return status;
    }

    return nullptr;
}

Status NoopSinkOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) {
    return Status::OK();
}
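The handoff described above can be modeled with a small self-contained program (illustrative only, not the real PipelineDriver loop):

#include <cassert>
#include <memory>

struct Chunk {};
using ChunkPtr = std::shared_ptr<Chunk>;

// Mirrors the shape of NoopSinkOperator::push_chunk: accept and discard.
struct NoopSinkModel {
    bool push_chunk(const ChunkPtr& /*chunk*/) { return true; }
};

// Models OlapScanPrepareOperator::pull_chunk: one-shot side effects, no data.
struct PrepareSourceModel {
    bool prepare_finished = false;
    ChunkPtr pull_chunk() {
        prepare_finished = true;  // in the real code: _ctx->set_prepare_finished()
        return nullptr;           // nothing to hand downstream
    }
};

int main() {
    PrepareSourceModel source;
    NoopSinkModel sink;
    sink.push_chunk(source.pull_chunk());  // the driver moves the (empty) result along
    assert(source.prepare_finished);
    return 0;
}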