WAL 在 DB::Open 时就会创建,用于持久化 MemTable 中尚未 Flush 的数据。具体写入 WAL 的格式可以参考 Write-Ahead-Log-File-Format 及其对应实现 DBImpl::WriteToWAL 函数,本文及后续几期主要关注 WAL/MemTable 的生命周期管理。
PreprocessWrite 在写 WAL/MemTable 之前,会先在 DBImpl::PreprocessWrite
函数中基于当前 RocksDB 的一些指标判断是否需要 Flush MemTable,创建新的 WAL/MemTable 再接受本次写入:
当前 WAL 的大小 total_log_size_
是否超过阈值 DBOptions::max_total_wal_size
根据 write_buffer_manager_ 判断当前 MemTable 是否需要 Flush
flush_scheduler_.empty() 为 false,则说明上次某个 CF MemTable 写满了
上述三种触发条件的处理方式,都需要先创建新的 WAL/MemTable,然后生成 Flush 请求 flush_request,并且触发后台线程去消费该 flush_request。
正常情况下,发起 Flush 请求后,不用等待后台 Flush 完成,新的 WAL/MemTable 就可以继续接受后续读写请求,
但是如果写的压力过大,则会进一步限制,比如 DelayWrite,WriteStall,恢复正常后才会继续接受新的写入。
上述三种的不同触发条件,不同在于如何选择 ColumnFamily。比如 条件(1)WAL 的大小超过阈值,则所有 ColumnFamilys 都需要进行更换 WAL/MemTable;而条件(3)则只是需要某个具体的 CF MemTable 满了。因此只是 FlushReason 不同,处理过程都类似。
下面针对核心处理流程开始讲解。
SwitchMemTable SwitchMemtable 函数是针对具体的 ColumnFamily 创建 WAL/MemTable ,因此需要上层先筛选出需要切换 MemTable 的 CF,再传递给此函数。
由前文可知,进入 PreprocessWrite 函数时已经是单线程操作,而 SwitchMemtable 函数仍需要在 DBImpl::_mutex
的保护下才能调用,这是为了与后台 Flush 线程互斥。
根据 log_empty_
字段来判断是否需要创建新的 WAL,即当前 WAL 是否有数据写入,如果没有则不创建新的 WAL。主要是可能要为多个 CFs 创建新的 WAL/MemTable,但是 WAL 是共享的,只需要第一次创建 WAL 即可,后续的 ColumnFamily 检测到 log_empty_
为空,则不再重复创建。
每个 WAL 都有一个对应的编号 logfile_num
,由 VersionSet::NewFileNumber
函数来生成。
DBOptions::recycle_log_file_num 决定是否复用生命周期已结束的 WALs,log_recycle_files_
字段用于保存生命周期已结束待删除的 WALs。此如果 recycle_log_file_num > 0
则按照 FIFO 规则从 log_recycle_files_
取出生命周期最早结束的 WAL 的 logfile_number,因此可以 truncate 旧 WAL 文件再接受新数据。
在 RocksDB 中,数据存储目录在 DBOptions::wal_dir ,其值就是 db_name,每个 WAL 文件名就是wal_dir
+logfile_number
+.log
。因此,后文一般提及 log 都是指 WAL 文件。
比如:编译完 RocksDB,并执行 build/examples/simple_example ,在 /tmp/rocksdb_simple_example 目录下会有如下以 log 为后缀的文件。
1 2 3 4 5 6 7 $ ll /tmp/rocksdb_simple_example total 1832 -rw-r--r-- 1 root root 1045 Jul 10 08:27 000326.sst -rw-r--r-- 1 root root 999 Jul 24 14:17 000331.sst -rw-r--r-- 1 root root 68 Jul 24 14:18 000332.log -rw-r--r-- 1 root root 999 Jul 24 14:18 000336.sst
log_recycle_files_
等字段会收到 FlushJob 的影响,因此需要在 DBImpl::mutex_
的保护下调用。而 CreateWAL 只是影响局部变量 new_log,不需要 mutex_ 保护,同理构建新的 MemTable 对象 new_mem 也不需要 mutex_。
下面是创建 new_log 和 new_mem 的核心代码及其注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 Status DBImpl::SwitchMemtable (ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld (); log::Writer* new_log = nullptr ; MemTable* new_mem = nullptr ; IOStatus io_s; bool creating_new_log = !log_empty_; uint64_t recycle_log_number = 0 ; if (creating_new_log && immutable_db_options_.recycle_log_file_num && !log_recycle_files_.empty ()) { recycle_log_number = log_recycle_files_.front (); } uint64_t new_log_number = creating_new_log ? versions_->NewFileNumber () : logfile_number_; const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions (); const auto preallocate_block_size = GetWalPreallocateBlockSize (mutable_cf_options.write_buffer_size); mutex_.Unlock (); if (creating_new_log) { io_s = CreateWAL (new_log_number, recycle_log_number, preallocate_block_size, &new_log); if (s.ok ()) { s = io_s; } } if (s.ok ()) { SequenceNumber seq = versions_->LastSequence (); new_mem = cfd->ConstructNewMemtable (mutable_cf_options, seq); context->superversion_context.NewSuperVersion (); } cfd->mem ()->ConstructFragmentedRangeTombstones (); }
CreateWAL WAL 的创建也比较简单,如代码注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 IOStatus DBImpl::CreateWAL (uint64_t log_file_num, uint64_t recycle_log_number, size_t preallocate_block_size, log::Writer** new_log) { IOStatus io_s; std::unique_ptr<FSWritableFile> lfile; DBOptions db_options = BuildDBOptions (immutable_db_options_, mutable_db_options_); FileOptions opt_file_options = fs_->OptimizeForLogWrite (file_options_, db_options); std::string wal_dir = immutable_db_options_.GetWalDir (); std::string log_fname = LogFileName (wal_dir, log_file_num); if (recycle_log_number) { std::string old_log_fname = LogFileName (wal_dir, recycle_log_number); io_s = fs_->ReuseWritableFile (log_fname, old_log_fname, opt_file_options, &lfile, nullptr ); } else { io_s = NewWritableFile (fs_.get (), log_fname, &lfile, opt_file_options); } if (io_s.ok ()) { lfile->SetWriteLifeTimeHint (CalculateWALWriteHint ()); lfile->SetPreallocationBlockSize (preallocate_block_size); const auto & listeners = immutable_db_options_.listeners; FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types; std::unique_ptr<WritableFileWriter> file_writer (new WritableFileWriter( std::move(lfile), log_fname, opt_file_options, immutable_db_options_.clock, io_tracer_, nullptr , listeners, nullptr , tmp_set.Contains(FileType::kWalFile), tmp_set.Contains(FileType::kWalFile))) ; *new_log = new log::Writer (std::move (file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0 , immutable_db_options_.manual_wal_flush, immutable_db_options_.wal_compression); io_s = (*new_log)->AddCompressionTypeRecord (); } return io_s; }
ConstructNewMemtable WAL 创建成功后,由 MemTable 所属的 ColumnFamily 构建 MemTable。
1 2 3 4 5 MemTable* ColumnFamilyData::ConstructNewMemtable ( const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { return new MemTable (internal_comparator_, ioptions_, mutable_cf_options, write_buffer_manager_, earliest_seq, id_); }
ColumnFamily 上面 new_log, new_mem 成功创建完后,下面对 ColumnFamily 的 WAL/MemTable 相关的元信息进行更新。
首先需要将当前 cur_log_writer 中已有的数据 flush 到操作系统中。实际上在 LogWriter::AddRecord 内部,每次写完都会进行一次 flush,将数据刷操作系统page cache 中,由操作系统决定何时将数据写入文件。flush 只能保证 RocksDB 进程中途挂了再重启数据不会丢,但是如果机器挂了数据还是有丢的可能,如果想要进一步保证安全就需要 sync 操作,将操作系统缓存层的数据同步到磁盘,很显然这个操作会拖慢写入速度。RocksDB 也配置了开启选项 WriteOptions::sync: bool ,默认 false。
logs_
字段是个map,映射关系是{logfile_number_, log_writer},记录了每一个log_writer及其对应的logfile_number,alive_log_files_
记录是 logfile_number及其log_writer写入的数据量大小。由于 WAL 的生命周期和 ColumnFamily 有关,因此需要 logs_
和 alive_log_files_
来记录一个 RocksDB 实例中所有生命周期尚未结束的 WALs。
因此,当创建了新的 WAL ,需要在 log_write_mutex_ 的保护下添加新的记录。后续 FLushJob 完成了,也会相应删除生命周期结束的 WAL 记录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 mutex_.Lock (); if (recycle_log_number != 0 ) { log_recycle_files_.pop_front (); } if (s.ok () && creating_new_log) { InstrumentedMutexLock l (&log_write_mutex_) ; if (!logs_.empty ()) { log::Writer* cur_log_writer = logs_.back ().writer; io_s = cur_log_writer->WriteBuffer (); if (s.ok ()) { s = io_s; } } if (s.ok ()) { logfile_number_ = new_log_number; log_empty_ = true ; log_dir_synced_ = false ; logs_.emplace_back (logfile_number_, new_log); alive_log_files_.push_back (LogFileNumberSize (logfile_number_)); } }
前置工作已经完成,下面要开始更新 ColumnFaimly了。
由于传递给 SwitchMemTable 的 CF 并非都是因为自己的 MemTable 满了,可能是因为某个 CF0 把 WAL 写满了,导致所有的 MmeTable 都要切换,比如上述条件(1)。有些 CF1 可能仍然是空的,此时就需要更新 CF1 当前的 LogNumber。 这样就可以删除 CF1 之前指向的 WAL。
比如:CF0,CF1 当前都指向 log1,由于 CF0 一直写入数据,导致 log1 满了。此时需要创建新的 WAL 文件 log2。但是 CF1 一直没有写入数据,此时同步将 CF1 当前 log 指针 log_number_
指向 log2,使得指向 log1 的引用就是0,因此就可以在 MemTable 完成 Flush 完成后,将 log1 删除。
其余解释见代码,这部分代码简化如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 bool empty_cf_updated = false ;if (!empty_cf_updated) { for (auto cf : *versions_->GetColumnFamilySet ()) { if (cf->IsEmpty ()) { if (creating_new_log) { cf->SetLogNumber (logfile_number_); } cf->mem ()->SetCreationSeq (versions_->LastSequence ()); } } } cfd->mem ()->SetNextLogNumber (logfile_number_); cfd->imm ()->Add (cfd->mem (), &context->memtables_to_free_); new_mem->Ref (); cfd->SetMemtable (new_mem); InstallSuperVersionAndScheduleWork (cfd, &context->superversion_context, mutable_cf_options);
GenerateFlushRequest SwitchMemTable,完成了 WAL/MemTable 的切换。紧接着就会尝试发起一次 flush 请求。Flush 的对象是从每个 CF 的 ImmutableMemtables 中提取的,因此 FlushRequest 需要保存每个 CF 的 max_memtable_id。
CF 中的每个 MemTable 都有一个单增的 id_
,用于追踪 Flush。在使用 ColumnFamilyData::SetMemtable
函数添加到 CF 中时更新 MemTable::id_
字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void DBImpl::GenerateFlushRequest (const autovector<ColumnFamilyData*>& cfds, FlushReason flush_reason, FlushRequest* req) { assert (req != nullptr ); req->flush_reason = flush_reason; req->cfd_to_max_mem_id_to_persist.reserve (cfds.size ()); for (const auto cfd : cfds) { if (nullptr == cfd) { continue ; } uint64_t max_memtable_id = cfd->imm ()->GetLatestMemTableID (); req->cfd_to_max_mem_id_to_persist.emplace (cfd, max_memtable_id); } }
SchedulePendingFlush flush_queue_
用于保存MemTables的Flush请求,SchedulePendingFlush
将上述生成的flush请求压入flush_queue_,再调度后台任务线程从 flush_queue_中取出FlushRequest执行。
这里也有个设计,如果开启了 DBOptions::atomic_flush 则会让多个 CFs 的 Flush 请求放到一个线程去执行,这样就可以保证 Flush 的原子性。否则,每次只 flush。但是其默认值为 false,因为只要开启了 WAL,就能保证跨多个 CFs 的写操作是原子性的,即便 flush 操作挂了也还有 WAL 可以恢复。
部分代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void DBImpl::SchedulePendingFlush (const FlushRequest& flush_req) { mutex_.AssertHeld (); if (flush_req.cfd_to_max_mem_id_to_persist.empty ()) { return ; } if (!immutable_db_options_.atomic_flush) { assert (flush_req.cfd_to_max_mem_id_to_persist.size () == 1 ); ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin ()->first; if (!cfd->queued_for_flush () && cfd->imm ()->IsFlushPending ()) { cfd->Ref (); cfd->set_queued_for_flush (true ); ++unscheduled_flushes_; flush_queue_.push_back (flush_req); } } else { for (auto & [cfd, _] : flush_req.cfd_to_max_mem_id_to_persist) { cfd->Ref (); } ++unscheduled_flushes_; flush_queue_.push_back (flush_req); } }
MaybeScheduleFlushOrCompaction RocksDB 的后台任务线程调度是主动触发的,并没有 loop 线程在阻塞等待 flush_queue_
加入新元素后就从 flush_queue_
提取请求去执行。
因此,在向 flush_req
加入有新请求后,需要主动通过 MaybeScheduleFlushOrCompaction
函数调度后台线程执行 DBImpl::BackgroundCallFlush
函数执行 FlushRequest 请求。
最大后台任务有 bg_job_limits
限制,unscheduled_flushes_
表征当前有多少待 flush 的请求,bg_flush_scheduled_
表征当前已经调度了多少 flush 请求,配合 bg_job_limits
参数限制后台 flush 任务。
DBImpl::BGWorkFlush
函数用于执行 FlushRequest,DBImpl::UnscheduleFlushCallback
函数则是执行 FlushRequest 完的回调函数,用于释放 FlushThreadArg
对象。
MaybeScheduleFlushOrCompaction
函数执行,是个分界点:
提交完 Flush 请求后,如果没有触发 DelayWrite,WriteStall,就会接受新的读写请求了;
后台线程池接受到 BGWorkFlush
请求后,就开始执行 FlushReqeust
这部分相关代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void DBImpl::MaybeScheduleFlushOrCompaction () { mutex_.AssertHeld (); auto bg_job_limits = GetBGJobLimits (); bool is_flush_pool_empty = env_->GetBackgroundThreads (Env::Priority::HIGH) == 0 ; while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && bg_flush_scheduled_ < bg_job_limits.max_flushes) { bg_flush_scheduled_++; FlushThreadArg* fta = new FlushThreadArg; fta->db_ = this ; fta->thread_pri_ = Env::Priority::HIGH; env_->Schedule (&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this , &DBImpl::UnscheduleFlushCallback); --unscheduled_flushes_; } if (is_flush_pool_empty) { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_job_limits.max_flushes) { bg_flush_scheduled_++; FlushThreadArg* fta = new FlushThreadArg; fta->db_ = this ; fta->thread_pri_ = Env::Priority::LOW; env_->Schedule (&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this , &DBImpl::UnscheduleFlushCallback); --unscheduled_flushes_; } } }
BackgroundCallFlush DBImpl::BGWorkFlush 只是个 BackgroundCallFlush
的 wrapper。 进入 BackgroundCallFlush
函数后,需要获取 DBImpl::mutex_
来执行 BackgroundFlush
,执行成功后。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void DBImpl::BackgroundCallFlush (Env::Priority thread_pri) { bool made_progress = false ; JobContext job_context (next_job_id_.fetch_add(1 ), true ) ; { InstrumentedMutexLock l (&mutex_) ; num_running_flushes_++; std::unique_ptr<std::list<uint64_t >::iterator> pending_outputs_inserted_elem (new std::list<uint64_t >::iterator ( CaptureCurrentFileNumberInPendingOutputs ())); FlushReason reason; Status s = BackgroundFlush (&made_progress, &job_context, &log_buffer, &reason, thread_pri); assert (num_running_flushes_ > 0 ); num_running_flushes_--; bg_flush_scheduled_--; MaybeScheduleFlushOrCompaction (); atomic_flush_install_cv_.SignalAll (); bg_cv_.SignalAll (); } }
BackgroundFlush PopFirstFromFlushQueue
函数从 flush_request_
中取出待 Flush 的请求,过滤掉不符合条件的的CF,符合结果的存在 bg_flush_args
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 Status DBImpl::BackgroundFlush (bool * made_progress, JobContext* job_context, LogBuffer* log_buffer, FlushReason* reason, Env::Priority thread_pri) { autovector<BGFlushArg> bg_flush_args; std::vector<SuperVersionContext>& superversion_contexts = job_context->superversion_contexts; autovector<ColumnFamilyData*> column_families_not_to_flush; while (!flush_queue_.empty ()) { auto [flush_reason, cfd_to_max_mem_id_to_persist] = PopFirstFromFlushQueue (); superversion_contexts.clear (); superversion_contexts.reserve (cfd_to_max_mem_id_to_persist.size ()); for (const auto & [cfd, max_mem_id] : cfd_to_max_mem_id_to_persist) { if (cfd->GetMempurgeUsed ()) { cfd->imm ()->FlushRequested (); } if (cfd->IsDropped () || !cfd->imm ()->IsFlushPending ()) { column_families_not_to_flush.push_back (cfd); continue ; } superversion_contexts.emplace_back (SuperVersionContext (true )); bg_flush_args.emplace_back (cfd, max_mem_id, &(superversion_contexts.back ()), flush_reason); } if (!bg_flush_args.empty ()) { break ; } } }
获得具有 Flush 条件的 bg_flush_args
,下面就是真正的准备执行 FlushJob。FlushJob
由 FlushMemTablesToOutputFiles
函数执行,这个后续再讲。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 if (!bg_flush_args.empty ()) { auto bg_job_limits = GetBGJobLimits (); status = FlushMemTablesToOutputFiles (bg_flush_args, made_progress, job_context, log_buffer, thread_pri); *reason = bg_flush_args[0 ].flush_reason_; for (auto & arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->UnrefAndTryDelete ()) { arg.cfd_ = nullptr ; } } } for (auto cfd : column_families_not_to_flush) { cfd->UnrefAndTryDelete (); }