前面几篇所述都单个 FragmentInstance 内的执行流。StarRocks 是 MPP 架构,并发执行多个 FragmentInstances。因此就涉及到多个 FragmentInstances 之间数据通信,FragmentExecutor 在构造每个 PipelineDriver 时,最后一个 Operator 肯定是 Sink。如果 Sink 的对端是另一个 FragmentInstance,则 Sink 会是 ExchangeSinkOperator,接受端使用 ExchangeSourceOperator 来接受数据。
StarRocks 输入输出的 Buffer 都是所有 PipelineDrivers 共享的,这也符合论文 Morsel-Driven-Parallelism 所述的设计。
同样,ExchangeSinkOperator 中的 SinkBuffer 也是在所有 PipelineDrivers 间共享。从构造 ExchangeSinkOperatorFactory 到 ExchangeSinkOperator 如下。
create_exchange_sink 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 std::shared_ptr<ExchangeSinkOperatorFactory> _create_exchange_sink_operator( PipelineBuilderContext* context, const TDataStreamSink& stream_sink, const DataStreamSender* sender, size_t dop) { auto fragment_ctx = context->fragment_context (); bool is_dest_merge = stream_sink.__isset.is_merge && stream_sink.is_merge; TPartitionType part_type = sender->get_partition_type (); bool is_pipeline_level_shuffle = false ; int32_t dest_dop = -1 ; if (part_type == TPartitionType::HASH_PARTITIONED || part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) { dest_dop = stream_sink.dest_dop; is_pipeline_level_shuffle = true ; DCHECK_GT (dest_dop, 0 ); } std::shared_ptr<SinkBuffer> sink_buffer = std::make_shared <SinkBuffer>( fragment_ctx, sender->destinations (), is_dest_merge, dop); return std::make_shared <ExchangeSinkOperatorFactory>( context->next_operator_id (), stream_sink.dest_node_id, sink_buffer, sender->get_partition_type (), sender->destinations (), is_pipeline_level_shuffle, dest_dop, sender->sender_id (), sender->get_dest_node_id (), sender->get_partition_exprs (), !is_dest_merge && sender->get_enable_exchange_pass_through (), sender->get_enable_exchange_perf () && !context->has_aggregation, fragment_ctx, sender->output_columns ()); } OperatorPtr ExchangeSinkOperatorFactory::create ( int32_t degree_of_parallelism, int32_t driver_sequence) { return std::make_shared <ExchangeSinkOperator>( this , _id, _plan_node_id, driver_sequence, _buffer, _part_type, _destinations, _is_pipeline_level_shuffle, _num_shuffles_per_channel, _sender_id, _dest_node_id, _partition_expr_ctxs, _enable_exchange_pass_through, _enable_exchange_perf, _fragment_ctx, _output_columns); }
Channel ExchangeSinkOperator 构造函数中的 _destinations 表示所有的对端,即 Sink 的接受者,每一个 _destinations[i] 都被封装为一个 ExchangeSinkOperator::Channel 对象,并通过 Channel 对象向对端发送 RPC,但是内部通过 SinkBuffer::_try_to_send_rpc 函数发出去的。
此外,Channel 对象还可以判断是否与对端在同一个BE进程中,如果在一个 BE 进程中,则不用 PRC 跨进程通信,转而使用共享内存的方式。
Channel 构造函数如下:
_brpc_dest_addr: 对端 BE 的 bRPC 监听地址 ipport
_fragment_instance_id: 对端 BE 上的 fragment_instance
_dest_node_id: 接受数据的 ExchangeSourceOperator 所属的 ExchangeNode id
这三个信息就可以唯一确定谁接受消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Channel (ExchangeSinkOperator* parent, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int32_t num_shuffles, bool enable_exchange_pass_through, bool enable_exchange_perf, PassThroughChunkBuffer* pass_through_chunk_buffer) : _parent(parent), _brpc_dest_addr(brpc_dest), _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), _enable_exchange_pass_through(enable_exchange_pass_through), _enable_exchange_perf(enable_exchange_perf), _pass_through_context(pass_through_chunk_buffer, fragment_instance_id, dest_node_id), _chunks(num_shuffles) { }
Channel::is_local is_local 函数用于判断和对端是否在 同一个 BE 进程中。此外,_enable_exchange_pass_through 是由于 FE 传递过来的参数,可以由用户更改是否开启 PassThrough 模式,默认值为 true。若 _enable_exchange_pass_through 为 true,且和对端在一个 BE 进程中,则 _use_pass_through 为 true,后续数据传输就不走 RPC 了,而是通过 _pass_through_context 在两个 FragmentInstance 之间传递数据。
这个设计就是通过共享内存在多线程间传递数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 bool ExchangeSinkOperator::Channel::is_local () { if (BackendOptions::get_localhost () != _brpc_dest_addr.hostname) { return false ; } if (config::brpc_port != _brpc_dest_addr.port) { return false ; } return true ; } bool ExchangeSinkOperator::Channel::_check_use_pass_through() { if (!_enable_exchange_pass_through) { return false ; } return is_local (); } void ExchangeSinkOperator::Channel::_prepare_pass_through() { _pass_through_context.init (); _use_pass_through = _check_use_pass_through(); }
PTransmitChunkParams 在讲解 Channel::send_one_chunk 发送消息之前,先说下传输数据的 proto 格式。如下 PTransmitChunkParams 就是数据传输协议。
eos: 表示本次 PRC 是否是最后一个 chunk
sequence: 本次 RPC 请求的序号
chunks: RPC 是批量发送模式,chunks 中包含了多次 RPC 数据
use_pass_through: false 时对端从 chunks 中反序列得到数据,true 时从共享内存中获得数据
is_pipeline_level_shuffle: 其赋值见上文的 _create_exchange_sink_operator 函数
driver_sequences: 在 is_pipeline_level_shuffle 为 true 时生效。此时和 driver_sequences_size 和 chunks_size 一样,每个 chunk[i] 直接写入 driver_sequences[i] 对应的 PipelineDriver 的输入源。
全部字段如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 message PTransmitChunkParams { optional PUniqueId finst_id = 1 ; optional int32 node_id = 2 ; optional int32 sender_id = 3 ; optional int32 be_number = 4 ; optional bool eos = 5 ; optional int64 sequence = 6 ; repeated ChunkPB chunks = 7 ; optional PQueryStatistics query_statistics = 8 ; optional bool use_pass_through = 9 [default = false ]; optional bool is_pipeline_level_shuffle = 10 [default = false ]; repeated int32 driver_sequences = 11 ; };
Channel::send_one_chunk Channel 有三种方式发送RPC 数据:
send_chunk_request 是直接发送 RPC 请求,
send_one_chunk 是批量发送 RPC 数据,超过大小阈值或者最后一个 RPC 才会发送
add_rows_selective 也是批量模式,不过会先对输入 Chunk 进行筛选后再调用 send_one_chunk。
这里以 send_one_chunk 为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 Status ExchangeSinkOperator::Channel::send_one_chunk (RuntimeState* state, const Chunk* chunk, int32_t driver_sequence, bool eos, bool * is_real_sent) { *is_real_sent = false ; RETURN_IF (_ignore_local_data && !eos, Status::OK ()); if (_chunk_request == nullptr ) { _chunk_request = std::make_shared <PTransmitChunkParams>(); _chunk_request->set_node_id (_dest_node_id); _chunk_request->set_sender_id (_parent->_sender_id); _chunk_request->set_be_number (_parent->_be_number); if (_parent->_is_pipeline_level_shuffle) { _chunk_request->set_is_pipeline_level_shuffle (true ); } } if (chunk != nullptr ) { if (_use_pass_through) { size_t chunk_size = serde::ProtobufChunkSerde::max_serialized_size (*chunk); TRY_CATCH_BAD_ALLOC (_pass_through_context.append_chunk ( _parent->_sender_id, chunk, chunk_size, _parent->_is_pipeline_level_shuffle ? driver_sequence : -1 )); _current_request_bytes += chunk_size; COUNTER_UPDATE (_parent->_bytes_pass_through_counter, chunk_size); } else { if (_parent->_is_pipeline_level_shuffle) { _chunk_request->add_driver_sequences (driver_sequence); } auto pchunk = _chunk_request->add_chunks (); TRY_CATCH_BAD_ALLOC (RETURN_IF_ERROR (_parent->serialize_chunk ( chunk, pchunk, &_is_first_chunk))); _current_request_bytes += pchunk->data ().size (); } } if (_current_request_bytes > config::max_transmit_batched_bytes || eos) { _chunk_request->set_eos (eos); _chunk_request->set_use_pass_through (_use_pass_through); if (auto delta_statistic = state->intermediate_query_statistic ()) { delta_statistic->to_pb (_chunk_request->mutable_query_statistics ()); } butil::IOBuf attachment; int64_t attachment_physical_bytes = _parent->construct_brpc_attachment (_chunk_request, attachment); TransmitChunkInfo info {_fragment_instance_id, _brpc_stub, std::move (_chunk_request), attachment, attachment_physical_bytes, _brpc_dest_addr}; RETURN_IF_ERROR (_parent->_buffer->add_request (info)); _current_request_bytes = 0 ; *is_real_sent = true ; } return Status::OK (); }
SinkBuffer::add_request 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Status SinkBuffer::add_request (TransmitChunkInfo& request) { RETURN_IF (_is_finishing, Status::OK ()); if (!request.attachment.empty ()) { _bytes_enqueued += request.attachment.size (); _request_enqueued++; } { auto & instance_id = request.fragment_instance_id; RETURN_IF_ERROR (_try_to_send_rpc(instance_id, [&]() { _buffers[instance_id.lo].push (request); })); } return Status::OK (); }
SinkBuffer::_try_to_send_rpc SinkBuffer 设计要稍微复杂点,因为要考虑顺序。下面将 _try_to_send_rpc 函数分解为4部分来剖析细节。完整代码见 _try_to_send_rpc 。
Part1: callback _try_to_send_rpc 函数第二个参数 pre_works_cb 会在发送 RPC 之前执行,比如上面 add_request 函数传入的 cb 是将新增的 RPC 请求 request 添加到 _buffers[instance_id.lo] 中。
1 2 3 4 5 6 7 8 9 10 Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void ()>& pre_works_cb) { std::lock_guard<Mutex> l (*_mutexes[instance_id.lo]) ; pre_works_cb (); DeferOp decrease_defer ([this ]() { --_num_sending_rpc; }) ; ++_num_sending_rpc; }
Part2: 限流 _buffers[instance_id] 中存储的是所有待发送给 instance_id 的 RPCs。在发送 RPC 之前会先检测下是否需要限流:
_is_dest_merge 为 true,此时需要保证发送顺序性 比如 SQL 中包含 ORDER BY,TOPN 等操作时,要求输出结果有序。在 FragmentInstances 之间交换数据时,需要发送端和接收端一起保证顺序性,实现方式类似 TCP 滑动窗口。
如图,_request_seqs[instance_id] 记录的是给 instance_id 实例已发送 RPC 的最大序号,_max_continuous_acked_seqs[instance_id] 记录的是 instance_id 已回应 RPC 的最大连续序号,差值是不连续的窗口大小 discontinuous_acked_window_size
此时限流的标准是该 window_size 不能超过阈值 config::pipeline_sink_brpc_dop (默认值 64),防止乱序数据太多 。
_is_dest_merge 为 false,此时不需要保证发送的顺序性
_num_in_flight_rpcs[instance_id] 记录的是已发送给 instance_id 但尚未收到 response 的 PRC 数量,该数据量不能超过阈值 config::pipeline_sink_brpc_dop(默认值 64),如果超过则暂停发送,防止对端处理不过来 。
限流代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void ()>& pre_works_cb) { while (true ) { RETURN_IF (_is_finishing, Status::OK ()); int64_t too_much_rpc = 0 ; if (_is_dest_merge) { int64_t discontinuous_acked_window_size = _request_seqs[instance_id.lo] - _max_continuous_acked_seqs[instance_id.lo]; too_much_brpc_process = discontinuous_acked_window_size >= config::pipeline_sink_brpc_dop; } else { too_much_brpc_process = _num_in_flight_rpcs[instance_id.lo] >= config::pipeline_sink_brpc_dop; } RETURN_IF (buffer.empty () || too_much_rpc, Status::OK ()); } return Status::OK ();
Part3: Order 顺序性保证,要求发送过程如下:
必须等收到第一个 RPC 的 response 之后,后续 RPCs 才能发送出去,保证基准不会出问题
最后一个 RPC(即 eos 为 true 的 RPC)必须是最后一个发送给对端,用于通知对端后续不会再有数据发送,让对端做好 finishing 操作
中间 RPCs 可以一定程度的乱序,但是通过 _max_continuous_acked_seqs 来维护总体的有序性
所以,一共有两处需要等待(need_wait 为 true):
中间的 RPCs 需要等待收到第一个 RPC 的 response
_num_finished_rpcs[instance_id] == 0 和 _num_in_flight_rpcs[instance_id] > 0 就能说明还没收到第一个 RPC 的 response。
最后一个 RPC 需要等待前面所有的 RPCs 都都收到 response
_num_remaining_eos 和 _num_sinkers 可以用来确定当前 RPC 是否为发送给 instance_id 实例的最后一个 EOS RPC. 它们在构造函数中赋值如下:
1 2 3 4 5 6 for (const auto & dest : destinations) { const auto & instance_id = dest.fragment_instance_id; _num_sinkers[instance_id] = _num_sinkers; } _num_remaining_eos = _num_sinkers.size () * num_sinkers;
_num_sinkers 在 _create_exchange_sink_operator 函数中被赋值为 dop,如果 _num_sinkers[instance_id] 为 0,则表示当前 Sink 给 instance_id 对应的 FragmentInstance 实例的所有 PipelineDrivers ExchangeSourceOperator 该发送的数据都发送了,此时就需要给 RPC request 中的 eos 标志设为 true,告知对端不会再有数据了(即 ExchangeSourceOperator 即将进入 OperatorStage::FINISHING 阶段)。
Part3 就是做上述两处检测。当 need_wait 为 true,当前 RPC 不会被发送给对端,即从 _buffer[instance_id] 中取出的 RPC request 不会被 pop 出去。
DeferOp 是基于 RAII 设计的类,在 DeferOp 对象离开作用域时调用传入的回调函数。
核心代码及其注释如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void ()>& pre_works_cb) { while (true ) { TransmitChunkInfo& request = buffer.front (); bool need_wait = false ; DeferOp pop_defer ([&need_wait, &buffer, mem_tracker = _mem_tracker]() { if (need_wait) { return ; } SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker); buffer.pop(); }) ; if (_num_finished_rpcs[instance_id.lo] == 0 && _num_in_flight_rpcs[instance_id.lo] > 0 ) { need_wait = true ; return Status::OK (); } auto & params = request.params; if (params->eos ()) { DeferOp eos_defer ([this , &instance_id, &need_wait]() { if (need_wait) { return ; } if (--_num_remaining_eos == 0 ) { _is_finishing = true ; } --_num_sinkers[instance_id.lo]; }) ; if (_num_sinkers[instance_id.lo] > 1 ) { if (params->chunks_size () == 0 ) { continue ; } params->set_eos (false ); } else { if (_num_in_flight_rpcs[instance_id.lo] > 0 ) { need_wait = true ; return Status::OK (); } } } } return Status::OK (); }
Part4: Send RPC 通过了上面的校验,下面就是真正发送 RPC request 的代码。 Part4 核心部分是设置 RPC 回调函数。
bRPC 异步回调相关知识可以参考:异步访问
由于是异步发送 RPC 消息,因此要设置回调函数来处理返回结果。 通过 DisposableClosure 继承 google::protobuf::Closure 传递给 bRPC。在 RPC 返回时执行 Closure::Run 函数。这里就是根据 RPC 发送结果即 ctnl.Failed() 来判断调用哪个 callback。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class DisposableClosure : public google::protobuf::Closure {public : using FailedFunc = std::function<void (const C&)>; using SuccessFunc = std::function<void (const C&, const T&)>; DisposableClosure (const C& ctx) : _ctx(ctx) {} void addFailedHandler (FailedFunc fn) { _failed_handler = std::move (fn); } void addSuccessHandler (SuccessFunc fn) { _success_handler = fn; } void Run () noexcept override { std::unique_ptr<DisposableClosure> self_guard (this ) ; if (cntl.Failed ()) { _failed_handler(_ctx); } else { _success_handler(_ctx, result); } } public : brpc::Controller cntl; T result; private : const C _ctx; FailedFunc _failed_handler; SuccessFunc _success_handler; };
两个回调函数如下:
SuccessFunc
ctnl.Failed() 为 false 只表明 RPC 请求确实发送给对端了,但是对端接受到 RPC 后真正的处理结果由 result.status() 来表征。如果对端处理失败,则取消当前 FragmentInstance,进而由 GloablDriverExecutor 取消整个 query。否则就 递归 执行 _try_to_send_rpc 函数,不断地从 _buffer[instance_id] 中取出 RPC request 发送给 instance_id 对应的 FragmentInstance。此时传入的 pre_work_cb 中有 _process_send_window 函数,这是用来处理滑动窗口的。
SuccessFunc 代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 success_cb = [this ](const ClosureContext& ctx, const PTransmitChunkResult& result) { Status status (result.status ()); { std::lock_guard<Mutex> l (*_mutexes[ctx.instance_id.lo]) ; ++_num_finished_rpcs[ctx.instance_id.lo]; --_num_in_flight_rpcs[ctx.instance_id.lo]; } if (!status.ok ()) { _is_finishing = true ; _fragment_ctx->cancel (status); } else { _try_to_send_rpc(ctx.instance_id, [&]() { _update_network_time(ctx.instance_id, ctx.send_timestamp, result.receiver_post_process_time ()); _process_send_window(ctx.instance_id, ctx.sequence); }); } --_total_in_flight_rpc; };
FailedFunc
如果因为网络等问题 RPC 无法发送给对端,则取消当前 FragmentInstance 的 PipelineDriver,并更新相关状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 failed_cb = [this ](const ClosureContext& ctx) noexcept { _is_finishing = true ; { std::lock_guard<Mutex> l (*_mutexes[ctx.instance_id.lo]) ; ++_num_finished_rpcs[ctx.instance_id.lo]; --_num_in_flight_rpcs[ctx.instance_id.lo]; } std::string err_msg = fmt::format( "transmit chunk rpc failed:{}" , print_id (ctx.instance_id)); _fragment_ctx->cancel (Status::InternalError (err_msg)); --_total_in_flight_rpc; };
RPC 回调函数 closure 设置完毕,就可以将 RPC 真正发送出去了,这个由 _send_rpc 函数完成。完成代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void ()>& pre_works_cb) { while (true ) { params->mutable_finst_id ()->CopyFrom (_instance_id2finst_id[instance_id.lo]); params->set_sequence (++_request_seqs[instance_id.lo]); if (!request.attachment.empty ()) { _bytes_sent += request.attachment.size (); _request_sent++; } auto * closure = new DisposableClosure <PTransmitChunkResult, ClosureContext>( {instance_id, params->sequence (), MonotonicNanos ()}); if (_first_send_time == -1 ) { _first_send_time = MonotonicNanos (); } closure->addFailedHandler (failed_cb); closure->addSuccessHandler (success_cb); ++_total_in_flight_rpc; ++_num_in_flight_rpcs[instance_id.lo]; _mem_tracker->release (request.attachment_physical_bytes); ExecEnv::GetInstance ()->process_mem_tracker ()->consume ( request.attachment_physical_bytes); closure->cntl.Reset (); closure->cntl.set_timeout_ms (_brpc_timeout_ms); if (bthread_self ()) { return _send_rpc(closure, request); } else { SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER (nullptr ); return _send_rpc(closure, request); } } return Status::OK (); }
_process_send_window sequence 是本次成功的 RPC,下面用 sequence 来更新滑动窗口的逻辑也是比较简洁的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_t sequence) { if (!_is_dest_merge) { return ; } auto & seqs = _discontinuous_acked_seqs[instance_id.lo]; seqs.insert (sequence); auto & max_continuous_acked_seq = _max_continuous_acked_seqs[instance_id.lo]; std::unordered_set<int64_t >::iterator it; while ((it = seqs.find (max_continuous_acked_seq + 1 )) != seqs.end ()) { seqs.erase (it); ++max_continuous_acked_seq; } }
SinkBuffer::_send_rpc 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 Status SinkBuffer::_send_rpc(DisposableClosure<PTransmitChunkResult, ClosureContext>* closure, const TransmitChunkInfo& request) { auto expected_iobuf_size = request.attachment.size () + request.params->ByteSizeLong () + sizeof (size_t ) * 2 ; if (UNLIKELY (expected_iobuf_size > _rpc_http_min_size)) { butil::IOBuf iobuf; butil::IOBufAsZeroCopyOutputStream wrapper (&iobuf) ; request.params->SerializeToZeroCopyStream (&wrapper); size_t params_size = iobuf.size (); closure->cntl.request_attachment ().append ( ¶ms_size, sizeof (size_t )); closure->cntl.request_attachment ().append (iobuf); size_t attachment_size = request.attachment.size (); closure->cntl.request_attachment ().append ( &attachment_size, sizeof (size_t )); closure->cntl.request_attachment ().append (request.attachment); closure->cntl.http_request ().set_content_type ("application/proto" ); auto res = BrpcStubCache::create_http_stub (request.brpc_addr); RETURN_IF (!res.ok (), res.status ()); res.value ()->transmit_chunk_via_http ( &closure->cntl, NULL , &closure->result, closure); } else { closure->cntl.request_attachment ().append (request.attachment); request.brpc_stub->transmit_chunk ( &closure->cntl, request.params.get (), &closure->result, closure); } return Status::OK (); }
push_chunk 上面讲解了 Channel 是如何 batched RPCs 到真正发送 RPC 给对端。下面来讲解 ExchangeSinkOperator::push_chunk。
_output_columns 是当前 Sink 需要输出的列,当前一个 Operator 将数据 Chunk 推向 ExchangeSinkOperator 时,需要从 chunk 中提取出 _output_columns 中所必须的列数据即可,得到的即 send_chunk。
TPartitionType::UNPARTITIONED 当传输数据方式是 TPartitionType::UNPARTITIONED,即广播模式,给每个对端都发送一份完整的数据,如下图所示。
即遍历 _channels 给所有的 Channel[idx] 发送数据。也正如前文所述,如果 Channel::use_pass_through() 为 true,则可以走共享内存模式,而不用 RPC 通信。反之,则需要先序列化再进行 RPC 发送。
Channel 发现 ExchangeSinkOperator::_is_pipeline_level_shuffle 为 false 时,在 Channel::send_one_chunk 函数中不会处理 DEFAULT_DRIVER_SEQUENCE。
代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 Status ExchangeSinkOperator::push_chunk (RuntimeState* state, const vectorized::ChunkPtr& chunk) { uint16_t num_rows = chunk->num_rows (); RETURN_IF (num_rows == 0 , Status::OK ()); vectorized::Chunk temp_chunk; vectorized::Chunk* send_chunk = chunk.get (); if (!_output_columns.empty ()) { for (int32_t cid : _output_columns) { temp_chunk.append_column (chunk->get_column_by_slot_id (cid), cid); } send_chunk = &temp_chunk; } if (_part_type == TPartitionType::UNPARTITIONED || _num_shuffles == 1 ) { if (_chunk_request == nullptr ) { _chunk_request = std::make_shared <PTransmitChunkParams>(); } std::vector<int > not_pass_through_channles; for (auto idx : _channel_indices) { if (_channels[idx]->use_pass_through ()) { RETURN_IF_ERROR (_channels[idx]->send_one_chunk ( state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false )); } else { not_pass_through_channles.emplace_back (idx); } } if (!not_pass_through_channles.empty ()) { ChunkPB* pchunk = _chunk_request->add_chunks (); TRY_CATCH_BAD_ALLOC (RETURN_IF_ERROR (serialize_chunk ( send_chunk, pchunk, &_is_first_chunk, _channels.size ()))); _current_request_bytes += pchunk->data ().size (); if (_current_request_bytes > config::max_transmit_batched_bytes) { butil::IOBuf attachment; int64_t attachment_physical_bytes = construct_brpc_attachment (_chunk_request, attachment); for (auto idx : not_pass_through_channles) { RETURN_IF_ERROR (_channels[idx]->send_chunk_request ( state, std::make_shared <PTransmitChunkParams>(*_chunk_request), attachment, attachment_physical_bytes)); } _current_request_bytes = 0 ; _chunk_request.reset (); } } } }
TPartitionType::RANDOM TPartitionType::RANDOM 的数据分发策略是 Round-robin,每次调用 ExchangeSinkOperator::push_chunk 都挑选一个同进程的 FragmentInstance 作为对端。如下图所示。
_curr_random_channel_idx 指示当前发送给哪个 Local Channels。见代码注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Status ExchangeSinkOperator::push_chunk (RuntimeState* state, const vectorized::ChunkPtr& chunk) { if (_part_type == TPartitionType::UNPARTITIONED || _num_shuffles == 1 ) { } else if (_part_type == TPartitionType::RANDOM) { std::vector<Channel*> local_channels; for (const auto & channel : _channels) { if (channel->is_local ()) { local_channels.emplace_back (channel); } } if (local_channels.empty ()) { local_channels = _channels; } auto & channel = local_channels[_curr_random_channel_idx]; bool real_sent = false ; RETURN_IF_ERROR (channel->send_one_chunk ( state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false , &real_sent)); if (real_sent) { _curr_random_channel_idx = (_curr_random_channel_idx + 1 ) % local_channels.size (); } } }
TPartitionType::BUCKET_SHUFFLE/HASH_PARTITIONED 这两种数据分发策略要复杂点,二者本质不同点在于使用的 Hash 函数。如下示意图。
hash_values Shuffler::exchange_shuffle 函数用于将 chunk 的每一行都分发到具体的分区 {instance_id, driver_seq} 中。参数 hash_values 是对 chunk 中每一行的 join-key 进行 hash 值。BUCKET_SHUFFLE_HASH_PARTITIONED 和 HASH_PARTITIONED 两种策略根本区别也在于这里。
这里的分区列的 “partition” 和建分区表时的 “partition” 不是一个含义。这里的是对端 {instnace_id, driver_seq} 处理的数据源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status ExchangeSinkOperator::push_chunk (RuntimeState* state, const vectorized::ChunkPtr& chunk) { else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) { { for (size_t i = 0 ; i < _partitions_columns.size (); ++i) { ASSIGN_OR_RETURN ( _partitions_columns[i], _partition_expr_ctxs[i]->evaluate (chunk.get ())); } if (_part_type == TPartitionType::HASH_PARTITIONED) { _hash_values.assign (num_rows, HashUtil::FNV_SEED); for (const vectorized::ColumnPtr& column : _partitions_columns) { column->fnv_hash (&_hash_values[0 ], 0 , num_rows); } } else { _hash_values.assign (num_rows, 0 ); for (const vectorized::ColumnPtr& column : _partitions_columns) { column->crc32_hash (&_hash_values[0 ], 0 , num_rows); } } } } return Status::OK ();
exchange_shuffle TPartitionType::UNPARTITIONED 和 TPartitionType::RANDOM 两种策略的 shuffle 粒度是 chunk ,即可以直接将整个 chunk 发送给 FragmentInstance。而 (BUCKET_SHUFFLE_)HASH_PARTITIONED 策略,shuffle 的粒度是 row,需要把一个 chunk 分发到不同 {FragmentInstance, PipelineDriver} 中。
因此,可以把 (channl_id, driver_sequence) 视为一个坐标,那么 exchange_shuffle 函数就是将 chunk 的 num_rows 行数据均匀分布在 channels_size * num_shuffles_per_channel 的平面上。其中 num_shuffles_per_channel 表征的是 dop。
shuffle 策略实现如下:
two_level_shuffle 为 false 表示不需要再次 shuffle,直接获取 channel_id 即可,这里的 channel_id 实际上是 FE 中已经计算好的值
two_level_shuffle 为 true 则需要在 BE 端再次进行 shuffle
实现如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 template <bool two_level_shuffle, typename ReduceOp>void exchange_shuffle (std::vector<uint32_t >& shuffle_channel_ids, const std::vector<uint32_t >& hash_values, size_t num_rows) { for (size_t i = 0 ; i < num_rows; ++i) { size_t channel_id = ReduceOp ()(hash_values[i], _num_channels); size_t shuffle_id; if constexpr (!two_level_shuffle) { shuffle_id = channel_id; } else { uint32_t driver_sequence = ReduceOp ()( HashUtil::xorshift32 (hash_values[i]), _num_shuffles_per_channel); shuffle_id = channel_id * _num_shuffles_per_channel + driver_sequence; } shuffle_channel_ids[i] = shuffle_id; } }
得到 chunk 每一行的 hash_values 后,并基于 Shuffler::exchange_shuffle 函数得到每一行所属的分区,结果存储于 _shuffle_channel_ids。
1 2 3 4 5 6 7 8 9 else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) { { _shuffler->exchange_shuffle (_shuffle_channel_ids, _hash_values, num_rows); } }
channel_row_idx_start_points 接下来就是 shuffle 之后的处理流程:_row_indexes 将 chunk 中所有发往同一个分区的 row_ids 连续存储在一起,并用 _channel_row_idx_start_points 记录每个分区起始偏移量。如下示意图:
这个流程分为如下三个 for-loop:
计算每个分区的行数
_shuffle_channel_ids[i] 表征 partitions[i],那么第一个 for-loop 计算完,_channel_row_idx_start_points[i] 即表征 chunk 落在 partitions[i] 的行数。
比如有三个分区,分别是 9, 10, 11行,经过第一个 for-loop,start_points 中存储的值就是:
计算每个分区最后的位置
上述例子,经过第二个 for-loop 后, start_points 中存储的值就是:
将同一个分区的在chunk中的行号记录在 _row_indexes,同时将 _channel_row_idx_start_points 更新到每个分区的起始位置。
此时 _row_indexes 和 start_points 的值变更结果如下:
1 2 3 4 5 6 7 _row_indexes[0:8] = partition0 _row_indexes[9:18] = partition1 _row_indexes[19:29] = partition2 start_points[0] = 0 start_points[1] = 9 start_points[2] = 19
三个 for-loop 的完成代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) { { _channel_row_idx_start_points.assign (_num_shuffles + 1 , 0 ); for (size_t i = 0 ; i < num_rows; ++i) { _channel_row_idx_start_points[_shuffle_channel_ids[i]]++; } for (int32_t i = 1 ; i <= _num_shuffles; ++i) { _channel_row_idx_start_points[i] += _channel_row_idx_start_points[i - 1 ]; } for (int32_t i = num_rows - 1 ; i >= 0 ; --i) { _row_indexes[_channel_row_idx_start_points[_shuffle_channel_ids[i]] - 1 ] = i; _channel_row_idx_start_points[_shuffle_channel_ids[i]]--; } } }
add_rows_selective 上面四步对chunk完成了分区,并将结果记录在 _row_indexes 和 _start_points 中,最后一步就是将所有分区数据发送出去,只需要遍历 (_channel_indices, _num_shuffles_per_channel) 二维区间,再使用 Channel::add_rows_selective 函数发送数据,没啥可说,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) { { } for (int32_t channel_id : _channel_indices) { if (_channels[channel_id]->get_fragment_instance_id ().lo == -1 ) { continue ; } for (int32_t i = 0 ; i < _num_shuffles_per_channel; ++i) { int shuffle_id = channel_id * _num_shuffles_per_channel + i; int driver_sequence = _driver_sequence_per_shuffle[shuffle_id]; size_t from = _channel_row_idx_start_points[shuffle_id]; size_t size = _channel_row_idx_start_points[shuffle_id + 1 ] - from; if (size == 0 ) { continue ; } RETURN_IF_ERROR (_channels[channel_id]->add_rows_selective (send_chunk, driver_sequence, _row_indexes.data (), from, size, state)); } } }
Reference