Aggregation can be roughly divided into two parts:
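First, multiple threads read data from the inputs one block at a time and aggregate each block, so every thread produces its own aggregation result, a variant (AggregatedDataVariants).
Then the data produced by the individual threads is merged.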
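The two phases can be sketched in plain C++17 with std::thread. This is not TiFlash code. `Block` is simplified to a vector of (key, value) rows, and each thread's "variant" is a std::unordered_map from key to a running sum. The names `Row`, `PartialResult` and `aggregateParallel` are hypothetical. As in ParallelAggregatingBlockInputStream, the per-thread result slots are allocated before the threads start, so each thread writes only to its own slot and needs no locking.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical simplified types: a block is a list of (key, value) rows.
using Row = std::pair<int64_t, int64_t>;
using Block = std::vector<Row>;
// A per-thread "variant": key -> running sum of values.
using PartialResult = std::unordered_map<int64_t, int64_t>;

// Phase 1: each thread aggregates its own blocks into its own slot.
// Phase 2: the per-thread slots are merged into one result.
PartialResult aggregateParallel(const std::vector<std::vector<Block>> & blocks_per_thread)
{
    const size_t num_threads = blocks_per_thread.size();
    // Pre-size the slots (like many_data.resize(max_threads)) so that
    // thread t only ever touches partials[t]: no locking is needed.
    std::vector<PartialResult> partials(num_threads);

    std::vector<std::thread> threads;
    for (size_t t = 0; t < num_threads; ++t)
    {
        threads.emplace_back([&, t] {
            for (const Block & block : blocks_per_thread[t])
                for (const auto & [key, value] : block)
                    partials[t][key] += value;
        });
    }
    for (auto & th : threads)
        th.join();

    // Merge phase: fold every partial result into a single map.
    PartialResult merged;
    for (const auto & partial : partials)
        for (const auto & [key, sum] : partial)
            merged[key] += sum;
    return merged;
}
```

For brevity, the merge here runs serially on the calling thread.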
The following walks through the source code of the multi-threaded aggregation.
ParallelAggregatingBlockInputStream takes multiple input sources, inputs and additional_inputs_at_end; params carries the related parameters.
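In ParallelAggregatingBlockInputStream::execute, before the worker threads start, many_data is resized to the number of threads max_threads and each slot gets its own AggregatedDataVariants. This way each thread can later write directly into its own slot of many_data.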
```cpp
void ParallelAggregatingBlockInputStream::execute()
{
    many_data.resize(max_threads);
    exceptions.resize(max_threads);

    for (size_t i = 0; i < max_threads; ++i)
        threads_data.emplace_back(keys_size, aggregates_size);

    LOG_FMT_TRACE(log, "Aggregating");

    Stopwatch watch;

    // one AggregatedDataVariants per thread; thread i only writes many_data[i]
    for (auto & elem : many_data)
        elem = std::make_shared<AggregatedDataVariants>();

    // run the worker threads (each calls Handler::onBlock) and wait for them
    processor.process();
    processor.wait();
}
```
The callback executed by each thread is onBlock. In onBlock, thread thread_num writes its aggregation result into many_data[thread_num].
```cpp
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
    parent.aggregator.executeOnBlock(
        block,
        *parent.many_data[thread_num],
        parent.file_provider,
        parent.threads_data[thread_num].key_columns,
        parent.threads_data[thread_num].aggregate_columns,
        parent.threads_data[thread_num].local_delta_memory,
        parent.no_more_keys);

    parent.threads_data[thread_num].src_rows += block.rows();
    parent.threads_data[thread_num].src_bytes += block.bytes();
}
```
Next, let's take a closer look at Aggregator.
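Aggregator::executeOnBlock
Aggregator::header currently holds only the block's schema: it tells which columns are key_columns and aggregate_columns, but carries no data. So when a block is aggregated, executeOnBlock first extracts the actual data for key_columns and aggregate_columns from the input block.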
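In the code below, key_columns are extracted first (constant columns are materialized into full columns via convertToFullColumnIfConst). The aggregate_columns are then extracted in prepareAggregateInstructions.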
```cpp
bool Aggregator::executeOnBlock(
    const Block & block,
    AggregatedDataVariants & result,
    const FileProviderPtr & file_provider,
    ColumnRawPtrs & key_columns,
    AggregateColumns & aggregate_columns,
    Int64 & local_delta_memory,
    bool & no_more_keys)
{
    Columns columns = block.getColumns();
    Columns materialized_columns;
    materialized_columns.reserve(params.keys_size);

    // extract the key columns by position; constant columns are expanded to
    // full columns, and materialized_columns keeps those copies alive
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        key_columns[i] = columns.at(params.keys[i]).get();
        if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
        {
            materialized_columns.push_back(converted);
            key_columns[i] = materialized_columns.back().get();
        }
    }

    AggregateFunctionInstructions aggregate_functions_instructions;
    prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions);

    // ... (rest of the function omitted)
}
```
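The role of materialized_columns is easy to miss. key_columns holds raw pointers, so the expanded copy of a constant column needs an owner that lives as long as those pointers are used. The sketch below mimics that pattern with a hypothetical `Column` struct, where a constant column stores one value plus a row count. It is not the ClickHouse/TiFlash IColumn API. `extractKeyColumns` is likewise an illustrative name.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical column: either a full column or a constant (one value repeated).
struct Column
{
    std::vector<int> values;   // full data, or a single value if is_const
    bool is_const = false;
    size_t const_rows = 0;     // logical row count of a constant column

    // Mirrors the idea of convertToFullColumnIfConst: returns an expanded
    // copy for constant columns, nullptr otherwise.
    std::shared_ptr<Column> convertToFullColumnIfConst() const
    {
        if (!is_const)
            return nullptr;
        auto full = std::make_shared<Column>();
        full->values.assign(const_rows, values.at(0));
        return full;
    }
};

using ColumnPtr = std::shared_ptr<Column>;

// Collects raw pointers to the key columns at the given positions and
// keeps ownership of any materialized copies in `materialized`.
std::vector<const Column *> extractKeyColumns(
    const std::vector<ColumnPtr> & columns,
    const std::vector<size_t> & key_positions,
    std::vector<ColumnPtr> & materialized)
{
    std::vector<const Column *> key_columns(key_positions.size());
    for (size_t i = 0; i < key_positions.size(); ++i)
    {
        key_columns[i] = columns.at(key_positions[i]).get();
        if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
        {
            materialized.push_back(converted);
            // Safe even if `materialized` reallocates: the shared_ptr is
            // moved, but the Column object it points to stays in place.
            key_columns[i] = materialized.back().get();
        }
    }
    return key_columns;
}
```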
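Aggregator::prepareAggregateInstructions
prepareAggregateInstructions collects the information about each aggregate function together with its argument data, in preparation for the aggregation that follows.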
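aggregate_columns[i] stores the argument columns that the i-th aggregate function (i.e. aggregate_functions[i]) needs from the block.
For example, for sum(b), aggregate_columns[i] has size 1 (params.aggregates[i].arguments.size() == 1), and column b has to be extracted from the block.
params.aggregates[i].arguments stores the position indices, within the block, of the columns that the aggregate function (here sum) needs.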
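AggregateFunctionState is an adapter around an aggregate function, marking that the function is not finalized (it yields the intermediate state instead of the final value). prepareAggregateInstructions strips these adapters with getNestedFunction() and records the underlying function in `that`.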
TODO: read ColumnAggregateFunction::convertToValues
```cpp
void Aggregator::prepareAggregateInstructions(
    Columns columns,
    AggregateColumns & aggregate_columns,
    Columns & materialized_columns,
    AggregateFunctionInstructions & aggregate_functions_instructions)
{
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].arguments.size());

    // one extra slot with that == nullptr terminates the instruction array
    aggregate_functions_instructions.resize(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            aggregate_columns[i][j] = columns.at(params.aggregates[i].arguments[j]).get();
            if (ColumnPtr converted = aggregate_columns[i][j]->convertToFullColumnIfConst())
            {
                materialized_columns.push_back(converted);
                aggregate_columns[i][j] = materialized_columns.back().get();
            }
        }

        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

        // strip AggregateFunctionState adapters to reach the underlying function
        auto * that = aggregate_functions[i];
        while (const auto * func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;

        if (const auto * func = typeid_cast<const AggregateFunctionArray *>(that); unlikely(func))
        {
            UNUSED(func);
            throw Exception("Not support AggregateFunctionArray", ErrorCodes::NOT_IMPLEMENTED);
        }

        aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].batch_that = that;
    }
}
```
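The `while (typeid_cast<...>)` loop peels off any number of adapter layers until it reaches the real function. The sketch below reproduces that shape with dynamic_cast on a hypothetical three-class hierarchy. `IAggFunction`, `SumFunction`, `StateAdapter` and `unwrapStateAdapters` are illustrative names, not TiFlash types. Note that typeid_cast in ClickHouse matches the exact type, while dynamic_cast also matches subclasses, which makes no difference here.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal hierarchy standing in for IAggregateFunction.
struct IAggFunction
{
    virtual ~IAggFunction() = default;
    virtual std::string name() const = 0;
};

struct SumFunction : IAggFunction
{
    std::string name() const override { return "sum"; }
};

// Adapter in the spirit of AggregateFunctionState: it wraps another function
// and marks that the result should stay an un-finalized state.
struct StateAdapter : IAggFunction
{
    std::shared_ptr<IAggFunction> nested;
    explicit StateAdapter(std::shared_ptr<IAggFunction> f) : nested(std::move(f)) {}
    std::string name() const override { return nested->name() + "State"; }
    const std::shared_ptr<IAggFunction> & getNestedFunction() const { return nested; }
};

// Same loop shape as in prepareAggregateInstructions: peel off adapters
// (possibly several layers) until the underlying function is reached.
const IAggFunction * unwrapStateAdapters(const IAggFunction * that)
{
    while (const auto * func = dynamic_cast<const StateAdapter *>(that))
        that = func->getNestedFunction().get();
    return that;
}
```

Presumably the unwrapping is safe because the -State adapter changes only how the result is finalized, not how rows are accumulated. The batch calls can therefore go straight to the nested function.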
The aggregate_descriptions entries are added when an aggregation is appended, in appendAggDescription:
```cpp
void appendAggDescription(
    const Names & arg_names,
    const DataTypes & arg_types,
    TiDB::TiDBCollators & arg_collators,
    const String & agg_func_name,
    AggregateDescriptions & aggregate_descriptions,
    NamesAndTypes & aggregated_columns,
    bool empty_input_as_null)
{
    assert(arg_names.size() == arg_collators.size() && arg_names.size() == arg_types.size());

    AggregateDescription aggregate;
    aggregate.argument_names = arg_names;
    String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators);
    if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions))
    {
        aggregated_columns.emplace_back(func_string, duplicated_return_type);
        return;
    }
    aggregate.column_name = func_string;
    aggregate.parameters = Array();
    aggregate.function = AggregateFunctionFactory::instance().get(agg_func_name, arg_types, {}, 0, empty_input_as_null);
    aggregate.function->setCollators(arg_collators);

    DataTypePtr result_type = aggregate.function->getReturnType();
    aggregated_columns.emplace_back(func_string, aggregate.function->getReturnType());

    aggregate_descriptions.push_back(std::move(aggregate));
}
```
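appendAggDescription deduplicates aggregates by a canonical function string. If the same function over the same arguments is already registered, the output column is appended again but no second AggregateDescription is created, so e.g. `sum(b)` is computed only once. Below is a simplified sketch of that bookkeeping. `AggDesc`, `makeFuncString`, `findDuplicate` and `appendAgg` are hypothetical stand-ins, and the `func(arg,...)` string format is an assumption (the real genFuncString also takes the collators into account).

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical simplified description: canonical name plus return type name.
struct AggDesc
{
    std::string column_name;  // e.g. "sum(b)"
    std::string return_type;  // e.g. "Int64"
};

// Builds a canonical name such as "sum(a,b)" (format assumed for illustration).
std::string makeFuncString(const std::string & func, const std::vector<std::string> & args)
{
    std::string s = func + "(";
    for (size_t i = 0; i < args.size(); ++i)
    {
        if (i > 0)
            s += ",";
        s += args[i];
    }
    return s + ")";
}

// Returns the return type of an already registered identical aggregate, if any.
std::optional<std::string> findDuplicate(const std::string & func_string, const std::vector<AggDesc> & descs)
{
    for (const auto & d : descs)
        if (d.column_name == func_string)
            return d.return_type;
    return std::nullopt;
}

// Every call adds an output column, but a description is added only for the
// first occurrence of a function string. Returns true if one was added.
bool appendAgg(const std::string & func, const std::vector<std::string> & args,
               const std::string & return_type, std::vector<AggDesc> & descs,
               std::vector<std::string> & output_columns)
{
    std::string func_string = makeFuncString(func, args);
    output_columns.push_back(func_string);
    if (findDuplicate(func_string, descs))
        return false;
    descs.push_back({func_string, return_type});
    return true;
}
```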
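Once prepareAggregateInstructions returns, everything the aggregation needs is ready:
key_columns
aggregate_columns, and for each aggregate function:
  aggregate_func (the function to call)
  aggregate_arguments (its argument columns)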
Aggregator::executeWithoutKeyImpl
For aggregation without keys, space for the result structure without_key is first allocated and then initialized.
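The space is allocated with alignedAlloc from the aggregates_pool arena, which keeps memory management centralized and allows memory to be reused.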
createAggregateStates is essentially a placement new: it calls the constructor of each aggregate function's state to initialize that memory.

```cpp
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
    AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
    createAggregateStates(place);
    result.without_key = place;
}

if (result.type == AggregatedDataVariants::Type::without_key)
{
    executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
}
```
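Here is a sketch of what "alignedAlloc + placement new" amounts to for a key-less aggregation with two aggregate functions. The states of all functions live back to back in one aligned chunk, each at its own offset (compare total_size_of_aggregate_states and offsets_of_aggregate_states), and each state is constructed in place. `SumState`, `AvgState`, `StatesLayout`, `BumpArena` and `createStates` are hypothetical, and the real Arena and createAggregateStates differ in detail.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <new>

// Hypothetical state types for two aggregate functions.
struct SumState { int64_t sum = 0; };
struct AvgState { double sum = 0; uint64_t count = 0; };

// Rounds n up to a multiple of align (align must be a power of two).
constexpr size_t alignUp(size_t n, size_t align) { return (n + align - 1) & ~(align - 1); }

// Layout [SumState][AvgState] in one buffer, each at an aligned offset.
struct StatesLayout
{
    size_t sum_offset = 0;
    size_t avg_offset = alignUp(sizeof(SumState), alignof(AvgState));
    size_t total_size = avg_offset + sizeof(AvgState);
    size_t align = alignof(SumState) > alignof(AvgState) ? alignof(SumState) : alignof(AvgState);
};

// A tiny bump allocator standing in for Arena::alignedAlloc.
class BumpArena
{
public:
    explicit BumpArena(size_t capacity) : buf_(new std::byte[capacity]), cap_(capacity) {}

    std::byte * alignedAlloc(size_t size, size_t align)
    {
        auto base = reinterpret_cast<uintptr_t>(buf_.get());
        size_t start = alignUp(base + used_, align) - base;
        if (start + size > cap_)
            throw std::bad_alloc();
        used_ = start + size;
        return buf_.get() + start;
    }

private:
    std::unique_ptr<std::byte[]> buf_;
    size_t cap_;
    size_t used_ = 0;
};

// Like createAggregateStates: construct each state in place with placement new.
// Both states are trivially destructible, so no destroy step is shown.
void createStates(std::byte * place, const StatesLayout & layout)
{
    new (place + layout.sum_offset) SumState();
    new (place + layout.avg_offset) AvgState();
}
```

Because all states of a row group share one allocation, a single pointer plus per-function offsets is enough to address them. This is the `res + inst->state_offset` pattern used in the aggregation loops.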
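The implementation for this special case follows. It aggregates all `rows` rows in one batch and stores the result in res, i.e. in the thread's variant (result.without_key):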
```cpp
void NO_INLINE Aggregator::executeWithoutKeyImpl(
    AggregatedDataWithoutKey & res,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions,
    Arena * arena)
{
    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
    {
        if (inst->offsets)
            inst->batch_that->addBatchSinglePlace(
                inst->offsets[static_cast<ssize_t>(rows - 1)],
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
        else
            inst->batch_that->addBatchSinglePlace(
                rows,
                res + inst->state_offset,
                inst->batch_arguments,
                arena);
    }
}
```
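This loop relies on two details. First, the instruction array was sized aggregates_size + 1 with a null `that` in the last slot, so the loop needs no count. Second, each function receives all `rows` rows in one call instead of one call per row. Below is a hypothetical sketch of both. `IBatchFunction`, `SumFn`, `CountFn`, `Instruction` and `executeWithoutKey` are illustrative names, and an index into a vector of states stands in for `state_offset`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical function interface: fold `rows` values into one state.
struct IBatchFunction
{
    virtual ~IBatchFunction() = default;
    virtual void addBatchSinglePlace(size_t rows, int64_t & state, const int64_t * column) const = 0;
};

struct SumFn : IBatchFunction
{
    void addBatchSinglePlace(size_t rows, int64_t & state, const int64_t * column) const override
    {
        for (size_t i = 0; i < rows; ++i)
            state += column[i];
    }
};

struct CountFn : IBatchFunction
{
    void addBatchSinglePlace(size_t rows, int64_t & state, const int64_t *) const override
    {
        state += static_cast<int64_t>(rows);
    }
};

struct Instruction
{
    const IBatchFunction * that = nullptr;  // nullptr marks the end of the array
    size_t state_index = 0;                 // stands in for state_offset
    const int64_t * arguments = nullptr;    // the argument column
};

// Walks the instruction array until the sentinel, one batch call per function.
void executeWithoutKey(std::vector<int64_t> & states, size_t rows, const Instruction * instructions)
{
    for (const Instruction * inst = instructions; inst->that; ++inst)
        inst->that->addBatchSinglePlace(rows, states[inst->state_index], inst->arguments);
}
```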
Taking Sum as an example:
```cpp
void addBatchSinglePlace(
    size_t batch_size,
    AggregateDataPtr place,
    const IColumn ** columns,
    Arena * arena,
    ssize_t if_argument_pos) const override
{
    if (if_argument_pos >= 0)
    {
        const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
        for (size_t i = 0; i < batch_size; ++i)
        {
            if (flags[i])
                add(place, columns, i, arena);
        }
    }
    else
    {
        const auto & column = assert_cast<const ColVecType &>(*columns[0]);
        this->data(place).addMany(column.getData().data(), batch_size);
    }
}
```
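The two branches can be reduced to a standalone function over plain arrays. The if_argument_pos branch appears to serve conditional aggregation in the style of the ClickHouse -If combinator (e.g. sumIf), where a UInt8 column marks which rows take part. Treat that reading as an assumption. `sumBatchSinglePlace` is a hypothetical name, and a null `flags` pointer plays the role of `if_argument_pos < 0`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical standalone version of the two paths in Sum::addBatchSinglePlace.
// flags == nullptr corresponds to if_argument_pos < 0 (no condition column).
int64_t sumBatchSinglePlace(int64_t state, size_t batch_size, const int64_t * values, const uint8_t * flags)
{
    if (flags)
    {
        // Conditional path: add row i only when its flag is set.
        for (size_t i = 0; i < batch_size; ++i)
            if (flags[i])
                state += values[i];
    }
    else
    {
        // Unconditional path: a tight loop over contiguous memory, which is
        // easy for compilers to vectorize (the role addMany plays for Sum).
        for (size_t i = 0; i < batch_size; ++i)
            state += values[i];
    }
    return state;
}
```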
Aggregator::executeImpl
Next comes the more general case, where there are group-by keys.
```cpp
template <typename Method>
void NO_INLINE Aggregator::executeImpl(
    Method & method,
    Arena * aggregates_pool,
    size_t rows,
    ColumnRawPtrs & key_columns,
    TiDB::TiDBCollators & collators,
    AggregateFunctionInstruction * aggregate_instructions,
    bool no_more_keys,
    AggregateDataPtr overflow_row) const
{
    typename Method::State state(key_columns, key_sizes, collators);

    if (!no_more_keys)
        executeImplBatch<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
    else
        executeImplBatch<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
```
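Method::State wraps the key columns and is used to look up or insert keys in the hash table method.data. executeWithoutKeyImpl above needs no hash table because there is no key: every row folds into the same single state. With keys, each distinct key needs its own aggregate state, so a hash table is required to map each key to the address of its state.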
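The difference between the two cases can be seen with a std::unordered_map standing in for method.data. This is a hypothetical sketch: `SumState`, `aggregateWithoutKey` and `aggregateWithKey` are illustrative names. In the real code the mapped value is an AggregateDataPtr into the arena rather than a unique_ptr.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>

struct SumState { int64_t sum = 0; };

// Without keys: every row folds into one state, so no lookup is needed.
void aggregateWithoutKey(SumState & state, const std::vector<int64_t> & values)
{
    for (int64_t v : values)
        state.sum += v;
}

// With keys: each distinct key owns its own state, and a hash table
// maps key -> state.
void aggregateWithKey(std::unordered_map<int64_t, std::unique_ptr<SumState>> & table,
                      const std::vector<int64_t> & keys, const std::vector<int64_t> & values)
{
    for (size_t row = 0; row < keys.size(); ++row)
    {
        auto & slot = table[keys[row]];            // find or insert
        if (!slot)
            slot = std::make_unique<SumState>();   // first time: create the state
        slot->sum += values[row];
    }
}
```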
Aggregator::executeImplBatch
The overall flow is much like executeWithoutKeyImpl above:
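allocate memory for the aggregate states (for each newly inserted key)
initialize the allocated memory by calling the constructors via placement new
aggregate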
Internally there are three cases; only the general case is described below.
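method.data, the hash table used for the incoming block, was prepared above. Ideally rows with the same key arrive consecutively (for example, sorted by key), so that lookups hit the cache in the method's state. This avoids extra memory allocation and speeds up lookup.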
On a cache hit, places[key_row_start, key_row_end) for that run of consecutive rows all point to the same memory address, i.e. the same key, so aggregating those rows updates one state in place.
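The cache described above only remembers the previous key, so it pays off when equal keys arrive in runs. Below is a hypothetical sketch of such a one-entry cache in front of a hash map; `CachedKeyMap` is an illustrative class, not the TiFlash/ClickHouse one. Every row in a run gets the same state pointer, just like places[key_row_start, key_row_end).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical key -> state map with a one-entry cache of the last key.
class CachedKeyMap
{
public:
    int64_t * emplace(int64_t key)
    {
        if (has_last_ && key == last_key_)
        {
            ++cache_hits_;
            return last_state_;  // same key as the previous row: skip the hash lookup
        }
        // Find or insert (value-initialized to 0). Pointers to unordered_map
        // elements stay valid across rehashing, so caching one is safe.
        int64_t * state = &map_[key];
        has_last_ = true;
        last_key_ = key;
        last_state_ = state;
        return state;
    }

    size_t cacheHits() const { return cache_hits_; }

private:
    std::unordered_map<int64_t, int64_t> map_;
    bool has_last_ = false;
    int64_t last_key_ = 0;
    int64_t * last_state_ = nullptr;
    size_t cache_hits_ = 0;
};
```

For the keys `1 1 1 2 2 1`, three lookups are served by the cache. The last row (key 1 again) misses the cache but still resolves to the same state as the first run.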
```cpp
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::executeImplBatch(
    Method & method,
    typename Method::State & state,
    Arena * aggregates_pool,
    size_t rows,
    AggregateFunctionInstruction * aggregate_instructions,
    AggregateDataPtr overflow_row [[maybe_unused]]) const
{
    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);

    for (size_t row = 0; row < rows; ++row)
    {
        AggregateDataPtr aggregate_data = nullptr;

        if constexpr (!no_more_keys)
        {
            auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool, sort_key_containers);

            if (emplace_result.isInserted())
            {
                // new key: allocate and construct its aggregate states
                emplace_result.setMapped(nullptr);
                aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
                createAggregateStates(aggregate_data);
                emplace_result.setMapped(aggregate_data);
            }
            else
            {
                aggregate_data = emplace_result.getMapped();
            }
        }
        else
        {
            // no new keys allowed: keys not in the table fall back to overflow_row
            auto find_result = state.findKey(method.data, row, *aggregates_pool, sort_key_containers);
            if (find_result.isFound())
                aggregate_data = find_result.getMapped();
            else
                aggregate_data = overflow_row;
        }

        places[row] = aggregate_data;
    }

    // ... the per-function aggregation over places[] follows
}
```
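With the states initialized, what remains is filling in the data. The Sum implementation is worth a look; the logic is similar to the key-less case above, except that addBatch (or addBatchArray) takes the whole places array instead of a single place.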
```cpp
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
    if (inst->offsets)
        inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
    else
        inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
```
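Put together, executeImplBatch works in two passes. It first resolves a state address for every row, then runs each aggregate function over the whole batch. The sketch below shows this under simplifying assumptions. `States` (two aggregates stored side by side), `resolvePlaces`, `addBatchSum` and `addBatchCount` are hypothetical. Skipping rows whose place is null reflects my understanding of the ClickHouse-style addBatch and should be treated as an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical per-key states for two aggregates (sum and count).
struct States
{
    int64_t sum = 0;
    int64_t count = 0;
};

// Pass 1: resolve one state pointer per row (like places[row]).
// With no_more_keys, keys not in the table go to overflow_row instead.
std::vector<States *> resolvePlaces(std::unordered_map<int64_t, States> & table,
                                    const std::vector<int64_t> & keys,
                                    bool no_more_keys, States * overflow_row)
{
    std::vector<States *> places(keys.size());
    for (size_t row = 0; row < keys.size(); ++row)
    {
        if (!no_more_keys)
        {
            places[row] = &table[keys[row]];  // find or insert
        }
        else
        {
            auto it = table.find(keys[row]);
            places[row] = it != table.end() ? &it->second : overflow_row;
        }
    }
    return places;
}

// Pass 2: one batch call per aggregate function, each looping over all rows.
// Rows whose place is null (unknown key, no overflow row) are skipped.
void addBatchSum(const std::vector<States *> & places, const std::vector<int64_t> & values)
{
    for (size_t row = 0; row < places.size(); ++row)
        if (places[row])
            places[row]->sum += values[row];
}

void addBatchCount(const std::vector<States *> & places)
{
    for (States * place : places)
        if (place)
            ++place->count;
}
```

Splitting the passes keeps each function's inner loop simple and specific to its type, at the cost of one rows-sized pointer array per batch.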
TODO: how is state handled???
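At this point each thread's local variant has been produced: every block a thread read has been aggregated within that thread. The next step is to aggregate (merge) the data across threads.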