Pipeline: BalancedChunkBuffer 与 ChunkBufferLimiter
本篇博客阐述 BalancedChunkBuffer 的内部实现,以及如何配合 ChunkBufferLimiter 限读取并发度。
BalancedChunkBuffer
在上一篇博客 MorselQueue_3 说过 chunk_buffer 的作用:缓存一个 Fragement 中,所有 ScanOperators 从存储层读取的数据。chunk_buffer 实际上是 BalancedChunkBuffer 对象,并在 OlapScanContextFactory 构造函数中创建。
1 | OlapScanContextFactory(vectorized::OlapScanNode* const scan_node, |
SubBuffer
BalancBdChunkBuffer 中,SubBuffer 存储着每个 Pipeline ScanOperator 待返回给上层的数据。
从实现可以看出,这个 SubBuffer 实际上是个无界的队列 UnboundedBlockingQueue,用于缓存数据。_sub_buffers
是个大小为 dop 的数组,为每个 Pipeline 都分配了一个 SubBuffer。
_sub_buffers 里面不应该直接存储 SubBuffer,而是应该经过 cache_line 对齐的后 AlignedSubBuffer,因为多线程同时访问 _sub_buffers,如果不cache_line 对齐,会造成 false sharing 问题。
因为会多线程向 sub_buffer[seq] 插入数据,因此 UnboundedBlockingQueue 需要确保线程安全,内部实现也很朴素。
1 | class BalancedChunkBuffer { |
BalancBdChunkBuffer::put
BalancedChunkBuffer 输入输出内存映射关系有两种,从 OlapScanContextFactory 的构造函数可以看出,
- BalanceStrategy::kDirect,直通模式,对应 IndividualMorselQueueFactory,此时 Pipeline-driver_seq ScanOperator 读取到的数据传递给 该Pipeline-driver_seq 上自己的 Operators 使用
- BalanceStrategy::kRoundRobin,共享模式,对应 SharedMorselQueueFactory,此时 io-tasks 会分别给所有 Pipelines 的 ChunkBuffer 依次分配自己读取到的 chunks
1
2
3
4enum BalanceStrategy {
kDirect,
kRoundRobin,
};
BalancedChunkBuffer::put 函数会根据 BalanceStrategy 来选择插入方式。
1 | const BalancedChunkBuffer::SubBuffer& BalancedChunkBuffer::_get_sub_buffer(int index) const { |
BalancedChunkBuffer::try_get
BalancedChunkBuffer::try_get 就更加简单了,每个 Pipeline-driver_seq 从自己的 SubBuffer 中取出数据即可。
1 | bool BalancedChunkBuffer::try_get(int buffer_index, vectorized::ChunkPtr* output_chunk) { |
ChunkBufferLimiter
在 BalancedChunkBuffer 内部存放数据的 SubBuffer 是无界队列,单独使用了一个 ChunkBufferLimiter 来限制速率。ChunkBufferLimiter 在 OlapScanNode::decompose_to_pipeline 中实例化后,经过 OlapScanContextFactory 构造函数传递给 BalancBdChunkBuffer。
由于 ScanOperator::max_buffer_capacity 默认值是 64,因此每个 Pipeline 一次性最多可以读取 64 个 Chunks。又通过 estimated_max_concurrent_chunks 函数估计个下限。
1 | // OlapScanNode::decompose_to_pipeline |
Token
DynamicChunkBufferLimiter::_pinned_tokens_counter 字段表征当前读取了多少个 chunk,不能超过限制。
DynamicChunkBufferLimiter::Token 基于 RAII 设计,用户通过 DynamicChunkBufferLimiter::pin 函数对 pinned_tokens_counter 进行累加 num_chunks,并在析构函数中完成恢复,即pinned_tokens_counter 递减 num_chunks。
1 | class DynamicChunkBufferLimiter final : public ChunkBufferLimiter { |
pin
ScanOperator 在向存储层读取数据前,先会调用 DynamicChunkBufferLimiter::pin 函数申请一个 Token,是否超过 query 限制的并发读取速率 _capacity。
1 | ChunkBufferTokenPtr DynamicChunkBufferLimiter::pin(int num_chunks) { |
那么是如此控制着 ScanOperator 读取速率?
- 在 ScanOperator::_trigger_next_scan 函数中,提交 io-task 前,会先向 ChunkBufferLimiter 申请一个 Token,
ChunkBufferLimiter::pin 判断还有余量,则返回一个 Token 对象, _trigger_next_scan 函数才能提交 io-task。否则限流。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21Status ScanOperator::_trigger_next_scan(
RuntimeState* state, int chunk_source_index) {
ChunkBufferTokenPtr buffer_token;
if (buffer_token = pin_chunk(1); buffer_token == nullptr) {
return Status::OK();
}
COUNTER_UPDATE(_submit_task_counter, 1);
_chunk_sources[chunk_source_index]->pin_chunk_token(std::move(buffer_token));
//...
}
// function detail
ChunkBufferTokenPtr OlapScanOperator::pin_chunk(int num_chunks) {
return _ctx->get_chunk_buffer().limiter()->pin(num_chunks);
}
void ChunkSource::pin_chunk_token(ChunkBufferTokenPtr chunk_token) {
_chunk_token = std::move(chunk_token);
} - 在 ChunkSource::buffer_next_batch_chunks_blocking 函数中每尝试读取一次 chunk 前都会申请一 Token,只有当 Token 非空,才能继续读取。成功读取到数据后,{Chunk, Token} 作为 Pair 一起缓存到 BalancedChunkBuffer。当执行 BalancedChunkBuffer::try_get 函数时候,Chunk 会被返回给上层,而 Token 的生命周期结束,调用析构函数释放 Token,增加 _pinned_tokens_counter 让后续线程可以继续读取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Status ChunkSource::buffer_next_batch_chunks_blocking(
RuntimeState* state, size_t batch_size,
const workgroup::WorkGroup* running_wg) {
//...
for (size_t i = 0; i < batch_size && !state->is_cancelled(); ++i) {
{
if (_chunk_token == nullptr &&
(_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) {
break;
}
ChunkPtr chunk;
_status = _read_chunk(state, &chunk);
//...
// drvier_seq --> {chunk, token}
_chunk_buffer.put(_scan_operator_seq,
std::move(chunk),
std::move(_chunk_token));
}
//...
}
return _status;
}
update_avg_row_bytes
但是读取总的速率也不是额定的,在每次 _read_chunk 后都会更新。