WriteThread 如何自适应优化线程同步

ROCKSDB_NAMESPACE::WriteThread 中,通过 WriteThread::AwaitStateWriteThread::SetState 两个函数来 writers 的控制并发写入行为。 这一期主要来分析 RocksDB 中 WriteThread::AwaitState 中的优化,是如何尽可能地缩短阻塞等待时间。

WriteThread::AwaitState

AwaitState 函数用于阻塞等待直到满足 w->state & goal_mask != 0 这一条件。如 JoinBatchGroup 函数中新插入的 w 需要阻塞等待 w->state 变成 goal_mask 中的一种才能继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
void WriteThread::JoinBatchGroup(Writer* w) {
//... above code
if (!linked_as_leader) {
AwaitState(
w,
// goal_mask
STATE_GROUP_LEADER |
STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER |
STATE_COMPLETED,
&jbg_ctx);
}
}

RocksDB 为尽可能降低阻塞时间,将等待情况分为三种:

  • short-uncontended: 几乎无竞争,直接使用 spin-wait loop 就能解决;
  • short-contended: 存在竞争,但是预测阻塞时间不会太久,使用 loop + yield 应对
  • long: 竞争激烈,使用 mutex + condition_variable 应对。

如果开启了上帝视角,即事先知道本次 AwaitState 要等待的时间,就可以直接使用 spin-wait loop 来应对 short-uncontended, mutex 来应对 long。显然没有这样的上帝视角,那么就只能通过一种自适应的方式来判断了。

pause-based spin-wait loop

spin-wait loop 是应对阻塞时间很短 (short-periods) 场景的常用方式,即占据着 CPU 做空转阻塞等待 w->state 变成预期值,这样可以减少线程上下文切换(context switch)带来的开销。如下:

1
2
3
4
5
6
while (true) { 
uint8_t state = w->state.load(std::memory_order_acquired);
if (state & goal_mask != 0) {
break;
}
}

但是这么简单的写法却有问题,一般都会在 spin-wait loop 中加上一条 “Pause“ 指令来提升性能。在 Pause 指令中有这么一段描述:

When executing a ‘spin-wait loop’, processors will suffer a severe performance penalty when exiting the loop because it detects a possible memory order violation

意思就是说,如果 spin-wait loop 里如果不加上 Pause 指令,则很可能因为 memory order violation 问题导致退出 loop 时遭受严重的性能惩罚

memory order violation

如下从 Intel 64 and IA-32 Architectures Optimization Reference Manual 中选取的示例代码,来解释为什么退出loop时会带来性能惩罚。

1
2
3
4
5
6
7
8
9
10
11
12
Spin_Lock:
CMP lockvar, 0; // (1) Check if lock is free.
JE Get_Lock; // (1.1) lockvar == 0
JMP Spin_Lock; // (1.2) lockvar != 0
Get_Lock:
MOV EAX, 1;
XCHG EAX, lockvar; // (2) Try to get lock
CMP EAX, 0; // (3) Test if successful.
JNE Spin_Lock;
Critical_Section:
// critical section cod>
MOV lockvar, 0; // (4) Release lock

当线程 T1 spin-wait loop 几轮迭代后,条件分支(1)处一直都是 false,即 lockvar 一直不是 0。这种情况下,CPU 的分支预测器会认为条件(1) 永远不会为 true,就会将 JMP(1.2) 的指令填充整个 pipeline。

当线程 T2 将 lockvar 写为 0 时,由于此时 T1 的 pipeline 已经被错误的预测指令 (1.2) 填充满,即(1) 处的 lockvar 已经预测为0。这时 memory order violation 就会发生了:T1 线程看到 T2 线程对 lockvar 的修改后,就会在 T1 的 pipeline 中搜索访问了 lockvar 变量且还没执行的预测指令(1.2),如果发现了则会使得这部分预测指令失效,并且 flush pipeline 来删除这部分预测指令。获取锁后,就会退出 spin-wait loop。

在退出时 flush pipeline 的代价会很高。

因此,每次进入 spin-wait loop ,就会在满足条件退出 spin-wait loop 时带来严重的性能惩罚,比预期的同步时间要久。

加上 PAUSE 指令,是通过引入轻微的延迟来 de-pipelined,使得 pipeline 中不会填充错误的预测指令: 插入 PAUSE 指令后,会等待 PAUSE 之前的 pipeline 执行完变空,然后再执行下一轮 CMP,如果 T2 线程将 lockvar 的值为 0,则 T1 能立即检测到。因此现在 CMP 指令就是顺序执行,消除了预测。

1
2
3
4
5
6
Spin_Lock:
CMP lockvar, 0; // (1) Check if lock is free.
JE Get_Lock
PAUSE; // (2) short a delay, wait for memory pipeline to become empty
JMP Spin_Lock;
Get_Lock:

从使用角度来说,”PAUSE” 指令甚至是专门用于优化 spin-wait loop 的,需要搭配使用。

而 RocksDB 进入 AwaitState 后率先使用 spin-wait loop 迭代两百次(大约 1us)来尝试满足 short-uncontended 场景。原生代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx) {
for (uint32_t tries = 0; tries < 200; ++tries) {
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
return state;
}
port::AsmVolatilePause();
}
//...
}
````
由于 "PAUSE" 指令随着架构的变化,latency 也会更改,甚至差一个数量级,故而这里如果想要更准确的控制 wait 的时间可以使用 `__rdtsc()` 函数来控制:
```cpp
// __rdtsc intrinsic is used to read the time stamp counter
// This allows the loop to run for a fixed number of cycles
uint64_t prev = __rdtsc();
do {
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
return state;
}
port::AsmVolatilePause();
} while (__rdtsc() - prev < max_spin_time);

std::this_thread::yield

如果第一阶段的 spin-wait loop 没能等到 w->state 的值变更为预期值,说明还是存在竞争,则进入第二阶段 short-contended。 这一阶段由 DBOptions::enable_write_thread_adaptive_yield 配置是否开启,默认值为 true。

这一段有两个问题:

  • 如何从 short-contended 阶段进入 long 阶段
  • 如何判断下一次是否需要再进入 short-contended 阶段

当进入 short-contended 阶段说明存在竞争,但是假设竞争可能不大。比如线程数可能不超过 CPU cores 数目,这种情况下使用 std::thread::yield() 并不会导致 cotext switch,效果比 pthread_mutex 要好。

注意:从 RocksDB 开发者角度,没有上帝视角,只能先假设没有竞争(short-uncontended),不满足则再假设存在竞争但是不激烈(short-contended),如果还没不满足再考虑mutex阻塞。

因此,需要对调用 std::thread::yield() 前后的 latency 进行统计,粗略判断竞争激烈程度:

  • max_yield_usec_: 默认 100us,控制 short-contended 阶段最大等待时间

  • slow_yield_usec_: 默认 3us, 是 yield 的 latency 上限,用来反应是否有其他线程也想占据当前 cpu core

    如果当前竞争比较激烈,那么调用 std::thread::yield() 前后的 latency 肯定会增加,(比如 threads > cpu-cores 时,甚至会产生 context switch),最终导致 latency > slow_yield_usec_。如果累计 kMaxSlowYieldsWhileSpinning = 3 次都超过该值,则可以跳出 short-contended 阶段,直接进入 long 阶段,阻塞等待。

开启第二阶段的开关有3个,这一部分代码如下:

1
2
3
4
5
6
7
8
9
10
bool would_spin_again = false;
bool update_ctx = false;
uint64_t sampling_base = 256;
if (max_yield_usec_ > 0) {
update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
// short-contended code...
}
}
  • max_yield_usec_ > 0: 默认情况下总是开启

  • update_ctx: 表示是否更新 yield_credit 的值。当且仅当基于均匀分布从 [0, 255] 区间获得 0 时值为 true,即 Random::OneIn(sampling_base) 返回 true 的概率为 1/256

    would_spin_again 表示 w->state 是否在第二阶段等到预期值,如果成功则 yield_credit 值增加,反之降低。yield_credit 的更新公式如下:

    1
    2
    3
    4
    5
    if (update_ctx) {
    auto v = yield_credit.load(std::memory_order_relaxed);
    v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
    yield_credit.store(v, std::memory_order_relaxed);
    }
  • yield_credit: 默认值为 0,在是否开启第二阶段中起着决定性作用。基本上需要满足 yield_credit >= 0 才能进入第二阶段

    yield_credit 值在两种情况下会更改,也是自适应原理:

    1)进入了第二阶段,但是因为竞争太激烈,没有在第二阶段实现 w->state & goal_mask != 0,此时会将 update_ctx = true,再由上述公式降低 yield_credit 的值,使其小于 0,这样 AwaitState 函数下次不会再进入第二阶段;

    1. 如果长时间 yield_credit < 0 会一直无法进入第二阶段。但是由均匀分布可知,存在 1/256 的概率将 update_ctx = true,进入第二阶段,来试探现在竞争是否没那么激烈了。如果此时在第二阶段等到了 w->state & goal_mask != 0 ,那么就会再根据上述公式将 yield_credit 调节为非负数,使得下一次 AwaitState 函数仍能进入第二阶段。

这里的 yield_credit 值记录在 AdaptationContext::value 中,AdaptationContext 的所有对象在 WriteThread 中都是 static 变量,因此会一直反应着进程生命周期中线程竞争状态,故而上述的 update_ctx 中的 ctxyield_credit

1
2
3
4
5
6
struct AdaptationContext {
const char* name;
std::atomic<int32_t> value;

explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
};

上面的介绍大致介绍了 short-contended 阶段的自适应原理,下面来看看核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
auto& yield_credit = ctx->value;
bool update_ctx = false;
bool would_spin_again = false;
if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
auto spin_begin = std::chrono::steady_clock::now();

// 超过 slow_yield_usec_ 的次数
size_t slow_yield_count = 0;

auto iter_begin = spin_begin;
// 最多消耗的时间片: max_yield_usec_
while (iter_begin - spin_begin) <=
std::chrono::microseconds(max_yield_usec_) {
std::this_thread::yield();

state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
// 1. 成功,则退出第二阶段
would_spin_again = true;
break;
}

auto now = std::chrono::steady_clock::now();
// 2. latency 超过 slow_yield_usec_
if (now == iter_begin ||
now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
++slow_yield_count;
// 2.1 累计超过 kMaxSlowYieldsWhileSpinning 次则退出第二阶段,
// 并设置 update_ctx =true,后续更新 yield_credit
if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
update_ctx = true;
break;
}
}
iter_begin = now;
}
}

BlockingAwaitState

如果不幸,w->state 的值仍然没有变更为预期值,则需要进入第三阶段:使用 Mutex + ConditionVarable 进行阻塞等待。

1
2
3
if ((state & goal_mask) == 0) {
state = BlockingAwaitState(w, goal_mask);
}

BlockingAwaitState 函数比较简单:先再次判断 w->state 是否变更为预期值 goal_mask; 没有,则将 w->state 设置为 STATE_LOCKED_WAITING 状态,等待唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
w->CreateMutex();

auto state = w->state.load(std::memory_order_acquire);
assert(state != STATE_LOCKED_WAITING);
if ((state & goal_mask) == 0 &&
w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
// we have permission (and an obligation) to use StateMutex
std::unique_lock<std::mutex> guard(w->StateMutex());
w->StateCV().wait(guard, [w] {
return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
});
state = w->state.load(std::memory_order_relaxed);
}
// else tricky. Goal is met or CAS failed. In the latter case the waker
// must have changed the state, and compare_exchange_strong has updated
// our local variable with the new one. At the moment WriteThread never
// waits for a transition across intermediate states, so we know that
// since a state change has occurred the goal must have been met
assert((state & goal_mask) != 0);
return state;
}

WriteThread::SetState

1
2
3
4
5
6
7
8
9
10
11
12
13
void WriteThread::SetState(Writer* w, uint8_t new_state) {
assert(w);
auto state = w->state.load(std::memory_order_acquire);
if (state == STATE_LOCKED_WAITING ||
!w->state.compare_exchange_strong(state, new_state)) {
assert(state == STATE_LOCKED_WAITING);

std::lock_guard<std::mutex> guard(w->StateMutex());
assert(w->state.load(std::memory_order_relaxed) != new_state);
w->state.store(new_state, std::memory_order_relaxed);
w->StateCV().notify_one();
}
}