FragmentInstance: BE 构建执行计划子树

本文主要阐述 BE 节点从接受 FE 执行计划到构建 PipelineDriver 并提交给 GlobalDriverExecutor 的过程。

每个 Query 在 StarRocks-FE 生成的物理计划(Physical Plan)后会被拆分为 PlanFragment 来实现 MPP,而 FragmentInstance 是 PlanFragment 的执行实例。简而言之,Query 最终是由多个 BE 节点上的 FragmentInstances 执行的,一个 FragmentInstance 对应着BE 节点上的 FragmentExecutor。

概念理解可以参考 技术内幕 | StarRocks Pipeline 执行框架(上)

PInternalServiceImplBase

_exec_plan_fragment_by_pipeline

开启了 Pipeline 引擎后,最终都是调用 _exec_plan_fragment_by_pipeline 函数来执行 FragmentInstance,内部主要由 pipeline::FragmentExecutor 完成,一共就两个操作:

  1. FragmentExecutor::prepare

    将 FE 传递过来的 FragmentInstance 反序列化生成物理执行计划,即 ExecNode-Tree,然后 ExecNode 的子类(比如 OlapScanNode,HashJoinNode, ExchangeNode等)需要实现 ExecNode::decompose_to_pipeline 函数,通过 decompose_to_pipeline 函数将所有的 ExecNode 分解为 Pipeline Operators。

    再根据 CPU 核数计算 pipeline_dop,生成 dop 个 PipelineDrivers。

  2. FragmentExecutor::execute

    execute 函数比较简单,就是将生成的 drivers 提交给 GlobalDriverExecutor,提交后的状态变化就是前几篇博客所述。因此,本文着重 prepare 函数中的故事。

RPC 接口代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <typename T>
Status PInternalServiceImplBase<T>::_exec_plan_fragment_by_pipeline(
const TExecPlanFragmentParams& t_common_param,
const TExecPlanFragmentParams& t_unique_request) {

pipeline::FragmentExecutor fragment_executor;
auto status =
fragment_executor.prepare(_exec_env, t_common_param, t_unique_request);
if (status.ok()) {
return fragment_executor.execute(_exec_env);
} else {
return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
}
}

FragmentExecutor

一个 BE 节点可能会在存在一个 query 的多个 FragmentInstances 实例,统一由 QueryContext 进行管理。

在 FragmentExecutor::prepare 阶段需要完成的操作即 6 个 _prepare_xxx 系列函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class FragmentExecutor {
public:
FragmentExecutor();
Status prepare(ExecEnv* exec_env, const TExecPlanFragmentParams& common_request,
const TExecPlanFragmentParams& unique_request);
private:
// Several steps of prepare a fragment
// 1. query context
// 2. fragment context
// 3. workgroup
// 4. runtime state
// 5. exec plan
// 6. pipeline driver
Status _prepare_query_ctx(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);
Status _prepare_fragment_ctx(const UnifiedExecPlanFragmentParams& request);
Status _prepare_workgroup(const UnifiedExecPlanFragmentParams& request);
Status _prepare_runtime_state(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);
Status _prepare_exec_plan(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);
Status _prepare_global_dict(const UnifiedExecPlanFragmentParams& request);
Status _prepare_pipeline_driver(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);
Status _prepare_stream_load_pipe(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request);

Status _decompose_data_sink_to_operator(RuntimeState* runtime_state,
PipelineBuilderContext* context,
const UnifiedExecPlanFragmentParams& request,
std::unique_ptr<starrocks::DataSink>& datasink,
const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs);
//... other methods or fields

int64_t _fragment_start_time = 0;
QueryContext* _query_ctx = nullptr;
std::shared_ptr<FragmentContext> _fragment_ctx = nullptr;
};

_prepare_query_ctx

如图,每个 BE 节点都有一个 QueryContextManger 用于管理在一个 BE 上执行的所有 query,QueryContextManger 可以理解为Map,内部在 {query_id, query_context} 之间建立映射关系。

在 StaRocks-BE 中,全局唯一的对象基本都存储在类 ExecEnv 中,并在 ExecEnv::_init 函数中初始化。

1
2
3
4
5
6
7
8
9
Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_query_context_mgr = new pipeline::QueryContextManager(6);
RETURN_IF_ERROR(_query_context_mgr->init());
//...
}

pipeline::QueryContextManager* query_context_mgr() {
return _query_context_mgr;
}

因为,BE 接受到一个新的 query,需要先在 query_context_mgr 中注册,再使用该 query 的参数对 _query_ctx 进行初始化。

这里重要的是设置查询超时时间,默认是 300s,这是一个 query 最大的可执行时间,其他设置基本都是默认关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Status FragmentExecutor::_prepare_query_ctx(
ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
// prevent an identical fragment instance from multiple execution caused by FE's
// duplicate invocations of rpc exec_plan_fragment.
const auto& params = request.common().params;
const auto& query_id = params.query_id;
const auto& fragment_instance_id = request.fragment_instance_id();
const auto& query_options = request.common().query_options;

// 1. 去重
auto&& existing_query_ctx = exec_env->query_context_mgr()->get(query_id);
if (existing_query_ctx) {
auto&& existingfragment_ctx =
existing_query_ctx->fragment_mgr()->get(fragment_instance_id);
if (existingfragment_ctx) {
return Status::DuplicateRpcInvocation(
"Duplicate invocations of exec_plan_fragment");
}
}

// 2. 通过 query_id 注册新的 query_contex
_query_ctx = exec_env->query_context_mgr()->get_or_register(query_id);
_query_ctx->set_exec_env(exec_env);

// 3.1 设置 fragment_instances 个数
if (params.__isset.instances_number) {
_query_ctx->set_total_fragments(params.instances_number);
}

// 3.2 设置查询超时时间
_query_ctx->set_delivery_expire_seconds(_calc_delivery_expired_seconds(request));
_query_ctx->set_query_expire_seconds(_calc_query_expired_seconds(request));
// initialize query's deadline
_query_ctx->extend_delivery_lifetime();
_query_ctx->extend_query_lifetime();

// 3.3 是否开启 query profile,默认关闭
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_report_profile();
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}

// 3.4 是否开启查询 trace,默认关闭
bool enable_query_trace = false;
if (query_options.__isset.enable_query_debug_trace
&& query_options.enable_query_debug_trace) {
enable_query_trace = true;
}
_query_ctx->set_query_trace(std::make_shared<starrocks::debug::QueryTrace>(
query_id, enable_query_trace));

return Status::OK();
}

_prepare_fragment_ctx

创建一个 FragmentContext 对象,设置所属的 query,自己的 fragment_instance_id,以及 FE 地址。只有等后续几个 _prepare_xxx 函数都成功执行,才会将此 _fragment_ctx 注册到 _query_ctx 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Status FragmentExecutor::_prepare_fragment_ctx(
const UnifiedExecPlanFragmentParams& request) {
const auto& coord = request.common().coord;
const auto& query_id = request.common().params.query_id;
const auto& fragment_instance_id = request.fragment_instance_id();

_fragment_ctx = std::make_shared<FragmentContext>();

_fragment_ctx->set_query_id(query_id);
_fragment_ctx->set_fragment_instance_id(fragment_instance_id);
_fragment_ctx->set_fe_addr(coord);

return Status::OK();
}