BlockBasedTableBuilder 源码分析

BlockBasedTableBuilder::ParallelCompressionRep

先来看看BlockBasedTableBuilder中的多线程压缩部分 BlockBasedTableBuilder::ParallelCompressionRep

以下代码不加说明,都是在类 BlockBasedTableBuilder::ParallelCompressionRep 里面。

Keys

class ParallelCompressionRep 中,有个class Keys,这是一个为了提高内存利用率的类。涉及点:

  • 预分配 kKeysInitSize个元素的内存:若向Keys中添加的元素个数小于 kKeysInitSize,则使用可以使用预分配的内存,避免了动态内存分配;
  • 每次Keys::Clear()的时候,并不是真的释放了之前的内存,仅仅是调整了计数器size_,即常见的『惰性删除』。

这是常见而有效的设计,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 class Keys {
public:
Keys() : keys_(kKeysInitSize), size_(0) {}

void PushBack(const Slice& key) {
if (size_ == keys_.size()) {
// 说明 keys_的元素个数超过预分配
keys_.emplace_back(key.data(), key.size());
} else {
// 使用预分配内存
keys_[size_].assign(key.data(), key.size());
}
size_++;
}

void SwapAssign(std::vector<std::string>& keys) {
size_ = keys.size();
std::swap(keys_, keys);
}

// 惰性删除
void Clear() { size_ = 0; }
size_t Size() { return size_; }
std::string& Back() { return keys_[size_ - 1]; }
std::string& operator[](size_t idx) {
assert(idx < size_);
return keys_[idx];
}

private:
const size_t kKeysInitSize = 32;
std::vector<std::string> keys_;
size_t size_;
};

std::unique_ptr<Keys> curr_block_keys;

WorkQueue

在继续讲解之前,先讲解下Rocksdb保证多线程安全的队列。

在rocksdb中,是怎么保证多线程压缩的顺序性呢,依赖WorkQueueWorkQueue其实就是个很简单的『生产-消费』多线程模型:写线程和多线程之间共享一个任务队列 queue_,以及保护这个共享队列queue_的互斥锁mutex_

WorkQueue::Finish函数之后,就不可用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class WorkQueue {
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::condition_variable finishCv_;

std::queue<T> queue_;
bool done_;
std::size_t maxSize_;

/// 必须在锁中调用这个函数
bool full() const {
if (maxSize_ == 0) {
return false;
}
return queue_.size() >= maxSize_;
}

public:
WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

template <typename U>
bool push(U&& item);

template<typename T>
bool pop(T& item);

void finish();
void waitUntilFinished();

/// 更改 queue_ 大小
void setMaxSize(std::size_t maxSize) {
{
std::lock_guard<std::mutex> lock(mutex_);
maxSize_ = maxSize;
}
writerCv_.notify_all();
}


};

push

push 函数,向共享队列queue_ 中添加一个任务,成功则返回true。如果已经调用了::Finish 函数,则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template <typename U>
bool WorkQueue::push(U&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
// 等待 queue_ 队列为空
while (full() && !done_) {
writerCv_.wait(lock);
}

// 如果调用了 Finish 函数
// 则不再接受新的元素
if (done_) {
return false;
}

// 添加新的元素
queue_.push(std::forward<U>(item));
}

// 通知读线程
readerCv_.notify_one();
return true;
}

pop

pop函数,按照FIFO规则从共享任务队列 queue_ 中弹出一个待处理任务,并返回true。

如果已经调用了::Finish 函数,则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename T>
bool WorkQueue::pop(T& item) {
{
// 等写线程添加元素
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) {
readerCv_.wait(lock);
}

// 如果已经调用了 ::Finish 函数
// 则不再处理
if (queue_.empty()) {
assert(done_);
return false;
}
// 弹出
item = queue_.front();
queue_.pop();
}

// 通知写线程
writerCv_.notify_one();
return true;
}

waitUntilFinished

waitUntilFinished 函数,用于阻塞等待 WorkQueue::Finish 函数调用。

1
2
3
4
5
6
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
finishCv_.wait(lock);
}
}

finish

::Finish 函数调用后,WorkQueue 不再处理接受新的元素,也不再弹出旧的元素,waitUntilFinished 函数就能返回了。

1
2
3
4
5
6
7
8
9
10
void WorkQueue::finish() {
{
std::lock_guard<std::mutex> lock(mutex_);
assert(!done_);
done_ = true;
}
readerCv_.notify_all();
writerCv_.notify_all();
finishCv_.notify_all();
}

BlockRepSlot

BlockRepSlot 是基于WorkQueue实现的线程安全队列。

每个节点BlockRep*记录一个block的相关数据及其状态,那么就可以由写线程将BlockRep节点加入到BlockRepSlot中,压缩等工作线程从BlockRepSlot中通过take函数取出BlockRep节点,对其进行压缩等操作。

如此,就能完成多线程操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 // BlockRep 记录一个 block 的数据及其相关信息
struct BlockRep {
Slice contents; // 原始内容
Slice compressed_contents; // 压缩内容
std::unique_ptr<std::string> data; // 原始内容, 与 contents 区别
std::unique_ptr<std::string> compressed_data; // 压缩之后的数据
CompressionType compression_type; // 压缩类型
std::unique_ptr<std::string> first_key_in_next_block;
std::unique_ptr<Keys> keys; // 此block的所有keys
std::unique_ptr<BlockRepSlot> slot; // 所属的 BlockRepSlot ???
Status status;
};

class BlockRepSlot {
public:
BlockRepSlot() : slot_(1) {}

// 向 slot_ 中添加一
template <typename T>
void Fill(T&& rep) {
slot_.push(std::forward<T>(rep));
};

void Take(BlockRep*& rep) { slot_.pop(rep); }

private:
WorkQueue<BlockRep*> slot_;
};

FileSizeEstimator

当并行压缩开启时,类 FileSizeEstimator 用于计算输出文件大小。主要是有两个回调函数 EmitBlockReapBlock

  • EmitBlock:在添加到压缩线程之前调用
  • ReapBlock:在压缩完成之后调用

FileSizeEstimator 的源码简单如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class FileSizeEstimator {
public:
explicit FileSizeEstimator()
: raw_bytes_compressed(0),
raw_bytes_curr_block(0),
raw_bytes_curr_block_set(false),
raw_bytes_inflight(0),
blocks_inflight(0),
curr_compression_ratio(0),
estimated_file_size(0) {}

/// 当一个 block 要 emit to 压缩线程时,则计算一个文件大小
void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size);

/// 当一个 block 从压缩线程压缩完毕时
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size);

/// 设置文件近似大小
void SetEstimatedFileSize(uint64_t size) {
estimated_file_size.store(size, std::memory_order_relaxed);
}

/// 获得文件近似大小
uint64_t GetEstimatedFileSize() {
return estimated_file_size.load(std::memory_order_relaxed);
}

/// 设置block大小
void SetCurrBlockRawSize(uint64_t size) {
raw_bytes_curr_block = size;
raw_bytes_curr_block_set = true;
}

private:
uint64_t raw_bytes_compressed; // 到目前为止压缩的字节,即这么多的字节是要要压缩的
uint64_t raw_bytes_curr_block; // Size of current block being appended.
bool raw_bytes_curr_block_set; //
std::atomic<uint64_t> raw_bytes_inflight; // 正在压缩,但尚未添加到sst文件的字节大小
std::atomic<uint64_t> blocks_inflight; // Number of blocks under compression and not appended yet.
std::atomic<double> curr_compression_ratio; // 压缩率
std::atomic<uint64_t> estimated_file_size; // SST 文件的近似大小
};

EmitBlock

EmitBlock函数,在将一个待压缩的block发送给压缩线程之前调用,用于计算当所有block压缩完成后sst文件的近似大小。主要有三部分组成:

  1. curr_file_size:已写入到sst的部分
  2. new_raw_bytes_inflight:当前正在压缩,还没写入sst的部分。这部分按照压缩率curr_compression_ratio 大致折算成最终写入sst的大小。
  3. kBlockTrailerSize:每个block都有footer,这部分不压缩,按照正在压缩的block数new_blocks_inflight 乘以每个footer的大小kBlockTrailerSize,即这部分写入sst的大小。

源码简单如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// @param raw_block_size 待压缩的 block 大小
/// @param curr_file_size
void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
// 当前正在压缩的字节数
uint64_t new_raw_bytes_inflight =
raw_bytes_inflight.fetch_add(raw_block_size,
std::memory_order_relaxed) +
raw_block_size;

// 当前正在压缩的block数据
uint64_t new_blocks_inflight =
blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;

estimated_file_size.store(
curr_file_size + // 1. 已经写入 sst 的大小
static_cast<uint64_t>(
static_cast<double>(new_raw_bytes_inflight) * // 2. 压缩完成的大小 = 当前正在压缩 * 压缩率
curr_compression_ratio.load(std::memory_order_relaxed)) +
new_blocks_inflight * kBlockTrailerSize, // 3. new_blocks_inflight * 脚注大小
std::memory_order_relaxed);
}

ReapBlock

ReapBlock,也是个回调函数 ,当一个block压缩完毕时调用,来更新当前压缩的状态:

  • raw_bytes_compressed:到目前为止,有多少字节的数据被压缩了;
  • curr_compression_ratio:已压缩的字节数 / 当前对应的原始字节数
  • 重新估算最终写入sst文件的大小

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
assert(raw_bytes_curr_block_set);

// 目前一共被压缩的字节数
uint64_t new_raw_bytes_compressed =
raw_bytes_compressed + raw_bytes_curr_block;
assert(new_raw_bytes_compressed > 0);

// 更新压缩率
// 分子:压缩的字节,分母:原始字节
curr_compression_ratio.store(
(curr_compression_ratio.load(std::memory_order_relaxed) *
raw_bytes_compressed +
compressed_block_size) /
static_cast<double>(new_raw_bytes_compressed),
std::memory_order_relaxed);
// 更新
raw_bytes_compressed = new_raw_bytes_compressed;

// 恢复
uint64_t new_raw_bytes_inflight =
raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
std::memory_order_relaxed) -
raw_bytes_curr_block;

uint64_t new_blocks_inflight =
blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;

// 重新计算下文件大小
estimated_file_size.store(
curr_file_size +
static_cast<uint64_t>(
static_cast<double>(new_raw_bytes_inflight) *
curr_compression_ratio.load(std::memory_order_relaxed)) +
new_blocks_inflight * kBlockTrailerSize,
std::memory_order_relaxed);

raw_bytes_curr_block_set = false;
}

ParallelCompressionRep

为了更好地理解 class ParallelCompressionRep,下面在讲解源码时,会调整下不同字段的顺序。

ParallelCompressionRep,使用顺序如下:

  • ParallelCompressionRep内部,会调用ParallelCompressionRep::PrepareBlock接口,准备此block的数据;
  • 调用 EmitBlock函数发送给压缩线程;
  • 当压缩完成,会调用ParallelCompressionRep::ReapBlock接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
struct BlockBasedTableBuilder::ParallelCompressionRep {
class Keys { /***/ };
class BlockRepSlot { /***/ };
struct BlockRep { /***/ };
class FileSizeEstimator { /***/ };

typedef std::vector<BlockRep> BlockRepBuffer;
typedef WorkQueue<BlockRep*> BlockRepPool;
typedef WorkQueue<BlockRep*> CompressQueue;
typedef WorkQueue<BlockRepSlot*> WriteQueue;

// 字段
std::unique_ptr<Keys> curr_block_keys; // 当前block的key
BlockRepBuffer block_rep_buf; // 记录各个block的数据状态
BlockRepPool block_rep_pool; // block_rep_pool 中记录的是 block_rep_buf
CompressQueue compress_queue; // 待压缩的任务队列
std::vector<port::Thread> compress_thread_pool; // 压缩线程
WriteQueue write_queue; // 写入文件的队列
std::unique_ptr<port::Thread> write_thread; // 写线程
FileSizeEstimator file_size_estimator; // 评估文件大小

// 等待第一个 block 压缩完成
std::atomic<bool> first_block_processed;
std::condition_variable first_block_cond;
std::mutex first_block_mutex;

explicit ParallelCompressionRep(uint32_t parallel_threads)
: curr_block_keys(new Keys()),
block_rep_buf(parallel_threads),
block_rep_pool(parallel_threads),
compress_queue(parallel_threads),
write_queue(parallel_threads),
first_block_processed(false) {
for (uint32_t i = 0; i < parallel_threads; i++) {
block_rep_buf[i].contents = Slice();
block_rep_buf[i].compressed_contents = Slice();
block_rep_buf[i].data.reset(new std::string());
block_rep_buf[i].compressed_data.reset(new std::string());
block_rep_buf[i].compression_type = CompressionType();
block_rep_buf[i].first_key_in_next_block.reset(new std::string());
block_rep_buf[i].keys.reset(new Keys());
block_rep_buf[i].slot.reset(new BlockRepSlot());
block_rep_buf[i].status = Status::OK();
// 放到线程池
// 因此,block_rep_pool 中每个元素的生命周期是由 block_rep_buf 中的每个元素负责
// block_rep_buf 中存储着原始数据
block_rep_pool.push(&block_rep_buf[i]);
}
}

/// 等待完成
~ParallelCompressionRep() { block_rep_pool.finish(); }

/// 使用在 non-buffered 模式
/// 准备好一个 block,并准备发送给 压缩线程
BlockRep* PrepareBlock(CompressionType compression_type,
const Slice* first_key_in_next_block,
BlockBuilder* data_block);

/// Used in EnterUnbuffered
BlockRep* PrepareBlock(CompressionType compression_type,
const Slice* first_key_in_next_block,
std::string* data_block,
std::vector<std::string>* keys);

/// 将 block 发送给 压缩线程
void EmitBlock(BlockRep* block_rep);

/// 接受从压缩线程的结果
void ReapBlock(BlockRep* block_rep);

private:
/// 生成 block
BlockRep* PrepareBlockInternal(CompressionType compression_type,
const Slice* first_key_in_next_block);
};

PrepareBlockInternal

下面,先来讲讲怎么创建一个 BlockRep对象。

TODO

block_rep_pool 的意义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 生成 block
BlockRep* PrepareBlockInternal(CompressionType compression_type,
const Slice* first_key_in_next_block) {
BlockRep* block_rep = nullptr;
// 从 block_rep_pool 头部出来
block_rep_pool.pop(block_rep);
assert(block_rep != nullptr);

assert(block_rep->data);

block_rep->compression_type = compression_type;

if (first_key_in_next_block == nullptr) {
// 表示最后一个block
block_rep->first_key_in_next_block.reset(nullptr);
} else {
// 非最后一个block
block_rep->first_key_in_next_block->assign(
first_key_in_next_block->data(), first_key_in_next_block->size());
}

return block_rep;
}

PrepareBlock

在将待压缩的block发送给压缩线程之前,要先准备好这个block。

PrepareBlock 函数,即用于完成这个过程。

  • compression_type:压缩类型
  • first_key_in_next_block:下一个block的第一个key
  • data_block:待压缩的block的数据部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BlockRep* PrepareBlock(CompressionType compression_type,
const Slice* first_key_in_next_block,
BlockBuilder* data_block) {
// 创建 block_req
BlockRep* block_rep =
PrepareBlockInternal(compression_type, first_key_in_next_block);
assert(block_rep != nullptr);
// 将 data_block 中数据给 block_rep->data
data_block->SwapAndReset(*(block_rep->data));
// 数据复制给 block_rep->contents
block_rep->contents = *(block_rep->data);
// 再将当前block的数据给 block_rep->keys
std::swap(block_rep->keys, curr_block_keys);
curr_block_keys->Clear();
return block_rep;
}

PrepareBlock

PrepareBlock 只是上述函数的重载。上述函数传入的是一个BlockBuilder*,这里只是把原本一个BlockBuilder 记录的数据分别传入。

1
2
3
4
5
6
7
8
9
10
11
12
BlockRep* PrepareBlock(CompressionType compression_type,
const Slice* first_key_in_next_block,
std::string* data_block,
std::vector<std::string>* keys) {
BlockRep* block_rep =
PrepareBlockInternal(compression_type, first_key_in_next_block);
assert(block_rep != nullptr);
std::swap(*(block_rep->data), *data_block);
block_rep->contents = *(block_rep->data);
block_rep->keys->SwapAssign(*keys);
return block_rep;
}

EmitBlock

将上述准备好的block发送给压缩线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 将 block 发送给 压缩线程
void EmitBlock(BlockRep* block_rep) {
assert(block_rep != nullptr);
assert(block_rep->status.ok());
// 将准备好的block加入工作队列
if (!write_queue.push(block_rep->slot.get())) {
return;
}
if (!compress_queue.push(block_rep)) {
return;
}

// 等待压缩完成
if (!first_block_processed.load(std::memory_order_relaxed)) {
std::unique_lock<std::mutex> lock(first_block_mutex);
// 阻塞等待,直到 first_block_processed 为 true
first_block_cond.wait(lock, [this] {
return first_block_processed.load(std::memory_order_relaxed);
});
}
}

ReapBlock

TODO

ReapBlock,当压缩完成时的回调函数。??? 应该是写入文件???

用于清除压缩数据,通知emit线程,可以继续压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Reap a block from compression thread
// 接受从压缩线程的结果
void ReapBlock(BlockRep* block_rep) {
assert(block_rep != nullptr);
// 清除压缩数据
block_rep->compressed_data->clear();
// ???
block_rep_pool.push(block_rep);

// 通知 emit Block,可以继续压缩
if (!first_block_processed.load(std::memory_order_relaxed)) {
std::lock_guard<std::mutex> lock(first_block_mutex);
first_block_processed.store(true, std::memory_order_relaxed);
first_block_cond.notify_one();
}
}

BlockBasedTableBuilder

初始化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
BlockBasedTableBuilder::BlockBasedTableBuilder(
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
WritableFileWriter* file) {
BlockBasedTableOptions sanitized_table_options(table_options);

// 压缩格式
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
ROCKS_LOG_WARN(
tbo.ioptions.logger,
"Silently converting format_version to 1 because checksum is "
"non-default");
// silently convert format_version to 1 to keep consistent with current
// behavior
sanitized_table_options.format_version = 1;
}

// 创建 rep_
rep_ = new Rep(sanitized_table_options, tbo, file);

// filter block 此时还没创建,初始化
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
}

// 生成前缀
if (table_options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
table_options.block_cache_compressed.get(),
file->writable_file(),
&rep_->compressed_cache_key_prefix[0], // 输出参数
&rep_->compressed_cache_key_prefix_size, // 输出参数
tbo.db_session_id,
tbo.cur_file_num);
}

// 开启多线程压缩
if (rep_->IsParallelCompressionEnabled()) {
StartParallelCompression();
}
}

ParallelCompression

下面从压缩开始。

每个data_block在写入sst之前,如果设置了压缩,则都会经过一个压缩的过程。

IsParallelCompressionEnabled

是否开启多线程压缩,则由 CompressionOptions::parallel_threads 字段的值指示。

1
2
3
bool BlockBasedTableBuilder::Rep::IsParallelCompressionEnabled() const {
return compression_opts.parallel_threads > 1;
}

StartParallelCompression

如果开启了多线程压缩,压缩流程如下:

  • 在每次压缩前,主线程,使用ParallelCompressionRep::PrepareBlock函数,准备好待压缩的block;
  • 主线程调用ParallelCompressionRep::EmitBlock函数,将block送入压缩线程;
  • 压缩线程再进行压缩
  • 将压缩完毕的data_block,写入到sst文件中

下面是开启多个压缩线程的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void BlockBasedTableBuilder::StartParallelCompression() {
// 初始化 ParallelCompressionRep 对象
rep_->pc_rep.reset(
new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
rep_->pc_rep->compress_thread_pool.reserve(
rep_->compression_opts.parallel_threads);
// 开启 parallel_threads 个压缩线程
for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
BGWorkCompression(*(rep_->compression_ctxs[i]), // 压缩上下文
rep_->verify_ctxs[i].get()); // 解压上下文
});
}
// 开启 1 个写线程
rep_->pc_rep->write_thread.reset(
new port::Thread([this] { BGWorkWriteRawBlock(); }));
}
BGWorkCompression

压缩线程的入口函数,BGWorkCompression ,仅用于压缩data_block,他一直在等待主线程发送待压缩的block,然后取出来,进行压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void BlockBasedTableBuilder::BGWorkCompression(const CompressionContext& compression_ctx,
UncompressionContext* verify_ctx) {
ParallelCompressionRep::BlockRep* block_rep = nullptr;
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
assert(block_rep != nullptr);
CompressAndVerifyBlock(block_rep->contents,
true, /* is_data_block*/
compression_ctx,
verify_ctx,
block_rep->compressed_data.get(), // 输出参数,unused ?
&block_rep->compressed_contents, // 保存压缩结果
&(block_rep->compression_type), // 压缩结果的字节数
&block_rep->status); // 压缩是否成功
// 将压缩完毕的 block 加入到 block_rep->slot 中
// 为后面写入sst准备
block_rep->slot->Fill(block_rep);
}
}
CompressAndVerifyBlock

CompressAndVerifyBlock` 函数,不仅尝试去压缩,而且会统计压缩过程中的一些信息。为便于下面的代码简洁,易于理解,把这部分去掉了,专注于功能。

sst是由一系列的block组成,每个block的格式都是:

1
2
3
block_data: uint8[n] # block 存储的数据
type: uint8 # 压缩类型
crc: uint32 # 校验和

顺序的压缩过程如下:

  1. 当前待压缩的数据是raw_block_contents,其大小不能超过 kCompressionSizeLimit 限制;
  2. 先尝试调用 CompressBlock 函数压缩,结果保存至block_contents
  3. 如果设置了校验压缩结果,即设置了 table_options.verify_compression 标志位,则会对block_contents进行解压,保存至contents,顺利的压缩,需要保证 block_contentscontents 相同;

如果一切顺利,则:

  • block_contents 中保存了raw_block_contents的压缩结果,
  • type 保存了压缩类型;

代码简洁后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
void BlockBasedTableBuilder::CompressAndVerifyBlock(const Slice& raw_block_contents, 
bool is_data_block,
const CompressionContext& compression_ctx,
UncompressionContext* verify_ctx,
std::string* compressed_output, // unused
Slice* block_contents,
CompressionType* type,
Status* out_status) {
Rep* r = rep_;
bool is_status_ok = ok();
if (!r->IsParallelCompressionEnabled()) {
assert(is_status_ok);
}

*type = r->compression_type;
// 采样多少个字节用于压缩
uint64_t sample_for_compression = r->sample_for_compression;
// 中止压缩
bool abort_compression = false;

StopWatchNano timer(
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

// 每次压缩字节大小有限制
if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
// 1. 获取 compression_dict
const CompressionDict* compression_dict;
if (!is_data_block || r->compression_dict == nullptr) {
compression_dict = &CompressionDict::GetEmptyDict();
} else {
compression_dict = r->compression_dict.get();
}
assert(compression_dict != nullptr);
// 构造对象
CompressionInfo compression_info(r->compression_opts,
compression_ctx,
*compression_dict,
*type,
sample_for_compression);

std::string sampled_output_fast;
std::string sampled_output_slow;
/// 尝试将 @c raw_block_contents 进行压缩,并返回至 @c compressed_output
*block_contents = CompressBlock(raw_block_contents,
compression_info,
type,
r->table_options.format_version,
is_data_block /* do_sample */,
compressed_output,
&sampled_output_fast,
&sampled_output_slow);

// 由于一些压缩算法不太可靠,因此如果设置了 verify_compression
// 则需要校验
if (*type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer
const UncompressionDict* verify_dict;
if (!is_data_block || r->verify_dict == nullptr) {
verify_dict = &UncompressionDict::GetEmptyDict();
} else {
verify_dict = r->verify_dict.get();
}
assert(verify_dict != nullptr);
BlockContents contents;
UncompressionInfo uncompression_info(*verify_ctx,
*verify_dict,
r->compression_type);
Status stat = UncompressBlockContentsForCompressionType(
uncompression_info, block_contents->data(), block_contents->size(),
&contents, r->table_options.format_version, r->ioptions);

if (stat.ok()) {
// 将解压的内容与压缩之前的源内容比较
bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
if (!compressed_ok) {
// 解压内容与原来的待压缩的内容,不匹配,终止压缩
abort_compression = true;
ROCKS_LOG_ERROR(r->ioptions.logger,
"Decompressed block did not match raw block");
*out_status =
Status::Corruption("Decompressed block did not match raw block");
}
} else {
// 解压失败,终止压缩
*out_status = Status::Corruption(std::string("Could not decompress: ") +
stat.getState());
abort_compression = true;
}
}
} else {
// block 内容太多,无法一次性压缩,也要终止压缩
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
std::memory_order_relaxed);
}
abort_compression = true;
}

// 校验压缩结果
if (abort_compression) {
*type = kNoCompression;
*block_contents = raw_block_contents;
}
}
BGWorkWriteRawBlock

BGWorkCompression 函数,我们可以看到,压缩线程主要有两个动作:

  • 先压缩 data block,并将相关信息记录在block_rep
  • 再将 block_rep 保存在 block_rep->slot

此时呢,可以回顾下 ParallelCompressionRep::EmitBlock 函数,block_rep->slot 一开始就就加入到 write_queue 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void EmitBlock(BlockRep* block_rep) {
if (!write_queue.push(block_rep->slot.get())) {
return;
}
if (!compress_queue.push(block_rep)) {
return;
}

// 等待压缩完成
if (!first_block_processed.load(std::memory_order_relaxed)) {
std::unique_lock<std::mutex> lock(first_block_mutex);
// 阻塞等待,直到 ReapBlock 函数调用
first_block_cond.wait(lock, [this] {
return first_block_processed.load(std::memory_order_relaxed);
});
}

这说明什么?

如果前面的压缩部分没有完成,则当执行到BGWorkWriteRawBlock 函数时,会一直阻塞在:

1
slot->Take(block_rep);

无论前面的 CompressAndVerifyBlock函数是否压缩成功,都需要调用ReapBlock 函数,以防止EmitBlock处产生死锁。

Of Course,若前面压缩成功,则还大致需要执行以下流程:

  • 先将当前block中的每个key添加到 filter_builderindex_builder
  • 再生成 filter_blockidnex_block
  • 将block压缩后的数据写入sst

下面来看看细节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
Rep* r = rep_;
ParallelCompressionRep::BlockRepSlot* slot = nullptr;
ParallelCompressionRep::BlockRep* block_rep = nullptr;
while (r->pc_rep->write_queue.pop(slot)) {
assert(slot != nullptr);
// 阻塞于此,直到压缩完成,Take 函数才能返回
slot->Take(block_rep);
assert(block_rep != nullptr);
if (!block_rep->status.ok()) {
r->SetStatus(block_rep->status);
// Flush() 中存在 Emit(), 需要 ReapBlock
block_rep->status = Status::OK();
r->pc_rep->ReapBlock(block_rep);
continue;
}

for (size_t i = 0; i < block_rep->keys->Size(); i++) {
auto& key = (*block_rep->keys)[i];
if (r->filter_builder != nullptr) {
size_t ts_sz =
r->internal_comparator.user_comparator()->timestamp_size();
// 为后面创建 filter block 准备
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
}
// 为后面创建 index block 准备
r->index_builder->OnKeyAdded(key);
}

// 为后续重新估算 sst 文件大小准备
r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());

// 将压缩后的数据写入sst
WriteRawBlock(block_rep->compressed_contents, // 压缩的数据
block_rep->compression_type, // 压缩类型
&r->pending_handle,
true /* is_data_block*/,
&block_rep->contents);
if (!ok()) break;

// 基于前面添加的key,生成 filter block
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->get_offset());
}
r->props.data_size = r->get_offset();
++r->props.num_data_blocks;

// 基于前面添加的key, 生成 index block
if (block_rep->first_key_in_next_block == nullptr) {
// 最后一个data block
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
r->pending_handle);
} else {
// 非最后一个data block
Slice first_key_in_next_block =
Slice(*block_rep->first_key_in_next_block);
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
&first_key_in_next_block,
r->pending_handle);
}

// 通知 ParallelCompressionRep::EmitBlock
r->pc_rep->ReapBlock(block_rep);
}
}
WriteRawBlock

最后一步,就是要将压缩完的数据按照如下格式写入到sst文件中:

1
2
3
4
block_data: uint8[n] # block 存储的数据
type: uint8 # 压缩类型
crc: uint32 # 校验和
padding # 填充

注意,WriteRawBlock 函数的传入的参数handle ,在函数返回时记录了关于block的两个元信息:

  • 此block在sst文件中存储的起始位置
  • 此block压缩后的大小。

这个参数,实际上由Add(key, value)函数中的r->pending_handle传入,后续写入index builder,建立index block,这个在后面会讲解。

下面,简洁了部分代码后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type,
BlockHandle* handle,
bool is_data_block,
const Slice* raw_block_contents) {
Rep* r = rep_;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
// 1. 初始化 handle
handle->set_offset(r->get_offset()); // block 在sst文件中的offset
handle->set_size(block_contents.size()); // 这个block 压缩后的大小
assert(status().ok());
assert(io_status().ok());
// 2. 向sst文件中追加内容 block_contents
io_s = r->file->Append(block_contents);
// 3. 下面写入footer
if (io_s.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type; // 压缩类型
uint32_t checksum = 0; // 校验和
switch (r->table_options.checksum) {
// 计算校验和
//...
}
// 存储校验和
EncodeFixed32(trailer + 1, checksum);
assert(io_s.ok());
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
static_cast<char*>(trailer));

// 写入 footer
io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
//...
if (s.ok() && io_s.ok()) {
// 设置文件偏移量 : 当前偏移量 + block 数据大小 + footer
r->set_offset(r->get_offset() + block_contents.size() +
kBlockTrailerSize);

// 4. 填充
if (r->table_options.block_align && is_data_block) {
size_t pad_bytes =
(r->alignment - ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
(r->alignment - 1);
io_s = r->file->Pad(pad_bytes);
if (io_s.ok()) {
// 填充后,重新更新文件偏移量
r->set_offset(r->get_offset() + pad_bytes);
} else {
r->SetIOStatus(io_s);
}
}

// 5. 这个时候再重新计算文件大小
if (r->IsParallelCompressionEnabled()) {
if (is_data_block) {
r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
r->get_offset());
} else {
r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
}
}
}
} else {
r->SetIOStatus(io_s);
}
if (!io_s.ok() && s.ok()) {
r->SetStatus(io_s);
}
}

StopParallelCompression

和启动多线程部分相应,停止压缩线程、写线程。

1
2
3
4
5
6
7
8
void BlockBasedTableBuilder::StopParallelCompression() {
rep_->pc_rep->compress_queue.finish();
for (auto& thread : rep_->pc_rep->compress_thread_pool) {
thread.join();
}
rep_->pc_rep->write_queue.finish();
rep_->pc_rep->write_thread->join();
}

Add

好嘞,终于到了这里。

经过前面压缩部分的铺垫,相信这里会让你更加易懂。

每个key的编码信息中,都包含着一个value_type信息,关于value_type,详细地后续再说,这里一点,IsValueType 返回值为true,表示这对{k, v}可以写入到sst文件中。

1
2
3
4
5
6
7
8
inline bool IsValueType(ValueType t) {
return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex
|| kTypeDeletionWithTimestamp == t;
}

inline bool IsExtendedValueType(ValueType t) {
return IsValueType(t) || t == kTypeRangeDeletion;
}

BlockBasedTableBuilder 在添加{k, v}并写入到sst文件的过程,会经历三个阶段State

State

1
2
3
4
5
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};

kBuffered模式下,将待压缩、待写入sst文件的data_block 暂时缓存在r->data_block_buffer中。当缓存中的数据长度r->data_begin_offset超过限制buffer_limit 时,就会进入到kUnbuffered模式。

一旦进入到kUnbuffered,就不可逆转到kBuffered。最终只能在调用 BlockBasedTableBuilder::Finish 时进入kClosed模式。

should_flush

每个BlockBasedTableBuilder 中都有个刷新策略r->flush_block_policy:将当前{k, v}添加到 r->data_block,若会触发更新,则会先将当前 r->data_block 的数据进行flush

  • r->data_begin_offset < r->buffer_limit:暂时缓存到 r->data_block_buffer中;
  • 否则,会先压缩,再写入sst文件。

此外,由于多线程的压缩、写入sst的过程在相应的子线程中完成,当没有开启多线程时,一些细节需要在主线程中单独进行处理,因此在下面的代码中会经常看到下面的代码结构:

1
2
3
4
if (r->IsParallelCompressionEnabled()) { 
} else {
// 对单线程进行处理
}

只要理解了之前讲解的多线程逻辑,下面的Add函数会很好理解。

代码简略后如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/// 向 cur_block 中添加 {key, value}
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
ValueType value_type = ExtractValueType(key);
if (IsValueType(value_type)) {
// 判断是否需要 flush
auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) {
assert(!r->data_block.empty());
r->first_key_in_next_block = &key;
Flush();

// 从 kBuffer 模式进入 kUnbuffered 模式
if (r->state == Rep::State::kBuffered
&& r->buffer_limit != 0
&&r->data_begin_offset > r->buffer_limit) {
EnterUnbuffered();
}

if (ok() && r->state == Rep::State::kUnbuffered) {
if (r->IsParallelCompressionEnabled()) {
r->pc_rep->curr_block_keys->Clear();
} else {
// 单线程:在 index builder 中添加一个 {k, pending_handle}
// 其中 pending_handle 记录了写入sst的位置及大小
r->index_builder->AddIndexEntry(&r->last_key,
&key,
r->pending_handle);
}
}
}

// NOTE: PartitionedFilterBlockBuilder 需要在 key先添加到 index builder 之后,
// 再添加到 PartitionedFilterBlockBuilder 中
if (r->state == Rep::State::kUnbuffered) {
if (r->IsParallelCompressionEnabled()) {
r->pc_rep->curr_block_keys->PushBack(key);
} else {
// 单线程:在 filter builder 中添加一个记录
if (r->filter_builder != nullptr) {
size_t ts_sz =
r->internal_comparator.user_comparator()->timestamp_size();
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
}
}
}

// 更新当前data_block的last_key
r->last_key.assign(key.data(), key.size());
// 添加 {k, v} 到当前 data_block
r->data_block.Add(key, value);
if (r->state == Rep::State::kBuffered) {
// Buffered keys will be replayed from data_block_buffers during
// `Finish()` once compression dictionary has been finalized.
} else {
if (!r->IsParallelCompressionEnabled()) {
// 单线程
r->index_builder->OnKeyAdded(key);
}
}

NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);

} else if (value_type == kTypeRangeDeletion) {
r->range_del_block.Add(key, value);
NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);
} else {
assert(false);
}
//...
// 统计
}

Flush

Flush 函数中,针对多线程和单线程两种模式:

  • 如果开启了多线程 && 已经处于 kUnbuffered 模式,则调用多线程压缩、写入sst文件。
  • 否则,直接调用单线程的压缩写入。

下面来看看源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
assert(rep_->state != Rep::State::kClosed);
if (!ok()) return;
if (r->data_block.empty()) return;

if (r->IsParallelCompressionEnabled() && r->state == Rep::State::kUnbuffered) {
// 序列化 r->data_block 中的数据
r->data_block.Finish();
// 利用 r->data_block 中的数据生成 block_rep
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
r->compression_type, r->first_key_in_next_block, &(r->data_block));
assert(block_rep != nullptr);
// 此时,r->data_block 中是空的了
assert(r->data_block.emoty());
// 在压缩前,估计下 sst 文件大小
r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
r->get_offset());
// 将 data_block 送入压缩线程,后台线程会压缩、写入sst文件
r->pc_rep->EmitBlock(block_rep);
} else {
// 单线程压缩
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
}
}
WriteBlock

调用 WriteBlock 函数,不仅会来自于上面的 Flush 函数,还有会后面的WriteIndexBlockEnterUnbuffered函数等。因此,WriteBlock 函数做了统一接口,来应对 State::kBufferedState::kUnBuffered两种状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle, // 正在pending的
bool is_data_block) {
// 序列化 block 中的数据
block->Finish();
std::string raw_block_contents;
// 将block序列化后的内容swap到 raw_block_contents 中
block->SwapAndReset(raw_block_contents);
if (rep_->state == Rep::State::kBuffered) {
// 只有 data_block 存在 kbuffered 模式
assert(is_data_block);
// 将数据先写入 data_block_buffers
rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
rep_->data_begin_offset += rep_->data_block_buffers.back().size();
return;
} // else 就是 KUnBuffer 模式
WriteBlock(raw_block_contents, handle, is_data_block);
}

WriteBlock

重载形式的WriteBlock ,只用于单线程压缩(没有写入sst文件的操作),且BlockBasedTableBuilder当前处于kUnbuffered模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/// @brief 用于 kUnbuffered 模式:单线程压缩
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle,
bool is_data_block) {
Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
Slice block_contents;
CompressionType type;
Status compress_status;
// 进行压缩
CompressAndVerifyBlock(raw_block_contents,
is_data_block,
*(r->compression_ctxs[0]),
r->verify_ctxs[0].get(),
&(r->compressed_output),
&(block_contents),
&type,
&compress_status);
r->SetStatus(compress_status);
// 如果压缩不成功
if (!ok()) {
return;
}

EnterUnbuffered

EnterUnbuffered函数,将BlockBasedTableBuilder的状态,从kBuffered推向kUnBuffered,仅会执行一次。

那么这个函数的使命?

我们知道在 kBuffered 模式下,每次调用Flush函数时,都是将r->data_block的数据缓存到r->data_block_buffers

当从kBuffered专向kUnBuffered时,很自然,就需要让r->data_block_buffers 中的数据也经历两个过程:

  • 压缩
  • 写入sst

带着这个思路,下面的代码就很好理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
void BlockBasedTableBuilder::EnterUnbuffered() {
Rep* r = rep_;
assert(r->state == Rep::State::kBuffered);
r->state = Rep::State::kUnbuffered;
//...

/// 获得 r->data_block_buffers[i] 的读迭代器
auto get_iterator_for_block = [&r](size_t i) {
auto& data_block = r->data_block_buffers[i];
assert(!data_block.empty());

Block reader{BlockContents{data_block}};
DataBlockIter* iter = reader.NewDataIterator(r->internal_comparator.user_comparator(),
kDisableGlobalSequenceNumber);

// 定位到这个 data_block 的起始处
iter->SeekToFirst();
assert(iter->Valid());
return std::unique_ptr<DataBlockIter>(iter);
};

std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;

for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
// 初始化迭代器
if (iter == nullptr) {
iter = get_iterator_for_block(i);
assert(iter != nullptr);
};

// 下一个 data_block 的迭代器
if (i + 1 < r->data_block_buffers.size()) {
next_block_iter = get_iterator_for_block(i + 1);
}

// 当前 data_block
auto& data_block = r->data_block_buffers[i];

/*** 下面对data_block进行压缩、写入***/
if (r->IsParallelCompressionEnabled()) {
/*** 开启了多线程压缩 ***/

Slice first_key_in_next_block;
const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
if (i + 1 < r->data_block_buffers.size()) {
assert(next_block_iter != nullptr);
// 下一个 data_block 的第一个key
first_key_in_next_block = next_block_iter->key();
} else {
// r->first_key_in_next_block
// 即处于 kUnBuffered 状态的 data_block 第一个key
first_key_in_next_block_ptr = r->first_key_in_next_block;
}

// 迭代当前 data_block, 将所有的key,全部添加到 keys
std::vector<std::string> keys;
for (; iter->Valid(); iter->Next()) {
keys.emplace_back(iter->key().ToString());
}

// 生成 block_req
ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);

assert(block_rep != nullptr);
// 在发送前,估计下sst文件大小
r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
r->get_offset());
// 发送至压缩线程
r->pc_rep->EmitBlock(block_rep);
} else {
/*** 没有开启多线程压缩 ***/

// fileter block,index block
for (; iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (r->filter_builder != nullptr) {
size_t ts_sz =
r->internal_comparator.user_comparator()->timestamp_size();
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
}
r->index_builder->OnKeyAdded(key);
}
WriteBlock(Slice(data_block),
&r->pending_handle, // 记录了 data_block 在文件中的位置及其大小
true /* is_data_block */);

//! 此for循环,不包含最后一个data block
//! 因此,在 ::Finish 函数中,需要为最后一个 data block 单独调用一次 AddIndexEntry
if (ok() && i + 1 < r->data_block_buffers.size()) {
assert(next_block_iter != nullptr);
Slice first_key_in_next_block = next_block_iter->key();

Slice* first_key_in_next_block_ptr = &first_key_in_next_block;

iter->SeekToLast();
std::string last_key = iter->key().ToString();
// 添加一个 {key, handle}
r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
r->pending_handle);
}
}

/*** 遍历完当前 data_block ***/

std::swap(iter, next_block_iter);
} // 遍历完所有的 data_block

/// 清除数据
r->data_block_buffers.clear();
}

Finish

在前文说过,一个table序列化到sst文件的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1: filter block]
[meta block 2: index block]
[meta block 3: compression dictionary block]
[meta block 4: range deletion block]
[meta block 5: stats block]
...
[meta block K: future extended block]
[metaindex block]
[Footer]
<end_of_file>

当调用::Finish函数时,data block部分已经构建完毕,下面就需要开始构建meta block

MetaIndexBuilder

前面的filter blockindex blockcompression dictionary blockrange deletion blockprop block等记录着data block各种信息,我们把这些记录data block信息的block统一叫做meta block。所谓meta,即信息的信息。

MetaIndexBuilder,则用于存储前面这些meta block在sst中的存储位置及其大小,最终用于构建整个Tablefooter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MetaIndexBuilder {
public:
MetaIndexBuilder()
: meta_index_block_(new BlockBuilder(1 /* restart interval */)) {}

MetaIndexBuilder(const MetaIndexBuilder&) = delete;
MetaIndexBuilder& operator=(const MetaIndexBuilder&) = delete;

/// 添加 {k, v}
/// handle 中记录了指向的 meta block 在sst中的位置及其大小
void Add(const std::string& key, const BlockHandle& handle) {
std::string handle_encoding;
handle.EncodeTo(&handle_encoding); // 编码为字符串
meta_block_handles_.emplace(key, std::move(handle_encoding));
}

/// 将所有添加到 meta_block_handles_ 中的{k, v} 序列化后返回
Slice Finish() {
for (const auto& metablock : meta_block_handles_) {
meta_index_block_->Add(metablock.first, metablock.second);
}
return meta_index_block_->Finish()
}

private:
stl_wrappers::KVMap meta_block_handles_; // 存着所有meta block的信息
std::unique_ptr<BlockBuilder> meta_index_block_; // 由这些meta block的元信息构建的block
};

有了这个思路,下面,就可以先来看看整体代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
assert(r->state != Rep::State::kClosed);
bool empty_data_block = r->data_block.empty();
// 在flush时,暗示最后一个data block
r->first_key_in_next_block = nullptr;
// flush最后一个data block
Flush();
// 如果状态不是 kUnBuffered,则直接进入
if (r->state == Rep::State::kBuffered) {
EnterUnbuffered();
}

if (r->IsParallelCompressionEnabled()) {
StopParallelCompression();
} else {
// 对于单线程,EnterUnbuffered 函数没发为最后一个 data block 建立 index
// 因此,需要手动添加
if (ok() && !empty_data_block) {
r->index_builder->AddIndexEntry(&r->last_key,
nullptr /* no next data block */,
r->pending_handle);
}
}

// 开始构建 meta-block
BlockHandle metaindex_block_handle, index_block_handle;
MetaIndexBuilder meta_index_builder;
WriteFilterBlock(&meta_index_builder);
WriteIndexBlock(&meta_index_builder, &index_block_handle);
WriteCompressionDictBlock(&meta_index_builder);
WriteRangeDelBlock(&meta_index_builder);
WritePropertiesBlock(&meta_index_builder);
if (ok()) {
// 将meta-block写入sst文件
WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
&metaindex_block_handle);
}
// metaindex_block_handle 记录着 meta-block的信息
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);
}
r->state = Rep::State::kClosed;
r->SetStatus(r->CopyIOStatus());
Status ret_status = r->CopyStatus();
assert(!ret_status.ok() || io_status().ok());
return ret_status;
}

WriteFilterBlock

WriteFilterBlock函数的逻辑如下:

  • 如果不是 PartitionFilterBuilder,则直接将整个FilterBuiler构建的数据,在直接序列化后写入sst文件。
  • 如果是PartitionFilterBuilder,则以partition为单位,逐个写入到sst文件;
  • 等上述过程完毕,再将将此 FilterBuilder 的属性作为{k, v}添加到 meta_index_builder中:
    • k:是FilterBuilderType.FilterPolicyName
    • v:是写入sst文件的filter block的元信息。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void BlockBasedTableBuilder::WriteFilterBlock(MetaIndexBuilder* meta_index_builder) {
BlockHandle filter_block_handle;
bool empty_filter_block =
(rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty());
if (ok() && !empty_filter_block) {
// 添加的key数
rep_->props.num_filter_entries +=
rep_->filter_builder->EstimateEntriesAdded();
Status s = Status::Incomplete();
// 如果是 PartitionFilterBuilder
// 返回值是 IsIncomplete
while (ok() && s.IsIncomplete()) {
// 此处的 filter_block_handle 表示上一个partition的handle
Slice filter_content =
rep_->filter_builder->Finish(filter_block_handle, &s);
assert(s.ok() || s.IsIncomplete());
rep_->props.filter_size += filter_content.size();
// filter_content : 当前 filter partition 的内容
// filter_block_handle : 记录该 partition 在sst中大小及偏移量
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
}
}

/*** 全部写入sst ***/

if (ok() && !empty_filter_block) {
// 获取 FilterBuilder 的名字
std::string key;
if (rep_->filter_builder->IsBlockBased()) {
key = BlockBasedTable::kFilterBlockPrefix;
} else {
key = rep_->table_options.partition_filters
? BlockBasedTable::kPartitionedFilterBlockPrefix
: BlockBasedTable::kFullFilterBlockPrefix;
}
// 获取 FilterPolicy 的名字
key.append(rep_->table_options.filter_policy->Name());
// 添加元信息
meta_index_builder->Add(key, filter_block_handle);
}
}

WriteIndexBlock

由前文可知,IndexBuiler对外提供了两种:

  • HashIndexBuilder
  • PartitionIndexBuiler

无论是哪种,最后的index block结果都是由 IndexBuilder::IndexBlocks::index_block_contents 保存。

1
2
3
4
struct IndexBuilder::IndexBlocks {
Slice index_block_contents;
std::unordered_map<std::string, Slice> meta_blocks;
};

区别在于:

  • PartitionIndexBuiler 有很多个partition,而且这个partition的数量和 PartitionFilterBuilder 中的 partition 的数量一致。而HashIndexBuilder可等效的看做只有一个partition;
  • IndexBuilder::IndexBlocks::meta_blocks 字段仅有HashIndexBuilder使用。

因此,WriteIndexBlock 函数的最终目的也是将所有partitions的内容写入sst。写入sst的所有信息都记录在index_block_handle中,这用于后续的footer

下面,带着上述理解,并顺着代码注释来阅读源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// @param index_block_handle 用于记录 partition 的信息
void BlockBasedTableBuilder::WriteIndexBlock(
MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
IndexBuilder::IndexBlocks index_blocks;
// 可等效看做获取 first partition 的内容
auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
if (index_builder_status.IsIncomplete()) {
// meta_blocks 仅用于 HashIndexBuilder,在 PartitionIndexBuilder 下不支持
assert(index_blocks.meta_blocks.empty());
} else if (ok() && !index_builder_status.ok()) {
rep_->SetStatus(index_builder_status);
}
if (ok()) {
// 这一部分是针对 HashIndexBuilder
// item: {name, content}
// + {hashindex.prefixes, prefix_block_}
// + {"rocksdb.hashindex.metadata", prefix_meta_block_}
for (const auto& item : index_blocks.meta_blocks) {
BlockHandle block_handle;
WriteBlock(item.second, &block_handle, false /* is_data_block */);
if (!ok()) {
break;
}
meta_index_builder->Add(item.first, block_handle);
}
}

// 这一部分是通用的
if (ok()) {
// first patition
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
} else {
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
index_block_handle);
}
}

// 下面是针对 PartitionIndexBuiler
if (index_builder_status.IsIncomplete()) {
Status s = Status::Incomplete();
while (ok() && s.IsIncomplete()) {
// index_block_handle 表示上一个 partition
s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
if (!s.ok() && !s.IsIncomplete()) {
rep_->SetStatus(s);
return;
}
// 将每个partition写入sst文件
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
false);
} else {
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
index_block_handle);
}
// The last index_block_handle will be for the partition index block
}
}
}

WriteFooter

最后就是写入footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
BlockHandle& index_block_handle) {
Rep* r = rep_;
bool legacy = (r->table_options.format_version == 0);
assert(r->table_options.checksum == kCRC32c ||
r->table_options.format_version != 0);
Footer footer(
legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
r->table_options.format_version);
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
footer.set_checksum(r->table_options.checksum);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
assert(ok());
// 最后在此写入
IOStatus ios = r->file->Append(footer_encoding);
if (ios.ok()) {
r->set_offset(r->get_offset() + footer_encoding.size());
} else {
r->SetIOStatus(ios);
r->SetStatus(ios);
}
}