1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, RuntimeState* runtime_state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit, std::shared_ptr<RuntimeProfile> profile, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr, bool is_pipeline, int32_t dop, bool keep_order, PassThroughChunkBuffer* pass_through_chunk_buffer) : _mgr(stream_mgr), _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), _total_buffer_limit(total_buffer_limit), _row_desc(row_desc), _is_merging(is_merging), _num_buffered_bytes(0), _profile(std::move(profile)), _instance_profile(runtime_state->runtime_profile_ptr()), _query_mem_tracker(runtime_state->query_mem_tracker_ptr()), _instance_mem_tracker(runtime_state->instance_mem_tracker_ptr()), _sub_plan_query_statistics_recvr(std::move(sub_plan_query_statistics_recvr)), _is_pipeline(is_pipeline), _degree_of_parallelism(dop), _keep_order(keep_order), _pass_through_context(pass_through_chunk_buffer, fragment_instance_id, dest_node_id) { int num_queues = is_merging ? num_senders : 1; int num_sender_per_queue = is_merging ? 1 : num_senders; _sender_queues.reserve(num_queues); for (int i = 0; i < num_queues; ++i) { SenderQueue* queue = nullptr; if (_is_pipeline) { queue = _sender_queue_pool.add(new PipelineSenderQueue( this, num_sender_per_queue, is_merging ? 1 : dop)); } else { } _sender_queues.push_back(queue); }
_pass_through_context.init(); if (runtime_state->query_options().__isset.transmission_encode_level) { _encode_level = runtime_state->query_options().transmission_encode_level; } }
DataStreamRecvr::PipelineSenderQueue::PipelineSenderQueue( DataStreamRecvr* parent_recvr, int32_t num_senders, int32_t degree_of_parallism) : SenderQueue(parent_recvr), _num_remaining_senders(num_senders), _chunk_queues(degree_of_parallism), _chunk_queue_states(degree_of_parallism) { if (parent_recvr->_is_merging) { _producer_token = std::make_unique<ChunkQueue::producer_token_t>(_chunk_queues[0]); } }
|