类 DeltaMergeBlockInputStream 是用来将 stable-layer 和 delta-layer 合并为全局有序的数据。
在 DeltaMergeBlockInputStream 的构造函数中:
[delta_index_it, delta_index_end) 用来表示遍历 delta-layer 中的数据。
use_stable_rows: 用来表征当前 delta-entry 需要处理多少行 stable-layer 中的数据,如果 delta_index_it == delta_index_end 则表示 delta-layer 为空,则只需要处理stable-layer中的数据,将 use_stable_rows 设置为 UNLIMITED 则表示后续仅处理stable-layer中数据,直至结束(stable-layer中行数不可能超过 UNLIMITED)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 DeltaMergeBlockInputStream (const SkippableBlockInputStreamPtr & stable_input_stream_, const DeltaValueReaderPtr & delta_value_reader_, const IndexIterator & delta_index_start_, const IndexIterator & delta_index_end_, const RowKeyRange rowkey_range_, size_t max_block_size_) : stable_input_stream (stable_input_stream_) , delta_value_reader (delta_value_reader_) , delta_index_it (delta_index_start_) , delta_index_end (delta_index_end_) , rowkey_range (rowkey_range_) , is_common_handle (rowkey_range.is_common_handle) , rowkey_column_size (rowkey_range.rowkey_column_size) , max_block_size (max_block_size_) { if constexpr (skippable_place) { if (!rowkey_range.isEndInfinite ()) throw Exception ("The end of rowkey range should be +Inf in skippable_place mode" ); } header = stable_input_stream->getHeader (); num_columns = header.columns (); if (delta_index_it == delta_index_end) { use_stable_rows = UNLIMITED; delta_done = true ; } else { use_stable_rows = delta_index_it.getSid (); } auto all_range = RowKeyRange::newAll (is_common_handle, rowkey_column_size); last_value = all_range.getStart ().toRowKeyValue (); last_value_ref = last_value.toRowKeyValueRef (); }
do_read 无论是 DeltaMergeBlockInputStream::getSkippedRows 还是 DeltaMergeBlockInputStream::read 核心实现都依赖 DeltaMergeBlockInputStream::do_read。下面着重阐述这个函数。
{stable, delta} 的数据是交叉读取的. DeltaIndex 遍历的每个位置,实际的数据由DTLeaf
记录:
mutations
: 记录是每个 Leaf-Node 的相关数据: 这个 Leaf-Node 是插入还是删除。如果是插入,具体的值是用字段 value
记录。
sids
: 记录的是处理每个Leaft-Node
之前需要先处理的stable-layer的行数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 struct DTLeaf { using NodePtr = void *; using Leaf = DTLeaf<M, F, S>; using Intern = DTIntern<M, F, S>; using LeafPtr = Leaf *; using InternPtr = Intern *; const size_t mark = 1 ; DT_Id sids[M * S + 1 ]; DTMutation mutations[M * S + 1 ]; size_t count = 0 ; LeafPtr prev = nullptr ; LeafPtr next = nullptr ; InternPtr parent = nullptr ; DTLeaf () = default ; DTLeaf (const Leaf & o) = default ; }; struct DTMutation { DTMutation () = default ; DTMutation (bool is_insert, UInt32 count, UInt64 value_) : type_count (DTType::getTypeCount (is_insert, count)) , value (value_) {} DT_TypeCount type_count = 0 ; DT_Id value = 0 ; bool isInsert () const { return DTType::isInsert (type_count); } bool isDelete () const { return DTType::isDelete (type_count); } UInt32 count () const { return DTType::getCount (type_count); } void setCount (UInt32 v) { type_count = DTType::updateCount (type_count, v); } };
那么下面来看看doRead
函数。 它每次读取尝试读取 max_block_size 行数.
如果 delta_done == false
,即delta没有读取完毕,则先读delta-layer数据;
否则,如果delta_done == true
,即 delta-layer数据都读完,则仅只读取stable层数据;
等 delta_done == true && stable_done == true
,则当前segment读取完毕。
流程大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 Block doRead () { while (!finished ()) { MutableColumns columns; initOutputColumns (columns); size_t limit = max_block_size; while (limit) { if (stable_done) { if (delta_done) break ; else next <true , false >(columns, limit); } else { if (delta_done) next <false , true >(columns, limit); else next <false , false >(columns, limit); } } if (limit == max_block_size) continue ; return header.cloneWithColumns (std::move (columns)); } return {}; }
writeFromStable stable_ignore 是当前 stable-layer 中需要连续忽略的行数, 因此每次在读取 stable-layer 数据时,先忽略 stable_ignore 行:
如果当前 block 剩余的行数小于 stable_ignore,则调用 SkippableBlockInputStream::getSkippedRows
函数,来看看在读取下一个block之前可以先skip多少行,
如果 skipped 的行数超过 stable_ignore, 则无需真正的读取 stable-stream,否则还是会继续读取 stable-stram, 调用 fillStableBlockIfNeeded
填充 cur_stable_block_columns
有可能 skipped 的行数超过了 stable_ignore ,导致 stable_ignore 变成负.
stable_ignore
表征的是 stable-layer 中 deleted 的行数,因此这些 stable_ingore 的行数不能算被处理的stable行数,即不会影响 use_stable_rows 的大小;
skip
表征的是不符合条件的行数,会影响 use_stable_rows
;
如果 stable_ignore < 0 则 skip 的行数多了,需要在 if (stable_ignore < 0)
分支中进行处理。如果处理完,
stable_ingore = 0,则 use_stable_rows >= 0, 需要继续处理
stable_ingore < 0,则本次读取 stable-layer 结束,下次读取 stable-layer 时继续在 if (stable_ignore < 0)
分支中处理;
优化:这里使用了个 SkippableBlockInputStream::getSkippedRows 来减少对 stable-layer 的读取。
TODO: getSkippedRows 调用两次是否幂等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 template <bool c_delta_done>void writeFromStable (MutableColumns & output_columns, size_t & output_write_limit) { while (stable_ignore > 0 ) { if (curStableBlockRemaining ()) { stable_ignore -= skipRowsInCurStableBlock (stable_ignore); continue ; } size_t skips; stable_input_stream->getSkippedRows (skips); if (skips > 0 ) { stable_ignore -= skips; continue ; } else { if (!fillStableBlockIfNeeded ()) throw Exception ("Unexpected end of stable stream, need more rows to skip" ); } } if (stable_ignore < 0 ) { size_t stable_ignore_abs = std::abs (stable_ignore); if (use_stable_rows > stable_ignore_abs) { use_stable_rows += stable_ignore; if constexpr (skippable_place) sk_skip_total_rows + = stable_ignore_abs; stable_ignore = 0 ; } else { stable_ignore += use_stable_rows; if constexpr (skippable_place) sk_skip_total_rows + = use_stable_rows; use_stable_rows = 0 ; } } if (unlikely (!(stable_ignore == 0 || use_stable_rows == 0 ))) throw Exception ("Algorithm broken!" ); while (use_stable_rows && output_write_limit) { if (curStableBlockRemaining ()) { writeCurStableBlock (output_columns, output_write_limit); continue ; } size_t skips; stable_input_stream->getSkippedRows (skips); if (skips > 0 ) { if (skips <= use_stable_rows) { use_stable_rows -= skips; } else { stable_ignore -= skips - use_stable_rows; use_stable_rows = 0 ; } } else { if (!fillStableBlockIfNeeded ()) { if constexpr (c_delta_done) { stable_done = true ; use_stable_rows = 0 ; break ; } else throw Exception ("Unexpected end of stable stream, need more rows to write" ); } } } }
writeInsertFromDelta 读完 stable-layer 的数据,下面要从 delta-layer 中读取数据。先假设读取的数据不是deleted状态的。
use_delta_rows : 表征该 leaf-node entry 插入了多少行数据,
use_delta_offset : 表征从 delta-value 中的起始位置。
有这两个参数就可以调用 delta_reader->readRows
读取数据,读取完再更新相关参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 inline void writeInsertFromDelta (MutableColumns & output_columns, size_t & output_write_limit) { auto write_rows = std::min (output_write_limit, use_delta_rows); if (output_columns[0 ]->empty ()) { for (size_t column_id = 0 ; column_id < num_columns; ++column_id) output_columns[column_id]->reserve (max_block_size); } auto actual_write = delta_value_reader->readRows (output_columns, use_delta_offset, write_rows, &rowkey_range); if constexpr (skippable_place) { sk_skip_total_rows += write_rows - actual_write; } output_write_limit -= actual_write; use_delta_offset += write_rows; use_delta_rows -= write_rows; }
writeInsertFromDelta 如果读取到的 delta-layer 是 deleted 数据,标志接下来的 stable-layer 中的数据要忽略 n 行。
1 inline void writeDeleteFromDelta (size_t n) { stable_ignore += n; }
next 熟悉了上面几个函数,下面,我们就顺着 next
函数调用来看怎么读取 output_write_limit 行数局:
use_stable_rows
: 如果初始化时 use_stable_rows 不为 0, 则需要先从 stable-layer 中读取数据 use_stable_rows 行数据,
output_write_limit > use_stable_rows
: 则需要继续从 delta-layer 中读取数据,1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 template <bool c_stable_done, bool c_delta_done>inline void next (MutableColumns & output_columns, size_t & output_write_limit) { if constexpr (!c_stable_done) { if (use_stable_rows) { writeFromStable <c_delta_done>(output_columns, output_write_limit); return ; } } if constexpr (!c_delta_done) { if (use_delta_rows) { writeInsertFromDelta (output_columns, output_write_limit); } else { if (delta_index_it.isDelete ()) { writeDeleteFromDelta (delta_index_it.getCount ()); } else { bool do_write = true ; if constexpr (skippable_place) { if (delta_index_it.getSid () < sk_skip_stable_rows) { do_write = false ; sk_skip_total_rows += delta_index_it.getCount (); } } if (do_write) { use_delta_offset = delta_index_it.getValue (); use_delta_rows = delta_index_it.getCount (); writeInsertFromDelta (output_columns, output_write_limit); } } } if (!use_delta_rows) stepForwardEntryIt (); } }
Segment::ensurePlace 下面讲解如何将 delta_value_space 中的数据位置信息更新到 DeltaTreeIndex。
DeltaValueReader::getPlaceItems 返回的是写入到 delta_value_space 中,但是尚未更新位置新的的数据。怎么保证总是新的数据?
{my_placed_rows, my_placed_deletes} pair保证了这一点:
my_placed_rows: 表征的是当前 Segment 的 DeltaValueSpace 中已添加位置信息到 DeltaTreeIndex 中的行数
my_placed_deletes: 表征的是当前 Segment 的 DeltaValueSpace 中删除的行数已添加位置信息到 DeltaTreeIndex 中的行数
通过这两个参数,不断向前处理 DeltaValueSpace 中的新加入的数据。
下面的代码是 Segment::ensurePlace
的核心部分,得到 items
之后就准备使用 placeUpsert/placeDelete 将 item 位置信息更新到 DeltaTreeIndex 中。
全部更新完后,my_delta_index->update
会得到新的 DeltaIndex。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 std::pair<DeltaIndexPtr, bool > Segment::ensurePlace (const DMContext & dm_context, const StableSnapshotPtr & stable_snap, const DeltaValueReaderPtr & delta_reader, const RowKeyRanges & read_ranges, UInt64 max_version) const { auto items = delta_reader->getPlaceItems (my_placed_rows, my_placed_deletes, delta_snap->getRows (), delta_snap->getDeletes ()); bool fully_indexed = true ; for (auto & v : items) { if (v.isBlock ()) { auto block = v.getBlock (); auto offset = v.getBlockOffset (); auto rows = block.rows (); if (unlikely (my_placed_rows != offset)) throw Exception ("Place block offset not match" , ErrorCodes::LOGICAL_ERROR); fully_indexed &= placeUpsert <true >( dm_context, stable_snap, delta_reader, offset, std::move (block), *my_delta_tree, relevant_range, relevant_place); my_placed_rows += rows; } else { fully_indexed &= placeDelete <true >( dm_context, stable_snap, delta_reader, v.getDeleteRange (), *my_delta_tree, relevant_range, relevant_place); ++my_placed_deletes; } } my_delta_index->update (my_delta_tree, my_placed_rows, my_placed_deletes); return {my_delta_index, fully_indexed}; } }
placeInsert 这里重点注意下 merged_stream, 为什么 DM::placeInsert 的 stable_stream 要传入 merged_stram ?
stable_stream 表征的是 {stable, delta} merge之后的全局有序状态,但是其中 DeltaValueSpace 部分数据的位置信息没有更新到 DeltaTreeIndex 中。针对每个item中的block需要在 merged_stream 中寻找到合适的位置,获得相应的位置信息,再将这个位置信息更新到 DeltaTreeIndex 中,更新完DeltaTreeIndex完,merge_stream 就可以变成一个 stable-stram。
因此,你也可以看到每次 split、merge等操作中都会先 Segment::getReadInfo() –> Segment::getPlacedStream。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 template <bool skippable_place>bool Segment::placeUpsert (const DMContext & dm_context, const StableSnapshotPtr & stable_snap, const DeltaValueReaderPtr & delta_reader, size_t delta_value_space_offset, Block && block, DeltaTree & update_delta_tree, const RowKeyRange & relevant_range, bool relevant_place) const { IColumn::Permutation perm; const auto & handle = getExtraHandleColumnDefine (is_common_handle); bool do_sort = sortBlockByPk (handle, block, perm); RowKeyValueRef first_rowkey = RowKeyColumnContainer (block.getByPosition (0 ).column, is_common_handle).getRowKeyValue (0 ); RowKeyValueRef range_start = relevant_range.getStart (); auto place_handle_range = skippable_place ? RowKeyRange::startFrom (max (first_rowkey, range_start), is_common_handle, rowkey_column_size) : RowKeyRange::newAll (is_common_handle, rowkey_column_size); auto compacted_index = update_delta_tree.getCompactedEntries (); auto merged_stream = getPlacedStream <skippable_place>( dm_context, {handle, getVersionColumnDefine ()}, {place_handle_range}, EMPTY_FILTER, stable_snap, delta_reader, compacted_index->begin (), compacted_index->end (), dm_context.stable_pack_rows); if (do_sort) return DM::placeInsert <true >( merged_stream, block, relevant_range, relevant_place, update_delta_tree, delta_value_space_offset, perm, getPkSort (handle)); }
RidGenerator::nextForUpsert 先讲解下 RidGenerator,即怎么生成 delta_block 的 rid(Row-Id), 其中 rid 表征的是 delta_block 中某一行在最终合并的流 merged_stream 中的行位置。
针对delta的某一行 tuples[i], 先在 stable 中寻找到 tuples[i] <= stable 的位置 rid,先将 delta_block_pos++
注意: 字段 RidGenerator::stable 其实是上述的 merged_stream,表征的是最终merged之后的全局有序流。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 std::pair<UInt64, bool > nextForUpsert () { while (fillStableBlockIfNeed ()) { int res = compareModifyToStable (); if (res > 0 ) { ++stable_block_pos; ++rid; continue ; } auto cur_dup_prev = dup_prev; dup_prev = delta_block_dup_next[delta_block_pos++]; if (res == 0 ) { if (dup_prev) return {rid, true }; else { ++stable_block_pos; return {rid++, true }; } } else if (res < 0 ) { if (dup_prev) return {rid, cur_dup_prev}; else return {rid++, cur_dup_prev}; } } auto cur_dup_prev = dup_prev; dup_prev = delta_block_dup_next[delta_block_pos]; ++delta_block_pos; if (dup_prev) return {rid, cur_dup_prev}; else return {rid++, cur_dup_prev}; }
DM::placeInsert 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 template <bool use_row_id_ref, class DeltaTree >bool placeInsert (const SkippableBlockInputStreamPtr & stable, const Block & delta_block, const RowKeyRange & range, bool relevant_place, DeltaTree & delta_tree, size_t delta_value_space_offset, const IColumn::Permutation & row_id_ref, const SortDescription & sort) { auto rows = delta_block.rows (); size_t offset = 0 ; size_t limit = rows; if (relevant_place) { std::tie (offset, limit) = range.getPosRange (delta_block.getByPosition (0 ).column, 0 , rows); if (!limit) return rows == limit; } RidGenerator rid_gen (stable, sort, delta_block, offset, limit) ; using Rids = std::vector<std::pair<UInt64, bool >>; Rids rids (limit) ; for (size_t i = 0 ; i < limit; ++i) rids[i] = rid_gen.nextForUpsert (); for (size_t i = 0 ; i < limit; ++i) { auto [rid, dup] = rids[i]; UInt64 tuple_id; if constexpr (use_row_id_ref) tuple_id = delta_value_space_offset + row_id_ref[offset + i]; else tuple_id = delta_value_space_offset + (offset + i); if (dup) delta_tree.addDelete (rid); delta_tree.addInsert (rid, tuple_id); } return rows == limit; }
第一次调用 placeInsert 当用户第一次向 DelteValueSpace 中插入数据时,然后查询,此时 DeltaMergeBlockInputStream 输入的 stable 是空的, delta_index_start_ == delta_index_end_, 那么第一次调用 next 方法时,特化的版本是 next<false, true>, RidGenerator::nextForUpsert 简化如下:
1 2 3 4 5 6 7 8 9 10 std::pair<UInt64, bool > nextForUpsert () { auto cur_dup_prev = dup_prev; dup_prev = delta_block_dup_next[delta_block_pos]; ++delta_block_pos; if (dup_prev) return {rid, cur_dup_prev}; else return {rid++, cur_dup_prev}; }
因此生成的 rid 就是按照单调递增的。
等把这一批数据的位置信息更新到更新到 DeltaTreeIndex 中,下此在 getPlaceStream 中调用就能利用方才更新的位置信息来排序,利用 getInputStream 中调用如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 BlockInputStreamPtr Segment::getInputStream (const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, UInt64 max_version, size_t expected_block_size) { auto read_info = getReadInfo (dm_context, columns_to_read, segment_snap, read_ranges, max_version); stream = getPlacedStream (dm_context, *read_info.read_columns, real_ranges, filter, segment_snap->stable, read_info.getDeltaReader (), read_info.index_begin, read_info.index_end, expected_block_size, max_version); }
怎么保证第一次查询时就会更新 DeltaTreeIndex ?
因此 tikv 一开始就向 tiflash 先delete之前的旧数据。