HashJOIN 经过优化器后,生成的执行计划一般是右表是小表,用于 Build HashTable,左表是大表,用于 Probe HashTable。而左表必须等右表构建完 HashTable 才能执行 Probe 过程,因此 Build 和 Probe 之间存在一个依赖关系。
因此,在将 HashJoinNode 拆分为 Pipeline 时,会生成两条 Pipelines:
Build Pipeline
Build Pipeline 从表 t1 中读取所有符合条件的数据,并构建 HashTable。构建完成,则使 Probe Pipeline 解除阻塞,则从 blocked_driver_poller 中移除并添加到 ready_driver_queue 中,让 DriverExecutor 去执行 Probe。
Probe Pipeline
Probe Pipeline 则从表 t2 中不断获取数据,然后执行 probe。由于 Build 过程完成,HashTable 就可以确定了,那么 Probe 的过程是可以逐 chunk 执行,即从表 t2 每次读取一个 chunk 就可以执行一次 probe,因此 PipelineJob 是不会阻塞的。
HashJoinBuildOperator
HashJoiner
HashJOIN 是通过 HashJoiner 来实现的,HashJoinBuildOperator 和 HashJoinProbeOperator 共享一个 HashJoiner,通过 HashJoiner::_phase 来判断当时是 Build 还是 Probe 阶段,HashJoinProbeOperator 也是基于这个字段来解除阻塞。示意图如下:
append_chunk_to_ht
BuildOperator 通过 push_chunk 接口接收表 t1 数据,内部调用 HashJoiner::append_chunk_to_ht 函数来构建 hashTable,
注意:BuildOperator 的前一个 Operator 不一定就是 OlapScanOperator,也可能是 ExchangeSourceOperator。因为当 t1 数据量很大,无法在一个节点上构建完整的 HashTable 时,需要将 t1 的数据正交划分,再 ExchangeSink 发到多个节点上执行 HashJOIN,最后组合多个节点上 HashJOIN 的执行结果即可。
目前,一个 BuildOperator 最大行数不能超过 UINT32_MAX。push_chunk 的过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| Status HashJoinBuildOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) { return _join_builder->append_chunk_to_ht(state, chunk); }
Status HashJoiner::append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk) { if (_phase != HashJoinPhase::BUILD) { return Status::OK(); } if (!chunk || chunk->is_empty()) { return Status::OK(); } if (UNLIKELY(_ht.get_row_count() + chunk->num_rows() >= UINT32_MAX)) { return Status::NotSupported(strings::Substitute( "row count of right table in hash join > $0", UINT32_MAX)); } { SCOPED_TIMER(_build_conjunct_evaluate_timer); _prepare_key_columns(_key_columns, chunk, _build_expr_ctxs); } { SCOPED_TIMER(_copy_right_table_chunk_timer); TRY_CATCH_BAD_ALLOC( _ht.append_chunk(state, chunk, _key_columns)); } return Status::OK(); }
|
_prepare_key_columns
_prepare_key_columns 函数是基于 expr_ctxs 从 chunk 中提取出 join-key 对应的列,将结果保存到 key_columns。
只不过对于 only_null、const_column 两种类型的列有优化,因此特殊判断下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void HashJoiner::_prepare_key_columns(Columns& key_columns, const ChunkPtr& chunk, const std::vector<ExprContext*>& expr_ctxs) { key_columns.resize(0); for (auto& expr_ctx : expr_ctxs) { auto column_ptr = EVALUATE_NULL_IF_ERROR(expr_ctx, expr_ctx->root(), chunk.get()); if (column_ptr->only_null()) { auto column = ColumnHelper::create_column(expr_ctx->root()->type(), true); column->append_nulls(chunk->num_rows()); key_columns.emplace_back(column); } else if (column_ptr->is_constant()) { auto const_column = ColumnHelper::as_raw_column<ConstColumn>(column_ptr); const_column->data_column()->assign(chunk->num_rows(), 0); key_columns.emplace_back(const_column->data_column()); } else { key_columns.emplace_back(column_ptr); } } }
|
JoinHashTable::append_chunk
在 HashJoiner::_ht 中有个内存数据结构 _table_items 记录着 BuildOperator::push_chunk 的所有数据:当调用 JoinHashTable::append_chunk 函数时,即将 chunk 中的参与构建 HashTable 的列添加到 _table_items->key_columns 和 _table_items->build_chunk 中。
注意:当 join-keys 中还包含 value columns 时,该 join-keys[i] 不会再单独赋值数据,在构建 HashTable 时会直接从 build_chunk 中获取,不会单独分配内存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| void JoinHashTable::append_chunk(RuntimeState* state, const ChunkPtr& chunk, const Columns& key_columns) { Columns& columns = _table_items->build_chunk->columns();
for (size_t i = 0; i < _table_items->build_column_count; i++) { SlotDescriptor* slot = _table_items->build_slots[i].slot; ColumnPtr& column = chunk->get_column_by_slot_id(slot->id());
if (!columns[i]->is_nullable() && column->is_nullable()) { columns[i] = NullableColumn::create( columns[i], NullColumn::create(columns[i]->size(), 0)); } columns[i]->append(*column); }
for (size_t i = 0; i < _table_items->key_columns.size(); i++) { if (_table_items->join_keys[i].col_ref == nullptr) { if (!_table_items->key_columns[i]->is_nullable() && key_columns[i]->is_nullable()) { size_t row_count = _table_items->key_columns[i]->size(); _table_items->key_columns[i] = NullableColumn::create( _table_items->key_columns[i], NullColumn::create(row_count, 0)); } _table_items->key_columns[i]->append(*key_columns[i]); } } }
|
build_ht
当 BuildOperator 的前一个 SourceOperator 数据已经全部 pulled,则会调用 HashJoinBuildOperator::set_finishing,在 set_finishing 函数中开始构建 HashTable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Status HashJoinBuildOperator::set_finishing(RuntimeState* state) { _is_finished = true; RETURN_IF_ERROR(_join_builder->build_ht(state)); _join_builder->enter_probe_phase(); return Status::OK(); }
Status HashJoiner::build_ht(RuntimeState* state) { if (_phase == HashJoinPhase::BUILD) { RETURN_IF_ERROR(_build(state)); COUNTER_SET(_build_buckets_counter, static_cast<int64_t>(_ht.get_bucket_size())); } return Status::OK(); }
|
JoinHashTable::build
JoinHashTable::build 函数则是基于内存中的 {key_columns, build_chunk} 开始构建 HashTable。这个构建 HashTable 的算法也不难,可以参考 StarRocks Hash Join 源码解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| Status JoinHashTable::build(RuntimeState* state) { RETURN_IF_ERROR(_table_items->build_chunk->upgrade_if_overflow()); _table_items->has_large_column = _table_items->build_chunk->has_large_column();
size_t join_key_count = _table_items->join_keys.size(); for (size_t i = 0; i < join_key_count; i++) { if (_table_items->join_keys[i].col_ref != nullptr) { SlotId slot_id = _table_items->join_keys[i].col_ref->slot_id(); _table_items->key_columns[i] = _table_items->build_chunk->get_column_by_slot_id(slot_id); } }
RETURN_IF_ERROR(_upgrade_key_columns_if_overflow());
_hash_map_type = _choose_join_hash_map();
switch (_hash_map_type) { #define M(NAME) \ case JoinHashMapType::NAME: \ _##NAME = std::make_unique<typename decltype(_##NAME)::element_type>(_table_items.get(), _probe_state.get()); \ _##NAME->build_prepare(state); \ _##NAME->probe_prepare(state); \ _##NAME->build(state); \ break; APPLY_FOR_JOIN_VARIANTS(M) #undef M default: assert(false); }
return Status::OK(); }
|
HashJoinProbeOperator
当 HashJoiner::build_ht 函数构建完 HashTable,会通过 HashJoiner::enter_probe_phase 函数主动进入 HashJoinPhase::PROBE 阶段。HashJoinProbeOperator::need_input 返回 true,解除阻塞,则会从 blocked_driver_poller 中剔除进入 ready poller 开始执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void HashJoiner::enter_probe_phase() { _short_circuit_break();
auto old_phase = HashJoinPhase::BUILD; _phase.compare_exchange_strong(old_phase, HashJoinPhase::PROBE); }
bool HashJoinProbeOperator::need_input() const { return _join_prober->need_input(); }
bool HashJoiner::need_input() const { return _phase == HashJoinPhase::PROBE && _probe_input_chunk == nullptr; }
|
push_chunk
进入 HashJoinPhase::Probe 阶段后,Probe Pipeline 阻塞点就在 t2 的 pull_chunk。
一旦 SourceOperator::pull_chunk 返回一个chunk,DriverExecutor 就会推动 Probe Pipeline 状态机前进: SourceOperator:::pull_chunk –> HashJoinProbeOperator::push_chunk –> HashJoinProbeOperator::pull_chunk
Probe HashTable 的过程就发生在 HashJoinProbeOperator::pull_chunk 过程中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Status HashJoinProbeOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) { _join_prober->push_chunk(state, chunk); return Status::OK(); }
void HashJoiner::push_chunk(RuntimeState* state, ChunkPtr chunk) { DCHECK(chunk && !chunk->is_empty()); DCHECK(!_probe_input_chunk);
_probe_input_chunk = std::move(chunk); _ht_has_remain = true; _prepare_probe_key_columns(); }
|
pull_chunk
pull_chunk 函数就是一个 Probe HashTable 的过程,过程详解 StarRocks Hash Join 源码解析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| StatusOr<vectorized::ChunkPtr> HashJoinProbeOperator::pull_chunk(RuntimeState* state) { return _join_prober->pull_chunk(state); }
StatusOr<ChunkPtr> HashJoiner::pull_chunk(RuntimeState* state) { DCHECK(_phase != HashJoinPhase::BUILD); return _pull_probe_output_chunk(state); }
StatusOr<ChunkPtr> HashJoiner::_pull_probe_output_chunk(RuntimeState* state) { DCHECK(_phase != HashJoinPhase::BUILD);
auto chunk = std::make_shared<Chunk>();
if (_phase == HashJoinPhase::PROBE || _probe_input_chunk != nullptr) { DCHECK(_ht_has_remain && _probe_input_chunk);
TRY_CATCH_BAD_ALLOC( RETURN_IF_ERROR(_ht.probe(state, _key_columns, &_probe_input_chunk, &chunk, &_ht_has_remain))); if (!_ht_has_remain) { _probe_input_chunk = nullptr; }
RETURN_IF_ERROR(_filter_probe_output_chunk(chunk));
return chunk; }
if (_phase == HashJoinPhase::POST_PROBE) { if (!_need_post_probe()) { enter_eos_phase(); return chunk; }
TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(_ht.probe_remain(state, &chunk, &_ht_has_remain))); if (!_ht_has_remain) { enter_eos_phase(); }
RETURN_IF_ERROR(_filter_post_probe_output_chunk(chunk));
return chunk; }
return chunk; }
|
Questtion
StarRocks 中的 HashTable cache miss 比较高,效率没那么好?一个 cache-friendly HashTable 设计考量可以参考这篇论文 A Seven-Dimensional Analysis of Hashing Methods and its Implications on Query Processing