IndexBlock 源码分析

IndexBuilder

IndexBuilder 是个基类,一共有三个子类:

  1. ShortenedIndexBuilder
  2. HashIndexBuilder
  3. PartitionedIndexBuilder

一个IndexBlock 会构建一个许多IndexBlock,每个IndexBlock都包含两个字段:

  • primary index block
  • meta_blocks:主要是存储着关于 primary index block 的一些列meta data.

这个IndexBlocks用于接受 IndexBuilder::Finish 调用时候的传出参数,用来接受IndexBuilder添加的block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class IndexBuilder {
public:
static IndexBuilder* CreateIndexBuilder(BlockBasedTableOptions::IndexType index_type,
const InternalKeyComparator* comparator,
const InternalKeySliceTransform* int_key_slice_transform,
const bool use_value_delta_encoding,
const BlockBasedTableOptions& table_opt);

struct IndexBlocks {
Slice index_block_contents;
std::unordered_map<std::string, Slice> meta_blocks;
};
explicit IndexBuilder(const InternalKeyComparator* comparator)
: comparator_(comparator) {}

virtual ~IndexBuilder() {}

/// 添加 {key, block_handle} 至 block_builder
/// 其中 key in [last_key_in_current_block, first_key_in_next_block)
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) = 0;

/// 回调函数:只要有 key 添加,这个函数就会被调用
virtual void OnKeyAdded(const Slice& /*key*/) {}

/// 将 IndexBuilder 添加的数据序存储到 @c index_blocks 中
inline Status Finish(IndexBlocks* index_blocks) {
BlockHandle last_partition_block_handle;
return Finish(index_blocks, last_partition_block_handle);
}

/// @brief 子类重写这个函数
virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) = 0;

/// 当前这个IndexBluiler 的大小
/// 必须先调用 ::Finish 函数
virtual size_t IndexSize() const = 0;

virtual bool seperator_is_key_plus_seq() { return true; }

protected:
const InternalKeyComparator* comparator_;
size_t index_size_ = 0;
};

ShortenedIndexBuilder

ShortenedIndexBuilder 是个基础的IndexBuilder,在存储{k,v}时,可以尽可能的压缩内存空间,被在后面的HashIndexBuilderPartitionedIndexBuilder 内置使用。

他的使用过程大致如下:

  1. 通过 AddIndexEntry 函数来添加{k, v},其中key是计算出来位于[last_key_in_current_block, first_key_in_next_block)区间的最短字符串,valueblock_hadnle,即是表征某个block。
  2. 在每个添加{k, v}时,都会触发回调函数onKeyAdd,这可以用来记录一些额外的信息;
  3. 添加完毕的时候,调用::Finish函数,将通过 AddIndexEntry 函数添加的{k ,block_handle}序列化成字符串返回。

下面先大致看下 ShortenedIndexBuilder类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class ShortenedIndexBuilder : public IndexBuilder {
public:
explicit ShortenedIndexBuilder(
const InternalKeyComparator* comparator,
const int index_block_restart_interval,
const uint32_t format_version,
const bool use_value_delta_encoding,
BlockBasedTableOptions::IndexShorteningMode shortening_mode,
bool include_first_key)
: IndexBuilder(comparator),
index_block_builder_(index_block_restart_interval,
true /*use_delta_encoding*/,
use_value_delta_encoding),
index_block_builder_without_seq_(index_block_restart_interval,
true /*use_delta_encoding*/,
use_value_delta_encoding),
use_value_delta_encoding_(use_value_delta_encoding),
include_first_key_(include_first_key),
shortening_mode_(shortening_mode) {

// 低于版本2的不支持,需要 >= 3
seperator_is_key_plus_seq_ = (format_version <= 2);
}

// 在每个block添加第一个key时候进入if分支
virtual void OnKeyAdded(const Slice& key) override {
if (include_first_key_ && current_block_first_internal_key_.empty()) {
current_block_first_internal_key_.assign(key.data(), key.size());
}
}

/// @brief 目的是 将 @c block_handle 添加到 @c index_block_builder_ 中,
/// 细节编码可以参考代码解释。
/// @param last_key_in_current_block 会变成存储 @c block_handle 对应的key
/// @param first_key_in_next_block 指示当前 block 是否是最后一个block
/// @param block_handle
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override;

// 复用基类的 ::Finish 函数
using IndexBuilder::Finish;

/// @brief 将当前使用 @c AddIndexEntry 添加block handle, @c index_blocks 中
virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& /*last_partition_block_handle*/) override;

virtual size_t IndexSize() const override { return index_size_; }

virtual bool seperator_is_key_plus_seq() override {
return seperator_is_key_plus_seq_;
}

friend class PartitionedIndexBuilder; // 友元,侵入式设计

private:
BlockBuilder index_block_builder_; // 存储添加的数据
BlockBuilder index_block_builder_without_seq_;
const bool use_value_delta_encoding_;
bool seperator_is_key_plus_seq_; // 如果可以为 false,则能节省内存
const bool include_first_key_;
BlockBasedTableOptions::IndexShorteningMode shortening_mode_;
BlockHandle last_encoded_handle_ = BlockHandle::NullBlockHandle(); // 记录最后添加的 block
std::string current_block_first_internal_key_; // 当前block的第一个key
};

ShortenedIndexBuilder中有个成员变量seperator_is_key_plus_seq_,这是为进一步压缩存储空间。

在前面,我们知道一个internalkey是由{userkey, seq, type}三部分组成,如果可以仅由userkey便能分辨不同的internalkey,那么就没必要加上{seq, type},自然就可以进一步减少内存消耗。

因此,存在两个成员变量index_block_builder_index_block_builder_without_seq_

下面我们来看看AddIndexEntry 函数是如何处理的。

AddIndexEntry

last_key_in_urrent_block 是个输入输出参数,最终存储的{k, v}k会记录在last_key_in_current_block中。

整个过程如下:

1. 计算待存储的 key

对于非最后一个block,则 first_key_in_next_block 不为 nullptr

对于压缩模式shortening_mode_有三种选择:

1
2
3
4
5
enum class IndexShorteningMode : char {
kNoShortening,
kShortenSeparators,
kShortenSeparatorsAndSuccessor,
};

只要不是kNoShortening,都会选择一个[last_key_in_current_block, first_key_in_next_block)区间最短的字符串来当做即将存储的key,而这个key也会记录在last_key_in_current_block 中。

注意,上面寻找key的区间是左闭右开。

如果seperator_is_key_plus_seq_ == false,即仅使用keyuserkeyfirst_key_in_next_blockuserkey来比较,发现二者相等,那么就需要将 seperator_is_key_plus_seq_ = true,来满足区间需求。

但是,如果当前blocks是最后一个,此时first_key_in_next_block == nullptr

如果此时的shortening_mode_ == IndexShorteningMode::kShortenSeparatorsAndSuccessor,那么只需找出大于last_key_in_current_block的键,来来做带存储的key

第一部分的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
// ...1....
if (first_key_in_next_block != nullptr) {
if (shortening_mode_ !=
BlockBasedTableOptions::IndexShorteningMode::kNoShortening) {
// last_key_in_current_block 的值修改为
// [last_key_in_current_block, first_key_in_next_block] 区间的
comparator_->FindShortestSeparator(last_key_in_current_block,
*first_key_in_next_block);
}

// 不用携带 seq_,则直接用 UserKey 进行比较
// 此时,如果两个 UserKey 相等
// 则说明,光靠 UserKEy 无法比较
if (!seperator_is_key_plus_seq_ &&
comparator_->user_comparator()->Compare(
ExtractUserKey(*last_key_in_current_block),
ExtractUserKey(*first_key_in_next_block)) == 0) {
seperator_is_key_plus_seq_ = true;
}
} else { // first_key_in_next_block == nullptr
// 即当前 block 即最后一个 block
if (shortening_mode_ == BlockBasedTableOptions::IndexShorteningMode::
kShortenSeparatorsAndSuccessor) {
// 将 last_key_in_current_block 设置大于他的字符串
comparator_->FindShortSuccessor(last_key_in_current_block);
}
}

//...next...
}

2. 编码 block_handle

经过第一步,找出了待存储的key,即 last_key_in_current_block,它宛如一个分隔符,分开两个block:

  • 基于block_handle来构造IndexValue对象 entry,并将entry编码序列化到encoded_entry中。

  • 如果ShortenedIndexBuilder支持use_value_delta_encoding_,并且当前待添加的block不是首个block,那么就可以改变entry的序列化方式,进一步压缩存储空间,如此就得到delta_encoded_entry

这一部分的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
// ..续..

//!!! key
auto sep = Slice(*last_key_in_current_block);

// 要么 include_first_key_ 为false
// 要么 include_first_key_ 为true,且 current_block_first_internal_key_ 不能为空
assert(!include_first_key_ || !current_block_first_internal_key_.empty());
//!!! value
IndexValue entry(block_handle, current_block_first_internal_key_);
std::string encoded_entry;
std::string delta_encoded_entry;
// 将 entry 的内容编码到 encoded_entry 中
// 如果 include_first_key_ 为 true,
// 则 encoded_entry 中的数据是 block_handle + current_block_first_internal_key_
entry.EncodeTo(&encoded_entry, include_first_key_, nullptr);
if (use_value_delta_encoding_ && !last_encoded_handle_.IsNull()) {
// 此时 block 不是首个 block
// 使用 delta eccoding 的方式
entry.EncodeTo(&delta_encoded_entry, include_first_key_,
&last_encoded_handle_);
} else {
// 此时 block 是首个 block 或者 use_value_delta_encoding_ 功能关闭了
// BlockBuilder::Add() below won't use delta-encoded slice.
}

// 更新最后添加的 handle
last_encoded_handle_ = block_handle;
const Slice delta_encoded_entry_slice(delta_encoded_entry);

//...next...
}

3. 添加 {k, v}

到此,key已经获得,value也已经编码序列化完毕。

最后,根据``seperator_is_key_plus_seq_选择将序列化结果存储到index_block_builder_ 还是index_block_builder_without_seq_` 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
// ..续..

// 如果 use_value_delta_encoding_ 为 true,
// 此时 delta_encoded_entry_slice 也不为空
// 则 index_block_builder_ 也会使用进一步压缩value的存储空间

// 添加 {k,v},即 {sep, }
index_block_builder_.Add(sep, encoded_entry, &delta_encoded_entry_slice);

// 不需要加上 seq
if (!seperator_is_key_plus_seq_) {
index_block_builder_without_seq_.Add(ExtractUserKey(sep),
encoded_entry,
&delta_encoded_entry_slice);
}

// 过了第一个key,就需要清除
current_block_first_internal_key_.clear();
}

到此,一个AddIndexEntry 函数分析完毕,可以说是在尽可能的在压缩存储空间,关键的三个点:

  • IndexShorteningMode:用来确定待存储的key是否为最短;
  • seperator_is_key_plus_seq_:存储key时,可以是否需要带上{seq, type};
  • use_value_delta_encoding_:来确定是否需要进一步对value 进行delta encoding

下面,就来看看::Finish 函数。

Finish

Finish函数的语义很简单,就是将通过AddIndexEntry 添加的 {k, v}保存到index_blocks->index_block_contents 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct IndexBlocks {
Slice index_block_contents;
std::unordered_map<std::string, Slice> meta_blocks;
};

/// @brief 将当前使用 @c AddIndexEntry 添加block handle, @c index_blocks 中
virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& /*last_partition_block_handle*/) override {
if (seperator_is_key_plus_seq_) {
index_blocks->index_block_contents = index_block_builder_.Finish();
} else {
index_blocks->index_block_contents =
index_block_builder_without_seq_.Finish();
}
index_size_ = index_blocks->index_block_contents.size();
return Status::OK();
}

HashIndexBuilder

HashIndexBuilder 包含了了两部分:

  • 可用于二分查找的primary index,即类中的 primary_index_builder_;

  • 用于构造二级hash index的metadata

    这个metadata 由两部分 metablock 组成:

    • prefix_block_:这个 metablock 连续存储着前缀;
    • prefix_meta_block_:这个metablock存储着prefix_block_的信息

两个之间的联系 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class HashIndexBuilder : public IndexBuilder {
public:
explicit HashIndexBuilder(
const InternalKeyComparator* comparator,
const SliceTransform* hash_key_extractor,
int index_block_restart_interval,
int format_version,
bool use_value_delta_encoding,
BlockBasedTableOptions::IndexShorteningMode shortening_mode)
: IndexBuilder(comparator),
primary_index_builder_(comparator,
index_block_restart_interval,
format_version,
use_value_delta_encoding,
shortening_mode,
false /* include_first_key */),
hash_key_extractor_(hash_key_extractor) {}

virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override;

virtual void OnKeyAdded(const Slice& key) override;

virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override;

virtual size_t IndexSize() const override {
return primary_index_builder_.IndexSize()
+ prefix_block_.size()
+ prefix_meta_block_.size();
}

virtual bool seperator_is_key_plus_seq() override {
return primary_index_builder_.seperator_is_key_plus_seq();
}

private:
void FlushPendingPrefix();

ShortenedIndexBuilder primary_index_builder_;
const SliceTransform* hash_key_extractor_;

std::string prefix_block_;
std::string prefix_meta_block_;

uint32_t pending_block_num_ = 0;
uint32_t pending_entry_index_ = 0;
std::string pending_entry_prefix_;

uint64_t current_restart_index_ = 0;
};

AddIndexEntry

HashIndexBuilderrestart interval == 1 ,因此每次调用AddIndexEntry 函数时 ++current_restart_index_

因此,current_restart_index_ 可用于指示当前添加的block的数量。

1
2
3
4
5
6
7
8
virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override {
++current_restart_index_;
primary_index_builder_.AddIndexEntry(last_key_in_current_block,
first_key_in_next_block,
block_handle);
}

OnKeyAdded

OnKeyAdded函数,是个回调函数,即在每次添加 key的时候调用。

  • pending_entry_prefix_:是当前pending的前缀;

    如果待加入的key的前缀 key_prefix != pending_entry_prefix_,则说明遇到新的前缀,那么就pending_entry_prefix_ 要设置为新的前缀key_prefix

  • pending_block_num_:表征的是前缀是pending_entry_prefix_的key的数量;

  • pending_entry_index_:当前pending_entry_prefix_ 所属的restart index

    通过后面两个参数,就可以知道 [pending_entry_index_, pending_entry_index_ + pending_block_num_)范围内的key的前缀都是pending_entry_prefix_

此外,这里的pending的意思,还没有 flushprefix_block_prefix_meta_block_ 中。

下面先来看看onKeyAdd函数的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
virtual void OnKeyAdded(const Slice& key) override {
auto key_prefix = hash_key_extractor_->Transform(key);
// 当前没有等待 flush 的 entry,那么此次添加的 key 就是 first_entry
bool is_first_entry = pending_block_num_ == 0;

if (is_first_entry || pending_entry_prefix_ != key_prefix) {
// 如果是遇到新的前缀了,则 flush
if (!is_first_entry) {
FlushPendingPrefix();
}
// 更新其他状态
pending_entry_prefix_ = key_prefix.ToString();
pending_block_num_ = 1;
pending_entry_index_ = static_cast<uint32_t>(current_restart_index_);
} else {
/// 相同前缀,即 pending_entry_prefix_ == key_prefix

auto last_restart_index = pending_entry_index_ + pending_block_num_ - 1;
assert(last_restart_index <= current_restart_index_);
if (last_restart_index != current_restart_index_) {
++pending_block_num_;
}
}
}

FlushPendingPrefix

这里的 flush,即将 pending_entry_prefix_ 及其元数据分别存储到prefix_block_prefix_meta_block_

1
2
3
4
5
6
7
8
9
10
11
void FlushPendingPrefix() {
// prefix_block_ 添加 pending_entry_prefix_
prefix_block_.append(pending_entry_prefix_.data(),
pending_entry_prefix_.size());
// prefix_meta_block_ 存储前缀的元数据
PutVarint32Varint32Varint32(
&prefix_meta_block_,
static_cast<uint32_t>(pending_entry_prefix_.size()), // pending_entry_prefix_ 大小
pending_entry_index_, // pending_entry_prefix_ 所属的 restart index
pending_block_num_); // 前缀 pending_entry_prefix_ 的 key 数量
}

Finish

当调用::Finish 函数时,就是为将之前添加的数据都序列化到 index_blocks中。此时有两部分需要序列化:

  • primary index: 即 primary_index_builder_
  • 元数据:即 prefix_block_prefix_meta_block_

逻辑简单如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override {
if (pending_block_num_ != 0) {
FlushPendingPrefix();
}
// 先序列化已添加的数据
Status s = primary_index_builder_.Finish(index_blocks,
last_partition_block_handle);
// 再添加元数据
index_blocks->meta_blocks.insert(
{kHashIndexPrefixesBlock.c_str(), prefix_block_});
index_blocks->meta_blocks.insert(
{kHashIndexPrefixesMetadataBlock.c_str(), prefix_meta_block_});
return s;
}

TopLevelIndex

既然,已经有了上述两个IndexBuilder,那为啥还需要PartitionedIndexBuilder

顾名思义,Partition,即分割的语义,由上面的源码分析可知,每次IndexBuilder::Finish函数都会将之前添加的{k, v}序列化并返回。那么反过来,当需要超找一个key时,是不是也要把之前的存储整个IndexBuilder::Finish返回的内容读取出来?

但是cache资源是有限的,你一次性把读取了太多数据,会导致其他cache中数据被挤出去。

因此,有了PartitionedIndexBuilder,即将一整个Index 分割为许多个。此时在硬盘的存储格式:

1
I I I I I I IP

其中,I 是使用 ShortenedIndexBuilder 构建的 partition index blockIP,记录的是前面已构建的 partitions的元数据。

图???

PartitionedIndexBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class PartitionedIndexBuilder : public IndexBuilder {
public:
static PartitionedIndexBuilder* CreateIndexBuilder(
const InternalKeyComparator* comparator,
const bool use_value_delta_encoding,
const BlockBasedTableOptions& table_opt);

explicit PartitionedIndexBuilder(const InternalKeyComparator* comparator,
const BlockBasedTableOptions& table_opt,
const bool use_value_delta_encoding);

virtual ~PartitionedIndexBuilder();

virtual void AddIndexEntry(std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) override;

virtual Status Finish(
IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override;

virtual size_t IndexSize() const override { return index_size_; }
size_t TopLevelIndexSize(uint64_t) const { return top_level_index_size_; }
size_t NumPartitions() const;

/// 用于 PartitionFilterBlock 中
inline bool ShouldCutFilterBlock()

/// 用于 PartitionFilterBlock 中
void RequestPartitionCut();

std::string& GetPartitionKey() { return sub_index_last_key_; }

virtual bool seperator_is_key_plus_seq() override {
return seperator_is_key_plus_seq_;
}

bool get_use_value_delta_encoding() { return use_value_delta_encoding_; }

private:

size_t top_level_index_size_ = 0; // Set after ::Finish is called
size_t partition_cnt_ = 0; // Set after ::Finish is called

// 生成二级索引
void MakeNewSubIndexBuilder();

struct Entry {
std::string key;
std::unique_ptr<ShortenedIndexBuilder> value; // partition index
};

std::list<Entry> entries_;
BlockBuilder index_block_builder_; // top-level index builder
BlockBuilder index_block_builder_without_seq_;

// the active partition index builder
ShortenedIndexBuilder* sub_index_builder_;
// the last key in the active partition index builder
std::string sub_index_last_key_;
std::unique_ptr<FlushBlockPolicy> flush_policy_;

bool finishing_indexes = false; // Finish 函数已被调用但是尚未完成
const BlockBasedTableOptions& table_opt_;
bool seperator_is_key_plus_seq_;
bool use_value_delta_encoding_;
bool partition_cut_requested_ = true;
bool cut_filter_block = false;
BlockHandle last_encoded_handle_;
};

AddIndexEntry

PartitionedIndexBuilder类中,有个内嵌类PartitionedIndexBuilder::Entry

1
2
3
4
5
6
 struct Entry {
std::string key;
std::unique_ptr<ShortenedIndexBuilder> value; // partition index
};

std::list<Entry> entries_;
  • key字段,即sub_index_last_key_
  • value字段,指向的是即sub_index_builder_

因此,entries_的每一个元素都是一个Partitionentries_ 有三种可能添加一个对象 Entry(key, value)

  • 如果本次调用AddIndexEntru会导致sub_index_builder_ 的大小超过了阈值,即FlushBlockPolicy::Update 返回true,那么就会将{sub_index_last_key_, sub_index_builder_} 添加到entries_,并重新将sub_index_builder_ = nullptr
  • 其他类,比如PartitionFilterBlock 发起请求;
  • 这是最后一个block。如果添加的block_handle是最后一个,即 first_key_in_next_block == nullptr

下面,看代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void PartitionedIndexBuilder::AddIndexEntry(
std::string* last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle) {
// 最后一个blockhandle
if (UNLIKELY(first_key_in_next_block == nullptr)) { // no more keys
if (sub_index_builder_ == nullptr) {
MakeNewSubIndexBuilder();
}
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block,
block_handle);

// 如果 sub_index_builder_->seperator_is_key_plus_seq_ 是 true
// 而 seperator_is_key_plus_seq_ 是 false吗,两者不一致
// 说明 sub_index_builder_ 的 seperator_is_key_plus_seq_ 发生了改变
// 需要使得 sub_index_builder_ 的相应设置
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
sub_index_last_key_ = std::string(*last_key_in_current_block);
entries_.push_back(
{sub_index_last_key_,
std::unique_ptr<ShortenedIndexBuilder>(sub_index_builder_)});
sub_index_builder_ = nullptr;
cut_filter_block = true;
} else {
if (sub_index_builder_ != nullptr) {
// 判断添加 block_handle 之后是否需要刷新
// 如果需要,在添加之前就先flush

std::string handle_encoding;
block_handle.EncodeTo(&handle_encoding);
// 判断是否需要 flush
bool do_flush =
partition_cut_requested_ ||
flush_policy_->Update(*last_key_in_current_block, handle_encoding);
// 需要刷新,则将之前的未 flush 的数据都存储到 entries_ 中
if (do_flush) {
entries_.push_back(
{sub_index_last_key_,
std::unique_ptr<ShortenedIndexBuilder>(sub_index_builder_)});
cut_filter_block = true;
sub_index_builder_ = nullptr;
}
}

if (sub_index_builder_ == nullptr) {
MakeNewSubIndexBuilder();
}

// 考虑添加本次的数据 : block_handle
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block,
block_handle);

sub_index_last_key_ = std::string(*last_key_in_current_block);
if (!seperator_is_key_plus_seq_ &&
sub_index_builder_->seperator_is_key_plus_seq_) {
// then we need to apply it to all sub-index builders and reset
// flush_policy to point to Block Builder of sub_index_builder_ that store
// internal keys.
seperator_is_key_plus_seq_ = true;
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
}
}

Finish

Partition模式下,Finish函数的流程如下:

  • 每次调用PartitionedIndexBuilder::Finish 函数,会按照FIFO的原则,从entries_中弹出一个sub_index_builder_,并将其内容序列化到index_blocks->index_block_contents 中;
  • 上层(即TableBuilder)会把index_blocks的内容存储到SST文件中,并把存储的位置和数据大小记录在 last_partition_block_handle中;
  • 等下一次调用 PartitionedIndexBuilder::Finish 时,会把last_partition_block_handle序列化到index_block_builder_
  • PartitionedIndexBuilder::Finish 函数返回Status::OK(),说明entries_ 中的数据已全部存储到SST中。但是,此时index_blocks 中记录的是之前所有sub_index_builer的信息,即在SST中的存储位置和大小。

也就是说,PartitionedIndexBuilder在SST文件中存储的格式是:

1
2
3
4
partition_index_block_1
...
partition_index_block_N
TopLevelIndexBlock

下面,顺着注释来看看源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Status PartitionedIndexBuilder::Finish(
IndexBlocks* index_blocks, const BlockHandle& last_partition_block_handle) {
if (partition_cnt_ == 0) {
partition_cnt_ = entries_.size();
}

// 最后一个block已经被添加了
assert(sub_index_builder_ == nullptr);
// 重复调用 Finish 函数
if (finishing_indexes == true) {
Entry& last_entry = entries_.front();
std::string handle_encoding;
last_partition_block_handle.EncodeTo(&handle_encoding);
std::string handle_delta_encoding;
// delta 编码
PutVarsignedint64(
&handle_delta_encoding,
last_partition_block_handle.size() - last_encoded_handle_.size());
last_encoded_handle_ = last_partition_block_handle;
const Slice handle_delta_encoding_slice(handle_delta_encoding);
// 记录 {key, last_partition_block_handle}
index_block_builder_.Add(last_entry.key,
handle_encoding,
&handle_delta_encoding_slice);
if (!seperator_is_key_plus_seq_) {
index_block_builder_without_seq_.Add(ExtractUserKey(last_entry.key),
handle_encoding,
&handle_delta_encoding_slice);
}
entries_.pop_front();
}
// If there is no sub_index left, then return the 2nd level index.
if (UNLIKELY(entries_.empty())) {
// 这里的 index_block 获得的才是
if (seperator_is_key_plus_seq_) {
index_blocks->index_block_contents = index_block_builder_.Finish();
} else {
index_blocks->index_block_contents =
index_block_builder_without_seq_.Finish();
}
top_level_index_size_ = index_blocks->index_block_contents.size();
index_size_ += top_level_index_size_;
return Status::OK();
} else {
// Finish the next partition index in line and Incomplete() to indicate we
// expect more calls to Finish
Entry& entry = entries_.front();
// Apply the policy to all sub-indexes
entry.value->seperator_is_key_plus_seq_ = seperator_is_key_plus_seq_;
// 这里为了获得当前block的内容
// 并将其返回,返回的只是一个 partition
auto s = entry.value->Finish(index_blocks);
index_size_ += index_blocks->index_block_contents.size();
finishing_indexes = true;
return s.ok() ? Status::Incomplete() : s;
}
}