tiflash: Multi-threaded Two-Level Aggregate Implementation (2)

Aggregate

After each thread finishes its local aggregation, the results are merged across threads:

- If the aggregated data does not exceed the threshold, nothing is spilled to disk and the merge completes in memory; the entry point for merging the per-thread data is mergeAndConvertToBlocks.
- If there is too much data to merge and the threshold is exceeded, it must be spilled to disk, and the merge goes through MergingAggregatedMemoryEfficientBlockInputStream.

The discussion below focuses on the non-spill case.

```cpp
Block ParallelAggregatingBlockInputStream::readImpl()
{
    if (!executed)
    {
        Aggregator::CancellationHook hook = [&]() {
            return this->isCancelled();
        };
        aggregator.setCancellationHook(hook);

        execute(); // each thread performs its local aggregation

        if (isCancelledOrThrowIfKilled())
            return {};

        if (!aggregator.hasTemporaryFiles())
        {
            /** If all partially-aggregated data is in RAM, then merge them in parallel, also in RAM.
              */
            impl = aggregator.mergeAndConvertToBlocks(many_data, final, max_threads);
        }
        // ...
    }
```

Aggregator::mergeAndConvertToBlocks

mergeAndConvertToBlocks takes each thread's variant as input; its ultimate goal is to produce a block for the next IBlockInputStream, which is done by the MergingAndConvertingBlockInputStream it constructs internally.

It first filters out the empty entries of data_variants, because with multi-threaded input some threads may never have read any valid data; the surviving entries form non_empty_data. With non_empty_data in hand:

- The variants are sorted by size in descending order: the larger a variant, the more keys it holds, so merging everything into the largest one means fewer allocations during the merge and thus a more efficient merge. (TODO from the original author: verify.) A toy sketch follows this list.
- Whether all variants need to be converted to two-level depends on whether any one of them is already two-level; each variant's method was decided by Aggregator::chooseAggregationMethod when the thread initialized its Aggregator.
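
Before looking at the source, here is a toy illustration of the sort-first strategy, using plain std::unordered_map rather than TiFlash's hash table; the point is only the cost asymmetry, namely that a merge costs one emplace per entry of the *source* table:

```cpp
#include <algorithm>
#include <unordered_map>
#include <vector>

using Table = std::unordered_map<int, long>;

// Merging src into dst does |src| lookups/inserts; dst's existing entries
// are never touched (except for a possible rehash as it grows).
void mergeInto(Table & dst, Table & src)
{
    for (auto & [key, value] : src)
        dst[key] += value;
    src.clear();
}

void mergeAll(std::vector<Table> & tables)
{
    // Keep the largest table as the destination, mirroring the std::sort
    // over non_empty_data in mergeAndConvertToBlocks below.
    std::sort(tables.begin(), tables.end(),
              [](const Table & l, const Table & r) { return l.size() > r.size(); });
    for (size_t i = 1; i < tables.size(); ++i)
        mergeInto(tables[0], tables[i]);
}
```

Now the first half of mergeAndConvertToBlocks: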
```cpp
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants,
                                                                       bool final, size_t max_threads) const
{
    if (data_variants.empty())
        throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);

    LOG_FMT_TRACE(log, "Merging aggregated data");

    ManyAggregatedDataVariants non_empty_data;
    non_empty_data.reserve(data_variants.size());
    for (auto & data : data_variants)
        if (!data->empty())
            non_empty_data.push_back(data);

    if (non_empty_data.empty())
        return std::make_unique<NullBlockInputStream>(getHeader(final));

    if (non_empty_data.size() > 1)
    {
        /// Sort the states in descending order so that the merge is more efficient (since all states are merged into the first).
        std::sort(non_empty_data.begin(), non_empty_data.end(),
                  [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
                  {
                      return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
                  });
    }

    /// If at least one of the options is two-level, then convert all the options into two-level ones, if there are not such.
    /// Note - perhaps it would be more optimal not to convert single-level versions before the merge, but merge them separately, at the end.
    bool has_at_least_one_two_level = false;
    for (const auto & variant : non_empty_data)
    {
        if (variant->isTwoLevel())
        {
            has_at_least_one_two_level = true;
            break;
        }
    }

    if (has_at_least_one_two_level)
        for (auto & variant : non_empty_data)
            if (!variant->isTwoLevel())
                variant->convertToTwoLevel();
    //...
}
```

Continuing with the rest of the function:

- non_empty_data[0] accumulates the final result: whenever non_empty_data[1..N] contains a key that also exists in non_empty_data[0], the corresponding states are merged into non_empty_data[0]. For this to be safe, non_empty_data[0].aggregates_pools takes over every other variant's aggregates_pools; in effect, all of non_empty_data's aggregates_pools are flattened into non_empty_data[0].aggregates_pools.
- It then returns a MergingAndConvertingBlockInputStream, which performs the actual merge.
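
A minimal sketch of why this arena hand-over matters, with hypothetical Arena/Variant types standing in for TiFlash's (none of these definitions are the real ones):

```cpp
#include <memory>
#include <vector>

struct Arena { std::vector<char> chunk; };   // owns raw state memory
using ArenaPtr = std::shared_ptr<Arena>;
using AggregateDataPtr = char *;             // points INTO some arena

struct Variant
{
    std::vector<ArenaPtr> aggregates_pools;  // arenas this variant keeps alive
    std::vector<AggregateDataPtr> states;    // states live inside those arenas
};

// During the merge, a state pointer from `other` can be moved into `first`
// verbatim (the `dst = src` branch of mergeDataImpl below) instead of being
// copied. If `other`'s arenas died with `other`, that pointer would dangle,
// so `first` adopts them:
void adoptArenas(Variant & first, Variant & other)
{
    first.aggregates_pools.insert(first.aggregates_pools.end(),
                                  other.aggregates_pools.begin(),
                                  other.aggregates_pools.end());
}
```

The corresponding source: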
```cpp
std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants,
                                                                       bool final, size_t max_threads) const
{
    // ...
    AggregatedDataVariantsPtr & first = non_empty_data[0];

    for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
    {
        if (first->type != non_empty_data[i]->type)
            throw Exception("Cannot merge different aggregated data variants.",
                            ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);

        /** Elements from the remaining sets can be moved to the first data set.
          * Therefore, it must own all the arenas of all other sets.
          */
        first->aggregates_pools.insert(first->aggregates_pools.end(),
                                       non_empty_data[i]->aggregates_pools.begin(),
                                       non_empty_data[i]->aggregates_pools.end());
    }

    return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
}
```

MergingAndConvertingBlockInputStream

As usual, let's start with the special aggregation case that has no GROUP BY key (e.g. a plain `select count(*) from t`, where each thread holds exactly one aggregation state). The logic:

- Merge: mergeWithoutKeyDataImpl merges the per-thread variants.
- Produce a block: prepareBlockAndFillWithoutKey converts the merged data into a block.

In readImpl below, current_bucket_num starts at -1, which serves as a sentinel for this pre-bucket pass. The focus here is mergeWithoutKeyDataImpl.

```cpp
Block MergingAndConvertingBlockInputStream::readImpl() override
{
    if (data.empty())
        return {};

    if (current_bucket_num >= NUM_BUCKETS)
        return {};

    FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_merge_failpoint);

    AggregatedDataVariantsPtr & first = data[0];

    if (current_bucket_num == -1)
    {
        ++current_bucket_num;

        if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
        {
            // |data| is an intermediate buffer holding the result.
            // Next, we need to move |data| into a block and return it.
            aggregator.mergeWithoutKeyDataImpl(data);
            return aggregator.prepareBlockAndFillWithoutKey(
                *first,
                final,
                first->type != AggregatedDataVariants::Type::without_key);
        }
    }
    // ...
}
```

Aggregator::mergeWithoutKeyDataImpl

- merge is used to fold the data of non_empty_data[1..N] into non_empty_data[0].
- After merging, destroy is called on each state of non_empty_data[1..N]: it runs the aggregate state's destructor, but the underlying memory itself is not freed, so it can be reused later. The final merged result lives in non_empty_data[0].
```cpp
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];

    /// We merge all aggregation results to the first.
    for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
    {
        AggregatedDataWithoutKey & res_data = res->without_key;
        AggregatedDataWithoutKey & current_data = non_empty_data[result_num]->without_key;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i],
                                          current_data + offsets_of_aggregate_states[i],
                                          res->aggregates_pool);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);

        current_data = nullptr;
    }
}
```

Note that IAggregateFunction::merge combines two states of the same aggregate function, whereas IAggregateFunction::addBatch is what builds those states from the raw input rows in the first place.
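
A toy state makes the division of labor concrete; this is a hypothetical avg() state, not the real IAggregateFunction interface:

```cpp
#include <cstddef>

struct AvgState
{
    double sum = 0;
    size_t count = 0;

    // addBatch-style: build the state from raw input rows
    // (what the per-thread aggregation phase does).
    void addBatch(const double * rows, size_t n)
    {
        for (size_t i = 0; i < n; ++i) { sum += rows[i]; ++count; }
    }

    // merge-style: combine two already-built states of the same function
    // (what this cross-thread merge phase does).
    void merge(const AvgState & other)
    {
        sum += other.sum;
        count += other.count;
    }

    double result() const { return count ? sum / count : 0; }
};
```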

Aggregator::mergeSingleLevelDataImpl

Next, the merge implementation for single-level variants.

```cpp
template <typename Method>
void NO_INLINE Aggregator::mergeSingleLevelDataImpl(ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];
    bool no_more_keys = false;

    /// We merge all aggregation results to the first.
    for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
    {
        if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
            break;

        AggregatedDataVariants & current = *non_empty_data[result_num];

        if (!no_more_keys)
            mergeDataImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else if (res->without_key)
            mergeDataNoMoreKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                res->without_key,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else
            mergeDataOnlyExistingKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);

        /// `current` will not destroy the states of aggregate functions in the destructor
        current.aggregator = nullptr;
    }
}
```

Note: the hash table each thread built during its local aggregation phase is stored in its variant; getDataVariant simply retrieves it here. Also note the no_more_keys branches: once checkLimits flags that the result has grown past the configured group-by limit, no new keys are admitted into res; remaining keys are either folded into the overflow row (mergeDataNoMoreKeysImpl) or merged only if they already exist (mergeDataOnlyExistingKeysImpl), sketched below.
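
A toy sketch of the "only existing keys" idea, again with std::unordered_map standing in for the real hash table (this is not TiFlash's implementation):

```cpp
#include <unordered_map>

using Table = std::unordered_map<int, long>;

// Once no_more_keys is set, dst's key set is frozen: keys found in dst are
// merged, keys absent from dst are simply discarded (or folded into the
// overflow row in the NoMoreKeys variant).
void mergeOnlyExistingKeys(Table & dst, const Table & src)
{
    for (const auto & [key, value] : src)
    {
        auto it = dst.find(key);
        if (it != dst.end())
            it->second += value; // merge into an existing key
        // else: drop the key
    }
}
```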

Aggregator::mergeDataImpl

This is the implementation for the normal case (no_more_keys not set).

```cpp
template <typename Method, typename HashTable>
void NO_INLINE Aggregator::mergeDataImpl(HashTable & table_dst, HashTable & table_src, Arena * arena) const
{
    table_src.mergeToViaEmplace(table_dst,
        [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) {
            if (!inserted)
            {
                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->merge(
                        dst + offsets_of_aggregate_states[i],
                        src + offsets_of_aggregate_states[i],
                        arena);

                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
            }
            else
            {
                dst = src;
            }

            src = nullptr;
        });
    table_src.clearAndShrink();
}
```

The implementation of `HashMapTable::mergeToViaEmplace` is shown below. Its semantics are to merge the keys of the current hash table (`this`) into the other hash table (`that`):

- Iterate over the keys of `this` and emplace each one into `that`. This is also why `non_empty_data` is sorted with `std::sort` up front: with the largest table as the destination, fewer insertions happen and the probability that `that` has to rehash is reduced.
- For each emplaced key, the passed-in callback |func| merges the corresponding data from the source variant, e.g. the lambda passed in `Aggregator::mergeDataImpl` above.


```cpp
// in class HashMapTable
template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
    for (auto it = this->begin(), end = this->end(); it != end; ++it)
    {
        typename Self::LookupResult res_it;
        bool inserted;
        // First insert the key: emplace it->getKey() into `that`.
        that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it.getHash());
        // Then build the mapped value via the callback |func|.
        func(res_it->getMapped(), it->getMapped(), inserted);
    }
}
```
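
To make the callback contract concrete, here is a toy analogue of the emplace-or-merge pattern with std::unordered_map and a hypothetical State type; note how a brand-new key steals the source's state pointer instead of copying the state:

```cpp
#include <unordered_map>

struct State
{
    long value = 0;
    void mergeFrom(const State & o) { value += o.value; }
};
using Table = std::unordered_map<int, State *>;

void mergeToViaEmplaceLike(Table & that, Table & self)
{
    for (auto & [key, src] : self)
    {
        auto [it, inserted] = that.try_emplace(key, nullptr);
        State *& dst = it->second;
        if (!inserted)
        {
            dst->mergeFrom(*src); // key already present: combine the states
            delete src;           // loose stand-in for destroy(); the real code
                                  // runs the destructor but keeps the arena memory
        }
        else
        {
            dst = src;            // new key: move the state pointer, no copy
        }
        src = nullptr;            // self no longer owns the state
    }
    self.clear();                 // stand-in for clearAndShrink()
}
```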

Aggregator::convertToBlockImpl

Next, the merged result has to be converted into a block; the code path depends on whether this is the final stage:

```cpp
template <typename Method, typename HashTable>
void Aggregator::convertToBlockImpl(Method & method, HashTable & data,
                                    MutableColumns & key_columns,
                                    AggregateColumnsData & aggregate_columns,
                                    MutableColumns & final_aggregate_columns,
                                    Arena * arena, bool final) const
{
    if (data.empty())
        return;

    if (key_columns.size() != params.keys_size)
        throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};

    std::vector<IColumn *> raw_key_columns;
    raw_key_columns.reserve(key_columns.size());
    for (auto & column : key_columns)
        raw_key_columns.push_back(column.get());

    if (final)
        convertToBlockImplFinal(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
    else
        convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns);

    /// In order to release memory early.
    data.clearAndShrink();
}
```
Aggregator::convertToBlockImplNotFinal

The non-final and final stages differ only in how the aggregate columns are filled: the non-final path (shown below) stores raw pointers to the aggregation states, while the final path evaluates each state into its final result value.

```cpp
template <typename Method, typename HashTable>
void NO_INLINE Aggregator::convertToBlockImplNotFinal(Method & method, HashTable & data,
                                                      std::vector<IColumn *> key_columns,
                                                      AggregateColumnsData & aggregate_columns) const
{
    auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
    const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;

    data.forEachValue([&](const auto & key, auto & mapped) {
        method.insertKeyIntoColumns(key, key_columns, key_sizes_ref, params.collators);

        /// reserved, so push_back does not throw exceptions
        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);

        mapped = nullptr;
    });
}
```
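
For comparison, the final path replaces that push_back of raw state pointers with a materialization step: each state is evaluated into a plain result column (e.g. the computed average). A sketch of its core loop, reconstructed from the ClickHouse lineage TiFlash forked from; the exact insertResultInto signature is an assumption:

```cpp
// Sketch of convertToBlockImplFinal's core loop (assumed signatures).
data.forEachValue([&](const auto & key, auto & mapped) {
    method.insertKeyIntoColumns(key, key_columns, key_sizes_ref, params.collators);

    // Final stage: write each state's final value into the result column,
    // instead of exposing the state pointer itself.
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i]->insertResultInto(
            mapped + offsets_of_aggregate_states[i],
            *final_aggregate_columns[i]);
});
```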
scheduleThreadForNextBucket

Each bucket is handled by one thread task: a thread merges one bucket across all variants and converts it into a block, and readImpl consumes the resulting blocks in bucket order.
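
A condensed sketch of the worker side, reconstructed from the ClickHouse lineage (names such as mergeBucketImpl and convertOneBucketToBlock come from that lineage and are assumptions here):

```cpp
// Hand the next unprocessed bucket to a pool thread.
void scheduleThreadForNextBucket()
{
    if (++max_scheduled_bucket_num >= NUM_BUCKETS)
        return; // every bucket has already been handed out
    parallel_merge_data->pool.schedule(
        [this, bucket = max_scheduled_bucket_num] { thread(bucket); });
}

void thread(Int32 bucket_num)
{
    try
    {
        // Merge bucket `bucket_num` of every variant into data[0], then
        // convert that single bucket into a Block.
        aggregator.mergeBucketImpl(data, bucket_num, data[0]->aggregates_pool);
        Block block = aggregator.convertOneBucketToBlock(*data[0], final, bucket_num);

        std::lock_guard lock(parallel_merge_data->mutex);
        parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
    }
    catch (...)
    {
        std::lock_guard lock(parallel_merge_data->mutex);
        if (!parallel_merge_data->exception)
            parallel_merge_data->exception = std::current_exception();
    }
    parallel_merge_data->condvar.notify_all(); // wake the reader in readImpl
}
```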

Two-Level

- What does converting to two-level buy us? All variants use the same hash function, so a given key falls into the same bucket in every thread's table; the NUM_BUCKETS buckets can therefore be merged and converted to blocks independently and in parallel. (A toy sketch of the bucketing follows this list.)
- After each thread finishes its own aggregation, it checks whether its hash table can be converted to two-level.
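
A toy sketch of the bucketing itself (256 buckets, matching the NUM_BUCKETS used above; taking the bucket from the high bits of the hash mirrors ClickHouse-style two-level tables, though the exact bit choice here is an assumption):

```cpp
#include <cstdint>
#include <cstddef>

static constexpr size_t NUM_BUCKETS = 256;

// The same key hashes to the same value in every thread's table, so it lands
// in the same bucket everywhere. That is what lets one thread merge bucket i
// of all variants without touching any other bucket.
size_t bucketOf(uint64_t hash)
{
    return hash >> 56; // high 8 bits select one of the 256 buckets
}
```

With that in place, here is the two-level branch of MergingAndConvertingBlockInputStream::readImpl: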
```cpp
if (!first->isTwoLevel())
{
    //... mergeSingleLevelDataImpl
}
else
{
    if (!parallel_merge_data)
    {
        parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
        for (size_t i = 0; i < threads; ++i)
            scheduleThreadForNextBucket();
    }

    Block res;

    while (true)
    {
        std::unique_lock lock(parallel_merge_data->mutex);

        if (parallel_merge_data->exception)
            std::rethrow_exception(parallel_merge_data->exception);

        auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
        if (it != parallel_merge_data->ready_blocks.end())
        {
            ++current_bucket_num;
            scheduleThreadForNextBucket();

            if (it->second)
            {
                res.swap(it->second);
                break;
            }
            else if (current_bucket_num >= NUM_BUCKETS)
                break;
        }

        parallel_merge_data->condvar.wait(lock);
    }

    return res;
}
```
Aggregator::createAggregateStates

For completeness, here is the helper that constructs every aggregate function's state in a freshly allocated aggregate_data slot; if one constructor throws (e.g. out of memory), the already-created states are rolled back:

```cpp
void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
{
    for (size_t j = 0; j < params.aggregates_size; ++j)
    {
        try
        {
            /** An exception may occur if there is a shortage of memory.
              * In order that then everything is properly destroyed, we "roll back" some of the created states.
              * The code is not very convenient.
              */
            FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_create_state_failpoint);
            // Call each state's constructor to initialise the memory at
            // aggregate_data + offsets_of_aggregate_states[j].
            aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
        }
        catch (...)
        {
            for (size_t rollback_j = 0; rollback_j < j; ++rollback_j)
                aggregate_functions[rollback_j]->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);

            throw;
        }
    }
}
```