tiflash: 如何基于 DeleteTreeIndex 获得全局有序数据输出流

DeltaMergeBlockInputStream

类 DeltaMergeBlockInputStream 是用来将 stable-layer 和 delta-layer 合并为全局有序的数据。

在 DeltaMergeBlockInputStream 的构造函数中:

  • [delta_index_it, delta_index_end) 用来表示遍历 delta-layer 中的数据。
  • use_stable_rows: 用来表征当前 delta-entry 需要处理多少行 stable-layer 中的数据,如果 delta_index_it == delta_index_end 则表示 delta-layer 为空,则只需要处理stable-layer中的数据,将 use_stable_rows 设置为 UNLIMITED 则表示后续仅处理stable-layer中数据,直至结束(stable-layer中行数不可能超过 UNLIMITED)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
DeltaMergeBlockInputStream(const SkippableBlockInputStreamPtr & stable_input_stream_,
const DeltaValueReaderPtr & delta_value_reader_,
const IndexIterator & delta_index_start_,
const IndexIterator & delta_index_end_,
const RowKeyRange rowkey_range_,
size_t max_block_size_)
: stable_input_stream(stable_input_stream_)
, delta_value_reader(delta_value_reader_)
, delta_index_it(delta_index_start_)
, delta_index_end(delta_index_end_)
, rowkey_range(rowkey_range_)
, is_common_handle(rowkey_range.is_common_handle)
, rowkey_column_size(rowkey_range.rowkey_column_size)
, max_block_size(max_block_size_)
{
if constexpr (skippable_place)
{
if (!rowkey_range.isEndInfinite())
throw Exception("The end of rowkey range should be +Inf in skippable_place mode");
}

header = stable_input_stream->getHeader();
num_columns = header.columns();

if (delta_index_it == delta_index_end)
{
use_stable_rows = UNLIMITED; // 表示后续只处理 stable-layer, 因为待处理的 stable-layer 行数无限制
delta_done = true;
}
else
{
use_stable_rows = delta_index_it.getSid(); // 第一个 delta-entry 节点前需要读取的stable行数
}
auto all_range = RowKeyRange::newAll(is_common_handle, rowkey_column_size);
last_value = all_range.getStart().toRowKeyValue();
last_value_ref = last_value.toRowKeyValueRef();
}

do_read

无论是 DeltaMergeBlockInputStream::getSkippedRows 还是 DeltaMergeBlockInputStream::read 核心实现都依赖 DeltaMergeBlockInputStream::do_read。下面着重阐述这个函数。

{stable, delta} 的数据是交叉读取的. DeltaIndex 遍历的每个位置,实际的数据由DTLeaf记录:

  • mutations: 记录是每个 Leaf-Node 的相关数据: 这个 Leaf-Node 是插入还是删除。如果是插入,具体的值是用字段 value 记录。
  • sids: 记录的是处理每个Leaft-Node之前需要先处理的stable-layer的行数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
struct DTLeaf
{
using NodePtr = void *;
using Leaf = DTLeaf<M, F, S>;
using Intern = DTIntern<M, F, S>;
using LeafPtr = Leaf *;
using InternPtr = Intern *;


const size_t mark = 1;

DT_Id sids[M * S + 1]; // 在处理这个 leaf-entry 之前,需要先处理多少行 stable 层的数据
DTMutation mutations[M * S + 1];
size_t count = 0; // mutations count

LeafPtr prev = nullptr;
LeafPtr next = nullptr;
InternPtr parent = nullptr;

DTLeaf() = default;
DTLeaf(const Leaf & o) = default;
//...
};

struct DTMutation
{
DTMutation() = default;
DTMutation(bool is_insert, UInt32 count, UInt64 value_)
: type_count(DTType::getTypeCount(is_insert, count))
, value(value_)
{}

/// The lowest bit of type_count indicates whether this is a insert or not (delete).
/// And the rest bits represent the inserted or deleted rows.
DT_TypeCount type_count = 0;
/// For DT_INS, "value" is the value index (tuple_id) in value space;
DT_Id value = 0;

bool isInsert() const { return DTType::isInsert(type_count); }
bool isDelete() const { return DTType::isDelete(type_count); }
UInt32 count() const { return DTType::getCount(type_count); }
void setCount(UInt32 v) { type_count = DTType::updateCount(type_count, v); }
};

那么下面来看看doRead函数。 它每次读取尝试读取 max_block_size 行数.

  • 如果 delta_done == false,即delta没有读取完毕,则先读delta-layer数据;
  • 否则,如果delta_done == true,即 delta-layer数据都读完,则仅只读取stable层数据;
  • delta_done == true && stable_done == true,则当前segment读取完毕。

流程大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Block doRead()
{
while (!finished())
{
//1. 初始化 columns 的 schema
MutableColumns columns;
initOutputColumns(columns);

// 2. 向 columns 中填充数据
size_t limit = max_block_size;
while (limit)
{
if (stable_done)
{
if (delta_done)
break;
else
next<true, false>(columns, limit);
}
else
{
if (delta_done)
next<false, true>(columns, limit);
else
next<false, false>(columns, limit);
}
}

// 没有读取到数据
if (limit == max_block_size)
continue;

return header.cloneWithColumns(std::move(columns));
}
return {};
}

writeFromStable

stable_ignore 是当前 stable-layer 中需要连续忽略的行数, 因此每次在读取 stable-layer 数据时,先忽略 stable_ignore 行:

  • 如果当前 block 剩余的行数小于 stable_ignore,则调用 SkippableBlockInputStream::getSkippedRows 函数,来看看在读取下一个block之前可以先skip多少行,

    如果 skipped 的行数超过 stable_ignore, 则无需真正的读取 stable-stream,否则还是会继续读取 stable-stram, 调用 fillStableBlockIfNeeded 填充 cur_stable_block_columns

  • 有可能 skipped 的行数超过了 stable_ignore ,导致 stable_ignore 变成负.

    • stable_ignore 表征的是 stable-layer 中 deleted 的行数,因此这些 stable_ingore 的行数不能算被处理的stable行数,即不会影响 use_stable_rows 的大小;
    • skip 表征的是不符合条件的行数,会影响 use_stable_rows;

    如果 stable_ignore < 0 则 skip 的行数多了,需要在 if (stable_ignore < 0) 分支中进行处理。如果处理完,

    • stable_ingore = 0,则 use_stable_rows >= 0, 需要继续处理
    • stable_ingore < 0,则本次读取 stable-layer 结束,下次读取 stable-layer 时继续在 if (stable_ignore < 0) 分支中处理;

优化:这里使用了个 SkippableBlockInputStream::getSkippedRows 来减少对 stable-layer 的读取。

TODO: getSkippedRows 调用两次是否幂等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
template <bool c_delta_done>
void writeFromStable(MutableColumns & output_columns, size_t & output_write_limit)
{
// First let's do skip caused by stable_ignore.
while (stable_ignore > 0)
{
if (curStableBlockRemaining())
{
stable_ignore -= skipRowsInCurStableBlock(stable_ignore);
continue;
}

// 当前block读完完, 下面可能要读取 stable-stram

size_t skips;
stable_input_stream->getSkippedRows(skips);

if (skips > 0)
{
stable_ignore -= skips;
continue;
}
else
{
if (!fillStableBlockIfNeeded())
throw Exception("Unexpected end of stable stream, need more rows to skip");
}
}

// skipped 的多了
if (stable_ignore < 0)
{
// 直接减去不用处理的行
size_t stable_ignore_abs = std::abs(stable_ignore);
if (use_stable_rows > stable_ignore_abs)
{
use_stable_rows += stable_ignore;
if constexpr (skippable_place)
sk_skip_total_rows += stable_ignore_abs;
stable_ignore = 0;
}
else
{
// 当前不用处理 stable-layer,可以直接忽略
stable_ignore += use_stable_rows;
if constexpr (skippable_place)
sk_skip_total_rows += use_stable_rows;
// Nothing to write, because those rows we want to write are skipped already.
use_stable_rows = 0;
}
}

// 到此, stable_ingore 和 use_stable_rows 至少有一个是 0
if (unlikely(!(stable_ignore == 0 || use_stable_rows == 0)))
throw Exception("Algorithm broken!");

while (use_stable_rows && output_write_limit)
{
if (curStableBlockRemaining())
{
writeCurStableBlock(output_columns, output_write_limit);
continue;
}

// 当前stable-block没有数据, 需要从 stable stream 中读取
size_t skips;
stable_input_stream->getSkippedRows(skips);

if (skips > 0)
{
if (skips <= use_stable_rows)
{
use_stable_rows -= skips;
}
else
{
stable_ignore -= skips - use_stable_rows;
use_stable_rows = 0;
}
}
else
{
// 无法跳过则只能实际读取数据
if (!fillStableBlockIfNeeded())
{
if constexpr (c_delta_done)
{
stable_done = true;
use_stable_rows = 0;
break;
}
else
throw Exception("Unexpected end of stable stream, need more rows to write");
}
}
}
}

writeInsertFromDelta

读完 stable-layer 的数据,下面要从 delta-layer 中读取数据。先假设读取的数据不是deleted状态的。

  • use_delta_rows: 表征该 leaf-node entry 插入了多少行数据,
  • use_delta_offset: 表征从 delta-value 中的起始位置。

有这两个参数就可以调用 delta_reader->readRows 读取数据,读取完再更新相关参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
inline void writeInsertFromDelta(MutableColumns & output_columns, size_t & output_write_limit)
{
auto write_rows = std::min(output_write_limit, use_delta_rows);

// Prevent frequently reallocation.
if (output_columns[0]->empty())
{
for (size_t column_id = 0; column_id < num_columns; ++column_id)
output_columns[column_id]->reserve(max_block_size);
}

// Note that the rows between [use_delta_offset, use_delta_offset + write_rows) are guaranteed sorted,
// otherwise we won't read them in the same range.
auto actual_write = delta_value_reader->readRows(output_columns, use_delta_offset, write_rows, &rowkey_range);

if constexpr (skippable_place)
{
sk_skip_total_rows += write_rows - actual_write; // 跳过的行数
}

output_write_limit -= actual_write;
use_delta_offset += write_rows;
use_delta_rows -= write_rows;
}

writeInsertFromDelta

如果读取到的 delta-layer 是 deleted 数据,标志接下来的 stable-layer 中的数据要忽略 n 行。

1
inline void writeDeleteFromDelta(size_t n) { stable_ignore += n; }

next

熟悉了上面几个函数,下面,我们就顺着 next 函数调用来看怎么读取 output_write_limit 行数局:

  • use_stable_rows: 如果初始化时 use_stable_rows 不为 0, 则需要先从 stable-layer 中读取数据 use_stable_rows 行数据,
  • output_write_limit > use_stable_rows: 则需要继续从 delta-layer 中读取数据,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    template <bool c_stable_done, bool c_delta_done>
    inline void next(MutableColumns & output_columns, size_t & output_write_limit)
    {
    if constexpr (!c_stable_done)
    {
    if (use_stable_rows)
    {
    writeFromStable<c_delta_done>(output_columns, output_write_limit);
    return;
    }
    }

    if constexpr (!c_delta_done)
    {
    if (use_delta_rows) // 当前 delta 没有读完
    {
    writeInsertFromDelta(output_columns, output_write_limit);
    }
    else
    {
    // 当前一个 delta_index_it 读取完了, 读下一个

    if (delta_index_it.isDelete())
    {
    // Delete.
    writeDeleteFromDelta(delta_index_it.getCount()); // 删除了多少行
    }
    else
    {
    // Insert.
    bool do_write = true;
    if constexpr (skippable_place)
    {
    if (delta_index_it.getSid() < sk_skip_stable_rows)
    {
    do_write = false;
    sk_skip_total_rows += delta_index_it.getCount();
    }
    }

    if (do_write)
    {
    use_delta_offset = delta_index_it.getValue(); // 写入 delta-layer 的偏移量
    use_delta_rows = delta_index_it.getCount(); // 写入 delta-layer 的行数
    writeInsertFromDelta(output_columns, output_write_limit);
    }
    }
    }

    if (!use_delta_rows)
    stepForwardEntryIt();
    }
    }

Segment::ensurePlace

下面讲解如何将 delta_value_space 中的数据位置信息更新到 DeltaTreeIndex。

DeltaValueReader::getPlaceItems 返回的是写入到 delta_value_space 中,但是尚未更新位置新的的数据。怎么保证总是新的数据?

{my_placed_rows, my_placed_deletes} pair保证了这一点:

  • my_placed_rows: 表征的是当前 Segment 的 DeltaValueSpace 中已添加位置信息到 DeltaTreeIndex 中的行数
  • my_placed_deletes: 表征的是当前 Segment 的 DeltaValueSpace 中删除的行数已添加位置信息到 DeltaTreeIndex 中的行数

通过这两个参数,不断向前处理 DeltaValueSpace 中的新加入的数据。

下面的代码是 Segment::ensurePlace 的核心部分,得到 items 之后就准备使用 placeUpsert/placeDelete 将 item 位置信息更新到 DeltaTreeIndex 中。

全部更新完后,my_delta_index->update 会得到新的 DeltaIndex。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context,
const StableSnapshotPtr & stable_snap,
const DeltaValueReaderPtr & delta_reader,
const RowKeyRanges & read_ranges,
UInt64 max_version) const
{
// ...
auto items = delta_reader->getPlaceItems(my_placed_rows, my_placed_deletes, delta_snap->getRows(), delta_snap->getDeletes());

bool fully_indexed = true;
for (auto & v : items)
{
if (v.isBlock())
{
auto block = v.getBlock();
auto offset = v.getBlockOffset();
auto rows = block.rows();

// 插入的数据,在 DeltaValueSpace 中存储需要是连续的
if (unlikely(my_placed_rows != offset))
throw Exception("Place block offset not match", ErrorCodes::LOGICAL_ERROR);

fully_indexed &= placeUpsert<true>(
dm_context,
stable_snap,
delta_reader,
offset,
std::move(block),
*my_delta_tree,
relevant_range,
relevant_place);

my_placed_rows += rows;
}
else
{
fully_indexed &= placeDelete<true>(
dm_context,
stable_snap,
delta_reader,
v.getDeleteRange(),
*my_delta_tree,
relevant_range,
relevant_place);

++my_placed_deletes;
}
}

my_delta_index->update(my_delta_tree, my_placed_rows, my_placed_deletes);

return {my_delta_index, fully_indexed};
}
}

placeInsert

这里重点注意下 merged_stream, 为什么 DM::placeInsert 的 stable_stream 要传入 merged_stram ?

stable_stream 表征的是 {stable, delta} merge之后的全局有序状态,但是其中 DeltaValueSpace 部分数据的位置信息没有更新到 DeltaTreeIndex 中。针对每个item中的block需要在 merged_stream 中寻找到合适的位置,获得相应的位置信息,再将这个位置信息更新到 DeltaTreeIndex 中,更新完DeltaTreeIndex完,merge_stream 就可以变成一个 stable-stram。

因此,你也可以看到每次 split、merge等操作中都会先 Segment::getReadInfo() –> Segment::getPlacedStream。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template <bool skippable_place>
bool Segment::placeUpsert(const DMContext & dm_context,
const StableSnapshotPtr & stable_snap,
const DeltaValueReaderPtr & delta_reader,
size_t delta_value_space_offset,
Block && block,
DeltaTree & update_delta_tree,
const RowKeyRange & relevant_range,
bool relevant_place) const
{
IColumn::Permutation perm;
const auto & handle = getExtraHandleColumnDefine(is_common_handle);
bool do_sort = sortBlockByPk(handle, block, perm);
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
RowKeyValueRef range_start = relevant_range.getStart();

auto place_handle_range = skippable_place
? RowKeyRange::startFrom(max(first_rowkey, range_start), is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

auto compacted_index = update_delta_tree.getCompactedEntries();

auto merged_stream = getPlacedStream<skippable_place>( //
dm_context,
{handle, getVersionColumnDefine()},
{place_handle_range},
EMPTY_FILTER,
stable_snap,
delta_reader,
compacted_index->begin(),
compacted_index->end(),
dm_context.stable_pack_rows);

if (do_sort)
return DM::placeInsert<true>(
merged_stream,
block,
relevant_range,
relevant_place,
update_delta_tree,
delta_value_space_offset,
perm,
getPkSort(handle));
}

RidGenerator::nextForUpsert

先讲解下 RidGenerator,即怎么生成 delta_block 的 rid(Row-Id), 其中 rid 表征的是 delta_block 中某一行在最终合并的流 merged_stream 中的行位置。

针对delta的某一行 tuples[i], 先在 stable 中寻找到 tuples[i] <= stable 的位置 rid,先将 delta_block_pos++

  • tuples[i] == stable[j]:

    • 如果 tuples[i] < tuples[i+1],那么此时 rid 就是 tuples[i] 在最终流 merged_stream 中的位置。

      为了 tuples[i + 1] 后续也能在 mered_stream 中找到正确的位置,就需要 stable_block_pos++ 跳过 merged_stream 中当前相等的行,顺带 rid++,那么 tuples[i + 1] 的插入位置就是自增后的 rid 值。

      如果 tuples[i + 1] 还和 tuples[i + 2] 相同,则重复上述操作。

    • 如果 tuples[i] == tuples[i + 1],此时并没有更新 rid++,这是为了 tuples[i + 1] 和 tuples[i] 具有相同的 rid,但是 tuple_id 是不同的

      <rid, tuple_id> 的唯一性

  • tuples[i] < stable[j]:

    和上面的比较类似。

  • tuple[i] > stable

    此时, 新插入的tuple比整个stable中的所有数据都大,则在后面append。

注意: 字段 RidGenerator::stable 其实是上述的 merged_stream,表征的是最终merged之后的全局有序流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//TODO 梳理逻辑
std::pair<UInt64, bool> nextForUpsert()
{
while (fillStableBlockIfNeed())
{
// 1. 先在 stable-layer 中找到第一个 delta <= stale 的位置
int res = compareModifyToStable();
if (res > 0)
{
++stable_block_pos;
++rid;
continue;
}

// 2. rid = sid + delta
// 如果 dup_prev == true,
auto cur_dup_prev = dup_prev;
dup_prev = delta_block_dup_next[delta_block_pos++];
if (res == 0)
{
if (dup_prev)
return {rid, true};
else
{
// 此时 delta_block_pos++ 那么 stable-layer 的也需要前进一个位置
++stable_block_pos; // 从stable下一行开始
return {rid++, true};
}
}
else if (res < 0)
{
if (dup_prev)
return {rid, cur_dup_prev};
else
return {rid++, cur_dup_prev};
}
}

// delta > stable
auto cur_dup_prev = dup_prev;
dup_prev = delta_block_dup_next[delta_block_pos];
++delta_block_pos;
if (dup_prev)
return {rid, cur_dup_prev};
else
return {rid++, cur_dup_prev};
}

DM::placeInsert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
template <bool use_row_id_ref, class DeltaTree>
bool placeInsert(const SkippableBlockInputStreamPtr & stable, //
const Block & delta_block,
const RowKeyRange & range,
bool relevant_place,
DeltaTree & delta_tree,
size_t delta_value_space_offset,
const IColumn::Permutation & row_id_ref,
const SortDescription & sort)
{
auto rows = delta_block.rows();

size_t offset = 0;
size_t limit = rows;

// 过滤出 delta_block 中不在 range 中的行, 返回的是有效的行
if (relevant_place)
{
std::tie(offset, limit) = range.getPosRange(delta_block.getByPosition(0).column, 0, rows);
if (!limit)
return rows == limit;
}

RidGenerator rid_gen(stable, sort, delta_block, offset, limit);

using Rids = std::vector<std::pair<UInt64, bool>>;
Rids rids(limit);
for (size_t i = 0; i < limit; ++i)
rids[i] = rid_gen.nextForUpsert();

for (size_t i = 0; i < limit; ++i)
{
auto [rid, dup] = rids[i];
// use_row_id_ref 表示输入的 delta_block 是否经过排序
// row_id_ref 是id_mapping
// tuple_id 是在 DeltaValueSpace 中的位置
UInt64 tuple_id;
if constexpr (use_row_id_ref)
tuple_id = delta_value_space_offset + row_id_ref[offset + i];
else
tuple_id = delta_value_space_offset + (offset + i);

// writeDeleteFromDeltao
// 遇到 DeletedEntry 则忽略后面几行
if (dup)
delta_tree.addDelete(rid); // 表示忽略下面的插入
// 将 rid, tuple_id 更新到 DeltaTreeIndex
delta_tree.addInsert(rid, tuple_id);
}

return rows == limit;
}

第一次调用 placeInsert

当用户第一次向 DelteValueSpace 中插入数据时,然后查询,此时 DeltaMergeBlockInputStream 输入的 stable 是空的, delta_index_start_ == delta_index_end_, 那么第一次调用 next 方法时,特化的版本是 next<false, true>, RidGenerator::nextForUpsert 简化如下:

1
2
3
4
5
6
7
8
9
10
std::pair<UInt64, bool> nextForUpsert()
{
auto cur_dup_prev = dup_prev;
dup_prev = delta_block_dup_next[delta_block_pos];
++delta_block_pos;
if (dup_prev)
return {rid, cur_dup_prev};
else
return {rid++, cur_dup_prev};
}

因此生成的 rid 就是按照单调递增的。

等把这一批数据的位置信息更新到更新到 DeltaTreeIndex 中,下此在 getPlaceStream 中调用就能利用方才更新的位置信息来排序,利用 getInputStream 中调用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
//...
// 1. 更新 DeltaTreeIndex
auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, max_version);

// 2. 可以利用方才更新的位置信息,实现merge
stream = getPlacedStream(dm_context,
*read_info.read_columns,
real_ranges,
filter,
segment_snap->stable,
read_info.getDeltaReader(),
read_info.index_begin,
read_info.index_end,
expected_block_size,
max_version);
}

怎么保证第一次查询时就会更新 DeltaTreeIndex ?

因此 tikv 一开始就向 tiflash 先delete之前的旧数据。