How WriteThread Uses Pipelining to Improve Write Throughput and Reduce Latency

Previous posts covered the internal serialization flow of WriteBatch and the basic flow of how WriteThread controls concurrent writes. This post goes one step further and explains how WriteThread uses PipelinedWrite to improve write throughput; logic that overlaps with the earlier posts is not repeated in detail.

Pipelined writes are enabled by setting the option DBOptions::enable_pipelined_write = true. Once enabled, every write to the DB goes through the WriteThread's PipelinedWrite flow.

if (immutable_db_options_.enable_pipelined_write) {
  return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                            log_ref, disable_memtable, seq_used);
}
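Before digging into the internals, here is a minimal usage sketch (not from the original post; the DB path, key names, and thread count are illustrative): open a DB with pipelined writes enabled and issue writes from several threads, which is exactly the scenario analyzed below.

// Minimal sketch: enable pipelined writes and write from multiple threads.
#include <rocksdb/db.h>

#include <string>
#include <thread>
#include <vector>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_pipelined_write = true;  // route writes through PipelinedWriteImpl
  // allow_concurrent_memtable_write already defaults to true and requires the
  // (default) skiplist-based memtable; see the next section.

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/pipelined_write_demo", &db);
  if (!s.ok()) return 1;

  // With pipelining, one write group's MemTable inserts can overlap the next
  // group's WAL write, so concurrent writers see higher throughput.
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([db, t] {
      for (int i = 0; i < 1000; ++i) {
        db->Put(rocksdb::WriteOptions(),
                "key_" + std::to_string(t) + "_" + std::to_string(i), "value");
      }
    });
  }
  for (auto& th : threads) th.join();
  delete db;
  return 0;
}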

WAL: Pipelined Write

In the earlier WriteThread write flow, data is written to the WAL and then to the MemTable, strictly in order. How can the WAL writes and MemTable writes be made to run in parallel?

When multiple writers write to the same RocksDB instance concurrently, once the previous writer_group0 has written its data to the WAL, the next writer_group1 does not have to wait for writer_group0 to finish its MemTable writes; writer_group1 can start its own WAL write immediately. As a result, writer_group0's MemTable writes and writer_group1's WAL writes proceed in parallel.

That is the core idea of PipelinedWrite. It is also why PipelinedWrite requires the WAL to be enabled: the WAL guarantees that the data writer_group0 has already written is not lost.
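Pictured as a simplified timeline, each group's WAL write starts as soon as the previous group's WAL write finishes, rather than after its MemTable write:

writer_group0: [ WAL write ][ MemTable write ]
writer_group1:             [ WAL write ][ MemTable write ]
writer_group2:                         [ WAL write ][ MemTable write ]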

MemTable: Concurrent Write

Since data already written to the WAL cannot be lost, can multiple writers be allowed to write the MemTable concurrently, instead of having the leader-writer perform all of the memtable_write_group's writes as before?

The option DBOptions::allow_concurrent_memtable_write defaults to true, i.e. concurrent MemTable writes are supported by default, but currently only the skiplist-based MemTable supports this feature (SkipListRep is also the default MemTable implementation).
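This capability is advertised by the memtable implementation itself. A simplified excerpt of the relevant hook in RocksDB's memtablerep.h (only the skiplist-based rep overrides it to return true):

// Simplified excerpt: a MemTableRep declares whether concurrent inserts from
// multiple threads are safe. The base class defaults to false; among the
// built-in reps only the skiplist-based one opts in, which is why only it
// supports allow_concurrent_memtable_write.
class MemTableRep {
 public:
  virtual bool IsInsertConcurrentlySupported() const { return false; }
  // ... insert/lookup interface elided ...
};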

So when the write path uses the PipelinedWrite implementation, there are effectively two optimizations:

  • WriteGroups can be pipelined against one another
  • The MemTable can be written concurrently

From an implementation perspective, the PipelinedWriteImpl function can be roughly divided into three stages (tasks):

  • T1: finish the WAL write of the current write_group0;

  • T2: notify write_group1 to start its WAL write, i.e. write_group1 no longer has to wait for write_group0 to finish its MemTable writes before starting its own WAL write;

  • T3: once write_group0's WAL write is done, start write_group0's concurrent MemTable writes.

    In fact, a group entering T3 may carry the writers of other write_groups along with it into T3 to improve performance; details later in this post.

Since the latter two tasks both have to wait for the first one to finish, the boundary between the three tasks can be placed inside WriteThread::ExitAsBatchGroupLeader: during T1 the whole RocksDB instance must have exactly one leader-writer while the WAL is being written, and once T1 finishes that writer can give up the leader role. At that point two things need to happen: 1) pick the leader-writer of the next WriteGroup so that T2 can run pipelined; 2) start the current WriteGroup's concurrent MemTable writes.
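Mapped onto the functions discussed below, the hand-off looks roughly like this (a condensed sketch of the call sequence, not actual source):

T1: JoinBatchGroup -> EnterAsBatchGroupLeader -> WriteToWAL
    -> ExitAsBatchGroupLeader, which then
T2:   promotes the next group's leader via SetState(STATE_GROUP_LEADER), and
T3:   links this group onto newest_memtable_writer_; afterwards
      EnterAsMemTableWriter -> LaunchParallelMemTableWriters
      -> WriteBatchInternal::InsertInto -> ExitAsMemTableWriter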

The overall design is shown in the figure below:

[Figure: write_path_4_1 — pipelined write design]

T1

T1 follows essentially the same flow as the one described in "How WriteThread controls concurrent writes": JoinBatchGroup -> EnterAsBatchGroupLeader -> ExitAsBatchGroupLeader. The only difference is that the leader-writer's scope, which used to cover both WriteToWAL and MemTable::Add, is now reduced to the WAL only.

Therefore, only the writer that JoinBatchGroup returns as leader-writer enters the WAL write flow; the other writers of the current write_group0, as well as those of subsequent groups such as write_group1, all block inside JoinBatchGroup.

The details of this blocking logic are covered in "How WriteThread adaptively optimizes thread synchronization". The entry code of PipelinedWriteImpl looks like this:

WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                      disable_memtable, /*_batch_cnt=*/0,
                      /*_pre_release_callback=*/nullptr);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_GROUP_LEADER) {
  WriteThread::WriteGroup wal_write_group;
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);

  if (w.status.ok() && !write_options.disableWAL) {
    // Write the current wal_write_group's data to the WAL
    io_s =
        WriteToWAL(wal_write_group, log_context.writer, log_used,
                   log_context.need_log_sync, log_context.need_log_dir_sync,
                   current_sequence, log_file_number_size);
    w.status = io_s;
  }

  //...
  write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}

The core of how PipelinedWrite chains consecutive writer_groups therefore lies in ExitAsBatchGroupLeader.

ExitAsBatchGroupLeader

The earlier post "How WriteThread controls concurrent writes" already walked through ExitAsBatchGroupLeader for enable_pipelined_write == false; here we look at the other branch. It has two main tasks:

  • T2: pick the leader-writer of the next write_group1 so that it, too, enters the WAL write flow

  • T3: CompleteLeader / CompleteFollower first remove the writers that do not need to write the MemTable from the write_group0 list; the remaining writers of write_group0 are then moved to the list headed by newest_memtable_writer_.

    Only the leader-memtable-writer may start the concurrent MemTable write flow; its w->state becomes STATE_MEMTABLE_WRITER_LEADER.

    If the leader-writers of several write_groups (wg0, wg1) race to become the leader-memtable-writer and wg0->leader wins, the new memtable_write_group actually contains (wg0, wg1), chained together in the form wg1->link_older = wg0.

    The leader-memtable-writer then starts the memtable_write_group's concurrent MemTable writes via LaunchParallelMemTableWriters; only then do the follower-writers unblock and enter the MemTable::Add flow.

The full code is shown below.

void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status& status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  assert(leader->link_older == nullptr);

  if (enable_pipelined_write_) {
    Writer dummy;
    Writer* head = newest_writer_.load(std::memory_order_acquire);
    // 1. Insert a dummy node between the current write_group0 and the
    //    next write_group1
    if (head != last_writer ||
        !newest_writer_.compare_exchange_strong(head, &dummy)) {
      CreateMissingNewerLinks(head);
      assert(last_writer->link_newer != nullptr);
      last_writer->link_newer->link_older = &dummy;
      dummy.link_newer = last_writer->link_newer;
    }

    // 3. Remove the writers that do not need to write the MemTable
    for (Writer* w = last_writer; w != leader;) {
      Writer* next = w->link_older;
      w->status = status;
      if (!w->ShouldWriteToMemtable()) {
        CompleteFollower(w, write_group);
      }
      w = next;
    }
    if (!leader->ShouldWriteToMemtable()) {
      CompleteLeader(write_group);
    }

    // 3.1 (T3) Move the remaining writers of write_group0 to the list
    //     headed by newest_memtable_writer_
    if (write_group.size > 0) {
      if (LinkGroup(write_group, &newest_memtable_writer_)) {
        // Set the leader-memtable-writer
        SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
      }
    }

    // 4. (T2) Pick the leader-writer of write_group1
    head = newest_writer_.load(std::memory_order_acquire);
    if (head != &dummy ||
        !newest_writer_.compare_exchange_strong(head, nullptr)) {
      CreateMissingNewerLinks(head);
      Writer* new_leader = dummy.link_newer;
      assert(new_leader != nullptr);
      new_leader->link_older = nullptr;
      SetState(new_leader, STATE_GROUP_LEADER);
    }

    // 5. Wait for the leader-memtable-writer to call
    //    LaunchParallelMemTableWriters
    AwaitState(leader,
               STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER |
                   STATE_COMPLETED,
               &eabgl_ctx);
  } else {
    //... enable_pipelined_write_ == false
  }
}

CompleteFollower

WriteThread walks backwards over [last_writer, leader) to remove the follower writers that do not need to write the MemTable. The head and tail of the write_group's writer list are leader and last_writer. Since CompleteFollower only ever removes follower nodes, the only special case it has to handle during removal is whether the node is the tail node last_writer.

Note that w only needs to be unlinked from the write_group; it is not deleted. Instead, w->state is set to STATE_COMPLETED. The reason is that w is owned and released by the thread it belongs to: once its state changes to STATE_COMPLETED, that thread unblocks, releases the resource, and returns to the application layer.

CompleteLeader follows similar logic (a sketch follows the code below).

void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
  assert(write_group.size > 1);
  assert(w != write_group.leader);
  if (w == write_group.last_writer) {
    w->link_older->link_newer = nullptr;
    write_group.last_writer = w->link_older;
  } else {
    w->link_older->link_newer = w->link_newer;
    w->link_newer->link_older = w->link_older;
  }
  write_group.size -= 1;
  SetState(w, STATE_COMPLETED);
}
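For completeness, here is a sketch of the matching CompleteLeader logic, reconstructed from the description above (a hedged reconstruction; the actual source may differ in details): it unlinks the head of the list rather than an interior or tail node.

// Sketch, reconstructed from the prose above: remove the leader from the
// head of the write_group's writer list without deleting it.
void WriteThread::CompleteLeader(WriteGroup& write_group) {
  assert(write_group.size > 0);
  Writer* leader = write_group.leader;
  if (write_group.size == 1) {
    // The leader is the only writer left; the group becomes empty.
    write_group.leader = nullptr;
    write_group.last_writer = nullptr;
  } else {
    // Promote the next-newer writer to be the new head of the list.
    assert(leader->link_newer != nullptr);
    leader->link_newer->link_older = nullptr;
    write_group.leader = leader->link_newer;
  }
  write_group.size -= 1;
  SetState(leader, STATE_COMPLETED);
}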

LinkGroup

LinkGroup moves the writers of the current write_group0 that still need to write to the MemTable onto the list headed by newest_memtable_writer_, preserving their order within write_group0.

  1. First clear the w->link_newer (prev) pointers of every writer in (leader, last_writer] (leader->link_newer is already NULL). This is necessary because LinkGroup may chain the writers of multiple WriteGroups onto the single newest_memtable_writer_ pointer, so the prev pointers have to be rebuilt later.

  2. Use compare_exchange_weak to try to make newest_memtable_writer_ point at last_writer. Concurrent WriteGroups are chained together through the link_older pointers, as sketched below.

    This is also why ExitAsBatchGroupLeader ends with an AwaitState(leader, state) call: it blocks the follower-memtable-writers that have been chained into the memtable_write_group until the leader-memtable-writer calls LaunchParallelMemTableWriters.
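For example, if wg1 runs LinkGroup after wg0, the resulting list looks like this (an illustrative sketch; each group's internal order is preserved and all links are link_older pointers):

newest_memtable_writer_ -> wg1.last_writer -> ... -> wg1.leader
                        -> wg0.last_writer -> ... -> wg0.leader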

The idea is much the same as LinkOne; the code is as follows:

bool WriteThread::LinkGroup(WriteGroup& write_group,
                            std::atomic<Writer*>* newest_writer) {
  Writer* leader = write_group.leader;            // head
  Writer* last_writer = write_group.last_writer;  // tail
  Writer* w = last_writer;
  // 1. Clear the w->link_newer pointers from last_writer down to leader
  while (true) {
    w->link_newer = nullptr;
    w->write_group = nullptr;
    if (w == leader) {
      break;
    }
    w = w->link_older;
  }
  // 2. Point newest_writer at last_writer
  Writer* newest = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    leader->link_older = newest;  // leader->next = newest
    if (newest_writer->compare_exchange_weak(newest, last_writer)) {
      // newest_writer = last_writer
      return (newest == nullptr);
    }
  }
}

T2

Since T1 already picks write_group1's leader-writer inside ExitAsBatchGroupLeader, T2 does not wait for write_group0 to finish; it starts right away and simply repeats write_group0's T1 flow.

T3

After write_group0 finishes T1, it enters T3 and prepares the concurrent MemTable writes. The write flow of this stage is as follows:

The leader-memtable-writer obtains the memtable_write_group via EnterAsMemTableWriter:

  • If the memtable_write_group contains more than one writer, it calls LaunchParallelMemTableWriters to send all of the writers into the MemTable::Add stage.

  • Otherwise, the leader-memtable-writer writes the MemTable by itself.

    The MemTable write itself is performed by WriteBatchInternal::InsertInto.

Meanwhile, the follower-memtable-writers are blocked in one of two places:

  • The followers of the current write_group0 block at the JoinBatchGroup call at the start of PipelinedWriteImpl
  • The followers of other groups such as write_group1 block at the AwaitState call at the end of ExitAsBatchGroupLeader

So once the leader-memtable-writer calls LaunchParallelMemTableWriters, the writers blocked at both places above are released and enter T3.

WriteThread::WriteGroup memtable_write_group;

if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
  // the memtable-writer-leader reaches here
  write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
  if (memtable_write_group.size > 1 &&
      immutable_db_options_.allow_concurrent_memtable_write) {
    write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
  } else {
    memtable_write_group.status = WriteBatchInternal::InsertInto(
        memtable_write_group, w.sequence, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
        false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
    versions_->SetLastSequence(memtable_write_group.last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
  }
} else {
  // memtable-writer-followers reach here
  memtable_write_group.status.PermitUncheckedError();
}

EnterAsMemTableWriter

EnterAsMemTableWriter works much like EnterAsBatchGroupLeader: it takes the memtable-writers off the newest_memtable_writer_ list and forms a memtable_write_group spanning [leader, last_writer].

void WriteThread::EnterAsMemTableWriter(Writer* leader,
                                        WriteGroup* write_group) {
  size_t size = WriteBatchInternal::ByteSize(leader->batch);
  //..
  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->size = 1;
  Writer* last_writer = leader;

  if (allow_concurrent_memtable_write_ && !leader->batch->HasMerge()) {
    Writer* newest_writer = newest_memtable_writer_.load();
    CreateMissingNewerLinks(newest_writer);

    Writer* w = leader;
    while (w != newest_writer) {
      assert(w->link_newer);
      w = w->link_newer;

      if (w->batch == nullptr) {
        break;
      }

      if (w->batch->HasMerge()) {
        break;
      }

      w->write_group = write_group;
      last_writer = w;
      write_group->size++;
    }
  }

  write_group->last_writer = last_writer;
  write_group->last_sequence =
      last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}

LaunchParallelMemTableWriters

LaunchParallelMemTableWriters simply walks all the writers in the memtable_write_group and sets each w->state to STATE_PARALLEL_MEMTABLE_WRITER to unblock them.

void WriteThread::LaunchParallelMemTableWriters(
    WriteGroup* memtable_write_group) {
  memtable_write_group->running.store(memtable_write_group->size);
  for (auto w : *memtable_write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
  }
}

CompleteParallelMemTableWriter

所有的 w->state 都在 LaunchParallelMemTableWriters 函数中被更改 STATE_PARALLEL_MEMTABLE_WRITER,此时已经没有主从 writers 的概念。

Each writer concurrently calls WriteBatchInternal::InsertInto to write its data into the MemTable. After finishing its MemTable write, each writer calls CompleteParallelMemTableWriter once to check whether it is the last writer in the memtable_write_group to complete. The last memtable-writer performs the cleanup work in ExitAsMemTableWriter.

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  ColumnFamilyMemTablesImpl column_family_memtables(
      versions_->GetColumnFamilySet());
  w.status = WriteBatchInternal::InsertInto(
      &w, w.sequence, &column_family_memtables, &flush_scheduler_,
      &trim_history_scheduler_, write_options.ignore_missing_column_families,
      0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
      false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
      write_options.memtable_insert_hint_per_batch);
  if (write_thread_.CompleteParallelMemTableWriter(&w)) {
    MemTableInsertStatusCheck(w.status);
    versions_->SetLastSequence(w.write_group->last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
  }
}

Each memtable-writer that finishes its MemTable write enters CompleteParallelMemTableWriter and decrements memtable_write_group->running by 1. If it is not the last memtable-writer, it blocks until the last one finishes and, inside ExitAsMemTableWriter, changes every writer's state to STATE_COMPLETED, at which point this round of concurrent MemTable writes is over.

bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // else we're the last parallel worker and should perform exit duties.
  w->status = write_group->status;
  // Callers of this function must ensure w->status is checked.
  write_group->status.PermitUncheckedError();
  return true;
}

ExitAsMemTableWriter

ExitAsMemTableWriter is similar to ExitAsBatchGroupLeader and has two main jobs:

  • Pick the leader-memtable-writer of the next memtable_write_group1 and set its state to STATE_MEMTABLE_WRITER_LEADER, so that the next memtable_write_group1 can enter T3 as soon as possible;

  • Change the state of all memtable-writers of the current memtable_write_group0 to STATE_COMPLETED, releasing the writers blocked in CompleteParallelMemTableWriter so they can free their resources and return to the application layer.

    The leader-memtable-writer has to be the last one to exit, because it owns the memtable_write_group: if it were not released last, ExitAsMemTableWriter could crash (use-after-free) while still executing.

This part of the code follows.

void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
                                       WriteGroup& write_group) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;

  // 1. Promote the next group's leader-memtable-writer, if any
  Writer* newest_writer = last_writer;
  if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
                                                       nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
  }
  // 2. Mark every writer of this group as completed
  Writer* w = leader;
  while (true) {
    if (!write_group.status.ok()) {
      w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
      SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
      break;
    }
    assert(next);
    w = next;
  }
  //! Note that leader has to exit last, since it owns the write group.
  SetState(leader, STATE_COMPLETED);
}