WAL、MemTable 的生命周期管理(1)

WAL 在 DB::Open 时就会创建,用于持久化 MemTable 中尚未 Flush 的数据。具体写入 WAL 的格式可以参考 Write-Ahead-Log-File-Format 及其对应实现 DBImpl::WriteToWAL 函数,本文及后续几期主要关注 WAL/MemTable 的生命周期管理。

PreprocessWrite

在写 WAL/MemTable 之前,会先在 DBImpl::PreprocessWrite 函数中基于当前 RocksDB 的一些指标判断是否需要 Flush MemTable,创建新的 WAL/MemTable 再接受本次写入:

  1. 当前 WAL 的大小 total_log_size_ 是否超过阈值 DBOptions::max_total_wal_size
  2. 根据 write_buffer_manager_ 判断当前 MemTable 是否需要 Flush
  3. flush_scheduler_.empty() 为 false,则说明上次某个 CF MemTable 写满了

上述三种触发条件的处理方式,都需要先创建新的 WAL/MemTable,然后生成 Flush 请求 flush_request,并且触发后台线程去消费该 flush_request。

  • 正常情况下,发起 Flush 请求后,不用等待后台 Flush 完成,新的 WAL/MemTable 就可以继续接受后续读写请求,
  • 但是如果写的压力过大,则会进一步限制,比如 DelayWrite,WriteStall,恢复正常后才会继续接受新的写入。

上述三种的不同触发条件,不同在于如何选择 ColumnFamily。比如 条件(1)WAL 的大小超过阈值,则所有 ColumnFamilys 都需要进行更换 WAL/MemTable;而条件(3)则只是需要某个具体的 CF MemTable 满了。因此只是 FlushReason 不同,处理过程都类似。

下面针对核心处理流程开始讲解。

SwitchMemTable

SwitchMemtable 函数是针对具体的 ColumnFamily 创建 WAL/MemTable ,因此需要上层先筛选出需要切换 MemTable 的 CF,再传递给此函数。

由前文可知,进入 PreprocessWrite 函数时已经是单线程操作,而 SwitchMemtable 函数仍需要在 DBImpl::_mutex 的保护下才能调用,这是为了与后台 Flush 线程互斥。

根据 log_empty_ 字段来判断是否需要创建新的 WAL,即当前 WAL 是否有数据写入,如果没有则不创建新的 WAL。主要是可能要为多个 CFs 创建新的 WAL/MemTable,但是 WAL 是共享的,只需要第一次创建 WAL 即可,后续的 ColumnFamily 检测到 log_empty_ 为空,则不再重复创建。

每个 WAL 都有一个对应的编号 logfile_num,由 VersionSet::NewFileNumber 函数来生成。

DBOptions::recycle_log_file_num 决定是否复用生命周期已结束的 WALs,log_recycle_files_ 字段用于保存生命周期已结束待删除的 WALs。此如果 recycle_log_file_num > 0 则按照 FIFO 规则从 log_recycle_files_ 取出生命周期最早结束的 WAL 的 logfile_number,因此可以 truncate 旧 WAL 文件再接受新数据。

在 RocksDB 中,数据存储目录在 DBOptions::wal_dir,其值就是 db_name,每个 WAL 文件名就是wal_dir+logfile_number+.log。因此,后文一般提及 log 都是指 WAL 文件。

比如:编译完 RocksDB,并执行 build/examples/simple_example,在 /tmp/rocksdb_simple_example 目录下会有如下以 log 为后缀的文件。

1
2
3
4
5
6
7
$ ll /tmp/rocksdb_simple_example
total 1832
-rw-r--r-- 1 root root 1045 Jul 10 08:27 000326.sst
-rw-r--r-- 1 root root 999 Jul 24 14:17 000331.sst
-rw-r--r-- 1 root root 68 Jul 24 14:18 000332.log
-rw-r--r-- 1 root root 999 Jul 24 14:18 000336.sst
# ...

log_recycle_files_ 等字段会收到 FlushJob 的影响,因此需要在 DBImpl::mutex_ 的保护下调用。而 CreateWAL 只是影响局部变量 new_log,不需要 mutex_ 保护,同理构建新的 MemTable 对象 new_mem 也不需要 mutex_。

下面是创建 new_log 和 new_mem 的核心代码及其注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld();
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
IOStatus io_s;

// 避免多个 CFs 创建不同的 WAL
bool creating_new_log = !log_empty_;
uint64_t recycle_log_number = 0;
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
!log_recycle_files_.empty()) {
recycle_log_number = log_recycle_files_.front();
}
uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

// 为啥这个函数也需要保护 ???
const auto preallocate_block_size =
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
mutex_.Unlock();

// 创建 WAL
if (creating_new_log) {
io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
&new_log);
if (s.ok()) {
s = io_s;
}
}
// WAL 创建成功后,再创建 MemTable
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion();
}
// 用于 delete_range
cfd->mem()->ConstructFragmentedRangeTombstones();

//...
// mutex_.Lock();
}

CreateWAL

WAL 的创建也比较简单,如代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
log::Writer** new_log) {
IOStatus io_s;
std::unique_ptr<FSWritableFile> lfile;

DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_);
FileOptions opt_file_options =
fs_->OptimizeForLogWrite(file_options_, db_options);
std::string wal_dir = immutable_db_options_.GetWalDir();
std::string log_fname = LogFileName(wal_dir, log_file_num);

//1. 生成底层 wal 文件
if (recycle_log_number) {
std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
&lfile, /*dbg=*/nullptr);
} else {
io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
}

if (io_s.ok()) {
lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
lfile->SetPreallocationBlockSize(preallocate_block_size);

const auto& listeners = immutable_db_options_.listeners;
FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;

// 2. 创建写 wal 的 writer
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_file_options,
immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
nullptr, tmp_set.Contains(FileType::kWalFile),
tmp_set.Contains(FileType::kWalFile)));

// 3. 封装 log_writer
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush,
immutable_db_options_.wal_compression);
io_s = (*new_log)->AddCompressionTypeRecord();
}
return io_s;
}

ConstructNewMemtable

WAL 创建成功后,由 MemTable 所属的 ColumnFamily 构建 MemTable。

1
2
3
4
5
MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_manager_, earliest_seq, id_);
}

ColumnFamily

上面 new_log, new_mem 成功创建完后,下面对 ColumnFamily 的 WAL/MemTable 相关的元信息进行更新。

首先需要将当前 cur_log_writer 中已有的数据 flush 到操作系统中。实际上在 LogWriter::AddRecord 内部,每次写完都会进行一次 flush,将数据刷操作系统page cache 中,由操作系统决定何时将数据写入文件。flush 只能保证 RocksDB 进程中途挂了再重启数据不会丢,但是如果机器挂了数据还是有丢的可能,如果想要进一步保证安全就需要 sync 操作,将操作系统缓存层的数据同步到磁盘,很显然这个操作会拖慢写入速度。RocksDB 也配置了开启选项 WriteOptions::sync: bool,默认 false。

logs_字段是个map,映射关系是{logfile_number_, log_writer},记录了每一个log_writer及其对应的logfile_number,alive_log_files_记录是 logfile_number及其log_writer写入的数据量大小。由于 WAL 的生命周期和 ColumnFamily 有关,因此需要 logs_alive_log_files_ 来记录一个 RocksDB 实例中所有生命周期尚未结束的 WALs。

因此,当创建了新的 WAL ,需要在 log_write_mutex_ 的保护下添加新的记录。后续 FLushJob 完成了,也会相应删除生命周期结束的 WAL 记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//... above code

// 上面 Unlock, 现在恢复 Lock
mutex_.Lock();
if (recycle_log_number != 0) {
log_recycle_files_.pop_front();
}

if (s.ok() && creating_new_log) {
InstrumentedMutexLock l(&log_write_mutex_);
// 1. Flush
if (!logs_.empty()) {
log::Writer* cur_log_writer = logs_.back().writer;
io_s = cur_log_writer->WriteBuffer();
if (s.ok()) {
s = io_s;
}
}
// 2. Flush 成功后,更新 log 相关元信息
if (s.ok()) {
logfile_number_ = new_log_number;
log_empty_ = true;
log_dir_synced_ = false;
logs_.emplace_back(logfile_number_, new_log);
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
}
}

前置工作已经完成,下面要开始更新 ColumnFaimly了。

由于传递给 SwitchMemTable 的 CF 并非都是因为自己的 MemTable 满了,可能是因为某个 CF0 把 WAL 写满了,导致所有的 MmeTable 都要切换,比如上述条件(1)。有些 CF1 可能仍然是空的,此时就需要更新 CF1 当前的 LogNumber。 这样就可以删除 CF1 之前指向的 WAL。

比如:CF0,CF1 当前都指向 log1,由于 CF0 一直写入数据,导致 log1 满了。此时需要创建新的 WAL 文件 log2。但是 CF1 一直没有写入数据,此时同步将 CF1 当前 log 指针 log_number_ 指向 log2,使得指向 log1 的引用就是0,因此就可以在 MemTable 完成 Flush 完成后,将 log1 删除。

其余解释见代码,这部分代码简化如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ...above code
bool empty_cf_updated = false;
if (!empty_cf_updated) {
for (auto cf : *versions_->GetColumnFamilySet()) {
if (cf->IsEmpty()) {
if (creating_new_log) {
cf->SetLogNumber(logfile_number_);
}
cf->mem()->SetCreationSeq(versions_->LastSequence());
}
}
}

// CF 当前 Memtable 下一个活跃的 logfile_numer_,
// 选择需要 Flush 的 ImmutableMemTables 使用
cfd->mem()->SetNextLogNumber(logfile_number_);
// 将当前 Memtable 添加到 ImmutableMemTable 链表中,等待 Flush 删除。
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
// 设置新的 MemTable
new_mem->Ref();
cfd->SetMemtable(new_mem);
// 更新 superversion
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);

GenerateFlushRequest

SwitchMemTable,完成了 WAL/MemTable 的切换。紧接着就会尝试发起一次 flush 请求。Flush 的对象是从每个 CF 的 ImmutableMemtables 中提取的,因此 FlushRequest 需要保存每个 CF 的 max_memtable_id。

CF 中的每个 MemTable 都有一个单增的 id_,用于追踪 Flush。在使用 ColumnFamilyData::SetMemtable 函数添加到 CF 中时更新 MemTable::id_ 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req) {
assert(req != nullptr);
req->flush_reason = flush_reason;
req->cfd_to_max_mem_id_to_persist.reserve(cfds.size());
for (const auto cfd : cfds) {
if (nullptr == cfd) {
continue;
}
// max_memtable_id 即最新加入 ImmutableMemtables 的 memtable 的 Id
uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
req->cfd_to_max_mem_id_to_persist.emplace(cfd, max_memtable_id);
}
}

SchedulePendingFlush

flush_queue_ 用于保存MemTables的Flush请求,SchedulePendingFlush 将上述生成的flush请求压入flush_queue_,再调度后台任务线程从 flush_queue_中取出FlushRequest执行。

这里也有个设计,如果开启了 DBOptions::atomic_flush 则会让多个 CFs 的 Flush 请求放到一个线程去执行,这样就可以保证 Flush 的原子性。否则,每次只 flush。但是其默认值为 false,因为只要开启了 WAL,就能保证跨多个 CFs 的写操作是原子性的,即便 flush 操作挂了也还有 WAL 可以恢复。

部分代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
}

if (!immutable_db_options_.atomic_flush) {
// only one
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd =
flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
cfd->Ref();
cfd->set_queued_for_flush(true);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
} else {
for (auto& [cfd, _] : flush_req.cfd_to_max_mem_id_to_persist) {
cfd->Ref();
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
}

MaybeScheduleFlushOrCompaction

RocksDB 的后台任务线程调度是主动触发的,并没有 loop 线程在阻塞等待 flush_queue_ 加入新元素后就从 flush_queue_ 提取请求去执行。

因此,在向 flush_req 加入有新请求后,需要主动通过 MaybeScheduleFlushOrCompaction 函数调度后台线程执行 DBImpl::BackgroundCallFlush 函数执行 FlushRequest 请求。

最大后台任务有 bg_job_limits 限制,unscheduled_flushes_ 表征当前有多少待 flush 的请求,bg_flush_scheduled_ 表征当前已经调度了多少 flush 请求,配合 bg_job_limits 参数限制后台 flush 任务。

DBImpl::BGWorkFlush 函数用于执行 FlushRequest,DBImpl::UnscheduleFlushCallback 函数则是执行 FlushRequest 完的回调函数,用于释放 FlushThreadArg 对象。

MaybeScheduleFlushOrCompaction 函数执行,是个分界点:

  • 提交完 Flush 请求后,如果没有触发 DelayWrite,WriteStall,就会接受新的读写请求了;
  • 后台线程池接受到 BGWorkFlush 请求后,就开始执行 FlushReqeust

这部分相关代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld();
//... precondition check

auto bg_job_limits = GetBGJobLimits();
bool is_flush_pool_empty =
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
FlushThreadArg* fta = new FlushThreadArg;
fta->db_ = this;
fta->thread_pri_ = Env::Priority::HIGH;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
&DBImpl::UnscheduleFlushCallback);
--unscheduled_flushes_;
}

if (is_flush_pool_empty) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
FlushThreadArg* fta = new FlushThreadArg;
fta->db_ = this;
fta->thread_pri_ = Env::Priority::LOW;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
&DBImpl::UnscheduleFlushCallback);
--unscheduled_flushes_;
}
}
//..
}

BackgroundCallFlush

DBImpl::BGWorkFlush 只是个 BackgroundCallFlush 的 wrapper。 进入 BackgroundCallFlush 函数后,需要获取 DBImpl::mutex_ 来执行 BackgroundFlush,执行成功后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);

{
// 需要 DBImpl::mutex_
InstrumentedMutexLock l(&mutex_);
num_running_flushes_++;

std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
FlushReason reason;

Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
&reason, thread_pri);
//...

assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
atomic_flush_install_cv_.SignalAll();
bg_cv_.SignalAll();
}
}

BackgroundFlush

PopFirstFromFlushQueue 函数从 flush_request_ 中取出待 Flush 的请求,过滤掉不符合条件的的CF,符合结果的存在 bg_flush_args

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
Env::Priority thread_pri) {
//..
autovector<BGFlushArg> bg_flush_args;
std::vector<SuperVersionContext>& superversion_contexts =
job_context->superversion_contexts;
autovector<ColumnFamilyData*> column_families_not_to_flush;
while (!flush_queue_.empty()) {
// This cfd is already referenced
auto [flush_reason, cfd_to_max_mem_id_to_persist] =
PopFirstFromFlushQueue();
superversion_contexts.clear();
superversion_contexts.reserve(cfd_to_max_mem_id_to_persist.size());

for (const auto& [cfd, max_mem_id] : cfd_to_max_mem_id_to_persist) {
if (cfd->GetMempurgeUsed()) {
cfd->imm()->FlushRequested();
}

if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
column_families_not_to_flush.push_back(cfd);
continue;
}
superversion_contexts.emplace_back(SuperVersionContext(true));
bg_flush_args.emplace_back(cfd, max_mem_id,
&(superversion_contexts.back()), flush_reason);
}
// 是否有待 Flush 的 CF
if (!bg_flush_args.empty()) {
break;
}
}
//...
}

获得具有 Flush 条件的 bg_flush_args,下面就是真正的准备执行 FlushJob。FlushJobFlushMemTablesToOutputFiles 函数执行,这个后续再讲。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//...
if (!bg_flush_args.empty()) {
auto bg_job_limits = GetBGJobLimits();
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
*reason = bg_flush_args[0].flush_reason_;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->UnrefAndTryDelete()) {
arg.cfd_ = nullptr;
}
}
}
for (auto cfd : column_families_not_to_flush) {
cfd->UnrefAndTryDelete();
}