WriteBatch
无论 Put/Delete/DeleteRange
都是向 RocksDB 插入一对 {key, value},在不至于引起误解的前提下,下面不加以区分地统一表述为 写入 操作
写入一条 {key, value}
数据需要先在 WriteBatch 内部序列化到 WriteBatch::rep_
字段中。其整体格式如下:
1 2 3 4
| |sequence|count| |serialized-kv-1| |serialized-kv-2|
|
WriteBatch::rep_
在首部需要 KHeader=12
个字节存储本次 writer_batch
的元数据信息:
sequence: uint64_t
: 8个字节,类似主键 id,记录的是当前 write_batch 是自从 RocksDB 创建以来第几个 write_batch。即便 RocksDB 挂了重启也不会更改,只会单增;
count: uint32_t
: 4个字节,记录本次 write_batch 写入了多少个 {key, value}
,每次写入一条数据就会递增一次。下面的Put
案例操作可见。
1 2 3 4 5 6 7 8 9 10
| WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t protection_bytes_per_key, size_t default_cf_ts_sz) : content_flags_(0), max_bytes_(max_bytes), default_cf_ts_sz_(default_cf_ts_sz) { rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); }
|
元信息 sequence/count
的 setter/getter 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { return SequenceNumber(DecodeFixed64(b->rep_.data())); }
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } uint32_t WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); }
void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) { EncodeFixed32(&b->rep_[8], n); }
|
Serialization
RocksDB 中数据本身基本按照 |xxx_length|xxx_bytes|
格式编码。
下面将写入的 {key, value}
序列化成 Record
,根据是否指定 ColumnFamily 有两种格式:
1 2 3 4 5 6 7 8 9
| default cf: |KTypeValue| |key_size|key_bytes| |value_length|value_bytes|
specify cf: |kTypeColumnFamilyValue|column_family_id| |key_size|key_bytes| |value_length|value_bytes|
|
在 Record
前面有两个字段:
ValueType: uint8_t
: 1个字节,表征本次是 Put/Delete
等具体操作类型,以及是否指定了 ColumnFamily
column_family_id: uint32_t
: 4个字节,只有指定了 ColumnFamily,才会有这个字段
下面的 WriteBatchInternal::Put
实现就是按照上述格式封装 {key, value}
后续写入WAL 和 MemTable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast<char>(kTypeValue)); } else { b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue)); PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, value); b->content_flags_.store( b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, std::memory_order_relaxed); return save.commit(); }
|
LocalSavePoint
LocalSavePoint 是基于 RAII 机制来判断 WriteBatch 写入的数据量累计是否已经超过限制: 是则回滚到生成 save 的位置,即调用本次 WriteBatchInternel::Put
之前的状态,然后返回 Status::MemoryLimit
错误,阻止本次写入流程。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class LocalSavePoint { public: explicit LocalSavePoint(WriteBatch* batch) : batch_(batch), // savepoint 记录初始状态 savepoint_(batch->GetDataSize(), batch->Count(), batch->content_flags_.load(std::memory_order_relaxed)) { }
Status commit() { if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) { batch_->rep_.resize(savepoint_.size); WriteBatchInternal::SetCount(batch_, savepoint_.count); batch_->content_flags_.store(savepoint_.content_flags, std::memory_order_relaxed); return Status::MemoryLimit(); } return Status::OK(); }
private: WriteBatch* batch_; SavePoint savepoint_; };
|
而 Savepoint 也是个简单的 struct:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| struct SavePoint { size_t size; int count; uint32_t content_flags;
SavePoint() : size(0), count(0), content_flags(0) {}
SavePoint(size_t _size, int _count, uint32_t _flags) : size(_size), count(_count), content_flags(_flags) {}
void clear() { size = 0; count = 0; content_flags = 0; }
bool is_cleared() const { return (size | count | content_flags) == 0; } };
|
Deserializtion
WriteBatch 序列化时将数据连续存储,反序列化时则可以顺序读取,具有更好的局部性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob, Slice* xid) { assert(key != nullptr && value != nullptr); *tag = (*input)[0]; input->remove_prefix(1); *column_family = 0; switch (*tag) { case kTypeColumnFamilyValue: if (!GetVarint32(input, column_family)) { return Status::Corruption("bad WriteBatch Put"); } case kTypeValue: if (!GetLengthPrefixedSlice(input, key) || !GetLengthPrefixedSlice(input, value)) { return Status::Corruption("bad WriteBatch Put"); } break; }
|
数据写入 WriteBatch 后,下面就要写入 WAL 和 MemTable,先来看看写入操作之前的一些逻辑。
WriteThread
基于 RocksDB 构建的上层应用向 RocksDB 多线程写入数据时,由 RocksDB::WriteThread
保障多线程写入的有序性,并通过内部优化尽可能提高写入效率,不阻塞后来的写入流程,具体优化下一期再来详解,下面先讲解一些基础逻辑。
上层应用向 RocksDB 写入的数据序列化到 WriteBatch
后,再把 WriteBatch 封装到 WriteThread::Writer
,生成一个 writer 并向 WriteThread
申请写入许可。
WriteThread::Writer
WriterThread::Writer
是个链表:
link_older
指向 this
之前插入的 Writer
,类似 next
link_newer
指向 this
后面插入的 Writer
,类似 prev
每个 writer 节点内部存储着本次写入的数据 batch:
1 2 3 4 5 6 7 8 9 10 11 12
| struct WriteThread::Writer { WriteBatch* batch;
std::atomic<uint8_t> state; std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes; std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes; Writer* link_older; Writer* link_newer;
};
|
这里使用 std::aligned_storage
进行内存对齐的原理可以参考之前写的一篇文章 内存对齐之 alignof、alignas 、aligned_storage、align 剖析
WriteThread::WriteGroup
如果每次只有一个 writer 能写入数据,其他 writers 只能阻塞等待,那 RocksDB 也就毫无写性能可言。于是乎,RocksDB 每次将一批 writers 组成一个 WriteGroup
统一写入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| struct WriteThread::WriteGroup { Writer* leader = nullptr; Writer* last_writer = nullptr;
struct Iterator { Writer* writer; Writer* last_writer;
explicit Iterator(Writer* w, Writer* last) : writer(w), last_writer(last) {}
Writer* operator*() const { return writer; }
Iterator& operator++() { assert(writer != nullptr); if (writer == last_writer) { writer = nullptr; } else { writer = writer->link_newer; } return *this; }
bool operator!=(const Iterator& other) const { return writer != other.writer; } };
Iterator begin() const { return Iterator(leader, last_writer); } Iterator end() const { return Iterator(nullptr, nullptr); } };
|
leader
负责本次 write_group 的写入操作
last_writer
负责将本次 writer_group 中待写入的所有 writers 对象串联起来。
last_writr 实际上是 tail node,而 leader 是 head node,通过 last_writer 向 leader 方向迭代,可以遍历整个 writer_group。
因此就可以通过 leader
和 last_writer
实现 Iterator
,进一步可以轻松融入 for-loop 迭代体系,方便后续遍历 writer_group 中所有的 witers
之所以从 last_writer 向 leader 节点遍历,也是为了满足写入的顺序性:tail 是更早写 RocksDB,head 是最晚写 RocksDB,因此需要迭代时使用 link_newer
。
WriteThread::State
而每个 writer 在写入过程中都有个状态, 由 Writer::state
字段表示:
1 2 3 4 5 6 7 8
| enum WriteThread::State : uint8_t { STATE_INIT = 1, STATE_GROUP_LEADER = 2, STATE_MEMTABLE_WRITER_LEADER = 4, STATE_PARALLEL_MEMTABLE_WRITER = 8, STATE_COMPLETED = 16, STATE_LOCKED_WAITING = 32, };
|
writer 创建后,初始化状态 State::STATE_INIT ,然后进入 WriteThread::JoinBatchGroup
函数中尝试更改自己的状态:
- 要么成为本次写入流程的 leader,即 State::STATE_GROUP_LEADER 状态,然后组建自己的 writer_group,代替 writer_group 中所有 writers 完成写入,所有的 writers 状态都变成 State::STATE_COMPLETED;
- 要么加入一个已经选出 leader 但是尚未执行的 writer_group 成为 follower,让该 leader 代替自己执行完本次写入,完成后自己状态即 State::STATE_COMPLETED
- 否则,只能阻塞等待前面正在执行写操作的 writer_group 完成
这一期主要讲解个 WriteBatch 在写入前的准备工作,下一期会开始讲解 WriteThread
内部的调度过程以及一些多线程中的原子操作。