After PlanFragmentBuilder converts the PhysicalPlan into an ExecPlan, we are left with a set of Fragments. The Coordinator then has to instantiate each Fragment into FragmentInstances, filling in the concrete execution plan and the BE node parameters.
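Before diving in, here is a rough, simplified sketch of the two parameter holders this post keeps referring to. This is my own summary, not the actual FE classes: only the fields that appear in the snippets below are listed, and the real Coordinator classes carry many more members.

```java
// Simplified sketch (not the real FE code) of the per-fragment / per-instance parameters.
class FragmentExecParams {
    PlanFragment fragment;
    // One entry per FragmentInstance this fragment is instantiated into.
    List<FInstanceExecParam> instanceExecParams = new ArrayList<>();
    // host -> (scanNodeId -> scan ranges), filled by computeScanRangeAssignment.
    Map<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> scanRangeAssignment = new HashMap<>();
    // Where this fragment's sink sends its data.
    List<TPlanFragmentDestination> destinations = new ArrayList<>();
    // exchNodeId -> number of sender instances feeding that ExchangeNode.
    Map<Integer, Integer> perExchNumSenders = new HashMap<>();
}

class FInstanceExecParam {
    TUniqueId instanceId;
    TNetworkAddress host;       // the BE node this instance runs on
    int pipelineDop;            // drivers per instance when the pipeline engine is enabled
    // scanNodeId -> scan ranges read by this instance.
    Map<Integer, List<TScanRangeParams>> perNodeScanRanges = new HashMap<>();
    // bucketSeq -> driverSeq (colocate / bucket shuffle case).
    Map<Integer, Integer> bucketSeqToDriverSeq = new HashMap<>();
    // scanNodeId -> (driverSeq -> scan ranges), when scan ranges are bound per driver.
    Map<Integer, Map<Integer, List<TScanRangeParams>>> nodeToPerDriverSeqScanRanges = new HashMap<>();
}
```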
computeScanRangeAssignment 
computeFragmentHosts
The computeFragmentHosts function assigns the BE nodes on which each Fragment runs. A Fragment's leftmost (source) node is either an ExchangeNode or an OlapScanNode, and some extra information has to be set for each case.
computeFragmentHosts iterates over the fragments in reverse, from the child Fragments (leaves) up to the root Fragment, so that a parent can choose its BE nodes based on its children.
```java
private void computeFragmentHosts() throws Exception {
    for (int i = fragments.size() - 1; i >= 0; --i) {
        PlanFragment fragment = fragments.get(i);
        FragmentExecParams params =
                fragmentExecParamsMap.get(fragment.getFragmentId());
        boolean dopAdaptionEnabled = usePipeline &&
                connectContext.getSessionVariable().isPipelineDopAdaptionEnabled();

        if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
            // ... (see the DataPartition.UNPARTITIONED part below)
            continue;
        }

        PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot());
        if (!(leftMostNode instanceof ScanNode)) {
            // ... (see the ExchangeNode part below)
            continue;
        }
        // ... (see the OlapScanNode part below)
    }
}
```
DataPartition.UNPARTITIONED
When the fragment's DataSink partition type is DataPartition.UNPARTITIONED, all of its output goes to the destinations. In that case the BE node that runs this Fragment is picked from the available BE nodes by SimpleScheduler.getBackendHost.
A sink partition type of DataPartition.UNPARTITIONED should only occur in a few cases such as BROADCAST JOIN and ResultSink; why does the input not need to be considered here, and a BE node can simply be picked at random to execute the Fragment?
 
```java
if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) {
    Reference<Long> backendIdRef = new Reference<>();
    TNetworkAddress execHostport;
    if (usedComputeNode) {
        execHostport = SimpleScheduler.getComputeNodeHost(
                this.idToComputeNode, backendIdRef);
    } else {
        execHostport = SimpleScheduler.getBackendHost(
                this.idToBackend, backendIdRef);
    }
    if (execHostport == null) {
        throw new UserException(
                "Backend not found. Check if any backend is down or not. " +
                backendInfosString(usedComputeNode));
    }
    recordUsedBackend(execHostport, backendIdRef.getRef());
    FInstanceExecParam instanceParam = new FInstanceExecParam(
            null, execHostport, 0, params);
    params.instanceExecParams.add(instanceParam);
    continue;
}
```
ExchangeNode
When the Fragment's leftmost leaf node is an ExchangeNode, the Fragment is placed on the BE nodes of the input fragment with the largest instance count (instance_num) among its InputFragments, which reduces cross-process RPC communication between FragmentInstances on the BE side. This happens in three steps:
T1. maxParallelism
Find the index (inputFragmentIndex) of the input fragment with the largest parallelism (maxParallelism) among the InputFragments.
currentChildFragmentParallelism starts as the size of the child's instanceExecParams; when dopAdaptionEnabled is true (i.e. every operator of the Fragment can be decomposed into pipeline operators, which by now essentially all operators support), the child's real dop also has to take inputFragment.getPipelineDop() into account.
With the pipeline engine enabled, numInstances * pipelineDop equals the numInstances used when the pipeline engine is disabled, i.e. the total parallelism does not change. Without pipelines, parallelism came from running many FragmentInstances in parallel; with the pipeline engine, dop threads run concurrently inside a single FragmentInstance.
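As a quick sanity check of this invariant, with made-up numbers:

```java
// Made-up numbers illustrating the invariant described above: total parallelism stays
// the same, it just moves from "many FragmentInstances" to "fewer instances x more drivers".
int numInstancesWithoutPipeline = 8;  // e.g. parallel_fragment_exec_instance_num = 8
int numInstancesWithPipeline = 2;     // fewer FragmentInstances once the pipeline engine is on
int pipelineDop = 4;                  // each instance runs pipelineDop drivers concurrently
// numInstancesWithPipeline * pipelineDop == numInstancesWithoutPipeline == 8
```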
```java
int inputFragmentIndex = 0;
int maxParallelism = 0;
for (int j = 0; j < fragment.getChildren().size(); j++) {
    PlanFragment inputFragment = fragment.getChild(j);
    int currentChildFragmentParallelism = fragmentExecParamsMap
            .get(inputFragment.getFragmentId())
            .instanceExecParams
            .size();
    if (dopAdaptionEnabled) {
        currentChildFragmentParallelism *=
                inputFragment.getPipelineDop();
    }
    if (currentChildFragmentParallelism > maxParallelism) {
        maxParallelism = currentChildFragmentParallelism;
        inputFragmentIndex = j;
    }
}
```
T2. hostSet
Compute hostSet, the set of BE nodes on which the input fragment at inputFragmentIndex runs.
```java
PlanFragmentId inputFragmentId =
        fragment.getChild(inputFragmentIndex).getFragmentId();
FragmentExecParams maxParallelismFragmentExecParams =
        fragmentExecParamsMap.get(inputFragmentId);
Set<TNetworkAddress> hostSet = Sets.newHashSet();
if (usedComputeNode) {
    // ...
} else {
    if (isUnionFragment(fragment) && isGatherOutput) {
        for (PlanFragment child : fragment.getChildren()) {
            FragmentExecParams childParams =
                    fragmentExecParamsMap.get(child.getFragmentId());
            childParams.instanceExecParams
                    .stream().map(e -> e.host).forEach(hostSet::add);
        }
        maxParallelism = hostSet.size() * fragment.getParallelExecNum();
    } else {
        for (FInstanceExecParam execParams :
                maxParallelismFragmentExecParams.instanceExecParams) {
            hostSet.add(execParams.host);
        }
    }
}
if (dopAdaptionEnabled) {
    Preconditions.checkArgument(leftMostNode instanceof ExchangeNode);
    maxParallelism = hostSet.size();
}
```
T3. params.instanceExecParams
Now decide how many FragmentInstance objects (the dop) the current PlanFragment is instantiated into, based on maxParallelism and parallel_exchange_instance_num: the dop is the smaller of exchangeInstances (default 4, changeable via parallel_exchange_instance_num) and maxParallelism, the maximum parallelism of the children.
Then dop hosts are picked from hostSet as the BE node of each FragmentInstance. Each FragmentInstance's execution parameters are recorded in an FInstanceExecParam object, so the size of params.instanceExecParams is the number of FragmentInstances this PlanFragment is instantiated into.
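As a made-up example of how dop and hosts come together in the code below: suppose hostSet has 3 BEs, the largest input fragment runs 12 instances, and exchangeInstances is 4; then 4 FragmentInstances are created and placed round-robin over the shuffled hosts. A small sketch of that selection (not Coordinator code, the names and numbers are illustrative):

```java
// Illustrative sketch only; the numbers are made up.
Set<TNetworkAddress> hostSet = Sets.newHashSet(
        new TNetworkAddress("be1", 9060),
        new TNetworkAddress("be2", 9060),
        new TNetworkAddress("be3", 9060));
int maxParallelism = 12;      // parallelism of the largest input fragment
int exchangeInstances = 4;    // parallel_exchange_instance_num
int dop = (exchangeInstances > 0) ? Math.min(exchangeInstances, maxParallelism) : maxParallelism;

List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
Collections.shuffle(hosts);
for (int index = 0; index < dop; index++) {
    // 4 instances over 3 hosts: one host ends up with two FragmentInstances.
    TNetworkAddress host = hosts.get(index % hosts.size());
    System.out.println("instance " + index + " -> " + host);
}
```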
```java
int exchangeInstances = -1;
if (connectContext != null && connectContext.getSessionVariable() != null) {
    exchangeInstances =
            connectContext.getSessionVariable().getExchangeInstanceParallel();
}
if (exchangeInstances > 0 && maxParallelism > exchangeInstances) {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    Collections.shuffle(hosts, random);
    for (int index = 0; index < exchangeInstances; index++) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(
                null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
} else {
    List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
    for (int index = 0; index < maxParallelism; ++index) {
        TNetworkAddress host = hosts.get(index % hosts.size());
        FInstanceExecParam instanceParam = new FInstanceExecParam(
                null, host, 0, params);
        params.instanceExecParams.add(instanceParam);
    }
}
Collections.shuffle(params.instanceExecParams, random);
continue;
```
OlapScanNode
A Fragment whose source is an ExchangeNode takes its execution nodes from its InputFragment's hostSet, so ultimately everything traces back to the leaf Fragments whose source is an OlapScanNode.
Because the leftmost node (leftMostNode) of a Colocate Join or Bucket Shuffle Join Fragment is also an OlapScanNode, these cases need special handling.
```java
int parallelExecInstanceNum = fragment.getParallelExecNum();
int pipelineDop = fragment.getPipelineDop();
boolean hasColocate =
        isColocateFragment(fragment.getPlanRoot()) &&
        fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) &&
        fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0;
boolean hasBucketShuffle = isBucketShuffleJoin(fragment.getFragmentId().asInt());
if (hasColocate || hasBucketShuffle) {
    computeColocatedJoinInstanceParam(
            fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()),
            fragmentIdBucketSeqToScanRangeMap.get(fragment.getFragmentId()),
            parallelExecInstanceNum, pipelineDop, usePipeline, params);
    computeBucketSeq2InstanceOrdinal(
            params, fragmentIdToBucketNumMap.get(fragment.getFragmentId()));
} else {
    // ... (non-colocate case, see the ShuffleJOIN part below)
}
```
computeColocatedJoinInstanceParam 
First, compute all the tablets (scan ranges) that each BE node has to read:
```java
// bucketSeq -> (scanNodeId -> scanRanges) entries, grouped by the BE address that owns the bucket.
Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges =
        Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> bucketSeqAndScanRanges :
        bucketSeqToScanRange.entrySet()) {
    TNetworkAddress address = bucketSeqToAddress.get(bucketSeqAndScanRanges.getKey());
    addressToScanRanges
            .computeIfAbsent(address, k -> Lists.newArrayList())
            .add(bucketSeqAndScanRanges);
}
```
On each BE node an expectedInstanceNum is computed, and the node's scan ranges in addressToScanRanges are split evenly, so that the tablets read by each FragmentInstance form one scanRangePerInstance.
When the pipeline engine is enabled, parallelExecInstanceNum is always 1, i.e. a single FragmentInstance performs the OlapScan, so scanRangesPerInstance still has exactly one element.
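To make the even split concrete, here is a standalone helper of my own (not StarRocks' ListUtil.splitBySize, whose exact strategy may differ) that splits a list into expectedSize roughly equal sublists:

```java
// My own helper for illustration: distribute elements round-robin into `expectedSize`
// sublists so that their sizes differ by at most one.
static <T> List<List<T>> splitEvenly(List<T> list, int expectedSize) {
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < expectedSize; i++) {
        result.add(new ArrayList<>());
    }
    for (int i = 0; i < list.size(); i++) {
        result.get(i % expectedSize).add(list.get(i));
    }
    return result;
}
// e.g. 7 scan ranges split for expectedInstanceNum = 3 -> sublist sizes [3, 2, 2]
```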
```java
for (var addressScanRange : addressToScanRanges.entrySet()) {
    var scanRange = addressScanRange.getValue();

    int expectedInstanceNum = 1;
    if (parallelExecInstanceNum > 1) {
        // Split the scan ranges on this BE among at most parallelExecInstanceNum instances.
        expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
    }

    var scanRangesPerInstance = ListUtil.splitBySize(scanRange, expectedInstanceNum);

    for (var scanRangePerInstance : scanRangesPerInstance) {
        // ... (see the next block)
    }
}
```
enableAssignScanRangesPerDriverSeq
The preset pipelineDop (fragment.pipelineDop) defaults to half the number of CPU cores. The real per-instance parallelism, expectedDop, is the minimum of scanRangePerInstance.size() and pipelineDop; scanRangePerInstance is then split evenly into expectedDop parts so that each thread reads a roughly equal share of scan ranges and the data is not skewed too badly.
The enableAssignScanRangesPerDriverSeq function decides whether each PipelineDriver is directly assigned its own buckets to read, which avoids a LocalShuffle (i.e. there is no need to work out which PipelineDriver the data read from a bucket should be routed to). It returns true only when the scanRanges on every BE node number at least pipelineDop / 2, that is, only when the data volume is reasonably large. When assignPerDriverSeq is true, the following two fields take effect (a small sketch follows this list):
bucketSeqToDriverSeq: records which PipelineDriver (driverSeq) each bucket is bound to
nodeToPerDriverSeqScanRanges: a {scanId -> driverSeq -> scanRanges} mapping
A scanId corresponds to one OlapScanNode. The scanRangePerInstance to be read (which may span several OlapScanNodes) is split into expectedDop parts, so each PipelineDriver ends up with its own tablet scan ranges.
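A minimal sketch of what the text above describes, assuming enableAssignScanRangesPerDriverSeq is essentially a size check (the real FE code may differ in detail); the helper name and the example values are mine:

```java
// Sketch of the decision described above (assumption: essentially a size check).
static boolean enoughScanRangesPerDriver(List<?> scanRanges, int pipelineDop) {
    return scanRanges.size() >= pipelineDop / 2;
}

// Shape of the two mappings when assignPerDriverSeq is true, with made-up values:
//   bucketSeqToDriverSeq:         {bucket 0 -> driver 0, bucket 1 -> driver 1, bucket 2 -> driver 0}
//   nodeToPerDriverSeqScanRanges: {scanId 3 -> {driver 0 -> [ranges of buckets 0, 2],
//                                               driver 1 -> [ranges of bucket 1]}}
```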
 
```java
boolean assignPerDriverSeq =
        enablePipeline &&
        addressToScanRanges.values().stream().allMatch(scanRanges ->
                enableAssignScanRangesPerDriverSeq(scanRanges, pipelineDop));

for (var scanRangePerInstance : scanRangesPerInstance) {
    FInstanceExecParam instanceParam = new FInstanceExecParam(
            null, addressScanRange.getKey(), 0, params);

    int expectedDop = Math.min(scanRangePerInstance.size(), pipelineDop);
    var scanRangesPerDriverSeq =
            ListUtil.splitBySize(scanRangePerInstance, expectedDop);
    if (assignPerDriverSeq) {
        instanceParam.pipelineDop = scanRangesPerDriverSeq.size();
    }

    for (int i = 0; i < scanRangesPerDriverSeq.size(); ++i) {
        // The lambdas below need an effectively final copy of the loop variable.
        final int driverSeq = i;
        scanRangesPerDriverSeq.get(driverSeq).forEach(bucketSeqAndScanRanges -> {
            if (assignPerDriverSeq) {
                // Bind this bucket to a fixed PipelineDriver.
                instanceParam.bucketSeqToDriverSeq.putIfAbsent(
                        bucketSeqAndScanRanges.getKey(), driverSeq);
            } else {
                instanceParam.bucketSeqToDriverSeq.putIfAbsent(
                        bucketSeqAndScanRanges.getKey(), -1);
            }
            bucketSeqAndScanRanges.getValue().forEach((scanId, scanRanges) -> {
                if (assignPerDriverSeq) {
                    instanceParam.nodeToPerDriverSeqScanRanges
                            .computeIfAbsent(scanId, k -> new HashMap<>())
                            .computeIfAbsent(driverSeq, k -> new ArrayList<>())
                            .addAll(scanRanges);
                } else {
                    instanceParam.perNodeScanRanges
                            .computeIfAbsent(scanId, k -> new ArrayList<>())
                            .addAll(scanRanges);
                }
            });
        });
    }

    if (assignPerDriverSeq) {
        // Make sure every driverSeq has an entry even if it received no scan ranges.
        instanceParam.nodeToPerDriverSeqScanRanges.forEach((scanId, perDriverSeqScanRanges) -> {
            for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                perDriverSeqScanRanges.computeIfAbsent(driverSeq, k -> new ArrayList<>());
            }
        });
    }
    params.instanceExecParams.add(instanceParam);
}
```
 
ShuffleJOIN
The logic is much the same as above; the difference is that here FragmentInstances are created per OlapScanNode rather than per bucket.
```java
boolean assignScanRangesPerDriverSeq =
        usePipeline &&
        (fragment.isAssignScanRangesPerDriverSeq() ||
                fragment.isForceAssignScanRangesPerDriverSeq());

var scanRangeAssignment =
        fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;
for (var tNetworkAddressMapEntry : scanRangeAssignment.entrySet()) {
    TNetworkAddress host = tNetworkAddressMapEntry.getKey();
    Map<Integer, List<TScanRangeParams>> nodeAndScanRanges = tNetworkAddressMapEntry.getValue();

    for (Integer planNodeId : nodeAndScanRanges.keySet()) {
        List<TScanRangeParams> perNodeScanRanges = nodeAndScanRanges.get(planNodeId);
        int expectedInstanceNum =
                Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
        List<List<TScanRangeParams>> perInstanceScanRanges =
                ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum);
        for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(
                    null, host, 0, params);
            params.instanceExecParams.add(instanceParam);

            boolean assignPerDriverSeq =
                    assignScanRangesPerDriverSeq &&
                    (enableAssignScanRangesPerDriverSeq(scanRangeParams, pipelineDop) ||
                            fragment.isForceAssignScanRangesPerDriverSeq());
            if (!assignPerDriverSeq) {
                instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
            } else {
                // Split this instance's scan ranges among its pipeline drivers.
                int expectedDop = Math.max(
                        1, Math.min(pipelineDop, scanRangeParams.size()));
                List<List<TScanRangeParams>> scanRangeParamsPerDriverSeq =
                        ListUtil.splitBySize(scanRangeParams, expectedDop);
                instanceParam.pipelineDop = scanRangeParamsPerDriverSeq.size();
                Map<Integer, List<TScanRangeParams>> scanRangesPerDriverSeq = new HashMap<>();
                instanceParam.nodeToPerDriverSeqScanRanges.put(
                        planNodeId, scanRangesPerDriverSeq);
                for (int driverSeq = 0; driverSeq < instanceParam.pipelineDop; ++driverSeq) {
                    scanRangesPerDriverSeq.put(
                            driverSeq, scanRangeParamsPerDriverSeq.get(driverSeq));
                }
            }
        }
    }
}
```
computeFragmentExecParams
TUniqueId: generate a TUniqueId for each FragmentInstance from the queryId.
```java
instanceIds.clear();
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment.getSink() instanceof ResultSink &&
            params.instanceExecParams.size() > 1) {
        throw new StarRocksPlannerException(
                "This sql plan has multi result sinks",
                ErrorType.INTERNAL_ERROR);
    }
    for (int j = 0; j < params.instanceExecParams.size(); ++j) {
        TUniqueId instanceId = new TUniqueId();
        instanceId.setHi(queryId.hi);
        instanceId.setLo(queryId.lo + instanceIds.size() + 1);
        params.instanceExecParams.get(j).instanceId = instanceId;
        instanceIds.add(instanceId);
    }
}
```
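With made-up values, a queryId of (hi = 0xAB, lo = 100) therefore yields instance ids (0xAB, 101), (0xAB, 102), ... in visiting order:

```java
// Made-up example of the id scheme above: same hi as the query, lo = queryId.lo + count + 1.
TUniqueId queryId = new TUniqueId(0xAB, 100);
List<TUniqueId> instanceIds = new ArrayList<>();
for (int j = 0; j < 3; j++) {
    TUniqueId instanceId = new TUniqueId(queryId.hi, queryId.lo + instanceIds.size() + 1);
    instanceIds.add(instanceId);   // (0xAB, 101), (0xAB, 102), (0xAB, 103)
}
```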
params.instanceExecParams
Next, for every fragment that has a destination fragment, the destination's perExchNumSenders is accumulated from the sender's params.instanceExecParams.size(): every FragmentInstance of a sending fragment counts as one sender of the destination ExchangeNode.

```java
for (FragmentExecParams params : fragmentExecParamsMap.values()) {
    if (params.fragment instanceof MultiCastPlanFragment) {
        continue;
    }
    if (params.fragment.getDestFragment() == null) {
        continue;
    }
    PlanFragment fragment = params.fragment;
    PlanFragment destFragment = fragment.getDestFragment();
    FragmentExecParams destParams =
            fragmentExecParamsMap.get(destFragment.getFragmentId());

    fragment.getDestNode().setPartitionType(
            fragment.getOutputPartition().getType());
    DataSink sink = fragment.getSink();
    if (sink instanceof DataStreamSink) {
        DataStreamSink dataStreamSink = (DataStreamSink) sink;
        dataStreamSink.setExchDop(destParams.fragment.getPipelineDop());
    }
    PlanNodeId exchId = sink.getExchNodeId();
    // Every FragmentInstance of this fragment is one sender to the destination ExchangeNode.
    if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
        destParams.perExchNumSenders.put(
                exchId.asInt(), params.instanceExecParams.size());
    } else {
        destParams.perExchNumSenders.put(exchId.asInt(),
                params.instanceExecParams.size() +
                        destParams.perExchNumSenders.get(exchId.asInt()));
    }
    // ... (params.destinations, see below)
}
```
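As a made-up example of this accumulation: if two fragments (say, the two inputs of a UNION) both send into the ExchangeNode with id 5, one with 3 instances and one with 2, the destination ends up with perExchNumSenders[5] = 5, which is the sender count the BE-side ExchangeNode will wait for.

```java
// Made-up accumulation example: two sender fragments feed ExchangeNode id 5
// with 3 and 2 instances respectively.
Map<Integer, Integer> perExchNumSenders = new HashMap<>();
int exchId = 5;
for (int senders : new int[] {3, 2}) {
    perExchNumSenders.merge(exchId, senders, Integer::sum);
}
// perExchNumSenders.get(5) == 5
```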
params.destinations
Finally, params.destinations records where each sender's DataStreamSink delivers its data. When the destination fragment is scheduled by bucket (needScheduleByShuffleJoin), one destination is built per bucketSeq, pointing at the destination FragmentInstance that owns that bucket (plus the PipelineDriver via pipeline_driver_sequence when the bucket is bound to one); otherwise one destination is built per destination FragmentInstance.

```java
if (needScheduleByShuffleJoin(destFragment.getFragmentId().asInt(), sink)) {
    int bucketSeq = 0;
    int bucketNum = getFragmentBucketNum(destFragment.getFragmentId());
    TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
    while (bucketSeq < bucketNum) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = new TUniqueId(-1, -1);
        dest.server = dummyServer;
        dest.setBrpc_server(dummyServer);
        for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
            Integer driverSeq = instanceExecParams.bucketSeqToDriverSeq.get(bucketSeq);
            if (driverSeq != null) {
                // This destination instance owns the bucket.
                dest.fragment_instance_id = instanceExecParams.instanceId;
                dest.server = toRpcHost(instanceExecParams.host);
                dest.setBrpc_server(toBrpcHost(instanceExecParams.host));
                if (driverSeq != FInstanceExecParam.ABSENT_DRIVER_SEQUENCE) {
                    dest.setPipeline_driver_sequence(driverSeq);
                }
                break;
            }
        }
        bucketSeq++;
        params.destinations.add(dest);
    }
} else {
    for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
        TPlanFragmentDestination dest = new TPlanFragmentDestination();
        dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
        dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
        dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host));
        params.destinations.add(dest);
    }
}
```
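A toy walk-through of the bucket-driven branch (illustrative values only, not StarRocks code): with bucketNum = 4 and two destination instances, where instance A owns buckets {0, 2} and instance B owns buckets {1, 3}, the sender gets exactly one destination per bucketSeq, indexed by bucketSeq.

```java
// Toy illustration: destinations.get(bucketSeq) tells a sender which destination
// instance owns that bucket.
Map<Integer, String> bucketOwner = Map.of(0, "instanceA", 1, "instanceB",
                                          2, "instanceA", 3, "instanceB");
int bucketNum = 4;
List<String> destinations = new ArrayList<>();
for (int bucketSeq = 0; bucketSeq < bucketNum; bucketSeq++) {
    destinations.add(bucketOwner.get(bucketSeq));
}
// destinations == [instanceA, instanceB, instanceA, instanceB]
```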