Pipeline: Query Acceleration: QueryCache

The input data source is divided into multiple morsels. Each morsel represents one data source to read and its range, {tablet_id, version}; this pair is also carried in the scan_range.
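As a rough, hypothetical sketch (the field names below are illustrative, not the exact StarRocks definitions), a morsel can be pictured as the tablet to read plus its version range:

```cpp
#include <cstdint>
#include <utility>

// Illustrative stand-in for a morsel: the tablet to read plus the version
// range. get_lane_owner_and_version() mirrors the {tablet_id, version} pair
// that the QueryCache machinery keys on.
struct Morsel {
    int64_t tablet_id = 0;
    int64_t version = 0;       // read up to this version
    int64_t from_version = 0;  // incremental reads start after this version

    std::pair<int64_t, int64_t> get_lane_owner_and_version() const {
        return {tablet_id, version};
    }
};
```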

(Figure: QueryCache-1)

enable_query_cache

As long as the query_cache has not been invalidated, the lanes of the N MultiLaneOperators correspond one-to-one; the data flow is shown below.

(Figure: QueryCache-2)

A MultilaneOperator contains multiple lanes; each MultilaneOperator::Lane wraps one operator, and one lane-chain processes the data of one morsel.
The original single operator is subdivided into _num_lanes operators, enabling parallelism across tablets within one driver. Whenever a lane-chain finishes processing a morsel's data and its follow-up work (last_chunk_received = true, eof_sent = true), the result is cached in the CacheMgr so that subsequent executions can share it.

struct Lane {
    pipeline::OperatorPtr processor;
    int64_t lane_owner; // tablet_id
    int lane_id;
    bool last_chunk_received;
    bool eof_sent;

    Lane(pipeline::OperatorPtr&& op, int id)
            : processor(std::move(op)),
              lane_owner(-1),
              lane_id(id),
              last_chunk_received(false),
              eof_sent(false) {}
};

/// MultilaneOperator
MultilaneOperator::MultilaneOperator(pipeline::OperatorFactory* factory, int32_t driver_sequence,
                                     size_t num_lanes, pipeline::Operators&& processors,
                                     bool can_passthrough)
        : pipeline::Operator(factory, factory->id(), factory->get_raw_name(),
                             factory->plan_node_id(), driver_sequence),
          _num_lanes(num_lanes),
          _can_passthrough(can_passthrough) {
    _lanes.reserve(_num_lanes);
    for (auto i = 0; i < _num_lanes; ++i) {
        // each lane holds its own operator
        _lanes.emplace_back(std::move(processors[i]), i);
    }
}

/// MultilaneOperatorFactory
pipeline::OperatorPtr MultilaneOperatorFactory::create(int32_t degree_of_parallelism,
                                                       int32_t driver_sequence) {
    pipeline::Operators processors;
    processors.reserve(_num_lanes);
    for (auto i = 0; i < _num_lanes; ++i) {
        processors.push_back(_factory->create(degree_of_parallelism * _num_lanes,
                                              driver_sequence * _num_lanes + i));
    }
    return std::make_shared<MultilaneOperator>(this, driver_sequence, _num_lanes,
                                               std::move(processors), _can_passthrough);
}

disable_query_cache

When the query cache is disabled, execution degenerates to the basic mode: each MultiLaneOperator effectively has only one operator doing real work, and the last MultiLaneOperator passes data along through a single passthrough_chunk.
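A minimal sketch of this passthrough path (hypothetical simplified types, not the real operator, which works on vectorized::ChunkPtr): push stores the latest chunk and pull moves it out, so data flows straight through without touching the cache.

```cpp
#include <memory>
#include <utility>
#include <vector>

using Chunk = std::vector<int>;          // stand-in for vectorized::Chunk
using ChunkPtr = std::shared_ptr<Chunk>; // stand-in for vectorized::ChunkPtr

// In passthrough mode the cache operator holds at most one in-flight chunk:
// push_chunk stores it and pull_chunk hands it to the downstream operator.
struct PassthroughCacheOperator {
    ChunkPtr _passthrough_chunk;

    void push_chunk(ChunkPtr chunk) { _passthrough_chunk = std::move(chunk); }

    // Moving out leaves the slot empty for the next push.
    ChunkPtr pull_chunk() { return std::move(_passthrough_chunk); }
};
```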

(Figure: QueryCache-3)

push_chunk

The CacheOperator owns a CacheMgr internally. On each push of a chunk, the accumulated {rows, bytes} size determines whether the query_cache stays usable.

The LaneArbiter acts like a piece of globally shared state (similar to a global lock) among the ScanOperator, MultiLaneOperator, and CacheOperator; it records whether the query_cache has been invalidated.
Each time a chunk is pushed into the CacheOperator, the per-lane buffer's accumulated rows and bytes are checked against the {max_rows, max_bytes} thresholds:

  • If a threshold is exceeded, the operator switches to passthrough mode: it stops writing into the CacheMgr and stores each chunk it reads directly in passthrough_chunk.

    Note that after switching to passthrough mode, the data already in the CacheMgr is not cleared; it can still serve subsequent reads as a cache.

  • Otherwise, it keeps accumulating data destined for the CacheMgr.

Every chunk carries owner information:

  • owner_id: the tablet_id the chunk was read from; the owner_id locates the specific lane.
  • is_last_chunk: whether this is the last chunk of the morsel.
bool CacheOperator::_should_passthrough(size_t num_rows, size_t num_bytes) {
    return _cache_param.entry_max_rows <= 0 || num_rows > _cache_param.entry_max_rows ||
           _cache_param.entry_max_bytes <= 0 || num_bytes > _cache_param.entry_max_bytes;
}

Status CacheOperator::push_chunk(RuntimeState* state, const vectorized::ChunkPtr& chunk) {
    DCHECK(chunk != nullptr);
    if (_lane_arbiter->in_passthrough_mode()) {
        DCHECK(_passthrough_chunk == nullptr);
        _passthrough_chunk = chunk;
        return Status::OK();
    }

    auto lane_owner = chunk->owner_info().owner_id();
    DCHECK(_owner_to_lanes.count(lane_owner));
    auto lane_id = _owner_to_lanes[lane_owner];
    auto& buffer = _per_lane_buffers[lane_id];
    buffer->append_chunk(chunk);

    // check whether the buffered size exceeds the limits
    if (_should_passthrough(buffer->num_rows, buffer->num_bytes)) {
        _lane_arbiter->enable_passthrough_mode();
        for (const auto& [_, lane_id] : _owner_to_lanes) {
            _per_lane_buffers[lane_id]->set_passthrough();
        }
        return Status::OK();
    }

    if (buffer->should_populate_cache()) {
        // write into the cache
        populate_cache(lane_owner);
    }
    return Status::OK();
}

populate_cache

On every push of a chunk, PerLaneBuffer::should_populate_cache is called to check whether the buffered data should be written into the cache:

  • cached_version is the version currently cached in the cache_mgr and required_version is the version this query needs; cached_version < required_version means the cached data is stale and fresh data should be populated.
  • PLBS_TOTAL (likewise PLBS_HIT_TOTAL) indicates that this lane's data has been read completely; at that point the per-lane buffer's data is flushed into the cache_mgr.

The full condition is as follows:

bool PerLaneBuffer::should_populate_cache() const {
    return cached_version < required_version &&
           (state == PLBS_HIT_TOTAL || state == PLBS_TOTAL);
}

Populating the cache is implemented as follows:

void CacheOperator::populate_cache(int64_t tablet_id) {
    auto lane = _owner_to_lanes[tablet_id];
    auto& buffer = _per_lane_buffers[lane];

    auto cache_key_suffix_it = _cache_param.cache_key_prefixes.find(tablet_id);
    if (cache_key_suffix_it == _cache_param.cache_key_prefixes.end()) {
        // uncacheable
        buffer->state = PLBS_POPULATE;
        return;
    }
    std::string cache_key = _cache_param.digest + cache_key_suffix_it->second;
    int64_t current = GetMonoTimeMicros();
    auto chunks = remap_chunks(buffer->chunks, _cache_param.slot_remapping);
    CacheValue cache_value(current, buffer->required_version, std::move(chunks));
    // If the cache implementation is global,
    // populate method must be asynchronous and try its best to
    // update the cache.
    _cache_populate_bytes_counter->update(buffer->num_bytes);
    _cache_populate_chunks_counter->update(buffer->chunks.size());
    _cache_populate_rows_counter->update(buffer->num_rows);
    _populate_tablets.insert(tablet_id);
    // when the cache mgr exceeds its capacity, low-priority entries are evicted automatically
    _cache_mgr->populate(cache_key, cache_value);
    buffer->state = PLBS_POPULATE;
}

pull_chunk

When pulling data from the CacheOperator:

  • if the query_cache is still valid, chunks are taken from a PerLaneBuffer;
  • otherwise they are read directly from passthrough_chunk.

The code is as follows.

StatusOr<vectorized::ChunkPtr> CacheOperator::pull_chunk(RuntimeState* state) {
    auto opt_lane = _lane_arbiter->preferred_lane();
    if (opt_lane.has_value()) {
        auto lane = opt_lane.value();
        auto& buffer = _per_lane_buffers[lane];
        auto chunk = _pull_chunk_from_per_lane_buffer(buffer);
        if (chunk != nullptr) {
            return chunk;
        }
    }

    for (const auto& [_, lane_id] : _owner_to_lanes) {
        auto& buffer = _per_lane_buffers[lane_id];
        auto chunk = _pull_chunk_from_per_lane_buffer(buffer);
        if (chunk != nullptr) {
            return chunk;
        }
    }
    return std::move(_passthrough_chunk);
}

_pull_chunk_from_per_lane_buffer

When data is pulled from the CacheOperator, it is taken from a PerLaneBuffer; once a lane's data has been fully consumed, the lane is released so that subsequent morsels can reuse it to submit new tasks.

vectorized::ChunkPtr CacheOperator::_pull_chunk_from_per_lane_buffer(PerLaneBufferPtr& buffer) {
    if (buffer->has_chunks()) {
        auto chunk = buffer->get_next_chunk();
        if (buffer->can_release()) {
            // analogous to releasing a lock
            _lane_arbiter->release_lane(chunk->owner_info().owner_id());
            buffer->reset();
        }
        return chunk;
    }
    return nullptr;
}

_pickup_morsel

The _pickup_morsel function was discussed in the Morsel and OlapScanOperator article, but the QueryCache part was omitted there.

After a morsel is taken from the MorselQueue, a lane slot must first be obtained from the lane_arbiter before the morsel can execute.

  • query_cache::AR_BUSY: no lane slot is available, so no task can run for now.
  • query_cache::AR_PROBE: a lane slot was acquired; first probe the cache to check whether part of the data for {tablet_id, version} is already cached. If so, only the incremental part needs to be read.
  • query_cache::AR_SKIP: this lane owner has already been processed.
  • query_cache::AR_IO:
    • execution is already in passthrough mode, or
    • the query cache was invalidated and passthrough mode was entered; in that case some partially cached data may still exist, so try to read it.

This part of the code is as follows.

Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index) {
    //...
    // fetch a morsel according to the morsel_queue's policy
    ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());

    if (_lane_arbiter != nullptr) {
        while (morsel != nullptr) {
            // the tablet_id and version this morsel corresponds to
            auto [lane_owner, version] = morsel->get_lane_owner_and_version();
            // is the lane for this tablet already occupied?
            auto acquire_result = _lane_arbiter->try_acquire_lane(lane_owner);
            if (acquire_result == query_cache::AR_BUSY) {
                // no lane available
                _morsel_queue->unget(std::move(morsel));
                return Status::OK();
            } else if (acquire_result == query_cache::AR_PROBE) {
                // first probe for {tablet_id, version}
                auto hit = _cache_operator->probe_cache(lane_owner, version);
                // initialization
                RETURN_IF_ERROR(_cache_operator->reset_lane(state, lane_owner));
                if (!hit) {
                    break;
                }
                // on a hit, read from the cache and leave the loop as early as possible
                auto [delta_version, delta_rowsets] =
                        _cache_operator->delta_version_and_rowsets(lane_owner);
                if (!delta_rowsets.empty()) {
                    // start reading right after the cached version
                    morsel->set_from_version(delta_version);
                    // the corresponding rowsets
                    morsel->set_rowsets(delta_rowsets);
                    break;
                } else {
                    ASSIGN_OR_RETURN(morsel, _morsel_queue->try_get());
                }
            } else if (acquire_result == query_cache::AR_SKIP) {
                ASSIGN_OR_RETURN(morsel, _morsel_queue->try_get());
            } else if (acquire_result == query_cache::AR_IO) {
                auto [delta_version, delta_rowsets] =
                        _cache_operator->delta_version_and_rowsets(lane_owner);
                if (!delta_rowsets.empty()) {
                    morsel->set_from_version(delta_version);
                    morsel->set_rowsets(delta_rowsets);
                }
                break;
            }
        }
    }

    //...
    return Status::OK();
}

delta_version_and_rowsets

The delta_version_and_rowsets method obtains from CacheOperator::_per_lane_buffers the starting version and rowsets for the incremental scan, based on the already-cached {version, rowsets}.

std::tuple<int64_t, vector<RowsetSharedPtr>>
CacheOperator::delta_version_and_rowsets(int64_t tablet_id) {
    auto lane_it = _owner_to_lanes.find(tablet_id);
    if (lane_it == _owner_to_lanes.end()) {
        return std::make_tuple(0, vector<RowsetSharedPtr>{});
    } else {
        auto& buffer = _per_lane_buffers[lane_it->second];
        return std::make_tuple(buffer->cached_version + 1, buffer->rowsets);
    }
}

OlapChunkSource::_init_olap_reader

When the TabletReader is initialized, it is set up to read only the incremental part:

Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
    //...
    // read only the incremental data
    _reader = std::make_shared<TabletReader>(_tablet, Version(_morsel->from_version(), _version),
                                             std::move(child_schema), _morsel->rowsets());
    //...
    RETURN_IF_ERROR(_reader->prepare());
    RETURN_IF_ERROR(_reader->open(_params));

    return Status::OK();
}

CacheOperator::probe_cache

The CacheMgr is globally unique. With QueryCache enabled, every execution tries to populate the CacheMgr, so the cached data comes from two sources:

  • data populated by the current scan task;
  • data populated by previously executed tasks.

After a morsel is taken from the MorselQueue, its {tablet_id, version} is used to look up the CacheMgr for partially cached data, and the TabletReader's parameters are then set according to the result.

bool CacheOperator::probe_cache(int64_t tablet_id, int64_t version) {
    _all_tablets.insert(tablet_id);
    // allocate lane and PerLaneBuffer for tablet_id
    int64_t lane = _lane_arbiter->must_acquire_lane(tablet_id);

    _owner_to_lanes[tablet_id] = lane;
    auto& buffer = _per_lane_buffers[lane];
    buffer->reset();
    buffer->lane = lane;
    buffer->required_version = version;

    if (_cache_param.force_populate || !_cache_param.cache_key_prefixes.count(tablet_id)) {
        buffer->state = PLBS_MISS;
        return false;
    }
    // probe cache
    const std::string& cache_key =
            _cache_param.digest + _cache_param.cache_key_prefixes.at(tablet_id);
    auto probe_status = _cache_mgr->probe(cache_key);

    // Cache MISS when failed to probe
    if (!probe_status.ok()) {
        buffer->state = PLBS_MISS;
        return false;
    }

    auto& cache_value = probe_status.value();
    if (cache_value.version == version) {
        // Cache HIT_TOTAL when the cached version equals the required version
        buffer->state = PLBS_HIT_TOTAL; // fully cached
        buffer->cached_version = cache_value.version;
        auto chunks = remap_chunks(cache_value.result, _cache_param.reverse_slot_remapping);
        _update_probe_metrics(tablet_id, chunks);
        buffer->chunks = std::move(chunks);
    } else if (cache_value.version > version) {
        // It rarely happens that the required version is less than the cached
        // version: the required version becomes stale when the query is
        // postponed for some reason (non-deterministic query scheduling,
        // network congestion, etc. make queries execute out-of-order), so we
        // must prevent a stale result from replacing a fresh cached result.
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
    } else {
        // Incremental updates cause the cached value to become stale.
        // It is a very critical and complex situation;
        // here we support a multi-version cache mechanism.
        // handle the partially cached case
        _handle_stale_cache_value(tablet_id, cache_value, buffer, version);
    }

    // return true on cache hit, false on cache miss
    if (buffer->state == PLBS_HIT_TOTAL) {
        _lane_arbiter->mark_processed(tablet_id);
        return true;
    } else if (buffer->state == PLBS_HIT_PARTIAL) {
        return true;
    } else {
        return false;
    }
}

_handle_stale_cache_value_for_non_pk

void CacheOperator::_handle_stale_cache_value_for_non_pk(int64_t tablet_id, CacheValue& cache_value,
                                                         PerLaneBufferPtr& buffer, int64_t version) {
    // Try to reuse the partial cache result when the cached version is less
    // than the required version; the delta versions must be captured first.
    auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(
            tablet_id, cache_value.version + 1, version);

    // Cache MISS if the delta versions cannot be captured,
    // e.g. because of aggressive cumulative compactions.
    if (!status.ok()) {
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
        return;
    }

    // Delta versions are captured; several situations
    // should be taken into consideration.
    auto& [tablet, rowsets, rowsets_acq_rel] = status.value();
    auto all_rs_empty = true;
    auto min_version = std::numeric_limits<int64_t>::max();
    auto max_version = std::numeric_limits<int64_t>::min();
    for (const auto& rs : rowsets) {
        all_rs_empty &= !rs->has_data_files();
        min_version = std::min(min_version, rs->start_version());
        max_version = std::max(max_version, rs->end_version());
    }
    Version delta_versions(min_version, max_version);
    buffer->tablet = tablet;
    auto has_delete_predicates = tablet->has_delete_predicates(delta_versions);
    // case 1: there exist delete predicates in the delta versions, or the data
    // model cannot support the multiversion cache and the tablet has non-empty
    // delta rowsets; the cached result is not reusable, so cache miss.
    if (has_delete_predicates || (!_cache_param.can_use_multiversion && !all_rs_empty)) {
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
        return;
    }

    // the currently cached version
    buffer->cached_version = cache_value.version;
    auto chunks = remap_chunks(cache_value.result, _cache_param.reverse_slot_remapping);
    _update_probe_metrics(tablet_id, chunks);
    // the existing data obtained from the cache_mgr
    buffer->chunks = std::move(chunks);
    // case 2: all delta versions are empty rowsets, so the cached result is a total hit.
    if (all_rs_empty) {
        buffer->state = PLBS_HIT_TOTAL;
        buffer->chunks.back()->owner_info().set_last_chunk(true);
        return;
    }

    // case 3: otherwise, the cached result is a partial result of the
    // per-tablet computation, so the delta versions must be scanned and merged
    // with the cached result to generate the total result.
    buffer->state = PLBS_HIT_PARTIAL;
    buffer->rowsets = std::move(rowsets);
    buffer->rowsets_acq_rel = std::move(rowsets_acq_rel);
    buffer->num_rows = 0;
    buffer->num_bytes = 0;
    for (const auto& chunk : buffer->chunks) {
        buffer->num_rows += chunk->num_rows();
        buffer->num_bytes += chunk->bytes_usage();
    }
    buffer->chunks.back()->owner_info().set_last_chunk(false);
}

LaneArbiter

The LaneArbiter is created inside the CacheOperator and then shared, as a pointer, with the ScanOperator and MultilaneOperator.

(Figure: QueryCache-4)

The LaneArbiter works like a lock with _num_lanes slots, so up to _num_lanes tasks can execute concurrently:

  • try_acquire_lane: before a task runs, it acquires a slot via try_acquire_lane;
  • release_lane: when the task finishes, the slot is released via release_lane.

try_acquire_lane

Before a scan task is picked up, a slot is first acquired through try_acquire_lane:

  • in_passthrough_mode == true: the query cache is no longer considered.
  • _processed.count(lane_owner) == 1: this lane_owner has already been fully processed and is skipped.
  • _acquire_lane obtains a lane in the LANE_UNASSIGNED state from the _assignments array; its return value:
    • NO_FREE_LANE: no slot is available, return AcquireResult::AR_BUSY;
    • NEW_LANE_BIT set: a free slot was newly assigned, return AcquireResult::AR_PROBE;
    • otherwise, the same lane_owner re-acquired its existing lane, return AcquireResult::AR_IO.
AcquireResult LaneArbiter::try_acquire_lane(LaneOwnerType lane_owner) {
    if (in_passthrough_mode()) {
        return AcquireResult::AR_IO;
    }
    /// already processed
    if (_processed.count(lane_owner)) {
        return AcquireResult::AR_SKIP;
    }
    /// not processed yet
    auto lane = _acquire_lane(lane_owner);
    if (lane == NO_FREE_LANE) {
        return AcquireResult::AR_BUSY;
    }
    if ((lane & NEW_LANE_BIT) == NEW_LANE_BIT) {
        return AcquireResult::AR_PROBE;
    }
    return AcquireResult::AR_IO;
}

release_lane

When the data in CacheOperator::_per_lane_buffers has been fully consumed, LaneArbiter::release_lane is called to mark the lane's owner as processed and free the lane.

void LaneArbiter::release_lane(LaneOwnerType lane_owner) {
    _processed.insert(lane_owner);
    for (auto& _assignment : _assignments) {
        if (_assignment.lane_owner == lane_owner) {
            _assignment = LANE_UNASSIGNED; // reset to the initial state
        }
    }
}

PipelineDriver::prepare

The CacheOperator is wired into the PipelineDriver as follows:

Status PipelineDriver::prepare(RuntimeState* runtime_state) {
    //...

    ssize_t cache_op_idx = -1;
    query_cache::CacheOperatorPtr cache_op = nullptr;
    // locate the CacheOperator within the pipeline
    for (auto i = 0; i < _operators.size(); ++i) {
        if (cache_op = std::dynamic_pointer_cast<query_cache::CacheOperator>(_operators[i]);
            cache_op != nullptr) {
            cache_op_idx = i;
            break;
        }
    }

    if (cache_op != nullptr) {
        query_cache::LaneArbiterPtr lane_arbiter = cache_op->lane_arbiter();
        query_cache::MultilaneOperators multilane_operators;
        // every operator before the CacheOperator is either wrapped in a
        // MultilaneOperator or is an OlapScanOperator
        for (auto i = 0; i < cache_op_idx; ++i) {
            auto& op = _operators[i];
            // the corresponding factories are built in the pipeline builder
            if (auto* multilane_op = dynamic_cast<query_cache::MultilaneOperator*>(op.get());
                multilane_op != nullptr) {
                multilane_op->set_lane_arbiter(lane_arbiter);
                multilane_operators.push_back(multilane_op);
            } else if (auto* olap_scan_op = dynamic_cast<OlapScanOperator*>(op.get());
                       olap_scan_op != nullptr) {
                olap_scan_op->set_lane_arbiter(lane_arbiter);
                // set the cache_operator
                olap_scan_op->set_cache_operator(cache_op);
            }
        }
        cache_op->set_multilane_operators(std::move(multilane_operators));
    }
    //...
}