本文是 WAL/MemTable 生命周期的完结篇,主要关注 FlushJob 执行流程以及如何确定哪些 WAL/MmmTable 可以安全地删除。
PickMemTable FlushJob 首先需要从 ColumnFamilyData::imm_ 中挑选出本次所需的 FlushJob::mems_ ,这部分功能由 PickMemTable
函数实现。最大可以选择的 MemTable::id_ 是 max_memtable_id_ ,该参数值的设置见由上一节 WAL、MemTable 的生命周期管理(2) 。
每个 FlushJob 选中的 mems_ 是基于 MemTable 的创建时间排序,即 mems_[0]
是最早创建的,mems.back()
是最晚创建的。具体从 imm_ 获取 mems_ 的逻辑由 PickMemtablesToFlush
函数实现。
每次 Flush 操作都产生一个 VersionEdit。FlushJob 将 VersionEdit 信息记录在 mems_[0]->edit_
中,edit_ 中 log_number_
主要是用于追踪 WAL 的生命周期,其值是max_next_log_number
,如何与WAL生命周期产生联系可见后文分析。
此外,FlushJob::meta_
中记录着成功 Flush 后生成的 level0 SST 文件的元信息数据。
PickMemTable 核心代码逻辑如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void FlushJob::PickMemTable () { db_mutex_->AssertHeld (); pick_memtable_called = true ; uint64_t max_next_log_number = 0 ; cfd_->imm ()->PickMemtablesToFlush (max_memtable_id_, &mems_, &max_next_log_number); if (mems_.empty ()) { return ; } MemTable* m = mems_[0 ]; edit_ = m->GetEdits (); edit_->SetPrevLogNumber (0 ); edit_->SetLogNumber (max_next_log_number); edit_->SetColumnFamily (cfd_->GetID ()); meta_.fd = FileDescriptor (versions_->NewFileNumber (), 0 , 0 ); meta_.epoch_number = cfd_->NewEpochNumber (); base_ = cfd_->current (); base_->Ref (); }
PickMemtablesToFlush SwitchMemTable 函数在调用 MemTableList::Add
函数向 imm_ 中插入新的 ImmtableMemTable 时,是在 current->memlist_
头部插入节点,因此 memlist_ 尾部是最旧的 ImmutableMemTable,头部是最新的。因此需要逆序遍历 memlist_,才能获得按照 MemTable 创建顺序的 FlushJob::mems_
。
当前可能多个 FlushJob 在并发地执行,因此从 current->memtables_
选择 MemTable 时,需要过滤掉不符合条件的:
MemTable::id_ < max_memtable_id
MemTable::flush_in_progress_ 为 false: 即当前没有被其他 FlushJob 选中
由于 Flush MemTable 以及后续的 COMMIT 操作都需要保持顺序,因此,如果发现 flush_in_progress_ 为 true,则中断本次 Pick 操作,这样能保证选中的 mems_
是连续创建的。
在 SwitchMemTable 函数中,将 old_mem 添加到 current->memlist_
之前会先调用 MemTableList::FlushRequested
,将 flush_requested_ 设置为 true,表示该 CF 的 memlist_ 当前等待 Flush 操作。接着在 MemTable::Add
函数中会递增 num_flush_not_started_ ,表示 memlist_ 中尚未被 Picked 的 MemTable 数量。
当 flush_requested_ 为 true 并且 num_flush_not_started_ > 0,则 IsFlushPending
返回 true,则该 CF 可以触发下一次 FlushJob。
直到 PickMemtablesToFlush
函数执行完,如果 num_flush_not_started_ 为 0, 会将 flush_requested_
设置为 false,表示已经所有待 Picked 的 MemTable 都已选中了,准备进行 Flush。如果后续 Flush 流程失败,会调用 RollbackMemtableFlush
函数进行回滚,恢复状态。
最后,有个基于 RAII 设计的类 AutoThreadOperationStageUpdater
,用来表征当前 FlushJob 线程的执行状态。 该函数核心代码及注释如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 void MemTableList::PickMemtablesToFlush (uint64_t max_memtable_id, autovector<MemTable*>* ret, uint64_t * max_next_log_number) { AutoThreadOperationStageUpdater stage_updater ( ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH) ; const auto & memlist = current_->memlist_; for (auto it = memlist.rbegin (); it != memlist.rend (); ++it) { MemTable* m = *it; if (m->GetID () > max_memtable_id) { break ; } if (!m->flush_in_progress_) { num_flush_not_started_--; if (num_flush_not_started_ == 0 ) { imm_flush_needed.store (false , std::memory_order_release); } m->flush_in_progress_ = true ; if (max_next_log_number) { *max_next_log_number = std::max (m->GetNextLogNumber (), *max_next_log_number); } ret->push_back (m); } else if (!ret->empty ()) { break ; } } if (num_flush_not_started_ == 0 ) { flush_requested_ = false ; } }
RollbackMemtableFlush 如果后续的 Flush 任务执行失败,mems_ 不会从 memlist_
中删除,则只需要把 PickMemtablesToFlush 中修改的状态重置即可,等待下一次 Flush。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void MemTableList::RollbackMemtableFlush (const autovector<MemTable*>& mems, uint64_t ) { AutoThreadOperationStageUpdater stage_updater ( ThreadStatus::STAGE_MEMTABLE_ROLLBACK) ; for (MemTable* m : mems) { m->flush_in_progress_ = false ; m->flush_completed_ = false ; m->edit_.Clear (); num_flush_not_started_++; } imm_flush_needed.store (true , std::memory_order_release); }
FlushJob::Run FlushJob 目前(branch-8.2.fb)有两种实现:
默认将 MemTables 中的数据写入 level0,
设置 experimental_mempurge_threshold > 0 ,开启内存裁剪(Memory Purge)。
本文只讲解默认实现,MemoryPurge 后续有空再说。默认的 FlushJob::Run 流程主要有两个部分:
根据选中的 mems_ 生成 SST 并写入 level0,由 file_meta
记录该 SST 文件元数据
如果 step(1) 成功则更新 CF 的 version,并将本次更新记录 FlushJob::edit_ 序列化后写入到 MANIFEST 中
如果 step(1) 不成功,则 RollbackMemtableFlush
进行回滚。
FlushJob::Run
核心代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status FlushJob::Run (LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, bool * switched_to_mempurge) { db_mutex_->AssertHeld (); AutoThreadOperationStageUpdater stage_run (ThreadStatus::STAGE_FLUSH_RUN) ; if (mems_.empty ()) { return Status::OK (); } Status s = WriteLevel0Table (); if (!s.ok ()) { cfd_->imm ()->RollbackMemtableFlush (mems_, meta_.fd.GetNumber ()); } else if (write_manifest_) { s = cfd_->imm ()->TryInstallMemtableFlushResults ( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber (), &job_context_->memtables_to_free, db_directory_, log_buffer_, &committed_flush_jobs_info_, true ); } if (s.ok () && file_meta != nullptr ) { *file_meta = meta_; } return s; }
WriteLevel0Table WriteLevel0Table 核心部分主要有三个部分,每个部分都很复杂,值得单独开一期讲解,这里主要讲解下每个部分的作用:
NewMergingIterator: 用于将输入的多个 MemTable/SST 合并成一个有序的数据流
在 merger_test.cc
的测试用例中,对于多批次随机数据,NewMergingIterator
合并多个数据流后表现和 VectorIterator
表现的一致,都是有序输出。
多批输入数据流打平后再输入给 VectorIterator,在 VectorIterator 构造函数中对输入的数据流基于 std::sort
排序,因此 VectorIterator 的输出是全局有序的。而 NewMergingIterator 的输出和 VectorIterator 是一致的,则说明 NewMergingIterator 的作用是合并输入流并使输出有序,不过更加高效。
range_del_iter
NewMergingIterator 只是将输入数据排序好,但是通常上层应用也会进行删除操作。在 MemTable 中,专门为 DeleteRange 操作单独分配了个 range_del_table_。当插入数据是 kTypeRangeDeletion 类型时,则将数据写入 range_del_table_。因此,在遍历 MemTable 时,也需要考虑 range_del_iter 来过滤那些已经被删除的数据。
1 2 3 4 5 std::unique_ptr<MemTableRep>& table = type == kTypeRangeDeletion ? range_del_table_ : table_;
关于 range_delete 操作的详细设计可以参考 DeleteRange-Implementation ,后续有时间再讲解其中细节。
BuildTable
内部流程:CompactionIterator
基于 {NewMergingIterator, range_del_iter} 过滤已删除数据并输出全局有序的数据,可以通过 CompactionIterator::Next 进行迭代遍历,获得输出 {key, value}
,再用 TableBuilder::Add 函数将该 {key, value} 添加到 level0 的新 SST 文件中。运行结束,新生成的 level0 SST 的元信息保存在 meta_ 中。
实际上,Compaction 的核心流程也是这三个部分,逻辑流程都是一样的,后续会专注分析每个具体的部分。WriteLevel0Table
核心流程如下代码注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 Status FlushJob::WriteLevel0Table () { AutoThreadOperationStageUpdater stage_updater ( ThreadStatus::STAGE_FLUSH_WRITE_L0) ; db_mutex_->AssertHeld (); Status s; std::vector<BlobFileAddition> blob_file_additions; { auto write_hint = cfd_->CalculateSSTWriteHint (0 ); Env::IOPriority io_priority = GetRateLimiterPriorityForWrite (); db_mutex_->Unlock (); std::vector<InternalIterator*> memtables; std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> range_del_iters; ReadOptions ro; ro.total_order_seek = true ; ro.io_activity = Env::IOActivity::kFlush; Arena arena; for (MemTable* m : mems_) { auto * range_del_iter = m->NewRangeTombstoneIterator ( ro, kMaxSequenceNumber, true ); if (range_del_iter != nullptr ) { range_del_iters.emplace_back (range_del_iter); } } { ScopedArenaIterator iter ( NewMergingIterator(&cfd_->internal_comparator(), memtables.data(), static_cast <int >(memtables.size()), &arena)) ; int64_t _current_time = 0 ; auto status = clock_->GetCurrentTime (&_current_time); const uint64_t current_time = static_cast <uint64_t >(_current_time); uint64_t oldest_key_time = mems_.front ()->ApproximateOldestKeyTime (); uint64_t oldest_ancester_time = std::min (current_time, oldest_key_time); meta_.oldest_ancester_time = oldest_ancester_time; meta_.file_creation_time = current_time; uint64_t num_input_entries = 0 ; uint64_t memtable_payload_bytes = 0 ; uint64_t memtable_garbage_bytes = 0 ; IOStatus io_s; const std::string* const full_history_ts_low = (full_history_ts_low_.empty ()) ? nullptr : &full_history_ts_low_; TableBuilderOptions tboptions ( *cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), output_compression_, mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(), 0 , false , TableFileCreationReason::kFlush, oldest_key_time, current_time, db_id_, db_session_id_, 0 , meta_.fd.GetNumber()) ; const SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence (); const ReadOptions read_options (Env::IOActivity::kFlush) ; s = BuildTable (dbname_, versions_, db_options_, tboptions, file_options_, read_options, cfd_->table_cache (), iter.get (), std::move (range_del_iters), &meta_, &blob_file_additions, existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats (), &io_s, io_tracer_, BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_, job_context_->job_id, io_priority, &table_properties_, write_hint, full_history_ts_low, blob_callback_, base_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); if (s.ok () && output_file_directory_ != nullptr && sync_output_directory_) { s = output_file_directory_->FsyncWithDirOptions ( IOOptions (), nullptr , DirFsyncOptions (DirFsyncOptions::FsyncReason::kNewFileSynced)); } db_mutex_->Lock (); } base_->Unref (); const bool has_output = meta_.fd.GetFileSize () > 0 ; if (s.ok () && has_output) { edit_->AddFile (0 , meta_); } mems_[0 ]->SetFlushJobInfo (GetFlushJobInfo ()); return s; }
TryInstallMemtableFlushResults 如果上一步 WriteLevel0Table
成功,则需要将 Flush 的信息 COMMIT 到 MANIFEST。实际上 MANIFEST 可以视为 transaction log,保存着每次 Flush/Compaction 的记录。
当前可能有多个线程在并发执行 Flush 操作,但是只能有一个线程能 COMMIT:率先进入的此函数的线程,它先将 commit_in_progress_
设置为 true,来阻止后来的线程。因此判断 commit_in_progress_ 是否为 true 是个分界点:
之前:所有 Flush 线程都都先更新 mems[i] 的 {flush_completed_, file_number_} 状态
flush_completed_ 表示 mems[i] 已经成功 Flush,file_number_ 则是指向 Flush 后生成的 SST 文件。 由于 mems 是 Flush 线程的局部变量,因此这一步操作线程安全。
之后
由于只有一个线程能进行 COMMIT。因此 COMMIT 对象是 memlist_ 中所有已经 Flushed MemTable,即进入此函数的线程会将所有 Flush 线程中的 mems 一起 COMMIT。 后面进入此函数的线程发现 commit_in_progress_ 为 false 则直接返回。
其次,COMMIT 也需要按照 MemTable 的创建顺序,即 MemTable::id_
递增的顺序,因此需要逆序遍历 memlist_,并且如果中途某个 MemTable 在 WriteLevel0Table 中失败了,则需要中断当前 COMMIT 操作,只能将前面成功 Flush 的连续 MemTables 的 VersionEdit 写入 MANIFEST,并在回调函数 RemoveMemTablesOrRestoreFlags
中将这部分 MemTable 从 memlist_ 中删除。
因此,Flush 失败的 MemTable 就会变成 oldeest memtable,等待下一次 FlushJob 再次选择它,重复上述执行流程。
这部分代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 Status MemTableList::TryInstallMemtableFlushResults ( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector<MemTable*>* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info, bool write_edits) { AutoThreadOperationStageUpdater stage_updater ( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS) ; mu->AssertHeld (); const ReadOptions read_options (Env::IOActivity::kFlush) ; Status s; for (size_t i = 0 ; i < mems.size (); ++i) { mems[i]->flush_completed_ = true ; mems[i]->file_number_ = file_number; } if (commit_in_progress_) { return s; } commit_in_progress_ = true ; while (s.ok ()) { auto & memlist = current_->memlist_; if (memlist.empty () || !memlist.back ()->flush_completed_) { break ; } uint64_t batch_file_number = 0 ; size_t batch_count = 0 ; autovector<VersionEdit*> edit_list; for (auto it = memlist.rbegin (); it != memlist.rend (); ++it) { MemTable* m = *it; if (!m->flush_completed_) { break ; } if (it == memlist.rbegin () || batch_file_number != m->file_number_) { batch_file_number = m->file_number_; edit_list.push_back (&m->edit_); std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo (); if (info != nullptr ) { committed_flush_jobs_info->push_back (std::move (info)); } } batch_count++; } if (batch_count > 0 ) { uint64_t min_wal_number_to_keep = PrecomputeMinLogNumberToKeepNon2PC (vset, *cfd, edit_list); VersionEdit wal_deletion; wal_deletion.SetMinLogNumberToKeep (min_wal_number_to_keep); if (vset->db_options ()->track_and_verify_wals_in_manifest) { if (min_wal_number_to_keep > vset->GetWalSet ().GetMinWalNumberToKeep ()) { wal_deletion.DeleteWalsBefore (min_wal_number_to_keep); } } edit_list.push_back (&wal_deletion); const auto manifest_write_cb = [this , cfd, batch_count, log_buffer, to_delete, mu](const Status& status) { RemoveMemTablesOrRestoreFlags (status, cfd, batch_count, log_buffer, to_delete, mu); }; s = vset->LogAndApply (cfd, mutable_cf_options, read_options, edit_list, mu, db_directory, false , nullptr , manifest_write_cb); } } commit_in_progress_ = false ; return s; }
PrecomputeMinLogNumberToKeepNon2PC PrecomputeMinLogNumberToKeepNon2PC 函数返回的 min_wal_number_to_keep
用来删除满足 logfile_num < min_wal_number_to_keep 条件的 WAL。由于 min_wal_number_to_keep 也写入 VersionEdit 并最终序列化到 MANIFEST,因此在 Recovery 过程中,如果磁盘上存在部分这些 WAL 也会被忽略,不会被加载。
min_log_number_to_keep 的计算分为三个阶段:
每个 FlushJob::edit_
中记录的 log_number_
是在 FlushJob::PickMemTable
函数中设置,指向了每个 CF 最大可以删除的 WAL
因为 CF memlist_ 指向的 WAL 对于当前 CF 来说都是可以删除的,因此 FlushJob::mems_[-1]->GetNextLogNumber() 就是FlushJob::edit_::log_numer_ 能取到的最大值。通过迭代不同 FlushJob 的 edit_,则可以获得所有 FlushJobs 中最小的 log_number_ ,作为 min_log_number_to_keep
ColumnFamilyData::log_number_ 字段记录了 CF 当前指向的 WAL 文件。因此小于 ColumnFamilyData::log_number_
的 WAL 对于当前 CF 来说都可以删除。
这一步和 step(1) 是互斥的,如果没有成功执行的 FlushJobs,则 step(1) 中不会修改 min_log_number_to_keep 的值,此时就会进入 step(2),来获得他们中最小的 min_log_number_to_keep
;
再检查没有执行 Flush 的或者 Flush 失败的的 CFs 的 ColumnFamilyData::log_number_
字段,和 min_log_number_to_keep
进行比较最小值
上述三步结束,所得的 min_log_number_to_keep
即生命周期没有结束的最小 WAL:所有满足 logfile_num < min_log_number_to_keep
的 WAL 可以安全地从磁盘上删除了,Recovery 期间也不会被加载了,即生命周期可以结束了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 uint64_t PrecomputeMinLogNumberToKeepNon2PC ( VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, const autovector<autovector<VersionEdit*>>& edit_lists) { uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t >::max (); for (const auto & edit_list : edit_lists) { uint64_t log = 0 ; for (const auto & e : edit_list) { if (e->HasLogNumber ()) { log = std::max (log, e->GetLogNumber ()); } } if (log != 0 ) { min_log_number_to_keep = std::min (min_log_number_to_keep, log); } } if (min_log_number_to_keep == std::numeric_limits<uint64_t >::max ()) { min_log_number_to_keep = cfds_to_flush[0 ]->GetLogNumber (); for (size_t i = 1 ; i < cfds_to_flush.size (); i++) { min_log_number_to_keep = std::min (min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber ()); } } std::unordered_set<const ColumnFamilyData*> flushed_cfds ( cfds_to_flush.begin(), cfds_to_flush.end()) ; min_log_number_to_keep = std::min (min_log_number_to_keep, vset->PreComputeMinLogNumberWithUnflushedData (flushed_cfds)); return min_log_number_to_keep; } uint64_t PreComputeMinLogNumberWithUnflushedData ( const ColumnFamilyData* cfd_to_skip) const { uint64_t min_log_num = std::numeric_limits<uint64_t >::max (); for (auto cfd : *column_family_set_) { if (cfd == cfd_to_skip) { continue ; } if (min_log_num > cfd->GetLogNumber () && !cfd->IsDropped ()) { min_log_num = cfd->GetLogNumber (); } } return min_log_num; }
FindObsoleteFiles 上述获得 min_log_number_to_keep
后,会生成一个 VersionEdit 对象 wal_deletion,它写入 MANIFEST 的过程中,会更新VersionSet::min_log_number_to_keep_
1 2 3 4 5 6 7 8 9 10 11 12 13 uint64_t last_min_log_number_to_keep = 0 ;for (const auto & e : batch_edits) { if (e->has_min_log_number_to_keep_) { last_min_log_number_to_keep = std::max (last_min_log_number_to_keep, e->min_log_number_to_keep_); } } if (last_min_log_number_to_keep != 0 ) { MarkMinLogNumberToKeep (last_min_log_number_to_keep); }
在 FindObsoleteFiles 函数中,会根据 VersionSet::min_log_number_to_keep_
来判断一个 WAL 的生命周期是否已经结束:已经结束的则加入 JobContextl::log_delete_files
中,在 PurgeObsoleteFiles
函数中从磁盘上删除该文件,再释放 JobContextl::logs_to_free_
中记录的 LogWriter
内存。
到此,一个 WAL 生命周期才算结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 void VersionSet::MarkMinLogNumberToKeep (uint64_t number) { if (min_log_number_to_keep_.load (std::memory_order_relaxed) < number) { min_log_number_to_keep_.store (number, std::memory_order_relaxed); } } void DBImpl::FindObsoleteFiles (JobContext* job_context, bool force, bool no_full_scan) { job_context->log_number = versions_->min_log_number_to_keep (); log_write_mutex_.Lock (); if (alive_log_files_.empty () || logs_.empty ()) { log_write_mutex_.Unlock (); return ; } if (!alive_log_files_.empty () && !logs_.empty ()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size (); while (alive_log_files_.begin ()->number < min_log_number) { auto & earliest = *alive_log_files_.begin (); if (immutable_db_options_.recycle_log_file_num > log_recycle_files_.size ()) { log_recycle_files_.push_back (earliest.number); } else { job_context->log_delete_files.push_back (earliest.number); } alive_log_files_.pop_front (); } log_write_mutex_.Unlock (); mutex_.Unlock (); log_write_mutex_.Lock (); while (!logs_.empty () && logs_.front ().number < min_log_number) { auto & log = logs_.front (); if (log.IsSyncing ()) { log_sync_cv_.Wait (); continue ; } logs_to_free_.push_back (log.ReleaseWriter ()); logs_.pop_front (); } } job_context->logs_to_free = logs_to_free_; logs_to_free_.clear (); log_write_mutex_.Unlock (); mutex_.Lock (); job_context->log_recycle_files.assign (log_recycle_files_.begin (), log_recycle_files_.end ()); }