The input SQL is parsed into an AST and then converted into a Relation tree. From the Relation a logical plan (LogicalPlan) is built, the optimizer turns the logical plan into a physical plan (PhysicalPlan), and the physical plan is finally translated into an execution plan (ExecPlan). These three steps, building the LogicalPlan, optimizing it into a physical plan, and translating the result into an ExecPlan, are visible in createQueryPlan:
```java
public static ExecPlan createQueryPlan(Relation relation,
                                       ConnectContext session,
                                       TResultSinkType resultSinkType) {
    QueryRelation query = (QueryRelation) relation;
    List<String> colNames = query.getColumnOutputNames();
    ColumnRefFactory columnRefFactory = new ColumnRefFactory();

    // 1. Relation -> LogicalPlan
    LogicalPlan logicalPlan = new RelationTransformer(columnRefFactory, session)
            .transformWithSelectLimit(query);

    // 2. LogicalPlan -> optimized physical plan (OptExpression tree)
    OptExpression optimizedPlan = new Optimizer().optimize(
            session,
            logicalPlan.getRoot(),
            new PhysicalPropertySet(),
            new ColumnRefSet(logicalPlan.getOutputColumn()),
            columnRefFactory);

    // 3. physical plan -> ExecPlan (PlanFragments)
    return new PlanFragmentBuilder().createPhysicalPlan(
            optimizedPlan, session, logicalPlan.getOutputColumn(),
            columnRefFactory, colNames, resultSinkType,
            !session.getSessionVariable().isSingleNodeExecPlan());
}
```
This section focuses on how PlanFragmentBuilder turns the optimized physical plan, optimizedPlan, into the execution plan ExecPlan.
### createPhysicalPlan

The physical plan optimizedPlan produced by the optimizer is a tree of OptExpression nodes. createPhysicalPlan does two main things:
1. createOutputFragment converts the OptExpression tree into PlanNodes (via PhysicalPlanTranslator.visit).
   A Fragment contains a subtree of PlanNodes, and the leaf nodes of each fragment's PlanNode tree are either ScanNodes or ExchangeNodes: a ScanNode reads data from the storage layer on a BE node, while an ExchangeNode receives the output of other fragments. The generated fragments are stored in ExecPlan.fragments.
2. finalizeFragments assigns a DataSink to every fragment in ExecPlan.fragments.
   A DataSink pairs with an ExchangeNode: the DataSink is the data sender and the ExchangeNode is the receiver. Only through this pairing do the fragments link into a complete tree and form the MPP architecture (sketched below).
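As an illustration (a schematic, not output produced by StarRocks), the simplest plan, a single-table scan whose result is gathered to one node, ends up as two fragments chained through such a sender/receiver pair:

```text
Fragment 1:  OlapScanNode -> DataStreamSink
                                  |          (DataSink sends, ExchangeNode receives)
                                  v
Fragment 0:  ExchangeNode -> ResultSink      (root fragment, returns rows to the client)
```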
The logic of createPhysicalPlan is as follows:
```java
public static ExecPlan createPhysicalPlan(OptExpression plan, ConnectContext connectContext,
                                          List<ColumnRefOperator> outputColumns,
                                          ColumnRefFactory columnRefFactory, List<String> colNames,
                                          TResultSinkType resultSinkType, boolean hasOutputFragment) {
    ExecPlan execPlan = new ExecPlan(connectContext, colNames, plan, outputColumns);
    // 1. translate the OptExpression tree into PlanNodes grouped into PlanFragments
    createOutputFragment(
            new PhysicalPlanTranslator(columnRefFactory).visit(plan, execPlan),
            execPlan, outputColumns, hasOutputFragment);
    execPlan.setPlanCount(plan.getPlanCount());
    // 2. assign a DataSink to every fragment
    return finalizeFragments(execPlan, resultSinkType);
}
```
The following uses OlapScanNode and JoinNode as examples to show how the fragments are generated.
### visitPhysicalOlapScan

The OptExpression tree is traversed recursively in post order, so the leaf nodes, such as the one that becomes an OlapScanNode, are reached first.
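Conceptually the translation is a small post-order recursion. The following is a minimal sketch, not the actual PhysicalPlanTranslator code; buildFragmentFor is a hypothetical stand-in for the per-operator visit methods such as visitPhysicalOlapScan and visitPhysicalHashJoin:

```java
// Minimal sketch of the post-order translation: children are translated into
// fragments before their parent operator is handled.
PlanFragment translate(OptExpression expr, ExecPlan context) {
    List<PlanFragment> childFragments = new ArrayList<>();
    for (OptExpression child : expr.getInputs()) {
        childFragments.add(translate(child, context));   // visit children first
    }
    // then build the PlanNode/PlanFragment for the current operator
    return buildFragmentFor(expr, childFragments, context);   // hypothetical helper
}
```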
Next, an OlapScanNode is built from the information in the PhysicalOlapScanOperator, and the first fragment in ExecPlan.fragments is created with this OlapScanNode as its root node.
Because the OlapScanNode will ultimately run on the BEs, it has to carry the concrete storage information: the columns to read (the tupleDescriptor), which BE nodes host the tablets to scan, and the predicates that can be pushed down to the storage layer.
The code below focuses on how the tablet information is set; the rest is omitted.
```java
public PlanFragment visitPhysicalOlapScan(OptExpression optExpr, ExecPlan context) {
    PhysicalOlapScanOperator node = (PhysicalOlapScanOperator) optExpr.getOp();

    OlapTable referenceTable = (OlapTable) node.getTable();
    context.getDescTbl().addReferencedTable(referenceTable);
    TupleDescriptor tupleDescriptor = context.getDescTbl().createTupleDescriptor();
    tupleDescriptor.setTable(referenceTable);

    OlapScanNode scanNode = new OlapScanNode(context.getNextNodeId(), tupleDescriptor, "OlapScanNode");
    scanNode.setLimit(node.getLimit());
    scanNode.computeStatistics(optExpr.getStatistics());

    try {
        scanNode.updateScanInfo(node.getSelectedPartitionId(),
                node.getSelectedTabletId(),
                node.getSelectedIndexId());
        long selectedIndexId = node.getSelectedIndexId();
        long totalTabletsNum = 0;
        long localBeId = -1;

        List<Long> selectedNonEmptyPartitionIds = node.getSelectedPartitionId().stream().filter(p -> {
            List<Long> selectTabletIds = scanNode.getPartitionToScanTabletMap().get(p);
            return selectTabletIds != null && !selectTabletIds.isEmpty();
        }).collect(Collectors.toList());
        scanNode.setSelectedPartitionIds(selectedNonEmptyPartitionIds);

        for (Long partitionId : selectedNonEmptyPartitionIds) {
            List<Long> selectTabletIds = scanNode.getPartitionToScanTabletMap().get(partitionId);
            final Partition partition = referenceTable.getPartition(partitionId);
            final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);

            Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
            List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();
            for (int i = 0; i < allTabletIds.size(); i++) {
                tabletId2BucketSeq.put(allTabletIds.get(i), i);
            }
            scanNode.setTabletId2BucketSeq(tabletId2BucketSeq);

            List<Tablet> tablets = selectTabletIds.stream().map(selectedTable::getTablet)
                    .collect(Collectors.toList());
            scanNode.addScanRangeLocations(partition, selectedTable, tablets, localBeId);
            totalTabletsNum += selectedTable.getTablets().size();
        }
        scanNode.setTotalTabletsNum(totalTabletsNum);
    } catch (UserException e) {
        throw new StarRocksPlannerException(
                "Build Exec OlapScanNode fail, scan info is invalid," + e.getMessage(), INTERNAL_ERROR);
    }

    context.getScanNodes().add(scanNode);
    PlanFragment fragment =
            new PlanFragment(context.getNextFragmentId(), scanNode, DataPartition.RANDOM);
    fragment.setQueryGlobalDicts(node.getGlobalDicts());
    context.getFragments().add(fragment);
    return fragment;
}
```
### addScanRangeLocations

addScanRangeLocations records, for every tablet, the locations of all of its available replicas on the BE nodes, in preparation for dispatching the scan tasks later.
The locations are described with the Thrift-defined TScanRangeLocations structure:
```thrift
struct TScanRangeLocations {
  1: required PlanNodes.TScanRange scan_range
  2: list<TScanRangeLocation> locations
}

struct TScanRange {
  4: optional TInternalScanRange internal_scan_range
}
```
TScanRangeLocations.scan_range records the metadata of this scan range; the BE uses it to locate the rowsets to read in each tablet.

```thrift
struct TInternalScanRange {
  1: required list<Types.TNetworkAddress> hosts
  2: required string schema_hash
  3: required string version
  4: required string version_hash
  5: required Types.TTabletId tablet_id
  6: required string db_name
  7: optional list<TKeyRange> partition_column_ranges
  8: optional string index_name
  9: optional string table_name
  10: optional i64 partition_id
}
```
TScanRangeLocations.locations records the locations of all replicas of the tablet on the BEs.

```thrift
struct TScanRangeLocation {
  1: required Types.TNetworkAddress server
  2: optional i32 volume_id = -1
  3: optional i64 backend_id
}
```
addScanRangeLocations fills in one TScanRangeLocations object per tablet and collects all of them into result. This is mainly preparation for scheduling in the Coordinator later; see addScanRangeLocations for the complete code.
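As a rough idea of what the function does, here is a simplified sketch, not the real StarRocks implementation; the visibleVersion variable and the getReplicaBackends helper are illustrative placeholders for the catalog lookups the actual code performs:

```java
// Simplified sketch: build one TScanRangeLocations per tablet and record every replica.
for (Tablet tablet : tablets) {
    TInternalScanRange internalRange = new TInternalScanRange();
    internalRange.setTablet_id(tablet.getId());                    // which tablet to read
    internalRange.setVersion(String.valueOf(visibleVersion));      // which data version to read

    TScanRange scanRange = new TScanRange();
    scanRange.setInternal_scan_range(internalRange);

    TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
    scanRangeLocations.setScan_range(scanRange);

    // one TScanRangeLocation per replica, so the Coordinator can later pick any healthy copy
    for (Backend backend : getReplicaBackends(tablet)) {           // illustrative helper
        TScanRangeLocation location = new TScanRangeLocation(
                new TNetworkAddress(backend.getHost(), backend.getBePort()));
        location.setBackend_id(backend.getId());
        scanRangeLocations.addToLocations(location);
    }
    result.add(scanRangeLocations);                                // the ScanNode's list of scan ranges
}
```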
### visitPhysicalHashJoin

For a hash join, the fragments of the left and right children are generated first as inputs, and then the fragment that executes the hash join is built.
```java
public PlanFragment visitPhysicalHashJoin(OptExpression optExpr, ExecPlan context) {
    PlanFragment leftFragment = visit(optExpr.inputAt(0), context);
    PlanFragment rightFragment = visit(optExpr.inputAt(1), context);
    return visitPhysicalJoin(leftFragment, rightFragment, optExpr, context);
}
```
visitPhysicalJoin consists of the following three parts.
#### 1. DistributionMode

Extract the distribution information from the PhysicalJoinOperator and pass it to the HashJoinNode. The possible combinations of a PhysicalJoinOperator and its two children are listed below. For brevity, OlapScanNode means the child reads its data locally, and ExchangeNode means its data comes from the output of another fragment.
- Both children are ExchangeNodes and both have the DistributionSpec.DistributionType.SHUFFLE distribution: distributionMode is PARTITIONED.
- Neither child is an ExchangeNode, so the data can be read locally through OlapScanNodes:
  - If isColocateJoin returns true, i.e. the source type of the distribution is HashDistributionDesc.SourceType.LOCAL, no shuffle is needed: the "shuffle" has already been taken care of when the Coordinator schedules the scans, so the BEs can join directly at the bucket level (distributionMode is COLOCATE).
    This usually happens when the join keys are exactly the table's key (bucketing) columns, so the data used to build the hash tables is already partitioned disjointly across nodes and no shuffle is needed to make it so.
```java
private boolean isColocateJoin(OptExpression optExpression) {
    return optExpression.getRequiredProperties().stream().allMatch(physicalPropertySet -> {
        if (!physicalPropertySet.getDistributionProperty().isShuffle()) {
            return false;
        }
        HashDistributionDesc.SourceType hashSourceType =
                ((HashDistributionSpec) (physicalPropertySet.getDistributionProperty().getSpec()))
                        .getHashDistributionDesc().getSourceType();
        return hashSourceType.equals(HashDistributionDesc.SourceType.LOCAL);
    });
}
```
  - If isColocateJoin returns false, isShuffleJoin is checked to decide whether the join inputs need to be shuffled (distributionMode is SHUFFLE_HASH_BUCKET when it returns true). This usually happens when the join keys include value columns, which have to be shuffled on the BEs so that the join-key partitions become disjoint.
```java
public boolean isShuffleJoin(OptExpression optExpression) {
    return optExpression.getRequiredProperties().stream().allMatch(physicalPropertySet -> {
        if (!physicalPropertySet.getDistributionProperty().isShuffle()) {
            return false;
        }
        HashDistributionDesc.SourceType hashSourceType =
                ((HashDistributionSpec) (physicalPropertySet.getDistributionProperty().getSpec()))
                        .getHashDistributionDesc().getSourceType();
        return hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_JOIN)
                || hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_ENFORCE)
                || hashSourceType.equals(HashDistributionDesc.SourceType.SHUFFLE_AGG);
    });
}
```
- The right child is an ExchangeNode with the DistributionSpec.DistributionType.BROADCAST distribution: distributionMode is BROADCAST.
  In this case it does not matter whether the left child is an OlapScanNode or an ExchangeNode, or how its data is distributed, because at execution time the BEs send all of the right table's data to the left table's side.
- Other combinations: SHUFFLE_HASH_BUCKET if isShuffleJoin returns true, otherwise LOCAL_HASH_BUCKET.
  TODO: when does the optimizer make isShuffleJoin return true?
The corresponding code is as follows.
```java
private PlanFragment visitPhysicalJoin(PlanFragment leftFragment, PlanFragment rightFragment,
                                       OptExpression optExpr, ExecPlan context) {
    PhysicalJoinOperator node = (PhysicalJoinOperator) optExpr.getOp();

    ColumnRefSet leftChildColumns = optExpr.inputAt(0).getLogicalProperty().getOutputColumns();
    ColumnRefSet rightChildColumns = optExpr.inputAt(1).getLogicalProperty().getOutputColumns();

    PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();    // added for readability,
    PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();  // elided in the original excerpt

    JoinNode.DistributionMode distributionMode;
    if (isExchangeWithDistributionType(
                leftFragmentPlanRoot, DistributionSpec.DistributionType.SHUFFLE)
            && isExchangeWithDistributionType(
                rightFragmentPlanRoot, DistributionSpec.DistributionType.SHUFFLE)) {
        distributionMode = JoinNode.DistributionMode.PARTITIONED;
    } else if (isExchangeWithDistributionType(
            rightFragmentPlanRoot, DistributionSpec.DistributionType.BROADCAST)) {
        distributionMode = JoinNode.DistributionMode.BROADCAST;
    } else if (!(leftFragmentPlanRoot instanceof ExchangeNode)
            && !(rightFragmentPlanRoot instanceof ExchangeNode)) {
        if (isColocateJoin(optExpr)) {
            distributionMode = HashJoinNode.DistributionMode.COLOCATE;
        } else if (isShuffleJoin(optExpr)) {
            distributionMode = JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET;
        } else {
            distributionMode = JoinNode.DistributionMode.COLOCATE;
        }
    } else if (isShuffleJoin(optExpr)) {
        distributionMode = JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET;
    } else {
        distributionMode = JoinNode.DistributionMode.LOCAL_HASH_BUCKET;
    }
    // ... the rest of the method builds the JoinNode and assembles the fragments (shown below)
}
```
#### 2. JoinNode

Build the JoinNode from the physical join operator:
```java
JoinNode joinNode;
if (node instanceof PhysicalHashJoinOperator) {
    joinNode = new HashJoinNode(
            context.getNextNodeId(),
            leftFragment.getPlanRoot(), rightFragment.getPlanRoot(),
            joinOperator, eqJoinConjuncts, otherJoinConjuncts);
} else if (node instanceof PhysicalMergeJoinOperator) {
    joinNode = new MergeJoinNode(
            context.getNextNodeId(),
            leftFragment.getPlanRoot(), rightFragment.getPlanRoot(),
            joinOperator, eqJoinConjuncts, otherJoinConjuncts);
} else {
    throw new StarRocksPlannerException("unknown join operator: " + node, INTERNAL_ERROR);
}
```
#### 3. JoinFragmentBuilder

Finally, the JoinNode is turned into a PlanFragment according to its JoinNode.DistributionMode.
**JoinNode.DistributionMode.BROADCAST**

Here the root of rightFragment is an ExchangeNode whose distribution type is DistributionSpec.DistributionType.BROADCAST.
When the BE builds the pipelines for this fragment, only one HashJoinBuildOperator is created, i.e. all PipelineDrivers share the same hash-join builder:
```cpp
HashJoinerPtr create_builder(int driver_sequence) {
    if (_is_broadcast()) {
        driver_sequence = BROADCAST_BUILD_DRIVER_SEQUENCE;
    }
    if (!_builder_map[driver_sequence]) {
        _builder_map[driver_sequence] = std::make_shared<HashJoiner>(_param, _read_only_probers);
    }
    return _builder_map[driver_sequence];
}
```
When the BEs execute the join, all of the right table's data is fed through the HashJoinBuildOperator to the left (probe) side, so the right table becomes a dependency of the left table. The right side therefore has to become a child of the left fragment, and the left fragment is returned as the result.
Note: context.getFragments() holds all of the independent fragment subtrees. Since the root of rightFragment is an ExchangeNode, the producer fragment that feeds rightFragment is already an independent fragment subtree. leftFragment and rightFragment are therefore removed from context.getFragments(), and leftFragment is added back as a new fragment subtree.
```java
if (distributionMode.equals(JoinNode.DistributionMode.BROADCAST)) {
    setJoinPushDown(joinNode);

    rightFragment.getPlanRoot().setFragment(leftFragment);
    context.getFragments().remove(rightFragment);

    context.getFragments().remove(leftFragment);
    context.getFragments().add(leftFragment);

    leftFragment.setPlanRoot(joinNode);
    leftFragment.addChild(rightFragment.getChild(0));
    leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
    return leftFragment;
}
```
**JoinNode.DistributionMode.PARTITIONED**

Here the roots of both leftFragment and rightFragment are ExchangeNodes, and both sides need to be shuffled. A new joinFragment is created, and the producer fragments below leftFragment and rightFragment become its children. At execution time, those producers send their data through ExchangeSinks to the ExchangeNodes, and the join is performed inside joinFragment.
The data partition type here is TPartitionType.HASH_PARTITIONED. The BE chooses the hash function according to the partition type; the main point is to distinguish it from TPartitionType.BUCKET_SHUFFLE_HASH_PARTITIONED.
```java
if (distributionMode.equals(JoinNode.DistributionMode.PARTITIONED)) {
    DataPartition lhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
            leftFragment.getDataPartition().getPartitionExprs());
    DataPartition rhsJoinPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
            rightFragment.getDataPartition().getPartitionExprs());

    leftFragment.getChild(0).setOutputPartition(lhsJoinPartition);
    rightFragment.getChild(0).setOutputPartition(rhsJoinPartition);

    context.getFragments().remove(leftFragment);
    context.getFragments().remove(rightFragment);

    PlanFragment joinFragment =
            new PlanFragment(context.getNextFragmentId(), joinNode, lhsJoinPartition);
    joinFragment.addChild(leftFragment.getChild(0));
    joinFragment.addChild(rightFragment.getChild(0));
    joinFragment.mergeQueryGlobalDicts(leftFragment.getQueryGlobalDicts());
    joinFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
    context.getFragments().add(joinFragment);

    return joinFragment;
}
```
**JoinNode.DistributionMode.COLOCATE**

Here the data of both leftFragment and rightFragment comes from OlapScanNodes rather than ExchangeNodes.
When the BEs execute this fragment, the build-table and probe-table data can be read locally and joined within a single fragment instead of being read from ExchangeNodes. In addition, colocated data is already distributed by the join keys, so the join can be performed bucket by bucket.
```java
if (distributionMode.equals(JoinNode.DistributionMode.COLOCATE)) {
    joinNode.setColocate(true, "");
    setJoinPushDown(joinNode);

    joinNode.setChild(0, leftFragment.getPlanRoot());
    joinNode.setChild(1, rightFragment.getPlanRoot());
    leftFragment.setPlanRoot(joinNode);
    leftFragment.addChildren(rightFragment.getChildren());
    context.getFragments().remove(rightFragment);

    context.getFragments().remove(leftFragment);
    context.getFragments().add(leftFragment);

    leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
    return leftFragment;
}
```
**JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET**

If the join keys are not all key columns of the table but include value columns, the data has to be shuffled so that the join-key partitions become disjoint.
If neither the build-table side nor the probe-table side has an ExchangeNode as its data source, no new joinFragment is needed and leftFragment is reused directly.
If one of them is an ExchangeNode, the fragment whose root is that ExchangeNode is removed and merged into the other fragment:
```java
if (distributionMode.equals(JoinNode.DistributionMode.SHUFFLE_HASH_BUCKET)) {
    setJoinPushDown(joinNode);
    if (!(leftFragment.getPlanRoot() instanceof ExchangeNode)
            && !(rightFragment.getPlanRoot() instanceof ExchangeNode)) {
        joinNode.setChild(0, leftFragment.getPlanRoot());
        joinNode.setChild(1, rightFragment.getPlanRoot());
        leftFragment.setPlanRoot(joinNode);
        leftFragment.addChildren(rightFragment.getChildren());
        context.getFragments().remove(rightFragment);

        context.getFragments().remove(leftFragment);
        context.getFragments().add(leftFragment);

        leftFragment.mergeQueryGlobalDicts(rightFragment.getQueryGlobalDicts());
        return leftFragment;
    }
    if (leftFragment.getPlanRoot() instanceof ExchangeNode
            && !(rightFragment.getPlanRoot() instanceof ExchangeNode)) {
        return computeShuffleHashBucketPlanFragment(context, rightFragment, leftFragment, joinNode);
    }
    return computeShuffleHashBucketPlanFragment(context, leftFragment, rightFragment, joinNode);
}
```
The logic of computeShuffleHashBucketPlanFragment is as follows:
```java
public PlanFragment computeShuffleHashBucketPlanFragment(ExecPlan context,
        PlanFragment stayFragment, PlanFragment removeFragment, JoinNode hashJoinNode) {
    hashJoinNode.setPartitionExprs(removeFragment.getDataPartition().getPartitionExprs());
    DataPartition dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
            removeFragment.getDataPartition().getPartitionExprs());
    removeFragment.getChild(0).setOutputPartition(dataPartition);

    context.getFragments().remove(removeFragment);
    context.getFragments().remove(stayFragment);
    context.getFragments().add(stayFragment);

    stayFragment.setPlanRoot(hashJoinNode);
    stayFragment.addChildren(removeFragment.getChildren());
    stayFragment.mergeQueryGlobalDicts(removeFragment.getQueryGlobalDicts());
    return stayFragment;
}
```
**JoinNode.DistributionMode.LOCAL_HASH_BUCKET**

LOCAL_HASH_BUCKET is handled much like SHUFFLE_HASH_BUCKET; only the partition type differs.
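A hedged sketch of that difference, modeled on computeShuffleHashBucketPlanFragment above (treat the method name and the setLocalHashBucket call as assumptions rather than the verbatim StarRocks source): the exchange that feeds the join is partitioned as TPartitionType.BUCKET_SHUFFLE_HASH_PARTITIONED, so the shuffled side is redistributed according to the other side's bucket layout instead of a plain hash partition.

```java
// Sketch only: mirrors computeShuffleHashBucketPlanFragment, but uses the bucket-aware
// partition type so shuffled rows land on the BE that holds the matching local bucket.
public PlanFragment computeBucketShufflePlanFragment(ExecPlan context,
        PlanFragment stayFragment, PlanFragment removeFragment, JoinNode hashJoinNode) {
    hashJoinNode.setLocalHashBucket(true);   // assumed flag marking the local-bucket join
    DataPartition dataPartition = new DataPartition(TPartitionType.BUCKET_SHUFFLE_HASH_PARTITIONED,
            removeFragment.getDataPartition().getPartitionExprs());
    removeFragment.getChild(0).setOutputPartition(dataPartition);

    context.getFragments().remove(removeFragment);
    context.getFragments().remove(stayFragment);
    context.getFragments().add(stayFragment);

    stayFragment.setPlanRoot(hashJoinNode);
    stayFragment.addChildren(removeFragment.getChildren());
    stayFragment.mergeQueryGlobalDicts(removeFragment.getQueryGlobalDicts());
    return stayFragment;
}
```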
### visitPhysicalDistribution

visitPhysicalDistribution creates a new fragment whose data-source node is an ExchangeNode rather than an OlapScanNode, so the data partition between the inputFragment and the new fragment has to be determined.
```java
public PlanFragment visitPhysicalDistribution(OptExpression optExpr, ExecPlan context) {
    PlanFragment inputFragment = visit(optExpr.inputAt(0), context);
    PhysicalDistributionOperator distribution = (PhysicalDistributionOperator) optExpr.getOp();

    ExchangeNode exchangeNode = new ExchangeNode(context.getNextNodeId(),
            inputFragment.getPlanRoot(), distribution.getDistributionSpec().getType());

    DataPartition dataPartition;
    if (DistributionSpec.DistributionType.GATHER.equals(
            distribution.getDistributionSpec().getType())) {
        exchangeNode.setNumInstances(1);
        dataPartition = DataPartition.UNPARTITIONED;
        GatherDistributionSpec spec = (GatherDistributionSpec) distribution.getDistributionSpec();
        if (spec.hasLimit()) {
            exchangeNode.setLimit(spec.getLimit());
        }
    } else if (DistributionSpec.DistributionType.BROADCAST.equals(
            distribution.getDistributionSpec().getType())) {
        exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
        dataPartition = DataPartition.UNPARTITIONED;
    } else if (DistributionSpec.DistributionType.SHUFFLE.equals(
            distribution.getDistributionSpec().getType())) {
        exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
        List<ColumnRefOperator> partitionColumns =
                getShuffleColumns((HashDistributionSpec) distribution.getDistributionSpec());
        List<Expr> distributeExpressions = partitionColumns.stream()
                .map(e -> ScalarOperatorToExpr.buildExecExpression(e,
                        new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr())))
                .collect(Collectors.toList());
        dataPartition = DataPartition.hashPartitioned(distributeExpressions);
    } else {
        throw new StarRocksPlannerException("Unsupport exchange type : "
                + distribution.getDistributionSpec().getType(), INTERNAL_ERROR);
    }
    exchangeNode.setDataPartition(dataPartition);

    PlanFragment fragment =
            new PlanFragment(context.getNextFragmentId(), exchangeNode, dataPartition);
    fragment.setQueryGlobalDicts(distribution.getGlobalDicts());
    inputFragment.setDestination(exchangeNode);
    inputFragment.setOutputPartition(dataPartition);

    context.getFragments().add(fragment);
    return fragment;
}
```
### finalizeFragments

Once the fragment tree has been built, finalizeFragments assigns a DataSink to each fragment and reverses the fragment list so that fragments[0] is the root fragment.
```java
private static ExecPlan finalizeFragments(ExecPlan execPlan, TResultSinkType resultSinkType) {
    List<PlanFragment> fragments = execPlan.getFragments();
    for (PlanFragment fragment : fragments) {
        fragment.createDataSink(resultSinkType);
    }
    // make fragments[0] the root fragment
    Collections.reverse(fragments);
    return execPlan;
}
```
### createDataSink

createDataSink gives each fragment a DataSink that pairs with the ExchangeNode of that fragment's destination fragment:
```java
public void createDataSink(TResultSinkType resultSinkType) {
    if (sink != null) {
        return;
    }
    if (destNode != null) {
        // non-root fragment: pair a DataStreamSink with the destination ExchangeNode
        DataStreamSink streamSink = new DataStreamSink(destNode.getId());
        streamSink.setPartition(outputPartition);
        streamSink.setMerge(destNode.isMerge());
        streamSink.setFragment(this);
        sink = streamSink;
    } else {
        if (planRoot == null) {
            return;
        }
        // root fragment: return the results through a ResultSink
        sink = new ResultSink(planRoot.getId(), resultSinkType);
    }
}
```
At this point the PlanFragments are complete. The next step is for the Coordinator to turn the PlanFragments into FragmentInstances that are executed on the BEs.