tiflash: 构建 DeltaTree Index(1)

DeltaIndexTree

DeltaIndex 在 tiflash 中是 Segment 级别的索引,用来对 Stable 层和 Delta 层的数据进行 merge。

定义在 DeltaValueSpace 中:

/// Delta layer of a Segment (excerpt: only the member discussed here is shown;
/// all other members are elided as `// ...`).
class DeltaValueSpace
: public std::enable_shared_from_this<DeltaValueSpace>
, private boost::noncopyable
{
// ...
// Segment-level index used to merge rows of the Stable layer with rows of the
// Delta layer. DeltaValueSpace::flush reassigns it when a flush produces index
// updates (i.e. when flushed in-memory blocks were re-sorted).
DeltaIndexPtr delta_index;
//...
};

delta_index 主要在以下两种场景中被更新:

  • DeltaValueSpace::flush
  • Segment::getReadInfo

DeltaValueSpace::flush

DeltaValueSpace::flush 的流程可以分为以下几个部分:

ColumnFileFlushTask

MemTable 中的每个 ColumnFile 都对应一个 ColumnFileFlushTask::Task。对于尚未持久化的内存文件(ColumnInMemoryFile),Task 中还会保存它的数据(block_data),以及它的起始行偏移(rows_offset)和起始删除范围偏移(deletes_offset)。

/// Collects what is needed to flush the ColumnFiles of a MemTableSet
/// (excerpt: the constructor and other members are elided as `//...`).
class ColumnFileFlushTask
{
public:
/// Flush state for a single ColumnFile.
struct Task
{
explicit Task(const ColumnFilePtr & column_file_)
: column_file(column_file_)
{}

// The ColumnFile this task flushes.
ColumnFilePtr column_file;

// Rows to write. MemTableSet::buildFlushTask fills this only for files that
// are still in memory; it stays empty for files that are already persisted.
Block block_data;
// Page id returned by ColumnFileTiny::writeColumnFileData in prepare(); 0 until written.
PageId data_page = 0;

// Result of sortBlockByPk() on block_data in prepare(); when true, prepare()
// records a DeltaIndex update for this file.
bool sorted = false;
// Row / delete-range offsets at which this file starts, accumulated over the
// preceding files by buildFlushTask (only set for in-memory files).
size_t rows_offset = 0;
size_t deletes_offset = 0;
};
using Tasks = std::vector<Task>;

private:
Tasks tasks;
DMContext & context;
MemTableSetPtr mem_table_set;
// NOTE(review): supplied to the (elided) constructor by buildFlushTask;
// presumably used to detect MemTableSet changes during the flush — confirm.
size_t flush_version;

// Totals over all files added so far, maintained by addColumnFile().
size_t flush_rows = 0;
size_t flush_deletes = 0;
public:
/// Register `column_file` for flushing, add its rows/deletes to the running
/// totals, and return the newly created Task.
/// The returned reference points into `tasks` (a std::vector) and may be
/// invalidated by the next addColumnFile() call, so do not keep it around.
inline Task & addColumnFile(ColumnFilePtr column_file)
{
flush_rows += column_file->getRows();
flush_deletes += column_file->getDeletes();
return tasks.emplace_back(column_file);
}
//...
};

MemTableSet::buildFlushTask 核心代码如下。

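其中 rows_offset / deletes_offset 表示每个 ColumnFile 的起始偏移:从传入的 rows_offset / deletes_offset 开始,依次累加前面各个 ColumnFile 的行数 / 删除范围数。注意,只有仍在内存中的 ColumnFile 才会在 Task 中记录这两个偏移以及 block_data。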

/// Build a flush task that covers every ColumnFile currently held by this MemTableSet.
///
/// Every file gets a Task. For files that still live in memory, the Task also
/// captures the block to write plus the row / delete-range offsets at which the
/// file starts. Offsets begin at `rows_offset` / `deletes_offset` and advance by
/// the row / delete count of each preceding file.
///
/// @return nullptr when there is nothing to flush.
ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(DMContext & context,
                                                   size_t rows_offset,
                                                   size_t deletes_offset,
                                                   size_t flush_version)
{
    if (column_files.empty())
        return nullptr;

    auto flush_task = std::make_shared<ColumnFileFlushTask>(context, shared_from_this(), flush_version);

    size_t next_rows_offset = rows_offset;
    size_t next_deletes_offset = deletes_offset;
    for (auto & file : column_files)
    {
        auto & task = flush_task->addColumnFile(file);

        // A file that is not persisted yet carries its rows in memory; grab them
        // so the flush writes the block data together with the metadata.
        auto * in_memory = file->tryToInMemoryFile();
        if (in_memory != nullptr)
        {
            task.rows_offset = next_rows_offset; // where this file's rows start
            task.deletes_offset = next_deletes_offset;
            task.block_data = in_memory->readDataForFlush();
        }

        next_rows_offset += file->getRows();
        next_deletes_offset += file->getDeletes();
    }
    return flush_task;
}

ColumnFileFlushTask::prepare

那么是怎么获得 delta_index 的动态更新信息 delta_index_updates 的呢?

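MemTableSet::buildFlushTask 返回的 ColumnFileFlushTask 中,每个 Task 都记录了对应 ColumnFile 的偏移信息(rows_offset、deletes_offset),也就是之后定位该 ColumnFile 数据所需的位置。prepare 会按主键对 task.block_data 排序。如果 sortBlockByPk 返回 true(即 task.sorted 为真),就把该 Task 的偏移信息连同排序得到的排列 perm 一起追加到 delta_index_updates 中,用来描述本次 flush 对数据顺序的改动。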

/// Sort and write out the in-memory blocks collected by MemTableSet::buildFlushTask.
///
/// For every task that carries block data:
///  - sort the block by primary key (sortBlockByPk);
///  - if the sort reports a reorder, record a DeltaIndex update built from the
///    task's offsets and the sort permutation;
///  - write the block as a tiny column file page and remember the page id.
/// Finally the log (WAL) and data write batches are persisted.
///
/// @return one update per reordered block, in task order.
DeltaIndex::Updates ColumnFileFlushTask::prepare(WriteBatches & wbs)
{
    DeltaIndex::Updates updates;

    for (auto & task : tasks)
    {
        // Persisted files have no block data and need no write.
        if (!task.block_data)
            continue;

        IColumn::Permutation sort_perm;
        const auto & handle_define = getExtraHandleColumnDefine(context.is_common_handle);
        task.sorted = sortBlockByPk(handle_define, task.block_data, sort_perm);
        if (task.sorted)
            updates.emplace_back(task.deletes_offset, task.rows_offset, sort_perm);

        const auto row_count = task.block_data.rows();
        task.data_page = ColumnFileTiny::writeColumnFileData(context, task.block_data, 0, row_count, wbs);
    }

    // Persist the write-ahead log and the data pages prepared above.
    wbs.writeLogAndData();
    return updates;
}

DeltaIndex::Updates

DeltaIndex::Updates 记录的是一次 flush 中数据被重新排序的信息。每个 Update 对应一个在 flush 时被按主键重排过的 ColumnFile,包含它的起始删除范围偏移、起始行偏移,以及旧行号到新行号的映射(idx_mapping)。

/// Describes how one flushed in-memory ColumnFile was reordered by its primary-key sort.
struct Update
{
    /// Delete-range offset at which the flushed file starts.
    size_t delete_ranges_offset;
    /// Row offset at which the flushed file starts.
    size_t rows_offset;
    /// Inverse of the sort permutation: old index -> new index.
    TupleRefs idx_mapping;

    Update(size_t delete_ranges_offset_, size_t rows_offset_, const IColumn::Permutation & sort_perm)
        : delete_ranges_offset(delete_ranges_offset_)
        , rows_offset(rows_offset_)
        , idx_mapping(sort_perm.size())
    {
        // sort_perm[new_index] holds the old index of the row now at new_index;
        // invert it so idx_mapping can be looked up by old index.
        size_t new_index = 0;
        for (const auto old_index : sort_perm)
            idx_mapping[old_index] = new_index++;
    }
};

using Updates = std::vector<Update>;

DeltaIndex::cloneWithUpdates

获得 DeltaIndex::Updates 之后,就可以对 delta_index 进行更新,得到新的 new_delta_index,来替换旧的索引。

/// Flush the in-memory part of the delta layer to disk (excerpt: elided code is
/// marked `///...` / `//...`). The part shown is how the flush keeps
/// `delta_index` consistent when flushed blocks are re-sorted by primary key.
bool DeltaValueSpace::flush(DMContext & context)
{
///...
/// Write prepared data to disk.
// prepare() sorts each in-memory block by primary key, writes it, and returns
// one update for every block where sortBlockByPk() returned true.
auto delta_index_updates = flush_task->prepare(wbs);
DeltaIndexPtr new_delta_index;
// Only build a new index when at least one flushed block was reordered.
if (!delta_index_updates.empty())
{
LOG_FMT_DEBUG(log, "{} Update index start", simpleInfo());
// NOTE(review): cur_delta_index is defined in the elided code; presumably a
// snapshot of delta_index taken earlier in this function — confirm.
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_FMT_DEBUG(log, "{} Update index done", simpleInfo());
}
//...
/// Update delta tree
// Install the rebuilt index (if one was built) in place of the old one.
if (new_delta_index)
delta_index = new_delta_index;
//...
}

DeltaIndex

下面来看 DeltaIndex 中使用到的函数及其内在逻辑

DeltaIndex::tryCloneInner

DeltaIndex::cloneWithUpdates 内部实际调用的是 DeltaIndex::tryCloneInner 函数,并把第一个 Update 的 delete_ranges_offset 作为 placed_deletes_limit 传入。

/// Clone this index and apply `updates` to the clone.
///
/// The delete-range offset of the first update caps how many deletes the
/// existing index may have placed and still be reused by the clone.
/// @throws Exception when `updates` is empty.
DeltaIndexPtr DeltaIndex::cloneWithUpdates(const Updates & updates)
{
    if (unlikely(updates.empty()))
        throw Exception("Unexpected empty updates");

    const auto & first_update = updates.front();
    const size_t deletes_limit = first_update.delete_ranges_offset;
    return tryCloneInner(deletes_limit, &updates);
}

/// Make a copy of this index, optionally applying `updates` to the copy.
///
/// The existing tree is reused (copy-constructed) only if this index has placed
/// at most `placed_deletes_limit` delete ranges. Placing more deletes than that
/// can break the MVCC view, so in that case a fresh, empty DeltaIndex is returned
/// and `updates` is not applied.
DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
{
    DeltaTreePtr tree_snapshot;
    size_t rows_snapshot = 0;
    size_t deletes_snapshot = 0;

    // Read the shared state under the lock; the tree copy itself happens outside it.
    {
        std::scoped_lock lock(mutex);
        const bool reusable = placed_deletes <= placed_deletes_limit;
        if (reusable)
        {
            tree_snapshot = delta_tree;
            rows_snapshot = placed_rows;
            deletes_snapshot = placed_deletes;
        }
    }

    // Nothing reusable: start over with an empty index.
    if (!tree_snapshot)
        return std::make_shared<DeltaIndex>();

    auto cloned_tree = std::make_shared<DefaultDeltaTree>(*tree_snapshot);
    auto cloned_index = std::make_shared<DeltaIndex>(cloned_tree, rows_snapshot, deletes_snapshot);
    // Bring the copy in line with the reordering described by `updates`, if any.
    if (updates != nullptr)
        cloned_index->applyUpdates(*updates);
    return cloned_index;
}

applyUpdates

/// Adjust this index to the reordering described by `updates`.
///
/// Each update covers the rows [rows_offset, rows_offset + idx_mapping.size())
/// of a flushed file whose rows were shuffled by a primary-key sort:
///  - if none of those rows has been placed in the index yet, stop;
///  - if every one of them has been placed, remap their tuple ids and go on;
///  - if only some have been placed, drop all placed inserts from rows_offset
///    onward and stop.
/// Stopping early presumes later updates start at higher row offsets, as
/// produced by ColumnFileFlushTask::prepare.
void applyUpdates(const Updates & updates)
{
    for (const auto & upd : updates)
    {
        const size_t shuffled_begin = upd.rows_offset;
        const size_t shuffled_end = shuffled_begin + upd.idx_mapping.size();

        // None of the shuffled inserts are in the index.
        if (shuffled_begin >= placed_rows)
            break;

        // All shuffled inserts are in the index: just remap their tuple ids.
        if (placed_rows >= shuffled_end)
        {
            delta_tree->updateTupleId(upd.idx_mapping, shuffled_begin);
            continue;
        }

        // Only part of them are in the index: discard that partial placement.
        delta_tree->removeInsertsStartFrom(shuffled_begin);
        placed_rows = shuffled_begin;
        break;
    }
}