tiflash: 构建 DeltaTree Index(2)

下面从读流程来看DeltaIndex是怎么发挥作用以及更新的。

Segment::getReadInfo

在 Segment::getReadInfo 中,先用只包含 pk(handle)和 version 两列的列定义(pk_ver_col_defs)创建 delta_reader。这个 delta_reader 只读取这两列,
用来确定 delta 中新加入的数据在 memtable 和 ColumnFile 中的位置,为后面更新 DeltaTree 做准备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// Excerpt of Segment::getReadInfo: only the first step is shown, the rest of the body is elided.
/// The visible part builds a delta reader over the handle (pk) and version columns and passes it
/// to ensurePlace, which yields a delta index plus a `fully_indexed` flag.
Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context,
const ColumnDefines & read_columns,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
UInt64 max_version) const
{
// Step 1: column definitions holding just the extra handle column and the version column.
auto pk_ver_col_defs = std::make_shared<ColumnDefines>(ColumnDefines{getExtraHandleColumnDefine(dm_context.is_common_handle),
getVersionColumnDefine()});
// Create a reader only for pk and version columns, over the delta part of the snapshot
// and bounded by this->rowkey_range.
auto delta_reader = std::make_shared<DeltaValueReader>(dm_context,
segment_snap->delta,
pk_ver_col_defs,
this->rowkey_range);
// Obtain the delta index to read with; ensurePlace also reports whether it is fully indexed.
auto [my_delta_index, fully_indexed] = ensurePlace(dm_context,
segment_snap->stable,
delta_reader,
read_ranges,
max_version);
// ... (remainder of the function is not shown in this excerpt)
}

Segment::ensurePlace

比较重要的是 Segment::ensurePlace 函数:

  • 先检测本次读取时,delta 中是否有尚未更新到 DeltaIndex 的新数据;
  • 如果有新的数据插入,则获得这些新插入数据的元信息;
  • 将新插入的数据信息更新到 DeltaIndex。

下面先来看第一部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// Excerpt of Segment::ensurePlace: only the first part (the fast "do we need to place at all" check)
/// is shown, the rest of the body is elided.
/// Returns a delta index together with a bool; the caller getReadInfo binds that bool to `fully_indexed`.
std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context,
const StableSnapshotPtr & stable_snap,
const DeltaValueReaderPtr & delta_reader,
const RowKeyRanges & read_ranges,
UInt64 max_version) const
{
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

// Both switches come from the context; skippable_place is only used in the elided part.
bool relevant_place = dm_context.enable_relevant_place;
bool skippable_place = dm_context.enable_skippable_place;

// Note that when enable_relevant_place is false, we cannot use the range of this segment,
// because some blocks / delete ranges could contain data that does not belong to the current segment.
// If the range of this segment were used as relevant_range, fully_indexed would always be false in those cases.
// So: merged read ranges when relevant place is enabled, otherwise the "all" range.
RowKeyRange relevant_range = relevant_place ? mergeRanges(read_ranges, is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

// Record the state of the delta index before it is updated.
auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();

// Let's do a fast check, determine whether we need to do place or not.
// When no place is needed, the cloned index is returned as-is with the flag set to false.
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, max_version))
return {my_delta_index, false};
//... (remainder of the function is not shown in this excerpt)
}

DeltaValueReader::shouldPlace

  • relevant_range:新写入的行的 key 是否落在本次读取关心的范围内
  • version <= max_version:新写入的行的版本是否不超过本次读取的 max_version
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /// Fast check used before placing: decides whether the delta index `my_delta_index`
    /// has to be updated ("placed") for a read over `relevant_range` at `max_version`.
    ///
    /// @param context          supplies `delta_cache_limit_rows` and is forwarded to the file readers.
    /// @param my_delta_index   the delta index whose placed status is inspected.
    /// @param segment_range_   compared against `relevant_range`; equal ranges force a place.
    /// @param relevant_range   key range the current read cares about.
    /// @param max_version      forwarded to the per-file checks.
    /// @return false when everything in the delta snapshot is already placed or nothing
    ///         relevant is pending; true when a place is required.
    bool DeltaValueReader::shouldPlace(const DMContext & context,
                                       DeltaIndexPtr my_delta_index,
                                       const RowKeyRange & segment_range_,
                                       const RowKeyRange & relevant_range,
                                       UInt64 max_version)
    {
        auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

        // Already placed.
        // delta_snap->getRows() is the number of rows in the delta snapshot; placed_rows is the
        // number of rows already applied to the delta index.
        // NOTE(review): the delete count is compared with `==` rather than `>=`; presumably the
        // placed delete count can never exceed the snapshot's -- confirm against DeltaIndex.
        if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
            return false;

        // Place unconditionally when the read covers everything / the whole segment, when too many
        // rows are pending (over the cache limit), or when delete ranges are pending.
        if (relevant_range.all() || relevant_range == segment_range_ //
            || delta_snap->getRows() - placed_rows > context.delta_cache_limit_rows //
            || placed_delete_ranges != delta_snap->getDeletes())
            return true;

        // Otherwise ask the persisted files first, then the mem table (if any).
        const size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset();
        // Rows already placed inside the mem table part. The counts are unsigned, so a plain
        // subtraction would wrap around when fewer rows are placed than the persisted files hold;
        // saturate at zero explicitly instead.
        const size_t placed_rows_in_mem_table
            = placed_rows > rows_in_persisted_file_snap ? placed_rows - rows_in_persisted_file_snap : 0;

        return persisted_files_reader->shouldPlace(context, relevant_range, max_version, placed_rows)
            || (mem_table_reader
                && mem_table_reader->shouldPlace(context, relevant_range, max_version, placed_rows_in_mem_table));
    }

ColumnFileSetReader::shouldPlace

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
bool ColumnFileSetReader::shouldPlace(const DMContext & context, const RowKeyRange & relevant_range, UInt64 max_version, size_t placed_rows)
{
auto & column_files = snapshot->getColumnFiles();
auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, placed_rows);

for (size_t file_index = start_file_index; file_index < snapshot->getColumnFileCount(); ++file_index)
{
auto & column_file = column_files[file_index];

// Always do place index if ColumnFileBig exists.
// ColumnBigFile 中的数据肯定是没有更新到 DelteTree 中,更新完应该是要对这个 BigFile 做个操作
if (column_file->isBigFile())
return true;
if (unlikely(column_file->isDeleteRange())) //???
throw Exception("column file is delete range", ErrorCodes::LOGICAL_ERROR);

size_t rows_start_in_file = file_index == start_file_index ? rows_start_in_start_file : 0;
size_t rows_end_in_file = column_file_rows[file_index];

auto & column_file_reader = column_file_readers[file_index];
if (column_file->isInMemoryFile())
{
auto & dpb_reader = typeid_cast<ColumnFileInMemoryReader &>(*column_file_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();

auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);

for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
// 这是什么逻辑
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
}
}
else if (column_file->isTinyFile())
{
auto & dpb_reader = typeid_cast<ColumnFileTinyReader &>(*column_file_reader);
auto pk_column = dpb_reader.getPKColumn();
auto version_column = dpb_reader.getVersionColumn();

auto rkcc = RowKeyColumnContainer(pk_column, context.is_common_handle);
const auto & version_col_data = toColumnVectorData<UInt64>(version_column);

for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
}
}
else
{
throw Exception("Unknown column file: " + column_file->toString(), ErrorCodes::LOGICAL_ERROR);
}
}

return false;
}