HashJoin: HashBuildOperator、HashProbeOperator 流程剖析

HashJOIN 经过优化器后,生成的执行计划一般是右表是小表,用于 Build HashTable,左表是大表,用于 Probe HashTable。而左表必须等右表构建完 HashTable 才能执行 Probe 过程,因此 Build 和 Probe 之间存在一个依赖关系。

因此,在将 HashJoinNode 拆分为 Pipeline 时,会生成两条 Pipelines:

  • Build Pipeline

    Build Pipeline 从表 t1 中读取所有符合条件的数据,并构建 HashTable。构建完成,则使 Probe Pipeline 解除阻塞,则从 blocked_driver_poller 中移除并添加到 ready_driver_queue 中,让 DriverExecutor 去执行 Probe。

  • Probe Pipeline

    Probe Pipeline 则从表 t2 中不断获取数据,然后执行 probe。由于 Build 过程完成,HashTable 就可以确定了,那么 Probe 的过程是可以逐 chunk 执行,即从表 t2 每次读取一个 chunk 就可以执行一次 probe,因此 PipelineJob 是不会阻塞的。

HashJoinBuildOperator

HashJoiner

HashJOIN 是通过 HashJoiner 来实现的,HashJoinBuildOperator 和 HashJoinProbeOperator 共享一个 HashJoiner,通过 HashJoiner::_phase 来判断当时是 Build 还是 Probe 阶段,HashJoinProbeOperator 也是基于这个字段来解除阻塞。示意图如下:

Pipeline-HashJoin-1

append_chunk_to_ht

BuildOperator 通过 push_chunk 接口接收表 t1 数据,内部调用 HashJoiner::append_chunk_to_ht 函数来构建 hashTable,

注意:BuildOperator 的前一个 Operator 不一定就是 OlapScanOperator,也可能是 ExchangeSourceOperator。因为当 t1 数据量很大,无法在一个节点上构建完整的 HashTable 时,需要将 t1 的数据正交划分,再 ExchangeSink 发到多个节点上执行 HashJOIN,最后组合多个节点上 HashJOIN 的执行结果即可。

目前,一个 BuildOperator 最大行数不能超过 UINT32_MAX。push_chunk 的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Status HashJoinBuildOperator::push_chunk(RuntimeState* state, 
const vectorized::ChunkPtr& chunk) {
return _join_builder->append_chunk_to_ht(state, chunk);
}

Status HashJoiner::append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk) {
if (_phase != HashJoinPhase::BUILD) {
return Status::OK();
}
if (!chunk || chunk->is_empty()) {
return Status::OK();
}
if (UNLIKELY(_ht.get_row_count() + chunk->num_rows() >= UINT32_MAX)) {
return Status::NotSupported(strings::Substitute(
"row count of right table in hash join > $0", UINT32_MAX));
}
{
// 获取 key_columns
SCOPED_TIMER(_build_conjunct_evaluate_timer);
_prepare_key_columns(_key_columns, chunk, _build_expr_ctxs);
}
{
// copy chunk of right table
SCOPED_TIMER(_copy_right_table_chunk_timer);
TRY_CATCH_BAD_ALLOC(
_ht.append_chunk(state, chunk, _key_columns));
}
return Status::OK();
}

_prepare_key_columns

_prepare_key_columns 函数是基于 expr_ctxs 从 chunk 中提取出 join-key 对应的列,将结果保存到 key_columns。

只不过对于 only_null、const_column 两种类型的列有优化,因此特殊判断下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void HashJoiner::_prepare_key_columns(Columns& key_columns, 
const ChunkPtr& chunk,
const std::vector<ExprContext*>& expr_ctxs) {
key_columns.resize(0);
for (auto& expr_ctx : expr_ctxs) {
// 从 chunk 中提取出 column
auto column_ptr =
EVALUATE_NULL_IF_ERROR(expr_ctx, expr_ctx->root(), chunk.get());
if (column_ptr->only_null()) {
auto column =
ColumnHelper::create_column(expr_ctx->root()->type(), true);
column->append_nulls(chunk->num_rows());
key_columns.emplace_back(column);
} else if (column_ptr->is_constant()) {
auto const_column =
ColumnHelper::as_raw_column<ConstColumn>(column_ptr);
const_column->data_column()->assign(chunk->num_rows(), 0);
key_columns.emplace_back(const_column->data_column());
} else {
key_columns.emplace_back(column_ptr);
}
}
}

JoinHashTable::append_chunk

在 HashJoiner::_ht 中有个内存数据结构 _table_items 记录着 BuildOperator::push_chunk 的所有数据:当调用 JoinHashTable::append_chunk 函数时,即将 chunk 中的参与构建 HashTable 的列添加到 _table_items->key_columns 和 _table_items->build_chunk 中。

注意:当 join-keys 中还包含 value columns 时,该 join-keys[i] 不会再单独赋值数据,在构建 HashTable 时会直接从 build_chunk 中获取,不会单独分配内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void JoinHashTable::append_chunk(RuntimeState* state, const ChunkPtr& chunk, 
const Columns& key_columns) {
Columns& columns = _table_items->build_chunk->columns();

// 将 chunk 中参与构建 HashTable 的 value 列数据添加到 build_chunk 中
for (size_t i = 0; i < _table_items->build_column_count; i++) {
SlotDescriptor* slot = _table_items->build_slots[i].slot;
ColumnPtr& column = chunk->get_column_by_slot_id(slot->id());

if (!columns[i]->is_nullable() && column->is_nullable()) {
// upgrade to nullable column
columns[i] = NullableColumn::create(
columns[i], NullColumn::create(columns[i]->size(), 0));
}
columns[i]->append(*column);
}

// 将参与构建 HashTable 的 key 列数据添加到 key_columns 中
for (size_t i = 0; i < _table_items->key_columns.size(); i++) {
// 如果 join-key 不在 build_chunk 中
if (_table_items->join_keys[i].col_ref == nullptr) {
// upgrade to nullable column
if (!_table_items->key_columns[i]->is_nullable()
&& key_columns[i]->is_nullable()) {
size_t row_count = _table_items->key_columns[i]->size();
_table_items->key_columns[i] = NullableColumn::create(
_table_items->key_columns[i], NullColumn::create(row_count, 0));
}
_table_items->key_columns[i]->append(*key_columns[i]);
}
}
}

build_ht

当 BuildOperator 的前一个 SourceOperator 数据已经全部 pulled,则会调用 HashJoinBuildOperator::set_finishing,在 set_finishing 函数中开始构建 HashTable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
_is_finished = true;
RETURN_IF_ERROR(_join_builder->build_ht(state));
// build runtime bloomfilter ...
_join_builder->enter_probe_phase();
return Status::OK();
}

Status HashJoiner::build_ht(RuntimeState* state) {
if (_phase == HashJoinPhase::BUILD) {
RETURN_IF_ERROR(_build(state));
COUNTER_SET(_build_buckets_counter,
static_cast<int64_t>(_ht.get_bucket_size()));
}
return Status::OK();
}

JoinHashTable::build

JoinHashTable::build 函数则是基于内存中的 {key_columns, build_chunk} 开始构建 HashTable。这个构建 HashTable 的算法也不难,可以参考 StarRocks Hash Join 源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Status JoinHashTable::build(RuntimeState* state) {
RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow());
_table_items->has_large_column = _table_items->build_chunk->has_large_column();

// 如果 join-key 包含在 build_chunk 中,则直接从 build_chunk 中获取
size_t join_key_count = _table_items->join_keys.size();
for (size_t i = 0; i < join_key_count; i++) {
if (_table_items->join_keys[i].col_ref != nullptr) {
SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id();
_table_items->key_columns[i] =
_table_items->build_chunk->get_column_by_slot_id(slot_id);
}
}

RETURN_IF_ERROR(_upgrade_key_columns_if_overflow());

_hash_map_type = _choose_join_hash_map();

// 开始构建 HashTable
switch (_hash_map_type) {
#define M(NAME) \
case JoinHashMapType::NAME: \
_##NAME = std::make_unique<typename decltype(_##NAME)::element_type>(_table_items.get(), _probe_state.get()); \
_##NAME->build_prepare(state); \
_##NAME->probe_prepare(state); \
_##NAME->build(state); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
default:
assert(false);
}

return Status::OK();
}

HashJoinProbeOperator

当 HashJoiner::build_ht 函数构建完 HashTable,会通过 HashJoiner::enter_probe_phase 函数主动进入 HashJoinPhase::PROBE 阶段。HashJoinProbeOperator::need_input 返回 true,解除阻塞,则会从 blocked_driver_poller 中剔除进入 ready poller 开始执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void HashJoiner::enter_probe_phase() {
_short_circuit_break();

auto old_phase = HashJoinPhase::BUILD;
_phase.compare_exchange_strong(old_phase, HashJoinPhase::PROBE);
}

bool HashJoinProbeOperator::need_input() const {
return _join_prober->need_input();
}

bool HashJoiner::need_input() const {
return _phase == HashJoinPhase::PROBE && _probe_input_chunk == nullptr;
}

push_chunk

进入 HashJoinPhase::Probe 阶段后,Probe Pipeline 阻塞点就在 t2 的 pull_chunk。

一旦 SourceOperator::pull_chunk 返回一个chunk,DriverExecutor 就会推动 Probe Pipeline 状态机前进: SourceOperator:::pull_chunk –> HashJoinProbeOperator::push_chunk –> HashJoinProbeOperator::pull_chunk

Probe HashTable 的过程就发生在 HashJoinProbeOperator::pull_chunk 过程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Status HashJoinProbeOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) {
_join_prober->push_chunk(state, chunk);
return Status::OK();
}

void HashJoiner::push_chunk(RuntimeState* state, ChunkPtr chunk) {
DCHECK(chunk && !chunk->is_empty());
DCHECK(!_probe_input_chunk);

_probe_input_chunk = std::move(chunk);
_ht_has_remain = true;
// 获取 probe 对应的 key-columns
_prepare_probe_key_columns();
}

pull_chunk

pull_chunk 函数就是一个 Probe HashTable 的过程,过程详解 StarRocks Hash Join 源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
StatusOr<vectorized::ChunkPtr> HashJoinProbeOperator::pull_chunk(RuntimeState* state) {
return _join_prober->pull_chunk(state);
}

StatusOr<ChunkPtr> HashJoiner::pull_chunk(RuntimeState* state) {
DCHECK(_phase != HashJoinPhase::BUILD);
return _pull_probe_output_chunk(state);
}

StatusOr<ChunkPtr> HashJoiner::_pull_probe_output_chunk(RuntimeState* state) {
DCHECK(_phase != HashJoinPhase::BUILD);

auto chunk = std::make_shared<Chunk>();

if (_phase == HashJoinPhase::PROBE || _probe_input_chunk != nullptr) {
DCHECK(_ht_has_remain && _probe_input_chunk);

TRY_CATCH_BAD_ALLOC(
RETURN_IF_ERROR(_ht.probe(state, _key_columns, &_probe_input_chunk, &chunk, &_ht_has_remain)));
if (!_ht_has_remain) {
_probe_input_chunk = nullptr;
}

RETURN_IF_ERROR(_filter_probe_output_chunk(chunk));

return chunk;
}

if (_phase == HashJoinPhase::POST_PROBE) {
if (!_need_post_probe()) {
enter_eos_phase();
return chunk;
}

TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(_ht.probe_remain(state, &chunk, &_ht_has_remain)));
if (!_ht_has_remain) {
enter_eos_phase();
}

RETURN_IF_ERROR(_filter_post_probe_output_chunk(chunk));

return chunk;
}

return chunk;
}

Questtion

StarRocks 中的 HashTable cache miss 比较高,效率没那么好?一个 cache-friendly HashTable 设计考量可以参考这篇论文 A Seven-Dimensional Analysis of Hashing Methods and its Implications on Query Processing