本文简要讲下在 Flush MemTable 之前,WAL 需要进行 SYNC 的原因。更多设计思考可以阅读 Track-WAL-in-MANIFEST
WHY NEED to SYNC WAL 出于性能考虑,默认情况下 RocksDB 不会每写入一次数据就进行一次 SYNC WAL 操作。但没有 SYNC WAL 会带来两个影响:
在 recovery 时,就无法检查 WAL 是否存在磁盘上,这是因为当机器挂了,WAL 的 innode 元数据可能尚未持久化到磁盘。
除此之外,默认情况下误删 WAL 目录下的 log 文件,也没有机制来检测是否有 WAL 丢失以及哪个文件丢失。此时 DB::Open 是会成功,但是 log 中对应的数据 “悄悄地” 丢失了。
在 recovery 期间也无法判断 WAL 的大小是否正常,因为如果不主动 SYNC,则 SYNC WAL 操作则由操作系统来完成的,这对 RocksDB 并不透明:无法知道在 RocksDB 退出前, WAL 持久化到磁盘的大小。
目前 SYNC WAL 有四种场景:
上层应用调用 DB::SyncWAL
函数,会主动 SYNC 所有生命周期还没结束的 WALs
上层应用调用 DB::FlushWAL(true)
函数,内部也是调用 SyncWAL 函数
写入数据时,设置 WriteOption::sync
为 true ,即每写入一次数据,就会进行一次 SYNC WAL
在 Flush MemTable 前,如果不止一个 ColumnFamily ,会 SYNC 所有 closed_wals ,即除了当前WAL之外的所有生命周期尚未结束的 WALs
case(1,2) 交给应用层调用,case(3) 虽然安全但是效率太低,一般不会开启。因此,交给 RocksDB 自己需要 SYNC WAL 的场景就剩下case(4)。 SYNC WAL 主要是为了防止机器宕机,而不是进程crash。虽然 case(4) 还是有丢数据的风险,但是在生产环境中,一般都会配备多副本来冗余。除非是一个副本的数据在一个机房,一个机房的数据又全部挂了,为了防止这种情况,又搞出来了跨机房部署。
BUT, 该挂的还是会挂。
因此,在执行 FlushJob 前,会将本次 synced_wals 大小信息记录到 VersionEdit
中,再序列化到 MANIFEST 中,在 recovery 期间可以对 WALs 进行强制检查:1)MANIFEST 中记录的 WAL 也应该存在磁盘上;2)磁盘上 WAL 的大小不应该小于 MANIFEST 中记录的 WAL 大小。
case(4) 是否需要 SYNC 的判断条件如下:
1 2 3 4 const bool needs_to_sync_closed_wals = logfile_number_ > 0 && versions_->GetColumnFamilySet ()->NumberOfColumnFamilies () > 1 ;
needs_to_sync_closed_wals 为 true 时,需要记录当前 CF 的 max_memtable_id ,这样后续 FlushJob::PickMemTable 函数选择待 Flush 的MemTable时,会过滤掉满足 MemTable::id_ > max_memtable_id 的 memtable。
这是因为在执行 SyncClosedLogs
函数会 DBImpl::mutex_.Unlock,而当前 CF 的 SwitchMemTable 函数可能会在这个期间执行,新增 new_mem,通过 max_memtable_id 来过滤掉这个期间新增的 new_mm。
needs_to_sync_closed_wals 为 false,会使得 max_memtable_id 为 UINT64_MAX,表示要 Flush 所有的 ImmutableMemtables
这部分代码上下文逻辑如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 uint64_t max_memtable_id = needs_to_sync_closed_wals ? cfd->imm ()->GetLatestMemTableID () : std::numeric_limits<uint64_t >::max (); IOStatus log_io_s = IOStatus::OK (); if (needs_to_sync_closed_wals) { mutex_.Unlock (); VersionEdit synced_wals; log_io_s = SyncClosedLogs (job_context, &synced_wals); mutex_.Lock (); if (log_io_s.ok () && synced_wals.IsWalAddition ()) { log_io_s = status_to_io_status (ApplyWALToManifest ( ReadOptions (Env::IOActivity::kFlush), &synced_wals)); } }
SyncClosedLogs SyncClosedLogs 函数需要在 log_write_mutex_ 保护下。如果此时仍持有 DBImpl::mutex_
会影响增加其他其线程阻塞时间,比如写路径过程中的 PreprocessWrite
,因此在进入 SyncClosedLogs 函数前会释放 DBImpl::mutex_
,让其他的线程可以执行。
SyncClosedLogs 函数的目标是 [logs_.front().number, logfile_number_) 区间的 WALs,其中 logfile_number_ 指向当前最新的 WAL。由于在后台线程池中可能会同时执行多个 FlushMemTable 任务,只要一个 WAL 的生命周期还没结束,那么每次 FlushMemTables 前都会 SYNC WAL 一次,并使用 VersionEdit 记录该 WAL 的元信息再写入 MANIFEST。 因此在 SyncClosedLogs 函数开始,会先尝试阻塞等待其他线程完成对该 WAL 的 SYNC 操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 IOStatus DBImpl::SyncClosedLogs (JobContext* job_context, VersionEdit* synced_wals) { InstrumentedMutexLock l (&log_write_mutex_) ; autovector<log::Writer*, 1 > logs_to_sync; uint64_t current_log_number = logfile_number_; while (logs_.front ().number < current_log_number && logs_.front ().IsSyncing ()) { log_sync_cv_.Wait (); } for (auto it = logs_.begin (); it != logs_.end () && it->number < current_log_number; ++it) { auto & log = *it; log.PrepareForSync (); logs_to_sync.push_back (log.writer); } IOStatus io_s; if (!logs_to_sync.empty ()) { log_write_mutex_.Unlock (); for (log::Writer* log : logs_to_sync) { io_s = log->file ()->Sync (immutable_db_options_.use_fsync); if (!io_s.ok ()) { break ; } if (immutable_db_options_.recycle_log_file_num > 0 ) { io_s = log->Close (); if (!io_s.ok ()) { break ; } } } if (io_s.ok ()) { io_s = directories_.GetWalDir ()->FsyncWithDirOptions ( IOOptions (), nullptr , DirFsyncOptions (DirFsyncOptions::FsyncReason::kNewFileSynced)); } log_write_mutex_.Lock (); if (io_s.ok ()) { MarkLogsSynced (current_log_number - 1 , true , synced_wals); } else { MarkLogsNotSynced (current_log_number - 1 ); } if (!io_s.ok ()) { return io_s; } } return io_s; }
MarkLogsSynced MarkLogsSynced
函数记录 SYNCed WALs 信息:
选项 DBOptions.track_and_verify_wals_in_manifest 用于追踪 WAL 大小信息便于在 Recovery 期间进行校验。默认值为 false,但是 RocksDB 建议在生产环境设置为 true,防止误删或者损坏 WAL 目录下的文件而无法察觉。因此每次 Flush MemTable 前会记录 WAL 的 {wal.number, prev_synced_szie}
信息,那么当 recovery 时,存放 WAL 的目录下必须存在 wal.number
文件,且文件大小至少是 prev_synced_szie
。
此外,还会更新 logs_
:
由于 WAL 在 SyncClosedLogs 函数中再次 SYNC。如果最新一次 SYNC 完成后 WAL 大小和上次 SYNC 的大小 prev_sync_size 相同,表示该 WAL 已完成持久化,则可以从 logs_
中删除,加入 logs_to_free_
队列,在 FlushJob 完成后,在 FindObsoleteFiles
函数中删除。
否则,还需要进行下一次 SYNC,将本次 SYNC 操作标记为完成,等待下一次 Flush 后台任务;
由于 MarkLogsSynced
函数会对 logs_
和 logs_to_free_
进行操作,因此需要在 log_write_mutex_
保护下执行,执行完再阻塞唤醒所有阻塞等待在 log_sync_cv_
的地方。
代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void DBImpl::MarkLogsSynced (uint64_t up_to, bool synced_dir, VersionEdit* synced_wals) { log_write_mutex_.AssertHeld (); for (auto it = logs_.begin (); it != logs_.end () && it->number <= up_to;) { auto & wal = *it; if (wal.number < logs_.back ().number) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.GetPreSyncSize () > 0 ) { synced_wals->AddWal (wal.number, WalMetadata (wal.GetPreSyncSize ())); } if (wal.GetPreSyncSize () == wal.writer->file ()->GetFlushedSize ()) { logs_to_free_.push_back (wal.ReleaseWriter ()); it = logs_.erase (it); } else { assert (wal.GetPreSyncSize () < wal.writer->file ()->GetFlushedSize ()); wal.FinishSync (); ++it; } } else { assert (wal.number == logs_.back ().number); wal.FinishSync (); ++it; } } log_sync_cv_.SignalAll (); }
ApplyWALToManifest ApplyWALToManifest
函数是将 MarkLogsSynced
函数中获得 synced_wals 序列化写入到 MANIFEST,写入 MANIFEST 的数据都需要指定一个所属 CF,由于 WAL 是RocksDB 中所有 CFs 共享的,而 DefaultCF 是第一个 ColumnFamily,因此将 WAL 的数据归属给 default_cf。
至于 LogAndApplyToDefaultColumnFamily
内部怎么 MANIFEST 交互,后面会讲解。
1 2 3 4 5 6 7 8 9 10 11 12 13 Status DBImpl::ApplyWALToManifest (const ReadOptions& read_options, VersionEdit* synced_wals) { mutex_.AssertHeld (); Status status = versions_->LogAndApplyToDefaultColumnFamily ( read_options, synced_wals, &mutex_, directories_.GetDbDir ()); if (!status.ok () && versions_->io_status ().IsIOError ()) { status = error_handler_.SetBGError (versions_->io_status (), BackgroundErrorReason::kManifestWrite); } return status; }
Recovery 在开启了 track_and_verify_wals_in_manifest 后,在 DBImpl::Recover
中,VersionSet 从 MANIFEST 恢复过来后,会将 MANIFEST 中记录的 WAL 元信息和 WAL 目录下的文件进行对比校验。如果该标志没有开启,则需要删除 MANIFEST 中记录的 WAL 信息,防止后续开启后,MANIFEST 中的记录的 WAL 文件已经被删除了,导致校验失败,无法恢复 RocksDB 实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 if (immutable_db_options_.track_and_verify_wals_in_manifest) { if (!immutable_db_options_.best_efforts_recovery) { s = versions_->GetWalSet ().CheckWals (env_, wal_files); } } else if (!versions_->GetWalSet ().GetWals ().empty ()) { VersionEdit edit; WalNumber max_wal_number = versions_->GetWalSet ().GetWals ().rbegin ()->first; edit.DeleteWalsBefore (max_wal_number + 1 ); recovery_ctx->UpdateVersionEdits ( versions_->GetColumnFamilySet ()->GetDefault (), edit); } if (!s.ok ()) { return s; }
CheckWals CheckWals
函数是将从 MANIFEST 中获得的 WAL 最后一次 SYNC 大小 synced_size
和实际磁盘上的 WAL 文件大小 log_file_size
进行对比校验,起码要满足 synced_size <= log_file_size
,校验才能通过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Status WalSet::CheckWals ( Env* env, const std::unordered_map<WalNumber, std::string>& logs_on_disk) const { Status s; for (const auto & [log_number, wal_meta] : wals_) { if (!wal_meta.HasSyncedSize ()) { continue ; } if (logs_on_disk.find (log_number) == logs_on_disk.end ()) { s = Status::Corruption (fmt::format( "Missing WAL with log number: {}" , log_number)); break ; } uint64_t log_file_size = 0 ; s = env->GetFileSize (logs_on_disk.at (log_number), &log_file_size); if (!s.ok ()) { break ; } if (log_file_size < wal_meta.GetSyncedSizeInBytes ()) { s = Status::Corruption (fmt::format( "Size mismatch: WAL (log number: {} ) in MANIFEST is {} bytes, " "but actually is {} bytes on disk." , log_number, wal_meta.GetSyncedSizeInBytes (), log_file_size)); break ; } } return s; }