// Caller context: choose the in-memory merge path when no partial results
// were spilled to disk during aggregation.
if (!aggregator.hasTemporaryFiles()) { /** If all partially-aggregated data is in RAM, then merge them in parallel, also in RAM. */ impl = aggregator.mergeAndConvertToBlocks(many_data, final, max_threads); } // ... }
Aggregator::mergeAndConvertToBlocks
Of course, mergeAndConvertToBlocks 输入的是各个线程的 variant,最终的目的是输出一个 block 给下一个 IBlockInputStream, 这是由内部的 `MergingAndConvertingBlockInputStream` 完成。
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, boolfinal, size_t max_threads)const { if (data_variants.empty()) throwException("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);
LOG_FMT_TRACE(log, "Merging aggregated data");
ManyAggregatedDataVariants non_empty_data; non_empty_data.reserve(data_variants.size()); for (auto & data : data_variants) if (!data->empty()) non_empty_data.push_back(data);
if (non_empty_data.empty()) return std::make_unique<NullBlockInputStream>(getHeader(final));
if (non_empty_data.size() > 1) { /// Sort the states in descending order so that the merge is more efficient (since all states are merged into the first). std::sort(non_empty_data.begin(), non_empty_data.end(), [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs) { return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow(); }); }
/// If at least one of the options is two-level, then convert all the options into two-level ones, if there are not such. /// Note - perhaps it would be more optimal not to convert single-level versions before the merge, but merge them separately, at the end. bool has_at_least_one_two_level = false; for (constauto & variant : non_empty_data) { if (variant->isTwoLevel()) { has_at_least_one_two_level = true; break; } }
if (has_at_least_one_two_level) for (auto & variant : non_empty_data) if (!variant->isTwoLevel()) variant->convertToTwoLevel(); //... }
/// Excerpt (later in the same function): every variant must use the same
/// hash-table type to be mergeable. `first` is declared in elided code —
/// presumably non_empty_data[0]; confirm against the full source.
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
    if (first->type != non_empty_data[i]->type)
        throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);

    /** Elements from the remaining sets can be moved to the first data set.
      * Therefore, it must own all the arenas of all other sets.
      */
    first->aggregates_pools.insert(first->aggregates_pools.end(),
                                   non_empty_data[i]->aggregates_pools.begin(),
                                   non_empty_data[i]->aggregates_pools.end());
}
/// Excerpt: first call into the merging stream (bucket counter still at its
/// initial -1 sentinel). The without_key / overflow_row case has no buckets,
/// so it is merged and returned as a single block right here.
if (current_bucket_num == -1)
{
    ++current_bucket_num;

    if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
    {
        // |data| is an intermediate buffer to save the result.
        // Next, we need to move |data| to a block and return it.
        aggregator.mergeWithoutKeyDataImpl(data);
        return aggregator.prepareBlockAndFillWithoutKey(
            *first,
            final,
            first->type != AggregatedDataVariants::Type::without_key);
    }
}
// ... }
/// We merge all aggregation results to the first.
/// NOTE(review): stops early when the accumulated result size exceeds limits;
/// no_more_keys looks like an in/out flag — confirm semantics in checkLimits.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num) { if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys)) break;
AggregatedDataVariants & current = *non_empty_data[result_num];
/// Append, for each aggregate function, a pointer to its state within the
/// per-row state block (`mapped` plus the function's precomputed offset).
/// Columns were reserved beforehand, so push_back does not throw exceptions.
/// reserved, so push_back does not throw exceptions for (size_t i = 0; i < params.aggregates_size; ++i) aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);
if (!first->isTwoLevel())
{
    //... mergeSingleLevelDataImpl
}
else
{
    /// Lazily create the worker pool on first use; each scheduled worker
    /// presumably merges one bucket of the two-level tables — confirm in
    /// scheduleThreadForNextBucket.
    if (!parallel_merge_data)
    {
        parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
        for (size_t i = 0; i < threads; ++i)
            scheduleThreadForNextBucket();
    }

    Block res;

    while (true)
    {
        std::unique_lock lock(parallel_merge_data->mutex);

        /// Propagate any failure captured by a worker thread to the reader.
        if (parallel_merge_data->exception)
            std::rethrow_exception(parallel_merge_data->exception);

        /// Blocks must be emitted in bucket order: consume the block for the
        /// current bucket if ready, and immediately schedule the next one.
        auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
        if (it != parallel_merge_data->ready_blocks.end())
        {
            ++current_bucket_num;
            scheduleThreadForNextBucket();
voidAggregator::createAggregateStates(AggregateDataPtr & aggregate_data)const { for (size_t j = 0; j < params.aggregates_size; ++j) { try { /** An exception may occur if there is a shortage of memory. * In order that then everything is properly destroyed, we "roll back" some of the created states. * The code is not very convenient. */ FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_create_state_failpoint); // 调用不同类型的构造函数,初始化 aggregate_data + offsets_of_aggregate_states[j 内存 aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); } catch (...) { for (size_t rollback_j = 0; rollback_j < j; ++rollback_j) aggregate_functions[rollback_j]->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);