The class inheritance hierarchy of the Morsel-related SourceOperators is as follows:
The execution flow from Morsel to OlapScanOperator is as follows:
OlapScanPrepareOperator and OlapScanOperator share one MorselQueue, and this MorselQueue is also what establishes the dependency between them: only after OlapScanPrepareOperator::pull_chunk has finished can OlapScanOperator::pull_chunk take morsels from the queue and then read data from the storage layer.
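The handshake described above can be sketched with a minimal, hypothetical model. `MiniMorselQueue`, `prepare_pull_chunk`, and `scan_has_output` are invented names for illustration only; the real classes are MorselQueue and the two operators, and the real "prepare finished" flag lives in OlapScanContext:

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical, simplified stand-in for the shared MorselQueue plus the
// prepare-finished flag that gates the scan side.
struct MiniMorselQueue {
    std::mutex mu;
    std::deque<std::string> morsels;            // stand-in for Morsel objects
    std::atomic<bool> prepare_finished{false};  // set by the prepare operator
};

// The prepare side: publish the morsels, then mark preparation as done.
void prepare_pull_chunk(MiniMorselQueue& q) {
    {
        std::lock_guard<std::mutex> l(q.mu);
        q.morsels.push_back("tablet-1/rowset-3");
    }
    q.prepare_finished.store(true, std::memory_order_release);
}

// The scan side: reports output only once preparation has finished.
bool scan_has_output(MiniMorselQueue& q) {
    if (!q.prepare_finished.load(std::memory_order_acquire)) return false;
    std::lock_guard<std::mutex> l(q.mu);
    return !q.morsels.empty();
}
```

Until `prepare_pull_chunk` runs, `scan_has_output` returns false, which is exactly the dependency that keeps OlapScanOperator idle while the prepare pipeline is still working.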
During pipeline construction, the same MorselQueue is set as the input of both pipelines; this happens in FragmentExecutor::_prepare_pipeline_driver. OlapScanNode::decompose_to_pipeline decomposes the ExecNode into the operators shown above, producing the two pipelines.
```cpp
pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
    auto* morsel_queue_factory = context->morsel_queue_factory_of_source_operator(id());
    size_t dop = morsel_queue_factory->size();
    bool shared_morsel_queue = morsel_queue_factory->is_shared();

    size_t max_buffer_capacity = pipeline::ScanOperator::max_buffer_capacity() * dop;
    size_t default_buffer_capacity = std::min<size_t>(max_buffer_capacity, estimated_max_concurrent_chunks());
    pipeline::ChunkBufferLimiterPtr buffer_limiter = std::make_unique<pipeline::DynamicChunkBufferLimiter>(
            max_buffer_capacity, default_buffer_capacity, _mem_limit, runtime_state()->chunk_size());

    auto scan_ctx_factory = std::make_shared<pipeline::OlapScanContextFactory>(
            this, dop, shared_morsel_queue, _enable_shared_scan, std::move(buffer_limiter));

    auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));

    auto scan_prepare_op = std::make_shared<pipeline::OlapScanPrepareOperatorFactory>(context->next_operator_id(), id(),
                                                                                     this, scan_ctx_factory);
    scan_prepare_op->set_degree_of_parallelism(shared_morsel_queue ? 1 : dop);
    this->init_runtime_filter_for_operator(scan_prepare_op.get(), context, rc_rf_probe_collector);

    auto scan_prepare_pipeline = pipeline::OpFactories{
            std::move(scan_prepare_op),
            std::make_shared<pipeline::NoopSinkOperatorFactory>(context->next_operator_id(), id()),
    };
    context->add_pipeline(scan_prepare_pipeline);

    auto scan_op = std::make_shared<pipeline::OlapScanOperatorFactory>(context->next_operator_id(), this,
                                                                      std::move(scan_ctx_factory));
    this->init_runtime_filter_for_operator(scan_op.get(), context, rc_rf_probe_collector);

    return pipeline::decompose_scan_node_to_pipeline(scan_op, this, context);
}
```
Before this function runs, a MorselQueueFactory has already been assigned to each input by the convert_scan_range_to_morsel_queue_factory function.
Next, each OlapScanNode is decomposed into operators, and later pipeline_dop PipelineDrivers are generated from them. These pipeline_dop PipelineDrivers either share a single MorselQueue or each get a MorselQueue of their own; which case applies is determined by the MorselQueueFactory::is_shared() function:
- SharedMorselQueueFactory::is_shared() returns true
- IndividualMorselQueueFactory::is_shared() returns false
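The shared-vs-individual split can be sketched as follows. This is a hedged, simplified model: only `is_shared()` mirrors the real interface, while `MorselQueue` here is an empty stand-in and `create()` is an invented simplification of how the real MorselQueueFactory hands a queue to each driver:

```cpp
#include <memory>
#include <vector>

struct MorselQueue {};  // empty stand-in for the real MorselQueue

struct MorselQueueFactory {
    virtual ~MorselQueueFactory() = default;
    virtual bool is_shared() const = 0;
    virtual MorselQueue* create(int driver_sequence) = 0;  // invented helper
};

// One queue, handed to every PipelineDriver regardless of driver_sequence.
struct SharedMorselQueueFactory : MorselQueueFactory {
    std::shared_ptr<MorselQueue> queue = std::make_shared<MorselQueue>();
    bool is_shared() const override { return true; }
    MorselQueue* create(int /*driver_sequence*/) override { return queue.get(); }
};

// One queue per driver, indexed by driver_sequence.
struct IndividualMorselQueueFactory : MorselQueueFactory {
    std::vector<std::unique_ptr<MorselQueue>> queues;
    explicit IndividualMorselQueueFactory(int dop) {
        for (int i = 0; i < dop; ++i) queues.push_back(std::make_unique<MorselQueue>());
    }
    bool is_shared() const override { return false; }
    MorselQueue* create(int driver_sequence) override { return queues[driver_sequence].get(); }
};
```

In the shared case every driver_sequence maps to the same queue object; in the individual case each driver gets its own.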
When shared_morsel_queue is true, the OlapScanOperators that share the same MorselQueue must also share the same OlapScanContext. In that case, OlapScanContextFactory::get_or_create ignores its driver_sequence argument and creates only one OlapScanContext object; otherwise, it creates pipeline_dop OlapScanContexts. The logic mirrors that of MorselQueueFactory::create.
```cpp
OlapScanContextPtr OlapScanContextFactory::get_or_create(int32_t driver_sequence) {
    DCHECK_LT(driver_sequence, _dop);
    int32_t idx = _shared_morsel_queue ? 0 : driver_sequence;
    DCHECK_LT(idx, _contexts.size());

    if (_contexts[idx] == nullptr) {
        _contexts[idx] = std::make_shared<OlapScanContext>(_scan_node, _dop, _shared_scan, _chunk_buffer);
    }
    return _contexts[idx];
}
```
OlapScanContext

The OlapScanContextFactory constructor sizes the _contexts array according to the value of shared_morsel_queue:
```cpp
OlapScanContextFactory(vectorized::OlapScanNode* const scan_node, int32_t dop, bool shared_morsel_queue,
                       bool shared_scan, ChunkBufferLimiterPtr chunk_buffer_limiter)
        : _scan_node(scan_node),
          _dop(dop),
          _shared_morsel_queue(shared_morsel_queue),
          _shared_scan(shared_scan),
          _chunk_buffer(shared_scan ? BalanceStrategy::kRoundRobin : BalanceStrategy::kDirect, dop,
                        std::move(chunk_buffer_limiter)),
          _contexts(shared_morsel_queue ? 1 : dop) {}
```
The fields of OlapScanContext fall roughly into three parts.

Part 1: metadata about the data to be read

- _conjunct_ctxs: the AND predicates from the query's WHERE clause, plus some rewritten query conditions
- _not_push_down_conjuncts: predicates that cannot be pushed down to the storage layer
- _key_ranges: the key ranges covered by this query
- _dict_optimize_parser: rewrites predicates for the low-cardinality dictionary optimization

Part 2: support for the shared_scan mechanism

Part 3: reference counts taken up front so that the data this query reads cannot be deleted while the SQL statement is executing
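Part 3 can be illustrated with a hypothetical RAII sketch. `FakeRowset` and `RowsetPin` are invented names for illustration; the real code calls Rowset::acquire_readers while preparing and releases the references when the context closes:

```cpp
#include <memory>
#include <utility>
#include <vector>

// Invented stand-in for a rowset that counts active readers.
struct FakeRowset {
    int reader_refs = 0;
    void acquire_reader() { ++reader_refs; }
    void release_reader() { --reader_refs; }
};

// Pins a set of rowsets for as long as the pin object is alive, so a
// background compaction/GC cannot delete them mid-query.
class RowsetPin {
public:
    explicit RowsetPin(std::vector<std::shared_ptr<FakeRowset>> rowsets)
            : _rowsets(std::move(rowsets)) {
        for (auto& rs : _rowsets) rs->acquire_reader();  // take the references up front
    }
    ~RowsetPin() {
        for (auto& rs : _rowsets) rs->release_reader();  // release when the query is done
    }

private:
    std::vector<std::shared_ptr<FakeRowset>> _rowsets;
};
```

While a `RowsetPin` is alive the reader count stays positive, which models why the rowsets captured during prepare survive until the scan finishes.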
The fields of OlapScanContext are as follows:
```cpp
class OlapScanContext final : public ContextWithDependency {
public:
    // ...

private:
    vectorized::OlapScanNode* _scan_node;

    // Part 1: metadata about the data to be read.
    std::vector<ExprContext*> _conjunct_ctxs;
    vectorized::OlapScanConjunctsManager _conjuncts_manager;
    std::vector<ExprContext*> _not_push_down_conjuncts;
    std::vector<std::unique_ptr<OlapScanRange>> _key_ranges;
    vectorized::DictOptimizeParser _dict_optimize_parser;
    ObjectPool _obj_pool;

    // Part 2: support for the shared_scan mechanism.
    using ActiveInputKey = std::pair<int32_t, int32_t>;
    using ActiveInputSet = phmap::parallel_flat_hash_set<
            ActiveInputKey, typename phmap::Hash<ActiveInputKey>, typename phmap::EqualTo<ActiveInputKey>,
            typename std::allocator<ActiveInputKey>, NUM_LOCK_SHARD_LOG, std::mutex, true>;
    BalancedChunkBuffer& _chunk_buffer;
    ActiveInputSet _active_inputs;
    bool _shared_scan;

    std::atomic<bool> _is_prepare_finished{false};

    // Part 3: tablets/rowsets pinned for the lifetime of the query.
    std::vector<TabletSharedPtr> _tablets;
    std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
};
```
capture_tablet_rowsets

In StarRocks, operators and ExecNodes basically follow the lifecycle prepare -> open -> get_next -> close. OlapScanContext::capture_tablet_rowsets runs inside OlapScanPrepareOperator::prepare to prevent the tablets/rowsets from being deleted while the SQL statement executes (i.e. while get_next is reading data). The function itself is fairly simple:
- Obtain the tablet from the TabletManager by scan_range->tablet_id
- Obtain the rowsets this query needs from that tablet by scan_range->version

The code is as follows:
```cpp
Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges) {
    _tablet_rowsets.resize(olap_scan_ranges.size());
    _tablets.resize(olap_scan_ranges.size());
    for (int i = 0; i < olap_scan_ranges.size(); ++i) {
        auto* scan_range = olap_scan_ranges[i];

        int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
        ASSIGN_OR_RETURN(TabletSharedPtr tablet, vectorized::OlapScanNode::get_tablet(scan_range));

        {
            std::shared_lock l(tablet->get_header_lock());
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &_tablet_rowsets[i]));
            Rowset::acquire_readers(_tablet_rowsets[i]);
        }

        _tablets[i] = std::move(tablet);
    }
    return Status::OK();
}
```
OlapScanPrepareOperator

OlapScanPrepareOperator holds just a single OlapScanContext field:
```cpp
class OlapScanPrepareOperator final : public SourceOperator {
public:
    OlapScanPrepareOperator(OperatorFactory* factory, int32_t id, const string& name, int32_t plan_node_id,
                            int32_t driver_sequence, OlapScanContextPtr ctx)
            : SourceOperator(factory, id, name, plan_node_id, driver_sequence), _ctx(std::move(ctx)) {
        _ctx->ref();
    }

    ~OlapScanPrepareOperator() override {
        auto* state = runtime_state();
        if (state == nullptr) {
            return;
        }
        _ctx->unref(state);
    }

    Status prepare(RuntimeState* state) override;
    void close(RuntimeState* state) override;

    bool has_output() const override;
    bool is_finished() const override;

    StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
    OlapScanContextPtr _ctx;
};
```
pull_chunk

As its name suggests, OlapScanPrepareOperator performs the preparation work: pull_chunk checks whether the tablets/rowsets this query needs still exist and, if so, assigns them to each SourceOperator::_morsel_queue.
Once pull_chunk completes, OlapScanPrepareOperator's lifecycle ends and execution moves on to the next operator, NoopSinkOperator: when the PipelineDriver detects that OlapScanPrepareOperator has finished, it pushes the result into NoopSinkOperator.

Because OlapScanPrepareOperator and OlapScanOperator share one OlapScanContext, NoopSinkOperator::push_chunk does nothing at all: as soon as OlapScanPrepareOperator::pull_chunk has finished, OlapScanOperator's precondition is satisfied and it can start executing, i.e. reading data from the storage layer.
```cpp
Status OlapScanPrepareOperator::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(SourceOperator::prepare(state));

    RETURN_IF_ERROR(_ctx->prepare(state));
    RETURN_IF_ERROR(_ctx->capture_tablet_rowsets(_morsel_queue->olap_scan_ranges()));

    return Status::OK();
}

StatusOr<vectorized::ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState* state) {
    Status status = _ctx->parse_conjuncts(state, runtime_in_filters(), runtime_bloom_filters());

    _morsel_queue->set_key_ranges(_ctx->key_ranges());
    _morsel_queue->set_tablets(_ctx->tablets());
    _morsel_queue->set_tablet_rowsets(_ctx->tablet_rowsets());

    _ctx->set_prepare_finished();
    if (!status.ok()) {
        _ctx->set_finished();
        return status;
    }
    return nullptr;
}

Status NoopSinkOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) {
    return Status::OK();
}
```