1. 1. PhysicalProperty
  2. 2. TaskContext
  3. 3. RequiredPropertyDeriver
    1. 3.1. visitOperator
    2. 3.2. visitPhysicalHashJoin
      1. 3.2.1. PropertyDeriverBase.computeShuffleJoinRequiredProperties
      2. 3.2.2. PropertyDeriverBase.getShuffleJoinHashDistributionDesc
      3. 3.2.3. PropertyDeriverBase.createShuffleJoinRequiredProperties
  4. 4. EnforceAndCostTask
    1. 4.1. initRequiredProperties
    2. 4.2. optimizeChildGroup
  5. 5. OutputPropertyDeriver
    1. 5.1. OutputPropertyDeriver.visitPhysicalOlapScan
    2. 5.2. EnforceAndCostTask.recordCostsAndEnforce
      1. 5.2.1. EnforceAndCostTask.setSatisfiedPropertyWithCost
      2. 5.2.2. EnforceAndCostTask.setPropertyWithCost
      3. 5.2.3. GroupExpression.updatePropertyWithCost
      4. 5.2.4. GroupExpression.setOutputPropertySatisfyRequiredProperty
      5. 5.2.5. Group.setBestExpression
    3. 5.3. EnforceAndCostTask.enforceDistribute
      1. 5.3.1. DistributionProperty.appendEnforcers
      2. 5.3.2. EnforceAndCostTask.updateCostWithEnforcer
      3. 5.3.3. Memo.insertEnforceExpression
  6. 6. ChildOutputPropertyGuarantor
    1. 6.1. EnforceAndCostTask.execute
    2. 6.2. enforceLegalChildOutputProperty
    3. 6.3. visitPhysicalJoin
      1. 6.3.1. BRANCH-0: BroadcastJoin
      2. 6.3.2. BRANCH-1: ColocateJoin
      3. 6.3.3. BRANCH-2: BucketShuffleJoin
        1. 6.3.3.1. transToBucketShuffleJoin
      4. 6.3.4. BRANCH-3: ShuffleJoin
      5. 6.3.5. enforceChildSatisfyShuffleJoin
      6. 6.3.6. enforceChildShuffleDistribution
      7. 6.3.7. enforceChildDistribution
      8. 6.3.8. updateChildCostWithEnforcer
  7. 7. OutputPropertyDeriver.visitPhysicalJoin
    1. 7.1. computeHashJoinDistributionPropertyInfo
    2. 7.2. BRANCH-0: BROADCAST
    3. 7.3. BRANCH-1:ColocateJoin
    4. 7.4. BRNACH-2: BucketShuffleJoin
    5. 7.5. BRNACH-3: Shufflejoin
      1. 7.5.1. computeShuffleJoinOutputProperty
      2. 7.5.2. getRequiredShuffleDesc

EnforceAndCostTask: 搜索代价最低的 PhysicalPlan

PhysicalProperty

PhysicalProperty 是个接口的,实现接口的类中比较重要是 DistributionProperty

  • DistributionSpec.DistributionType 表示 Operator 的输出数据分布
  • HashDistributionDesc.sourceType 表示 Operator 的数据来源

继承逻辑如下:
DistributionSpec-1
PropertyDeriver

TaskContext

TaskContext 作为优化一个 Group 时的上下文信息,主要有以下五个成员变量

  • requiredProperty: 即父节点对当前 Grouo 所有 GroupExpressions 的输出必须满足的属性
  • requiredColumns: 父节点的输出要求当前 Group 输出的列

比如,

  1. 在 optimizeByCost 函数首次构造 TaskContext 对象,

    • requiredProperty 是空的
    • requiredColumns 是最终输出的列
  2. EnforceAndCostTask.optimizeChildGroup 中对 ChildGroup 进行递归初始化时:

    • requiredProperty

      当前节点 Group 先通过 RequiredPropertyDeriver.getRequiredProps 函数先计算出自己子节点 ChildernGroup 输出所需的属性,再将子节点输出所需满足的属性 inputProperty 用于构造 TaskContext,递归优化子节点 ChildGroup

    • requiredColumns 也是所需输出的列

先简略看下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public TaskContext(OptimizerContext context,
PhysicalPropertySet physicalPropertySet,
ColumnRefSet requiredColumns,
double cost) {
this.optimizerContext = context;
this.requiredProperty = physicalPropertySet;
this.requiredColumns = requiredColumns;
this.upperBoundCost = cost;
this.allScanOperators = Collections.emptyList();
}

private OptExpression optimizeByCost(ConnectContext connectContext,
OptExpression logicOperatorTree,
PhysicalPropertySet requiredProperty,
ColumnRefSet requiredColumns) {
TaskContext rootTaskContext = new TaskContext(
context, requiredProperty, requiredColumns.clone(), Double.MAX_VALUE);
//...
}

RequiredPropertyDeriver

RequiredPropertyDeriver 是用于计算每个 Operator 对子节点输出需要满足的属性。

1
2
3
4
5
6
public List<List<PhysicalPropertySet>> getRequiredProps(GroupExpression groupExpression) {
requiredProperties = Lists.newArrayList();
groupExpression.getOp().accept(this, new ExpressionContext(groupExpression));
deriveChildCTEProperty(groupExpression);
return requiredProperties;
}

visitOperator

没有实现 visitPhysicalxxx 函数的 Operator,就会进入 visitOperator 函数,即根据节点的入度(arity) 生成包含 arity 个 PhysicalPropertySet.EMPTY 的 PhysicalPropertySet,也就是对每个子节点输出属性都没有要求。

比如,PhysicalHashJoinOperator 实现了 visitPhysicalHashJoin 函数,则会进入下文流程来获取属性,而 OlapScanNodeOperator 没有实现 visitOlapScanNodeOperator 函数则进入 visitOperator 函数。

1
2
3
4
5
6
7
8
public Void visitOperator(Operator node, ExpressionContext context) {
List<PhysicalPropertySet> requiredProps = new ArrayList<>();
for (int childIndex = 0; childIndex < context.arity(); ++childIndex) {
requiredProps.add(PhysicalPropertySet.EMPTY);
}
requiredProperties.add(requiredProps);
return null;
}

visitPhysicalHashJoin

当遇到一个 PhysicalHashJoinOperator,则会进入 visitPhysicalHashJoin 函数,生成对左右子节点输出所需的属性。
DistributionSpec-4

JOIN 的实现主要有两种:

  1. BroadcastJOIN
    将右子节点数据(Build-TABLE)的数据全部发送到左子节点(Probe-TABLE),在左子节点很丧构建 HashMap。这种实现对右表的行数有大小限制。
  2. ShuffleJOIN
    需要对子节点的数据进行Shuffle,根据 Shuffle 规则又可以细分。

最终选择哪种作为物理计划,根据右表的大小、以及生成的物理计划的代价进行抉择。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public Void visitPhysicalHashJoin(PhysicalHashJoinOperator node, ExpressionContext context) {
// 1 For broadcast join
PhysicalPropertySet rightBroadcastProperty = new PhysicalPropertySet(
new DistributionProperty(DistributionSpec.createReplicatedDistributionSpec()));
requiredProperties.add(Lists.newArrayList(
PhysicalPropertySet.EMPTY, // 对左子节点输出无要求
rightBroadcastProperty)); // 对右子节点输出属性有分布要求

JoinHelper joinHelper = JoinHelper.of(node,
context.getChildOutputColumns(0), // 左子节点的输出列
context.getChildOutputColumns(1)); // 右子节点的输出列

// 当前 Operator 仅能通过 BroadcastJoin 来实现
if (joinHelper.onlyBroadcast()) {
return null;
}

// 当前 Operator 仅能通过 ShuffleJoin 来实现
if (joinHelper.onlyShuffle()) {
requiredProperties.clear();
}

// 2 For shuffle join
List<Integer> leftOnPredicateColumns = joinHelper.getLeftOnColumns();
List<Integer> rightOnPredicateColumns = joinHelper.getRightOnColumns();
if (leftOnPredicateColumns.isEmpty() || rightOnPredicateColumns.isEmpty()) {
return null;
}

// 将父节点对自己的输出要求 requirementsFromParent 传递给对子节点
// 生成 ShuffleJoin 属性
requiredProperties.add(computeShuffleJoinRequiredProperties(
requirementsFromParent, leftOnPredicateColumns, rightOnPredicateColumns));

return null;
}

PropertyDeriverBase.computeShuffleJoinRequiredProperties

requiredFromParent 是 PhysicalHashJoinOperator 父节点对其输出需要满足的属性,leftShuffleColumns、rightShuffleColumns 是 JoinOperator 的两个子节点的 shuffle 列。 JoinOperator 需要在满足 requiredFromParent 的前提下,调整 shuffle 列的顺序,生成自己左右子节点的输出属性。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected static List<PhysicalPropertySet> 
computeShuffleJoinRequiredProperties(PhysicalPropertySet requiredFromParent,
List<Integer> leftShuffleColumns,
List<Integer> rightShuffleColumns) {
// 1. 获得父节点的 ShuffleJoin 分布要求
Optional<HashDistributionDesc> requiredShuffleDescOptional =
getShuffleJoinHashDistributionDesc(requiredFromParent);
if (!requiredShuffleDescOptional.isPresent()) {
// 2. requiredFromParent 不要求子节点是 ShufffleJOIN
return createShuffleJoinRequiredProperties(
leftShuffleColumns, rightShuffleColumns);
} else {
// 3. 是否需要调整列的顺序
List<Integer> requiredColumns
= requiredShuffleDescOptional.get().getColumns();
boolean adjustBasedOnLeft
= leftShuffleColumns.size() == requiredColumns.size()
&& leftShuffleColumns.containsAll(requiredColumns)
&& requiredColumns.containsAll(leftShuffleColumns);
boolean adjustBasedOnRight
= rightShuffleColumns.size() == requiredColumns.size()
&& rightShuffleColumns.containsAll(requiredColumns)
&& requiredColumns.containsAll(rightShuffleColumns);

// 基于和 requiredColumns 完全相同的列进行选择
if (adjustBasedOnLeft || adjustBasedOnRight) {
List<Integer> requiredLeft = Lists.newArrayList();
List<Integer> requiredRight = Lists.newArrayList();

for (Integer cid : requiredColumns) {
int idx = adjustBasedOnLeft
? leftShuffleColumns.indexOf(cid)
: rightShuffleColumns.indexOf(cid);
requiredLeft.add(leftShuffleColumns.get(idx));
requiredRight.add(rightShuffleColumns.get(idx));
}
return createShuffleJoinRequiredProperties(
requiredLeft, requiredRight);
} else {
return createShuffleJoinRequiredProperties(
leftShuffleColumns, rightShuffleColumns);
}
}
}

PropertyDeriverBase.getShuffleJoinHashDistributionDesc

getShuffleJoinHashDistributionDesc 函数有两个作用:

  1. 检测父节点是否要子节点输出为 SHUFFLE 方式。如果不是,则返回 Optional.empty(),即不强求子节点也必须有这个分布
  2. 否则,则返回父节点要求子节点满足的输出属性 requiredPropertySet

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected static Optional<HashDistributionDesc> 
getShuffleJoinHashDistributionDesc(PhysicalPropertySet requiredPropertySet) {
// 1. 子节点输出方式 shuffle
if (!requiredPropertySet.getDistributionProperty().isShuffle()) {
return Optional.empty();
}

HashDistributionDesc requireDistributionDesc =
((HashDistributionSpec) requiredPropertySet.getDistributionProperty()
.getSpec()).getHashDistributionDesc();
// 2. 子节点输入源是 SHUFFLE_JOIN
if (!HashDistributionDesc.SourceType.SHUFFLE_JOIN
.equals(requireDistributionDesc.getSourceType())) {
return Optional.empty();
}

return Optional.of(requireDistributionDesc);
}

PropertyDeriverBase.createShuffleJoinRequiredProperties

基于输入参数 <leftColumns, rightColumns> 生成 List<PhysicalPropertySet>。这里的 HashDistributionDesc.SourceType.SHUFFLE_JOIN 表示这个 hash 分布来源是 JOIN。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static List<PhysicalPropertySet>
createShuffleJoinRequiredProperties(List<Integer> leftColumns, List<Integer> rightColumns) {
HashDistributionSpec leftDistribution =
DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
leftColumns,
HashDistributionDesc.SourceType.SHUFFLE_JOIN));
HashDistributionSpec rightDistribution =
DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(
rightColumns,
HashDistributionDesc.SourceType.SHUFFLE_JOIN));

PhysicalPropertySet leftRequiredPropertySet =
new PhysicalPropertySet(new DistributionProperty(leftDistribution));
PhysicalPropertySet rightRequiredPropertySet =
new PhysicalPropertySet(new DistributionProperty(rightDistribution));

return Lists.newArrayList(leftRequiredPropertySet, rightRequiredPropertySet);
}

EnforceAndCostTask

EnforceAndCostTask 是用于优化一个 groupExpression 所属 Group 及其输入:从叶结点向根节点方向优化,bottom-up。

1
2
3
4
EnforceAndCostTask(TaskContext context, GroupExpression expression) {
super(context);
this.groupExpression = expression;
}

initRequiredProperties

initRequiredProperties 函数中通过调用 RequiredPropertyDeriver.getRequiredProps 函数来获得 GroupExpression.op 要求子节点需要满足的输出属性,记录记录在 childrenRequiredPropertiesList。

1
2
3
4
5
6
7
8
9
10
11
12
13
private void initRequiredProperties() {
if (curChildIndex != -1) {
return;
}

localCost = 0;
curTotalCost = 0;

// 子节点所需满足的输出属性
childrenRequiredPropertiesList =
new RequiredPropertyDeriver(context).getRequiredProps(groupExpression);
curChildIndex = 0;
}

optimizeChildGroup

optimizeChildGroup 函数对当前 groupExpression 的所有输入(即 ChildernGroup) 先优化,等输入都优化完毕,就能统计出从叶结点到当前 Group 的最佳路径了。

  • inputProperty 是对当前 group 对 childGroup 输出属性的要求
  • newUpperBound 是以 childGroup 为根节点的子树中,cost 上界

借助 pushTask 函数实现保护现场、现场恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void optimizeChildGroup(PhysicalPropertySet inputProperty,
Group childGroup) {
// clone 函数实现保留现场
pushTask((EnforceAndCostTask) clone());
double newUpperBound = context.getUpperBoundCost() - curTotalCost;
TaskContext taskContext = new TaskContext(
context.getOptimizerContext(),
inputProperty,
context.getRequiredColumns(),
newUpperBound);

// 本次执行的 ChildGroup
pushTask(new OptimizeGroupTask(taskContext, childGroup));
// 执行完再返回现场
}

OutputPropertyDeriver

OutputPropertyDeriver 类用于生成当前节点的输出属性,ChildOutputPropertyGuarantor 用于生成子节点的输出属性

先来看没有输入的节点,比如 OlapScanNodeOperator,在执行 EnforceAndCostTask.execute 函数时会跳过 optimizeChildGroup 函数和 ChildOutputPropertyGuarantor,进入 OutputPropertyDeriver 中,来满足父节点的输出属性要求。

  • context.getRequiredProperty() 是父节点所需要的属性
  • outputProperty 是本节点输出的属性
  • childrenOutputProperties 是子节点属性的属性

这部分代码注释如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// in EnforceAndCostTask.execute

// 统计完 ChildernGroup 的 cost 之后,再统计当前 groupExpression 的 cost
if (curChildIndex == 0 && prevChildIndex == -1) {
localCost = CostModel.calculateCost(groupExpression);
curTotalCost += localCost;
}

// ChildernGroup 优化完毕,再考虑自己
if (curChildIndex == groupExpression.getInputs().size()) {
ChildOutputPropertyGuarantor childOutputPropertyGuarantor
= new ChildOutputPropertyGuarantor(
context,
groupExpression,
context.getRequiredProperty(),
childrenBestExprList,
childrenRequiredProperties,
childrenOutputProperties,
curTotalCost);
// 校验子节点输出属性是否满足当前节点对其输出属性要求
curTotalCost = childOutputPropertyGuarantor.enforceLegalChildOutputProperty();

// 裁剪: 当前 groupExpression 优化的代价已经高于 Group 中其他等价 GroupExpression
if (curTotalCost > context.getUpperBoundCost()) {
break;
}

// update current group statistics and re-compute costs
if (!computeCurrentGroupStatistics()) {
// child group has been pruned
return;
}

// compute the output property
// 计算当前 GroupExpression 的输出属性,并满足父节点输出属性要求
OutputPropertyDeriver outputPropertyDeriver
= new OutputPropertyDeriver(groupExpression,
context.getRequiredProperty(),
childrenOutputProperties);
PhysicalPropertySet outputProperty = outputPropertyDeriver.getOutputProperty();
recordCostsAndEnforce(outputProperty, childrenRequiredProperties);
}

OutputPropertyDeriver.visitPhysicalOlapScan

PhysicalOlapScanOperator 是个递归基,没有输入属性、只有输出属性,输出属性集中只有个分布属性 HashDistributionSpec,其中 HashDistributionDesc.SourceType.LOCAL 表示数据来源于本地。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public PhysicalPropertySet visitPhysicalOlapScan(PhysicalOlapScanOperator node,
ExpressionContext context) {
HashDistributionSpec olapDistributionSpec = node.getDistributionSpec();

DistributionSpec.PropertyInfo physicalPropertyInfo
= new DistributionSpec.PropertyInfo();

physicalPropertyInfo.tableId = node.getTable().getId();
physicalPropertyInfo.partitionIds = node.getSelectedPartitionId();
return createPropertySetByDistribution(new HashDistributionSpec(
new HashDistributionDesc(
olapDistributionSpec.getShuffleColumns(),
HashDistributionDesc.SourceType.LOCAL),
physicalPropertyInfo));
}

EnforceAndCostTask.recordCostsAndEnforce

outputProperty 是当前节点输出属性,context.getRequiredPropert() 是父节点对当前节点的输出属性要求,此时需要进一步校验 outputProperty –> requiredPropert 链路是否满足,如果不满足则施加一个 enforcer: outputProperty –> enforcer –> requiredPropert

recordCostsAndEnforce 函数的输入是当前节点输出属性 outputProperty、和子节点输入属性 childrenOutputProperties,函数执行完,一个 Group 的及其子 Group 的优化就结束了,并将最低成本记录在 curTotalCost 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void 
recordCostsAndEnforce(PhysicalPropertySet outputProperty,
List<PhysicalPropertySet> childrenOutputProperties) {
//1. 重新计算 cost,并更新当前 groupExpression 的 totoal cost
curTotalCost -= localCost;
localCost = CostModel.calculateCostWithChildrenOutProperty(
groupExpression, childrenOutputProperties);
curTotalCost += localCost;

// 1. 先假设满足本节点的输出属性 outputProperty 能满足父节点所需属性 requiredProperty
setSatisfiedPropertyWithCost(outputProperty, childrenOutputProperties);
PhysicalPropertySet requiredProperty = context.getRequiredProperty();

if (!outputProperty.isSatisfy(requiredProperty)) {
// 2. 如果不满足,则添加 Enforcer
// outputProperty --> enforcedProperty --> requiredProperty
PhysicalPropertySet enforcedProperty
= enforceProperty(outputProperty, requiredProperty);

// enforcedProperty is superset of requiredProperty
if (!enforcedProperty.equals(requiredProperty)) {
// 在 {enforcedProperty, requiredProperty} 之间映射
setPropertyWithCost(
// enforcer
groupExpression.getGroup().getBestExpression(enforcedProperty),
enforcedProperty,
requiredProperty,
Lists.newArrayList(outputProperty));
}
} else {
// 3. 如果满足,且 outputProperty != requiredProperty
// outputProperty 是 requiredProperty 的子集,则更新记录
if (!outputProperty.equals(requiredProperty)) {
setPropertyWithCost(
groupExpression,
outputProperty,
requiredProperty,
childrenOutputProperties);
}
}

// 4. 更新当前 groupExpression 所属的 Group 的 cost 上界
if (curTotalCost < context.getUpperBoundCost()) {
context.setUpperBoundCost(curTotalCost);
}
}

EnforceAndCostTask.setSatisfiedPropertyWithCost

setSatisfiedPropertyWithCost 函数表示当前 groupExpression 的输出属性 outputProperty 父节点要求的属性 requiredProperty 完全一样了。因此,调用 setPropertyWithCost 函数时,直接使用 outputProperty 代替 requiredProperty。

1
2
3
4
5
6
7
8
9
10
11
12
13
private void setSatisfiedPropertyWithCost(
PhysicalPropertySet outputProperty,
List<PhysicalPropertySet> childrenOutputProperties) {
setPropertyWithCost(
groupExpression, outputProperty, childrenOutputProperties);
if (outputProperty.getCteProperty().isEmpty()) {
// groupExpression can satisfy the ANY type output property
setPropertyWithCost(groupExpression,
outputProperty,
PhysicalPropertySet.EMPTY,
childrenOutputProperties);
}
}

EnforceAndCostTask.setPropertyWithCost

Group 与 GroupExpression:一个 Group 中包含了所有逻辑上等价的 GroupExpression(即输出一样),并记录了 lowestCost 的 GroupExpression,而 GroupExpression 输入输出都是 Group

  • GroupExpression.lowestCostTable: 记录是得到 outputProperty 所需的成小成本 curTotalCost 以及对应的输入属性 childrenOutputProperties

    因此,如果本次更新 childrenOutputProperties –> outputProperty 所需的成本 curTotalCost 更低,则更新 GroupExpression.lowestCostTable 中的记录(如果原本没有则插入)

    GroupExpression.updatePropertyWithCost 函数更新。

  • GroupExpression.outputPropertyMap: 记录的是得到最佳路径时,满足 requiredPropertySet 的输出属性 outputProperty

    由于 updatePropertyWithCost 返回 true 时,才会更新 GroupExpression.outputPropertyMap,因此两个函数联动起来就是输入到输出的最佳路径信息

    • Group 中最佳表达式:this.groupExpression
    • 最低的成本:curTotalCost
    • 此时的输入属性:childrenOutputProperties
    • 此时的输出属性:outputProperty
    • 父节点要求属性:requiredProperty

    GroupExpression.setOutputPropertySatisfyRequiredProperty 更新。

  • Group.lowestCostExpressions: 记录的是 Group 所有等价的 GroupExpression 中,在满足 requiredProperty 属性要求的前提下,代价最低的 GroupExprssion 是哪个

    • key: requiredProperty
    • value: <cost, expression> 最低的代价与对应的 GroupExpression

    Group.setBestExpression 函数更新

因此,当优化结束,父节点可以通过 GroupExpression.getOutputProperty(requiredProperty) 函数得到最佳输入属性 outputProperty,再递归到 childrenOutputProperties,… 即可遍历所有最佳表达式。

groupExpression-1

1
2
3
4
5
6
7
8
9
10
11
12
private void setPropertyWithCost(GroupExpression groupExpression,
PhysicalPropertySet outputProperty,
PhysicalPropertySet requiredProperty,
List<PhysicalPropertySet> childrenOutputProperties) {
if (groupExpression.updatePropertyWithCost(
requiredProperty, childrenOutputProperties, curTotalCost)) {
groupExpression.setOutputPropertySatisfyRequiredProperty(
outputProperty, requiredProperty);
}
this.groupExpression.getGroup().setBestExpression(
groupExpression, curTotalCost, requiredProperty);
}

GroupExpression.updatePropertyWithCost

当发现一个满足条件的 outputProperties,并且本次 new_cost 比之前的 old_cost 更小则更新 GroupExpression.lowestCostTable 记录(如果没有则插入),它是在 {outputProperties, <cost, inputProperties>} 之间建立映射关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean updatePropertyWithCost(PhysicalPropertySet outputProperties,
List<PhysicalPropertySet> inputProperties,
double cost) {
if (lowestCostTable.containsKey(outputProperties)) {
if (lowestCostTable.get(outputProperties).first > cost) {
lowestCostTable.put(
outputProperties, new Pair<>(cost, inputProperties));
return true;
}
} else {
lowestCostTable.put(
outputProperties, new Pair<>(cost, inputProperties));
return true;
}
return false;
}

GroupExpression.setOutputPropertySatisfyRequiredProperty

setOutputPropertySatisfyRequiredProperty 是为了记录在父节点 parent 所需属性 requiredPropertySet 和本节点输出属性之间 outputPropertySet 的映射关系。

当 parent 执行 EnforceAndCostTask 函数时,会通过 GroupExpression.getOutputProperty 函数来获取子节点的输出属性,后面会仔细讲解。

1
2
3
4
5
6
7
8
9
10
public void setOutputPropertySatisfyRequiredProperty(PhysicalPropertySet outputPropertySet,
PhysicalPropertySet requiredPropertySet) {
this.outputPropertyMap.put(requiredPropertySet, outputPropertySet);
}

public PhysicalPropertySet getOutputProperty(PhysicalPropertySet requiredPropertySet) {
PhysicalPropertySet outputProperty = outputPropertyMap.get(requiredPropertySet);
Preconditions.checkState(outputProperty != null);
return outputProperty;
}

Group.setBestExpression

Group.lowestCostExpressions 的映射关系是 {RequeiredProperty, {cost, groupExpression}},即 key 是父节点所需的属性,value 是代价最低的 groupExpression 及其 cost。Group.setBestExpression 函数实时更新代价最低的 Group.lowestCostExpressions

那么最终根节点的 group 获取代价最低的 groupExpression 时就可以通过 Group.getBestExpression 函数获得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void setBestExpression(GroupExpression expression,
double cost,
PhysicalPropertySet physicalPropertySet) {
if (lowestCostExpressions.containsKey(physicalPropertySet)) {
if (lowestCostExpressions.get(physicalPropertySet).first > cost) {
lowestCostExpressions.put(
physicalPropertySet, new Pair<>(cost, expression));
}
} else {
lowestCostExpressions.put(
physicalPropertySet, new Pair<>(cost, expression));
}
}

public GroupExpression getBestExpression(PhysicalPropertySet physicalPropertySet) {
if (hasBestExpression(physicalPropertySet)) {
return lowestCostExpressions.get(physicalPropertySet).second;
}
return null;
}

EnforceAndCostTask.enforceDistribute

enforceProperty 函数会检测 outputPropertySet 中的 DistributionProperty、SortProperty 是否满足 requiredPropertySet。这里以 DistributionProperty 为例阐述 enforceDistribute 函数怎么添加 Enforcer。

  1. PhysicalPropertySet.setDistributionProperty 函数表示只更改 DistributionProperty,其他属性仍然保持不变,如此 newOutputProperty 的分布属性就和父节点的一致。
  2. 然后 DistributionProperty.appendEnforcers 函数将当前 group(即 groupExpression.getGroup())作为输入生成一个新的 GroupExpression 对象 enforcer,enforcer 类似转换器,实现 oldOutputProperty -> newOutputProperty -> requiredProperty 的转变。
  3. 最后,由于新生成的 enforcer 还没有所属的 Group,updateCostWithEnforcer 函数中将 enforcer 的所属 group 也设置为 groupExpression.getGroup()

整体逻辑如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
private PhysicalPropertySet enforceDistribute(PhysicalPropertySet oldOutputProperty) {
PhysicalPropertySet newOutputProperty = oldOutputProperty.copy();
// 设置目标分布属性
newOutputProperty.setDistributionProperty(
context.getRequiredProperty().getDistributionProperty());
// 增加一个 groupExpression
GroupExpression enforcer = context.getRequiredProperty()
.getDistributionProperty()
.appendEnforcers(groupExpression.getGroup());
// 将 newOutputProperty --> enforcer
updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}

DistributionProperty.appendEnforcers

生成新的 GroupExpression 对象 enforer,其中包含的 Operator 是 PhysicalDistributionOperator 实现所需的数据分布。

1
2
3
4
public GroupExpression appendEnforcers(Group child) {
return new GroupExpression(
new PhysicalDistributionOperator(spec), Lists.newArrayList(child));
}

EnforceAndCostTask.updateCostWithEnforcer

updateCostWithEnforcer 函数设置 enforcer 所属的 Group,串联起 {oldOutputProperty, newOutputProperty} 之间的联系,并重新计算 enforcer 之后的代价。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void updateCostWithEnforcer(GroupExpression enforcer,
PhysicalPropertySet oldOutputProperty,
PhysicalPropertySet newOutputProperty) {
context.getOptimizerContext().getMemo().
insertEnforceExpression(enforcer, groupExpression.getGroup());
// 记录增加的 enforcer 代价
curTotalCost += CostModel.calculateCost(enforcer);

if (enforcer.updatePropertyWithCost(
newOutputProperty,
Lists.newArrayList(oldOutputProperty),
curTotalCost)) {
enforcer.setOutputPropertySatisfyRequiredProperty(
newOutputProperty, newOutputProperty);
}
//
groupExpression.getGroup().setBestExpression(
enforcer, curTotalCost, newOutputProperty);
}

Memo.insertEnforceExpression

1
2
3
public void insertEnforceExpression(GroupExpression groupExpression, Group targetGroup) {
groupExpression.setGroup(targetGroup);
}

ChildOutputPropertyGuarantor

EnforceAndCostTask.execute

上面的视角是以 PhysicalOlapScanOperator 为例讲解了优化一个 Group 的过程,因为 OlapScanNode 作为物理计划树中的叶结点,没有输入,因此是 EnforceAndCostTask.execute 函数的递归基,下面以 PhysicalHashJoinOperator 为例讲解一个子树优化的过程。

HashJoinNode 的左右子节点是 PhysicalOlapScanOperator or PhysicalDistributionOperator,并且 optimizeChildGroup 函数已递归两次将左右子节点都优化完,因此

  • childGroup.getBestExpression() 获得的 childBestExpr 是在上面的 Group.setBestExpression 函数中设置的
    如果 childBestExpr == null 表示当前 childGroup 要么已经被裁剪了,要么还没优化,具体哪种取决于 prevChildIndex: prevChildIndex 记录正在优化的子节点索引,如果 prevChildIndex < curChildIndex 则表示 childGroup 还没优化,则进入 optimizeChildGroup 函数递归完
  • childBestExpr.getOutputProperty() 获得的是也是上面 setOutputPropertySatisfyRequiredProperty 函数设置的最佳 GroupExpression 对象

这部分代码逻辑及其解释如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
for (; curChildIndex < groupExpression.getInputs().size(); curChildIndex++) {
PhysicalPropertySet childRequiredProperty
= childrenRequiredProperties.get(curChildIndex);
Group childGroup = groupExpression.getInputs().get(curChildIndex);

// Check whether the child group is already optimized for the property
GroupExpression childBestExpr =
childGroup.getBestExpression(childRequiredProperty);

// pruned
if (childBestExpr == null && prevChildIndex >= curChildIndex) {
break;
}

// 尚未优化
if (childBestExpr == null) {
prevChildIndex = curChildIndex;
optimizeChildGroup(childRequiredProperty, childGroup);
return;
}

// childOutputProperty 就是 childBestExpr 输出的
childrenBestExprList.add(childBestExpr);
PhysicalPropertySet childOutputProperty =
childBestExpr.getOutputProperty(childRequiredProperty);
childrenOutputProperties.add(childOutputProperty);
childrenRequiredProperties.set(curChildIndex, childOutputProperty);

// 裁剪: agg operator 不能生成一阶段聚合
if (!canGenerateOneStageAgg(childBestExpr)) {
break;
}

// 裁剪: join operator 不能使用 brocast join
if (!checkBroadcastRowCountLimit(childRequiredProperty, childBestExpr)) {
break;
}

// 裁剪: 当前成本已超过 group 中其他逻辑等价的 GroupExpression 成本的上界
curTotalCost += childBestExpr.getCost(childRequiredProperty);
if (curTotalCost > context.getUpperBoundCost()) {
break;
}
}

enforceLegalChildOutputProperty

获得 ChildernGroup 信息后,就通过 ChildOutputPropertyGuarantor 来校验属性是否符合要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// in EnforceAndCostTask.execute

if (curChildIndex == groupExpression.getInputs().size()) {
// before we compute the property, here need to make sure that the plan is legal
ChildOutputPropertyGuarantor childOutputPropertyGuarantor =
new ChildOutputPropertyGuarantor(
context,
groupExpression,
context.getRequiredProperty(),
childrenBestExprList,
childrenRequiredProperties,
childrenOutputProperties,
curTotalCost);
curTotalCost =
childOutputPropertyGuarantor.enforceLegalChildOutputProperty();
//...
}

public double enforceLegalChildOutputProperty() {
groupExpression.getOp().accept(this, new ExpressionContext(groupExpression));
return this.curTotalCost;
}

visitPhysicalJoin

visitPhysicalJoin 函数完整的代码很长不全贴了,分段进行讲解。

BRANCH-0: BroadcastJoin

如果在 checkBroadcastRowCountLimit 函数中,BroadcastJoin 没有被裁剪,那么也会生成一个 BroadcastJoin PropertySet,至于最终是否被选择,就得看 BroadcastJoin 和 ShuffleJOIN 的 cost 谁更低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Void visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext context) {
GroupExpression leftChild = childrenBestExprList.get(0);
GroupExpression rightChild = childrenBestExprList.get(1);

PhysicalPropertySet leftChildOutputProperty = childrenOutputProperties.get(0);
PhysicalPropertySet rightChildOutputProperty = childrenOutputProperties.get(1);

// 1. Distribution is broadcast
DistributionProperty rightDistribute = rightChildOutputProperty.getDistributionProperty();
if (rightDistribute.isBroadcast() || rightDistribute.isGather()) {
return visitOperator(node, context);
}
//...
}

ShuffleJoin
此时,左右两个子节点的 输出分布属性 都需要是 DistributionType.SHUFFLE,如果不是则报错。

确定了输出分布属性之后,再根据子节点的 DistributionType.SourceType 来优化分布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public Void visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext context) {
//...
// 2. Distribution is shuffle
JoinHelper joinHelper = JoinHelper.of(
node, context.getChildOutputColumns(0), context.getChildOutputColumns(1));
List<Integer> leftOnPredicateColumns = joinHelper.getLeftOnColumns();
List<Integer> rightOnPredicateColumns = joinHelper.getRightOnColumns();
// Get required properties for children.
List<PhysicalPropertySet> requiredProperties =
computeShuffleJoinRequiredProperties(
requirements, leftOnPredicateColumns, rightOnPredicateColumns);

List<Integer> leftShuffleColumns =
((HashDistributionSpec) requiredProperties.get(0)
.getDistributionProperty().getSpec()).getShuffleColumns();
List<Integer> rightShuffleColumns =
((HashDistributionSpec) requiredProperties.get(1)
.getDistributionProperty().getSpec()).getShuffleColumns();

DistributionProperty leftChildDistributionProperty =
leftChildOutputProperty.getDistributionProperty();
DistributionProperty rightChildDistributionProperty =
rightChildOutputProperty.getDistributionProperty();
// CHECK: 只有 broadcast 不需要 shuffle
if (!leftChildDistributionProperty.isShuffle()
|| !rightChildDistributionProperty.isShuffle()) {
Preconditions.checkState(
false, "Children output property distribution error");
return visitOperator(node, context);
}
//...
}

BRANCH-1: ColocateJoin

如果 JOIN 的左右子节点都是来自于 OlapScan,会再检测 ColocateJoin 函数来检测是否真的能通过 ColocateJoin 来完成。不能则通过 transToBucketShuffleJoin 函数转为 BucketShuffleJoin。

要转换为 BucketShuffleJoin 一般是 join-key 包含了 value-column。

1
2
3
4
5
6
7
8
9
10
if (leftDistributionDesc.isLocal() && rightDistributionDesc.isLocal()) {
if (JoinOperator.HINT_BUCKET.equals(hint) ||
!canColocateJoin(
leftDistributionSpec, rightDistributionSpec,
leftShuffleColumns, rightShuffleColumns)) {
transToBucketShuffleJoin(
leftDistributionSpec, leftShuffleColumns, rightShuffleColumns);
}
return visitOperator(node, context);
}

BRANCH-2: BucketShuffleJoin

一般是左表(大表)的 bucket-key 命中了 join-key

1
2
3
4
5
if (leftDistributionDesc.isLocal() && rightDistributionDesc.isShuffle()) {
// bucket join
transToBucketShuffleJoin(leftDistributionSpec, leftShuffleColumns, rightShuffleColumns);
return visitOperator(node, context);
}
transToBucketShuffleJoin

和 enforceChildSatisfyShuffleJoin 逻辑类似

BRANCH-3: ShuffleJoin

如果左侧是 Shuffle,而右侧是 Local,需要调整为 {Shuffle, Shuffle_Enforcer},如果左右两侧都是 Shuffle,则需要检测是否满足 shffleJoin

如果左表的 join-key 不是 bucket-key,一般都是 shuffle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (leftDistributionDesc.isShuffle() && rightDistributionDesc.isLocal()) {
// coordinator can not bucket shuffle data from left to right, so we need to adjust to shuffle join
enforceChildSatisfyShuffleJoin(leftDistributionSpec, leftShuffleColumns, rightShuffleColumns,
rightChild, rightChildOutputProperty);
return visitOperator(node, context);
} else if (leftDistributionDesc.isShuffle() && rightDistributionDesc.isShuffle()) {
// 如果不满足 shuffle join
if (!checkChildDistributionSatisfyShuffle(leftDistributionSpec, rightDistributionSpec,
leftShuffleColumns,
rightShuffleColumns)) {
enforceChildSatisfyShuffleJoin(leftDistributionSpec, leftShuffleColumns, rightShuffleColumns,
rightChild, rightChildOutputProperty);
}
return visitOperator(node, context);
}

enforceChildSatisfyShuffleJoin

transToBucketShuffleJoin 和 enforceChildSatisfyShuffleJoin 核心逻辑都一样,都是先需要生产 shuffle columns,再重新分布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void enforceChildSatisfyShuffleJoin(HashDistributionSpec leftDistributionSpec,
List<Integer> leftShuffleColumns,
List<Integer> rightShuffleColumns,
GroupExpression child,
PhysicalPropertySet childOutputProperty) {
List<Integer> newRightShuffleColumns = Lists.newArrayList();
HashDistributionDesc leftDistributionDesc = leftDistributionSpec.getHashDistributionDesc();
DistributionSpec.PropertyInfo leftDistributionPropertyInfo = leftDistributionSpec.getPropertyInfo();

for (int cid : leftDistributionDesc.getColumns()) {
if (leftShuffleColumns.contains(cid)) {
int index = leftShuffleColumns.indexOf(cid);
newRightShuffleColumns.add(rightShuffleColumns.get(index));
} else {
// find equivalent columns for the hash distribution columns
int equivalentColumn =
Arrays.stream(leftDistributionPropertyInfo.getEquivalentColumns(cid).getColumnIds()).
filter(leftShuffleColumns::contains).findAny().orElse(cid);
Preconditions.checkState(leftShuffleColumns.contains(equivalentColumn));
int index = leftShuffleColumns.indexOf(equivalentColumn);
newRightShuffleColumns.add(rightShuffleColumns.get(index));
}
}

// 都是更改啊右子节点的分布属性
enforceChildShuffleDistribution(newRightShuffleColumns, child, childOutputProperty, 1);
}

enforceChildShuffleDistribution

构造 HashDistributionSpec 对象主要是需要构造 HashDistributionDesc.columns,上层函数传递 HashDistributionDesc.columns 后就可以得到所需的分布 enforceDistributionSpec。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private GroupExpression enforceChildShuffleDistribution(List<Integer> shuffleColumns, GroupExpression child,
PhysicalPropertySet childOutputProperty,
int childIndex) {
DistributionSpec enforceDistributionSpec =
DistributionSpec.createHashDistributionSpec(new HashDistributionDesc(shuffleColumns,
HashDistributionDesc.SourceType.SHUFFLE_ENFORCE));

// 将 enforceDistributionSpec 和 child 连接起来,
// 并更新 cost 信息
Pair<GroupExpression, PhysicalPropertySet> pair =
enforceChildDistribution(enforceDistributionSpec, child, childOutputProperty);
PhysicalPropertySet newChildInputProperty = pair.second;

// 更新结果
requiredChildrenProperties.set(childIndex, newChildInputProperty);
childrenOutputProperties.set(childIndex, newChildInputProperty);
return pair.first;
}

enforceChildDistribution

DistributionSpec-2

enforceChildDistribution 函数也比较重要,是为了让子节点 child 的输出分布属性中满足父节点要求的 distributionSpec 的要求,。因此需要在输入和输出之间再增加个 enforcer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Pair<GroupExpression, PhysicalPropertySet> enforceChildDistribution(DistributionSpec distributionSpec,
GroupExpression child,
PhysicalPropertySet childOutputProperty) {
double childCosts = child.getCost(childOutputProperty);
Group childGroup = child.getGroup();

// 1. 基于新的分布要求,生成新的分布属性
DistributionProperty newDistributionProperty = new DistributionProperty(distributionSpec);
// 2. 基于旧的输出属性集,复制构造一个新的 PhysicalPropertySet 对象
PhysicalPropertySet newOutputProperty = childOutputProperty.copy();
// 3. 仅更改分布属性
newOutputProperty.setDistributionProperty(newDistributionProperty);

// 4. 如果子节点对应 Operator 是 PhysicalDistributionOperator
if (child.getOp() instanceof PhysicalDistributionOperator) {
GroupExpression enforcer = newDistributionProperty.appendEnforcers(childGroup);
// 记录映射关系
enforcer.setOutputPropertySatisfyRequiredProperty(newOutputProperty, newOutputProperty);
// 设置 enforcer 所属的 Group
context.getMemo().insertEnforceExpression(enforcer, childGroup);
// 记录本节点的输出属性 与 输入分布属性之间的映射
enforcer.updatePropertyWithCost(newOutputProperty,
child.getInputProperties(childOutputProperty),
childCosts);
// 更新最佳路径
childGroup.setBestExpression(enforcer, childCosts, newOutputProperty);
return new Pair<>(enforcer, newOutputProperty);
}

// 5.1 如果不是 PhysicalDistributionOperator
GroupExpression enforcer = newDistributionProperty.appendEnforcers(childGroup);
enforcer.setOutputPropertySatisfyRequiredProperty(
newOutputProperty, newOutputProperty);
updateChildCostWithEnforcer(
enforcer, childOutputProperty, newOutputProperty, childCosts, childGroup);
return new Pair<>(enforcer, newOutputProperty);
}

updateChildCostWithEnforcer

当 child.getOp() 不是 PhysicalDistributionOperator 对象时,则进入 updateChildCostWithEnforcer 函数中,区别就是此函数会计算新增的 enforcer 的cost。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void updateChildCostWithEnforcer(GroupExpression enforcer,
PhysicalPropertySet oldOutputProperty,
PhysicalPropertySet newOutputProperty,
double childCost,
Group childGroup) {
context.getMemo().insertEnforceExpression(enforcer, childGroup);
// update current total cost
curTotalCost -= childCost;
// add enforcer cost
childCost += CostModel.calculateCost(enforcer);
curTotalCost += childCost;

enforcer.updatePropertyWithCost(
newOutputProperty,
Lists.newArrayList(oldOutputProperty),
childCost);
childGroup.setBestExpression(enforcer, childCost, newOutputProperty);
}

OutputPropertyDeriver.visitPhysicalJoin

在前文 ChildOutputPropertyGuarantor.visitPhysicalJoin 中已经设置好右子节的输出属性,在 OutputPropertyDeriver.visitPhysicalJoin 中则根据左右子节点的输出属性,生成 PhysicalJoinOperator 自己的输出属性。

computeHashJoinDistributionPropertyInfo

此函数设置 physicalPropertySet 的 DistributionSpec.PropertyInfo,再返回 physicalPropertySet。因此 Join 输出的属性即传入的 physicalPropertySet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private PhysicalPropertySet computeHashJoinDistributionPropertyInfo(PhysicalJoinOperator node,
PhysicalPropertySet physicalPropertySet,
List<Integer> leftOnPredicateColumns,
List<Integer> rightOnPredicateColumns,
ExpressionContext context) {
DistributionSpec.PropertyInfo propertyInfo =
physicalPropertySet.getDistributionProperty().getSpec().getPropertyInfo();

ColumnRefSet leftChildColumns = context.getChildOutputColumns(0);
ColumnRefSet rightChildColumns = context.getChildOutputColumns(1);
if (node.getJoinType().isLeftOuterJoin()) {
propertyInfo.nullableColumns.union(rightChildColumns);
} else if (node.getJoinType().isRightOuterJoin()) {
propertyInfo.nullableColumns.union(leftChildColumns);
} else if (node.getJoinType().isFullOuterJoin()) {
propertyInfo.nullableColumns.union(leftChildColumns);
propertyInfo.nullableColumns.union(rightChildColumns);
} else if (node.getJoinType().isInnerJoin()) {
for (int i = 0; i < leftOnPredicateColumns.size(); i++) {
int leftColumn = leftOnPredicateColumns.get(i);
int rightColumn = rightOnPredicateColumns.get(i);
propertyInfo.addJoinEquivalentPair(leftColumn, rightColumn);
}
}

return physicalPropertySet;
}

BRANCH-0: BROADCAST

TODO: computeHashJoinDistributionPropertyInfo 函数左右传入的都是空集?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private PhysicalPropertySet visitPhysicalJoin(PhysicalJoinOperator node, ExpressionContext context) {
PhysicalPropertySet leftChildOutputProperty = childrenOutputProperties.get(0);
PhysicalPropertySet rightChildOutputProperty = childrenOutputProperties.get(1);

// 1. Distribution is broadcast
if (rightChildOutputProperty.getDistributionProperty().isBroadcast()) {
return computeHashJoinDistributionPropertyInfo(
node,
leftChildOutputProperty,
Collections.emptyList(),
Collections.emptyList(),
context);
}
}

BRANCH-1:ColocateJoin

1
2
3
4
5
6
7
if (leftDistributionDesc.isLocal() && rightDistributionDesc.isLocal()) {
// colocate join
return computeHashJoinDistributionPropertyInfo(node,
computeColocateJoinOutputProperty(leftDistributionSpec, rightDistributionSpec),
leftOnPredicateColumns,
rightOnPredicateColumns, context);
}

BRNACH-2: BucketShuffleJoin

leftChildOutputProperty 直接设置。

1
2
3
4
5
6
7
8
if (leftDistributionDesc.isLocal() && rightDistributionDesc.isBucketJoin()) {
// bucket join
return computeHashJoinDistributionPropertyInfo(node,
leftChildOutputProperty,
leftOnPredicateColumns,
rightOnPredicateColumns,
context);
}

BRNACH-3: Shufflejoin

左右输入数据源都是 Shuffle(SourceType.SHUFFLE_AGG、SourceType.SHUFFLE_JOIN)

1
2
3
4
5
6
7
8
if ((leftDistributionDesc.isShuffle() || leftDistributionDesc.isShuffleEnforce()) &&
(rightDistributionDesc.isShuffle()) || rightDistributionDesc.isShuffleEnforce()) {
// shuffle join
return computeHashJoinDistributionPropertyInfo(node,
computeShuffleJoinOutputProperty(leftDistributionDesc.getColumns()),
leftOnPredicateColumns,
rightOnPredicateColumns, context);
}

computeShuffleJoinOutputProperty

基于左侧的 shuffle 列创建输出属性。

1
2
3
4
5
6
7
8
9
10
private PhysicalPropertySet computeShuffleJoinOutputProperty(List<Integer> leftShuffleColumns) {
Optional<HashDistributionDesc> requiredShuffleDesc = getRequiredShuffleDesc();
if (!requiredShuffleDesc.isPresent()) {
return PhysicalPropertySet.EMPTY;
}
HashDistributionSpec leftShuffleDistribution = DistributionSpec.createHashDistributionSpec(
new HashDistributionDesc(leftShuffleColumns, HashDistributionDesc.SourceType.SHUFFLE_JOIN));

return createPropertySetByDistribution(leftShuffleDistribution);
}

getRequiredShuffleDesc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Optional<HashDistributionDesc> getRequiredShuffleDesc() {
if (!requirements.getDistributionProperty().isShuffle()) {
return Optional.empty();
}

HashDistributionDesc requireDistributionDesc = ((HashDistributionSpec)
requirements.getDistributionProperty().getSpec()).getHashDistributionDesc();

if (HashDistributionDesc.SourceType.SHUFFLE_JOIN.equals(requireDistributionDesc.getSourceType()) ||
HashDistributionDesc.SourceType.SHUFFLE_AGG.equals(requireDistributionDesc.getSourceType())) {
return Optional.of(requireDistributionDesc);
}

return Optional.empty();
}