## BlockBasedTableBuilder::ParallelCompressionRep

Let's start with the multi-threaded compression machinery of `BlockBasedTableBuilder`, namely `BlockBasedTableBuilder::ParallelCompressionRep`. Unless noted otherwise, all code below lives inside the class `BlockBasedTableBuilder::ParallelCompressionRep`.
### Keys

Inside `class ParallelCompressionRep` there is a `class Keys`, designed to improve memory reuse. Two points:

- it preallocates memory for `kKeysInitSize` elements: as long as fewer than `kKeysInitSize` keys are added, the preallocated strings are reused and no dynamic allocation happens;
- `Keys::Clear()` does not actually release any memory; it only resets the counter `size_` — the classic "lazy delete".

This is a common and effective design. The code follows, with a small usage sketch right after the listing.
```cpp
class Keys {
 public:
  Keys() : keys_(kKeysInitSize), size_(0) {}
  void PushBack(const Slice& key) {
    if (size_ == keys_.size()) {
      keys_.emplace_back(key.data(), key.size());
    } else {
      keys_[size_].assign(key.data(), key.size());
    }
    size_++;
  }
  void SwapAssign(std::vector<std::string>& keys) {
    size_ = keys.size();
    std::swap(keys_, keys);
  }
  void Clear() { size_ = 0; }
  size_t Size() { return size_; }
  std::string& Back() { return keys_[size_ - 1]; }
  std::string& operator[](size_t idx) {
    assert(idx < size_);
    return keys_[idx];
  }

 private:
  const size_t kKeysInitSize = 32;
  std::vector<std::string> keys_;
  size_t size_;
};
std::unique_ptr<Keys> curr_block_keys;
```
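To see the reuse pattern in isolation, here is a minimal standalone sketch — not RocksDB code: `Slice` is replaced by `std::string_view` so it compiles on its own.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Standalone re-creation of the Keys idea: preallocate, then reuse.
class Keys {
 public:
  Keys() : keys_(kKeysInitSize), size_(0) {}
  void PushBack(std::string_view key) {
    if (size_ == keys_.size()) {
      keys_.emplace_back(key);   // grow only once past the preallocation
    } else {
      keys_[size_].assign(key);  // reuse an already-allocated std::string
    }
    size_++;
  }
  void Clear() { size_ = 0; }    // lazy delete: buffers keep their capacity
  size_t Size() const { return size_; }

 private:
  static constexpr size_t kKeysInitSize = 32;
  std::vector<std::string> keys_;
  size_t size_;
};

int main() {
  Keys keys;
  keys.PushBack("k1");
  keys.PushBack("k2");
  assert(keys.Size() == 2);
  keys.Clear();         // no deallocation happens here
  keys.PushBack("k3");  // reuses the string that previously held "k1"
  assert(keys.Size() == 1);
}
```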
### WorkQueue

Before going further, let's look at the thread-safe queue RocksDB builds on. How does RocksDB keep multi-threaded compression ordered? It relies on `WorkQueue`, which is really just a simple producer-consumer model: the producer thread and the worker threads share a task queue `queue_`, protected by the mutex `mutex_`.

Once `finish()` has been called, the `WorkQueue` no longer accepts new items.
```cpp
template <typename T>
class WorkQueue {
  std::mutex mutex_;
  std::condition_variable readerCv_;
  std::condition_variable writerCv_;
  std::condition_variable finishCv_;
  std::queue<T> queue_;
  bool done_;
  std::size_t maxSize_;

  bool full() const {
    if (maxSize_ == 0) {
      return false;
    }
    return queue_.size() >= maxSize_;
  }

 public:
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

  template <typename U>
  bool push(U&& item);

  bool pop(T& item);

  void finish();
  void waitUntilFinished();

  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }
};
```
#### push

The `push` function adds one task to the shared `queue_` and returns true on success. If `finish()` has already been called, it returns false.
```cpp
template <typename T>
template <typename U>
bool WorkQueue<T>::push(U&& item) {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    while (full() && !done_) {
      writerCv_.wait(lock);
    }
    if (done_) {
      return false;
    }
    queue_.push(std::forward<U>(item));
  }
  readerCv_.notify_one();
  return true;
}
```
#### pop

The `pop` function removes one pending task from the shared `queue_` in FIFO order and returns true. Once `finish()` has been called and the queue has been drained, it returns false.
```cpp
template <typename T>
bool WorkQueue<T>::pop(T& item) {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty() && !done_) {
      readerCv_.wait(lock);
    }
    if (queue_.empty()) {
      assert(done_);
      return false;
    }
    item = queue_.front();
    queue_.pop();
  }
  writerCv_.notify_one();
  return true;
}
```
#### waitUntilFinished

The `waitUntilFinished` function blocks until `WorkQueue::finish()` has been called.
```cpp
void waitUntilFinished() {
  std::unique_lock<std::mutex> lock(mutex_);
  while (!done_) {
    finishCv_.wait(lock);
  }
}
```
#### finish

After `finish()` is called, the `WorkQueue` accepts no new items. Consumers can still drain whatever is already queued; once the queue is empty, `pop()` returns false and `waitUntilFinished()` returns.
```cpp
template <typename T>
void WorkQueue<T>::finish() {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(!done_);
    done_ = true;
  }
  readerCv_.notify_all();
  writerCv_.notify_all();
  finishCv_.notify_all();
}
```
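Putting it together, a minimal producer/consumer sketch — assuming the `WorkQueue<T>` shown above with its member definitions is available:

```cpp
#include <iostream>
#include <thread>

// One producer, one consumer, bounded queue of capacity 4.
void Demo() {
  WorkQueue<int> queue(/*maxSize=*/4);  // push() blocks when the queue is full

  std::thread consumer([&queue] {
    int task;
    while (queue.pop(task)) {           // keeps draining items after finish()
      std::cout << "processed " << task << "\n";
    }
  });

  for (int i = 0; i < 10; i++) {
    queue.push(i);                      // returns false once finish() is called
  }
  queue.finish();                       // no more pushes; wakes everyone up
  consumer.join();
}
```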
### BlockRepSlot

`BlockRepSlot` is a thread-safe queue built on top of `WorkQueue`, with capacity 1. Each `BlockRep` records the data and state of one block, and each `BlockRep` owns a slot. The slot acts as a rendezvous point between threads: one thread deposits a `BlockRep*` via `Fill`, and another thread blocks in `Take` until it can retrieve that `BlockRep*` and continue working on it (compressing it, writing it out, and so on).

This hand-off is what makes the multi-threaded pipeline work.
```cpp
struct BlockRep {
  Slice contents;
  Slice compressed_contents;
  std::unique_ptr<std::string> data;
  std::unique_ptr<std::string> compressed_data;
  CompressionType compression_type;
  std::unique_ptr<std::string> first_key_in_next_block;
  std::unique_ptr<Keys> keys;
  std::unique_ptr<BlockRepSlot> slot;
  Status status;
};

class BlockRepSlot {
 public:
  BlockRepSlot() : slot_(1) {}

  template <typename T>
  void Fill(T&& rep) {
    slot_.push(std::forward<T>(rep));
  }

  void Take(BlockRep*& rep) { slot_.pop(rep); }

 private:
  WorkQueue<BlockRep*> slot_;
};
```
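To keep the big picture in mind while reading on, here is a sketch of how one `BlockRep` cycles through the queues described in the rest of this post:

```
block_rep_pool ──pop──> PrepareBlock (main thread)
      ^                      │  EmitBlock: slot → write_queue, rep → compress_queue
      │                      v
  ReapBlock           compress_queue ──pop──> BGWorkCompression (N threads)
      ^                                              │ slot->Fill(rep)
      │                                              v
BGWorkWriteRawBlock <──slot->Take(rep)── write_queue (slots, in emit order)
```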
### FileSizeEstimator

When parallel compression is enabled, the class `FileSizeEstimator` estimates the size of the output file. It centers on two callbacks, `EmitBlock` and `ReapBlock`:

- `EmitBlock`: called right before a block is handed to the compression threads;
- `ReapBlock`: called after the block has been compressed and written out.

A trimmed version of `FileSizeEstimator` follows.
```cpp
class FileSizeEstimator {
 public:
  explicit FileSizeEstimator()
      : raw_bytes_compressed(0),
        raw_bytes_curr_block(0),
        raw_bytes_curr_block_set(false),
        raw_bytes_inflight(0),
        blocks_inflight(0),
        curr_compression_ratio(0),
        estimated_file_size(0) {}

  void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size);
  void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size);

  void SetEstimatedFileSize(uint64_t size) {
    estimated_file_size.store(size, std::memory_order_relaxed);
  }

  uint64_t GetEstimatedFileSize() {
    return estimated_file_size.load(std::memory_order_relaxed);
  }

  void SetCurrBlockRawSize(uint64_t size) {
    raw_bytes_curr_block = size;
    raw_bytes_curr_block_set = true;
  }

 private:
  uint64_t raw_bytes_compressed;
  uint64_t raw_bytes_curr_block;
  bool raw_bytes_curr_block_set;
  std::atomic<uint64_t> raw_bytes_inflight;
  std::atomic<uint64_t> blocks_inflight;
  std::atomic<double> curr_compression_ratio;
  std::atomic<uint64_t> estimated_file_size;
};
```
#### EmitBlock

The `EmitBlock` function is called just before a raw block is sent to a compression thread; it estimates the approximate size the SST file will have once all in-flight blocks are compressed. The estimate has three components:

- `curr_file_size`: the part already written to the SST file;
- `new_raw_bytes_inflight`: the bytes currently being compressed but not yet written. These are converted into an expected on-disk size by multiplying with the running compression ratio `curr_compression_ratio`;
- `kBlockTrailerSize`: every block carries a trailer, which is never compressed. Multiplying the number of in-flight blocks `new_blocks_inflight` by the per-block trailer size `kBlockTrailerSize` gives the trailer bytes still to be written.

The trimmed code follows.
```cpp
void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
  uint64_t new_raw_bytes_inflight =
      raw_bytes_inflight.fetch_add(raw_block_size, std::memory_order_relaxed) +
      raw_block_size;
  uint64_t new_blocks_inflight =
      blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;

  estimated_file_size.store(
      curr_file_size +
          static_cast<uint64_t>(
              static_cast<double>(new_raw_bytes_inflight) *
              curr_compression_ratio.load(std::memory_order_relaxed)) +
          new_blocks_inflight * kBlockTrailerSize,
      std::memory_order_relaxed);
}
```
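A worked example with made-up numbers: if 4 MiB is already on disk (`curr_file_size`), 8 MiB of raw data is in flight across 2 blocks, and the running ratio is 0.5, the estimate is 4 MiB + 8 MiB × 0.5 + 2 × `kBlockTrailerSize` (5 bytes each: a 1-byte type plus a 4-byte crc, per the block format shown later), i.e. just over 8 MiB.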
#### ReapBlock

`ReapBlock` is the other callback; it runs once a block has been compressed and written, and updates the compression bookkeeping:

- `raw_bytes_compressed`: how many raw bytes have been compressed so far;
- `curr_compression_ratio`: total compressed bytes divided by the corresponding raw bytes;
- finally, the estimated size of the output SST file is recomputed.

The code follows.
```cpp
void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
  assert(raw_bytes_curr_block_set);

  uint64_t new_raw_bytes_compressed =
      raw_bytes_compressed + raw_bytes_curr_block;
  assert(new_raw_bytes_compressed > 0);
  curr_compression_ratio.store(
      (curr_compression_ratio.load(std::memory_order_relaxed) *
           raw_bytes_compressed +
       compressed_block_size) /
          static_cast<double>(new_raw_bytes_compressed),
      std::memory_order_relaxed);
  raw_bytes_compressed = new_raw_bytes_compressed;

  uint64_t new_raw_bytes_inflight =
      raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
                                   std::memory_order_relaxed) -
      raw_bytes_curr_block;
  uint64_t new_blocks_inflight =
      blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;

  estimated_file_size.store(
      curr_file_size +
          static_cast<uint64_t>(
              static_cast<double>(new_raw_bytes_inflight) *
              curr_compression_ratio.load(std::memory_order_relaxed)) +
          new_blocks_inflight * kBlockTrailerSize,
      std::memory_order_relaxed);

  raw_bytes_curr_block_set = false;
}
```
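Note that `curr_compression_ratio * raw_bytes_compressed` is exactly the total compressed bytes produced so far, so the store above is an incremental weighted average. As a worked example: if 10 MiB of raw data has compressed to 5 MiB so far (ratio 0.5) and a new 1 MiB block compresses to 0.4 MiB, the new ratio is (0.5 × 10 + 0.4) / 11 ≈ 0.49.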
### ParallelCompressionRep

To make `class ParallelCompressionRep` easier to follow, the fields below are reordered relative to the actual source. The usage sequence is:

1. the main thread calls `ParallelCompressionRep::PrepareBlock` to prepare a block's data;
2. it calls the `EmitBlock` function to hand the block to a compression thread;
3. once the block has been compressed and written, `ParallelCompressionRep::ReapBlock` is called.
```cpp
struct BlockBasedTableBuilder::ParallelCompressionRep {
  class Keys { /* ... */ };
  class BlockRepSlot { /* ... */ };
  struct BlockRep { /* ... */ };
  class FileSizeEstimator { /* ... */ };

  typedef std::vector<BlockRep> BlockRepBuffer;
  typedef WorkQueue<BlockRep*> BlockRepPool;
  typedef WorkQueue<BlockRep*> CompressQueue;
  typedef WorkQueue<BlockRepSlot*> WriteQueue;

  std::unique_ptr<Keys> curr_block_keys;
  BlockRepBuffer block_rep_buf;
  BlockRepPool block_rep_pool;
  CompressQueue compress_queue;
  std::vector<port::Thread> compress_thread_pool;
  WriteQueue write_queue;
  std::unique_ptr<port::Thread> write_thread;
  FileSizeEstimator file_size_estimator;

  std::atomic<bool> first_block_processed;
  std::condition_variable first_block_cond;
  std::mutex first_block_mutex;

  explicit ParallelCompressionRep(uint32_t parallel_threads)
      : curr_block_keys(new Keys()),
        block_rep_buf(parallel_threads),
        block_rep_pool(parallel_threads),
        compress_queue(parallel_threads),
        write_queue(parallel_threads),
        first_block_processed(false) {
    for (uint32_t i = 0; i < parallel_threads; i++) {
      block_rep_buf[i].contents = Slice();
      block_rep_buf[i].compressed_contents = Slice();
      block_rep_buf[i].data.reset(new std::string());
      block_rep_buf[i].compressed_data.reset(new std::string());
      block_rep_buf[i].compression_type = CompressionType();
      block_rep_buf[i].first_key_in_next_block.reset(new std::string());
      block_rep_buf[i].keys.reset(new Keys());
      block_rep_buf[i].slot.reset(new BlockRepSlot());
      block_rep_buf[i].status = Status::OK();
      block_rep_pool.push(&block_rep_buf[i]);
    }
  }

  ~ParallelCompressionRep() { block_rep_pool.finish(); }

  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         BlockBuilder* data_block);
  BlockRep* PrepareBlock(CompressionType compression_type,
                         const Slice* first_key_in_next_block,
                         std::string* data_block,
                         std::vector<std::string>* keys);
  void EmitBlock(BlockRep* block_rep);
  void ReapBlock(BlockRep* block_rep);

 private:
  BlockRep* PrepareBlockInternal(CompressionType compression_type,
                                 const Slice* first_key_in_next_block);
};
```
#### PrepareBlockInternal

First, how a `BlockRep` object is obtained. Rather than allocating one, `PrepareBlockInternal` pops a preallocated `BlockRep` from `block_rep_pool`. That is the point of `block_rep_pool`: it holds `parallel_threads` reps, so when all of them are in flight the main thread blocks in `pop()` until `ReapBlock` returns one to the pool. The pool thus both recycles buffers and throttles the producer.
```cpp
BlockRep* PrepareBlockInternal(CompressionType compression_type,
                               const Slice* first_key_in_next_block) {
  BlockRep* block_rep = nullptr;
  block_rep_pool.pop(block_rep);
  assert(block_rep != nullptr);
  assert(block_rep->data);

  block_rep->compression_type = compression_type;

  if (first_key_in_next_block == nullptr) {
    block_rep->first_key_in_next_block.reset(nullptr);
  } else {
    block_rep->first_key_in_next_block->assign(
        first_key_in_next_block->data(), first_key_in_next_block->size());
  }

  return block_rep;
}
```
#### PrepareBlock

Before a block is sent to the compression threads, it has to be prepared; that is what the `PrepareBlock` function does. Its parameters:

- `compression_type`: the compression algorithm to use;
- `first_key_in_next_block`: the first key of the next block;
- `data_block`: the data part of the block to compress.
```cpp
BlockRep* PrepareBlock(CompressionType compression_type,
                       const Slice* first_key_in_next_block,
                       BlockBuilder* data_block) {
  BlockRep* block_rep =
      PrepareBlockInternal(compression_type, first_key_in_next_block);
  assert(block_rep != nullptr);
  data_block->SwapAndReset(*(block_rep->data));
  block_rep->contents = *(block_rep->data);
  std::swap(block_rep->keys, curr_block_keys);
  curr_block_keys->Clear();
  return block_rep;
}
```
#### PrepareBlock (overload)

This `PrepareBlock` is just an overload of the function above. The previous one takes a `BlockBuilder*`; this one takes the pieces a `BlockBuilder` would hold — the serialized block data and its keys — as separate arguments.
```cpp
BlockRep* PrepareBlock(CompressionType compression_type,
                       const Slice* first_key_in_next_block,
                       std::string* data_block,
                       std::vector<std::string>* keys) {
  BlockRep* block_rep =
      PrepareBlockInternal(compression_type, first_key_in_next_block);
  assert(block_rep != nullptr);
  std::swap(*(block_rep->data), *data_block);
  block_rep->contents = *(block_rep->data);
  block_rep->keys->SwapAssign(*keys);
  return block_rep;
}
```
#### EmitBlock

`EmitBlock` hands the prepared block to the compression threads. Note the order: the block's slot is pushed into `write_queue` first, then the `BlockRep` itself is pushed into `compress_queue`; this is what pins the output order (see `BGWorkWriteRawBlock` below). For the very first block, `EmitBlock` also waits until that block has been fully processed, so that the file-size estimator has a real compression ratio to work with before further blocks are emitted.
```cpp
void EmitBlock(BlockRep* block_rep) {
  assert(block_rep != nullptr);
  assert(block_rep->status.ok());
  if (!write_queue.push(block_rep->slot.get())) {
    return;
  }
  if (!compress_queue.push(block_rep)) {
    return;
  }

  if (!first_block_processed.load(std::memory_order_relaxed)) {
    std::unique_lock<std::mutex> lock(first_block_mutex);
    first_block_cond.wait(lock, [this] {
      return first_block_processed.load(std::memory_order_relaxed);
    });
  }
}
```
#### ReapBlock

`ReapBlock` is called by the write thread once it is done with a block — after the compressed data has been written to the file, or immediately if compression failed. It clears the compressed buffer, returns the `BlockRep` to `block_rep_pool`, and, for the first block, notifies the emit thread blocked in `EmitBlock` so that it can continue.
```cpp
void ReapBlock(BlockRep* block_rep) {
  assert(block_rep != nullptr);
  block_rep->compressed_data->clear();
  block_rep_pool.push(block_rep);

  if (!first_block_processed.load(std::memory_order_relaxed)) {
    std::lock_guard<std::mutex> lock(first_block_mutex);
    first_block_processed.store(true, std::memory_order_relaxed);
    first_block_cond.notify_one();
  }
}
```
## BlockBasedTableBuilder

Initialization.
```cpp
BlockBasedTableBuilder::BlockBasedTableBuilder(
    const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
    WritableFileWriter* file) {
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    ROCKS_LOG_WARN(
        tbo.ioptions.logger,
        "Silently converting format_version to 1 because checksum is "
        "non-default");
    sanitized_table_options.format_version = 1;
  }

  rep_ = new Rep(sanitized_table_options, tbo, file);

  if (rep_->filter_builder != nullptr) {
    rep_->filter_builder->StartBlock(0);
  }
  if (table_options.block_cache_compressed.get() != nullptr) {
    BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
        table_options.block_cache_compressed.get(), file->writable_file(),
        &rep_->compressed_cache_key_prefix[0],
        &rep_->compressed_cache_key_prefix_size, tbo.db_session_id,
        tbo.cur_file_num);
  }

  if (rep_->IsParallelCompressionEnabled()) {
    StartParallelCompression();
  }
}
```
## ParallelCompression

Now, on to compression itself. Before being written to the SST file, every data block goes through a compression step — provided compression is configured.

### IsParallelCompressionEnabled

Whether multi-threaded compression is enabled is determined by the value of the `CompressionOptions::parallel_threads` field.
```cpp
bool BlockBasedTableBuilder::Rep::IsParallelCompressionEnabled() const {
  return compression_opts.parallel_threads > 1;
}
```
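For reference, a minimal sketch of how a user turns this on (any value above 1 takes the parallel path; the choice of compression algorithm is orthogonal):

```cpp
#include "rocksdb/options.h"

// A minimal sketch: enable parallel compression for newly built SST files.
void EnableParallelCompression(rocksdb::Options* options) {
  options->compression = rocksdb::kZSTD;           // any real compression type
  options->compression_opts.parallel_threads = 4;  // > 1 enables the path above
}
```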
### StartParallelCompression

With multi-threaded compression enabled, the flow is:

1. before each compression, the main thread prepares the block to compress with `ParallelCompressionRep::PrepareBlock`;
2. the main thread calls `ParallelCompressionRep::EmitBlock` to send the block to the compression threads;
3. a compression thread compresses it;
4. the dedicated write thread writes the compressed data block to the SST file.

Below is the code that starts the compression threads and the write thread.
```cpp
void BlockBasedTableBuilder::StartParallelCompression() {
  rep_->pc_rep.reset(
      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
  rep_->pc_rep->compress_thread_pool.reserve(
      rep_->compression_opts.parallel_threads);
  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
      BGWorkCompression(*(rep_->compression_ctxs[i]),
                        rep_->verify_ctxs[i].get());
    });
  }
  rep_->pc_rep->write_thread.reset(
      new port::Thread([this] { BGWorkWriteRawBlock(); }));
}
```
### BGWorkCompression

`BGWorkCompression` is the entry function of a compression thread; it is only used to compress data blocks. It loops waiting for the main thread to send blocks to compress, takes each one off the queue, and compresses it.
```cpp
void BlockBasedTableBuilder::BGWorkCompression(
    const CompressionContext& compression_ctx,
    UncompressionContext* verify_ctx) {
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
  while (rep_->pc_rep->compress_queue.pop(block_rep)) {
    assert(block_rep != nullptr);
    CompressAndVerifyBlock(block_rep->contents, true /* is_data_block */,
                           compression_ctx, verify_ctx,
                           block_rep->compressed_data.get(),
                           &block_rep->compressed_contents,
                           &(block_rep->compression_type), &block_rep->status);
    block_rep->slot->Fill(block_rep);
  }
}
```
### CompressAndVerifyBlock

The `CompressAndVerifyBlock` function does not just attempt the compression; it also collects statistics along the way. To keep the listing short and focused on functionality, the statistics code is stripped out below.

An SST file consists of a sequence of blocks, each with the format:
```
block_data: uint8[n]
type:       uint8
crc:        uint32
```
The compression of one block proceeds as follows:

- the raw data to compress is `raw_block_contents`; its size must stay below the `kCompressionSizeLimit` limit;
- `CompressBlock` is called first and its result is stored in `block_contents`;
- if compression verification is configured — i.e. the `table_options.verify_compression` flag is set — `block_contents` is decompressed into `contents`, and the compression only counts as successful if the decompressed `contents` matches the original `raw_block_contents`;
- if everything went well:
  - `block_contents` holds the compressed form of `raw_block_contents`,
  - `type` holds the compression type;
- otherwise compression is aborted: `type` is set to `kNoCompression` and the raw block is kept as-is.

The trimmed code follows.
```cpp
void BlockBasedTableBuilder::CompressAndVerifyBlock(
    const Slice& raw_block_contents, bool is_data_block,
    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
    std::string* compressed_output, Slice* block_contents,
    CompressionType* type, Status* out_status) {
  Rep* r = rep_;
  bool is_status_ok = ok();
  if (!r->IsParallelCompressionEnabled()) {
    assert(is_status_ok);
  }

  *type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  bool abort_compression = false;

  StopWatchNano timer(
      r->ioptions.clock,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, compression_ctx,
                                     *compression_dict, *type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    *block_contents = CompressBlock(
        raw_block_contents, compression_info, type,
        r->table_options.format_version, is_data_block /* do_sample */,
        compressed_output, &sampled_output_fast, &sampled_output_slow);

    if (*type != kNoCompression && r->table_options.verify_compression) {
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockContentsForCompressionType(
          uncompression_info, block_contents->data(), block_contents->size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
        if (!compressed_ok) {
          abort_compression = true;
          ROCKS_LOG_ERROR(r->ioptions.logger,
                          "Decompressed block did not match raw block");
          *out_status =
              Status::Corruption("Decompressed block did not match raw block");
        }
      } else {
        *out_status = Status::Corruption(
            std::string("Could not decompress: ") + stat.getState());
        abort_compression = true;
      }
    }
  } else {
    if (is_data_block) {
      r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
                                                   std::memory_order_relaxed);
    }
    abort_compression = true;
  }

  if (abort_compression) {
    *type = kNoCompression;
    *block_contents = raw_block_contents;
  }
}
```
### BGWorkWriteRawBlock

From `BGWorkCompression` we saw that a compression thread does two things:

- compress the data block and record the result in `block_rep`;
- deposit `block_rep` into `block_rep->slot`.

Now recall `ParallelCompressionRep::EmitBlock`: `block_rep->slot` was pushed into `write_queue` right at the start, before compression even began.
```cpp
void EmitBlock(BlockRep* block_rep) {
  if (!write_queue.push(block_rep->slot.get())) {
    return;
  }
  if (!compress_queue.push(block_rep)) {
    return;
  }

  if (!first_block_processed.load(std::memory_order_relaxed)) {
    std::unique_lock<std::mutex> lock(first_block_mutex);
    first_block_cond.wait(lock, [this] {
      return first_block_processed.load(std::memory_order_relaxed);
    });
  }
}
```
What does this tell us? The write thread pops slots from `write_queue` in the exact order the blocks were emitted, and for each slot it blocks in `slot->Take()` until the corresponding compression thread fills it. So even if a later block finishes compressing first, blocks are written to the SST file strictly in emit order — this is how parallel compression preserves ordering.

Whether or not `CompressAndVerifyBlock` succeeded, `ReapBlock` must be called for the block, so that the `BlockRep` returns to `block_rep_pool` and the emit path cannot deadlock.

Of course, when compression succeeded, the write thread roughly performs the following steps:

- add every key of the current block to `filter_builder` and `index_builder`, from which the filter block and index block are later generated;
- write the block's compressed data to the SST file.

The details follow.
```cpp
void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
  Rep* r = rep_;
  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
  ParallelCompressionRep::BlockRep* block_rep = nullptr;

  while (r->pc_rep->write_queue.pop(slot)) {
    assert(slot != nullptr);
    slot->Take(block_rep);
    assert(block_rep != nullptr);
    if (!block_rep->status.ok()) {
      r->SetStatus(block_rep->status);
      block_rep->status = Status::OK();
      r->pc_rep->ReapBlock(block_rep);
      continue;
    }

    for (size_t i = 0; i < block_rep->keys->Size(); i++) {
      auto& key = (*block_rep->keys)[i];
      if (r->filter_builder != nullptr) {
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }

    r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
                  &r->pending_handle, true /* is_data_block */,
                  &block_rep->contents);
    if (!ok()) break;

    if (r->filter_builder != nullptr) {
      r->filter_builder->StartBlock(r->get_offset());
    }
    r->props.data_size = r->get_offset();
    ++r->props.num_data_blocks;

    if (block_rep->first_key_in_next_block == nullptr) {
      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
                                      r->pending_handle);
    } else {
      Slice first_key_in_next_block =
          Slice(*block_rep->first_key_in_next_block);
      r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
                                      &first_key_in_next_block,
                                      r->pending_handle);
    }

    r->pc_rep->ReapBlock(block_rep);
  }
}
```
### WriteRawBlock

The last step is writing the compressed data to the SST file in the following format:
```
block_data: uint8[n]
type:       uint8
crc:        uint32
padding
```
Note the `handle` parameter of `WriteRawBlock`: when the function returns, it records two pieces of metadata about the block:

- the offset at which the block starts in the SST file;
- the block's compressed size.

In practice this parameter is `r->pending_handle`, passed down from the `Add(key, value)` function; it is later fed to the index builder to build the index block, which is covered later on.

A trimmed version of the code follows.
```cpp
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                           CompressionType type,
                                           BlockHandle* handle,
                                           bool is_data_block,
                                           const Slice* raw_block_contents) {
  Rep* r = rep_;
  Status s = Status::OK();
  IOStatus io_s = IOStatus::OK();
  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->get_offset());
  handle->set_size(block_contents.size());
  assert(status().ok());
  assert(io_status().ok());

  io_s = r->file->Append(block_contents);
  if (io_s.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    uint32_t checksum = 0;
    switch (r->table_options.checksum) {
      // checksum computation elided
    }
    EncodeFixed32(trailer + 1, checksum);

    assert(io_s.ok());
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
        static_cast<char*>(trailer));
    io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (s.ok() && io_s.ok()) {
      r->set_offset(r->get_offset() + block_contents.size() +
                    kBlockTrailerSize);
      if (r->table_options.block_align && is_data_block) {
        size_t pad_bytes =
            (r->alignment -
             ((block_contents.size() + kBlockTrailerSize) &
              (r->alignment - 1))) &
            (r->alignment - 1);
        io_s = r->file->Pad(pad_bytes);
        if (io_s.ok()) {
          r->set_offset(r->get_offset() + pad_bytes);
        } else {
          r->SetIOStatus(io_s);
        }
      }
      if (r->IsParallelCompressionEnabled()) {
        if (is_data_block) {
          r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
                                                   r->get_offset());
        } else {
          r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
        }
      }
    }
  } else {
    r->SetIOStatus(io_s);
  }
  if (!io_s.ok() && s.ok()) {
    r->SetStatus(io_s);
  }
}
```
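To make the alignment math concrete: with `alignment` = 4096 and a 4000-byte compressed block, the block plus its 5-byte trailer occupies 4005 bytes, so `pad_bytes` = (4096 − (4005 & 4095)) & 4095 = 91, and the next block starts on a 4 KiB boundary. When the block plus trailer already ends exactly on a boundary, the inner subtraction yields 4096 and the outer mask brings it back to 0.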
### StopParallelCompression

The counterpart of starting the threads: stop the compression threads and the write thread.
```cpp
void BlockBasedTableBuilder::StopParallelCompression() {
  rep_->pc_rep->compress_queue.finish();
  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
    thread.join();
  }
  rep_->pc_rep->write_queue.finish();
  rep_->pc_rep->write_thread->join();
}
```
## Add

Finally, we get here. With the compression machinery covered, this part should now be much easier to follow.

Every encoded key carries a `value_type`. The details of `value_type` are left for later; the one thing that matters here is that `IsValueType` returning true means this `{k, v}` pair can be written into the SST file.
```cpp
inline bool IsValueType(ValueType t) {
  return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex ||
         kTypeDeletionWithTimestamp == t;
}

inline bool IsExtendedValueType(ValueType t) {
  return IsValueType(t) || t == kTypeRangeDeletion;
}
```
As `BlockBasedTableBuilder` adds `{k, v}` pairs and writes them out to the SST file, it moves through three states (`State`).
### State

```cpp
enum class State {
  kBuffered,
  kUnbuffered,
  kClosed,
};
```
In `kBuffered` mode, data blocks waiting to be compressed and written to the SST file are temporarily buffered in `r->data_block_buffers`. Once the amount of buffered data `r->data_begin_offset` exceeds the limit `buffer_limit`, the builder switches to `kUnbuffered` mode.

The transition from `kBuffered` to `kUnbuffered` is one-way; there is no going back. The only state after that is `kClosed`, entered when `BlockBasedTableBuilder::Finish` is called.
### should_flush

Every `BlockBasedTableBuilder` carries a flush policy, `r->flush_block_policy`. If adding the current `{k, v}` to `r->data_block` would trigger a flush, the current contents of `r->data_block` are flushed first:

- while `r->data_begin_offset < r->buffer_limit`: the block is temporarily buffered into `r->data_block_buffers`;
- otherwise, the block is compressed and then written to the SST file.
Moreover, with parallel compression the compression and SST writes happen on dedicated worker threads, while without it those details must be handled on the main thread itself. That is why the following structure keeps showing up in the code below:

```cpp
if (r->IsParallelCompressionEnabled()) {
  // handled by the compression threads and the write thread
} else {
  // single-threaded handling on this thread
}
```
With the multi-threaded logic covered earlier understood, the `Add` function below is straightforward. The abridged code follows.
```cpp
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      r->first_key_in_next_block = &key;
      Flush();
      if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
          r->data_begin_offset > r->buffer_limit) {
        EnterUnbuffered();
      }
      if (ok() && r->state == Rep::State::kUnbuffered) {
        if (r->IsParallelCompressionEnabled()) {
          r->pc_rep->curr_block_keys->Clear();
        } else {
          r->index_builder->AddIndexEntry(&r->last_key, &key,
                                          r->pending_handle);
        }
      }
    }

    if (r->state == Rep::State::kUnbuffered) {
      if (r->IsParallelCompressionEnabled()) {
        r->pc_rep->curr_block_keys->PushBack(key);
      } else {
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
      }
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // buffered mode: nothing else to do until EnterUnbuffered()
    } else {
      if (!r->IsParallelCompressionEnabled()) {
        r->index_builder->OnKeyAdded(key);
      }
    }
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                      r->table_properties_collectors,
                                      r->ioptions.logger);
  } else {
    assert(false);
  }
}
```
### Flush

The `Flush` function handles both modes:

- if multi-threading is enabled and the builder is already in `kUnbuffered` mode, the block takes the multi-threaded compress-and-write path;
- otherwise, the single-threaded path compresses and writes directly (or, in `kBuffered` mode, merely buffers the block).

The source:
```cpp
void BlockBasedTableBuilder::Flush() {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  if (r->IsParallelCompressionEnabled() &&
      r->state == Rep::State::kUnbuffered) {
    r->data_block.Finish();
    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
        r->compression_type, r->first_key_in_next_block, &(r->data_block));
    assert(block_rep != nullptr);
    assert(r->data_block.empty());
    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                             r->get_offset());
    r->pc_rep->EmitBlock(block_rep);
  } else {
    WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
  }
}
```
### WriteBlock

Calls to `WriteBlock` come not only from the `Flush` function above, but also from `WriteIndexBlock`, `EnterUnbuffered`, and others. `WriteBlock` therefore serves as the unified entry point that copes with both the `State::kBuffered` and `State::kUnbuffered` states.
```cpp
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  block->Finish();
  std::string raw_block_contents;
  block->SwapAndReset(raw_block_contents);
  if (rep_->state == Rep::State::kBuffered) {
    assert(is_data_block);
    rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
    return;
  }
  WriteBlock(raw_block_contents, handle, is_data_block);
}
```
### WriteBlock (overload)

This overloaded form of `WriteBlock` is used only on the single-threaded compression path (on the parallel path, compression and the SST write happen on the worker threads instead), and it requires the `BlockBasedTableBuilder` to be in `kUnbuffered` mode.
```cpp
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  Rep* r = rep_;
  assert(r->state == Rep::State::kUnbuffered);
  Slice block_contents;
  CompressionType type;
  Status compress_status;
  CompressAndVerifyBlock(raw_block_contents, is_data_block,
                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                         &(r->compressed_output), &(block_contents), &type,
                         &compress_status);
  r->SetStatus(compress_status);
  if (!ok()) {
    return;
  }
  // ... then writes the result out via WriteRawBlock (elided)
}
```
### EnterUnbuffered

The `EnterUnbuffered` function moves `BlockBasedTableBuilder` from `kBuffered` to `kUnbuffered`; it executes exactly once.

So what is this function's mission? We know that in `kBuffered` mode, every call to `Flush` merely stashes the data of `r->data_block` into `r->data_block_buffers`. When switching from `kBuffered` to `kUnbuffered`, the data sitting in `r->data_block_buffers` naturally has to catch up on the two steps it skipped: feeding every key of each buffered block to the filter and index builders, then compressing each block and writing it to the SST file.

With that in mind, the code below is easy to follow.
```cpp
void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;

  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());
    Block reader{BlockContents{data_block}};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(),
        kDisableGlobalSequenceNumber);
    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };

  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;

  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    }

    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
    }

    auto& data_block = r->data_block_buffers[i];

    if (r->IsParallelCompressionEnabled()) {
      Slice first_key_in_next_block;
      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      if (i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        first_key_in_next_block = next_block_iter->key();
      } else {
        first_key_in_next_block_ptr = r->first_key_in_next_block;
      }

      std::vector<std::string> keys;
      for (; iter->Valid(); iter->Next()) {
        keys.emplace_back(iter->key().ToString());
      }

      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
          r->compression_type, first_key_in_next_block_ptr, &data_block,
          &keys);

      assert(block_rep != nullptr);
      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                               r->get_offset());
      r->pc_rep->EmitBlock(block_rep);
    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        if (r->filter_builder != nullptr) {
          size_t ts_sz =
              r->internal_comparator.user_comparator()->timestamp_size();
          r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
        }
        r->index_builder->OnKeyAdded(key);
      }
      WriteBlock(Slice(data_block), &r->pending_handle,
                 true /* is_data_block */);
      if (ok() && i + 1 < r->data_block_buffers.size()) {
        assert(next_block_iter != nullptr);
        Slice first_key_in_next_block = next_block_iter->key();
        Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
        iter->SeekToLast();
        std::string last_key = iter->key().ToString();
        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
                                        r->pending_handle);
      }
    }
    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
}
```
## Finish

As mentioned earlier, a table is serialized into an SST file in the following layout:
```
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1: filter block]
[meta block 2: index block]
[meta block 3: compression dictionary block]
[meta block 4: range deletion block]
[meta block 5: stats block]
...
[meta block K: future extended block]
[metaindex block]
[Footer]
<end_of_file>
```
By the time `::Finish` is called, the data block section is complete; what remains is to build the meta blocks.

The filter block, index block, compression dictionary block, range deletion block, properties block, and so on, all record information about the data blocks. We collectively call these blocks meta blocks — "meta" as in information about information.

`MetaIndexBuilder` stores where each of these meta blocks lives in the SST file and how large it is; its output is ultimately used to build the footer of the whole `Table`.
```cpp
class MetaIndexBuilder {
 public:
  MetaIndexBuilder() : meta_index_block_(new BlockBuilder(1)) {}
  MetaIndexBuilder(const MetaIndexBuilder&) = delete;
  MetaIndexBuilder& operator=(const MetaIndexBuilder&) = delete;

  void Add(const std::string& key, const BlockHandle& handle) {
    std::string handle_encoding;
    handle.EncodeTo(&handle_encoding);
    meta_block_handles_.emplace(key, std::move(handle_encoding));
  }

  Slice Finish() {
    for (const auto& metablock : meta_block_handles_) {
      meta_index_block_->Add(metablock.first, metablock.second);
    }
    return meta_index_block_->Finish();
  }

 private:
  stl_wrappers::KVMap meta_block_handles_;
  std::unique_ptr<BlockBuilder> meta_index_block_;
};
```
With that picture in mind, here is the overall code.
```cpp
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  r->first_key_in_next_block = nullptr;
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  if (r->IsParallelCompressionEnabled()) {
    StopParallelCompression();
  } else {
    if (ok() && !empty_data_block) {
      r->index_builder->AddIndexEntry(&r->last_key, nullptr,
                                      r->pending_handle);
    }
  }

  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->SetStatus(r->CopyIOStatus());
  Status ret_status = r->CopyStatus();
  assert(!ret_status.ok() || io_status().ok());
  return ret_status;
}
```
### WriteFilterBlock

The logic of the `WriteFilterBlock` function:

- if the builder is not a `PartitionFilterBuilder`, everything the `FilterBuilder` built is serialized and written to the SST file in one piece;
- if it is a `PartitionFilterBuilder`, the filter is written to the SST file partition by partition;
- once that is done, the filter's identity is added to `meta_index_builder` as a `{k, v}` pair:
  - `k`: the filter-block type prefix concatenated with the `FilterPolicy` name;
  - `v`: the handle (position and size) of the filter block written to the SST file.

The code follows.
```cpp
void BlockBasedTableBuilder::WriteFilterBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle filter_block_handle;
  bool empty_filter_block =
      (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty());
  if (ok() && !empty_filter_block) {
    rep_->props.num_filter_entries +=
        rep_->filter_builder->EstimateEntriesAdded();
    Status s = Status::Incomplete();
    while (ok() && s.IsIncomplete()) {
      // A partitioned filter builder keeps returning Incomplete until its
      // last partition has been handed out.
      Slice filter_content =
          rep_->filter_builder->Finish(filter_block_handle, &s);
      assert(s.ok() || s.IsIncomplete());
      rep_->props.filter_size += filter_content.size();
      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
    }
  }
  if (ok() && !empty_filter_block) {
    std::string key;
    if (rep_->filter_builder->IsBlockBased()) {
      key = BlockBasedTable::kFilterBlockPrefix;
    } else {
      key = rep_->table_options.partition_filters
                ? BlockBasedTable::kPartitionedFilterBlockPrefix
                : BlockBasedTable::kFullFilterBlockPrefix;
    }
    key.append(rep_->table_options.filter_policy->Name());
    meta_index_builder->Add(key, filter_block_handle);
  }
}
```
### WriteIndexBlock

As covered earlier, `IndexBuilder` comes in two flavors:

- `HashIndexBuilder`
- `PartitionIndexBuilder`

Either way, the final index block contents end up in `IndexBuilder::IndexBlocks::index_block_contents`.
```cpp
struct IndexBuilder::IndexBlocks {
  Slice index_block_contents;
  std::unordered_map<std::string, Slice> meta_blocks;
};
```
The differences:

- a `PartitionIndexBuilder` has many partitions, and their number matches the number of partitions in the `PartitionFilterBuilder`; a `HashIndexBuilder` can be treated as equivalent to a single partition;
- the `IndexBuilder::IndexBlocks::meta_blocks` field is used only by `HashIndexBuilder`.

So the ultimate job of the `WriteIndexBlock` function is to write the contents of all partitions to the SST file. Where everything landed is recorded in `index_block_handle`, which is later used for the footer.

With that understanding, read the source below along with its comments.
```cpp
void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (index_builder_status.IsIncomplete()) {
    assert(index_blocks.meta_blocks.empty());
  } else if (ok() && !index_builder_status.ok()) {
    rep_->SetStatus(index_builder_status);
  }
  if (ok()) {
    for (const auto& item : index_blocks.meta_blocks) {
      BlockHandle block_handle;
      WriteBlock(item.second, &block_handle, false /* is_data_block */);
      if (!ok()) {
        break;
      }
      meta_index_builder->Add(item.first, block_handle);
    }
  }
  if (ok()) {
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
  }
  if (index_builder_status.IsIncomplete()) {
    Status s = Status::Incomplete();
    while (ok() && s.IsIncomplete()) {
      s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
      if (!s.ok() && !s.IsIncomplete()) {
        rep_->SetStatus(s);
        return;
      }
      if (rep_->table_options.enable_index_compression) {
        WriteBlock(index_blocks.index_block_contents, index_block_handle,
                   false);
      } else {
        WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                      index_block_handle);
      }
    }
  }
}
```
### WriteFooter

Finally, the footer is written.
```cpp
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  Rep* r = rep_;
  bool legacy = (r->table_options.format_version == 0);
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  Footer footer(
      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
      r->table_options.format_version);
  footer.set_metaindex_handle(metaindex_block_handle);
  footer.set_index_handle(index_block_handle);
  footer.set_checksum(r->table_options.checksum);
  std::string footer_encoding;
  footer.EncodeTo(&footer_encoding);
  assert(ok());
  IOStatus ios = r->file->Append(footer_encoding);
  if (ios.ok()) {
    r->set_offset(r->get_offset() + footer_encoding.size());
  } else {
    r->SetIOStatus(ios);
    r->SetStatus(ios);
  }
}
```