MPP: ExchangeSinkOperator 详解 RPC 有序性保证和数据 shuffle

前面几篇所述都单个 FragmentInstance 内的执行流。StarRocks 是 MPP 架构,并发执行多个 FragmentInstances。因此就涉及到多个 FragmentInstances 之间数据通信,FragmentExecutor 在构造每个 PipelineDriver 时,最后一个 Operator 肯定是 Sink。如果 Sink 的对端是另一个 FragmentInstance,则 Sink 会是 ExchangeSinkOperator,接受端使用 ExchangeSourceOperator 来接受数据。

Pipeline-FragmentInstance-1

StarRocks 输入输出的 Buffer 都是所有 PipelineDrivers 共享的,这也符合论文 Morsel-Driven-Parallelism 所述的设计。

同样,ExchangeSinkOperator 中的 SinkBuffer 也是在所有 PipelineDrivers 间共享。从构造 ExchangeSinkOperatorFactory 到 ExchangeSinkOperator 如下。

create_exchange_sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
std::shared_ptr<ExchangeSinkOperatorFactory> _create_exchange_sink_operator(
PipelineBuilderContext* context, const TDataStreamSink& stream_sink,
const DataStreamSender* sender, size_t dop) {

auto fragment_ctx = context->fragment_context();
bool is_dest_merge = stream_sink.__isset.is_merge && stream_sink.is_merge;
TPartitionType part_type = sender->get_partition_type();

bool is_pipeline_level_shuffle = false;
int32_t dest_dop = -1;
if (part_type == TPartitionType::HASH_PARTITIONED ||
part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
dest_dop = stream_sink.dest_dop;
is_pipeline_level_shuffle = true;
DCHECK_GT(dest_dop, 0);
}
// 1. 构造 sink_buffer
std::shared_ptr<SinkBuffer> sink_buffer = std::make_shared<SinkBuffer>(
fragment_ctx, sender->destinations(), is_dest_merge, dop);

return std::make_shared<ExchangeSinkOperatorFactory>(
context->next_operator_id(), stream_sink.dest_node_id,
sink_buffer, // 传递给 ExchangeSinkOperatorFactory
sender->get_partition_type(),
sender->destinations(), is_pipeline_level_shuffle,
dest_dop, sender->sender_id(),
sender->get_dest_node_id(), sender->get_partition_exprs(),
!is_dest_merge && sender->get_enable_exchange_pass_through(),
sender->get_enable_exchange_perf() && !context->has_aggregation,
fragment_ctx, sender->output_columns());
}

// 创建 ExchangeSinkOperator
OperatorPtr ExchangeSinkOperatorFactory::create(
int32_t degree_of_parallelism, int32_t driver_sequence) {

return std::make_shared<ExchangeSinkOperator>(
this, _id, _plan_node_id,
driver_sequence, // 具体的 PipelineDriver
_buffer, // 传递给所有的 PipelineDrivers
_part_type,
_destinations,
_is_pipeline_level_shuffle,
_num_shuffles_per_channel,
_sender_id, _dest_node_id,
_partition_expr_ctxs,
_enable_exchange_pass_through,
_enable_exchange_perf,
_fragment_ctx,
_output_columns);
}

Channel

ExchangeSinkOperator 构造函数中的 _destinations 表示所有的对端,即 Sink 的接受者,每一个 _destinations[i] 都被封装为一个 ExchangeSinkOperator::Channel 对象,并通过 Channel 对象向对端发送 RPC,但是内部通过 SinkBuffer::_try_to_send_rpc 函数发出去的。

此外,Channel 对象还可以判断是否与对端在同一个BE进程中,如果在一个 BE 进程中,则不用 PRC 跨进程通信,转而使用共享内存的方式。

Channel 构造函数如下:

  • _brpc_dest_addr: 对端 BE 的 bRPC 监听地址 ipport
  • _fragment_instance_id: 对端 BE 上的 fragment_instance
  • _dest_node_id: 接受数据的 ExchangeSourceOperator 所属的 ExchangeNode id

这三个信息就可以唯一确定谁接受消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Channel(ExchangeSinkOperator* parent, 
const TNetworkAddress& brpc_dest,
const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
int32_t num_shuffles,
bool enable_exchange_pass_through,
bool enable_exchange_perf,
PassThroughChunkBuffer* pass_through_chunk_buffer)
: _parent(parent),
_brpc_dest_addr(brpc_dest),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_enable_exchange_pass_through(enable_exchange_pass_through),
_enable_exchange_perf(enable_exchange_perf),
_pass_through_context(pass_through_chunk_buffer,
fragment_instance_id,
dest_node_id),
_chunks(num_shuffles) { }

Channel::is_local

is_local 函数用于判断和对端是否在 同一个 BE 进程中。此外,_enable_exchange_pass_through 是由于 FE 传递过来的参数,可以由用户更改是否开启 PassThrough 模式,默认值为 true。若 _enable_exchange_pass_through 为 true,且和对端在一个 BE 进程中,则 _use_pass_through 为 true,后续数据传输就不走 RPC 了,而是通过 _pass_through_context 在两个 FragmentInstance 之间传递数据。

这个设计就是通过共享内存在多线程间传递数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bool ExchangeSinkOperator::Channel::is_local() {
if (BackendOptions::get_localhost() != _brpc_dest_addr.hostname) {
return false;
}
if (config::brpc_port != _brpc_dest_addr.port) {
return false;
}
return true;
}

bool ExchangeSinkOperator::Channel::_check_use_pass_through() {
if (!_enable_exchange_pass_through) {
return false;
}
return is_local();
}

void ExchangeSinkOperator::Channel::_prepare_pass_through() {
_pass_through_context.init();
_use_pass_through = _check_use_pass_through();
}

PTransmitChunkParams

在讲解 Channel::send_one_chunk 发送消息之前,先说下传输数据的 proto 格式。如下 PTransmitChunkParams 就是数据传输协议。

  • eos: 表示本次 PRC 是否是最后一个 chunk
  • sequence: 本次 RPC 请求的序号
  • chunks: RPC 是批量发送模式,chunks 中包含了多次 RPC 数据
  • use_pass_through: false 时对端从 chunks 中反序列得到数据,true 时从共享内存中获得数据
  • is_pipeline_level_shuffle: 其赋值见上文的 _create_exchange_sink_operator 函数
  • driver_sequences: 在 is_pipeline_level_shuffle 为 true 时生效。此时和 driver_sequences_size 和 chunks_size 一样,每个 chunk[i] 直接写入 driver_sequences[i] 对应的 PipelineDriver 的输入源。

全部字段如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
message PTransmitChunkParams {
// non-change member
optional PUniqueId finst_id = 1;
optional int32 node_id = 2;
// Id of this fragment in its role as a sender.
optional int32 sender_id = 3;
optional int32 be_number = 4;
// If set to true, indicates that no more row batches will be sent
// for this dest_node_id.
optional bool eos = 5;
// RPC sequence number for the send channel.
// Sever will check this number to see if some packet has lost.
optional int64 sequence = 6;

// The protobuf data structure for column chunk.
repeated ChunkPB chunks = 7;

// Some statistics for the runing query.
optional PQueryStatistics query_statistics = 8;
optional bool use_pass_through = 9 [default = false];

// Whether enable pipeline level shuffle.
optional bool is_pipeline_level_shuffle = 10 [default = false];
// Driver sequences of pipeline level shuffle.
repeated int32 driver_sequences = 11;
};

Channel::send_one_chunk

Channel 有三种方式发送RPC 数据:

  • send_chunk_request 是直接发送 RPC 请求,
  • send_one_chunk 是批量发送 RPC 数据,超过大小阈值或者最后一个 RPC 才会发送
  • add_rows_selective 也是批量模式,不过会先对输入 Chunk 进行筛选后再调用 send_one_chunk。

这里以 send_one_chunk 为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state,
const Chunk* chunk,
int32_t driver_sequence,
bool eos,
bool* is_real_sent) {

*is_real_sent = false;
RETURN_IF(_ignore_local_data && !eos, Status::OK());

if (_chunk_request == nullptr) {
_chunk_request = std::make_shared<PTransmitChunkParams>();
_chunk_request->set_node_id(_dest_node_id);
_chunk_request->set_sender_id(_parent->_sender_id);
_chunk_request->set_be_number(_parent->_be_number);
if (_parent->_is_pipeline_level_shuffle) {
_chunk_request->set_is_pipeline_level_shuffle(true);
}
}

// 1. batch 数据
if (chunk != nullptr) {
if (_use_pass_through) {
// 1.1 使用共享内存方式传递数据
// 发送端没有序列化,对端接受到数据也不用反序列化
size_t chunk_size =
serde::ProtobufChunkSerde::max_serialized_size(*chunk);
TRY_CATCH_BAD_ALLOC(_pass_through_context.append_chunk(
_parent->_sender_id, chunk, chunk_size,
_parent->_is_pipeline_level_shuffle
? driver_sequence : -1));
_current_request_bytes += chunk_size;
COUNTER_UPDATE(_parent->_bytes_pass_through_counter, chunk_size);
} else {
// 1.2 RPC 方式通信,则攒批
if (_parent->_is_pipeline_level_shuffle) {
_chunk_request->add_driver_sequences(driver_sequence);
}
// 数据序列化后添加到 pchunk 中
auto pchunk = _chunk_request->add_chunks();
TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(_parent->serialize_chunk(
chunk, pchunk, &_is_first_chunk)));
_current_request_bytes += pchunk->data().size();
}
}

// 2. 真正的发送消息
// 条件是: batched 的数据超过内存限制,或者是最后一条数据(eos 为 true)
if (_current_request_bytes > config::max_transmit_batched_bytes || eos) {
_chunk_request->set_eos(eos);
_chunk_request->set_use_pass_through(_use_pass_through);
if (auto delta_statistic = state->intermediate_query_statistic()) {
delta_statistic->to_pb(_chunk_request->mutable_query_statistics());
}
butil::IOBuf attachment;
int64_t attachment_physical_bytes =
_parent->construct_brpc_attachment(_chunk_request, attachment);
TransmitChunkInfo info {_fragment_instance_id, _brpc_stub,
std::move(_chunk_request),
attachment, attachment_physical_bytes,
_brpc_dest_addr};
RETURN_IF_ERROR(_parent->_buffer->add_request(info));
_current_request_bytes = 0;
*is_real_sent = true;
}

return Status::OK();
}

SinkBuffer::add_request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Status SinkBuffer::add_request(TransmitChunkInfo& request) {
RETURN_IF(_is_finishing, Status::OK());
if (!request.attachment.empty()) {
_bytes_enqueued += request.attachment.size();
_request_enqueued++;
}

{
auto& instance_id = request.fragment_instance_id;
RETURN_IF_ERROR(_try_to_send_rpc(instance_id,
[&]() { _buffers[instance_id.lo].push(request); }));
}

return Status::OK();
}

SinkBuffer::_try_to_send_rpc

SinkBuffer 设计要稍微复杂点,因为要考虑顺序。下面将 _try_to_send_rpc 函数分解为4部分来剖析细节。完整代码见 _try_to_send_rpc

Part1: callback

_try_to_send_rpc 函数第二个参数 pre_works_cb 会在发送 RPC 之前执行,比如上面 add_request 函数传入的 cb 是将新增的 RPC 请求 request 添加到 _buffers[instance_id.lo] 中。

1
2
3
4
5
6
7
8
9
10
Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id,
const std::function<void()>& pre_works_cb) {
//! 注意:这里是有 mutex
std::lock_guard<Mutex> l(*_mutexes[instance_id.lo]);
pre_works_cb();

DeferOp decrease_defer([this]() { --_num_sending_rpc; });
++_num_sending_rpc;
//...
}

Part2: 限流

_buffers[instance_id] 中存储的是所有待发送给 instance_id 的 RPCs。在发送 RPC 之前会先检测下是否需要限流:

  • _is_dest_merge 为 true,此时需要保证发送顺序性
    比如 SQL 中包含 ORDER BY,TOPN 等操作时,要求输出结果有序。在 FragmentInstances 之间交换数据时,需要发送端和接收端一起保证顺序性,实现方式类似 TCP 滑动窗口。

    如图,_request_seqs[instance_id] 记录的是给 instance_id 实例已发送 RPC 的最大序号,_max_continuous_acked_seqs[instance_id] 记录的是 instance_id 已回应 RPC 的最大连续序号,差值是不连续的窗口大小 discontinuous_acked_window_size

    此时限流的标准是该 window_size 不能超过阈值 config::pipeline_sink_brpc_dop(默认值 64),防止乱序数据太多

    StarRocks/Pipeline-ExchangeNode-2

  • _is_dest_merge 为 false,此时不需要保证发送的顺序性

    _num_in_flight_rpcs[instance_id] 记录的是已发送给 instance_id 但尚未收到 response 的 PRC 数量,该数据量不能超过阈值 config::pipeline_sink_brpc_dop(默认值 64),如果超过则暂停发送,防止对端处理不过来

限流代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id,
const std::function<void()>& pre_works_cb) {
//...above code

// part2
while(true) {
RETURN_IF(_is_finishing, Status::OK());

// 1. 限流
int64_t too_much_rpc = 0;
if (_is_dest_merge) {
int64_t discontinuous_acked_window_size =
_request_seqs[instance_id.lo] - _max_continuous_acked_seqs[instance_id.lo];
too_much_brpc_process =
discontinuous_acked_window_size >= config::pipeline_sink_brpc_dop;
} else {
too_much_brpc_process =
_num_in_flight_rpcs[instance_id.lo] >= config::pipeline_sink_brpc_dop;
}
RETURN_IF (buffer.empty() || too_much_rpc, Status::OK());

//...
}
return Status::OK();

Part3: Order

顺序性保证,要求发送过程如下:

  • 必须等收到第一个 RPC 的 response 之后,后续 RPCs 才能发送出去,保证基准不会出问题
  • 最后一个 RPC(即 eos 为 true 的 RPC)必须是最后一个发送给对端,用于通知对端后续不会再有数据发送,让对端做好 finishing 操作
  • 中间 RPCs 可以一定程度的乱序,但是通过 _max_continuous_acked_seqs 来维护总体的有序性

所以,一共有两处需要等待(need_wait 为 true):

  1. 中间的 RPCs 需要等待收到第一个 RPC 的 response

    _num_finished_rpcs[instance_id] == 0 和 _num_in_flight_rpcs[instance_id] > 0 就能说明还没收到第一个 RPC 的 response。

  2. 最后一个 RPC 需要等待前面所有的 RPCs 都都收到 response

    _num_remaining_eos 和 _num_sinkers 可以用来确定当前 RPC 是否为发送给 instance_id 实例的最后一个 EOS RPC. 它们在构造函数中赋值如下:

    1
    2
    3
    4
    5
    6
    for (const auto& dest : destinations) {
    const auto& instance_id = dest.fragment_instance_id;
    _num_sinkers[instance_id] = _num_sinkers;
    //..
    }
    _num_remaining_eos = _num_sinkers.size() * num_sinkers;

    _num_sinkers 在 _create_exchange_sink_operator 函数中被赋值为 dop,如果 _num_sinkers[instance_id] 为 0,则表示当前 Sink 给 instance_id 对应的 FragmentInstance 实例的所有 PipelineDrivers ExchangeSourceOperator 该发送的数据都发送了,此时就需要给 RPC request 中的 eos 标志设为 true,告知对端不会再有数据了(即 ExchangeSourceOperator 即将进入 OperatorStage::FINISHING 阶段)。

Part3 就是做上述两处检测。当 need_wait 为 true,当前 RPC 不会被发送给对端,即从 _buffer[instance_id] 中取出的 RPC request 不会被 pop 出去。

DeferOp 是基于 RAII 设计的类,在 DeferOp 对象离开作用域时调用传入的回调函数。

核心代码及其注释如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id,
const std::function<void()>& pre_works_cb) {
//...
while(true) {
// 2. 提取待发送的 RPC request
TransmitChunkInfo& request = buffer.front();
bool need_wait = false;
DeferOp pop_defer([&need_wait, &buffer, mem_tracker = _mem_tracker]() {
// 如果需要等待
if (need_wait) {
return;
}

// 能正常发送给对端
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(mem_tracker);
buffer.pop();
});

// 2.1 CHECK: 必须等待第一个 add_request 对应的 RPC 返回,才能继续发送
if (_num_finished_rpcs[instance_id.lo] == 0
&& _num_in_flight_rpcs[instance_id.lo] > 0) {
need_wait = true;
return Status::OK();
}

// 2.2 CHECK: 最后一个 RPC,必须是最后一个发送,且只发送一次
auto& params = request.params;
if (params->eos()) {
DeferOp eos_defer([this, &instance_id, &need_wait]() {
if (need_wait) {
return;
}
if (--_num_remaining_eos == 0) {
// 所有对端的 FragmentInstances 该发送的数据
// 都已发送完
_is_finishing = true;
}
// 给 instance_id 的一个 PipelineDriver 发送完数据
--_num_sinkers[instance_id.lo];
});

// instance_id 对应的 FragmentInstance 中
// 还有 PipelineDrviers 还没有接受到完整的数据
if (_num_sinkers[instance_id.lo] > 1) {
if (params->chunks_size() == 0) {
continue;
}
params->set_eos(false);
} else {
// 仍有尚未收到 response 的 RPCs
// 则需要 wait
if (_num_in_flight_rpcs[instance_id.lo] > 0) {
need_wait = true;
return Status::OK();
}
// else-branch: _num_in_flight_rpcs[instance_id] 为 0
// 此时 params->eos() 就是 true
// 表示这个是该 FragmentInstance 最后一个 RPC 请求
}
}

//...
} // while

return Status::OK();
}

Part4: Send RPC

通过了上面的校验,下面就是真正发送 RPC request 的代码。 Part4 核心部分是设置 RPC 回调函数。

bRPC 异步回调相关知识可以参考:异步访问

由于是异步发送 RPC 消息,因此要设置回调函数来处理返回结果。 通过 DisposableClosure 继承 google::protobuf::Closure 传递给 bRPC。在 RPC 返回时执行 Closure::Run 函数。这里就是根据 RPC 发送结果即 ctnl.Failed() 来判断调用哪个 callback。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class DisposableClosure : public google::protobuf::Closure {
public:
using FailedFunc = std::function<void(const C&)>;
using SuccessFunc = std::function<void(const C&, const T&)>;

DisposableClosure(const C& ctx) : _ctx(ctx) {}
void addFailedHandler(FailedFunc fn) { _failed_handler = std::move(fn); }
void addSuccessHandler(SuccessFunc fn) { _success_handler = fn; }

void Run() noexcept override {
// 自我释放内存
std::unique_ptr<DisposableClosure> self_guard(this);

if (cntl.Failed()) {
_failed_handler(_ctx);
} else {
_success_handler(_ctx, result);
}
}

public:
brpc::Controller cntl;
T result;

private:
const C _ctx;
FailedFunc _failed_handler;
SuccessFunc _success_handler;
};

两个回调函数如下:

  1. SuccessFunc

    ctnl.Failed() 为 false 只表明 RPC 请求确实发送给对端了,但是对端接受到 RPC 后真正的处理结果由 result.status() 来表征。如果对端处理失败,则取消当前 FragmentInstance,进而由 GloablDriverExecutor 取消整个 query。否则就 递归 执行 _try_to_send_rpc 函数,不断地从 _buffer[instance_id] 中取出 RPC request 发送给 instance_id 对应的 FragmentInstance。此时传入的 pre_work_cb 中有 _process_send_window 函数,这是用来处理滑动窗口的。

    SuccessFunc 代码如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    success_cb = [this](const ClosureContext& ctx, 
    const PTransmitChunkResult& result) {
    Status status(result.status());
    {
    // 更新记录
    std::lock_guard<Mutex> l(*_mutexes[ctx.instance_id.lo]);
    ++_num_finished_rpcs[ctx.instance_id.lo];
    --_num_in_flight_rpcs[ctx.instance_id.lo];
    }

    if (!status.ok()) {
    // 对端处理失败,则取消当前 FragmentInstance
    _is_finishing = true;
    _fragment_ctx->cancel(status);
    } else {
    // 对端处理成功,则递归发送 RPC 给对端
    _try_to_send_rpc(ctx.instance_id, [&]() {
    _update_network_time(ctx.instance_id,
    ctx.send_timestamp,
    result.receiver_post_process_time());
    _process_send_window(ctx.instance_id, ctx.sequence); });
    }
    --_total_in_flight_rpc;
    };
  2. FailedFunc

    如果因为网络等问题 RPC 无法发送给对端,则取消当前 FragmentInstance 的 PipelineDriver,并更新相关状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    failed_cb = [this](const ClosureContext& ctx) noexcept {
    // 算子完成
    _is_finishing = true;
    {
    // 更新记录
    std::lock_guard<Mutex> l(*_mutexes[ctx.instance_id.lo]);
    ++_num_finished_rpcs[ctx.instance_id.lo];
    --_num_in_flight_rpcs[ctx.instance_id.lo];
    }

    // 取消当前 FragmentInstance 的 PipelineDrivers
    std::string err_msg = fmt::format(
    "transmit chunk rpc failed:{}", print_id(ctx.instance_id));
    _fragment_ctx->cancel(Status::InternalError(err_msg));

    --_total_in_flight_rpc;
    };

RPC 回调函数 closure 设置完毕,就可以将 RPC 真正发送出去了,这个由 _send_rpc 函数完成。完成代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id,
const std::function<void()>& pre_works_cb) {
//...
while(true) {
//... above code

params->mutable_finst_id()->CopyFrom(_instance_id2finst_id[instance_id.lo]);
params->set_sequence(++_request_seqs[instance_id.lo]);

if (!request.attachment.empty()) {
_bytes_sent += request.attachment.size();
_request_sent++;
}

// 设置 callback
auto* closure = new DisposableClosure<PTransmitChunkResult, ClosureContext>(
{instance_id, params->sequence(), MonotonicNanos()});
if (_first_send_time == -1) {
_first_send_time = MonotonicNanos();
}

closure->addFailedHandler(failed_cb);
closure->addSuccessHandler(success_cb);

// 在发送 RPC 前,递增变量
++_total_in_flight_rpc;
++_num_in_flight_rpcs[instance_id.lo];

_mem_tracker->release(request.attachment_physical_bytes);
ExecEnv::GetInstance()->process_mem_tracker()->consume(
request.attachment_physical_bytes);

closure->cntl.Reset();
closure->cntl.set_timeout_ms(_brpc_timeout_ms);

if (bthread_self()) {
return _send_rpc(closure, request);
} else {
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(nullptr);
return _send_rpc(closure, request);
}
}
return Status::OK();
}

_process_send_window

sequence 是本次成功的 RPC,下面用 sequence 来更新滑动窗口的逻辑也是比较简洁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_t sequence) {
// Both sender side and receiver side can tolerate disorder of tranmission
// if receiver side is not ExchangeMergeSortSourceOperator
if (!_is_dest_merge) {
return;
}
auto& seqs = _discontinuous_acked_seqs[instance_id.lo];
seqs.insert(sequence);
auto& max_continuous_acked_seq = _max_continuous_acked_seqs[instance_id.lo];
std::unordered_set<int64_t>::iterator it;
while ((it = seqs.find(max_continuous_acked_seq + 1)) != seqs.end()) {
seqs.erase(it);
++max_continuous_acked_seq;
}
}

SinkBuffer::_send_rpc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Status SinkBuffer::_send_rpc(DisposableClosure<PTransmitChunkResult,
ClosureContext>* closure,
const TransmitChunkInfo& request) {
auto expected_iobuf_size = request.attachment.size()
+ request.params->ByteSizeLong()
+ sizeof(size_t) * 2;
if (UNLIKELY(expected_iobuf_size > _rpc_http_min_size)) {
butil::IOBuf iobuf;
butil::IOBufAsZeroCopyOutputStream wrapper(&iobuf);
request.params->SerializeToZeroCopyStream(&wrapper);
// append params to iobuf
size_t params_size = iobuf.size();
closure->cntl.request_attachment().append(
&params_size, sizeof(size_t));
closure->cntl.request_attachment().append(iobuf);
// append attachment
size_t attachment_size = request.attachment.size();
closure->cntl.request_attachment().append(
&attachment_size, sizeof(size_t));
closure->cntl.request_attachment().append(request.attachment);
closure->cntl.http_request().set_content_type("application/proto");
auto res = BrpcStubCache::create_http_stub(request.brpc_addr);
RETURN_IF(!res.ok(), res.status());
// 异步 RPC
res.value()->transmit_chunk_via_http(
&closure->cntl, NULL, &closure->result, closure);
} else {
// 异步 RPC
closure->cntl.request_attachment().append(request.attachment);
request.brpc_stub->transmit_chunk(
&closure->cntl, request.params.get(), &closure->result, closure);
}
return Status::OK();
}

push_chunk

上面讲解了 Channel 是如何 batched RPCs 到真正发送 RPC 给对端。下面来讲解 ExchangeSinkOperator::push_chunk。

_output_columns 是当前 Sink 需要输出的列,当前一个 Operator 将数据 Chunk 推向 ExchangeSinkOperator 时,需要从 chunk 中提取出 _output_columns 中所必须的列数据即可,得到的即 send_chunk。

TPartitionType::UNPARTITIONED

当传输数据方式是 TPartitionType::UNPARTITIONED,即广播模式,给每个对端都发送一份完整的数据,如下图所示。
StarRocks/Pipeline-ExchangeNode-3

即遍历 _channels 给所有的 Channel[idx] 发送数据。也正如前文所述,如果 Channel::use_pass_through() 为 true,则可以走共享内存模式,而不用 RPC 通信。反之,则需要先序列化再进行 RPC 发送。

Channel 发现 ExchangeSinkOperator::_is_pipeline_level_shuffle 为 false 时,在 Channel::send_one_chunk 函数中不会处理 DEFAULT_DRIVER_SEQUENCE。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Status ExchangeSinkOperator::push_chunk(RuntimeState* state, 
const vectorized::ChunkPtr& chunk) {
uint16_t num_rows = chunk->num_rows();
RETURN_IF(num_rows == 0, Status::OK());

vectorized::Chunk temp_chunk;
vectorized::Chunk* send_chunk = chunk.get();
if (!_output_columns.empty()) {
for (int32_t cid : _output_columns) {
temp_chunk.append_column(chunk->get_column_by_slot_id(cid), cid);
}
send_chunk = &temp_chunk;
}

if (_part_type == TPartitionType::UNPARTITIONED || _num_shuffles == 1) {
if (_chunk_request == nullptr) {
_chunk_request = std::make_shared<PTransmitChunkParams>();
}

std::vector<int> not_pass_through_channles;
for (auto idx : _channel_indices) {
if (_channels[idx]->use_pass_through()) {
// pass_through 模式,直接传递数据
RETURN_IF_ERROR(_channels[idx]->send_one_chunk(
state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false));
} else {
//记录需要爱 RPC 通信的对端
not_pass_through_channles.emplace_back(idx);
}
}

if (!not_pass_through_channles.empty()) {
// 1. create a new chunk PB to serialize
ChunkPB* pchunk = _chunk_request->add_chunks();
// 2. 将输入的 send_chunk 序列化到 pchunk 中
TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(serialize_chunk(
send_chunk, pchunk, &_is_first_chunk, _channels.size())));
_current_request_bytes += pchunk->data().size();
// 3. 如果请求的字节数超过限制,再通过 Channel 发送
if (_current_request_bytes > config::max_transmit_batched_bytes) {
butil::IOBuf attachment;
int64_t attachment_physical_bytes =
construct_brpc_attachment(_chunk_request, attachment);
for (auto idx : not_pass_through_channles) {
RETURN_IF_ERROR(_channels[idx]->send_chunk_request(
state,
std::make_shared<PTransmitChunkParams>(*_chunk_request),
attachment,
attachment_physical_bytes));
}
_current_request_bytes = 0;
_chunk_request.reset();
}
}
}
//...
}

TPartitionType::RANDOM

TPartitionType::RANDOM 的数据分发策略是 Round-robin,每次调用 ExchangeSinkOperator::push_chunk 都挑选一个同进程的 FragmentInstance 作为对端。如下图所示。
StarRocks/Pipeline-ExchangeNode-4

_curr_random_channel_idx 指示当前发送给哪个 Local Channels。见代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) {
//...
if (_part_type == TPartitionType::UNPARTITIONED || _num_shuffles == 1) {
//...
} else if (_part_type == TPartitionType::RANDOM) {
std::vector<Channel*> local_channels;
// 1. 挑选处同进程的 FragmenInstance
for (const auto& channel : _channels) {
if (channel->is_local()) {
local_channels.emplace_back(channel);
}
}

// 没有 Local Channel,才选择 Remote Channel
if (local_channels.empty()) {
local_channels = _channels;
}

auto& channel = local_channels[_curr_random_channel_idx];
bool real_sent = false;
// 2. 发送
RETURN_IF_ERROR(channel->send_one_chunk(
state, send_chunk,
DEFAULT_DRIVER_SEQUENCE,
false, &real_sent));
// 3. 只有在 RPC 真的发送出去了,才会切换到下一个 FragmentInstance
if (real_sent) {
_curr_random_channel_idx =
(_curr_random_channel_idx + 1) % local_channels.size();
}
} // else if
}

TPartitionType::BUCKET_SHUFFLE/HASH_PARTITIONED

这两种数据分发策略要复杂点,二者本质不同点在于使用的 Hash 函数。如下示意图。

StarRocks/Pipeline-ExchangeNode-5

hash_values

Shuffler::exchange_shuffle 函数用于将 chunk 的每一行都分发到具体的分区 {instance_id, driver_seq} 中。参数 hash_values 是对 chunk 中每一行的 join-key 进行 hash 值。BUCKET_SHUFFLE_HASH_PARTITIONED 和 HASH_PARTITIONED 两种策略根本区别也在于这里。

这里的分区列的 “partition” 和建分区表时的 “partition” 不是一个含义。这里的是对端 {instnace_id, driver_seq} 处理的数据源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Status ExchangeSinkOperator::push_chunk(RuntimeState* state, 
const vectorized::ChunkPtr& chunk) {
//...
else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
{
//1. 计算 hash 的分区列
for (size_t i = 0; i < _partitions_columns.size(); ++i) {
ASSIGN_OR_RETURN(
_partitions_columns[i], _partition_expr_ctxs[i]->evaluate(chunk.get()));
}

// 2. 为每个分区列计算 hash 值
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_hash_values.assign(num_rows, HashUtil::FNV_SEED);
for (const vectorized::ColumnPtr& column : _partitions_columns) {
column->fnv_hash(&_hash_values[0], 0, num_rows);
}
} else {
// 当 join-key 和分桶键一样
// 则和分桶键使用一样的 Hash 函数
_hash_values.assign(num_rows, 0);
for (const vectorized::ColumnPtr& column : _partitions_columns) {
column->crc32_hash(&_hash_values[0], 0, num_rows);
}
}
//..
}
}
return Status::OK();

exchange_shuffle

TPartitionType::UNPARTITIONED 和 TPartitionType::RANDOM 两种策略的 shuffle 粒度是 chunk ,即可以直接将整个 chunk 发送给 FragmentInstance。而 (BUCKET_SHUFFLE_)HASH_PARTITIONED 策略,shuffle 的粒度是 row,需要把一个 chunk 分发到不同 {FragmentInstance, PipelineDriver} 中。

因此,可以把 (channl_id, driver_sequence) 视为一个坐标,那么 exchange_shuffle 函数就是将 chunk 的 num_rows 行数据均匀分布在 channels_size * num_shuffles_per_channel 的平面上。其中 num_shuffles_per_channel 表征的是 dop。

shuffle 策略实现如下:

  • two_level_shuffle 为 false 表示不需要再次 shuffle,直接获取 channel_id 即可,这里的 channel_id 实际上是 FE 中已经计算好的值
  • two_level_shuffle 为 true 则需要在 BE 端再次进行 shuffle

实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template <bool two_level_shuffle, typename ReduceOp>
void exchange_shuffle(std::vector<uint32_t>& shuffle_channel_ids,
const std::vector<uint32_t>& hash_values,
size_t num_rows) {
for (size_t i = 0; i < num_rows; ++i) {
size_t channel_id = ReduceOp()(hash_values[i], _num_channels);
size_t shuffle_id;
if constexpr (!two_level_shuffle) {
shuffle_id = channel_id;
} else {
// 基于均匀分布将 hash_values 映射到 [0, _num_shuffles_per_channel) 区间
uint32_t driver_sequence = ReduceOp()(
HashUtil::xorshift32(hash_values[i]), _num_shuffles_per_channel);
shuffle_id =
channel_id * _num_shuffles_per_channel + driver_sequence;
}
shuffle_channel_ids[i] = shuffle_id;
}
}

得到 chunk 每一行的 hash_values 后,并基于 Shuffler::exchange_shuffle 函数得到每一行所属的分区,结果存储于 _shuffle_channel_ids。

1
2
3
4
5
6
7
8
9
else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
{
// 3. Compute row indexes for each channel's each shuffle
_shuffler->exchange_shuffle(_shuffle_channel_ids, _hash_values, num_rows);

//...
}
}

channel_row_idx_start_points

接下来就是 shuffle 之后的处理流程:_row_indexes 将 chunk 中所有发往同一个分区的 row_ids 连续存储在一起,并用 _channel_row_idx_start_points 记录每个分区起始偏移量。如下示意图:

StarRocks/Pipeline-ExchangeNode-6

这个流程分为如下三个 for-loop:

  1. 计算每个分区的行数

    _shuffle_channel_ids[i] 表征 partitions[i],那么第一个 for-loop 计算完,_channel_row_idx_start_points[i] 即表征 chunk 落在 partitions[i] 的行数。

    比如有三个分区,分别是 9, 10, 11行,经过第一个 for-loop,start_points 中存储的值就是:

    1
    9 10 11
  2. 计算每个分区最后的位置

    上述例子,经过第二个 for-loop 后, start_points 中存储的值就是:

    1
    9 19 30
  3. 将同一个分区的在chunk中的行号记录在 _row_indexes,同时将 _channel_row_idx_start_points 更新到每个分区的起始位置。

    此时 _row_indexes 和 start_points 的值变更结果如下:

    1
    2
    3
    4
    5
    6
    7
    _row_indexes[0:8] = partition0
    _row_indexes[9:18] = partition1
    _row_indexes[19:29] = partition2

    start_points[0] = 0
    start_points[1] = 9
    start_points[2] = 19

三个 for-loop 的完成代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
{
// _num_shuffles = _channels.size() * _num_shuffles_per_channel
_channel_row_idx_start_points.assign(_num_shuffles + 1, 0);
//4.1
for (size_t i = 0; i < num_rows; ++i) {
_channel_row_idx_start_points[_shuffle_channel_ids[i]]++;
}
// 4.2
for (int32_t i = 1; i <= _num_shuffles; ++i) {
_channel_row_idx_start_points[i] += _channel_row_idx_start_points[i - 1];
}
// 4.3
for (int32_t i = num_rows - 1; i >= 0; --i) {
_row_indexes[_channel_row_idx_start_points[_shuffle_channel_ids[i]] - 1] = i;
_channel_row_idx_start_points[_shuffle_channel_ids[i]]--;
}
}
}

add_rows_selective

上面四步对chunk完成了分区,并将结果记录在 _row_indexes 和 _start_points 中,最后一步就是将所有分区数据发送出去,只需要遍历 (_channel_indices, _num_shuffles_per_channel) 二维区间,再使用 Channel::add_rows_selective 函数发送数据,没啥可说,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED) {
{
// above code
}

for (int32_t channel_id : _channel_indices) {
if (_channels[channel_id]->get_fragment_instance_id().lo == -1) {
// dest bucket is no used, continue
continue;
}

for (int32_t i = 0; i < _num_shuffles_per_channel; ++i) {
int shuffle_id = channel_id * _num_shuffles_per_channel + i;
int driver_sequence = _driver_sequence_per_shuffle[shuffle_id];

size_t from = _channel_row_idx_start_points[shuffle_id];
size_t size = _channel_row_idx_start_points[shuffle_id + 1] - from;
if (size == 0) {
// no data for this channel continue;
continue;
}

RETURN_IF_ERROR(_channels[channel_id]->add_rows_selective(send_chunk,
driver_sequence,
_row_indexes.data(),
from, size,
state));
}
}
}


Reference