WAL、MemTable 的生命周期管理(2)

本文简要讲下在 Flush MemTable 之前,WAL 需要进行 SYNC 的原因。更多设计思考可以阅读 Track-WAL-in-MANIFEST

WHY NEED to SYNC WAL

出于性能考虑,默认情况下 RocksDB 不会每写入一次数据就进行一次 SYNC WAL 操作。但没有 SYNC WAL 会带来两个影响:

  1. 在 recovery 时,就无法检查 WAL 是否存在磁盘上,这是因为当机器挂了,WAL 的 innode 元数据可能尚未持久化到磁盘。

    除此之外,默认情况下误删 WAL 目录下的 log 文件,也没有机制来检测是否有 WAL 丢失以及哪个文件丢失。此时 DB::Open 是会成功,但是 log 中对应的数据 “悄悄地” 丢失了。

  2. 在 recovery 期间也无法判断 WAL 的大小是否正常,因为如果不主动 SYNC,则 SYNC WAL 操作则由操作系统来完成的,这对 RocksDB 并不透明:无法知道在 RocksDB 退出前, WAL 持久化到磁盘的大小。

目前 SYNC WAL 有四种场景:

  1. 上层应用调用 DB::SyncWAL 函数,会主动 SYNC 所有生命周期还没结束的 WALs
  2. 上层应用调用 DB::FlushWAL(true) 函数,内部也是调用 SyncWAL 函数
  3. 写入数据时,设置 WriteOption::synctrue,即每写入一次数据,就会进行一次 SYNC WAL
  4. 在 Flush MemTable 前,如果不止一个 ColumnFamily ,会 SYNC 所有 closed_wals,即除了当前WAL之外的所有生命周期尚未结束的 WALs

case(1,2) 交给应用层调用,case(3) 虽然安全但是效率太低,一般不会开启。因此,交给 RocksDB 自己需要 SYNC WAL 的场景就剩下case(4)。 SYNC WAL 主要是为了防止机器宕机,而不是进程crash。虽然 case(4) 还是有丢数据的风险,但是在生产环境中,一般都会配备多副本来冗余。除非是一个副本的数据在一个机房,一个机房的数据又全部挂了,为了防止这种情况,又搞出来了跨机房部署。

BUT, 该挂的还是会挂。

因此,在执行 FlushJob 前,会将本次 synced_wals 大小信息记录到 VersionEdit 中,再序列化到 MANIFEST 中,在 recovery 期间可以对 WALs 进行强制检查:1)MANIFEST 中记录的 WAL 也应该存在磁盘上;2)磁盘上 WAL 的大小不应该小于 MANIFEST 中记录的 WAL 大小。

case(4) 是否需要 SYNC 的判断条件如下:

1
2
3
4
// 1. 判断本次 Flush 前是否需要 sync closed_wals
const bool needs_to_sync_closed_wals =
logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1;
  • needs_to_sync_closed_wals 为 true 时,需要记录当前 CF 的 max_memtable_id,这样后续 FlushJob::PickMemTable 函数选择待 Flush 的MemTable时,会过滤掉满足 MemTable::id_ > max_memtable_id 的 memtable。

    这是因为在执行 SyncClosedLogs 函数会 DBImpl::mutex_.Unlock,而当前 CF 的 SwitchMemTable 函数可能会在这个期间执行,新增 new_mem,通过 max_memtable_id 来过滤掉这个期间新增的 new_mm。

  • needs_to_sync_closed_wals 为 false,会使得 max_memtable_id 为 UINT64_MAX,表示要 Flush 所有的 ImmutableMemtables

这部分代码上下文逻辑如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 2. 设置 max_memtable_id
uint64_t max_memtable_id = needs_to_sync_closed_wals
? cfd->imm()->GetLatestMemTableID()
: std::numeric_limits<uint64_t>::max();

//... other middle codes
IOStatus log_io_s = IOStatus::OK();
if (needs_to_sync_closed_wals) {
mutex_.Unlock();

// 3. 释放 mutex_, 再进行 sync
VersionEdit synced_wals;
log_io_s = SyncClosedLogs(job_context, &synced_wals);

mutex_.Lock();
// 4. write to MANIFEST
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
log_io_s = status_to_io_status(ApplyWALToManifest(
ReadOptions(Env::IOActivity::kFlush), &synced_wals));
}
}

SyncClosedLogs

SyncClosedLogs 函数需要在 log_write_mutex_ 保护下。如果此时仍持有 DBImpl::mutex_ 会影响增加其他其线程阻塞时间,比如写路径过程中的 PreprocessWrite,因此在进入 SyncClosedLogs 函数前会释放 DBImpl::mutex_,让其他的线程可以执行。

SyncClosedLogs 函数的目标是 [logs_.front().number, logfile_number_) 区间的 WALs,其中 logfile_number_ 指向当前最新的 WAL。由于在后台线程池中可能会同时执行多个 FlushMemTable 任务,只要一个 WAL 的生命周期还没结束,那么每次 FlushMemTables 前都会 SYNC WAL 一次,并使用 VersionEdit 记录该 WAL 的元信息再写入 MANIFEST。 因此在 SyncClosedLogs 函数开始,会先尝试阻塞等待其他线程完成对该 WAL 的 SYNC 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
VersionEdit* synced_wals) {
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
// 1. 阻塞等待其他线程完成 SYNC
while (logs_.front().number < current_log_number &&
logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
}
//2. 获取所有待 SYNC 的对象: logs_to_sync
for (auto it = logs_.begin();
it != logs_.end() && it->number < current_log_number; ++it) {
auto& log = *it;
log.PrepareForSync();
logs_to_sync.push_back(log.writer);
}

IOStatus io_s;
// 3. SYNC
if (!logs_to_sync.empty()) {
// 操作局部变量 logs_to_sync 不需要 log_write_mutex_
log_write_mutex_.Unlock();
for (log::Writer* log : logs_to_sync) {
// 每个文件进行 SYNC
io_s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!io_s.ok()) {
break;
}
// 关闭
if (immutable_db_options_.recycle_log_file_num > 0) {
io_s = log->Close();
if (!io_s.ok()) {
break;
}
}
}
// 只要有一个 WAL 需要 SYNC ,整个目录都需要 SYNC
if (io_s.ok()) {
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
IOOptions(), nullptr,
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}

// 要操作 active_logs_, logs_ 因此需要 log_write_mutex_
log_write_mutex_.Lock();

// 标记是否完成
if (io_s.ok()) {
MarkLogsSynced(current_log_number - 1, true, synced_wals);
} else {
MarkLogsNotSynced(current_log_number - 1);
}
if (!io_s.ok()) {
return io_s;
}
}
return io_s;
}

MarkLogsSynced

MarkLogsSynced 函数记录 SYNCed WALs 信息:

选项 DBOptions.track_and_verify_wals_in_manifest 用于追踪 WAL 大小信息便于在 Recovery 期间进行校验。默认值为 false,但是 RocksDB 建议在生产环境设置为 true,防止误删或者损坏 WAL 目录下的文件而无法察觉。因此每次 Flush MemTable 前会记录 WAL 的 {wal.number, prev_synced_szie} 信息,那么当 recovery 时,存放 WAL 的目录下必须存在 wal.number 文件,且文件大小至少是 prev_synced_szie

此外,还会更新 logs_

  • 由于 WAL 在 SyncClosedLogs 函数中再次 SYNC。如果最新一次 SYNC 完成后 WAL 大小和上次 SYNC 的大小 prev_sync_size 相同,表示该 WAL 已完成持久化,则可以从 logs_ 中删除,加入 logs_to_free_ 队列,在 FlushJob 完成后,在 FindObsoleteFiles 函数中删除。
  • 否则,还需要进行下一次 SYNC,将本次 SYNC 操作标记为完成,等待下一次 Flush 后台任务;

由于 MarkLogsSynced 函数会对 logs_logs_to_free_ 进行操作,因此需要在 log_write_mutex_ 保护下执行,执行完再阻塞唤醒所有阻塞等待在 log_sync_cv_ 的地方。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
VersionEdit* synced_wals) {
log_write_mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;

if (wal.number < logs_.back().number) {
// Inactive WAL
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
} else {
assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
wal.FinishSync();
++it;
}
} else {
// Active WAL
assert(wal.number == logs_.back().number);
wal.FinishSync();
++it;
}
}
log_sync_cv_.SignalAll();
}

ApplyWALToManifest

ApplyWALToManifest 函数是将 MarkLogsSynced 函数中获得 synced_wals 序列化写入到 MANIFEST,写入 MANIFEST 的数据都需要指定一个所属 CF,由于 WAL 是RocksDB 中所有 CFs 共享的,而 DefaultCF 是第一个 ColumnFamily,因此将 WAL 的数据归属给 default_cf。

至于 LogAndApplyToDefaultColumnFamily 内部怎么 MANIFEST 交互,后面会讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
Status DBImpl::ApplyWALToManifest(const ReadOptions& read_options,
VersionEdit* synced_wals) {
// not empty, write to MANIFEST.
mutex_.AssertHeld();

Status status = versions_->LogAndApplyToDefaultColumnFamily(
read_options, synced_wals, &mutex_, directories_.GetDbDir());
if (!status.ok() && versions_->io_status().IsIOError()) {
status = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
return status;
}

Recovery

在开启了 track_and_verify_wals_in_manifest 后,在 DBImpl::Recover 中,VersionSet 从 MANIFEST 恢复过来后,会将 MANIFEST 中记录的 WAL 元信息和 WAL 目录下的文件进行对比校验。如果该标志没有开启,则需要删除 MANIFEST 中记录的 WAL 信息,防止后续开启后,MANIFEST 中的记录的 WAL 文件已经被删除了,导致校验失败,无法恢复 RocksDB 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
} // else since best effort recovery does not recover from WALs, no need
// to check WALs.
} else if (!versions_->GetWalSet().GetWals().empty()) {
VersionEdit edit;
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
if (!s.ok()) {
return s;
}

CheckWals

CheckWals 函数是将从 MANIFEST 中获得的 WAL 最后一次 SYNC 大小 synced_size 和实际磁盘上的 WAL 文件大小 log_file_size 进行对比校验,起码要满足 synced_size <= log_file_size,校验才能通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Status WalSet::CheckWals(
Env* env,
const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
Status s;
for (const auto& [log_number, wal_meta] : wals_) {
if (!wal_meta.HasSyncedSize()) {
continue;
}

// 1. MANIFEST 中有存在的 WALs,磁盘上也必须存在
if (logs_on_disk.find(log_number) == logs_on_disk.end()) {
s = Status::Corruption(fmt::format(
"Missing WAL with log number: {}", log_number));
break;
}

//2. 磁盘上的 WAL 文件大小 >= 最后一次 SYNC 大小
uint64_t log_file_size = 0;
s = env->GetFileSize(logs_on_disk.at(log_number), &log_file_size);
if (!s.ok()) {
break;
}

if (log_file_size < wal_meta.GetSyncedSizeInBytes()) {
s = Status::Corruption(fmt::format(
"Size mismatch: WAL (log number: {} ) in MANIFEST is {} bytes, "
"but actually is {} bytes on disk.",
log_number, wal_meta.GetSyncedSizeInBytes(), log_file_size));
break;
}
}
return s;
}