tiflash: tikv region 与 tiflash 映射关系

从 TiKV 到 TiFlash,传递的都是 region 信息,比如 region_id, region_range 等,但是在 TilFlash 内部并不存储 region 信息,需要将 region 信息转化为 StorageDelteMerge 的相关状态,比如,在 region 内部都会有个对应的 mapped_table_id。同时,又需要记录 region 的相关的元数据 meta,来记录 TiKV、TiFlash 的同步信息。

Region

在 TiFLASH 中,每个 Region 都会映射到一个 table_id,里面包含了一些数据(data)状态(region_meta)。

tiflash-region-1

RegionTable

RegionTable 是个全局的数据结构,在 Region 和 Table 之间建立映射关系。

tiflash-region-2

RegionTable::Table

RegionTable::Table 中记录多个 InternelRegion,表征:

  • table_id 出现在哪些 Region 中,
  • 每个 region 的 range 对应 table 中的哪些 key range

在RegionTable中插入一个region时,会更新 table_regions 和 regions 两个字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RegionTable::InternalRegion & RegionTable::insertRegion(
Table & table,
const RegionRangeKeys & region_range_keys,
const RegionID region_id)
{
LOG_FMT_INFO(log, "[############ insertRegion region {}]", region_id);
auto & table_regions = table.regions;
// Insert table mapping.
// todo check if region_range_keys.mapped_table_id == table.table_id ??
auto [it, ok] = table_regions.emplace(
region_id, InternalRegion(region_id, region_range_keys.rawKeys()));
if (!ok)
throw Exception(
std::string(__PRETTY_FUNCTION__) +
": insert duplicate internal region " +
DB::toString(region_id),
ErrorCodes::LOGICAL_ERROR);

// Insert region mapping.
regions[region_id] = table.table_id;
}

主要更改 RegionTable 的相关状态时,RegionTable::InternalRegion 在以下三个地方可能会被用到:

  • RegionTable::shrinkRegionRange
  • RegionTable::updateRegion
  • RegionTable::extendRegionRange

实际上就是 TiKV 发生分裂、合并等行为时。

RegionManager

RegionManger 是用于管理 region 的读写行为。

  • regions: 记录了当前所有的 region的指针;
  • region_range_index:记录的是字段 regions 中的每个 range 的 key 和对应的 region 之间的映射,以及当前range的信息

tiflash-region-3

RegionsRangeIndex

RegionsRangeIndex::add
RegionsRangeIndex::root 是基于 {range_key, region_map} 有序建立映射的,即每个 range_key 出现在哪些 region 中。
在向 RegionsRangeIndex 中新增一个 new_region 时:

  • 先基于 new_region 的 new_range 定位到在 RegionsRangeIndex::root 中插入的起始位置,即 [begin_it, end_it) 区间
  • 为 [begin_it, end_it) 区间的每个 IndexNode::region_map 都插入一条相同的 {new_region_id, new_region},表示 root[iter->first] 又多包含了一个 new_region
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void RegionsRangeIndex::add(const RegionPtr & new_region)
{
auto region_range = new_region->getRange();
const auto & new_range = region_range->comparableKeys();
RootMap::iterator begin_it = split(new_range.first);
RootMap::iterator end_it = split(new_range.second);
if (begin_it == end_it)
throw Exception(__PRETTY_FUNCTION__ + ": range of region "s
+ toString(new_region->id()) + " is empty",
ErrorCodes::LOGICAL_ERROR);

// 为 [begin_it, end_it) 区间的所有 IndexNode::region_map
// 都添加一份 new_region
for (auto it = begin_it; it != end_it; ++it)
it->second.region_map.emplace(new_region->id(), new_region);
}

由于 root 的数据结构是 std::map,即所有的key都是有序的,那么

  • begin_it 表达的即 root 中 >= new_range_start (new_range.first)的起始位置
  • end_it 表达的即 root 中 >= new_range_end(new_range.second) 的起始位置

那 [begin_it, end_it) 区间即添加 new_range 时需要更新的区间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RegionsRangeIndex::RootMap::iterator 
RegionsRangeIndex::split(const TiKVRangeKey & new_start)
{
const auto do_split = [this](RootMap::iterator begin_it,
const TiKVRangeKey & new_start) {
begin_it--;
auto & ori = begin_it->second;
auto tar_it = root.emplace(new_start.copy(), IndexNode{}).first;
tar_it->second.region_map = ori.region_map;
return tar_it;
};

auto begin_it = root.lower_bound(new_start); // >= new_start
assert(begin_it != root.end());

if (begin_it->first.compare(new_start) == 0)
return begin_it;
else
return do_split(begin_it, new_start);
}

RegionsRangeIndex::remove

与add相对的,从 root 中删除一个 region 的remove 函数,去除异常部分简化后如下

1
2
3
4
5
6
7
8
9
void RegionsRangeIndex::remove(const RegionRange & range, RegionID region_id)
{
auto begin_it = root.find(range.first);
auto end_it = root.find(range.second);
for (auto it = begin_it; it != end_it; ++it)
it->second.region_map.erase(region_id);
// 合并空的 region
tryMergeEmpty(begin_it);
}

下面从 分裂、合并的流程来看 Region 的行为。

handleAdminRaftCmd

execBatchSplit

execBatchSplit 主要是根据 tikv response 生成新的 region 分裂信息 split_regions,代码去除异常部分简化后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Regions RegionRaftCommandDelegate::execBatchSplit(
const raft_cmdpb::AdminRequest &,
const raft_cmdpb::AdminResponse & response,
UInt64 index, UInt64 term)
{
const auto & new_region_infos = response.splits().regions();

std::vector<RegionPtr> split_regions;
split_regions.reserve(new_region_infos.size());
{
std::unique_lock<std::shared_mutex> lock(mutex);

int new_region_index = -1; // 当前 region 在 new_region_infos 中的位置
for (int i = 0; i < new_region_infos.size(); ++i)
{
const auto & region_info = new_region_infos[i];
if (region_info.id() != meta.regionId())
{
const auto & peer = findPeerByStore(region_info, meta.storeId());
RegionMeta new_meta(peer, region_info, initialApplyState());
// 当前 range 按照 new_meta 进行分类,并返回新的 region
auto split_region = splitInto(std::move(new_meta));
split_regions.emplace_back(split_region);
}
else
{
if (new_region_index == -1)
new_region_index = i;
}
}
// 当前 region 分裂后的元数据信息
RegionMeta new_meta(meta.getPeer(), new_region_infos[new_region_index], meta.getApplyState());
new_meta.setApplied(index, term);
meta.assignRegionMeta(std::move(new_meta));
}

return split_regions;
}

splitInto

execBatchSplit 中的 splitInto 函数,是根据 new_meta 将当前 region 的 data、range 分裂到 new_range 中。

1
2
3
4
5
6
7
8
9
RegionPtr Region::splitInto(RegionMeta && new_meta)
{
RegionPtr new_region = std::make_shared<Region>(std::move(new_meta),
proxy_helper);
data.splitInto(new_region->getRange()->comparableKeys(),
new_region->data);

return new_region;
}

handle_batch_split

下面是基于 execBatchSplit 返回的信息来执行,主要分为三个部分:

  1. RegionManager::region_range_index 添加新分裂生成的 region 信息,更新当前region的信息(即删除 + 重新插入)
  2. RegionTable 新增新生成的 region 信息,在region 和 table 间建立映射关系。并且由于分裂,可能导致当前 region 的 range 发生更改,RegionTable 中当前 region 的 range 信息需要更改。
  3. try_to_flush_region 函数:将 region 中缓存的的数据(Region::date)通过 writeRegionDataToStorage 接口写入到 dm_storage 。
  4. persist_and_sync 函数:将 region 对应 Storage 中的部分,调用 IStorage::flushCache 接口对该region_range 进行持久化。

代码简化后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const auto handle_batch_split = [&](Regions & split_regions) {
// 1.
{
auto manage_lock = genRegionWriteLock(task_lock);
for (auto & new_region : split_regions)
{
auto [it, ok] = manage_lock.regions.emplace(new_region->id(),
new_region);
if (!ok)
new_region = it->second;
}

manage_lock.index.remove(result.ori_region_range->comparableKeys(),
curr_region_id);
manage_lock.index.add(curr_region_ptr);

for (auto & new_region : split_regions)
manage_lock.index.add(new_region);
}
// 2.
{
for (const auto & new_region : split_regions)
region_table.updateRegion(*new_region);
region_table.shrinkRegionRange(curr_region);
}
// 3.
{
for (const auto & new_region : split_regions)
try_to_flush_region(new_region);
}
// 4.
{
for (const auto & new_region : split_regions)
persist_and_sync(*new_region);
persist_and_sync(curr_region);
}
};

分裂日志

附上分裂时的日志

tiflash-region-4