HashAggregate: Aggregator and the aggregation operator implementation

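Once the upstream input of AggregateSinkOperator has been fully fed into the Aggregator through AggregateSinkOperator::push_chunk and the HashMap has been built, AggregateSourceOperator::pull_chunk is notified and starts extracting chunks from the Aggregator's hash map.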
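The sink/source handoff described above can be sketched with a toy model. This is a minimal illustration, not StarRocks code: `Chunk`, `ToyAggregator`, `add_chunk`, `set_sink_finished`, `has_output` and `run_toy_pipeline` are hypothetical names, the aggregate is hard-coded to SUM, and everything runs on one thread, whereas the real operators are scheduled by the pipeline engine.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical simplified chunk: one (group key, value) pair per row.
using Chunk = std::vector<std::pair<std::string, int64_t>>;

// Toy stand-in for the Aggregator shared by the sink and source operators.
class ToyAggregator {
public:
    // Sink side: fold one input chunk into the hash map (SUM per key).
    void add_chunk(const Chunk& chunk) {
        for (const auto& [key, value] : chunk) {
            _hash_map[key] += value;
        }
    }

    // Sink side: called once after all input has been consumed.
    void set_sink_finished() {
        _it = _hash_map.begin();
        _sink_finished = true;
    }

    // Source side: output exists only after the sink has finished.
    bool has_output() const {
        return _sink_finished && _it != _hash_map.end();
    }

    // Source side: emit at most max_rows aggregated rows per call.
    Chunk pull_chunk(std::size_t max_rows) {
        Chunk out;
        while (has_output() && out.size() < max_rows) {
            out.emplace_back(_it->first, _it->second);
            ++_it;
        }
        return out;
    }

private:
    std::unordered_map<std::string, int64_t> _hash_map;
    std::unordered_map<std::string, int64_t>::const_iterator _it;
    bool _sink_finished = false;
};

// Drives the toy sink and source in sequence and collects the result.
std::unordered_map<std::string, int64_t> run_toy_pipeline() {
    ToyAggregator agg;
    agg.add_chunk(Chunk{{"a", 1}, {"b", 2}});
    agg.add_chunk(Chunk{{"a", 3}});
    assert(!agg.has_output());  // nothing can be pulled before the sink finishes
    agg.set_sink_finished();

    std::unordered_map<std::string, int64_t> result;
    while (agg.has_output()) {
        for (const auto& [key, sum] : agg.pull_chunk(1)) {
            result[key] = sum;
        }
    }
    return result;
}
```

The point of the sketch is the ordering constraint: the source side reports no output until the sink side has consumed every input chunk, because a group's aggregate is not final before that.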

Pipeline-AggregateNode-3

Aggregator

The Aggregator is initialized from the information contained in the generated TAggregationNode:

struct TAggregationNode {
  1: optional list<Exprs.TExpr> grouping_exprs
  // aggregate exprs. The root of each expr is the aggregate function.
  // The other exprs are the inputs to the aggregate function.
  2: required list<Exprs.TExpr> aggregate_functions

  // Tuple id used for intermediate aggregations
  // (with slots of agg intermediate types)
  3: required Types.TTupleId intermediate_tuple_id

  // Tuple id used for the aggregation output (with slots of agg output types)
  // Equal to intermediate_tuple_id if intermediate type == output type for all
  // aggregate functions.
  4: required Types.TTupleId output_tuple_id

  // Set to true if this aggregation function requires finalization to complete
  // after all rows have been aggregated, and this node is not an intermediate
  5: required bool need_finalize
  6: optional bool use_streaming_preaggregation

  // For vector query engine
  20: optional bool has_outer_join_child
  21: optional TStreamingPreaggregationMode streaming_preaggregation_mode

  // For profile attributes' printing: `Grouping Keys` `Aggregate Functions`
  22: optional string sql_grouping_keys
  23: optional string sql_aggregate_functions

  24: optional i32 agg_func_set_version = 1

  // used in query cache
  25: optional list<Exprs.TExpr> intermediate_aggr_exprs

  // used in pipeline engine
  26: optional bool interpolate_passthrough = false

  27: optional bool use_sort_agg
}
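To make the initialization step more concrete, here is a minimal sketch of how an aggregator might derive its settings from a few of the fields above. It is an assumption-laden illustration rather than the actual Aggregator code: `ToyAggregationNode`, `ToyAggregatorParams` and `init_params` are hypothetical, expressions are reduced to plain strings, and treating an unset optional field as `false` or empty is a choice made for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical plain-C++ mirror of a subset of TAggregationNode.
// Expressions are represented as strings purely for illustration.
struct ToyAggregationNode {
    std::optional<std::vector<std::string>> grouping_exprs;   // field 1
    std::vector<std::string> aggregate_functions;             // field 2
    int intermediate_tuple_id = 0;                            // field 3
    int output_tuple_id = 0;                                  // field 4
    bool need_finalize = false;                               // field 5
    std::optional<bool> use_streaming_preaggregation;         // field 6
};

// Hypothetical settings an aggregator could cache during initialization.
struct ToyAggregatorParams {
    bool has_group_by = false;
    std::size_t num_group_by_exprs = 0;
    std::size_t num_agg_functions = 0;
    bool need_finalize = false;
    bool streaming_preaggregation = false;
};

// Derives the cached settings from the plan node.
ToyAggregatorParams init_params(const ToyAggregationNode& node) {
    ToyAggregatorParams params;
    // Assumption: an unset or empty grouping list means no GROUP BY keys.
    if (node.grouping_exprs.has_value()) {
        params.num_group_by_exprs = node.grouping_exprs->size();
    }
    params.has_group_by = params.num_group_by_exprs > 0;
    params.num_agg_functions = node.aggregate_functions.size();
    params.need_finalize = node.need_finalize;
    // Assumption: an unset optional flag is treated as false.
    params.streaming_preaggregation =
            node.use_streaming_preaggregation.value_or(false);
    return params;
}

// Builds a node resembling `SELECT k, sum(v), count(v) ... GROUP BY k`.
ToyAggregatorParams example_params() {
    ToyAggregationNode node;
    node.grouping_exprs = std::vector<std::string>{"k"};
    node.aggregate_functions = {"sum(v)", "count(v)"};
    node.need_finalize = true;
    return init_params(node);
}
```

Separating the plan node from the derived parameters keeps the optional-field handling in one place, so the rest of the aggregator can read plain booleans and counts.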

Pipeline-AggregateNode-1

Pipeline-AggregateNode-2