从 TiKV 到 TiFlash,传递的都是 region 信息,比如 region_id, region_range 等,但是在 TilFlash 内部并不存储 region 信息,需要将 region 信息转化为 StorageDelteMerge 的相关状态,比如,在 region 内部都会有个对应的 mapped_table_id。同时,又需要记录 region 的相关的元数据 meta,来记录 TiKV、TiFlash 的同步信息。
Region 在 TiFLASH 中,每个 Region 都会映射到一个 table_id,里面包含了一些数据(data)状态(region_meta)。
RegionTable RegionTable 是个全局的数据结构,在 Region 和 Table 之间建立映射关系。
RegionTable::Table RegionTable::Table 中记录多个 InternelRegion,表征:
table_id 出现在哪些 Region 中,
每个 region 的 range 对应 table 中的哪些 key range
在RegionTable中插入一个region时,会更新 table_regions 和 regions 两个字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 RegionTable::InternalRegion & RegionTable::insertRegion ( Table & table, const RegionRangeKeys & region_range_keys, const RegionID region_id) { LOG_FMT_INFO (log, "[############ insertRegion region {}]" , region_id); auto & table_regions = table.regions; auto [it, ok] = table_regions.emplace ( region_id, InternalRegion (region_id, region_range_keys.rawKeys ())); if (!ok) throw Exception ( std::string (__PRETTY_FUNCTION__) + ": insert duplicate internal region " + DB::toString (region_id), ErrorCodes::LOGICAL_ERROR); regions[region_id] = table.table_id; }
主要更改 RegionTable 的相关状态时,RegionTable::InternalRegion 在以下三个地方可能会被用到:
RegionTable::shrinkRegionRange
RegionTable::updateRegion
RegionTable::extendRegionRange
实际上就是 TiKV 发生分裂、合并等行为时。
RegionManager RegionManger 是用于管理 region 的读写行为。
regions: 记录了当前所有的 region的指针;
region_range_index:记录的是字段 regions 中的每个 range 的 key 和对应的 region 之间的映射,以及当前range的信息
RegionsRangeIndex RegionsRangeIndex::add RegionsRangeIndex::root 是基于 {range_key, region_map} 有序建立映射的,即每个 range_key 出现在哪些 region 中。 在向 RegionsRangeIndex 中新增一个 new_region 时:
先基于 new_region 的 new_range 定位到在 RegionsRangeIndex::root 中插入的起始位置,即 [begin_it, end_it) 区间
为 [begin_it, end_it) 区间的每个 IndexNode::region_map 都插入一条相同的 {new_region_id, new_region},表示 root[iter->first] 又多包含了一个 new_region
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void RegionsRangeIndex::add (const RegionPtr & new_region) { auto region_range = new_region->getRange (); const auto & new_range = region_range->comparableKeys (); RootMap::iterator begin_it = split (new_range.first); RootMap::iterator end_it = split (new_range.second); if (begin_it == end_it) throw Exception (__PRETTY_FUNCTION__ + ": range of region " s + toString (new_region->id ()) + " is empty" , ErrorCodes::LOGICAL_ERROR); for (auto it = begin_it; it != end_it; ++it) it->second.region_map.emplace (new_region->id (), new_region); }
由于 root 的数据结构是 std::map,即所有的key都是有序的,那么
begin_it 表达的即 root 中 >= new_range_start (new_range.first)的起始位置
end_it 表达的即 root 中 >= new_range_end(new_range.second) 的起始位置
那 [begin_it, end_it) 区间即添加 new_range 时需要更新的区间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 RegionsRangeIndex::RootMap::iterator RegionsRangeIndex::split (const TiKVRangeKey & new_start) { const auto do_split = [this ](RootMap::iterator begin_it, const TiKVRangeKey & new_start) { begin_it--; auto & ori = begin_it->second; auto tar_it = root.emplace (new_start.copy (), IndexNode{}).first; tar_it->second.region_map = ori.region_map; return tar_it; }; auto begin_it = root.lower_bound (new_start); assert (begin_it != root.end ()); if (begin_it->first.compare (new_start) == 0 ) return begin_it; else return do_split (begin_it, new_start); }
RegionsRangeIndex::remove 与add相对的,从 root 中删除一个 region 的remove 函数,去除异常部分简化后如下
1 2 3 4 5 6 7 8 9 void RegionsRangeIndex::remove (const RegionRange & range, RegionID region_id) { auto begin_it = root.find (range.first); auto end_it = root.find (range.second); for (auto it = begin_it; it != end_it; ++it) it->second.region_map.erase (region_id); tryMergeEmpty (begin_it); }
下面从 分裂、合并的流程来看 Region 的行为。
handleAdminRaftCmd execBatchSplit execBatchSplit 主要是根据 tikv response 生成新的 region 分裂信息 split_regions,代码去除异常部分简化后如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 Regions RegionRaftCommandDelegate::execBatchSplit ( const raft_cmdpb::AdminRequest &, const raft_cmdpb::AdminResponse & response, UInt64 index, UInt64 term) { const auto & new_region_infos = response.splits ().regions (); std::vector<RegionPtr> split_regions; split_regions.reserve (new_region_infos.size ()); { std::unique_lock<std::shared_mutex> lock (mutex) ; int new_region_index = -1 ; for (int i = 0 ; i < new_region_infos.size (); ++i) { const auto & region_info = new_region_infos[i]; if (region_info.id () != meta.regionId ()) { const auto & peer = findPeerByStore (region_info, meta.storeId ()); RegionMeta new_meta (peer, region_info, initialApplyState()) ; auto split_region = splitInto (std::move (new_meta)); split_regions.emplace_back (split_region); } else { if (new_region_index == -1 ) new_region_index = i; } } RegionMeta new_meta (meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState()) ; new_meta.setApplied (index, term); meta.assignRegionMeta (std::move (new_meta)); } return split_regions; }
splitInto execBatchSplit 中的 splitInto 函数,是根据 new_meta 将当前 region 的 data、range 分裂到 new_range 中。
1 2 3 4 5 6 7 8 9 RegionPtr Region::splitInto (RegionMeta && new_meta) { RegionPtr new_region = std::make_shared <Region>(std::move (new_meta), proxy_helper); data.splitInto (new_region->getRange ()->comparableKeys (), new_region->data); return new_region; }
handle_batch_split 下面是基于 execBatchSplit 返回的信息来执行,主要分为三个部分:
RegionManager::region_range_index 添加新分裂生成的 region 信息,更新当前region的信息(即删除 + 重新插入)
RegionTable 新增新生成的 region 信息,在region 和 table 间建立映射关系。并且由于分裂,可能导致当前 region 的 range 发生更改,RegionTable 中当前 region 的 range 信息需要更改。
try_to_flush_region 函数:将 region 中缓存的的数据(Region::date)通过 writeRegionDataToStorage 接口写入到 dm_storage 。
persist_and_sync 函数:将 region 对应 Storage 中的部分,调用 IStorage::flushCache 接口对该region_range 进行持久化。
代码简化后如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 const auto handle_batch_split = [&](Regions & split_regions) { { auto manage_lock = genRegionWriteLock (task_lock); for (auto & new_region : split_regions) { auto [it, ok] = manage_lock.regions.emplace (new_region->id (), new_region); if (!ok) new_region = it->second; } manage_lock.index.remove (result.ori_region_range->comparableKeys (), curr_region_id); manage_lock.index.add (curr_region_ptr); for (auto & new_region : split_regions) manage_lock.index.add (new_region); } { for (const auto & new_region : split_regions) region_table.updateRegion (*new_region); region_table.shrinkRegionRange (curr_region); } { for (const auto & new_region : split_regions) try_to_flush_region (new_region); } { for (const auto & new_region : split_regions) persist_and_sync (*new_region); persist_and_sync (curr_region); } };
分裂日志 附上分裂时的日志