PlanFragmentBuilder: 从 PhysicalPlan 到 ExecPlan

输入的 SQL 经过 Parser 后生成 AST 并最终转化为 Relation Tree, 基于 Relation 生成逻辑计划 logicalPlan,逻辑计划再经过优化器生成物理计划 PhysicalPlan,最终生成执行计划 ExecPlan。从 LogicalPlan 到 ExecPlan 的三个步骤如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static ExecPlan createQueryPlan(Relation relation, ConnectContext session,
TResultSinkType resultSinkType) {
QueryRelation query = (QueryRelation) relation;
List<String> colNames = query.getColumnOutputNames();

//1. Build Logical plan
ColumnRefFactory columnRefFactory = new ColumnRefFactory();
LogicalPlan logicalPlan = new RelationTransformer(
columnRefFactory, session).transformWithSelectLimit(query);

//2. Optimize logical plan and build physical plan
OptExpression optimizedPlan = new Optimizer().optimize(
session,
logicalPlan.getRoot(),
new PhysicalPropertySet(),
new ColumnRefSet(logicalPlan.getOutputColumn()),
columnRefFactory);

//3. Build fragment exec plan
return new PlanFragmentBuilder().createPhysicalPlan(
optimizedPlan, session,
logicalPlan.getOutputColumn(),
columnRefFactory, colNames,
resultSinkType,
!session.getSessionVariable().isSingleNodeExecPlan());
}

本节主要讲解 PlanFragmentBuilder 是何将物理计划 optimizedPlan 转化成执行计划 ExecPlan。

createPhysicalPlan

经过优化器后生成的物理计划 optimizedPlan 是由 OptExpression 构成的树。createPhysicalPlan 函数主要有两步:

  1. createOutputFragment 函数将 OptExpression 转化为 PlanNode

    一个 Fragment 包含由 PlanNode 组成的子树,每个 Fragment.PlanNode Tree 的叶结点是 ScanNode 或者 ExchangeNode:ScanNode 用于从 BE 节点存储层读取数据,ExchangeNode 用于接受其他的 Fragments 的输出。 生成的 Fragments 保存在 ExecPlan.fragments 中

  2. finalizeFragments 函数为生成的 ExecPlan.fragments 中的每个 Fragment 都分配一个 DataSink,

    DataSink 和 ExchangeNode 配对,即 DataSink 是数据发送端,ExchangeNode 是数据接收端,这样 Fragments 才串成完整的 tree,形成 MPP 架构。

createPhysicalPlan 函数逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ExecPlan createPhysicalPlan(OptExpression plan, 
ConnectContext connectContext,
List<ColumnRefOperator> outputColumns,
ColumnRefFactory columnRefFactory,
List<String> colNames,
TResultSinkType resultSinkType,
boolean hasOutputFragment) {
ExecPlan execPlan =
new ExecPlan(connectContext, colNames, plan, outputColumns);
createOutputFragment(
new PhysicalPlanTranslator(columnRefFactory).visit(plan, execPlan),
execPlan, outputColumns, hasOutputFragment);
execPlan.setPlanCount(plan.getPlanCount());
return finalizeFragments(execPlan, resultSinkType);
}

下面以 OlapScanNode、JoinNode 为例进行说明如何生成 Fragments。

visitPhysicalOlapScan

OptExpression Tree 采用的是后序递归遍历,因为会先遍历到叶节点,比如 OlapScanNode。

下面就需要根据 PhysicalOlapScanOperator 中的信息生成 OlapScanNode,并生成 ExecPlan.fragments 中的第一个 Fragment, 而 OlapScanNode 即该 Fragment 的 root node。

因为 OlapScanNode 最终是要在BE上执行的,因此要包含具体的存储信息的,比如要访问的列(tupleDescriptor)、要访问的 Tablets 位于哪些BE节点上,以及一些能下推到存储层的谓词等。

下面代码重点关注设置 tablet 信息部分,其余部分略去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public PlanFragment visitPhysicalOlapScan(OptExpression optExpr, ExecPlan context) {
PhysicalOlapScanOperator node = (PhysicalOlapScanOperator) optExpr.getOp();

OlapTable referenceTable = (OlapTable) node.getTable();
context.getDescTbl().addReferencedTable(referenceTable);
// 构造 tupleDescriptor,后面填充要访问的列信息
TupleDescriptor tupleDescriptor = context.getDescTbl().createTupleDescriptor();
tupleDescriptor.setTable(referenceTable);

// 构造 ScanNode 对象
OlapScanNode scanNode = new OlapScanNode(context.getNextNodeId(),
tupleDescriptor,
"OlapScanNode");
scanNode.setLimit(node.getLimit());
scanNode.computeStatistics(optExpr.getStatistics());

// 1. set tablet
try {
// {index_id, partition_ids, tablet_ids}
scanNode.updateScanInfo(node.getSelectedPartitionId(),
node.getSelectedTabletId(),
node.getSelectedIndexId());
long selectedIndexId = node.getSelectedIndexId();
long totalTabletsNum = 0;
long localBeId = -1;
// 1.1. 筛选出非空分区
List<Long> selectedNonEmptyPartitionIds =
node.getSelectedPartitionId().stream().filter(p -> {
List<Long> selectTabletIds = scanNode.getPartitionToScanTabletMap().get(p);
return selectTabletIds != null && !selectTabletIds.isEmpty();
}).collect(Collectors.toList());
scanNode.setSelectedPartitionIds(selectedNonEmptyPartitionIds);
// 1.2 设置要访问的 tablet
for (Long partitionId : selectedNonEmptyPartitionIds) {
List<Long> selectTabletIds = scanNode.getPartitionToScanTabletMap().get(partitionId);
final Partition partition = referenceTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();
for (int i = 0; i < allTabletIds.size(); i++) {
tabletId2BucketSeq.put(allTabletIds.get(i), i);
}
scanNode.setTabletId2BucketSeq(tabletId2BucketSeq);
// ScanNode 本次要访问的所有 tablets
List<Tablet> tablets = selectTabletIds.stream().map(
selectedTable::getTablet).collect(Collectors.toList());
// {table, partition, tablets} 确定了本次访问的 tablets
scanNode.addScanRangeLocations(partition,
selectedTable,
tablets,
localBeId);
// totalTabletsNum 用于 explain 显示
totalTabletsNum += selectedTable.getTablets().size();
}
scanNode.setTotalTabletsNum(totalTabletsNum);
} catch (UserException e) {
throw new StarRocksPlannerException(
"Build Exec OlapScanNode fail, scan info is invalid," + e.getMessage(),
INTERNAL_ERROR);
}

//...
context.getScanNodes().add(scanNode);
// 5. 生成新的 Fragment
PlanFragment fragment = new PlanFragment(
context.getNextFragmentId(),
scanNode, // 根节点
DataPartition.RANDOM);
fragment.setQueryGlobalDicts(node.getGlobalDicts());
context.getFragments().add(fragment);
return fragment;
}

addScanRangeLocationsp

addScanRangeLocationsp 函数是为了记录 tablets 的所有可用副本在 BE 节点上的位置,为后续下发计算任务准备。

TScanRangeLocations

这里是用的 thrift protoc 定义的 TScanRangeLocations,

1
2
3
4
5
6
7
8
9
struct TScanRangeLocations {
1: required PlanNodes.TScanRange scan_range
2: list<TScanRangeLocation> locations // non-empty list
}

struct TScanRange {
4: optional TInternalScanRange internal_scan_range
//...
}
  • TScanRangeLocations.scan_range 记录的本次 scan_range 的元信息,在 BE 中会根据这些元信息找到每个 tablet 中要访问的 rowsets。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    struct TInternalScanRange {
    1: required list<Types.TNetworkAddress> hosts
    2: required string schema_hash
    3: required string version
    4: required string version_hash // Deprecated
    5: required Types.TTabletId tablet_id
    6: required string db_name
    7: optional list<TKeyRange> partition_column_ranges
    8: optional string index_name
    9: optional string table_name
    10: optional i64 partition_id
    }
  • TScanRangeLocations.locations 则记录的的是该 tablet 所有副本在BE上的位置
    1
    2
    3
    4
    5
    struct TScanRangeLocation {
    1: required Types.TNetworkAddress server
    2: optional i32 volume_id = -1 // Just used for hdfs
    3: optional i64 backend_id
    }

addScanRangeLocations 函数就是填充 TScanRangeLocations 对象,然后所有 tablets 的 scanRangeLocations 结构都保存到 result 中。这一步主要是为后面 Coordinator 中调度准备,完整代码详见 addScanRangeLocations

visitPhysicalHashJoin

HashJoin 先生成左右两个子节点的 Fragment 作为输入,再来生成执行 HashJoin 的 Fragment。

1
2
3
4
5
public PlanFragment visitPhysicalHashJoin(OptExpression optExpr, ExecPlan context) {
PlanFragment leftFragment = visit(optExpr.inputAt(0), context);
PlanFragment rightFragment = visit(optExpr.inputAt(1), context);
return visitPhysicalJoin(leftFragment, rightFragment, optExpr, context);
}

visitPhysicalJoin

主要分为一下三个部分

1. DistributionMode

获取 PhysicalJoinOperator 中的信息,并传递给 HashJoinNode。 PhysicalJoinOperator 和它的两个子节点可能组合方式如下:
PlanFragmentBuilder-2
出于简洁目的,使用 PhysicalOlapScanOp (OlapScanNode)表征该子节点数据来自于本地,用 PhysicalOlapScanOp(ExchangeNode)表示数据来自于其他Fragment 的输出。

  1. 左右子节点都是 ExchangeNode,并且数据分布属性都是 DistributionSpec.DistributionType.SHUFFLE

  2. 左右子节点都不是 ExchangeNode,则数据可以直接通过 OlapScanNode 从本地获取

    如果 isColocateJoin 函数为 true,即数据源分布属性是 HashDistributionDesc.SourceType.LOCAL,则是不需要 shuffle,因为在 Coordinator 中已经完成了 Shuffle,BE 中计算时能直接在 bucket 层次进行 join。

    这个场景一般是 join-key 就是 table 的 key-column,这样构建 hashmap 的数据本身就是正交的,无需 shuffle 来达到正交的目标。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private boolean isColocateJoin(OptExpression optExpression) {
    return optExpression.getRequiredProperties().stream().allMatch(
    physicalPropertySet -> {
    if (!physicalPropertySet.getDistributionProperty().isShuffle()) {
    return false;
    }
    HashDistributionDesc.SourceType hashSourceType = ((HashDistributionSpec)
    (physicalPropertySet.getDistributionProperty().getSpec()))
    .getHashDistributionDesc().getSourceType();
    return hashSourceType.equals(HashDistributionDesc.SourceType.LOCAL);
    });
    }

    如果 isColocateJoin 函数为 false, 判断 isShuffleJoin 函数是否为 true。判断 PhysicalJoinOperator 的数据源分布是否需要 shuffle。这个场景一般是 join-key 包含了 value-column,需要在 BE 节点上对 value-column 进行 shuffle 得到正交的 join-key 数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public boolean isShuffleJoin(OptExpression optExpression) {
    return optExpression.getRequiredProperties().stream().allMatch(
    physicalPropertySet -> {
    if (!physicalPropertySet.getDistributionProperty().isShuffle()) {
    return false;
    }
    HashDistributionDesc.SourceType hashSourceType = ((HashDistributionSpec)
    (physicalPropertySet.getDistributionProperty().getSpec()))
    .getHashDistributionDesc().getSourceType();
    return hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_JOIN) ||
    hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_ENFORCE) ||
    hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_AGG);
    });
    }
  3. 右子节点是 ExchangeNode 且数据源分布是 DistributionSpec.DistributionType.BROADCAST

    此时不关注左子节点是 OlapScanNode or ExchangeNode 以及是啥数据源分布,因为 BE 在执行时是将右表的数据全部发送到左表。

  4. other

TODO: 优化器何时会使得 isShuffleJoin 为 true

这部分代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private PlanFragment visitPhysicalJoin(PlanFragment leftFragment, 
PlanFragment rightFragment,
OptExpression optExpr,
ExecPlan context) {
PhysicalJoinOperator node = (PhysicalJoinOperator) optExpr.getOp();

ColumnRefSet leftChildColumns =
optExpr.inputAt(0).getLogicalProperty().getOutputColumns();
ColumnRefSet rightChildColumns =
optExpr.inputAt(1).getLogicalProperty().getOutputColumns();

//...
JoinNode.DistributionMode distributionMode;
if (isExchangeWithDistributionType(
leftFragmentPlanRoot, DistributionSpec.DistributionType.SHUFFLE)
&& isExchangeWithDistributionType(
rightFragmentPlanRoot, DistributionSpec.DistributionType.SHUFFLE)) {
distributionMode = JoinNode.DistributionMode.PARTITIONED;
} else if (isExchangeWithDistributionType(
rightFragmentPlanRoot, DistributionSpec.DistributionType.BROADCAST)) {
distributionMode = JoinNode.DistributionMode.BROADCAST;
} else if (!(leftFragmentPlanRoot instanceof ExchangeNode)
&& !(rightFragmentPlanRoot instanceof ExchangeNode)) {
if (isColocateJoin(optExpr)) {
distributionMode = HashJoinNode.DistributionMode.COLOCATE;
} else if (isShuffleJoin(optExpr)) {
distributionMode = JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET;
} else {
distributionMode = JoinNode.DistributionMode.COLOCATE;
}
} else if (isShuffleJoin(optExpr)) {
distributionMode = JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET;
} else {
distributionMode = JoinNode.DistributionMode.LOCAL_HASH_BUCKET;
}
//...
}

2. JoinNode

根据 PhysicalHashJoinOperator 信息生生成 JoinNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
JoinNode joinNode;
if (node instanceof PhysicalHashJoinOperator) {
joinNode = new HashJoinNode(
context.getNextNodeId(),
leftFragment.getPlanRoot(), rightFragment.getPlanRoot(),
joinOperator, eqJoinConjuncts, otherJoinConjuncts);
} else if (node instanceof PhysicalMergeJoinOperator) {
joinNode = new MergeJoinNode(
context.getNextNodeId(),
leftFragment.getPlanRoot(), rightFragment.getPlanRoot(),
joinOperator, eqJoinConjuncts, otherJoinConjuncts);
} else {
throw new StarRocksPlannerException("unknown join operator: " + node, INTERNAL_ERROR);
}

3. JoinFragmentBuilder

下面就是要根据不同 JoinNode.DistributionMode 将 JoinNode 转化为 PlanFragment。

JoinNode.DistributionMode.BROADCAST

此时 rightFragment 的根节点是 ExchangeNode,并且数据源分布是 JoinNode.DistributionMode.BROADCAST。

在BE端生成 Pipeline 时,只会产生一个 HashJoinBuildOperator,即所有的 PipeleDrivers 共享一个 HashJoinBuildOperator。

1
2
3
4
5
6
7
8
9
10
11
HashJoinerPtr create_builder(int driver_sequence) {
if (_is_broadcast()) {
driver_sequence = BROADCAST_BUILD_DRIVER_SEQUENCE;
}

if (!_builder_map[driver_sequence]) {
_builder_map[driver_sequence] =
std::make_shared<HashJoiner>(_param, _read_only_probers);
}
return _builder_map[driver_sequence];
}

在BE端执行时,HashJoinBuildOperator 中的数据全部发送给左表。因此 右表的变成左表的依赖,因此需要将右表变成左表的子节点,将左表的 fragment 作为输出返回。

注意:context.getFragments() 中包含的是所有独立的 Fragment 子树,而 rightFragment 的根节点是 ExchangeNode,说明 rightFragment 对应生产者 Fragment 已经是一个独立的 Fragment subtree,因此需要将 leftFragment、rightFragment 从 context.getFragments() 中移除,再将 leftFragment 作为一个新的 Fragment 子树加入 context.getFragments()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (distributionMode.equals(JoinNode.DistributionMode.BROADCAST)) {
setJoinPushDown(joinNode);

// Connect parent and child fragment
rightFragment.getPlanRoot().setFragment(leftFragment);

context.getFragments().remove(rightFragment);
context.getFragments().remove(leftFragment);

context.getFragments().add(leftFragment);
leftFragment.setPlanRoot(joinNode);
// 构建依赖关系
leftFragment.addChild(rightFragment.getChild(0));
leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
// 返回左表的fragment
return leftFragment;
}

JoinNode.DistributionMode.PARTITIONED

此时,左右fragments的根节点都是 ExchangeNode,且都需要进行 Shuffle。那么就需新建个 joinFragment,并将 rightFragment 和 rightFragment 作为子节点。在 BE 端执行时,leftFragment、rightFragment 通过 ExchangeSink 将数据发送到啊 ExchangeNode,在 JoinFragment 中进行 join 操作。

此时的数据数据分区方式是 TPartitionType.HASH_PARTITIONED,BE 端会根据分区方式选择计算hash值的函数,主要是区分于 TPartitionType.BUCKET_SHUFFLE_HASH_PARTITIONED。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (distributionMode.equals(JoinNode.DistributionMode.PARTITIONED)) {
DataPartition lhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
leftFragment.getDataPartition().getPartitionExprs());
DataPartition rhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
rightFragment.getDataPartition().getPartitionExprs());

leftFragment.getChild(0).setOutputPartition(lhsJoinPartition);
rightFragment.getChild(0).setOutputPartition(rhsJoinPartition);

context.getFragments().remove(leftFragment);
context.getFragments().remove(rightFragment);

PlanFragment joinFragment = new PlanFragment(context.getNextFragmentId(),
joinNode, lhsJoinPartition);
joinFragment.addChild(leftFragment.getChild(0));
joinFragment.addChild(rightFragment.getChild(0));

joinFragment.mergeQueryGlobalDicts(leftFragment.getQueryGlobalDicts());
joinFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
context.getFragments().add(joinFragment);
return joinFragment;
}

JoinNode.DistributionMode.COLOCATE

这时 leftFragment、rightFragment 的子节点数据源都是来自 OlapScanNode 而不是 ExchangeNode。

BE 端在执行时,就是能够在一个 Fragment 中从本地读取 build-table 和 probe-table 的数据直接进行 join 操作,而不是从 ExchangeNode 读取。此外 COLOCATE-Join 是直接按照 join-key 分布,那么就可以直接 bucket-join。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (distributionMode.equals(JoinNode.DistributionMode.COLOCATE)) {
joinNode.setColocate(true, "");
setJoinPushDown(joinNode);

joinNode.setChild(0, leftFragment.getPlanRoot());
joinNode.setChild(1, rightFragment.getPlanRoot());
leftFragment.setPlanRoot(joinNode);
leftFragment.addChildren(rightFragment.getChildren());
context.getFragments().remove(rightFragment);
context.getFragments().remove(leftFragment);
context.getFragments().add(leftFragment);

leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());

return leftFragment;
}

JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET

如果 join-key 不全是表的 key-column,包含了 value-column,那么就得使用 shuffle 将数据进行正交。

  • 如果 build-table 和 probe-table 的数据源都不是 ExchangeNode,就不用生成新的 joinFragment,直接复用 leftFragment 即可。
  • 如果其中有一个是 ExchangeNode,则删除该 ExchangeNode 对应的 fragment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (distributionMode.equals(JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET)) {
setJoinPushDown(joinNode);

if (!(leftFragment.getPlanRoot() instanceof ExchangeNode)
&& !(rightFragment.getPlanRoot() instanceof ExchangeNode)) {
// 合并到一个 leftFragment 中
// 此时,left、right 节点在一个 be 节点上,直接本地执行
joinNode.setChild(0, leftFragment.getPlanRoot());
joinNode.setChild(1, rightFragment.getPlanRoot());
leftFragment.setPlanRoot(joinNode);
// 删除 rightFragment
leftFragment.addChildren(rightFragment.getChildren());
context.getFragments().remove(rightFragment);

context.getFragments().remove(leftFragment);
context.getFragments().add(leftFragment);

leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
return leftFragment;
}

if (leftFragment.getPlanRoot() instanceof ExchangeNode
&& !(rightFragment.getPlanRoot() instanceof ExchangeNode)) {
return computeShuffleHashBucketPlanFragment(context, rightFragment,
leftFragment, joinNode);
}
return computeShuffleHashBucketPlanFragment(context, leftFragment,
rightFragment, joinNode);
}

computeShuffleHashBucketPlanFragment 函数的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public PlanFragment computeShuffleHashBucketPlanFragment(ExecPlan context,
/** no exchange-node **/
PlanFragment stayFragment,
/** exchange-node **/
PlanFragment removeFragment,
JoinNode hashJoinNode) {
hashJoinNode.setPartitionExprs(removeFragment.getDataPartition().getPartitionExprs());
DataPartition dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
removeFragment.getDataPartition().getPartitionExprs());
removeFragment.getChild(0).setOutputPartition(dataPartition);

context.getFragments().remove(removeFragment);
context.getFragments().remove(stayFragment);

context.getFragments().add(stayFragment);
stayFragment.setPlanRoot(hashJoinNode);
stayFragment.addChildren(removeFragment.getChildren());
stayFragment.mergeQueryGlobalDicts(removeFragment.getQueryGlobalDicts());
return stayFragment;
}

JoinNode.DistributionMode.LOCAL_HASH_BUCKET

LOCAL_HASH_BUCKET 和 SHUFFLE_HASH_BUCKET 类似,只是分区方式不同。

visitPhysicalDistribution

用于生成新的 Fragment,这个 Fragment 子树的第数据源节点是 ExchangeNode,而不是 OlapScanNode,那么就需要在 inputFragment 和 newFragment 之间确定数据分区方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public PlanFragment visitPhysicalDistribution(OptExpression optExpr, ExecPlan context) {
PlanFragment inputFragment = visit(optExpr.inputAt(0), context);
PhysicalDistributionOperator distribution = (PhysicalDistributionOperator) optExpr.getOp();

ExchangeNode exchangeNode = new ExchangeNode(context.getNextNodeId(),
inputFragment.getPlanRoot(), distribution.getDistributionSpec().getType());

DataPartition dataPartition;
if (DistributionSpec.DistributionType.GATHER.equals(
distribution.getDistributionSpec().getType())) {
exchangeNode.setNumInstances(1);
dataPartition = DataPartition.UNPARTITIONED;
GatherDistributionSpec spec = (GatherDistributionSpec)
distribution.getDistributionSpec();
if (spec.hasLimit()) {
exchangeNode.setLimit(spec.getLimit());
}
} else if (DistributionSpec.DistributionType.BROADCAST.equals(
distribution.getDistributionSpec().getType())) {
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
dataPartition = DataPartition.UNPARTITIONED;
} else if (DistributionSpec.DistributionType.SHUFFLE.equals(
distribution.getDistributionSpec().getType())) {
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
List<ColumnRefOperator> partitionColumns = getShuffleColumns(
(HashDistributionSpec) distribution.getDistributionSpec());
List<Expr> distributeExpressions = partitionColumns.stream().map(
e -> ScalarOperatorToExpr.buildExecExpression(
e, new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr())))
.collect(Collectors.toList());
dataPartition = DataPartition.hashPartitioned(distributeExpressions);
} else {
throw new StarRocksPlannerException("Unsupport exchange type : "
+ distribution.getDistributionSpec().getType(), INTERNAL_ERROR);
}
exchangeNode.setDataPartition(dataPartition);

// 生成新的 Fragment
PlanFragment fragment = new PlanFragment(
context.getNextFragmentId(), exchangeNode, dataPartition);
fragment.setQueryGlobalDicts(distribution.getGlobalDicts());
inputFragment.setDestination(exchangeNode);
// 设置输入的 fragment 的输出分区方式
inputFragment.setOutputPartition(dataPartition);

context.getFragments().add(fragment);
return fragment;
}

finalizeFragments

Fragment tree 构建完毕,在 finalizeFragments 函数中为每个 Fragment 设置DataSink,并翻转fragments,使 fragments[0] 是root-fragment。

1
2
3
4
5
6
7
private static ExecPlan finalizeFragments(ExecPlan execPlan, TResultSinkType resultSinkType) {
List<PlanFragment> fragments = execPlan.getFragments();
for (PlanFragment fragment : fragments) {
fragment.createDataSink(resultSinkType);
}
Collections.reverse(fragments);
}

createDataSink

给每个 Fragment 生成一个 DataSink,和该 Fragment 的 DestFragment 的 ExchangeNode 配对

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void createDataSink(TResultSinkType resultSinkType) {
if (sink != null) {
return;
}
if (destNode != null) {
// we're streaming to an exchange node
DataStreamSink streamSink = new DataStreamSink(destNode.getId());
streamSink.setPartition(outputPartition);
streamSink.setMerge(destNode.isMerge());
streamSink.setFragment(this);
sink = streamSink;
} else {
if (planRoot == null) {
// only output expr, no FROM clause
// "select 1 + 2"
return;
}
// add ResultSink
// we're streaming to an result sink
sink = new ResultSink(planRoot.getId(), resultSinkType);
}
}

到此,生成的是 PlanFragment, 下一步需要在 Coordinator 中将 PlanFragment 转化为 FragmentInstance 在 BE 中执行。