WriteThread 如何控制并发写入流程

建议先阅读上一篇文章 WriteBatch 写入前的准备工作 了解一些基本概念

本篇博客讲解批量写流程的控制流程,了解这个流程之后再来讲解 PipelineWriteImpl 中优化。

整个写入流程大致如下:
write_path_2_1

WriteThread::JoinBatchGroup

JoinBatchGroup 函数充当着锁的作用:能成为 leader 的 writer 才能真正执行 WriteToWALMemTable::Add 操作,其他 writers 只能阻塞等待 leader-writer 完成。

WriteThread 中有个 newest_writer_ 字段总是指向最新插入的 Writer 对象:

  1. 如果新插入一个 Writer 对象 w,则会尝试让 newest_writer_ 指向该 w 。如果当前触发了 WriteStall 则会等待 WriteStall 被解除,才会再次尝试让 newest_writer_ 指向该 w
  2. 如果 w 插入时,newest_writer_ == NULL ,则 w 能顺利通过 JoinBatchGroup 函数,进入后续写入流程
  3. 否则,说明当前已经存在 leader-writer,则只能阻塞等待 w->state 更改为下面其中一种才能解除阻塞:
    • STATE_GROUP_LEADER: 变成下一个 write_group 的 leader,执行写入
    • STATE_COMPLETED: 说明已经存在一个 leader-writer 替自己完成了写入,解除阻塞后就可以直接返回应用层
    • STATE_MEMTABLE_WRITER_LEADER: …
    • STATE_PARALLEL_MEMTABLE_WRITER: …

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void WriteThread::JoinBatchGroup(Writer* w) {
assert(w->batch != nullptr);
// 1. 让 nnewest_writer_ 指向 w
bool linked_as_leader = LinkOne(w, &newest_writer_);

if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
if (!linked_as_leader) {
// 2. 阻塞等待 w->state & mask != 0
AwaitState(w,
STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
}
}

WriteThread::LinkOne

LinkOne的功能其实就是在链表头发插入一个节点,不过需要先检查一个前提 WriteStall

WriteStall

如果写操作触发了 WriteStall ,会向 WriterThread 中写入一个 write_stall_dummy_ 标志。因此在尝试将 newest_writer_ 指向新加入的 w 时都需要先检查下 newest_writer_ 是否等于 write_stall_dummy_。如果等于,有以下两种情况:

  • Writer::no_slowdown == false,这是默认情况,即基于条件变量 stall_cv_ 阻塞等待 WriteStall 解除。解除后则需要重新读取 newest_writer_

    条件变量一般配合 while 一起使用,防止虚假唤醒,因此 stall_cv_ 被唤醒后会 continue 并进行下一轮 while 循环,确认 newest_writer_ 不是 write_stall_dummy_ 才能继续下一步。

  • Writer::no_slowdown == true,此时将错误 Status::Incomplete 返回给上层应用,让其自行决定如何处理 WriteStall,这就类似网络编程的非阻塞行为。

compare_exchange_weak

当没有 WriteStall 或者有已经解除,则可以继续写入,将 w->link_older = writers。这里的 link_older 的语义其实就是 next 指针,效果即 w->next = writers

完成这一步后 却并没有执行 writers->prev = w 操作,为什么呢?这是为了在 leader-writer 写流程结束时能通过 link_newer/prev 是否为 NULL 选出下一轮的 leader-writer,详见后文的 ExitAsBatchGroupLeader 函数。

再通过 compare_exchange_weak 操作将 newest_writer_ 指向最新插入的 w,

  • 成功,则通过判断 writers 是否为 NULL,来判断 w 是不是下一轮 writer_group 的 leader-writer

    这是因为上一个 write_group 完成后就会尝试将 newest_writer_ 设置为 NULL,只需要通过判断 writers == NULL 就可以确定新插入的 w 是不是下一轮 write_group 的 leader。

  • 失败,则说明有个并发 w0 在自己之前完成了 compare_exchange_weak 操作,则自己会在下一轮 while 循环中完成此操作,使得 newest_writer_ 指向 w,并形成 w->next = w0 连接。

每次有新的 w,都只是改变 newest_writer_,并通过 link_older/next 指针把所有插入的 writers 串联起来,且不重不漏。由于每次只有 leader-writer 具有写权限,再让 leader-writer 在将所有的 writers 打包成一个 writer_group 时,给缺失的 link_newer/prev 指针赋值,就完成了双链表创建。由于此时实际上已经是单线程操作,因此不需要借助任何同步措施也没有并发的风险。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
assert(newest_writer != nullptr);
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
assert(writers != w);
if (writers == &write_stall_dummy_) {
if (w->no_slowdown) {
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
return false;
}
{
MutexLock lock(&stall_mu_);
writers = newest_writer->load(std::memory_order_relaxed);
if (writers == &write_stall_dummy_) {
stall_cv_.Wait();
// Load newest_writers_ again since it may have changed
writers = newest_writer->load(std::memory_order_relaxed);
continue;
}
}
}
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
}

WriteThread::AwaitState

WriteThread::AwaitState 中的优化点较多,下一期单独开一篇讲解这里的优化。这个函数的作用是阻塞等待直到满足 w->state & goal_mask != 0

1
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);

缺失的 prev 指针由 CreateMissingNewerLinks 函数补全。

传入的 headnewest_writer_ 的值,目前所有待执行的 writers 已经通过 next 指针串联起来了,这里要做的就是将其 prev 指针补全。代码如下:

1
2
3
4
5
6
7
8
9
10
11
void WriteThread::CreateMissingNewerLinks(Writer* head) {
while (true) {
Writer* next = head->link_older;
if (next == nullptr || next->link_newer != nullptr) {
assert(next == nullptr || next->link_newer == head);
break;
}
next->link_newer = head; // next->prev = head;
head = next;
}
}

WriteThread::EnterAsBatchGroupLeader

writer 没有阻塞在 WriteThread::JoinBatchGroup 函数,则说明 writer 目前已经成为 leader-writer,则需要由 leader-writer 尝试将目前所有待执行的 writers 封装到一个 write_group 中,这个由 EnterAsBatchGroupLeader 函数完成。

执行到 EnterAsBatchGroupLeader 函数时,newest_writer_ 可能一直在更改,即不断指向最新的 writer。但是没关系,因为此时 leader-writer 已经诞生了,更新的 writer 在执行 WriteThread::JoinBatchGroup 时候会被阻塞在 AwaitState,如果此时 leader-writer 刚好执行到 EnterAsBatchGroupLeader 函数,则会由 leader-writer 将 [leader-writer, newest_writer] 区间的所有 writers 封装到 writer_group 中,由 leader-writer 来统一执行批量写入。其中 write_group->last_writer 指向的就是当前最新的 newest_writer。

在上一篇文章,讲解了 WriteGroup 内部封装了迭代器,那么就可以使用如下方式并以 leader_writer -> newst_writer 顺序遍历所有 writers。

1
2
3
for(auto writer : *writer_group) {
// 批量写入
}

代码如下4步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
WriteGroup* write_group) {
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
assert(write_group != nullptr);

size_t size = WriteBatchInternal::ByteSize(leader->batch);

// 1. write_group 大小限制
size_t max_size = max_write_batch_group_size_bytes;
const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
if (size <= min_batch_size_bytes) {
max_size = size + min_batch_size_bytes;
}

// 2. 初始化 write_group
leader->write_group = write_group;
write_group->leader = leader;
write_group->last_writer = leader;
write_group->size = 1;
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

// 3. 补全 [leader-writer, newest_writer] 丢失的 prev 指针
CreateMissingNewerLinks(newest_writer);

// 4. 封装到 write_group
Writer* w = leader;
while (w != newest_writer) {
assert(w->link_newer);
w = w->link_newer;

// ... other break conditions

if (size + WriteBatchInternal::ByteSize(w->batch) > max_size) {
break;
}

w->write_group = write_group; // 设置 w 属于当前 write_group
size += batch_size; // 这个 write_group 的数据量大小
write_group->last_writer = w; // 更新 last_writer
write_group->size++; // writer_group 中 writers 的个数
}
return size;
}

WriteThread::ExitAsBatchGroupLeader

这里暂时不关注 enable_pipelined_write_, 这是开启 PipelineWriteImpl 的标志位。

当数据都已经写入 WAL 和 MemTable,则会调用 ExitAsBatchGroupLeader,此时需要判断在当前 write_group 写入过程中是否出现了新的 writers:

  • 是:则需要从等待的 writers 中挑选出新的 leader-writer
  • 否:则需要将 newest_writer_ 赋值为 NULL

需要先读取 newest_writer_ 的最新值 head,来判断是否有新的 writer 插入:

  • 如果 head != last_writer 则说明在当前 write_group 写操作过程中有出现新的 writers ,并阻塞等待在 JoinBatchGroup

  • 如果 head == last_writer 但是 newest_writer_.compare_exchange_strong(head, nullptr) 为 false,则说明在 newest_writer_ load 之后并在 compare_exchange_strong 之前 有新的 writers 出现

    上述两种情况,都需要将 [last_writer, head] 区间所有 writers 缺失的 prev 指针补全,因为补全后 last_writer->prev 指向的就是新的 leader-writer。

    这是因为 last_writer->prev 是在 last_writer 之后最早插入的 writer,为保持顺序性,这就是新的 leader-writer。

完成上述操作,剩下的就是将当前 write_group 中的所有 writers 状态更改为 STATE_COMPLETED 解除他们在 JoinBatchGroup 处的阻塞,尽快返回应用层。这一步也有个小细节,见代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status& status) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);

if (enable_pipelined_write_) {
// ...
} else {
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
!newest_writer_.compare_exchange_strong(head, nullptr)) {
// 存在新插入的 writer
assert(head != last_writer);

// 1. 补全 [last_writer, head] 的 prev指针
CreateMissingNewerLinks(head);
assert(last_writer->link_newer != nullptr);
assert(last_writer->link_newer->link_older == last_writer);
// 2. 断开链表
last_writer->link_newer->link_older = nullptr;

// 3. 设置新的 leader-writer
SetState(last_writer->link_newer, STATE_GROUP_LEADER);
}

// 4. 将 [leader, last_writer] 区间的 writers 状态设置为 STATE_COMPLETED
// 即解除阻塞,让 follower-writer 返回应用层
while (last_writer != leader) {
assert(last_writer);
last_writer->status = status;
// 需要先获取 next指针,再更改状态为 STATE_COMPLETED
// 因为先更改 STATE_COMPLETED 很可能导致 last_writer 就被正在阻塞的线程销毁了
// 再获取 next 指针就会触发 coredump
auto next = last_writer->link_older;
SetState(last_writer, STATE_COMPLETED); // 解除阻塞

last_writer = next;
}
}
}

By the Way

  • 这篇文章里面用了许多原子操作以及一些内存序,诸如 compare_exchange_strong/compare_exchange_weak 区别,acquire/release语义,等后面有空再单独讲解下这个问题。