WAL、MemTable 的生命周期管理(3)

本文是 WAL/MemTable 生命周期的完结篇,主要关注 FlushJob 执行流程以及如何确定哪些 WAL/MmmTable 可以安全地删除。

PickMemTable

FlushJob 首先需要从 ColumnFamilyData::imm_ 中挑选出本次所需的 FlushJob::mems_ ,这部分功能由 PickMemTable 函数实现。最大可以选择的 MemTable::id_ 是 max_memtable_id_ ,该参数值的设置见由上一节 WAL、MemTable 的生命周期管理(2)

每个 FlushJob 选中的 mems_ 是基于 MemTable 的创建时间排序,即 mems_[0] 是最早创建的,mems.back() 是最晚创建的。具体从 imm_ 获取 mems_ 的逻辑由 PickMemtablesToFlush 函数实现。

每次 Flush 操作都产生一个 VersionEdit。FlushJob 将 VersionEdit 信息记录在 mems_[0]->edit_ 中,edit_ 中 log_number_ 主要是用于追踪 WAL 的生命周期,其值是max_next_log_number,如何与WAL生命周期产生联系可见后文分析。

此外,FlushJob::meta_ 中记录着成功 Flush 后生成的 level0 SST 文件的元信息数据。

PickMemTable 核心代码逻辑如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void FlushJob::PickMemTable() {
db_mutex_->AssertHeld();
pick_memtable_called = true;

uint64_t max_next_log_number = 0;

// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
&max_next_log_number);
if (mems_.empty()) {
return;
}

MemTable* m = mems_[0];
edit_ = m->GetEdits();
edit_->SetPrevLogNumber(0);
edit_->SetLogNumber(max_next_log_number);
edit_->SetColumnFamily(cfd_->GetID());

meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
meta_.epoch_number = cfd_->NewEpochNumber();

base_ = cfd_->current();
base_->Ref();
}

PickMemtablesToFlush

SwitchMemTable 函数在调用 MemTableList::Add 函数向 imm_ 中插入新的 ImmtableMemTable 时,是在 current->memlist_ 头部插入节点,因此 memlist_ 尾部是最旧的 ImmutableMemTable,头部是最新的。因此需要逆序遍历 memlist_,才能获得按照 MemTable 创建顺序的 FlushJob::mems_

当前可能多个 FlushJob 在并发地执行,因此从 current->memtables_ 选择 MemTable 时,需要过滤掉不符合条件的:

  • MemTable::id_ < max_memtable_id

  • MemTable::flush_in_progress_ 为 false: 即当前没有被其他 FlushJob 选中

    由于 Flush MemTable 以及后续的 COMMIT 操作都需要保持顺序,因此,如果发现 flush_in_progress_ 为 true,则中断本次 Pick 操作,这样能保证选中的 mems_ 是连续创建的。

在 SwitchMemTable 函数中,将 old_mem 添加到 current->memlist_ 之前会先调用 MemTableList::FlushRequested,将 flush_requested_ 设置为 true,表示该 CF 的 memlist_ 当前等待 Flush 操作。接着在 MemTable::Add 函数中会递增 num_flush_not_started_ ,表示 memlist_ 中尚未被 Picked 的 MemTable 数量。

当 flush_requested_ 为 true 并且 num_flush_not_started_ > 0,则 IsFlushPending 返回 true,则该 CF 可以触发下一次 FlushJob。

直到 PickMemtablesToFlush 函数执行完,如果 num_flush_not_started_ 为 0, 会将 flush_requested_ 设置为 false,表示已经所有待 Picked 的 MemTable 都已选中了,准备进行 Flush。如果后续 Flush 流程失败,会调用 RollbackMemtableFlush 函数进行回滚,恢复状态。

最后,有个基于 RAII 设计的类 AutoThreadOperationStageUpdater,用来表征当前 FlushJob 线程的执行状态。 该函数核心代码及注释如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
autovector<MemTable*>* ret,
uint64_t* max_next_log_number) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;

// 过滤超过 max_memtable_id 的 MemTable
if (m->GetID() > max_memtable_id) {
break;
}

if (!m->flush_in_progress_) {
// 还没被添加到某个 Flush 任务中
num_flush_not_started_--;
if (num_flush_not_started_ == 0) {
imm_flush_needed.store(false, std::memory_order_release);
}
// 设置标志位: 表示被 Picked
m->flush_in_progress_ = true;
if (max_next_log_number) {
*max_next_log_number =
std::max(m->GetNextLogNumber(), *max_next_log_number);
}
ret->push_back(m);
} else if (!ret->empty()) {
// 遇到已经被其他 Flush 线程选中的 memtable,中断
break;
}
}
if (num_flush_not_started_ == 0) {
// start-flush request is complete
flush_requested_ = false;
}
}

RollbackMemtableFlush

如果后续的 Flush 任务执行失败,mems_ 不会从 memlist_ 中删除,则只需要把 PickMemtablesToFlush 中修改的状态重置即可,等待下一次 Flush。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t /*file_number*/) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);

// 重置状态
for (MemTable* m : mems) {
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
}
imm_flush_needed.store(true, std::memory_order_release);
}

FlushJob::Run

FlushJob 目前(branch-8.2.fb)有两种实现:

  • 默认将 MemTables 中的数据写入 level0,
  • 设置 experimental_mempurge_threshold > 0,开启内存裁剪(Memory Purge)。

本文只讲解默认实现,MemoryPurge 后续有空再说。默认的 FlushJob::Run 流程主要有两个部分:

  1. 根据选中的 mems_ 生成 SST 并写入 level0,由 file_meta 记录该 SST 文件元数据
  2. 如果 step(1) 成功则更新 CF 的 version,并将本次更新记录 FlushJob::edit_ 序列化后写入到 MANIFEST 中
  3. 如果 step(1) 不成功,则 RollbackMemtableFlush 进行回滚。

FlushJob::Run 核心代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
bool* switched_to_mempurge) {
db_mutex_->AssertHeld();
AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);
if (mems_.empty()) {
return Status::OK();
}
//...
// 1. 将 mems_ 写入 level0
Status s = WriteLevel0Table();

//...
// 2. 判断 Flush 结果
if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
} else if (write_manifest_) {
// Replace immutable memtable with the generated Table
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_, &committed_flush_jobs_info_, true);
}

if (s.ok() && file_meta != nullptr) {
*file_meta = meta_;
}

//..
return s;
}

WriteLevel0Table

WriteLevel0Table 核心部分主要有三个部分,每个部分都很复杂,值得单独开一期讲解,这里主要讲解下每个部分的作用:

  • NewMergingIterator: 用于将输入的多个 MemTable/SST 合并成一个有序的数据流

    merger_test.cc 的测试用例中,对于多批次随机数据,NewMergingIterator 合并多个数据流后表现和 VectorIterator 表现的一致,都是有序输出。

    多批输入数据流打平后再输入给 VectorIterator,在 VectorIterator 构造函数中对输入的数据流基于 std::sort 排序,因此 VectorIterator 的输出是全局有序的。而 NewMergingIterator 的输出和 VectorIterator 是一致的,则说明 NewMergingIterator 的作用是合并输入流并使输出有序,不过更加高效。

  • range_del_iter

    NewMergingIterator 只是将输入数据排序好,但是通常上层应用也会进行删除操作。在 MemTable 中,专门为 DeleteRange 操作单独分配了个 range_del_table_。当插入数据是 kTypeRangeDeletion 类型时,则将数据写入 range_del_table_。因此,在遍历 MemTable 时,也需要考虑 range_del_iter 来过滤那些已经被删除的数据。

    1
    2
    3
    4
    5
    // in MemTable::Add function
    std::unique_ptr<MemTableRep>& table =
    type == kTypeRangeDeletion
    ? range_del_table_
    : table_;

    关于 range_delete 操作的详细设计可以参考 DeleteRange-Implementation,后续有时间再讲解其中细节。

  • BuildTable

    内部流程:CompactionIterator 基于 {NewMergingIterator, range_del_iter} 过滤已删除数据并输出全局有序的数据,可以通过 CompactionIterator::Next 进行迭代遍历,获得输出 {key, value},再用 TableBuilder::Add 函数将该 {key, value} 添加到 level0 的新 SST 文件中。运行结束,新生成的 level0 SST 的元信息保存在 meta_ 中。

实际上,Compaction 的核心流程也是这三个部分,逻辑流程都是一样的,后续会专注分析每个具体的部分。WriteLevel0Table 核心流程如下代码注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld();
Status s;
//...
std::vector<BlobFileAddition> blob_file_additions;

{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
Env::IOPriority io_priority = GetRateLimiterPriorityForWrite();
db_mutex_->Unlock();

std::vector<InternalIterator*> memtables;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
ReadOptions ro;
ro.total_order_seek = true;
ro.io_activity = Env::IOActivity::kFlush;
Arena arena;

// 1. 从 MemTables 构建 range_delete_iter,获悉哪些 keys 已被删除
for (MemTable* m : mems_) {
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
}

{
// 2. 合并多个 memtable 迭代器,使得 memtables 能有序输出
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
static_cast<int>(memtables.size()), &arena));
int64_t _current_time = 0;
auto status = clock_->GetCurrentTime(&_current_time);
const uint64_t current_time = static_cast<uint64_t>(_current_time);

uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();

// It's not clear whether oldest_key_time is always available. In case
// it is not available, use current_time.
uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);

meta_.oldest_ancester_time = oldest_ancester_time;
meta_.file_creation_time = current_time;

uint64_t num_input_entries = 0;
uint64_t memtable_payload_bytes = 0;
uint64_t memtable_garbage_bytes = 0;
IOStatus io_s;

const std::string* const full_history_ts_low =
(full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
TableBuilderOptions tboptions(
*cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), output_compression_,
mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kFlush, oldest_key_time, current_time,
db_id_, db_session_id_, 0 /* target_file_size */,
meta_.fd.GetNumber());
const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence();
const ReadOptions read_options(Env::IOActivity::kFlush);

// 3. 合并多个 memtables 的数据,输出到 level0 sst 中
s = BuildTable(dbname_, versions_, db_options_, tboptions, file_options_,
read_options, cfd_->table_cache(), iter.get(),
std::move(range_del_iters), &meta_, &blob_file_additions,
existing_snapshots_, earliest_write_conflict_snapshot_,
job_snapshot_seq, snapshot_checker_,
mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, seqno_to_time_mapping_,
event_logger_, job_context_->job_id, io_priority,
&table_properties_, write_hint, full_history_ts_low,
blob_callback_, base_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
//... handle error

// 4. 尝试 SYNC SST 目录
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->FsyncWithDirOptions(
IOOptions(), nullptr,
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
db_mutex_->Lock();
}
base_->Unref();

const bool has_output = meta_.fd.GetFileSize() > 0;

// 5. 将生成的 level0 文件元数据信息写入 VersionEdit
if (s.ok() && has_output) {
// Add file to L0
edit_->AddFile(0 /* level */, meta_);
}
// Piggyback FlushJobInfo on the first first flushed memtable.
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
//...
return s;
}

TryInstallMemtableFlushResults

如果上一步 WriteLevel0Table 成功,则需要将 Flush 的信息 COMMIT 到 MANIFEST。实际上 MANIFEST 可以视为 transaction log,保存着每次 Flush/Compaction 的记录。

当前可能有多个线程在并发执行 Flush 操作,但是只能有一个线程能 COMMIT:率先进入的此函数的线程,它先将 commit_in_progress_ 设置为 true,来阻止后来的线程。因此判断 commit_in_progress_ 是否为 true 是个分界点:

  • 之前:所有 Flush 线程都都先更新 mems[i] 的 {flush_completed_, file_number_} 状态

    flush_completed_ 表示 mems[i] 已经成功 Flush,file_number_ 则是指向 Flush 后生成的 SST 文件。 由于 mems 是 Flush 线程的局部变量,因此这一步操作线程安全。

  • 之后

    由于只有一个线程能进行 COMMIT。因此 COMMIT 对象是 memlist_ 中所有已经 Flushed MemTable,即进入此函数的线程会将所有 Flush 线程中的 mems 一起 COMMIT。 后面进入此函数的线程发现 commit_in_progress_ 为 false 则直接返回。

    其次,COMMIT 也需要按照 MemTable 的创建顺序,即 MemTable::id_ 递增的顺序,因此需要逆序遍历 memlist_,并且如果中途某个 MemTable 在 WriteLevel0Table 中失败了,则需要中断当前 COMMIT 操作,只能将前面成功 Flush 的连续 MemTables 的 VersionEdit 写入 MANIFEST,并在回调函数 RemoveMemTablesOrRestoreFlags 中将这部分 MemTable 从 memlist_ 中删除。

    因此,Flush 失败的 MemTable 就会变成 oldeest memtable,等待下一次 FlushJob 再次选择它,重复上述执行流程。

这部分代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
Status MemTableList::TryInstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
bool write_edits) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();

const ReadOptions read_options(Env::IOActivity::kFlush);
Status s;

// 1. 更改所有成功 Flush 的 MemTable 状态
for (size_t i = 0; i < mems.size(); ++i) {
mems[i]->flush_completed_ = true;
mems[i]->file_number_ = file_number;
}

// 2. 分界点: 只有一个线程可以 commit
if (commit_in_progress_) {
return s;
}

// 进入 commit 流程
commit_in_progress_ = true;

while (s.ok()) {
auto& memlist = current_->memlist_;
// 3. 遇到 Flush 失败的 memtable 则中断
if (memlist.empty() || !memlist.back()->flush_completed_) {
break;
}

uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;

// 按照 memtable 创建的顺序 scan and commit
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
// 3.1 遇到 Flush 失败的 memtable 则中断
if (!m->flush_completed_) {
break;
}
// 遇到一个新的 FlushJob
// 此时 file_number_ 和 edit 都是新的
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
edit_list.push_back(&m->edit_);
std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
if (info != nullptr) {
committed_flush_jobs_info->push_back(std::move(info));
}
}
batch_count++;
}

// 3.2 commit 已经 Flush 成功的
if (batch_count > 0) {
// 计算当前 oldest_wal
uint64_t min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);

// logfile_num < min_wal_number_to_keep 都应该删除
VersionEdit wal_deletion;
wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep);
if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
}
}
edit_list.push_back(&wal_deletion);

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
to_delete, mu](const Status& status) {
// 从 memlist_ 中删除 flushed memtable
RemoveMemTablesOrRestoreFlags(status, cfd, batch_count, log_buffer,
to_delete, mu);
};
// 写入 MANIFEST
s = vset->LogAndApply(cfd, mutable_cf_options, read_options, edit_list,
mu, db_directory, /*new_descriptor_log=*/false,
/*column_family_options=*/nullptr,
manifest_write_cb);
}
}

// 退出 commit 流程
commit_in_progress_ = false;
return s;
}

PrecomputeMinLogNumberToKeepNon2PC

PrecomputeMinLogNumberToKeepNon2PC 函数返回的 min_wal_number_to_keep 用来删除满足 logfile_num < min_wal_number_to_keep 条件的 WAL。由于 min_wal_number_to_keep 也写入 VersionEdit 并最终序列化到 MANIFEST,因此在 Recovery 过程中,如果磁盘上存在部分这些 WAL 也会被忽略,不会被加载。

min_log_number_to_keep 的计算分为三个阶段:

  1. 每个 FlushJob::edit_ 中记录的 log_number_ 是在 FlushJob::PickMemTable 函数中设置,指向了每个 CF 最大可以删除的 WAL

因为 CF memlist_ 指向的 WAL 对于当前 CF 来说都是可以删除的,因此 FlushJob::mems_[-1]->GetNextLogNumber() 就是FlushJob::edit_::log_numer_ 能取到的最大值。通过迭代不同 FlushJob 的 edit_,则可以获得所有 FlushJobs 中最小的 log_number_ ,作为 min_log_number_to_keep

  1. ColumnFamilyData::log_number_ 字段记录了 CF 当前指向的 WAL 文件。因此小于 ColumnFamilyData::log_number_ 的 WAL 对于当前 CF 来说都可以删除。

    这一步和 step(1) 是互斥的,如果没有成功执行的 FlushJobs,则 step(1) 中不会修改 min_log_number_to_keep 的值,此时就会进入 step(2),来获得他们中最小的 min_log_number_to_keep

  2. 再检查没有执行 Flush 的或者 Flush 失败的的 CFs 的 ColumnFamilyData::log_number_ 字段,和 min_log_number_to_keep 进行比较最小值

上述三步结束,所得的 min_log_number_to_keep 即生命周期没有结束的最小 WAL:所有满足 logfile_num < min_log_number_to_keep 的 WAL 可以安全地从磁盘上删除了,Recovery 期间也不会被加载了,即生命周期可以结束了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists) {
uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max();

//1. 检测 min_log_number_to_keep
for (const auto& edit_list : edit_lists) {
uint64_t log = 0;
for (const auto& e : edit_list) {
if (e->HasLogNumber()) {
log = std::max(log, e->GetLogNumber());
}
}
if (log != 0) {
min_log_number_to_keep = std::min(min_log_number_to_keep, log);
}
}

// 2. 如果 edit_lists 为空
if (min_log_number_to_keep == std::numeric_limits<uint64_t>::max()) {
min_log_number_to_keep = cfds_to_flush[0]->GetLogNumber();
for (size_t i = 1; i < cfds_to_flush.size(); i++) {
min_log_number_to_keep =
std::min(min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber());
}
}

// 3. 检测没有 Flush 或者 Flush 失败的 CFs
std::unordered_set<const ColumnFamilyData*> flushed_cfds(
cfds_to_flush.begin(), cfds_to_flush.end());
min_log_number_to_keep =
std::min(min_log_number_to_keep,
vset->PreComputeMinLogNumberWithUnflushedData(flushed_cfds));

return min_log_number_to_keep;
}

uint64_t PreComputeMinLogNumberWithUnflushedData(
const ColumnFamilyData* cfd_to_skip) const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) {
// 跳过 step(2) 中处理过的 CF
if (cfd == cfd_to_skip) {
continue;
}
// 和 min_log_num 取较小值
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber();
}
}
return min_log_num;
}

FindObsoleteFiles

上述获得 min_log_number_to_keep 后,会生成一个 VersionEdit 对象 wal_deletion,它写入 MANIFEST 的过程中,会更新VersionSet::min_log_number_to_keep_

1
2
3
4
5
6
7
8
9
10
11
12
13
// in function: VersionSet::ProcessManifestWrites
uint64_t last_min_log_number_to_keep = 0;
for (const auto& e : batch_edits) {
//... other code
if (e->has_min_log_number_to_keep_) {
last_min_log_number_to_keep =
std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
}
}

if (last_min_log_number_to_keep != 0) {
MarkMinLogNumberToKeep(last_min_log_number_to_keep);
}

在 FindObsoleteFiles 函数中,会根据 VersionSet::min_log_number_to_keep_ 来判断一个 WAL 的生命周期是否已经结束:已经结束的则加入 JobContextl::log_delete_files 中,在 PurgeObsoleteFiles 函数中从磁盘上删除该文件,再释放 JobContextl::logs_to_free_ 中记录的 LogWriter 内存。

到此,一个 WAL 生命周期才算结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_.store(number, std::memory_order_relaxed);
}
}

void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
bool no_full_scan) {
//... ignore other code...
job_context->log_number = versions_->min_log_number_to_keep();
//...
log_write_mutex_.Lock();

if (alive_log_files_.empty() || logs_.empty()) {
log_write_mutex_.Unlock();
return;
}

if (!alive_log_files_.empty() && !logs_.empty()) {
// logfile_num <= min_log_number 都应该删除
uint64_t min_log_number = job_context->log_number;
size_t num_alive_log_files = alive_log_files_.size();

// 1. 检测新的 obsoleted WAL files,即生命周期已结束的
while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin();

// 是否回收,不收回则加入待删除队列
if (immutable_db_options_.recycle_log_file_num > log_recycle_files_.size()) {
log_recycle_files_.push_back(earliest.number);
} else {
job_context->log_delete_files.push_back(earliest.number);
}
alive_log_files_.pop_front();
}
log_write_mutex_.Unlock();
mutex_.Unlock();

// 2. 检测待释的 LogWriter
log_write_mutex_.Lock();
while (!logs_.empty() && logs_.front().number < min_log_number) {
auto& log = logs_.front();
if (log.IsSyncing()) {
log_sync_cv_.Wait();
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
logs_.pop_front();
}
}

job_context->logs_to_free = logs_to_free_;
logs_to_free_.clear();
log_write_mutex_.Unlock();

// 3. 回收
mutex_.Lock();
job_context->log_recycle_files.assign(log_recycle_files_.begin(),
log_recycle_files_.end());
}