Coordinator: Instantiating the ExecPlan

After PlanFragmentBuilder converts the PhysicalPlan into an ExecPlan, we are left with a set of Fragments. The Coordinator then has to instantiate each Fragment into FragmentInstances, filling in the concrete execution plan and the BE node parameters.
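Each FragmentInstance is described by an FInstanceExecParam object, which the later excerpts keep filling in. As a rough orientation only, the sketch below lists the fields referenced in this article; it is a simplified illustration rather than the real FE class, and the Thrift types are replaced by plain Java stand-ins.

import java.util.*;

// Illustrative sketch only: a stripped-down view of the per-instance parameters
// (FInstanceExecParam in the FE) that the excerpts below keep filling in.
class InstanceExecParamSketch {
    long instanceIdHi, instanceIdLo;   // TUniqueId in the real code, set in computeFragmentExecParams
    String host;                       // TNetworkAddress of the BE this instance runs on
    int pipelineDop;                   // per-instance pipeline degree of parallelism

    // scanId (plan node id) -> scan ranges this instance reads (TScanRangeParams in the real code)
    Map<Integer, List<Object>> perNodeScanRanges = new HashMap<>();
    // bucketSeq -> driverSeq, used by colocate / bucket-shuffle fragments
    Map<Integer, Integer> bucketSeqToDriverSeq = new HashMap<>();
    // scanId -> driverSeq -> scan ranges, used when assignPerDriverSeq is enabled
    Map<Integer, Map<Integer, List<Object>>> nodeToPerDriverSeqScanRanges = new HashMap<>();
}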

computeScanRangeAssignment

BackendSelector

computeFragmentHosts

The computeFragmentHosts function assigns the BE nodes on which each Fragment will run. Depending on whether the Fragment's leftmost node is an ExchangeNode or an OlapScanNode (plus the special UNPARTITIONED case), different information has to be set up.

computeFragmentHosts iterates the fragments in reverse, from the child fragments (leaves) up to the root fragment, so that a parent fragment can choose its BE nodes based on its children.

private void computeFragmentHosts() throws Exception {
    // from children --> parent
    for (int i = fragments.size() - 1; i >= 0; --i) {
        PlanFragment fragment = fragments.get(i);
        FragmentExecParams params =
                fragmentExecParamsMap.get(fragment.getFragmentId());

        boolean dopAdaptionEnabled = usePipeline &&
                connectContext.getSessionVariable().isPipelineDopAdaptionEnabled();

        // 1. DataPartition.UNPARTITIONED
        if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
            //...
            continue;
        }

        // 2. ExchangeNode
        PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot());
        if (!(leftMostNode instanceof ScanNode)) {
            //...
            continue;
        }

        // 3. OlapScanNode
    }
}

DataPartition.UNPARTITIONED

When the Fragment's DataSink partitioning is DataPartition.UNPARTITIONED, all of its output is sent to the destinations. In that case the BE node that executes this Fragment is simply one node picked via SimpleScheduler.getBackendHost.

A sink with DataPartition.UNPARTITIONED should only occur in a few cases, such as BROADCAST JOIN and ResultSink. Why can this case ignore its input and just pick an arbitrary BE node to run on?
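The actual choice is delegated to SimpleScheduler.getBackendHost / getComputeNodeHost, which also has to skip blacklisted or down nodes. As a rough, hypothetical illustration of the idea only (not the real SimpleScheduler code), a rotating pick over the alive backends looks like this:

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: pick "some" backend for an UNPARTITIONED fragment by rotating
// over the candidate list; the real SimpleScheduler additionally handles blacklisting
// and reports the chosen backend id back through a Reference parameter.
class RoundRobinBackendPicker {
    private final AtomicLong next = new AtomicLong();

    String pickHost(List<String> aliveBackends) {
        if (aliveBackends.isEmpty()) {
            // mirrors the "Backend not found ..." error raised by the Coordinator
            throw new IllegalStateException("Backend not found. Check if any backend is down or not.");
        }
        int idx = (int) (next.getAndIncrement() % aliveBackends.size());
        return aliveBackends.get(idx);
    }
}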

if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
    Reference<Long> backendIdRef = new Reference<>();
    TNetworkAddress execHostport;
    if (usedComputeNode) {
        execHostport = SimpleScheduler.getComputeNodeHost(
                this.idToComputeNode, backendIdRef);
    } else {
        execHostport = SimpleScheduler.getBackendHost(
                this.idToBackend, backendIdRef);
    }
    if (execHostport == null) {
        throw new UserException(
                "Backend not found. Check if any backend is down or not. " +
                        backendInfosString(usedComputeNode));
    }
    recordUsedBackend(execHostport, backendIdRef.getRef());
    FInstanceExecParam instanceParam = new FInstanceExecParam(
            null, execHostport, 0, params);
    params.instanceExecParams.add(instanceParam);
    continue;
}

ExchangeNode

When a Fragment's leftmost leaf node is an ExchangeNode, the Fragment is placed on the BE nodes of the input fragment with the largest instance_num among its InputFragments. This keeps RPC traffic between FragmentInstances on the BE side within the same process as much as possible, reducing cross-process communication. There are three main steps:

T1. maxParallelism

Find the index (inputFragmentIndex) of the input fragment with the maximum parallelism (maxParallelism) among the InputFragments.

currentChildFragmentParallelism starts out as the size of the child's instanceExecParams. When dopAdaptionEnabled is true (i.e. every Operator in the Fragment can decompose_to_pipeline, which by now should hold for all operators), the real dop also has to take fragment.pipelineDop into account.

With the pipeline engine enabled, numInstances * pipelineDop equals the numInstances used when the pipeline engine is disabled; the total degree of parallelism does not change. Without pipelines, parallelism came from running many FragmentInstances concurrently; with pipelines, dop threads execute concurrently inside a single FragmentInstance.
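As a quick, hypothetical illustration of that accounting: a child fragment that used to run as 16 FragmentInstances without the pipeline engine might now run as 2 instances with pipelineDop = 8, so its effective parallelism is unchanged.

// Hypothetical numbers, just to illustrate the dop-adaption bookkeeping:
int numInstancesWithoutPipeline = 16;   // 16 fragment instances, one execution thread each
int numInstancesWithPipeline = 2;       // 2 fragment instances ...
int pipelineDop = 8;                    // ... each driven by 8 pipeline drivers
int currentChildFragmentParallelism = numInstancesWithPipeline * pipelineDop;   // 2 * 8 = 16
assert currentChildFragmentParallelism == numInstancesWithoutPipeline;          // total parallelism unchanged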

int inputFragmentIndex = 0;
int maxParallelism = 0;
for (int j = 0; j < fragment.getChildren().size(); j++) {
    PlanFragment inputFragment = fragment.getChild(j);
    int currentChildFragmentParallelism = fragmentExecParamsMap
            .get(inputFragment.getFragmentId())
            .instanceExecParams
            .size();
    if (dopAdaptionEnabled) {
        currentChildFragmentParallelism *=
                inputFragment.getPipelineDop();
    }
    if (currentChildFragmentParallelism > maxParallelism) {
        maxParallelism = currentChildFragmentParallelism;
        inputFragmentIndex = j;
    }
}

T2. hostSet

Compute hostSet, the set of BE nodes that the input fragment at inputFragmentIndex runs on.

PlanFragmentId inputFragmentId
        = fragment.getChild(inputFragmentIndex).getFragmentId();
FragmentExecParams maxParallelismFragmentExecParams
        = fragmentExecParamsMap.get(inputFragmentId);
Set<TNetworkAddress> hostSet = Sets.newHashSet();

if (usedComputeNode) {
    //...
} else {
    if (isUnionFragment(fragment) && isGatherOutput) {
        // union fragment use all children's host
        // if output fragment isn't gather, all fragment must keep 1 instance
        for (PlanFragment child : fragment.getChildren()) {
            FragmentExecParams childParams =
                    fragmentExecParamsMap.get(child.getFragmentId());
            childParams.instanceExecParams
                    .stream().map(e -> e.host).forEach(hostSet::add);
        }
        // make olapScan maxParallelism equals prefer compute node number
        maxParallelism = hostSet.size() * fragment.getParallelExecNum();
    } else {
        for (FInstanceExecParam execParams :
                maxParallelismFragmentExecParams.instanceExecParams) {
            hostSet.add(execParams.host);
        }
    }
}

if (dopAdaptionEnabled) {
    Preconditions.checkArgument(leftMostNode instanceof ExchangeNode);
    maxParallelism = hostSet.size();
}

T3. params.instanceExecParams

Now, based on maxParallelism and parallel_exchange_instance_num, we determine how many FragmentInstance objects (dop) this PlanFragment is instantiated into: dop is the smaller of exchangeInstances (default 4, configurable via parallel_exchange_instance_num) and the maximum child parallelism maxParallelism.

Then dop hosts are picked from hostSet as the BE nodes on which the FragmentInstances run. Each FragmentInstance's execution parameters are recorded in an FInstanceExecParam object, and the size of params.instanceExecParams is the number of FragmentInstances this PlanFragment is instantiated into.
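For example, with hypothetical values hostSet = {be1, be2, be3}, maxParallelism = 6 and exchangeInstances = 4, four FInstanceExecParam entries are created and the shuffled hosts are reused round-robin via index % hosts.size():

// Hypothetical illustration of the instance/host fan-out for an exchange-sourced fragment.
List<String> hosts = new ArrayList<>(List.of("be1", "be2", "be3"));   // hostSet of the widest child
int maxParallelism = 6;
int exchangeInstances = 4;                                            // parallel_exchange_instance_num
int dop = (exchangeInstances > 0 && maxParallelism > exchangeInstances)
        ? exchangeInstances
        : maxParallelism;                                             // -> 4
Collections.shuffle(hosts);
for (int index = 0; index < dop; index++) {
    // with 3 hosts and dop = 4, the first host in the shuffled list is used twice
    System.out.println("instance " + index + " -> " + hosts.get(index % hosts.size()));
}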

int exchangeInstances = -1;
if (connectContext != null && connectContext.getSessionVariable() != null) {
    exchangeInstances =
            connectContext.getSessionVariable().getExchangeInstanceParallel();
}

if (exchangeInstances > 0 && maxParallelism > exchangeInstances) {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    Collections.shuffle(hosts, random);

    for (int index = 0; index < exchangeInstances; index++) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(
                null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
} else {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    for (int index = 0; index < maxParallelism; ++index) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(
                null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
}

Collections.shuffle(params.instanceExecParams, random);
continue;

OlapScanNode

A Fragment whose source is an ExchangeNode derives its execution nodes from the HostSet of its InputFragments; ultimately everything depends on the leaf Fragments whose source is an OlapScanNode.

Since the leftmost node (leftMostNode) of Colocate JOIN and Bucket Shuffle JOIN Fragments is also an OlapScanNode, these cases need special handling.

int parallelExecInstanceNum = fragment.getParallelExecNum();
int pipelineDop = fragment.getPipelineDop();
boolean hasColocate =
        isColocateFragment(fragment.getPlanRoot()) &&
        fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) &&
        fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0;
boolean hasBucketShuffle = isBucketShuffleJoin(fragment.getFragmentId().asInt());
if (hasColocate || hasBucketShuffle) {
    computeColocatedJoinInstanceParam(
            fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()),
            fragmentIdBucketSeqToScanRangeMap.get(fragment.getFragmentId()),
            parallelExecInstanceNum, pipelineDop, usePipeline, params);
    computeBucketSeq2InstanceOrdinal(
            params, fragmentIdToBucketNumMap.get(fragment.getFragmentId()));
} else {
    //...
}

computeColocatedJoinInstanceParam

  1. Compute, for every BE node, all the Tablets it needs to read.
    All buckets to be read are grouped by BE node (i.e., the Tablets each BE node has to read) and stored in addressToScanRanges.

    Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>>
            addressToScanRanges = Maps.newHashMap();
    // group bucketSeq -> scan ranges by the BE address that owns the bucket
    for (var bucketSeqAndScanRanges : bucketSeqToScanRange.entrySet()) {
        TNetworkAddress address =
                bucketSeqToAddress.get(bucketSeqAndScanRanges.getKey());
        addressToScanRanges
                .computeIfAbsent(address, k -> Lists.newArrayList())
                .add(bucketSeqAndScanRanges);
    }
  2. Each BE node has an expectedInstanceNum; addressToScanRanges is split evenly so that the tablets each FragmentInstance reads form its scanRangePerInstance.

    With the pipeline engine enabled, parallelExecInstanceNum is always 1, i.e. only a single FragmentInstance executes the OlapScan, so scanRangesPerInstance still has size 1.

    for (var addressScanRange : addressToScanRanges.entrySet()) {
        var scanRange = addressScanRange.getValue();

        int expectedInstanceNum = 1;
        if (parallelExecInstanceNum > 1) {
            expectedInstanceNum =
                    Math.min(scanRange.size(), parallelExecInstanceNum);
        }

        // 2. split how many scanRanges one instance should scan
        var scanRangesPerInstance =
                ListUtil.splitBySize(scanRange, expectedInstanceNum);

        // 3. construct instanceExecParam and add the scanRanges that should be scanned by this instance
        for (var scanRangePerInstance : scanRangesPerInstance) {
            //...
        }
    }
  3. enableAssignScanRangesPerDriverSeq

    The preset pipelineDop (fragment.pipelineDop) defaults to half of the CPU core count. The real parallelism expectedDop is the minimum of scanRangePerInstance.size() and pipelineDop; scanRangePerInstance is then split evenly into expectedDop parts, so that every thread reads a roughly equal share of scan ranges and data skew stays limited (see the sketch after this list).

    The enableAssignScanRangesPerDriverSeq function decides whether to assign buckets directly to each PipelineDriver, which avoids a LocalShuffle (there is no need to compute which PipelineDriver the data read from a bucket should be routed to). It returns true only when the scanRanges on every BE node are at least pipelineDop / 2, i.e. only when the data volume is large enough. When assignPerDriverSeq is true, the following two structures take effect:

    • bucketSeqToDriverSeq: records the PipelineDriver (driverSeq) each bucket is assigned to

    • nodeToPerDriverSeqScanRanges: a {scanId, driverSeq, scanRange} mapping

      A scanId identifies one OlapScanNode. Each OlapScanNode's scanRangePerInstance is split into expectedDop parts, and each PipelineDriver gets the scanRanges of its own share of Tablets.

    boolean assignPerDriverSeq =
            enablePipeline &&
            addressToScanRanges.values().stream().allMatch(scanRanges ->
                    enableAssignScanRangesPerDriverSeq(scanRanges, pipelineDop));

    for (var scanRangePerInstance : scanRangesPerInstance) {
        FInstanceExecParam instanceParam = new FInstanceExecParam(
                null, addressScanRange.getKey(), 0, params);

        int expectedDop = Math.min(scanRangePerInstance.size(), pipelineDop);
        var scanRangesPerDriverSeq =
                ListUtil.splitBySize(scanRangePerInstance, expectedDop);

        // when parallelism is high enough, the instance dop follows the per-driver split
        if (assignPerDriverSeq) {
            instanceParam.pipelineDop = scanRangesPerDriverSeq.size();
        }

        // for each PipelineDriver
        for (int driverSeq = 0; driverSeq < scanRangesPerDriverSeq.size(); ++driverSeq) {
            final int finalDriverSeq = driverSeq;
            scanRangesPerDriverSeq.get(driverSeq).forEach(bucketSeqAndScanRanges -> {
                if (assignPerDriverSeq) {
                    // record which PipelineDriver each bucket is assigned to
                    instanceParam.bucketSeqToDriverSeq.putIfAbsent(
                            bucketSeqAndScanRanges.getKey(), finalDriverSeq);
                } else {
                    instanceParam.bucketSeqToDriverSeq.putIfAbsent(
                            bucketSeqAndScanRanges.getKey(), -1);
                }

                bucketSeqAndScanRanges.getValue().forEach((scanId, scanRanges) -> {
                    // record, for each OlapScanNode (scanId), the scanRanges each PipelineDriver should read
                    if (assignPerDriverSeq) {
                        instanceParam.nodeToPerDriverSeqScanRanges
                                .computeIfAbsent(scanId, k -> new HashMap<>())
                                .computeIfAbsent(finalDriverSeq, k -> new ArrayList<>())
                                .addAll(scanRanges);
                    } else {
                        instanceParam.perNodeScanRanges
                                .computeIfAbsent(scanId, k -> new ArrayList<>())
                                .addAll(scanRanges);
                    }
                });
            });
        }

        if (assignPerDriverSeq) {
            instanceParam.nodeToPerDriverSeqScanRanges.forEach((scanId, perDriverSeqScanRanges) -> {
                for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                    perDriverSeqScanRanges.computeIfAbsent(driverSeq, k -> new ArrayList<>());
                }
            });
        }

        params.instanceExecParams.add(instanceParam);
    }
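To make the per-driver assignment concrete, here is a self-contained sketch with hypothetical numbers; splitBySize below is a simplified stand-in for ListUtil.splitBySize that just splits a list as evenly as possible. With 8 buckets on one BE, parallelExecInstanceNum = 1 and pipelineDop = 4, the single FragmentInstance ends up with pipelineDop = 4 and each PipelineDriver reads exactly two buckets, so no local shuffle is needed.

import java.util.*;

public class PerDriverSplitSketch {
    // Simplified stand-in for ListUtil.splitBySize: split `list` into `n` parts as evenly as possible.
    static <T> List<List<T>> splitBySize(List<T> list, int n) {
        List<List<T>> parts = new ArrayList<>();
        int base = list.size() / n;
        int rest = list.size() % n;
        int from = 0;
        for (int i = 0; i < n; i++) {
            int to = from + base + (i < rest ? 1 : 0);
            parts.add(new ArrayList<>(list.subList(from, to)));
            from = to;
        }
        return parts;
    }

    public static void main(String[] args) {
        // Hypothetical: one BE owns 8 bucket sequences, pipeline engine enabled.
        List<Integer> bucketSeqs = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        int parallelExecInstanceNum = 1;   // with pipelines, one FragmentInstance per BE
        int pipelineDop = 4;

        // Step 2: split buckets across instances (a single instance keeps all 8 buckets).
        List<List<Integer>> perInstance = splitBySize(bucketSeqs, parallelExecInstanceNum);

        // Step 3: split each instance's buckets across its pipeline drivers.
        for (List<Integer> scanRangePerInstance : perInstance) {
            int expectedDop = Math.min(scanRangePerInstance.size(), pipelineDop);   // min(8, 4) = 4
            List<List<Integer>> perDriver = splitBySize(scanRangePerInstance, expectedDop);
            for (int driverSeq = 0; driverSeq < perDriver.size(); driverSeq++) {
                // driverSeq 0 -> buckets [0, 1], driverSeq 1 -> [2, 3], ...
                System.out.println("driverSeq " + driverSeq + " -> buckets " + perDriver.get(driverSeq));
            }
        }
    }
}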

ShuffleJOIN

Roughly the same logic as above; the difference is that here one OlapScanNode corresponds to one FragmentInstance (the scan ranges are split per scan node rather than per bucket).

boolean assignScanRangesPerDriverSeq =
        usePipeline &&
        (fragment.isAssignScanRangesPerDriverSeq() ||
                fragment.isForceAssignScanRangesPerDriverSeq());
var scanRangeAssignment =
        fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;
for (var tNetworkAddressMapEntry : scanRangeAssignment.entrySet()) {
    TNetworkAddress host = tNetworkAddressMapEntry.getKey();
    Map<Integer, List<TScanRangeParams>> nodeAndScanRanges =
            tNetworkAddressMapEntry.getValue();

    // 1. Handle normal scan node firstly
    for (Integer planNodeId : nodeAndScanRanges.keySet()) {
        List<TScanRangeParams> perNodeScanRanges = nodeAndScanRanges.get(planNodeId);
        int expectedInstanceNum =
                Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
        List<List<TScanRangeParams>> perInstanceScanRanges =
                ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum);

        for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(
                    null, host, 0, params);
            params.instanceExecParams.add(instanceParam);

            boolean assignPerDriverSeq =
                    assignScanRangesPerDriverSeq &&
                    (enableAssignScanRangesPerDriverSeq(scanRangeParams, pipelineDop) ||
                            fragment.isForceAssignScanRangesPerDriverSeq());
            if (!assignPerDriverSeq) {
                instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
            } else {
                // assign scan ranges per driver sequence to eliminate the local shuffle
                int expectedDop = Math.max(
                        1, Math.min(pipelineDop, scanRangeParams.size()));
                List<List<TScanRangeParams>> scanRangeParamsPerDriverSeq =
                        ListUtil.splitBySize(scanRangeParams, expectedDop);
                instanceParam.pipelineDop = scanRangeParamsPerDriverSeq.size();
                Map<Integer, List<TScanRangeParams>> scanRangesPerDriverSeq = new HashMap<>();
                instanceParam.nodeToPerDriverSeqScanRanges.put(
                        planNodeId, scanRangesPerDriverSeq);
                for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                    scanRangesPerDriverSeq.put(
                            driverSeq, scanRangeParamsPerDriverSeq.get(driverSeq));
                }
            }
        }
    }
}

computeFragmentExecParams

TUniqueId

Generate a TUniqueId for every FragmentInstance:

instanceIds.clear();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment.getSink() instanceof ResultSink &&
            params.instanceExecParams.size() > 1) {
        throw new StarRocksPlannerException(
                "This sql plan has multi result sinks",
                ErrorType.INTERNAL_ERROR);
    }

    for (int j = 0; j < params.instanceExecParams.size(); ++j) {
        TUniqueId instanceId = new TUniqueId();
        instanceId.setHi(queryId.hi);
        instanceId.setLo(queryId.lo + instanceIds.size() + 1);
        params.instanceExecParams.get(j).instanceId = instanceId;
        instanceIds.add(instanceId);
    }
}
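In other words, every FragmentInstance id reuses the query id's hi half, while its lo half is offset from the query id's lo by a counter that keeps growing across all fragments. A quick hypothetical illustration:

// Hypothetical queryId {hi = 0xABC, lo = 0x100} and a single fragment with 3 instances:
long queryIdHi = 0xABCL;
long queryIdLo = 0x100L;
int existingInstanceCount = 0;          // instanceIds.size() so far, keeps growing across fragments
for (int j = 0; j < 3; j++) {
    long lo = queryIdLo + existingInstanceCount + 1;   // 0x101, 0x102, 0x103
    existingInstanceCount++;
    System.out.printf("instance %d -> TUniqueId(hi=0x%X, lo=0x%X)%n", j, queryIdHi, lo);
}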

params.instanceExecParams

// For every fragment that sends to a destination fragment, accumulate the number of
// sender instances per ExchangeNode into destParams.perExchNumSenders.
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment instanceof MultiCastPlanFragment) {
        continue;
    }
    if (params.fragment.getDestFragment() == null) {
        continue;
    }
    PlanFragment fragment = params.fragment;
    PlanFragment destFragment = fragment.getDestFragment();
    FragmentExecParams destParams =
            fragmentExecParamsMap.get(destFragment.getFragmentId());

    // set # of senders
    fragment.getDestNode().setPartitionType(
            fragment.getOutputPartition().getType());
    DataSink sink = fragment.getSink();
    if (sink instanceof DataStreamSink) {
        DataStreamSink dataStreamSink = (DataStreamSink) sink;
        dataStreamSink.setExchDop(destParams.fragment.getPipelineDop());
    }

    PlanNodeId exchId = sink.getExchNodeId();
    if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
        destParams.perExchNumSenders.put(
                exchId.asInt(), params.instanceExecParams.size());
    } else {
        // several sending fragments can target the same exchange node, so sender counts are summed
        destParams.perExchNumSenders.put(exchId.asInt(),
                params.instanceExecParams.size() +
                        destParams.perExchNumSenders.get(exchId.asInt()));
    }
    //...
}

params.destinations

// Fill params.destinations: for bucket-shuffle / colocate destinations there is one entry per
// bucketSeq, otherwise one entry per destination FragmentInstance.
if (needScheduleByShuffleJoin(destFragment.getFragmentId().asInt(), sink)) {
    int bucketSeq = 0;
    int bucketNum = getFragmentBucketNum(destFragment.getFragmentId());
    TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);

    while (bucketSeq < bucketNum) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = new TUniqueId(-1, -1);
        dest.server = dummyServer;
        dest.setBrpc_server(dummyServer);

        for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
            Integer driverSeq = instanceExecParams.bucketSeqToDriverSeq.get(bucketSeq);
            if (driverSeq != null) {
                // set the destination {fragment_instance_id, driverSeq} for this bucket
                dest.fragment_instance_id = instanceExecParams.instanceId;
                dest.server = toRpcHost(instanceExecParams.host);
                dest.setBrpc_server(toBrpcHost(instanceExecParams.host));
                if (driverSeq != FInstanceExecParam.ABSENT_DRIVER_SEQUENCE) {
                    dest.setPipeline_driver_sequence(driverSeq);
                }
                break;
            }
        }
        bucketSeq++;
        params.destinations.add(dest);
    }
} else {
    for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
        dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
        dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host));
        params.destinations.add(dest);
    }
}