After PlanFragmentBuilder converts the PhysicalPlan into an ExecPlan, we end up with a set of Fragments. The Coordinator then has to instantiate each Fragment into FragmentInstances, filling in the concrete execution plan and the parameters of the BE nodes they run on.
computeScanRangeAssignment
computeFragmentHosts
The computeFragmentHosts function assigns the BE nodes on which each Fragment will run. A Fragment's leftmost leaf node is either an ExchangeNode or a ScanNode (e.g. an OlapScanNode), and each case needs some extra information to be set.
computeFragmentHosts traverses the fragments in reverse order, from the child Fragments (leaves) to the root Fragment, so that a parent can choose its BE nodes based on where its children run.
```java
private void computeFragmentHosts() throws Exception {
    // Traverse from the leaf fragments to the root fragment.
    for (int i = fragments.size() - 1; i >= 0; --i) {
        PlanFragment fragment = fragments.get(i);
        FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
        boolean dopAdaptionEnabled = usePipeline &&
                connectContext.getSessionVariable().isPipelineDopAdaptionEnabled();

        if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
            // ... UNPARTITIONED handling (see the next section), then
            continue;
        }

        PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot());
        if (!(leftMostNode instanceof ScanNode)) {
            // ... ExchangeNode handling (see below), then
            continue;
        }

        // ... OlapScanNode handling (see below)
    }
}
```
DataPartition.UNPARTITIONED
When the Fragment's DataSink partitioning is DataPartition.UNPARTITIONED, all of its output is sent to the destinations. In this case the BE node that executes the Fragment is chosen with SimpleScheduler.getBackendHost, which simply picks one BE node.
Sinks partitioned as DataPartition.UNPARTITIONED should only occur in a few cases such as BROADCAST JOIN and ResultSink. Why is it fine here to pick a BE node at random, without considering the input at all?
```java
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
    Reference<Long> backendIdRef = new Reference<>();
    TNetworkAddress execHostport;
    if (usedComputeNode) {
        execHostport = SimpleScheduler.getComputeNodeHost(this.idToComputeNode, backendIdRef);
    } else {
        execHostport = SimpleScheduler.getBackendHost(this.idToBackend, backendIdRef);
    }
    if (execHostport == null) {
        throw new UserException("Backend not found. Check if any backend is down or not. "
                + backendInfosString(usedComputeNode));
    }
    recordUsedBackend(execHostport, backendIdRef.getRef());
    FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport, 0, params);
    params.instanceExecParams.add(instanceParam);
    continue;
}
```
ExchangeNode
When the leftmost leaf node of a Fragment is an ExchangeNode, the Fragment is scheduled onto the BE nodes of the InputFragment that has the largest instance count (instance_num) among its InputFragments. This reduces cross-process RPC traffic between FragmentInstances on the BE side. There are three main steps:
T1. maxParallelism: find the index (inputFragmentIndex) of the InputFragment with the highest parallelism (maxParallelism).
currentChildFragmentParallelism starts from the size of the child's instanceExecParams. When dopAdaptionEnabled is true (i.e. every Operator of the Fragment can decompose_to_pipeline, which by now essentially all operators support), the real dop also has to take fragment.pipelineDop into account, so the value is multiplied by the child's pipelineDop.
With pipeline enabled, numInstances * pipelineDop equals the numInstances value without pipeline, i.e. the total parallelism does not change. Without the pipeline engine, parallelism comes from running many FragmentInstances concurrently; with the pipeline engine, dop threads (PipelineDrivers) execute concurrently inside a single FragmentInstance.
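As a rough illustration of this invariant (the numbers below are made up, not StarRocks defaults), a tiny sketch:

```java
// Illustrative sketch with hypothetical numbers: total parallelism is preserved
// when the pipeline engine takes over part of the parallelism as pipelineDop.
public class DopAdaptionExample {
    public static void main(String[] args) {
        int numInstancesWithoutPipeline = 8; // e.g. 8 fragment instances, one thread each
        int pipelineDop = 4;                 // drivers per fragment instance with pipeline enabled
        int numInstancesWithPipeline = numInstancesWithoutPipeline / pipelineDop; // 2 instances

        // 2 instances * 4 drivers = 8 concurrent execution units, the same total as before.
        System.out.println(numInstancesWithPipeline * pipelineDop); // prints 8
    }
}
```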
```java
int inputFragmentIndex = 0;
int maxParallelism = 0;
for (int j = 0; j < fragment.getChildren().size(); j++) {
    PlanFragment inputFragment = fragment.getChild(j);
    int currentChildFragmentParallelism =
            fragmentExecParamsMap.get(inputFragment.getFragmentId()).instanceExecParams.size();
    if (dopAdaptionEnabled) {
        currentChildFragmentParallelism *= inputFragment.getPipelineDop();
    }
    if (currentChildFragmentParallelism > maxParallelism) {
        maxParallelism = currentChildFragmentParallelism;
        inputFragmentIndex = j;
    }
}
```
T2. hostSet: compute the set of BE nodes (hostSet) that the InputFragment at inputFragmentIndex runs on.
```java
PlanFragmentId inputFragmentId = fragment.getChild(inputFragmentIndex).getFragmentId();
FragmentExecParams maxParallelismFragmentExecParams = fragmentExecParamsMap.get(inputFragmentId);

Set<TNetworkAddress> hostSet = Sets.newHashSet();
if (usedComputeNode) {
    // ... (compute-node branch elided)
} else {
    if (isUnionFragment(fragment) && isGatherOutput) {
        for (PlanFragment child : fragment.getChildren()) {
            FragmentExecParams childParams = fragmentExecParamsMap.get(child.getFragmentId());
            childParams.instanceExecParams.stream().map(e -> e.host).forEach(hostSet::add);
        }
        maxParallelism = hostSet.size() * fragment.getParallelExecNum();
    } else {
        for (FInstanceExecParam execParams : maxParallelismFragmentExecParams.instanceExecParams) {
            hostSet.add(execParams.host);
        }
    }
}

if (dopAdaptionEnabled) {
    Preconditions.checkArgument(leftMostNode instanceof ExchangeNode);
    maxParallelism = hostSet.size();
}
```
T3. params.instanceExecParams: now maxParallelism and parallel_exchange_instance_num determine how many FragmentInstance objects (dop) this PlanFragment is instantiated into: dop is the smaller of exchangeInstances (default 4, adjustable via parallel_exchange_instance_num) and maxParallelism, the largest parallelism among the children.
Then dop hosts are picked from hostSet as the BE nodes on which the FragmentInstances run. The execution parameters of each FragmentInstance are recorded in an FInstanceExecParam object, and the size of params.instanceExecParams is the number of FragmentInstances the PlanFragment is instantiated into.
```java
int exchangeInstances = -1;
if (connectContext != null && connectContext.getSessionVariable() != null) {
    exchangeInstances = connectContext.getSessionVariable().getExchangeInstanceParallel();
}

if (exchangeInstances > 0 && maxParallelism > exchangeInstances) {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    Collections.shuffle(hosts, random);
    for (int index = 0; index < exchangeInstances; index++) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
} else {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    for (int index = 0; index < maxParallelism; ++index) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
}

Collections.shuffle(params.instanceExecParams, random);
continue;
```
OlapScanNode
The execution nodes of a Fragment whose source is an ExchangeNode depend on the hostSet of its InputFragments, which ultimately traces back to the leaf Fragments whose source is an OlapScanNode.
Because the leftmost node (leftMostNode) of a Colocate JOIN or Bucket Shuffle JOIN Fragment is also an OlapScanNode, these cases need special handling.
```java
int parallelExecInstanceNum = fragment.getParallelExecNum();
int pipelineDop = fragment.getPipelineDop();
boolean hasColocate = isColocateFragment(fragment.getPlanRoot())
        && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
        && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0;
boolean hasBucketShuffle = isBucketShuffleJoin(fragment.getFragmentId().asInt());

if (hasColocate || hasBucketShuffle) {
    computeColocatedJoinInstanceParam(
            fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()),
            fragmentIdBucketSeqToScanRangeMap.get(fragment.getFragmentId()),
            parallelExecInstanceNum, pipelineDop, usePipeline, params);
    computeBucketSeq2InstanceOrdinal(params, fragmentIdToBucketNumMap.get(fragment.getFragmentId()));
} else {
    // ... not colocate / bucket shuffle: handled in the ShuffleJOIN section below
}
```
computeColocatedJoinInstanceParam
First, work out every Tablet each BE node needs to read: the buckets to be read are grouped by BE node (i.e. the Tablets each BE node has to read) and stored in addressToScanRanges.
```java
// Group the buckets to be read by BE node: address -> list of (bucketSeq -> {scanId -> scanRanges}).
Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges =
        Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> bucketSeqAndScanRanges :
        bucketSeqToScanRange.entrySet()) {
    TNetworkAddress address = bucketSeqToAddress.get(bucketSeqAndScanRanges.getKey());
    addressToScanRanges
            .computeIfAbsent(address, k -> Lists.newArrayList())
            .add(bucketSeqAndScanRanges);
}
```
Each node has an expectedInstanceNum, and its entry in addressToScanRanges is split evenly so that each FragmentInstance reads one share of the tablets, the scanRangePerInstance.
When pipeline is enabled, parallelExecInstanceNum is always 1, i.e. only one FragmentInstance executes the OlapScan, so scanRangesPerInstance still has size 1.
```java
for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange :
        addressToScanRanges.entrySet()) {
    List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();

    int expectedInstanceNum = 1;
    if (parallelExecInstanceNum > 1) {
        // The instance num should not be larger than the number of buckets on this node.
        expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
    }

    // Split this node's buckets evenly across the expected instances.
    List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> scanRangesPerInstance =
            ListUtil.splitBySize(scanRange, expectedInstanceNum);

    for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRangePerInstance : scanRangesPerInstance) {
        // ... per-instance handling, shown in the next block
    }
}
```
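ListUtil.splitBySize itself is not shown in this post; conceptually it splits a list into a given number of roughly equal chunks. Below is a minimal sketch of such an even split (an illustration only, not the actual ListUtil implementation, which may chunk the list differently):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Split `items` into `chunks` roughly equal lists, distributing elements round-robin.
    static <T> List<List<T>> splitEvenly(List<T> items, int chunks) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < chunks; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            result.get(i % chunks).add(items.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 scan ranges split across 3 instances -> sizes 3, 2, 2.
        List<Integer> scanRanges = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(splitEvenly(scanRanges, 3)); // [[1, 4, 7], [2, 5], [3, 6]]
    }
}
```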
enableAssignScanRangesPerDriverSeq
The preset pipelineDop (fragment.pipelineDop) defaults to half the number of CPU cores. The real parallelism expectedDop is the minimum of the size of scanRangePerInstance and pipelineDop; scanRangePerInstance is then split evenly into expectedDop shares, so that the scanRanges read by each thread are as balanced as possible and the data does not skew too much.
The enableAssignScanRangesPerDriverSeq function decides whether each PipelineDriver is directly assigned the buckets it should read, which avoids a LocalShuffle (there is no need to figure out which PipelineDriver the data read from a bucket must be sent to). It returns true only when the scanRanges on every BE node are no fewer than pipelineDop / 2, i.e. it is only enabled when the data volume is large enough; a sketch of this check follows below. When assignPerDriverSeq is true, the following two objects take effect:
bucketSeqToDriverSeq: records the PipelineDriver (driverSeq) that each bucket maps to.
nodeToPerDriverSeqScanRanges: a {scanId -> driverSeq -> scanRange} mapping.
A scanId corresponds to one OlapScanNode. The scanRangePerInstance to be read by the OlapScanNodes is split into expectedDop shares, and each PipelineDriver gets the scanRange of one share of Tablets.
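A minimal sketch of the check described above, based only on the pipelineDop / 2 condition stated in the text (the real StarRocks implementation may check more than this):

```java
import java.util.List;

public class AssignPerDriverSeqSketch {
    // Sketch of the heuristic described above: assign scan ranges per driver sequence
    // only when this node has at least pipelineDop / 2 scan ranges to read.
    // This is an assumption based on the text, not the actual StarRocks code.
    static boolean enableAssignScanRangesPerDriverSeq(List<?> scanRanges, int pipelineDop) {
        return scanRanges.size() >= pipelineDop / 2;
    }

    public static void main(String[] args) {
        System.out.println(enableAssignScanRangesPerDriverSeq(List.of(1, 2, 3), 8));    // false: 3 < 4
        System.out.println(enableAssignScanRangesPerDriverSeq(List.of(1, 2, 3, 4), 8)); // true: 4 >= 4
    }
}
```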
```java
// Whether every BE node has enough scan ranges to assign them per driver sequence.
boolean assignPerDriverSeq = enablePipeline &&
        addressToScanRanges.values().stream()
                .allMatch(scanRanges -> enableAssignScanRangesPerDriverSeq(scanRanges, pipelineDop));

for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRangePerInstance : scanRangesPerInstance) {
    FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);

    int expectedDop = Math.min(scanRangePerInstance.size(), pipelineDop);
    List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> scanRangesPerDriverSeq =
            ListUtil.splitBySize(scanRangePerInstance, expectedDop);
    if (assignPerDriverSeq) {
        instanceParam.pipelineDop = scanRangesPerDriverSeq.size();
    }

    for (int i = 0; i < scanRangesPerDriverSeq.size(); ++i) {
        final int driverSeq = i; // effectively-final copy for the lambdas below
        scanRangesPerDriverSeq.get(driverSeq).forEach(bucketSeqAndScanRanges -> {
            if (assignPerDriverSeq) {
                instanceParam.bucketSeqToDriverSeq.putIfAbsent(bucketSeqAndScanRanges.getKey(), driverSeq);
            } else {
                instanceParam.bucketSeqToDriverSeq.putIfAbsent(bucketSeqAndScanRanges.getKey(), -1);
            }

            bucketSeqAndScanRanges.getValue().forEach((scanId, scanRanges) -> {
                if (assignPerDriverSeq) {
                    instanceParam.nodeToPerDriverSeqScanRanges
                            .computeIfAbsent(scanId, k -> new HashMap<>())
                            .computeIfAbsent(driverSeq, k -> new ArrayList<>())
                            .addAll(scanRanges);
                } else {
                    instanceParam.perNodeScanRanges
                            .computeIfAbsent(scanId, k -> new ArrayList<>())
                            .addAll(scanRanges);
                }
            });
        });
    }

    if (assignPerDriverSeq) {
        // Make sure every driverSeq has a (possibly empty) entry for each scanId.
        instanceParam.nodeToPerDriverSeqScanRanges.forEach((scanId, perDriverSeqScanRanges) -> {
            for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                perDriverSeqScanRanges.computeIfAbsent(driverSeq, k -> new ArrayList<>());
            }
        });
    }

    params.instanceExecParams.add(instanceParam);
}
```
ShuffleJOIN
The logic is much the same as above; the difference is that here one OlapScanNode corresponds to one FragmentInstance.
```java
boolean assignScanRangesPerDriverSeq = usePipeline &&
        (fragment.isAssignScanRangesPerDriverSeq() || fragment.isForceAssignScanRangesPerDriverSeq());

Map<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> scanRangeAssignment =
        fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;

for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> tNetworkAddressMapEntry :
        scanRangeAssignment.entrySet()) {
    TNetworkAddress host = tNetworkAddressMapEntry.getKey();
    Map<Integer, List<TScanRangeParams>> nodeAndScanRanges = tNetworkAddressMapEntry.getValue();

    // One group of scan ranges per (host, planNodeId), i.e. per OlapScanNode on this host.
    for (Integer planNodeId : nodeAndScanRanges.keySet()) {
        List<TScanRangeParams> perNodeScanRanges = nodeAndScanRanges.get(planNodeId);
        int expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
        List<List<TScanRangeParams>> perInstanceScanRanges =
                ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum);

        for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(null, host, 0, params);
            params.instanceExecParams.add(instanceParam);

            boolean assignPerDriverSeq = assignScanRangesPerDriverSeq &&
                    (enableAssignScanRangesPerDriverSeq(scanRangeParams, pipelineDop)
                            || fragment.isForceAssignScanRangesPerDriverSeq());
            if (!assignPerDriverSeq) {
                instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
            } else {
                int expectedDop = Math.max(1, Math.min(pipelineDop, scanRangeParams.size()));
                List<List<TScanRangeParams>> scanRangeParamsPerDriverSeq =
                        ListUtil.splitBySize(scanRangeParams, expectedDop);
                instanceParam.pipelineDop = scanRangeParamsPerDriverSeq.size();
                Map<Integer, List<TScanRangeParams>> scanRangesPerDriverSeq = new HashMap<>();
                instanceParam.nodeToPerDriverSeqScanRanges.put(planNodeId, scanRangesPerDriverSeq);
                for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                    scanRangesPerDriverSeq.put(driverSeq, scanRangeParamsPerDriverSeq.get(driverSeq));
                }
            }
        }
    }
}
```
computeFragmentExecParams
TUniqueId
Generate a TUniqueId for every FragmentInstance: the hi part reuses queryId.hi, and the lo part is queryId.lo plus an incrementing offset, so all instance ids of a query sit right after the query id.
```java
instanceIds.clear();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment.getSink() instanceof ResultSink && params.instanceExecParams.size() > 1) {
        throw new StarRocksPlannerException("This sql plan has multi result sinks",
                ErrorType.INTERNAL_ERROR);
    }

    for (int j = 0; j < params.instanceExecParams.size(); ++j) {
        TUniqueId instanceId = new TUniqueId();
        instanceId.setHi(queryId.hi);
        instanceId.setLo(queryId.lo + instanceIds.size() + 1);
        params.instanceExecParams.get(j).instanceId = instanceId;
        instanceIds.add(instanceId);
    }
}
```
params.instanceExecParams
For every Fragment that has a destination Fragment, set the exchange dop on its DataStreamSink and accumulate the number of senders into the destination's perExchNumSenders:
```java
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment instanceof MultiCastPlanFragment) {
        continue;
    }
    if (params.fragment.getDestFragment() == null) {
        continue;
    }

    PlanFragment fragment = params.fragment;
    PlanFragment destFragment = fragment.getDestFragment();
    FragmentExecParams destParams = fragmentExecParamsMap.get(destFragment.getFragmentId());

    fragment.getDestNode().setPartitionType(fragment.getOutputPartition().getType());
    DataSink sink = fragment.getSink();
    if (sink instanceof DataStreamSink) {
        DataStreamSink dataStreamSink = (DataStreamSink) sink;
        dataStreamSink.setExchDop(destParams.fragment.getPipelineDop());
    }

    PlanNodeId exchId = sink.getExchNodeId();
    if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
        destParams.perExchNumSenders.put(exchId.asInt(), params.instanceExecParams.size());
    } else {
        destParams.perExchNumSenders.put(exchId.asInt(),
                params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
    }
}
```
params.destinations
Finally, fill in params.destinations, i.e. where each sending Fragment delivers its data. When the destination Fragment has to be scheduled by bucket (needScheduleByShuffleJoin), one TPlanFragmentDestination is created per bucketSeq, pointing at the instance that owns that bucket; otherwise one destination is created per destination FragmentInstance:
```java
if (needScheduleByShuffleJoin(destFragment.getFragmentId().asInt(), sink)) {
    int bucketSeq = 0;
    int bucketNum = getFragmentBucketNum(destFragment.getFragmentId());
    TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);

    while (bucketSeq < bucketNum) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = new TUniqueId(-1, -1);
        dest.server = dummyServer;
        dest.setBrpc_server(dummyServer);

        for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
            Integer driverSeq = instanceExecParams.bucketSeqToDriverSeq.get(bucketSeq);
            if (driverSeq != null) {
                dest.fragment_instance_id = instanceExecParams.instanceId;
                dest.server = toRpcHost(instanceExecParams.host);
                dest.setBrpc_server(toBrpcHost(instanceExecParams.host));
                if (driverSeq != FInstanceExecParam.ABSENT_DRIVER_SEQUENCE) {
                    dest.setPipeline_driver_sequence(driverSeq);
                }
                break;
            }
        }
        bucketSeq++;
        params.destinations.add(dest);
    }
} else {
    for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
        dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
        dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host));
        params.destinations.add(dest);
    }
}
```