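PhysicalProperty

PhysicalProperty is an interface; the most important class implementing it is DistributionProperty.
DistributionSpec.DistributionType describes how an Operator's output data is distributed.
HashDistributionDesc.sourceType describes where an Operator's data comes from.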
The inheritance hierarchy is as follows (figure omitted):
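TaskContext

TaskContext is the context used while optimizing a Group. It has five main member variables (all visible in the constructor below); the two that matter most here are:
requiredProperty: the properties that the parent requires the output of every GroupExpression in the current Group to satisfy
requiredColumns: the columns that the parent's output requires the current Group to produce
For example,
when optimizeByCost constructs the first TaskContext object,
requiredProperty is empty
requiredColumns is the set of final output columns
When EnforceAndCostTask.optimizeChildGroup recursively initializes a child Group, requiredProperty is the inputProperty that the current GroupExpression demands of that child, and requiredColumns is taken over from the current context.
Here is a brief first look at the code.

    public TaskContext(OptimizerContext context,
                       PhysicalPropertySet physicalPropertySet,
                       ColumnRefSet requiredColumns,
                       double cost) {
        this.optimizerContext = context;
        this.requiredProperty = physicalPropertySet;
        this.requiredColumns = requiredColumns;
        this.upperBoundCost = cost;
        this.allScanOperators = Collections.emptyList();
    }

    private OptExpression optimizeByCost(ConnectContext connectContext,
                                         OptExpression logicOperatorTree,
                                         PhysicalPropertySet requiredProperty,
                                         ColumnRefSet requiredColumns) {
        TaskContext rootTaskContext = new TaskContext(
                context, requiredProperty, requiredColumns.clone(), Double.MAX_VALUE);
        // remainder of the method is not part of this excerpt
    }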
RequiredPropertyDeriver

RequiredPropertyDeriver computes, for each Operator, the properties that the output of each of its children must satisfy.

    public List<List<PhysicalPropertySet>> getRequiredProps(GroupExpression groupExpression) {
        requiredProperties = Lists.newArrayList();
        groupExpression.getOp().accept(this, new ExpressionContext(groupExpression));
        deriveChildCTEProperty(groupExpression);
        return requiredProperties;
    }
visitOperator

An Operator for which no dedicated visitPhysicalXxx function is implemented falls through to visitOperator. Based on the node's arity (its number of children), visitOperator builds a list of arity PhysicalPropertySet.EMPTY entries, which means no requirement is placed on the output properties of any child.
For example, visitPhysicalHashJoin is implemented for PhysicalHashJoinOperator, so that operator follows the flow described below to obtain its required properties, whereas there is no visitOlapScanNodeOperator for OlapScanNodeOperator, so it ends up in visitOperator.

    public Void visitOperator(Operator node, ExpressionContext context) {
        List<PhysicalPropertySet> requiredProps = new ArrayList<>();
        for (int childIndex = 0; childIndex < context.arity(); ++childIndex) {
            requiredProps.add(PhysicalPropertySet.EMPTY);
        }
        requiredProperties.add(requiredProps);
        return null;
    }
visitPhysicalHashJoin

When a PhysicalHashJoinOperator is encountered, visitPhysicalHashJoin generates the properties required of the output of its left and right children.
There are two main JOIN implementations:
BroadcastJOIN: sends all data of the right child (the build table) to the left child (the probe table) and builds the HashMap on the left child's side. This implementation limits the row count of the right table.
ShuffleJOIN: the children's data has to be shuffled; depending on the shuffle rule it can be subdivided further.
Which of the two ends up in the physical plan is decided by the size of the right table and by the cost of the resulting physical plans.

    public Void visitPhysicalHashJoin(PhysicalHashJoinOperator node, ExpressionContext context) {
        // Candidate 1: broadcast join. Nothing is required of the left child;
        // the right child must be replicated.
        PhysicalPropertySet rightBroadcastProperty = new PhysicalPropertySet(
                new DistributionProperty(DistributionSpec.createReplicatedDistributionSpec()));
        requiredProperties.add(Lists.newArrayList(
                PhysicalPropertySet.EMPTY, rightBroadcastProperty));

        JoinHelper joinHelper = JoinHelper.of(node,
                context.getChildOutputColumns(0), context.getChildOutputColumns(1));

        if (joinHelper.onlyBroadcast()) {
            return null;
        }

        if (joinHelper.onlyShuffle()) {
            requiredProperties.clear();
        }

        List<Integer> leftOnPredicateColumns = joinHelper.getLeftOnColumns();
        List<Integer> rightOnPredicateColumns = joinHelper.getRightOnColumns();
        if (leftOnPredicateColumns.isEmpty() || rightOnPredicateColumns.isEmpty()) {
            return null;
        }

        // Candidate 2: shuffle join on the ON-predicate columns.
        requiredProperties.add(computeShuffleJoinRequiredProperties(
                requirementsFromParent, leftOnPredicateColumns, rightOnPredicateColumns));
        return null;
    }
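The control flow of visitPhysicalHashJoin can be mirrored in a small standalone sketch. It is only an illustration: the class name JoinRequirementCandidates and the string labels "ANY", "BROADCAST" and "SHUFFLE[...]" are hypothetical stand-ins for PhysicalPropertySet objects, and the two boolean flags stand in for joinHelper.onlyBroadcast() and joinHelper.onlyShuffle().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinRequirementCandidates {
    /**
     * Returns one entry per candidate; each entry lists the requirement
     * placed on the left child and on the right child.
     */
    static List<List<String>> candidates(boolean onlyBroadcast, boolean onlyShuffle,
                                         List<Integer> leftOn, List<Integer> rightOn) {
        List<List<String>> result = new ArrayList<>();
        // The broadcast candidate is always added first.
        result.add(Arrays.asList("ANY", "BROADCAST"));
        if (onlyBroadcast) {
            return result;
        }
        if (onlyShuffle) {
            result.clear(); // drop the broadcast candidate again
        }
        if (leftOn.isEmpty() || rightOn.isEmpty()) {
            return result;  // no equality columns: nothing to shuffle on
        }
        result.add(Arrays.asList("SHUFFLE" + leftOn, "SHUFFLE" + rightOn));
        return result;
    }

    public static void main(String[] args) {
        // Prints [[ANY, BROADCAST], [SHUFFLE[1], SHUFFLE[2]]]
        System.out.println(candidates(false, false, Arrays.asList(1), Arrays.asList(2)));
    }
}
```

With neither flag set, both candidates are kept, and the later cost comparison decides between them.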
PropertyDeriverBase.computeShuffleJoinRequiredProperties

requiredFromParent is the property that the parent of the PhysicalHashJoinOperator requires of the join's output; leftShuffleColumns and rightShuffleColumns are the shuffle columns of the JoinOperator's two children. While satisfying requiredFromParent, the JoinOperator reorders the shuffle columns and generates the required output properties for its own left and right children.
The implementation is as follows:

    protected static List<PhysicalPropertySet> computeShuffleJoinRequiredProperties(
            PhysicalPropertySet requiredFromParent,
            List<Integer> leftShuffleColumns,
            List<Integer> rightShuffleColumns) {
        Optional<HashDistributionDesc> requiredShuffleDescOptional =
                getShuffleJoinHashDistributionDesc(requiredFromParent);
        if (!requiredShuffleDescOptional.isPresent()) {
            return createShuffleJoinRequiredProperties(
                    leftShuffleColumns, rightShuffleColumns);
        } else {
            List<Integer> requiredColumns = requiredShuffleDescOptional.get().getColumns();
            // True when one side holds exactly the required columns, in any order.
            boolean adjustBasedOnLeft =
                    leftShuffleColumns.size() == requiredColumns.size()
                            && leftShuffleColumns.containsAll(requiredColumns)
                            && requiredColumns.containsAll(leftShuffleColumns);
            boolean adjustBasedOnRight =
                    rightShuffleColumns.size() == requiredColumns.size()
                            && rightShuffleColumns.containsAll(requiredColumns)
                            && requiredColumns.containsAll(rightShuffleColumns);

            if (adjustBasedOnLeft || adjustBasedOnRight) {
                List<Integer> requiredLeft = Lists.newArrayList();
                List<Integer> requiredRight = Lists.newArrayList();
                for (Integer cid : requiredColumns) {
                    int idx = adjustBasedOnLeft
                            ? leftShuffleColumns.indexOf(cid)
                            : rightShuffleColumns.indexOf(cid);
                    requiredLeft.add(leftShuffleColumns.get(idx));
                    requiredRight.add(rightShuffleColumns.get(idx));
                }
                return createShuffleJoinRequiredProperties(requiredLeft, requiredRight);
            } else {
                return createShuffleJoinRequiredProperties(
                        leftShuffleColumns, rightShuffleColumns);
            }
        }
    }
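The reordering step is easier to follow on concrete column ids. The sketch below extracts just that step into a standalone class; ShuffleColumnReorder and its reorder method are hypothetical names, and plain integer lists replace the property objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShuffleColumnReorder {
    /** Returns {left, right}, reordered to follow the parent's column order when possible. */
    static List<List<Integer>> reorder(List<Integer> required,
                                       List<Integer> left,
                                       List<Integer> right) {
        boolean basedOnLeft = left.size() == required.size()
                && left.containsAll(required) && required.containsAll(left);
        boolean basedOnRight = right.size() == required.size()
                && right.containsAll(required) && required.containsAll(right);
        if (!basedOnLeft && !basedOnRight) {
            return Arrays.asList(left, right); // keep the original order
        }
        List<Integer> newLeft = new ArrayList<>();
        List<Integer> newRight = new ArrayList<>();
        for (Integer cid : required) {
            // Position of the required column on the matching side; the same
            // position is used on the other side to keep the join pairs aligned.
            int idx = basedOnLeft ? left.indexOf(cid) : right.indexOf(cid);
            newLeft.add(left.get(idx));
            newRight.add(right.get(idx));
        }
        return Arrays.asList(newLeft, newRight);
    }

    public static void main(String[] args) {
        // Parent wants (2, 1); the join pairs are 1=3 and 2=4.
        // Prints [[2, 1], [4, 3]]
        System.out.println(reorder(
                Arrays.asList(2, 1), Arrays.asList(1, 2), Arrays.asList(3, 4)));
    }
}
```

The pairs 1=3 and 2=4 stay intact; only their order changes so that the left side matches the parent's (2, 1).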
PropertyDeriverBase.getShuffleJoinHashDistributionDesc

getShuffleJoinHashDistributionDesc does two things:
It checks whether the parent requires a SHUFFLE output distribution. If not, it returns Optional.empty(), i.e. the children are not forced to follow that distribution either.
Otherwise, provided the source type of the required distribution is SHUFFLE_JOIN, it returns the HashDistributionDesc taken from the parent's requiredPropertySet.
The code is as follows.

    protected static Optional<HashDistributionDesc> getShuffleJoinHashDistributionDesc(
            PhysicalPropertySet requiredPropertySet) {
        if (!requiredPropertySet.getDistributionProperty().isShuffle()) {
            return Optional.empty();
        }
        HashDistributionDesc requireDistributionDesc =
                ((HashDistributionSpec) requiredPropertySet.getDistributionProperty()
                        .getSpec()).getHashDistributionDesc();
        if (!HashDistributionDesc.SourceType.SHUFFLE_JOIN
                .equals(requireDistributionDesc.getSourceType())) {
            return Optional.empty();
        }
        return Optional.of(requireDistributionDesc);
    }
PropertyDeriverBase.createShuffleJoinRequiredProperties

Builds a List<PhysicalPropertySet> from the input pair <leftColumns, rightColumns>. HashDistributionDesc.SourceType.SHUFFLE_JOIN marks the hash distribution as originating from a JOIN.

    private static List<PhysicalPropertySet> createShuffleJoinRequiredProperties(
            List<Integer> leftColumns, List<Integer> rightColumns) {
        HashDistributionSpec leftDistribution =
                DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
                        leftColumns, HashDistributionDesc.SourceType.SHUFFLE_JOIN));
        HashDistributionSpec rightDistribution =
                DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
                        rightColumns, HashDistributionDesc.SourceType.SHUFFLE_JOIN));

        PhysicalPropertySet leftRequiredPropertySet =
                new PhysicalPropertySet(new DistributionProperty(leftDistribution));
        PhysicalPropertySet rightRequiredPropertySet =
                new PhysicalPropertySet(new DistributionProperty(rightDistribution));

        return Lists.newArrayList(leftRequiredPropertySet, rightRequiredPropertySet);
    }
EnforceAndCostTask

EnforceAndCostTask optimizes the Group that a groupExpression belongs to, together with its inputs, working bottom-up from the leaves toward the root.

    EnforceAndCostTask(TaskContext context, GroupExpression expression) {
        super(context);
        this.groupExpression = expression;
    }
initRequiredProperties

initRequiredProperties calls RequiredPropertyDeriver.getRequiredProps to obtain the output properties that GroupExpression.op requires of its children and stores them in childrenRequiredPropertiesList.

    private void initRequiredProperties() {
        if (curChildIndex != -1) {
            return; // already initialized
        }
        localCost = 0;
        curTotalCost = 0;
        childrenRequiredPropertiesList =
                new RequiredPropertyDeriver(context).getRequiredProps(groupExpression);
        curChildIndex = 0;
    }
optimizeChildGroup

optimizeChildGroup optimizes the inputs of the current groupExpression (its child Groups) first. Once every input has been optimized, the best path from the leaves up to the current Group can be determined.
inputProperty is what the current group requires of childGroup's output properties
newUpperBound is the cost upper bound for the subtree rooted at childGroup
pushTask is used to save the current task's state and to restore it later.

    private void optimizeChildGroup(PhysicalPropertySet inputProperty, Group childGroup) {
        // Save the current task so that it resumes after the child has been optimized.
        pushTask((EnforceAndCostTask) clone());
        double newUpperBound = context.getUpperBoundCost() - curTotalCost;
        TaskContext taskContext = new TaskContext(
                context.getOptimizerContext(), inputProperty,
                context.getRequiredColumns(), newUpperBound);
        pushTask(new OptimizeGroupTask(taskContext, childGroup));
    }
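The way newUpperBound narrows the budget for each child can be shown with a few numbers. This is a minimal sketch with made-up costs; UpperBoundBudget and its two helper methods are hypothetical names, not part of the optimizer.

```java
class UpperBoundBudget {
    /** Budget left for the next child: the inherited bound minus the cost accumulated so far. */
    static double childUpperBound(double upperBoundCost, double curTotalCost) {
        return upperBoundCost - curTotalCost;
    }

    /** A candidate is abandoned once its accumulated cost exceeds the bound. */
    static boolean exceeds(double curTotalCost, double upperBoundCost) {
        return curTotalCost > upperBoundCost;
    }

    public static void main(String[] args) {
        double upperBound = 100.0;   // hypothetical bound inherited from the parent
        double curTotalCost = 30.0;  // hypothetical local cost of the current operator
        curTotalCost += 50.0;        // hypothetical best cost of the first child
        System.out.println(childUpperBound(upperBound, curTotalCost)); // prints 20.0
        System.out.println(exceeds(curTotalCost + 25.0, upperBound));  // prints true
    }
}
```

With these numbers the second child may cost at most 20; a second-child plan costing 25 pushes the total to 105 and the candidate is dropped.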
OutputPropertyDeriver

The OutputPropertyDeriver class derives the output properties of the current node; ChildOutputPropertyGuarantor is responsible for the output properties of the children.
First consider a node with no inputs, such as OlapScanNodeOperator. When EnforceAndCostTask.execute runs for it, optimizeChildGroup and ChildOutputPropertyGuarantor have nothing to do, and execution goes on to OutputPropertyDeriver to satisfy the output properties required by the parent.
context.getRequiredProperty() is the property required by the parent
outputProperty is the property this node outputs
childrenOutputProperties holds the output properties of the children
The relevant code is as follows.

    // Excerpt from EnforceAndCostTask.execute; the break statement exits an
    // enclosing loop that is not shown here.
    if (curChildIndex == 0 && prevChildIndex == -1) {
        localCost = CostModel.calculateCost(groupExpression);
        curTotalCost += localCost;
    }

    if (curChildIndex == groupExpression.getInputs().size()) {
        ChildOutputPropertyGuarantor childOutputPropertyGuarantor =
                new ChildOutputPropertyGuarantor(
                        context, groupExpression, context.getRequiredProperty(),
                        childrenBestExprList, childrenRequiredProperties,
                        childrenOutputProperties, curTotalCost);
        curTotalCost = childOutputPropertyGuarantor.enforceLegalChildOutputProperty();

        if (curTotalCost > context.getUpperBoundCost()) {
            break;
        }

        if (!computeCurrentGroupStatistics()) {
            return;
        }

        OutputPropertyDeriver outputPropertyDeriver = new OutputPropertyDeriver(
                groupExpression, context.getRequiredProperty(), childrenOutputProperties);
        PhysicalPropertySet outputProperty = outputPropertyDeriver.getOutputProperty();
        recordCostsAndEnforce(outputProperty, childrenRequiredProperties);
    }
OutputPropertyDeriver.visitPhysicalOlapScan

PhysicalOlapScanOperator is the base case of the recursion: it has no input properties, only output properties. Its output property set contains a single distribution property, a HashDistributionSpec, in which HashDistributionDesc.SourceType.LOCAL indicates that the data comes from local storage.

    @Override
    public PhysicalPropertySet visitPhysicalOlapScan(PhysicalOlapScanOperator node,
                                                     ExpressionContext context) {
        HashDistributionSpec olapDistributionSpec = node.getDistributionSpec();

        DistributionSpec.PropertyInfo physicalPropertyInfo = new DistributionSpec.PropertyInfo();
        physicalPropertyInfo.tableId = node.getTable().getId();
        physicalPropertyInfo.partitionIds = node.getSelectedPartitionId();

        return createPropertySetByDistribution(new HashDistributionSpec(
                new HashDistributionDesc(
                        olapDistributionSpec.getShuffleColumns(),
                        HashDistributionDesc.SourceType.LOCAL),
                physicalPropertyInfo));
    }
EnforceAndCostTask.recordCostsAndEnforce

outputProperty is the output property of the current node, and context.getRequiredProperty() is what the parent requires of the current node's output. At this point the code must check whether outputProperty satisfies requiredProperty; if it does not, an enforcer is inserted: outputProperty -> enforcer -> requiredProperty.
recordCostsAndEnforce takes the current node's output property outputProperty and the children's output properties childrenOutputProperties as inputs. When it returns, optimization of the Group and its child Groups is finished, and the lowest cost has been recorded in curTotalCost.

    private void recordCostsAndEnforce(PhysicalPropertySet outputProperty,
                                       List<PhysicalPropertySet> childrenOutputProperties) {
        // Recompute the local cost now that the children's output properties are known.
        curTotalCost -= localCost;
        localCost = CostModel.calculateCostWithChildrenOutProperty(
                groupExpression, childrenOutputProperties);
        curTotalCost += localCost;

        setSatisfiedPropertyWithCost(outputProperty, childrenOutputProperties);

        PhysicalPropertySet requiredProperty = context.getRequiredProperty();
        if (!outputProperty.isSatisfy(requiredProperty)) {
            // Not satisfied: add an enforcer on top of this expression.
            PhysicalPropertySet enforcedProperty =
                    enforceProperty(outputProperty, requiredProperty);
            if (!enforcedProperty.equals(requiredProperty)) {
                setPropertyWithCost(
                        groupExpression.getGroup().getBestExpression(enforcedProperty),
                        enforcedProperty, requiredProperty,
                        Lists.newArrayList(outputProperty));
            }
        } else {
            if (!outputProperty.equals(requiredProperty)) {
                setPropertyWithCost(groupExpression, outputProperty,
                        requiredProperty, childrenOutputProperties);
            }
        }

        if (curTotalCost < context.getUpperBoundCost()) {
            context.setUpperBoundCost(curTotalCost);
        }
    }
EnforceAndCostTask.setSatisfiedPropertyWithCost

setSatisfiedPropertyWithCost handles the case in which the output property outputProperty of the current groupExpression is exactly the property requiredProperty that the parent asks for. It therefore calls setPropertyWithCost with outputProperty standing in for requiredProperty.

    private void setSatisfiedPropertyWithCost(
            PhysicalPropertySet outputProperty,
            List<PhysicalPropertySet> childrenOutputProperties) {
        // Three-argument overload: outputProperty also serves as the required property.
        setPropertyWithCost(groupExpression, outputProperty, childrenOutputProperties);
        if (outputProperty.getCteProperty().isEmpty()) {
            setPropertyWithCost(groupExpression, outputProperty,
                    PhysicalPropertySet.EMPTY, childrenOutputProperties);
        }
    }
EnforceAndCostTask.setPropertyWithCost

Group and GroupExpression: a Group contains all logically equivalent GroupExpressions (i.e. those with the same output) and records the lowest-cost GroupExpression; the inputs and outputs of a GroupExpression are themselves Groups.
GroupExpression.lowestCostTable: records the minimum cost curTotalCost needed to obtain outputProperty, together with the corresponding input properties childrenOutputProperties
So if the current path childrenOutputProperties -> outputProperty has a lower curTotalCost, the entry in GroupExpression.lowestCostTable is updated (or inserted if there was none)
It is updated by GroupExpression.updatePropertyWithCost.
GroupExpression.outputPropertyMap: records the output property outputProperty that satisfies requiredPropertySet on the best path
Because GroupExpression.outputPropertyMap is updated only when updatePropertyWithCost returns true, the two functions together record the best path from input to output:
Best expression in the Group: this.groupExpression
Lowest cost: curTotalCost
Input properties at that point: childrenOutputProperties
Output property at that point: outputProperty
Property required by the parent: requiredProperty
It is updated by GroupExpression.setOutputPropertySatisfyRequiredProperty.
Group.lowestCostExpressions: records which of the Group's equivalent GroupExpressions has the lowest cost while satisfying requiredProperty
key: requiredProperty
value: <cost, expression>, the lowest cost and the corresponding GroupExpression
It is updated by Group.setBestExpression.
So once optimization has finished, the parent can call GroupExpression.getOutputProperty(requiredProperty) to obtain the best output property outputProperty of its input, then recurse into childrenOutputProperties, and so on, to traverse all the best expressions.

    private void setPropertyWithCost(GroupExpression groupExpression,
                                     PhysicalPropertySet outputProperty,
                                     PhysicalPropertySet requiredProperty,
                                     List<PhysicalPropertySet> childrenOutputProperties) {
        if (groupExpression.updatePropertyWithCost(
                requiredProperty, childrenOutputProperties, curTotalCost)) {
            groupExpression.setOutputPropertySatisfyRequiredProperty(
                    outputProperty, requiredProperty);
        }
        this.groupExpression.getGroup().setBestExpression(
                groupExpression, curTotalCost, requiredProperty);
    }
GroupExpression.updatePropertyWithCost

When an outputProperties that meets the requirement is found and the new cost is lower than the previously recorded cost, the entry in GroupExpression.lowestCostTable is updated (or inserted if there was none). The table maps outputProperties to <cost, inputProperties>.

    public boolean updatePropertyWithCost(PhysicalPropertySet outputProperties,
                                          List<PhysicalPropertySet> inputProperties,
                                          double cost) {
        if (lowestCostTable.containsKey(outputProperties)) {
            if (lowestCostTable.get(outputProperties).first > cost) {
                lowestCostTable.put(outputProperties, new Pair<>(cost, inputProperties));
                return true;
            }
        } else {
            lowestCostTable.put(outputProperties, new Pair<>(cost, inputProperties));
            return true;
        }
        return false;
    }
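The "keep only the cheapest entry per property" rule can be tried out in isolation. The sketch below is a simplified stand-in, not the optimizer's class: LowestCostTable is a hypothetical name, strings replace PhysicalPropertySet, and java.util.AbstractMap.SimpleEntry replaces the Pair type.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LowestCostTable {
    // property -> (lowest cost seen so far, input properties that achieved it)
    private final Map<String, SimpleEntry<Double, List<String>>> table = new HashMap<>();

    /** Records the entry if it is the first one or strictly cheaper; returns whether it was recorded. */
    boolean update(String property, List<String> inputProperties, double cost) {
        SimpleEntry<Double, List<String>> old = table.get(property);
        if (old != null && old.getKey() <= cost) {
            return false; // the existing entry is at least as cheap
        }
        table.put(property, new SimpleEntry<>(cost, inputProperties));
        return true;
    }

    double cost(String property) {
        return table.get(property).getKey();
    }

    List<String> inputs(String property) {
        return table.get(property).getValue();
    }

    public static void main(String[] args) {
        LowestCostTable t = new LowestCostTable();
        System.out.println(t.update("SHUFFLE[1]", Arrays.asList("LOCAL", "BROADCAST"), 10.0)); // true
        System.out.println(t.update("SHUFFLE[1]", Arrays.asList("SHUFFLE", "SHUFFLE"), 12.0)); // false
        System.out.println(t.update("SHUFFLE[1]", Arrays.asList("LOCAL", "SHUFFLE"), 7.0));    // true
        System.out.println(t.cost("SHUFFLE[1]"));                                              // 7.0
    }
}
```

The boolean return value is what lets the caller decide whether the companion map (outputPropertyMap in the original) should be refreshed as well.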
GroupExpression.setOutputPropertySatisfyRequiredProperty

setOutputPropertySatisfyRequiredProperty records the mapping between the property requiredPropertySet required by the parent and this node's output property outputPropertySet.
When the parent runs its EnforceAndCostTask, it calls GroupExpression.getOutputProperty to fetch the child's output property; this is covered in detail later.

    public void setOutputPropertySatisfyRequiredProperty(
            PhysicalPropertySet outputPropertySet,
            PhysicalPropertySet requiredPropertySet) {
        this.outputPropertyMap.put(requiredPropertySet, outputPropertySet);
    }

    public PhysicalPropertySet getOutputProperty(PhysicalPropertySet requiredPropertySet) {
        PhysicalPropertySet outputProperty = outputPropertyMap.get(requiredPropertySet);
        Preconditions.checkState(outputProperty != null);
        return outputProperty;
    }
Group.setBestExpression

Group.lowestCostExpressions maps {RequiredProperty -> {cost, groupExpression}}: the key is the property required by the parent, and the value is the lowest-cost groupExpression together with its cost. Group.setBestExpression keeps Group.lowestCostExpressions up to date with the cheapest entry.
The root group can therefore obtain its lowest-cost groupExpression through Group.getBestExpression.

    public void setBestExpression(GroupExpression expression,
                                  double cost,
                                  PhysicalPropertySet physicalPropertySet) {
        if (lowestCostExpressions.containsKey(physicalPropertySet)) {
            if (lowestCostExpressions.get(physicalPropertySet).first > cost) {
                lowestCostExpressions.put(physicalPropertySet, new Pair<>(cost, expression));
            }
        } else {
            lowestCostExpressions.put(physicalPropertySet, new Pair<>(cost, expression));
        }
    }

    public GroupExpression getBestExpression(PhysicalPropertySet physicalPropertySet) {
        if (hasBestExpression(physicalPropertySet)) {
            return lowestCostExpressions.get(physicalPropertySet).second;
        }
        return null;
    }
EnforceAndCostTask.enforceDistribute

enforceProperty checks whether the DistributionProperty and SortProperty in outputPropertySet satisfy requiredPropertySet. Taking DistributionProperty as the example, this section explains how enforceDistribute adds an enforcer.
PhysicalPropertySet.setDistributionProperty changes only the DistributionProperty and leaves the other properties untouched, so the distribution property of newOutputProperty matches what the parent requires.
DistributionProperty.appendEnforcers then creates a new GroupExpression object, enforcer, whose input is the current group (groupExpression.getGroup()). The enforcer acts as a converter that realizes oldOutputProperty -> newOutputProperty -> requiredProperty.
Finally, because the newly created enforcer does not yet belong to any Group, updateCostWithEnforcer also sets its owning group to groupExpression.getGroup().
The overall logic is as follows.

    private PhysicalPropertySet enforceDistribute(PhysicalPropertySet oldOutputProperty) {
        PhysicalPropertySet newOutputProperty = oldOutputProperty.copy();
        newOutputProperty.setDistributionProperty(
                context.getRequiredProperty().getDistributionProperty());

        GroupExpression enforcer = context.getRequiredProperty()
                .getDistributionProperty()
                .appendEnforcers(groupExpression.getGroup());

        updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
        return newOutputProperty;
    }
DistributionProperty.appendEnforcers

Creates the new GroupExpression object enforcer. The Operator it wraps is a PhysicalDistributionOperator, which produces the required data distribution.

    public GroupExpression appendEnforcers(Group child) {
        return new GroupExpression(
                new PhysicalDistributionOperator(spec), Lists.newArrayList(child));
    }
EnforceAndCostTask.updateCostWithEnforcer

updateCostWithEnforcer sets the Group that the enforcer belongs to, links oldOutputProperty to newOutputProperty, and recomputes the cost with the enforcer included.

    private void updateCostWithEnforcer(GroupExpression enforcer,
                                        PhysicalPropertySet oldOutputProperty,
                                        PhysicalPropertySet newOutputProperty) {
        context.getOptimizerContext().getMemo()
                .insertEnforceExpression(enforcer, groupExpression.getGroup());

        curTotalCost += CostModel.calculateCost(enforcer);

        if (enforcer.updatePropertyWithCost(
                newOutputProperty, Lists.newArrayList(oldOutputProperty), curTotalCost)) {
            enforcer.setOutputPropertySatisfyRequiredProperty(
                    newOutputProperty, newOutputProperty);
        }

        groupExpression.getGroup().setBestExpression(
                enforcer, curTotalCost, newOutputProperty);
    }

Memo.insertEnforceExpression

    public void insertEnforceExpression(GroupExpression groupExpression, Group targetGroup) {
        groupExpression.setGroup(targetGroup);
    }
ChildOutputPropertyGuarantor

EnforceAndCostTask.execute

The discussion above used PhysicalOlapScanOperator to walk through the optimization of a single Group. An OlapScanNode is a leaf of the physical plan tree and has no inputs, so it is the base case of the recursion in EnforceAndCostTask.execute. The following uses PhysicalHashJoinOperator to walk through the optimization of a subtree.
The left and right children of the HashJoinNode are PhysicalOlapScanOperator or PhysicalDistributionOperator, and optimizeChildGroup has already recursed twice to optimize both children. Therefore:
The childBestExpr returned by childGroup.getBestExpression() was set by Group.setBestExpression above. If childBestExpr == null, the childGroup has either been pruned or has not been optimized yet; which of the two depends on prevChildIndex. prevChildIndex records the index of the child currently being optimized: if prevChildIndex < curChildIndex, childGroup has not been optimized yet and execution enters optimizeChildGroup to recurse into it; otherwise the child has already been attempted without yielding a plan, and this candidate is abandoned.
childBestExpr.getOutputProperty() returns the output property that setOutputPropertySatisfyRequiredProperty recorded above for the best GroupExpression.
The code is as follows.

    for (; curChildIndex < groupExpression.getInputs().size(); curChildIndex++) {
        PhysicalPropertySet childRequiredProperty =
                childrenRequiredProperties.get(curChildIndex);
        Group childGroup = groupExpression.getInputs().get(curChildIndex);

        GroupExpression childBestExpr = childGroup.getBestExpression(childRequiredProperty);

        // Already attempted and still no plan: the child was pruned.
        if (childBestExpr == null && prevChildIndex >= curChildIndex) {
            break;
        }

        // Not optimized yet: optimize the child first, then resume this task.
        if (childBestExpr == null) {
            prevChildIndex = curChildIndex;
            optimizeChildGroup(childRequiredProperty, childGroup);
            return;
        }

        childrenBestExprList.add(childBestExpr);

        PhysicalPropertySet childOutputProperty =
                childBestExpr.getOutputProperty(childRequiredProperty);
        childrenOutputProperties.add(childOutputProperty);
        childrenRequiredProperties.set(curChildIndex, childOutputProperty);

        if (!canGenerateOneStageAgg(childBestExpr)) {
            break;
        }

        if (!checkBroadcastRowCountLimit(childRequiredProperty, childBestExpr)) {
            break;
        }

        curTotalCost += childBestExpr.getCost(childRequiredProperty);
        if (curTotalCost > context.getUpperBoundCost()) {
            break;
        }
    }
enforceLegalChildOutputProperty

Once the information about the child Groups is available, ChildOutputPropertyGuarantor checks whether their properties meet the requirements.

    if (curChildIndex == groupExpression.getInputs().size()) {
        ChildOutputPropertyGuarantor childOutputPropertyGuarantor =
                new ChildOutputPropertyGuarantor(
                        context, groupExpression, context.getRequiredProperty(),
                        childrenBestExprList, childrenRequiredProperties,
                        childrenOutputProperties, curTotalCost);
        curTotalCost = childOutputPropertyGuarantor.enforceLegalChildOutputProperty();
    }

    public double enforceLegalChildOutputProperty() {
        groupExpression.getOp().accept(this, new ExpressionContext(groupExpression));
        return this.curTotalCost;
    }
visitPhysicalJoin

The full code of visitPhysicalJoin is long, so it is not reproduced in one piece; it is explained section by section.

BRANCH-0: BroadcastJoin

If BroadcastJoin was not pruned in checkBroadcastRowCountLimit, a BroadcastJoin property set is generated as well. Whether it is finally chosen depends on whether BroadcastJoin or ShuffleJoin has the lower cost.

    public Void visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext context) {
        GroupExpression leftChild = childrenBestExprList.get(0);
        GroupExpression rightChild = childrenBestExprList.get(1);

        PhysicalPropertySet leftChildOutputProperty = childrenOutputProperties.get(0);
        PhysicalPropertySet rightChildOutputProperty = childrenOutputProperties.get(1);

        DistributionProperty rightDistribute =
                rightChildOutputProperty.getDistributionProperty();
        if (rightDistribute.isBroadcast() || rightDistribute.isGather()) {
            return visitOperator(node, context);
        }
        // continued in the excerpts that follow
    }
ShuffleJoin

At this point the output distribution property of both children must be DistributionType.SHUFFLE; otherwise an error is raised.
Once the output distribution is established, the distribution is refined according to the children's HashDistributionDesc.SourceType.

    public Void visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext context) {
        JoinHelper joinHelper = JoinHelper.of(
                node, context.getChildOutputColumns(0), context.getChildOutputColumns(1));
        List<Integer> leftOnPredicateColumns = joinHelper.getLeftOnColumns();
        List<Integer> rightOnPredicateColumns = joinHelper.getRightOnColumns();

        List<PhysicalPropertySet> requiredProperties =
                computeShuffleJoinRequiredProperties(
                        requirements, leftOnPredicateColumns, rightOnPredicateColumns);

        List<Integer> leftShuffleColumns =
                ((HashDistributionSpec) requiredProperties.get(0)
                        .getDistributionProperty().getSpec()).getShuffleColumns();
        List<Integer> rightShuffleColumns =
                ((HashDistributionSpec) requiredProperties.get(1)
                        .getDistributionProperty().getSpec()).getShuffleColumns();

        DistributionProperty leftChildDistributionProperty =
                leftChildOutputProperty.getDistributionProperty();
        DistributionProperty rightChildDistributionProperty =
                rightChildOutputProperty.getDistributionProperty();

        if (!leftChildDistributionProperty.isShuffle()
                || !rightChildDistributionProperty.isShuffle()) {
            Preconditions.checkState(false, "Children output property distribution error");
            return visitOperator(node, context);
        }
        // continued in the excerpts that follow
    }
BRANCH-1: ColocateJoin

If both children of the JOIN come from an OlapScan, canColocateJoin is called to check whether the join can really be executed as a ColocateJoin. If it cannot, transToBucketShuffleJoin converts it into a BucketShuffleJoin.
A conversion to BucketShuffleJoin usually happens when the join key contains a value column.

    if (leftDistributionDesc.isLocal() && rightDistributionDesc.isLocal()) {
        if (JoinOperator.HINT_BUCKET.equals(hint)
                || !canColocateJoin(leftDistributionSpec, rightDistributionSpec,
                        leftShuffleColumns, rightShuffleColumns)) {
            transToBucketShuffleJoin(
                    leftDistributionSpec, leftShuffleColumns, rightShuffleColumns);
        }
        return visitOperator(node, context);
    }
BRANCH-2: BucketShuffleJoin

This is typically the case when the bucket key of the left (large) table matches the join key.

    if (leftDistributionDesc.isLocal() && rightDistributionDesc.isShuffle()) {
        transToBucketShuffleJoin(leftDistributionSpec, leftShuffleColumns, rightShuffleColumns);
        return visitOperator(node, context);
    }

transToBucketShuffleJoin and enforceChildSatisfyShuffleJoin follow similar logic.
BRANCH-3: ShuffleJoin

If the left side is Shuffle and the right side is Local, the pair has to be adjusted to {Shuffle, Shuffle_Enforcer}. If both sides are Shuffle, the code checks whether they already satisfy the shuffle join.
If the join key of the left table is not its bucket key, the join is usually a shuffle join.

    if (leftDistributionDesc.isShuffle() && rightDistributionDesc.isLocal()) {
        enforceChildSatisfyShuffleJoin(leftDistributionSpec,
                leftShuffleColumns, rightShuffleColumns,
                rightChild, rightChildOutputProperty);
        return visitOperator(node, context);
    } else if (leftDistributionDesc.isShuffle() && rightDistributionDesc.isShuffle()) {
        if (!checkChildDistributionSatisfyShuffle(leftDistributionSpec, rightDistributionSpec,
                leftShuffleColumns, rightShuffleColumns)) {
            enforceChildSatisfyShuffleJoin(leftDistributionSpec,
                    leftShuffleColumns, rightShuffleColumns,
                    rightChild, rightChildOutputProperty);
        }
        return visitOperator(node, context);
    }
enforceChildSatisfyShuffleJoin

transToBucketShuffleJoin and enforceChildSatisfyShuffleJoin share the same core logic: first derive the shuffle columns, then redistribute the data.

    private void enforceChildSatisfyShuffleJoin(HashDistributionSpec leftDistributionSpec,
                                                List<Integer> leftShuffleColumns,
                                                List<Integer> rightShuffleColumns,
                                                GroupExpression child,
                                                PhysicalPropertySet childOutputProperty) {
        List<Integer> newRightShuffleColumns = Lists.newArrayList();
        HashDistributionDesc leftDistributionDesc =
                leftDistributionSpec.getHashDistributionDesc();
        DistributionSpec.PropertyInfo leftDistributionPropertyInfo =
                leftDistributionSpec.getPropertyInfo();

        // Walk the left distribution columns in order and pick the matching right column.
        for (int cid : leftDistributionDesc.getColumns()) {
            if (leftShuffleColumns.contains(cid)) {
                int index = leftShuffleColumns.indexOf(cid);
                newRightShuffleColumns.add(rightShuffleColumns.get(index));
            } else {
                // Fall back to a column known to be equivalent to cid.
                int equivalentColumn = Arrays.stream(
                                leftDistributionPropertyInfo.getEquivalentColumns(cid).getColumnIds())
                        .filter(leftShuffleColumns::contains).findAny().orElse(cid);
                Preconditions.checkState(leftShuffleColumns.contains(equivalentColumn));
                int index = leftShuffleColumns.indexOf(equivalentColumn);
                newRightShuffleColumns.add(rightShuffleColumns.get(index));
            }
        }

        enforceChildShuffleDistribution(newRightShuffleColumns, child, childOutputProperty, 1);
    }
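The column alignment performed by the loop can be illustrated with plain integer lists. This sketch covers only the direct-match branch and leaves out the equivalent-column fallback; RightShuffleColumns and align are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RightShuffleColumns {
    /**
     * For each column the left side is already distributed on, finds its position
     * among the left join columns and picks the right join column at that position.
     */
    static List<Integer> align(List<Integer> leftDistributionColumns,
                               List<Integer> leftJoinColumns,
                               List<Integer> rightJoinColumns) {
        List<Integer> result = new ArrayList<>();
        for (int cid : leftDistributionColumns) {
            int index = leftJoinColumns.indexOf(cid);
            if (index < 0) {
                // The original code tries equivalent columns here; this sketch does not.
                throw new IllegalStateException("column " + cid + " is not a join column");
            }
            result.add(rightJoinColumns.get(index));
        }
        return result;
    }

    public static void main(String[] args) {
        // Left is distributed on (2, 1); the join pairs are 1=10 and 2=20.
        // Prints [20, 10]
        System.out.println(align(
                Arrays.asList(2, 1), Arrays.asList(1, 2), Arrays.asList(10, 20)));
    }
}
```

The right side is then shuffled on (20, 10), so that rows with matching join keys hash to the same place as the rows of the left side.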
enforceChildShuffleDistribution

Building the HashDistributionSpec mainly comes down to building HashDistributionDesc.columns. Once the caller passes in the columns, the required distribution enforceDistributionSpec can be constructed.

    private GroupExpression enforceChildShuffleDistribution(List<Integer> shuffleColumns,
                                                            GroupExpression child,
                                                            PhysicalPropertySet childOutputProperty,
                                                            int childIndex) {
        DistributionSpec enforceDistributionSpec =
                DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
                        shuffleColumns, HashDistributionDesc.SourceType.SHUFFLE_ENFORCE));

        Pair<GroupExpression, PhysicalPropertySet> pair =
                enforceChildDistribution(enforceDistributionSpec, child, childOutputProperty);
        PhysicalPropertySet newChildInputProperty = pair.second;

        requiredChildrenProperties.set(childIndex, newChildInputProperty);
        childrenOutputProperties.set(childIndex, newChildInputProperty);
        return pair.first;
    }
enforceChildDistribution

enforceChildDistribution is also important: it makes the output distribution of child satisfy the distributionSpec required by the parent, which means an enforcer has to be added between the input and the output.

    private Pair<GroupExpression, PhysicalPropertySet> enforceChildDistribution(
            DistributionSpec distributionSpec,
            GroupExpression child,
            PhysicalPropertySet childOutputProperty) {
        double childCosts = child.getCost(childOutputProperty);
        Group childGroup = child.getGroup();

        DistributionProperty newDistributionProperty =
                new DistributionProperty(distributionSpec);
        PhysicalPropertySet newOutputProperty = childOutputProperty.copy();
        newOutputProperty.setDistributionProperty(newDistributionProperty);

        // The child is already a distribution operator: reuse its cost and inputs.
        if (child.getOp() instanceof PhysicalDistributionOperator) {
            GroupExpression enforcer = newDistributionProperty.appendEnforcers(childGroup);
            enforcer.setOutputPropertySatisfyRequiredProperty(
                    newOutputProperty, newOutputProperty);
            context.getMemo().insertEnforceExpression(enforcer, childGroup);

            enforcer.updatePropertyWithCost(newOutputProperty,
                    child.getInputProperties(childOutputProperty), childCosts);
            childGroup.setBestExpression(enforcer, childCosts, newOutputProperty);
            return new Pair<>(enforcer, newOutputProperty);
        }

        GroupExpression enforcer = newDistributionProperty.appendEnforcers(childGroup);
        enforcer.setOutputPropertySatisfyRequiredProperty(
                newOutputProperty, newOutputProperty);
        updateChildCostWithEnforcer(
                enforcer, childOutputProperty, newOutputProperty, childCosts, childGroup);
        return new Pair<>(enforcer, newOutputProperty);
    }
updateChildCostWithEnforcer

When child.getOp() is not a PhysicalDistributionOperator, execution enters updateChildCostWithEnforcer. The difference is that this function adds the cost of the newly inserted enforcer.

    private void updateChildCostWithEnforcer(GroupExpression enforcer,
                                             PhysicalPropertySet oldOutputProperty,
                                             PhysicalPropertySet newOutputProperty,
                                             double childCost,
                                             Group childGroup) {
        context.getMemo().insertEnforceExpression(enforcer, childGroup);

        // Replace the child's old cost with the child cost plus the enforcer cost.
        curTotalCost -= childCost;
        childCost += CostModel.calculateCost(enforcer);
        curTotalCost += childCost;

        enforcer.updatePropertyWithCost(
                newOutputProperty, Lists.newArrayList(oldOutputProperty), childCost);
        childGroup.setBestExpression(enforcer, childCost, newOutputProperty);
    }
ChildOutputPropertyGuarantor.visitPhysicalJoin above has already fixed the output property of the right child. OutputPropertyDeriver.visitPhysicalJoin then derives the PhysicalJoinOperator's own output property from the output properties of its left and right children.

computeHashJoinDistributionPropertyInfo

computeHashJoinDistributionPropertyInfo sets the DistributionSpec.PropertyInfo of physicalPropertySet and then returns physicalPropertySet. The output property of the Join is therefore the physicalPropertySet that was passed in.

    private PhysicalPropertySet computeHashJoinDistributionPropertyInfo(
            PhysicalJoinOperator node,
            PhysicalPropertySet physicalPropertySet,
            List<Integer> leftOnPredicateColumns,
            List<Integer> rightOnPredicateColumns,
            ExpressionContext context) {
        DistributionSpec.PropertyInfo propertyInfo =
                physicalPropertySet.getDistributionProperty().getSpec().getPropertyInfo();

        ColumnRefSet leftChildColumns = context.getChildOutputColumns(0);
        ColumnRefSet rightChildColumns = context.getChildOutputColumns(1);

        if (node.getJoinType().isLeftOuterJoin()) {
            propertyInfo.nullableColumns.union(rightChildColumns);
        } else if (node.getJoinType().isRightOuterJoin()) {
            propertyInfo.nullableColumns.union(leftChildColumns);
        } else if (node.getJoinType().isFullOuterJoin()) {
            propertyInfo.nullableColumns.union(leftChildColumns);
            propertyInfo.nullableColumns.union(rightChildColumns);
        } else if (node.getJoinType().isInnerJoin()) {
            for (int i = 0; i < leftOnPredicateColumns.size(); i++) {
                int leftColumn = leftOnPredicateColumns.get(i);
                int rightColumn = rightOnPredicateColumns.get(i);
                propertyInfo.addJoinEquivalentPair(leftColumn, rightColumn);
            }
        }

        return physicalPropertySet;
    }
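The join-type handling for nullable columns can be summarized in a small standalone sketch. It models only the nullableColumns part, not the equivalent-pair bookkeeping for inner joins; JoinNullableColumns, its JoinType enum and the method name are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

class JoinNullableColumns {
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    /** Columns that an outer join may fill with NULL on its output. */
    static Set<Integer> nullableColumns(JoinType type, Set<Integer> left, Set<Integer> right) {
        Set<Integer> nullable = new TreeSet<>();
        switch (type) {
            case LEFT_OUTER:
                nullable.addAll(right);  // unmatched left rows get NULL right columns
                break;
            case RIGHT_OUTER:
                nullable.addAll(left);   // unmatched right rows get NULL left columns
                break;
            case FULL_OUTER:
                nullable.addAll(left);
                nullable.addAll(right);
                break;
            default:
                break;                   // an inner join adds no nullable columns
        }
        return nullable;
    }

    public static void main(String[] args) {
        Set<Integer> left = new TreeSet<>(Arrays.asList(1, 2));
        Set<Integer> right = new TreeSet<>(Arrays.asList(3, 4));
        System.out.println(nullableColumns(JoinType.LEFT_OUTER, left, right)); // prints [3, 4]
        System.out.println(nullableColumns(JoinType.FULL_OUTER, left, right)); // prints [1, 2, 3, 4]
        System.out.println(nullableColumns(JoinType.INNER, left, right));      // prints []
    }
}
```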
BRANCH-0: BROADCAST

TODO: why are empty lists passed to computeHashJoinDistributionPropertyInfo for both the left and the right columns?

    private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node,
                                                  ExpressionContext context) {
        PhysicalPropertySet leftChildOutputProperty = childrenOutputProperties.get(0);
        PhysicalPropertySet rightChildOutputProperty = childrenOutputProperties.get(1);

        if (rightChildOutputProperty.getDistributionProperty().isBroadcast()) {
            return computeHashJoinDistributionPropertyInfo(
                    node, leftChildOutputProperty,
                    Collections.emptyList(), Collections.emptyList(),
                    context);
        }
        // continued in the excerpts that follow
    }
BRANCH-1: ColocateJoin

    if (leftDistributionDesc.isLocal() && rightDistributionDesc.isLocal()) {
        return computeHashJoinDistributionPropertyInfo(node,
                computeColocateJoinOutputProperty(leftDistributionSpec, rightDistributionSpec),
                leftOnPredicateColumns, rightOnPredicateColumns, context);
    }
BRANCH-2: BucketShuffleJoin

leftChildOutputProperty is used directly as the output property.

    if (leftDistributionDesc.isLocal() && rightDistributionDesc.isBucketJoin()) {
        return computeHashJoinDistributionPropertyInfo(node,
                leftChildOutputProperty,
                leftOnPredicateColumns, rightOnPredicateColumns, context);
    }
BRANCH-3: ShuffleJoin

Both inputs come from a Shuffle source (SourceType.SHUFFLE_AGG, SourceType.SHUFFLE_JOIN).

    // Note: as written, && binds tighter than ||, so the condition parses as
    // ((left...) && right.isShuffle()) || right.isShuffleEnforce().
    if ((leftDistributionDesc.isShuffle() || leftDistributionDesc.isShuffleEnforce())
            && (rightDistributionDesc.isShuffle()) || rightDistributionDesc.isShuffleEnforce()) {
        return computeHashJoinDistributionPropertyInfo(node,
                computeShuffleJoinOutputProperty(leftDistributionDesc.getColumns()),
                leftOnPredicateColumns, rightOnPredicateColumns, context);
    }
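The placement of parentheses in the condition above affects how Java evaluates it, because && has higher precedence than ||. The sketch below compares the condition as quoted with a version in which the right-hand checks are grouped together; ConditionPrecedence and its method names are hypothetical, and the four booleans stand in for the isShuffle() and isShuffleEnforce() calls. Whether the two forms ever differ in practice depends on which combinations can reach this branch, which the excerpt does not establish.

```java
class ConditionPrecedence {
    /** The condition with the parentheses exactly as quoted. */
    static boolean asWritten(boolean leftShuffle, boolean leftEnforce,
                             boolean rightShuffle, boolean rightEnforce) {
        return (leftShuffle || leftEnforce) && (rightShuffle) || rightEnforce;
    }

    /** The condition with both right-hand checks grouped together. */
    static boolean grouped(boolean leftShuffle, boolean leftEnforce,
                           boolean rightShuffle, boolean rightEnforce) {
        return (leftShuffle || leftEnforce) && (rightShuffle || rightEnforce);
    }

    public static void main(String[] args) {
        // The left side is neither kind of shuffle; the right side is an enforced shuffle.
        System.out.println(asWritten(false, false, false, true)); // prints true
        System.out.println(grouped(false, false, false, true));   // prints false
    }
}
```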
computeShuffleJoinOutputProperty

Creates the output property from the shuffle columns of the left side.

    private PhysicalPropertySet computeShuffleJoinOutputProperty(
            List<Integer> leftShuffleColumns) {
        Optional<HashDistributionDesc> requiredShuffleDesc = getRequiredShuffleDesc();
        if (!requiredShuffleDesc.isPresent()) {
            return PhysicalPropertySet.EMPTY;
        }
        HashDistributionSpec leftShuffleDistribution =
                DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
                        leftShuffleColumns, HashDistributionDesc.SourceType.SHUFFLE_JOIN));
        return createPropertySetByDistribution(leftShuffleDistribution);
    }
getRequiredShuffleDesc

    private Optional<HashDistributionDesc> getRequiredShuffleDesc() {
        if (!requirements.getDistributionProperty().isShuffle()) {
            return Optional.empty();
        }

        HashDistributionDesc requireDistributionDesc =
                ((HashDistributionSpec) requirements.getDistributionProperty().getSpec())
                        .getHashDistributionDesc();
        if (HashDistributionDesc.SourceType.SHUFFLE_JOIN
                        .equals(requireDistributionDesc.getSourceType())
                || HashDistributionDesc.SourceType.SHUFFLE_AGG
                        .equals(requireDistributionDesc.getSourceType())) {
            return Optional.of(requireDistributionDesc);
        }

        return Optional.empty();
    }