DataBlock 源码剖析

Table

BlockBased Table Format

在 Rocksdb 中,BlockBased Table 是 SST 默认的 table 格式。SST文件默认的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1: filter block]
[meta block 2: index block]
[meta block 3: compression dictionary block]
[meta block 4: range deletion block]
[meta block 5: stats block]
...
[meta block K: future extended block]
[metaindex block]
[Footer]
<end_of_file>

[1] data block : SST文件中的 {key, value} 键值对是按照key有序的存储,并且将key划分到不同 data blocks。这些 data blocks ,从SST文件头部开始并连续存储,每个数据 date block 的格式后面再讲解。

[2 ] meta block : 在 data block 之后 。所谓meta,即信息的信息,用于记录 data block 的相关信息。目前支持的meta block类型如上面的所述。

[3] metaindex block : 用于索引上面的每个meta block,即针对每个meta block, 都有一个 entry 来记录这些 meta block的信息。

1
2
key: meta block name	// 该meta block 的名字
value: BlockHandle* // 指向该 meta block

[4] **footer ** : 在SST文件末尾有一个固定长度的footer,主要有三个部分:

  • metaindex_hanle:指向 metaindex_handle
  • index_handle : 指向 idnex
  • Magic number

格式如下。

1
2
3
4
5
metaindex_handle: char[p];      // Block handle for metaindex
index_handle: char[q]; // Block handle for index
padding: char[40-p-q]; // zeroed bytes to make fixed length
// (40==2*BlockHandle::kMaxEncodedLength)
magic: fixed64; // 0x88e241b785f4cff7 (little-endian)

源码分析

DataBlock

DataBlockHashIndexBuilder

先来讲解下 DataBlockHashIndexBuilder。

为了降低在一个 DataBlock查询时的 CPU 利用率,Rocksdb 专门为 DataBlock 设计了一个 DataBlockHashIndexBuilder 类,仅支持 BlockBasedTable::Get() 操作时使用。

格式

现在的 DataBlock 格式如下:

1
DATA_BLOCK: [RI RI RI ... RI RI_IDX HASH_IDX FOOTER]

其中,

  • RIRestart Interval

  • RD_IDX : Restart Interval Index

  • HASH_IDX : 新的 data-block index 特征

    Data-block hash index 的格式如下。

    1
    2
    3
    4
     HASH_IDX: [B B B ... B NUM_BUCK]

    // B : bucket, 存放 restart index,类型是 uint8_t
    // NUM_BUCK : bucket 的数量

    由于已经使用了两个特殊的 flag :

    • kNoEntry=255
    • kCollision=254

    因为,restart interval 的最大数量只有253个。每个bucket的初始化值是 kNoEntry

  • FOOTER : 32 位

store & query

stroe : 当在hash index 中存储一个key时,首先将这个key hash 到一个bucket中:

  • 如果这个bucket是空的,即 kNoEntry,那么这个key所属的 RI 则存储在这个bucket中;
  • 如果这个bucket已经有元素了,则将已经存在的 RI 更新为 kCollision 标志位,并不会存储 RI

query : 当查询一个key时,也会先计算这个key经过hash后会落到哪一个bucket。然后检查该 bucket 存储的值,是 kNoEntry 还是 kCollision,如果都不是,则存储的就是该 key 的 RI,直接获取该 RI,那么就可以直接去 RI 并搜索 key。

Note: hash index 中的 RI 的数量必须小于 254,超过的不会创建。

即,尝试记录每个key所属的restart index,加速查询。

现在我就来看看这个类 DataBlockHashIndexBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
const uint8_t kNoEntry = 255;
const uint8_t kCollision = 254;
const uint8_t kMaxRestartSupportedByHashIndex = 253;

// Because we use uint16_t address, we only support block no more than 64KB
const size_t kMaxBlockSizeSupportedByHashIndex = 1u << 16;
// 此哈希表的默认负载因子
const double kDefaultUtilRatio = 0.75;

class DataBlockHashIndexBuilder {
public:
DataBlockHashIndexBuilder()
: bucket_per_key_(-1 /*uninitialized marker*/),
estimated_num_buckets_(0),
valid_(false) {}

void Initialize(double util_ratio) {
if (util_ratio <= 0) {
util_ratio = kDefaultUtilRatio;
}

// 每个 bucket 能容纳的 key 数量
bucket_per_key_ = 1 / util_ratio;
valid_ = true;
}

/// 这个 HashIndexBuilder 是否可用
inline bool Valid() const { return valid_ && bucket_per_key_ > 0; }

/// 这个 HashIndexBuilder 的大致大小
inline size_t EstimateSize() const {
uint16_t estimated_num_buckets = static_cast<uint16_t>( estimated_num_buckets_);

// Maching the num_buckets number in DataBlockHashIndexBuilder::Finish.
estimated_num_buckets |= 1;

return sizeof(uint16_t) +
static_cast<size_t>(estimated_num_buckets * sizeof(uint8_t));
}

/// 向缓存 hash_and_restart_pairs_ 中添加 {hash_value, ri}
void Add(const Slice& key, const size_t restart_index);
/// 将 @c hash_and_restart_pairs_ 中数据 dump 到 buffer 中
void Finish(std::string& buffer);
void Reset();

private:
double bucket_per_key_; // util_ratio_ 的逆
double estimated_num_buckets_; // 添加的key个数

// 添加的 RI 是否超过 253。如果超过,则此 HashIndex 就不可用了
bool valid_;
// buckets
std::vector<std::pair<uint32_t, uint8_t>> hash_and_restart_pairs_;
friend class DataBlockHashIndex_DataBlockHashTestSmall_Test;
};

主要来看两个函数。

Add函数,是先计算出待添加 {key, ri} 中key的 hash_value,然后将{hash_value, ri} 添加到 hash_and_restart_pairs_ 中。

在此,可以将hash_and_restart_pairs_理解为缓存。在调用Finish函数时,再将 hash_and_restart_pairs_ 的内容给dump到buffer中,供给调用者使用。

DataBlockHashIndexBuilder::Add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void DataBlockHashIndexBuilder::Add(const Slice& key, const size_t restart_index) {
assert(Valid());
// restart_index 不能超过 253
if (restart_index > kMaxRestartSupportedByHashIndex) {
valid_ = false;
return;
}

// 计算 key 的hash值
uint32_t hash_value = GetSliceHash(key);
// 添加一个bucket : {hash_value, RI}
hash_and_restart_pairs_.emplace_back(hash_value, static_cast<uint8_t>(restart_index));
estimated_num_buckets_ += bucket_per_key_;
}
DataBlockHashIndexBuilder::Finish

输出参数 buffer 最后的格式如下:

1
HASH_IDX: [B B B ... B NUM_BUCK]

整个序列有两步:

  1. 先将 hash_and_restart_pairs_ 通过处理,转换到 buckets
  2. 再将buckets 给序列化到输出参数 buffer 中。

代码解释如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void DataBlockHashIndexBuilder::Finish(std::string& buffer) {
assert(Valid());
uint16_t num_buckets = static_cast<uint16_t>(estimated_num_buckets_);

if (num_buckets == 0) {
num_buckets = 1; // sanity check
}

// 当 num_buckets 是偶数时,内置的 hash 函数不能很好地将不同的key散列到不同的buckets中
// 因此,将 num_buckets 变为奇数,来避免这个问题
num_buckets |= 1;

// 1. 先将 hash_and_restart_pairs_ 中的record存储到 buckets 中
std::vector<uint8_t> buckets(num_buckets, kNoEntry);
for (auto& entry : hash_and_restart_pairs_) {
uint32_t hash_value = entry.first;
uint8_t restart_index = entry.second;
// 通过 hash_value 计算该 key 在 buckets 中的 index
uint16_t buck_idx = static_cast<uint16_t>(hash_value % num_buckets);
if (buckets[buck_idx] == kNoEntry) {
// 如果该 bucket 之前是空的,则存储 RI
buckets[buck_idx] = restart_index;
} else if (buckets[buck_idx] != restart_index) {
// 否则发生了hash冲突,则标记为 kCollision
buckets[buck_idx] = kCollision;
}
}

// 2. 再将 buckets 中的数据序列化到 buffer 中
for (uint8_t restart_index : buckets) {
buffer.append(const_cast<const char*>(reinterpret_cast<char*>(&restart_index)),
sizeof(restart_index));
}

// 3. 以 NUM_BUCK 结尾
PutFixed16(&buffer, num_buckets);

assert(buffer.size() <= kMaxBlockSizeSupportedByHashIndex);
}

现在,我们再来看看 BlockBuilder,用于构建data-block。

BlockBuilder

BlockBuilder 用于构建 data-block 。

每一对{key, value} 即一个entry,一个entry的存储格式如下:

先看下基本的注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class BlockBuilder {
public:
BlockBuilder(const BlockBuilder&) = delete;
void operator=(const BlockBuilder&) = delete;

explicit BlockBuilder(int block_restart_interval,
bool use_delta_encoding = true,
bool use_value_delta_encoding = false,
BlockBasedTableOptions::DataBlockIndexType index_type =
BlockBasedTableOptions::kDataBlockBinarySearch,
double data_block_hash_table_util_ratio = 0.75);

/// Reset the contents as if the BlockBuilder was just constructed.
void Reset() {
buffer_.clear();
restarts_.clear();
restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
counter_ = 0;
finished_ = false;
last_key_.clear();
if (data_block_hash_index_builder_.Valid()) {
data_block_hash_index_builder_.Reset();
}
}

bool empty() const { return buffer_.empty(); }

/// 将 BlockBuild 中的内容与 buffer 交换
/// 再 Reset BlockBuilder
void SwapAndReset(std::string& buffer) {
std::swap(buffer_, buffer);
Reset();
}

/// 返回当前data-block的近似大小
inline size_t CurrentSizeEstimate() const {
// 当前大小 + 正在创建的 data_block_hash_index_builder_ 大小
return estimate_ + (data_block_hash_index_builder_.Valid()
? data_block_hash_index_builder_.EstimateSize()
: 0);
}

/// 返回添加了 {key,value} 后此data-block的近似大小
size_t EstimateSizeAfterKV(const Slice& key, const Slice& value) const;

/// 添加 {key, value}
void Add(const Slice& key, const Slice& value,
const Slice* const delta_value = nullptr);

/// 将 data-block 序列化到 buffer中,并返回 buffer 的 slice
Slice Finish();
private:
const int block_restart_interval_;
const bool use_delta_encoding_; // key 是否使用共享前缀的编码方式
const bool use_value_delta_encoding_; // value 是否使用共享前缀的编码方式

std::string buffer_; // 存储这个data-block的数据
std::vector<uint32_t> restarts_; // 记录所有的 Restart points (rp) 在 buffer 中偏移量
size_t estimate_; // restarts_ 的大小
int counter_; // 当前 rp 添加的 key 数
bool finished_; // Has Finish() been called?
std::string last_key_; // 当前 rp 最后添加的 key
DataBlockHashIndexBuilder data_block_hash_index_builder_;
};

BlockBuilder::BlockBuilder

BlockBuilder 的构造函数里,可以通过index_type来选择是否开启DataBlockHashIndexBuilder,来降低 CPU 利用率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
BlockBuilder::BlockBuilder(int block_restart_interval, 
bool use_delta_encoding,
bool use_value_delta_encoding,
BlockBasedTableOptions::DataBlockIndexType index_type,
double data_block_hash_table_util_ratio)
: block_restart_interval_(block_restart_interval),
use_delta_encoding_(use_delta_encoding),
use_value_delta_encoding_(use_value_delta_encoding),
restarts_(),
counter_(0),
finished_(false) {
switch (index_type) {
case BlockBasedTableOptions::kDataBlockBinarySearch: // 传统的二分查找key
break;
case BlockBasedTableOptions::kDataBlockBinaryAndHash: // hash + 二分查找
data_block_hash_index_builder_.Initialize(data_block_hash_table_util_ratio);
break;
default:
assert(0);
}
assert(block_restart_interval_ >= 1);
restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
}

BlockBuilder::Add

向这个 data-block 中添加一对 {key, value}时,为了降低内存使用率,rocksb会压缩前缀:如果当前key 与前一个last_key存在公共前缀,设公共前缀大小为 shared,那么rocksdb 将不会存储这个shared个字节。

但是有两种情况,rocksdb会完整地存储一个key

  1. shared == 0:即当前key与前一个last_key不存在公共部门。很自然,要全部完全地存储key
  2. counter_ >= block_restart_interval_:其中block_restart_interval_是能连续压缩前缀的最大key数, counter_ 是当前已经以压缩前缀存储的key的数量,如果counter_ 超过阈值,则下一个next_key 即便与key存在公共前缀,也不会压缩next_key前缀的方式来存储。

在rocksdb里,把这种压缩前缀的存储方式叫做delta encoding,把要完整存储一个key的地址叫做 restart point。这样,在查询这个key时,先定位到这个key所属的restart point,然后线性搜索该key,可以降低时间复杂度。

由于存在线性搜索,因此要设计一个 block_restart_interval_

当key使用delta encoding时,即 used_delta_encoding_ == true,一对{key, value}的存储格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
//An entry for a particular key-value pair has the form:
// shared_bytes: varint32
// unshared_bytes: varint32
// value_length: varint32
// key_delta: char[unshared_bytes]
// value: char[value_length]
// shared_bytes == 0 for restart points.
//
// The trailer of the block has the form:
// restarts: uint32[num_restarts]
// num_restarts: uint32
// restarts[i] contains the offset within the block of the ith restart point.

当对value也使用delta encoding时,即 use_value_delta_encoding_ == true,此时必须存储格式如下:

1
2
3
4
5
6
7
8
9
10
11
//An entry for a particular key-value pair has the form:
// if shared != 0:
// shared_bytes: varint32
// unshared_bytes: varint32
// key_delta: char[unshared_bytes]
// delta_value: char[]
// else:
// 0: varint32
// key_length: varint32
// key: char[key_length]
// value: char[]

因此在use_value_delta_encoding_ == true 时,可以通过 shared == 0 来判断当前key的编码方式:

  • shared != 0:说明是delta encoding
  • shared == 0:说明是个restart point,此时 value 也不必使用delta encoding

TODO:关注下读取的时候

最后,通过 data_block_hash_index_builder_ 来加速索引,将每个 key 映射到所属的 rp

下面通过代码注释来理解上面的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void BlockBuilder::Add(const Slice& key, const Slice& value,
const Slice* const delta_value) {
assert(!finished_);
assert(counter_ <= block_restart_interval_);
assert(!use_value_delta_encoding_ || delta_value);
size_t shared = 0;

// 超过阈值:当前rp下不能继续添加 key
if (counter_ >= block_restart_interval_) {
// Restart compression
restarts_.push_back(static_cast<uint32_t>(buffer_.size())); // 新的rp在 buffer 中的起始偏移量
estimate_ += sizeof(uint32_t);
counter_ = 0;

if (use_delta_encoding_) {
last_key_.assign(key.data(), key.size()); // 记录当前rp的第一个key
}
} else if (use_delta_encoding_) {
Slice last_key_piece(last_key_);
// 计算当前 key 和当前 rp 最后一个key的公共前缀
shared = key.difference_offset(last_key_piece);

// 更新当前rp最后的key
last_key_.assign(key.data(), key.size());
}

const size_t non_shared = key.size() - shared;
const size_t curr_size = buffer_.size();

if (use_value_delta_encoding_) {
// Add "<shared><non_shared>" to buffer_
PutVarint32Varint32(&buffer_,
static_cast<uint32_t>(shared),
static_cast<uint32_t>(non_shared));
} else {
// Add "<shared><non_shared><value_size>" to buffer_
PutVarint32Varint32Varint32(&buffer_,
static_cast<uint32_t>(shared),
static_cast<uint32_t>(non_shared),
static_cast<uint32_t>(value.size()));
}

buffer_.append(key.data() + shared, non_shared);
// 只有在 shared !=0 时,才使用 value delta encoding;
// 因为对 restart point 使用 delta_encoding 无意义
if (shared != 0 && use_value_delta_encoding_) {
// <shared><non_shared><delta_key><delta_value>
buffer_.append(delta_value->data(), delta_value->size());
} else {
// <shared><non_shared><value_size><key><value>
buffer_.append(value.data(), value.size());
}

// 为key建立hash索引
if (data_block_hash_index_builder_.Valid()) {
data_block_hash_index_builder_.Add(ExtractUserKey(key), restarts_.size() - 1);
}

counter_++;
estimate_ += buffer_.size() - curr_size; // 增加了 {k, v} 序列化后的大小
}

BlockBuilder::EstimateSizeAfterKV

EstimateSizeAfterKV 函数,计算添加{k, v}之后的大小,但是这个计算结果是偏大的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, const Slice& value) const {
size_t estimate = CurrentSizeEstimate();
// 计算的是key的大小,而不是non-shared,因此 estimate 是近似的
estimate += key.size();
// 传入的 value 是 BlockHandle 的序列化表示
// 在 use_value_delta_encoding_ 为 true 时,只有 BlockHandle 的 size 字段被编码了
// 刚好是一半 ???
estimate +=
!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)
? value.size()
: value.size() / 2;

if (counter_ >= block_restart_interval_) {
estimate += sizeof(uint32_t); // a new restart entry.
}

estimate += sizeof(int32_t); // varint for shared prefix length.
// Note: this is an imprecise estimate as we will have to encoded size, one
// for shared key and one for non-shared key.
estimate += VarintLength(key.size()); // varint for key length.
if (!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)) {
estimate += VarintLength(value.size()); // varint for value length.
}

return estimate;
}

BlockBuilder::Finish

由于调用 BlockBuilder::Add 时,已经将 {key, value} 都序列化完毕,到调用 Finsh 函数时,只需要在buffer 中添加一些meta info

  • 需要将restarts_ 中的记录的 rp 都序列化到 buffer_ 中;
  • 如果data_block_hash_index_builder_有效,则将其数据也序列化到 buffer_中;
  • 最后,将{index_type, num_restarts} 序列化到buffer

因此,整个data-block的序列化图如下所示。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// @brief 返回的是这个DataBlock的序列化结果
Slice BlockBuilder::Finish() {
// 将 rp 序列化到 buffer 中
for (size_t i = 0; i < restarts_.size(); i++) {
PutFixed32(&buffer_, restarts_[i]);
}

uint32_t num_restarts = static_cast<uint32_t>(restarts_.size());
BlockBasedTableOptions::DataBlockIndexType index_type =
BlockBasedTableOptions::kDataBlockBinarySearch;

// 如果 开启了 kDataBlockBinaryAndHash
// 并且 rp 的个数没有超过 kMaxBlockSizeSupportedByHashIndex
// 则建立 HashIndex
if (data_block_hash_index_builder_.Valid() &&
CurrentSizeEstimate() <= kMaxBlockSizeSupportedByHashIndex) {
data_block_hash_index_builder_.Finish(buffer_);
index_type = BlockBasedTableOptions::kDataBlockBinaryAndHash;
}

// 生成footer
uint32_t block_footer = PackIndexTypeAndNumRestarts(index_type, num_restarts);

PutFixed32(&buffer_, block_footer);
finished_ = true;
return Slice(buffer_);
}

BlockHandle

BlockHandle 对象block_handle,用于记录一个block的大小及其在SST中的存储的位置。

BlockBuilder 存储{k, v}时,其中vblock_handle 序列化为字符串后的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class BlockHandle {
public:
BlockHandle()
: BlockHandle(~static_cast<uint64_t>(0), ~static_cast<uint64_t>(0)) {}
BlockHandle(uint64_t offset, uint64_t size)
: offset_(_offset), size_(_size) {}

uint64_t offset() const { return offset_; }
uint64_t size() const { return size_; }
bool IsNull() const { return offset_ == 0 && size_ == 0; }

void set_offset(uint64_t _offset) { offset_ = _offset; }
void set_size(uint64_t _size) { size_ = _size; }

/// 将这个block的两个字段 {offset, size} 序列化成字符串,
/// 并保存至 @c dst 中
void EncodeTo(std::string* dst) const {
assert(offset_ != ~static_cast<uint64_t>(0));
assert(size_ != ~static_cast<uint64_t>(0));
PutVarint64Varint64(dst, offset_, size_);
}

/// 这个block的两个字段{offset, size} 内容从 @c input 中获取
Status DecodeFrom(Slice* input) {
if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) {
return Status::OK();
}

// 解析失败
offset_ = 0;
size_ = 0;
return Status::Corruption("bad block handle");
}

static const BlockHandle& NullBlockHandle() { return kNullBlockHandle; }

// Maximum encoding length of a BlockHandle
enum { kMaxEncodedLength = 10 + 10 };

inline bool operator==(const BlockHandle& rhs) const {
return offset_ == rhs.offset_ && size_ == rhs.size_;
}
inline bool operator!=(const BlockHandle& rhs) const {
return !(*this == rhs);
}

private:
uint64_t offset_; // 该 block 在文件中的起始偏移量
uint64_t size_; // 该 block 的大小

static const BlockHandle kNullBlockHandle;
};

const BlockHandle BlockHandle::kNullBlockHandle(0, 0);

Refernce