Pipeline: BalancedChunkBuffer and ChunkBufferLimiter

This post walks through the internal implementation of BalancedChunkBuffer and how it cooperates with ChunkBufferLimiter to limit read concurrency.

BalancedChunkBuffer

The previous post, MorselQueue_3, described the role of the chunk_buffer: it caches the data that all ScanOperators of a Fragment read from the storage layer. The chunk_buffer is actually a BalancedChunkBuffer object, created in the OlapScanContextFactory constructor.

OlapScanContextFactory(vectorized::OlapScanNode* const scan_node,
                       int32_t dop,
                       bool shared_morsel_queue,
                       bool shared_scan,
                       ChunkBufferLimiterPtr chunk_buffer_limiter)
        : _scan_node(scan_node),
          _dop(dop),
          _shared_morsel_queue(shared_morsel_queue),
          _shared_scan(shared_scan),
          // Create the BalancedChunkBuffer.
          _chunk_buffer(shared_scan
                                ? BalanceStrategy::kRoundRobin
                                : BalanceStrategy::kDirect,
                        dop,
                        std::move(chunk_buffer_limiter)),
          _contexts(shared_morsel_queue ? 1 : dop) {}

// BalancedChunkBuffer constructor.
BalancedChunkBuffer::BalancedChunkBuffer(BalanceStrategy strategy,
                                         int output_operators,
                                         ChunkBufferLimiterPtr limiter)
        : _output_operators(output_operators),
          _strategy(strategy),
          _sub_buffers(std::make_unique<SubBuffer[]>(output_operators)),
          _limiter(std::move(limiter)) {
    for (int i = 0; i < output_operators; i++) {
        _sub_buffers[i] = std::make_unique<QueueT>();
    }
}

SubBuffer

Inside BalancedChunkBuffer, each SubBuffer holds the data that one Pipeline's ScanOperator is waiting to return to the operators above it.

As the implementation shows, a SubBuffer is in fact an unbounded queue, UnboundedBlockingQueue, used to cache data. _sub_buffers is an array of size dop, allocating one SubBuffer per Pipeline.

Arguably, _sub_buffers should not store SubBuffer directly but a cache-line-aligned AlignedSubBuffer: multiple threads access _sub_buffers concurrently, and without cache-line alignment adjacent entries suffer from false sharing.
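
What such an alignment wrapper could look like is sketched below (AlignedSubBuffer is the name suggested above, not actual source code): each entry is padded out to a full cache line so two pipelines never write to the same line.

// Hypothetical sketch, assuming a 64-byte cache line: alignas(64) both
// aligns each entry and rounds sizeof(AlignedSubBuffer) up to a multiple
// of 64, so adjacent array entries land on distinct cache lines.
struct alignas(64) AlignedSubBuffer {
    SubBuffer buffer; // SubBuffer = std::unique_ptr<QueueT>, as below
};
// _sub_buffers would then become std::unique_ptr<AlignedSubBuffer[]>.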

Because multiple threads insert into sub_buffer[seq] concurrently, UnboundedBlockingQueue has to be thread-safe; its internals are quite plain. A rough sketch of such a queue follows, and the actual BalancedChunkBuffer class layout comes right after it.
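
A minimal sketch, assuming a mutex-protected std::deque (the real UnboundedBlockingQueue may differ in detail):

#include <deque>
#include <mutex>

template <typename T>
class UnboundedBlockingQueue {
public:
    // Producer side: multiple io-task threads call put() concurrently.
    bool put(T item) {
        std::lock_guard<std::mutex> guard(_lock);
        _queue.emplace_back(std::move(item));
        return true; // unbounded: insertion always succeeds
    }

    // Consumer side: non-blocking take, returns false when empty.
    bool try_get(T* out) {
        std::lock_guard<std::mutex> guard(_lock);
        if (_queue.empty()) {
            return false;
        }
        *out = std::move(_queue.front());
        _queue.pop_front();
        return true;
    }

private:
    std::mutex _lock;
    std::deque<T> _queue;
};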

class BalancedChunkBuffer {
public:
    bool try_get(int buffer_index, vectorized::ChunkPtr* output_chunk);
    bool put(int buffer_index, vectorized::ChunkPtr chunk, ChunkBufferTokenPtr chunk_token);
    // ... other methods

private:
    using ChunkWithToken = std::pair<vectorized::ChunkPtr, ChunkBufferTokenPtr>;
    using QueueT = UnboundedBlockingQueue<ChunkWithToken>;
    using SubBuffer = std::unique_ptr<QueueT>;

    const SubBuffer& _get_sub_buffer(int index) const;
    SubBuffer& _get_sub_buffer(int index);

    const int _output_operators;
    const BalanceStrategy _strategy;
    std::unique_ptr<SubBuffer[]> _sub_buffers;
    std::atomic_int64_t _output_index = 0;

    ChunkBufferLimiterPtr _limiter;
};

BalancedChunkBuffer::put

BalancedChunkBuffer supports two input-to-output mappings, as the OlapScanContextFactory constructor shows:

  • BalanceStrategy::kDirect: pass-through mode, paired with IndividualMorselQueueFactory. The data read by the ScanOperator of Pipeline-driver_seq is handed to the operators on that same Pipeline-driver_seq.
  • BalanceStrategy::kRoundRobin: shared mode, paired with SharedMorselQueueFactory. Here the io-tasks distribute the chunks they read across the ChunkBuffers of all Pipelines in turn.
    enum BalanceStrategy {
        kDirect,
        kRoundRobin,
    };

BalancedChunkBuffer::put picks the destination sub-buffer according to the BalanceStrategy.

const BalancedChunkBuffer::SubBuffer& BalancedChunkBuffer::_get_sub_buffer(int index) const {
    return _sub_buffers[index % _output_operators];
}

bool BalancedChunkBuffer::put(int buffer_index, vectorized::ChunkPtr chunk, ChunkBufferTokenPtr chunk_token) {
    // Drop empty chunks, except the last chunk, which marks end-of-stream.
    if (!chunk || (!chunk->owner_info().is_last_chunk() && chunk->num_rows() == 0)) {
        return true;
    }

    if (_strategy == BalanceStrategy::kDirect) {
        // Pass-through: the producer's buffer_index is also the consumer's index.
        return _get_sub_buffer(buffer_index)->put(
                std::make_pair(std::move(chunk), std::move(chunk_token)));
    } else if (_strategy == BalanceStrategy::kRoundRobin) {
        // Shared scan: spread chunks across all sub-buffers in turn.
        int target_index = _output_index.fetch_add(1) % _output_operators;
        return _get_sub_buffer(target_index)->put(
                std::make_pair(std::move(chunk), std::move(chunk_token)));
    }
    __builtin_unreachable();
}
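
To make the kRoundRobin mapping concrete, here is a tiny standalone demo (not source code from the project) of the fetch_add-then-modulo dispatch. With 4 output operators (a hypothetical dop), successive chunks land in sub-buffers 0, 1, 2, 3, 0, 1, ...

#include <atomic>
#include <cstdio>

int main() {
    std::atomic_int64_t output_index{0};
    const int output_operators = 4; // hypothetical dop
    for (int chunk = 0; chunk < 6; chunk++) {
        // The same dispatch as BalancedChunkBuffer::put under kRoundRobin.
        int target = static_cast<int>(output_index.fetch_add(1) % output_operators);
        std::printf("chunk %d -> sub_buffer[%d]\n", chunk, target);
    }
    return 0;
}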

BalancedChunkBuffer::try_get

BalancedChunkBuffer::try_get is even simpler: each Pipeline-driver_seq just takes data out of its own SubBuffer.

bool BalancedChunkBuffer::try_get(int buffer_index, vectorized::ChunkPtr* output_chunk) {
    // Will release the token after exiting this scope.
    ChunkWithToken chunk_with_token = std::make_pair(nullptr, nullptr);
    bool ok = _get_sub_buffer(buffer_index)->try_get(&chunk_with_token);
    if (ok) {
        *output_chunk = std::move(chunk_with_token.first);
    }
    return ok;
}

ChunkBufferLimiter

Since the SubBuffers that store data inside BalancedChunkBuffer are unbounded queues, a separate ChunkBufferLimiter is used to limit the rate. The ChunkBufferLimiter is instantiated in OlapScanNode::decompose_to_pipeline and passed to BalancedChunkBuffer through the OlapScanContextFactory constructor.

Since ScanOperator::max_buffer_capacity defaults to 64, each Pipeline can have at most 64 chunks in flight at a time, giving a buffer-wide cap of 64 * dop. On top of that, estimated_max_concurrent_chunks() estimates how many chunks are actually needed concurrently, and the smaller of the two values becomes the initial capacity.

// OlapScanNode::decompose_to_pipeline
size_t max_buffer_capacity =
        pipeline::ScanOperator::max_buffer_capacity() * dop; // 64 * dop
size_t default_buffer_capacity = std::min<size_t>(
        max_buffer_capacity, estimated_max_concurrent_chunks());
auto buffer_limiter = std::make_unique<pipeline::DynamicChunkBufferLimiter>(
        max_buffer_capacity,
        default_buffer_capacity,
        _mem_limit,
        runtime_state()->chunk_size());

auto scan_ctx_factory = std::make_shared<pipeline::OlapScanContextFactory>(
        this,
        dop, shared_morsel_queue,
        _enable_shared_scan,
        std::move(buffer_limiter));

// DynamicChunkBufferLimiter constructor
DynamicChunkBufferLimiter(size_t max_capacity, size_t default_capacity,
                          int64_t mem_limit, int chunk_size)
        : _capacity(default_capacity),
          _max_capacity(max_capacity),
          _default_capacity(default_capacity),
          _mem_limit(mem_limit) {}
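
As a worked example under assumed numbers: with dop = 8, max_buffer_capacity = 64 * 8 = 512 chunks; if estimated_max_concurrent_chunks() were to return 96, the limiter would start with a capacity of 96 chunks and could later grow toward the 512 ceiling.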

Token

The DynamicChunkBufferLimiter::_pinned_chunks_counter field records how many chunks are currently pinned for reading; it must not exceed the limit.

DynamicChunkBufferLimiter::Token is an RAII design: DynamicChunkBufferLimiter::pin adds num_chunks to _pinned_chunks_counter, and the Token destructor restores it, subtracting num_chunks from the counter again.

class DynamicChunkBufferLimiter final : public ChunkBufferLimiter {
public:
    class Token final : public ChunkBufferToken {
    public:
        Token(std::atomic<int>& pinned_tokens_counter, int num_tokens)
                : _pinned_tokens_counter(pinned_tokens_counter),
                  _num_tokens(num_tokens) {}

        ~Token() override { _pinned_tokens_counter.fetch_sub(_num_tokens); }

    private:
        DISALLOW_COPY_AND_MOVE(Token);

        std::atomic<int>& _pinned_tokens_counter;
        const int _num_tokens;
    };

public:
    void update_avg_row_bytes(size_t added_sum_row_bytes,
                              size_t added_num_rows,
                              size_t max_chunk_rows) override;

    ChunkBufferTokenPtr pin(int num_chunks) override;

private:
    std::atomic<int> _pinned_chunks_counter = 0;
};

pin

Before reading data from the storage layer, a ScanOperator first calls DynamicChunkBufferLimiter::pin to request a Token; pin checks whether the read would exceed _capacity, the query-level limit on concurrent reads.

ChunkBufferTokenPtr DynamicChunkBufferLimiter::pin(int num_chunks) {
    size_t prev_value = _pinned_chunks_counter.fetch_add(num_chunks);
    if (prev_value + num_chunks > _capacity) {
        // Over capacity: roll back the counter and refuse the pin.
        _pinned_chunks_counter.fetch_sub(num_chunks);
        return nullptr;
    }
    return std::make_unique<DynamicChunkBufferLimiter::Token>(
            _pinned_chunks_counter, num_chunks);
}

So how does this mechanism throttle the ScanOperator's reads?

  1. In ScanOperator::_trigger_next_scan, before submitting an io-task, the operator first requests a Token from the ChunkBufferLimiter.
    If ChunkBufferLimiter::pin finds spare capacity, it returns a Token and _trigger_next_scan can go ahead and submit the io-task; otherwise the read is throttled.
    Status ScanOperator::_trigger_next_scan(
            RuntimeState* state, int chunk_source_index) {
        ChunkBufferTokenPtr buffer_token;
        if (buffer_token = pin_chunk(1); buffer_token == nullptr) {
            return Status::OK();
        }

        COUNTER_UPDATE(_submit_task_counter, 1);
        _chunk_sources[chunk_source_index]->pin_chunk_token(std::move(buffer_token));
        // ...
    }

    // Functions referenced above:
    ChunkBufferTokenPtr OlapScanOperator::pin_chunk(int num_chunks) {
        return _ctx->get_chunk_buffer().limiter()->pin(num_chunks);
    }

    void ChunkSource::pin_chunk_token(ChunkBufferTokenPtr chunk_token) {
        _chunk_token = std::move(chunk_token);
    }
  2. In ChunkSource::buffer_next_batch_chunks_blocking, a Token is requested before every attempted chunk read, and the read proceeds only if the Token is non-null. After a chunk is successfully read, the {Chunk, Token} pair is cached together in the BalancedChunkBuffer. When BalancedChunkBuffer::try_get hands the Chunk back to the upper layer, the Token's lifetime ends: its destructor runs and decrements _pinned_chunks_counter, letting subsequent threads continue reading.
    Status ChunkSource::buffer_next_batch_chunks_blocking(
            RuntimeState* state, size_t batch_size,
            const workgroup::WorkGroup* running_wg) {
        // ...
        for (size_t i = 0; i < batch_size && !state->is_cancelled(); ++i) {
            {
                if (_chunk_token == nullptr &&
                    (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) {
                    break;
                }

                ChunkPtr chunk;
                _status = _read_chunk(state, &chunk);
                // ...

                // driver_seq --> {chunk, token}
                _chunk_buffer.put(_scan_operator_seq,
                                  std::move(chunk),
                                  std::move(_chunk_token));
            }
            // ...
        }
        return _status;
    }

update_avg_row_bytes

The overall read capacity is not fixed, either: it is re-estimated after every _read_chunk, through update_avg_row_bytes.
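
The post stops short of the implementation, but the idea can be sketched as follows, assuming capacity is resized to roughly _mem_limit / (avg_row_bytes * max_chunk_rows), clamped to [1, _max_capacity]. This is a minimal sketch, not the verbatim code: _sum_row_bytes and _num_rows are hypothetical accumulators, and a real version would need synchronization since multiple io-tasks report rows concurrently.

#include <algorithm>

// Sketch only: field names other than _sum_row_bytes/_num_rows mirror the
// constructor shown earlier; synchronization is omitted for brevity.
void DynamicChunkBufferLimiter::update_avg_row_bytes(size_t added_sum_row_bytes,
                                                     size_t added_num_rows,
                                                     size_t max_chunk_rows) {
    _sum_row_bytes += added_sum_row_bytes;
    _num_rows += added_num_rows;
    if (_num_rows == 0) {
        return; // no rows observed yet: keep the current capacity
    }
    size_t avg_row_bytes = _sum_row_bytes / _num_rows;
    if (avg_row_bytes == 0) {
        return;
    }
    // Estimated memory footprint of one buffered chunk.
    size_t chunk_mem_usage = avg_row_bytes * max_chunk_rows;
    // Fit the whole buffer into _mem_limit, clamped to [1, _max_capacity].
    size_t new_capacity =
            std::max<size_t>(1, static_cast<size_t>(_mem_limit) / chunk_mem_usage);
    _capacity = std::min<size_t>(new_capacity, _max_capacity);
}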