tiflash: 多线程二阶段 Aggregate 实现(1)

Aggregate

大致可以分为两个部分:

  • 先多线程从输入读取一个block的数据,然后基于这个block进行数据聚合,得到一个线程聚合数据 variant
  • 再对多个线程得到的数据进行merge

下面源码讲解多线程聚合的过程。

ParallelAggregatingBlockInputStream

ParallelAggregatingBlockInputStream 接受多个输入数据源:inputs 和 additional_inputs_at_end;params 负责相关的参数。

ParallelAggregatingBlockInputStream::execute 执行前先对many_data按照线程个数max_thread进行分配内存,这样后面每个线程可以直接向many_data中填入数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// Set up one slot of per-thread state, then launch the parallel aggregation.
/// Each worker thread later fills many_data[thread_num] independently, so no
/// locking is needed during the per-thread aggregation phase.
void ParallelAggregatingBlockInputStream::execute()
{
/// One AggregatedDataVariants slot and one exception slot per worker thread.
many_data.resize(max_threads);
exceptions.resize(max_threads);

// Per-thread scratch space for extracted key / aggregate-argument columns.
for (size_t i = 0; i < max_threads; ++i)
threads_data.emplace_back(keys_size, aggregates_size);

LOG_FMT_TRACE(log, "Aggregating");

Stopwatch watch;

// Allocate each thread's aggregation state up front.
for (auto & elem : many_data)
elem = std::make_shared<AggregatedDataVariants>();

// Start the worker threads (each calls Handler::onBlock per input block)
// and block until all of them finish.
processor.process();
processor.wait();
//...
}

每个线程执行的callback是onBlock函数,在onBlock函数中,线程thread_num则会将该线程的聚合结果写入many_data[thread_num]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Per-thread callback: aggregate one input block into the thread-local
/// variant many_data[thread_num], then record input statistics.
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
    // Each thread owns its own slot in threads_data / many_data, so no
    // synchronization is required here.
    auto & thread_data = parent.threads_data[thread_num];

    parent.aggregator.executeOnBlock(
        block,
        *parent.many_data[thread_num],
        parent.file_provider,
        thread_data.key_columns,
        thread_data.aggregate_columns,
        thread_data.local_delta_memory,
        parent.no_more_keys);

    // Bookkeeping: how much raw input this thread has consumed.
    thread_data.src_rows += block.rows();
    thread_data.src_bytes += block.bytes();
}

下面,再细看Aggregator

Aggregator::executeOnBlock

当前 Aggregator::header 中存储的只是 block 的 schema 信息,即知道具体的 key_columns 和 aggregate_columns 信息,但是缺乏数据。因此当对一个 block 进行聚合计算时,会先尝试从输入的 block 中提取出 key_columns 和 aggregate_columns 对应的具体数据。

代码如下,先提取出 key_columns,再在 prepareAggregateInstructions 函数中提取出 aggregate_columns。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Aggregate one input block into `result` (a single thread's variant).
/// Shown part: extract raw pointers to the GROUP BY key columns
/// (materializing const columns first), then build per-aggregate-function
/// instructions via prepareAggregateInstructions.
bool Aggregator::executeOnBlock(const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory,
bool & no_more_keys)
{
// ...
Columns columns = block.getColumns();
Columns materialized_columns;
materialized_columns.reserve(params.keys_size);

for (size_t i = 0; i < params.keys_size; ++i)
{
// params.keys[i] is the position of the i-th GROUP BY key inside the block.
key_columns[i] = columns.at(params.keys[i]).get();

// need to materialize? (convert a const column to a full column)
if (ColumnPtr converted = key_columns[i]->convertToFullColumnIfConst())
{
// Keep the materialized column alive for the duration of this call;
// key_columns holds only raw, non-owning pointers.
materialized_columns.push_back(converted);
key_columns[i] = materialized_columns.back().get();
}
}

AggregateFunctionInstructions aggregate_functions_instructions;
prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_functions_instructions);
// ...
}

Aggregator::prepareAggregateInstructions

prepareAggregateInstructions 函数是为了获取聚合的相关信息及其数据,为后面做聚合做准备。

  • aggregate_columns[i] 存储的是 block 中第 i 个聚合函数(i.e. aggregate_functions[i])所需的参数列

    比如 sum(b),那么 aggregate_columns[i] 的 size 为 1,需要从 block 中提取出 column b

    params.aggregates[i].arguments 存储的是sum这个聚合函数所需要列的 position index

    其中 AggregateFunctionState 是个聚合函数 adapter,用来表示这个聚合函数并未处于 finalize 状态。

    TODO: read ColumnAggregateFunction::convertToValues

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/// Build one AggregateFunctionInstruction per aggregate function: raw pointers
/// to its argument columns (const columns materialized), the offset of its
/// state inside an aggregate-data row, and the function object itself with
/// trailing -State combinators stripped for batch execution.
void Aggregator::prepareAggregateInstructions(Columns columns,
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions)
{
// One slot per argument of each aggregate function.
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());

// Extra trailing element with that == nullptr acts as the end sentinel
// for the `inst->that` loops in the execute*Impl functions.
aggregate_functions_instructions.resize(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;

for (size_t i = 0; i < params.aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
// params.aggregates[i].arguments[j] is the position index of the j-th
// argument column of the i-th aggregate function inside the block.
aggregate_columns[i][j] = columns.at(params.aggregates[i].arguments[j]).get();
if (ColumnPtr converted = aggregate_columns[i][j]->convertToFullColumnIfConst())
{
// materialized_columns keeps ownership; aggregate_columns holds raw pointers.
materialized_columns.push_back(converted);
aggregate_columns[i][j] = materialized_columns.back().get();
}
}

aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

auto * that = aggregate_functions[i];
/// Unnest consecutive trailing -State combinators
while (const auto * func = typeid_cast<const AggregateFunctionState *>(that)) // downcast: base --> derived
that = func->getNestedFunction().get();
aggregate_functions_instructions[i].that = that;

// -Array combinator is not supported by this implementation.
if (const auto * func = typeid_cast<const AggregateFunctionArray *>(that); unlikely(func))
{
UNUSED(func);
throw Exception("Not support AggregateFunctionArray", ErrorCodes::NOT_IMPLEMENTED);
}
aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

aggregate_functions_instructions[i].batch_that = that;
}
}

关于 aggregate_descriptions 信息,是在 appendAggregate时候添加的,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void appendAggDescription(const Names & arg_names, const DataTypes & arg_types, 
TiDB::TiDBCollators & arg_collators,
const String & agg_func_name,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
bool empty_input_as_null)
{
assert(arg_names.size() == arg_collators.size() && arg_names.size() == arg_types.size());

AggregateDescription aggregate;
aggregate.argument_names = arg_names;
String func_string = genFuncString(agg_func_name, aggregate.argument_names, arg_collators);
if (auto duplicated_return_type = findDuplicateAggFunc(func_string, aggregate_descriptions))
{
// agg function duplicate, don't need to build again.
aggregated_columns.emplace_back(func_string, duplicated_return_type);
return;
}

aggregate.column_name = func_string;
aggregate.parameters = Array();
aggregate.function = AggregateFunctionFactory::instance().get(agg_func_name, arg_types, {}, 0, empty_input_as_null);
aggregate.function->setCollators(arg_collators);

DataTypePtr result_type = aggregate.function->getReturnType();
aggregated_columns.emplace_back(func_string, aggregate.function->getReturnType());

aggregate_descriptions.push_back(std::move(aggregate));
}

prepareAggregateInstructions 函数执行完毕,则聚合的准备工作执行完:

  • key_columns
  • aggregate_columns:
    • aggregate_func
    • aggregate_arguments

Aggregator::executeWithoutKeyImpl

针对没有key的聚合情况,先为存储结果的结构 without_key 分配空间 + 初始化

  • 在这里是使用 alignedAlloc 来分配空间,便于统一管理内存,复用;
  • createAggregateStates 内部实际上是个 placement new,调用ColumnFunction 的构造函数对该内存进行初始化;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // Lazily allocate and construct the single aggregate-state row used for
    // the no-key case (and for the overflow row): one aligned arena chunk
    // holding all aggregate states, initialized via createAggregateStates
    // (placement-new of each state).
    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
    AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
    createAggregateStates(place);
    result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (all aggregate into one row).
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
    executeWithoutKeyImpl(result.without_key,
    num_rows,
    aggregate_functions_instructions.data(),
    result.aggregates_pool);
    }

针对这个 special case 的实现如下,即连续聚 rows 行数据,并将结果存储到 res, 也即每个线程的variant:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  /// No-GROUP-BY fast path: every row aggregates into the single state row
  /// `res`; each function's state lives at res + inst->state_offset.
  /// Iteration stops at the sentinel instruction whose `that` is nullptr.
  void NO_INLINE Aggregator::executeWithoutKeyImpl(AggregatedDataWithoutKey & res,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena)
{
/// Adding values
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
// inst->offsets is set for -Array combinators: the last offset gives the
// total number of flattened values to aggregate.
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(
inst->offsets[static_cast<ssize_t>(rows - 1)],
res + inst->state_offset, // result
inst->batch_arguments,
arena);
else
inst->batch_that->addBatchSinglePlace(
rows,
res + inst->state_offset,
inst->batch_arguments,
arena);
}
}

以 Sum 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// Vectorized version when there is no GROUP BY keys.
/// Adds `batch_size` values from columns[0] into the single state at `place`.
/// With a conditional argument (e.g. sumIf) falls back to a row-by-row loop
/// filtered by the UInt8 flag column; otherwise uses the vectorized addMany.
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos) const override
{
if (if_argument_pos >= 0)
{
// Per-row path: only rows whose flag is non-zero are added.
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = 0; i < batch_size; ++i)
{
if (flags[i])
add(place, columns, i, arena);
}
}
else
{
// Unconditional path: hand the raw value array to addMany in one call.
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
}

Aggregator::executeImpl

下面介绍更为通用的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/// Generic (with-key) aggregation entry: builds the hash-table state for the
/// chosen Method and dispatches to executeImplBatch with `no_more_keys` lifted
/// to a compile-time template parameter so the hot loop has no runtime branch.
template <typename Method>
void NO_INLINE Aggregator::executeImpl(Method & method, Arena * aggregates_pool,
size_t rows, ColumnRawPtrs & key_columns,
TiDB::TiDBCollators & collators,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, collators);

if (!no_more_keys)
executeImplBatch<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
else
executeImplBatch<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}

Method::State 实际上是 hashtable,上面的 executeWithoutKeyImpl 因为没有key,因此不需要 hashtable(可以看看为什么有key的就需要hashtable)。

Aggregator::executeImplBatch

整个流程和上面的 executeWithoutKeyImpl 差不多:

  • ColumnFunction 分配内存
  • 使用 placement new 调用构造函数,初始化分配的内存
  • 聚合

内部分为三种 case,下面只描述通用的 case。

传入的 block 中的 method.data 在上面已经准备妥当;数据最好是按照 key 排序好的,这样可以命中 method 中的 cache,减少内存分配 + 加速查找。

如果cache命中,那么 places[key_row_start, key_row_end) 这一段连续rows的都指向同一个内存地址,即同一个key,好处是聚合时就可以实现inplace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/// For each input row, resolve the aggregate-state pointer for that row's key
/// (creating and initializing the state on first sight of a key) and record it
/// in `places`; the actual value accumulation happens afterwards via
/// addBatch on the `places` array.
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::executeImplBatch(Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row [[maybe_unused]]) const
{
// ...
// places[row] = pointer to the aggregate-state row for this row's key.
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);

for (size_t row = 0; row < rows; ++row)
{
AggregateDataPtr aggregate_data = nullptr;

if constexpr (!no_more_keys)
{
// Build the hashtable key from row `row` and insert it into method.data.
// method.data is later extracted via getDataVariant for the merge phase.
auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool, sort_key_containers);

/// NOTE: this benefits from sorted input — equal keys are adjacent, so the emplace cache hits.
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);

// Allocate the value (aggregate-state row) for the key built from row `row`.
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
// Initialize the states (placement-new of each aggregate state).
createAggregateStates(aggregate_data);

emplace_result.setMapped(aggregate_data);
}
else
{
// Key already present (cache hit): reuse its existing state row.
aggregate_data = emplace_result.getMapped();
}
}
else
{
/// no_more_keys means the aggregation exceeded its key limit: no new keys are inserted.
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, row, *aggregates_pool, sort_key_containers);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
else
aggregate_data = overflow_row;
}

places[row] = aggregate_data;
}
// ...
}

内存初始化完毕,剩下的就是填充数据,这个可以看看sum的对应实现,逻辑和上面的差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Add values to the aggregate functions.
// Each row's value is accumulated into places[row] (set up above); iteration
// stops at the sentinel instruction whose `that` is nullptr.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
// inst->offsets is set for -Array combinators (nested array arguments).
if (inst->offsets)
inst->batch_that->addBatchArray(rows,
places.get(),
inst->state_offset,
inst->batch_arguments,
inst->offsets,
aggregates_pool);
else
inst->batch_that->addBatch(rows,
places.get(),
inst->state_offset,
inst->batch_arguments,
aggregates_pool);
}

TODO state 怎么处理???

到此每个线程的局部数据 variant 就生成了, 线程级的 block 是聚合完成的,下面要在线程间聚合数据。