WriteBatch 写入前的准备工作

WriteBatch

无论 Put/Delete/DeleteRange 都是向 RocksDB 插入一对 {key, value},在不至于引起误解的前提下,下面不加以区分地统一表述为 写入 操作

写入一条 {key, value} 数据需要先在 WriteBatch 内部序列化到 WriteBatch::rep_字段中。其整体格式如下:

1
2
3
4
# 实时上是一行连续存储,为便于阅读多行显示
|sequence|count|
|serialized-kv-1|
|serialized-kv-2|

WriteBatch::rep_ 在首部需要 KHeader=12 个字节存储本次 writer_batch 的元数据信息:

  • sequence: uint64_t: 8个字节,类似主键 id,记录的是当前 write_batch 是自从 RocksDB 创建以来第几个 write_batch。即便 RocksDB 挂了重启也不会更改,只会单增;
  • count: uint32_t: 4个字节,记录本次 write_batch 写入了多少个 {key, value},每次写入一条数据就会递增一次。下面的Put 案例操作可见。
1
2
3
4
5
6
7
8
9
10
  WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key, size_t default_cf_ts_sz)
: content_flags_(0),
max_bytes_(max_bytes),
default_cf_ts_sz_(default_cf_ts_sz) {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
? reserved_bytes
: WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}

元信息 sequence/countsetter/getter 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
EncodeFixed32(&b->rep_[8], n);
}

Serialization

RocksDB 中数据本身基本按照 |xxx_length|xxx_bytes| 格式编码。

下面将写入的 {key, value} 序列化成 Record,根据是否指定 ColumnFamily 有两种格式:

1
2
3
4
5
6
7
8
9
default cf:
|KTypeValue|
|key_size|key_bytes|
|value_length|value_bytes|

specify cf:
|kTypeColumnFamilyValue|column_family_id|
|key_size|key_bytes|
|value_length|value_bytes|

Record 前面有两个字段:

  • ValueType: uint8_t: 1个字节,表征本次是 Put/Delete 等具体操作类型,以及是否指定了 ColumnFamily
  • column_family_id: uint32_t: 4个字节,只有指定了 ColumnFamily,才会有这个字段

下面的 WriteBatchInternal::Put 实现就是按照上述格式封装 {key, value} 后续写入WAL 和 MemTable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
return save.commit();
}

LocalSavePoint

LocalSavePoint 是基于 RAII 机制来判断 WriteBatch 写入的数据量累计是否已经超过限制: 是则回滚到生成 save 的位置,即调用本次 WriteBatchInternel::Put 之前的状态,然后返回 Status::MemoryLimit 错误,阻止本次写入流程。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class LocalSavePoint {
public:
explicit LocalSavePoint(WriteBatch* batch)
: batch_(batch),
// savepoint 记录初始状态
savepoint_(batch->GetDataSize(),
batch->Count(),
batch->content_flags_.load(std::memory_order_relaxed))
{ }

Status commit() {
// 超过限制则 rollack 到初始状态
if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
batch_->rep_.resize(savepoint_.size);
WriteBatchInternal::SetCount(batch_, savepoint_.count);
batch_->content_flags_.store(savepoint_.content_flags,
std::memory_order_relaxed);
return Status::MemoryLimit();
}
return Status::OK();
}

private:
WriteBatch* batch_;
SavePoint savepoint_;
};

而 Savepoint 也是个简单的 struct:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct SavePoint {
size_t size; // size of rep_
int count; // count of elements in rep_
uint32_t content_flags;

SavePoint() : size(0), count(0), content_flags(0) {}

SavePoint(size_t _size, int _count, uint32_t _flags)
: size(_size), count(_count), content_flags(_flags) {}

void clear() {
size = 0;
count = 0;
content_flags = 0;
}

bool is_cleared() const { return (size | count | content_flags) == 0; }
};

Deserializtion

WriteBatch 序列化时将数据连续存储,反序列化时则可以顺序读取,具有更好的局部性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid) {
assert(key != nullptr && value != nullptr);
*tag = (*input)[0];
input->remove_prefix(1);
*column_family = 0; // default
switch (*tag) {
case kTypeColumnFamilyValue:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Put");
}
case kTypeValue:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch Put");
}
break;
// other value
}

数据写入 WriteBatch 后,下面就要写入 WAL 和 MemTable,先来看看写入操作之前的一些逻辑。

WriteThread

基于 RocksDB 构建的上层应用向 RocksDB 多线程写入数据时,由 RocksDB::WriteThread 保障多线程写入的有序性,并通过内部优化尽可能提高写入效率,不阻塞后来的写入流程,具体优化下一期再来详解,下面先讲解一些基础逻辑。

上层应用向 RocksDB 写入的数据序列化到 WriteBatch 后,再把 WriteBatch 封装到 WriteThread::Writer,生成一个 writer 并向 WriteThread 申请写入许可。

WriteThread::Writer

WriterThread::Writer 是个链表:

  • link_older 指向 this 之前插入的 Writer,类似 next
  • link_newer 指向 this 后面插入的 Writer,类似 prev

每个 writer 节点内部存储着本次写入的数据 batch:

1
2
3
4
5
6
7
8
9
10
11
12
struct WriteThread::Writer {
WriteBatch* batch;
//.. too many other fields ..

std::atomic<uint8_t> state;
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // this 之前之前写入的 writers
Writer* link_newer; // this 之后写入的 writers

//... methods
};

这里使用 std::aligned_storage 进行内存对齐的原理可以参考之前写的一篇文章 内存对齐之 alignof、alignas 、aligned_storage、align 剖析

WriteThread::WriteGroup

如果每次只有一个 writer 能写入数据,其他 writers 只能阻塞等待,那 RocksDB 也就毫无写性能可言。于是乎,RocksDB 每次将一批 writers 组成一个 WriteGroup 统一写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
struct WriteThread::WriteGroup {
Writer* leader = nullptr;
Writer* last_writer = nullptr;
//... other fields

struct Iterator {
Writer* writer;
Writer* last_writer;

explicit Iterator(Writer* w, Writer* last)
: writer(w), last_writer(last) {}

Writer* operator*() const { return writer; }

Iterator& operator++() {
assert(writer != nullptr);
if (writer == last_writer) {
writer = nullptr;
} else {
writer = writer->link_newer;
}
return *this;
}

bool operator!=(const Iterator& other) const {
return writer != other.writer;
}
};

Iterator begin() const { return Iterator(leader, last_writer); }
Iterator end() const { return Iterator(nullptr, nullptr); }
};
  • leader 负责本次 write_group 的写入操作

  • last_writer 负责将本次 writer_group 中待写入的所有 writers 对象串联起来。

    last_writr 实际上是 tail node,而 leader 是 head node,通过 last_writer 向 leader 方向迭代,可以遍历整个 writer_group。

    因此就可以通过 leaderlast_writer 实现 Iterator,进一步可以轻松融入 for-loop 迭代体系,方便后续遍历 writer_group 中所有的 witers

之所以从 last_writer 向 leader 节点遍历,也是为了满足写入的顺序性:tail 是更早写 RocksDB,head 是最晚写 RocksDB,因此需要迭代时使用 link_newer

WriteThread::State

而每个 writer 在写入过程中都有个状态, 由 Writer::state 字段表示:

1
2
3
4
5
6
7
8
enum WriteThread::State : uint8_t {
STATE_INIT = 1,
STATE_GROUP_LEADER = 2,
STATE_MEMTABLE_WRITER_LEADER = 4,
STATE_PARALLEL_MEMTABLE_WRITER = 8,
STATE_COMPLETED = 16,
STATE_LOCKED_WAITING = 32,
};

writer 创建后,初始化状态 State::STATE_INIT ,然后进入 WriteThread::JoinBatchGroup 函数中尝试更改自己的状态:

  1. 要么成为本次写入流程的 leader,即 State::STATE_GROUP_LEADER 状态,然后组建自己的 writer_group,代替 writer_group 中所有 writers 完成写入,所有的 writers 状态都变成 State::STATE_COMPLETED
  2. 要么加入一个已经选出 leader 但是尚未执行的 writer_group 成为 follower,让该 leader 代替自己执行完本次写入,完成后自己状态即 State::STATE_COMPLETED
  3. 否则,只能阻塞等待前面正在执行写操作的 writer_group 完成

这一期主要讲解个 WriteBatch 在写入前的准备工作,下一期会开始讲解 WriteThread 内部的调度过程以及一些多线程中的原子操作。