MPP: ExchangeSourceOperator 设计详解

ExchangeSourceOperator

DataStreamMgr::create_recvr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Look up the pass-through chunk buffer registered for `query_id`.
// Thin delegation to the per-query pass-through buffer manager; the
// manager's result (possibly null — confirm against its contract) is
// returned to the caller unchanged.
PassThroughChunkBuffer* DataStreamMgr::get_pass_through_chunk_buffer(
        const TUniqueId& query_id) {
    auto* buffer = _pass_through_chunk_buffer_manager.get(query_id);
    return buffer;
}

// Creates a DataStreamRecvr for (fragment_instance_id, dest_node_id) and
// registers it so that incoming transmit_chunk RPCs can route to it via
// find_recvr(). Returns the shared receiver.
//
// Fixes vs. the excerpt:
//  - removed the extra ')' after get_pass_through_chunk_buffer(...) that made
//    the make_shared call unbalanced;
//  - reuse the iterator returned by emplace() instead of a second find().
std::shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
        RuntimeState* state, const RowDescriptor& row_desc,
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
        int num_senders, int buffer_size,
        const std::shared_ptr<RuntimeProfile>& profile, bool is_merging,
        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr,
        bool is_pipeline, int32_t degree_of_parallelism, bool keep_order) {
    // Build the receiver first; it is published into the map below under the
    // bucket lock.
    auto recvr = std::make_shared<DataStreamRecvr>(
            this, state, row_desc,
            fragment_instance_id, dest_node_id, num_senders, is_merging,
            buffer_size, profile, std::move(sub_plan_query_statistics_recvr),
            is_pipeline, degree_of_parallelism,
            keep_order,
            get_pass_through_chunk_buffer(state->query_id()));

    // Receivers are sharded into buckets keyed by fragment_instance_id to
    // reduce lock contention; each bucket maps
    // fragment_instance_id -> (dest_node_id -> recvr).
    uint32_t bucket = get_bucket(fragment_instance_id);
    auto& receiver_map = _receiver_map[bucket];
    std::lock_guard<Mutex> l(_lock[bucket]);
    auto iter = receiver_map.find(fragment_instance_id);
    if (iter == receiver_map.end()) {
        // First receiver for this fragment instance: create its per-node map.
        // emplace() returns pair<iterator, bool>; use the iterator directly.
        iter = receiver_map.emplace(fragment_instance_id, std::make_shared<RecvrMap>()).first;
        _fragment_count += 1;
    }
    iter->second->emplace(dest_node_id, recvr);
    _receiver_count += 1;
    return recvr;
}

ExchangeNode::set_num_senders

1
2
3
4
5
6
7
// Set senders of exchange nodes before pipeline build
// Each exchange node must know how many upstream fragment instances will
// transmit to it; the count comes from the coordinator via
// params.per_exch_num_senders (defaults to 0 when the node id is absent).
std::vector<ExecNode*> exch_nodes;
plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
for (auto* exch_node : exch_nodes) {
int num_senders = FindWithDefault(params.per_exch_num_senders, exch_node->id(), 0);
// Safe downcast: collect_nodes only returned EXCHANGE_NODE instances.
down_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
}

DataStreamRecvr

  • is_merging 为 true 时,num_queues 为 num_senders,而 num_sender_per_queue 就是 1

    此时,_sender_queues 中有 num_queues 个 PipelineSenderQueue,每个 PipelineSenderQueue 中只有一个 ChunkQueue。

  • is_merging 为 false 时,num_queues 为 1,而 num_sender_per_queue 为 num_senders

    此时,_sender_queues 中只有 1 个 PipelineSenderQueue,每个 PipelineSenderQueue 中有 dop 个 ChunkQueue。

is_merging 只有在 Sort 时才会为 true,下面先视为 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Constructs a receiver for one (fragment_instance_id, dest_node_id) pair.
// Queue layout (see the for-loop below):
//  - is_merging == true : num_senders sender queues, one sender and one
//    internal chunk queue each (per-sender streams kept separate for merge);
//  - is_merging == false: a single sender queue shared by all senders, with
//    `dop` internal chunk queues (one per pipeline driver).
DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, RuntimeState* runtime_state,
const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders,
bool is_merging, int total_buffer_limit,
std::shared_ptr<RuntimeProfile> profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr,
bool is_pipeline, int32_t dop, bool keep_order,
PassThroughChunkBuffer* pass_through_chunk_buffer)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_total_buffer_limit(total_buffer_limit),
_row_desc(row_desc),
_is_merging(is_merging),
_num_buffered_bytes(0),
_profile(std::move(profile)),
_instance_profile(runtime_state->runtime_profile_ptr()),
_query_mem_tracker(runtime_state->query_mem_tracker_ptr()),
_instance_mem_tracker(runtime_state->instance_mem_tracker_ptr()),
_sub_plan_query_statistics_recvr(std::move(sub_plan_query_statistics_recvr)),
_is_pipeline(is_pipeline),
_degree_of_parallelism(dop),
_keep_order(keep_order),
_pass_through_context(pass_through_chunk_buffer, fragment_instance_id, dest_node_id) {
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
int num_sender_per_queue = is_merging ? 1 : num_senders;
_sender_queues.reserve(num_queues);
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = nullptr;
if (_is_pipeline) {
// Merging queues have a single internal chunk queue; otherwise one per driver.
queue = _sender_queue_pool.add(new PipelineSenderQueue(
this,
num_sender_per_queue,
is_merging ? 1 : dop));
} else {
// .. (non-pipeline branch elided in this excerpt)
}
_sender_queues.push_back(queue);
}

// Initialize the counters
// ... (profile counter setup elided in this excerpt)

_pass_through_context.init();
// Honor the session's transmission encode level when the option is set.
if (runtime_state->query_options().__isset.transmission_encode_level) {
_encode_level = runtime_state->query_options().transmission_encode_level;
}
}

// Builds one sender queue with `degree_of_parallism` internal chunk queues;
// `num_senders` senders are expected to eventually report eos (tracked by
// _num_remaining_senders).
DataStreamRecvr::PipelineSenderQueue::PipelineSenderQueue(
DataStreamRecvr* parent_recvr, int32_t num_senders, int32_t degree_of_parallism)
: SenderQueue(parent_recvr),
_num_remaining_senders(num_senders),
_chunk_queues(degree_of_parallism),
_chunk_queue_states(degree_of_parallism) {
// In the merging case each sender queue has exactly one sender and one chunk
// queue, so a dedicated producer token is created for queue 0.
// NOTE(review): assumes ChunkQueue is a token-aware concurrent queue (e.g.
// moodycamel::ConcurrentQueue) — confirm against ChunkQueue's definition.
if (parent_recvr->_is_merging) {
_producer_token = std::make_unique<ChunkQueue::producer_token_t>(_chunk_queues[0]);
}
}

transmit_chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Entry point for the transmit_chunk RPC on the receiving BE: routes the
// request to the matching DataStreamRecvr, enqueues its chunks, and on eos
// removes the sender from the receiver's bookkeeping.
//
// Fix vs. the excerpt: the DeferOp lambda captured `&eos`, an identifier that
// is never declared in this function (compile error); the eos flag is read
// from `request` directly.
Status DataStreamMgr::transmit_chunk(const PTransmitChunkParams& request, ::google::protobuf::Closure** done) {
    // Convert the protobuf instance id into the thrift form used as map key.
    const PUniqueId& finst_id = request.finst_id();
    TUniqueId t_finst_id;
    t_finst_id.hi = finst_id.hi();
    t_finst_id.lo = finst_id.lo();
    std::shared_ptr<DataStreamRecvr> recvr = find_recvr(t_finst_id, request.node_id());
    // The receiver may already be gone (fragment finished or cancelled);
    // deliberately drop the data and answer OK in that case.
    RETURN_IF(recvr == nullptr, Status::OK());

    if (request.has_query_statistics()) {
        recvr->add_sub_plan_statistics(request.query_statistics(), request.sender_id());
    }

    // Deferred so remove_sender() runs after add_chunks() below: the last
    // batch must be enqueued before the sender is marked finished.
    DeferOp op([&recvr, &request]() {
        if (request.eos()) {
            recvr->remove_sender(request.sender_id(), request.be_number());
        }
    });

    if (request.chunks_size() > 0 || request.use_pass_through()) {
        // On eos pass nullptr for `done` so the RPC is answered immediately
        // rather than parked as a back-pressure closure.
        RETURN_IF_ERROR(recvr->add_chunks(request, request.eos() ? nullptr : done));
    }

    return Status::OK();
}

DataStreamRecvr::add_chunks

BE 在接收到 transmit_chunk RPC 后,就会调用 DataStreamRecvr::add_chunks 函数来处理请求。先基于 sender_id 定位到具体的 SenderQueue,即 _sender_queues[sender_id](非 merging 场景下 sender_id 恒为 0,所有 sender 共用同一个 SenderQueue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Dispatches an incoming transmit_chunk request to the proper sender queue.
// Non-merging receivers have a single queue (index 0); merging receivers keep
// one queue per sender, indexed by the request's sender_id.
Status DataStreamRecvr::add_chunks(const PTransmitChunkParams& request, ::google::protobuf::Closure** done) {
// Account this RPC's allocations against the instance mem tracker, and
// restore the previous tracker on every exit path.
MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(_instance_mem_tracker.get());
DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); });

SCOPED_TIMER(_process_total_timer);
SCOPED_TIMER(_sender_total_timer);
COUNTER_UPDATE(_request_received_counter, 1);
int sender_id = _is_merging ? request.sender_id() : 0;
// Add all batches to the same queue if _is_merging is false.

if (_keep_order) {
// Ordered delivery is only implemented for the pipeline engine.
DCHECK(_is_pipeline);
return _sender_queues[sender_id]->add_chunks_and_keep_order(request, done);
} else {
return _sender_queues[sender_id]->add_chunks(request, done);
}
}

PipelineSenderQueue::add_chunks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Enqueues the chunks carried by one transmit_chunk request into the
// per-driver chunk queues, applying short-circuit filtering and back-pressure.
//
// Fixes vs. the excerpt:
//  - removed two stray ')' at the end of the ternary initializer;
//  - pass the `keep_order` template flag through to get_chunks_from_request
//    instead of hard-coding <false>, otherwise the template parameter is
//    ignored entirely.
template <bool keep_order>
Status DataStreamRecvr::PipelineSenderQueue::add_chunks(const PTransmitChunkParams& request,
                                                        ::google::protobuf::Closure** done) {
    const bool use_pass_through = request.use_pass_through();
    // Drop data if the query was cancelled or all senders already reported eos.
    RETURN_IF(_is_cancelled || _num_remaining_senders <= 0, Status::OK());
    RETURN_IF_ERROR(try_to_build_chunk_meta(request));

    size_t total_chunk_bytes = 0;
    _is_pipeline_level_shuffle =
            request.has_is_pipeline_level_shuffle() && request.is_pipeline_level_shuffle();

    // Pass-through chunks were handed over in-process; otherwise deserialize
    // (or defer deserialization of) the chunks embedded in the request.
    ChunkList chunks = use_pass_through
            ? get_chunks_from_pass_through(request.sender_id(), total_chunk_bytes)
            : get_chunks_from_request<keep_order>(request, total_chunk_bytes);

    RETURN_IF(_is_cancelled, Status::OK());

    // Remove the short-circuited chunks: drivers that no longer need input
    // (e.g. LIMIT reached) should not accumulate data.
    if (_is_pipeline_level_shuffle) {
        for (auto iter = chunks.begin(); iter != chunks.end();) {
            if (_chunk_queue_states[iter->driver_sequence].is_short_circuited) {
                total_chunk_bytes -= iter->chunk_bytes;
                chunks.erase(iter++);
                continue;
            }
            iter++;
        }
    }

    // Back-pressure: when the receiver is over its buffer limit, park the RPC
    // closure on the last chunk; it is only Run() (answering the RPC, letting
    // the sender continue) when that chunk is dequeued in get_chunk().
    if (!chunks.empty() && done != nullptr && _recvr->exceeds_limit(total_chunk_bytes)) {
        chunks.back().closure = *done;
        chunks.back().queue_enter_time = MonotonicNanos();
        *done = nullptr;
    }

    for (auto&& chunk : chunks) {
        // Without pipeline-level shuffle, everything goes to queue 0.
        int driver_seq = _is_pipeline_level_shuffle ? chunk.driver_sequence : 0;
        size_t chunk_bytes = chunk.chunk_bytes;
        auto* closure = chunk.closure;  // read before the chunk is moved below
        _chunk_queues[driver_seq].enqueue(std::move(chunk));
        _chunk_queue_states[driver_seq].blocked_closure_num += closure != nullptr;
        _total_chunks++;
        // The queue may have been short-circuited concurrently after the
        // filter pass above; drain it again so the chunk just added does not
        // linger.
        if (_chunk_queue_states[driver_seq].is_short_circuited) {
            short_circuit(driver_seq);
        }
        _recvr->_num_buffered_bytes += chunk_bytes;
    }

    return Status::OK();
}

pull_chunk

1
2
3
4
5
6
7
// Pulls the next chunk for this pipeline driver from the stream receiver and
// applies runtime (bloom) filters before handing it to the downstream operator.
// NOTE(review): get_chunk_for_pipeline resets *chunk from the dequeued raw
// pointer, which may be null when no chunk is available — presumably callers
// treat a null ChunkPtr as "no data yet"; confirm against the operator contract.
StatusOr<vectorized::ChunkPtr> ExchangeSourceOperator::pull_chunk(RuntimeState* state) {
auto chunk = std::make_unique<vectorized::Chunk>();
RETURN_IF_ERROR(_stream_recvr->get_chunk_for_pipeline(&chunk, _driver_sequence));

eval_runtime_bloom_filters(chunk.get());
return std::move(chunk);
}

DataStreamRecvr::get_chunk_for_pipeline

1
2
3
4
5
6
7
8
9
// Fetches the next chunk for the given pipeline driver.
// Only valid on the non-merging path, where all senders share the single
// sender queue at index 0; ownership of the dequeued chunk (possibly null
// when the queue is empty) is transferred into *chunk.
Status DataStreamRecvr::get_chunk_for_pipeline(std::unique_ptr<vectorized::Chunk>* chunk,
const int32_t driver_sequence) {
    DCHECK(!_is_merging);
    DCHECK_EQ(_sender_queues.size(), 1);

    vectorized::Chunk* raw_chunk = nullptr;
    const Status status = _sender_queues[0]->get_chunk(&raw_chunk, driver_sequence);
    chunk->reset(raw_chunk);
    return status;
}

PipelineSenderQueue::get_chunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Dequeues one chunk for `driver_sequence`, deserializing it lazily if it
// arrived as a serialized pchunk. On success *chunk owns the result; when the
// queue is empty, returns OK with *chunk untouched.
Status DataStreamRecvr::PipelineSenderQueue::get_chunk(vectorized::Chunk** chunk, const int32_t driver_sequence) {
RETURN_IF(_is_cancelled, Status::Cancelled("Cancelled"));

// Pipeline-level shuffle keeps one queue per driver; otherwise queue 0 is shared.
size_t index = _is_pipeline_level_shuffle ? driver_sequence : 0;
auto& chunk_queue = _chunk_queues[index];
auto& chunk_queue_state = _chunk_queue_states[index];

ChunkItem item;
if (!chunk_queue.try_dequeue(item)) {
chunk_queue_state.unpluging = false;
return Status::OK();
}

// Runs at function exit (even on deserialization error): if this item carried
// a parked RPC closure (back-pressure, see add_chunks), answer the RPC now so
// the sender can resume transmitting.
DeferOp defer_op([&]() {
auto* closure = item.closure;
if (closure != nullptr) {
// The closure may allocate; charge it to the process tracker rather than
// this instance, and restore the previous tracker afterwards.
MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(
ExecEnv::GetInstance()->process_mem_tracker());
DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); });
closure->Run();
chunk_queue_state.blocked_closure_num--;
}
});

if (item.chunk_ptr == nullptr) {
// Lazy path: the item still holds the serialized pchunk; deserialize now.
ChunkUniquePtr chunk_ptr = std::make_unique<vectorized::Chunk>();
faststring uncompressed_buffer;
RETURN_IF_ERROR(_deserialize_chunk(item.pchunk, chunk_ptr.get(), &uncompressed_buffer));
*chunk = chunk_ptr.release();
} else {
// Eager/pass-through path: the chunk is already materialized.
*chunk = item.chunk_ptr.release();
}

// Update buffer accounting so add_chunks' exceeds_limit check stays accurate.
_total_chunks--;
_recvr->_num_buffered_bytes -= item.chunk_bytes;
return Status::OK();
}

Reference