本文主要阐述 BE 节点从接受 FE 执行计划到构建 PipelineDriver 并提交给 GlobalDriverExecutor 的过程。
每个 Query 在 StarRocks-FE 生成的物理计划(Physical Plan)后会被拆分为 PlanFragment 来实现 MPP,而 FragmentInstance 是 PlanFragment 的执行实例。简而言之,Query 最终是由多个 BE 节点上的 FragmentInstances 执行的,一个 FragmentInstance 对应着BE 节点上的 FragmentExecutor。
概念理解可以参考 技术内幕 | StarRocks Pipeline 执行框架(上)。
PInternalServiceImplBase
_exec_plan_fragment_by_pipeline
开启了 Pipeline 引擎后,最终都是调用 _exec_plan_fragment_by_pipeline 函数来执行 FragmentInstance,内部主要由 pipeline::FragmentExecutor 完成,一共就两个操作:
FragmentExecutor::prepare
将 FE 传递过来的 FragmentInstance 反序列化生成物理执行计划,即 ExecNode-Tree,然后 ExecNode 的子类(比如 OlapScanNode,HashJoinNode, ExchangeNode等)需要实现 ExecNode::decompose_to_pipeline 函数,通过 decompose_to_pipeline 函数将所有的 ExecNode 分解为 Pipeline Operators。
再根据 CPU 核数计算 pipeline_dop,生成 dop 个 PipelineDrivers。
FragmentExecutor::execute
execute 函数比较简单,就是将生成的 drivers 提交给 GlobalDriverExecutor,提交后的状态变化就是前几篇博客所述。因此,本文着重 prepare 函数中的故事。
RPC 接口代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| template <typename T> Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline( const TExecPlanFragmentParams& t_common_param, const TExecPlanFragmentParams& t_unique_request) {
pipeline::FragmentExecutor fragment_executor; auto status = fragment_executor.prepare(_exec_env, t_common_param, t_unique_request); if (status.ok()) { return fragment_executor.execute(_exec_env); } else { return status.is_duplicate_rpc_invocation() ? Status::OK() : status; } }
|
FragmentExecutor
一个 BE 节点可能会在存在一个 query 的多个 FragmentInstances 实例,统一由 QueryContext 进行管理。
在 FragmentExecutor::prepare 阶段需要完成的操作即 6 个 _prepare_xxx 系列函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class FragmentExecutor { public: FragmentExecutor(); Status prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& common_request, const TExecPlanFragmentParams& unique_request); private: Status _prepare_query_ctx(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request); Status _prepare_fragment_ctx(const UnifiedExecPlanFragmentParams& request); Status _prepare_workgroup(const UnifiedExecPlanFragmentParams& request); Status _prepare_runtime_state(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request); Status _prepare_exec_plan(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request); Status _prepare_global_dict(const UnifiedExecPlanFragmentParams& request); Status _prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request); Status _prepare_stream_load_pipe(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);
Status _decompose_data_sink_to_operator(RuntimeState* runtime_state, PipelineBuilderContext* context, const UnifiedExecPlanFragmentParams& request, std::unique_ptr<starrocks::DataSink>& datasink, const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs);
int64_t _fragment_start_time = 0; QueryContext* _query_ctx = nullptr; std::shared_ptr<FragmentContext> _fragment_ctx = nullptr; };
|
_prepare_query_ctx
如图,每个 BE 节点都有一个 QueryContextManger 用于管理在一个 BE 上执行的所有 query,QueryContextManger 可以理解为Map,内部在 {query_id, query_context} 之间建立映射关系。
在 StaRocks-BE 中,全局唯一的对象基本都存储在类 ExecEnv 中,并在 ExecEnv::_init 函数中初始化。
1 2 3 4 5 6 7 8 9
| Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _query_context_mgr = new pipeline::QueryContextManager(6); RETURN_IF_ERROR(_query_context_mgr->init()); }
pipeline::QueryContextManager* query_context_mgr() { return _query_context_mgr; }
|
因为,BE 接受到一个新的 query,需要先在 query_context_mgr 中注册,再使用该 query 的参数对 _query_ctx 进行初始化。
这里重要的是设置查询超时时间,默认是 300s,这是一个 query 最大的可执行时间,其他设置基本都是默认关闭。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| Status FragmentExecutor::_prepare_query_ctx( ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) { const auto& params = request.common().params; const auto& query_id = params.query_id; const auto& fragment_instance_id = request.fragment_instance_id(); const auto& query_options = request.common().query_options;
auto&& existing_query_ctx = exec_env->query_context_mgr()->get(query_id); if (existing_query_ctx) { auto&& existingfragment_ctx = existing_query_ctx->fragment_mgr()->get(fragment_instance_id); if (existingfragment_ctx) { return Status::DuplicateRpcInvocation( "Duplicate invocations of exec_plan_fragment"); } }
_query_ctx = exec_env->query_context_mgr()->get_or_register(query_id); _query_ctx->set_exec_env(exec_env);
if (params.__isset.instances_number) { _query_ctx->set_total_fragments(params.instances_number); }
_query_ctx->set_delivery_expire_seconds(_calc_delivery_expired_seconds(request)); _query_ctx->set_query_expire_seconds(_calc_query_expired_seconds(request)); _query_ctx->extend_delivery_lifetime(); _query_ctx->extend_query_lifetime();
if (query_options.__isset.enable_profile && query_options.enable_profile) { _query_ctx->set_report_profile(); } if (query_options.__isset.pipeline_profile_level) { _query_ctx->set_profile_level(query_options.pipeline_profile_level); }
bool enable_query_trace = false; if (query_options.__isset.enable_query_debug_trace && query_options.enable_query_debug_trace) { enable_query_trace = true; } _query_ctx->set_query_trace(std::make_shared<starrocks::debug::QueryTrace>( query_id, enable_query_trace));
return Status::OK(); }
|
_prepare_fragment_ctx
创建一个 FragmentContext 对象,设置所属的 query,自己的 fragment_instance_id,以及 FE 地址。只有等后续几个 _prepare_xxx 函数都成功执行,才会将此 _fragment_ctx 注册到 _query_ctx 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Status FragmentExecutor::_prepare_fragment_ctx( const UnifiedExecPlanFragmentParams& request) { const auto& coord = request.common().coord; const auto& query_id = request.common().params.query_id; const auto& fragment_instance_id = request.fragment_instance_id();
_fragment_ctx = std::make_shared<FragmentContext>();
_fragment_ctx->set_query_id(query_id); _fragment_ctx->set_fragment_instance_id(fragment_instance_id); _fragment_ctx->set_fe_addr(coord);
return Status::OK(); }
|