bool CacheOperator::probe_cache(int64_t tablet_id, int64_t version) {
    _all_tablets.insert(tablet_id);
    // allocate lane and PerLaneBuffer for tablet_id
    int64_t lane = _lane_arbiter->must_acquire_lane(tablet_id);
    // ... (setup of `buffer` and the cache probe that produces `probe_status`
    //      are omitted in this excerpt)
    // Cache MISS when the probe fails
    if (!probe_status.ok()) {
        buffer->state = PLBS_MISS;
        return false;
    }
    auto& cache_value = probe_status.value();
    if (cache_value.version == version) {
        // Cache HIT_TOTAL when the cached version equals the required version
        buffer->state = PLBS_HIT_TOTAL; // fully cached
        buffer->cached_version = cache_value.version;
        auto chunks = remap_chunks(cache_value.result, _cache_param.reverse_slot_remapping);
        _update_probe_metrics(tablet_id, chunks);
        buffer->chunks = std::move(chunks);
    } else if (cache_value.version > version) {
        // It rarely happens that the required version is less than the cached version:
        // the required version becomes stale when the query is postponed, for example
        // because of non-deterministic query scheduling, network congestion, etc.,
        // which makes queries execute out of order. So we must prevent a stale result
        // from replacing a fresh cached result.
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
    } else {
        // Incremental updates have made the cached value stale.
        // This is a critical and complex situation, so a multi-version cache
        // mechanism is used here to handle the partial hit.
        _handle_stale_cache_value(tablet_id, cache_value, buffer, version);
    }
    // return true on cache hit, false on cache miss
    if (buffer->state == PLBS_HIT_TOTAL) {
        _lane_arbiter->mark_processed(tablet_id);
        return true;
    } else if (buffer->state == PLBS_HIT_PARTIAL) {
        return true;
    } else {
        return false;
    }
}
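To make the three-way version comparison easier to follow, here is a minimal self-contained sketch of the same decision rule, detached from PerLaneBuffer and the cache manager. The names CacheProbeOutcome and classify_probe are hypothetical; only the comparison policy mirrors probe_cache above: equal versions are a total hit, a cached version newer than the required one marks the request as stale and must not overwrite the fresher cached result, and an older cached version falls through to the multi-version partial-hit path.

#include <cstdint>

// Hypothetical names; only the version-comparison policy mirrors probe_cache above.
enum class CacheProbeOutcome { HitTotal, Miss, TryPartial };

CacheProbeOutcome classify_probe(int64_t cached_version, int64_t required_version) {
    if (cached_version == required_version) {
        return CacheProbeOutcome::HitTotal; // cache is exactly as fresh as required
    }
    if (cached_version > required_version) {
        // The request itself is stale (e.g. delayed by scheduling or network congestion);
        // treat it as a miss so the fresher cached result is not overwritten.
        return CacheProbeOutcome::Miss;
    }
    // cached_version < required_version: try to reuse the cached part and
    // scan only the delta versions (the multi-version path).
    return CacheProbeOutcome::TryPartial;
}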
void CacheOperator::_handle_stale_cache_value_for_non_pk(int64_t tablet_id, CacheValue& cache_value,
                                                         PerLaneBufferPtr& buffer, int64_t version) {
    // Try to reuse the partial cache result when the cached version is less than the
    // required version; the delta versions must be captured first.
    auto status = StorageEngine::instance()->tablet_manager()->capture_tablet_and_rowsets(
            tablet_id, cache_value.version + 1, version);
    // Cache MISS if the delta versions cannot be captured,
    // e.g. because of aggressive cumulative compactions.
    if (!status.ok()) {
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
        return;
    }
    // The delta versions are captured; several situations must be considered.
    auto& [tablet, rowsets, rowsets_acq_rel] = status.value();
    auto all_rs_empty = true;
    auto min_version = std::numeric_limits<int64_t>::max();
    auto max_version = std::numeric_limits<int64_t>::min();
    for (const auto& rs : rowsets) {
        all_rs_empty &= !rs->has_data_files();
        min_version = std::min(min_version, rs->start_version());
        max_version = std::max(max_version, rs->end_version());
    }
    Version delta_versions(min_version, max_version);
    buffer->tablet = tablet;
    auto has_delete_predicates = tablet->has_delete_predicates(delta_versions);
    // case 1: there are delete predicates in the delta versions, or the data model
    // cannot support the multi-version cache and the tablet has non-empty delta rowsets;
    // the cached result is not reusable, so this is a cache miss.
    if (has_delete_predicates || (!_cache_param.can_use_multiversion && !all_rs_empty)) {
        buffer->state = PLBS_MISS;
        buffer->cached_version = 0;
        return;
    }
    // the version currently held in the cache
    buffer->cached_version = cache_value.version;
    auto chunks = remap_chunks(cache_value.result, _cache_param.reverse_slot_remapping);
    _update_probe_metrics(tablet_id, chunks);
    // the existing chunks obtained from cache_mgr
    buffer->chunks = std::move(chunks);
    // case 2: all delta versions are empty rowsets, so the cached result is a total hit.
    if (all_rs_empty) {
        buffer->state = PLBS_HIT_TOTAL;
        buffer->chunks.back()->owner_info().set_last_chunk(true);
        return;
    }
    // case 3: otherwise, the cached result is only a partial result of the per-tablet
    // computation, so the delta versions must be scanned and merged with the cached
    // result to produce the total result.
    buffer->state = PLBS_HIT_PARTIAL;
    buffer->rowsets = std::move(rowsets);
    buffer->rowsets_acq_rel = std::move(rowsets_acq_rel);
    buffer->num_rows = 0;
    buffer->num_bytes = 0;
    for (const auto& chunk : buffer->chunks) {
        buffer->num_rows += chunk->num_rows();
        buffer->num_bytes += chunk->bytes_usage();
    }
    buffer->chunks.back()->owner_info().set_last_chunk(false);
}
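Stripped of the storage-engine types, the case analysis in _handle_stale_cache_value_for_non_pk boils down to the small decision function below. This is only an illustrative sketch with hypothetical names (StaleCacheDecision, decide_stale_cache); the real code records the outcome in the PerLaneBuffer state (PLBS_MISS / PLBS_HIT_TOTAL / PLBS_HIT_PARTIAL) as shown above.

// Hypothetical sketch of the three cases above; the enum values stand in for the
// real PerLaneBuffer states.
enum class StaleCacheDecision { Miss, HitTotal, HitPartial };

StaleCacheDecision decide_stale_cache(bool delta_captured, bool has_delete_predicates,
                                      bool can_use_multiversion, bool all_delta_rowsets_empty) {
    if (!delta_captured) {
        return StaleCacheDecision::Miss; // delta versions already compacted away
    }
    if (has_delete_predicates || (!can_use_multiversion && !all_delta_rowsets_empty)) {
        return StaleCacheDecision::Miss; // case 1: cached result cannot be reused
    }
    if (all_delta_rowsets_empty) {
        return StaleCacheDecision::HitTotal; // case 2: nothing new to scan
    }
    return StaleCacheDecision::HitPartial;   // case 3: scan deltas and merge with cache
}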