剖析 ETCD.backend
backend
Overview
backend 的接口定义如下:
1 | type Backend interface { |
所有的读写操作都需要通过 func BatchTx(for write) 和 func ConcurrentReadTx (for read) 来实现,这符合DB的读写操作:读写都需要在一个 transaction 中完成。
除了读写操作, backend 还需要提供一些 ForceCommit 、Close 等保证数据安全的操作。
1 | ┌─────────────────────────────────────────────────────────────────────────────────┐ |
BatchTx
写入流程
1 | Client Request |
写缓冲区:批量写入,减少磁盘 I/O
1 |
|
ReadTx
读缓存:避免重复复制,支持并发读取
分离读写:读写事务独立,提高并发性
1 | Client Request |
缓冲区同步机制
1 | ┌─────────────────┐ writeback ┌─────────────────┐ |
Lock 流程
1 | ┌─────────────────────────────────────────────────────────┐ |
Detail
batchTxBuffered
1 | type batchTxBuffered struct { |
batchTxBuffered 继承 batchTx, 进行双重写入(同时写入 BoltDB 和内存缓冲区)的核心设计:
1 | Client Write Request |
Unlock
Unlock 函数在 unlock 中进行智能commit,进行 commit 的条件是:
- 批量限制: pending >= batchLimit(默认 10000)
- 删除操作: pendingDeleteOperations > 0(立即提交)
1 | func (t *batchTxBuffered) Unlock() { |
为什么删除操作需要立即提交?线性一致性保证:
1 | 时间线: T1 ────── T2 ────── T3 |
如果删除操作不立即提交:
- T2 删除操作在缓冲区中
- T3 读取可能从
BoltDB读到旧数据 - 违反线性一致性
unsafeCommit
1 | 1. 写操作完成,触发 commit |
unsafeCommit 中存在一个异步优化:
非阻塞提交:读事务回滚在 goroutine 中异步进行
BlotDB 的 Read Transaction 有几个关键点:
1
2// BoltDB 读事务是只读的,不能提交
tx, err := b.db.Begin(false) // false = 只读事务- 只读事务:BoltDB 的读事务是只读的,不能调用 Commit()
- 必须关闭:读事务必须通过 Rollback() 来关闭
- 资源管理:未关闭的事务会占用数据库资源
无缝切换:新读事务立即开始,不等待旧事务完成
原因:
- 数据一致性:新提交的数据需要对新读事务可见, 基于当前 snapshot 创建新的读事物
- 快照隔离:每个读事务看到的是特定时间点的数据快照
- 资源管理:避免长时间持有旧事务
1 | func (t *batchTxBuffered) unsafeCommit(stop bool) { |
WaitGroup 的保护机制
在 ConcurrentReadTx 中有个 txWg,在创建并发读事物时会增加一个引用计数:
1 | // 在 ConcurrentReadTx 中 |
WaitGroup 在 unsafeCommit 中的异步保护机制:
- 引用计数:每个活跃的读操作增加计数
b.readTx.txWg.Add(1) - 安全关闭:只有当所有读操作完成后才回滚事务
RUnlock: txWg.Done() - 避免崩溃:防止在活跃读操作时关闭事务
wg.Wait(); tx.Rollback()
commit
流程如下:
1 | batchTxBuffered.commit() |
commit 函数中先执行 t.backend.readTx.Lock() 的作用:
- 阻止新读事务: 防止在提交期间启动新的读事务
- 等待活跃读事务: 等待所有正在进行的读事务完成
- 保护事务切换: 确保读事务的安全切换
1
2
3
4
5
6func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.Lock()
t.unsafeCommit(stop)
t.backend.readTx.Unlock()
}
这里的核心是 t.backend.readTx.Lock() 的作用。
首先,这里使用的 Lock 而不是 RLOCK(),由于写锁会阻塞所有的读锁,即阻塞 ConcurrentReadTx 函数创建新的 ReadTx, 需要等待当前所有活跃的 ReadTxs 完成,并确保写操作提交完成,再释放 Lock,才允许创建新的 ReadTx。
1 | func (b *backend) ConcurrentReadTx() ReadTx { |
无写操作
1 | 时间线: T1 ────── T2 ────── T3 |
有写操作
1 | 时间线: T1 ────── T2 ────── T3 ────── T4 |
如果没有 readTx.Lock() 的情况:
1 | 时间线: T1 ────── T2 ────── T3 ────── T4 |
问题:
- 数据不一致:新读事务可能读到部分更新的数据
- 竞态条件:读事务创建和旧事务回滚的竞态
- 资源冲突:多个读事务同时访问 BoltDB
存在 readTx.Lock() 的情况:
1 | 时间线: T1 ────── T2 ────── T3 ────── T4 |
Commit
Commit 函数相比较 commit 函数多了一个 lock() 函数,这是继承自 BatchTx 的 lock 函数:
1 | func (t *batchTxBuffered) Commit() { |
lock 主要是保护 BatchTx 的共享状态,与 readTx.Lock() 的协同作用:
t.lock():保护 batchTx 内部状态,防止并发写操作readTx.Lock():保护 BoltDB 事务提交,防止读事务干扰
这么设计设计,优点是确保数据一致性和事务完整性, 缺点是在提交期间阻塞所有写操作,因此通过批量提交减少锁的持有时间来优化。
UsafePut/UnsafeDelete
batchTxBuffered 有几个 Unsafe 函数,Unsafe 的意思这些函数都需要在 t.lock() 的保护下执行,分别由 BatchTx 接口中的 LockInsideApply 和 LockOutsideApply 封装。
1 | func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) { |
BatchTx 的 LockOutsideApply 和 LockInsideApply 区别就是在于是否在 Log Apply 的过程。
LockInsideApply:- 验证调用是否在 Apply 上下文中(即 Raft 日志应用过程中)
- 通过检查调用栈是否包含 .applyEntries 来判断
- 如果设置了
txPostLockInsideApplyHook,会额外验证调用栈
实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17func (t *batchTx) LockInsideApply() {
t.lock()
if t.backend.txPostLockInsideApplyHook != nil {
ValidateCalledInsideApply(t.backend.lg)
t.backend.txPostLockInsideApplyHook() // 执行钩子函数
}
}
// HOOK: 这个钩子确保在 Apply 过程中,consistIndex 能够正确更新
func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}LockOutsideApply():
- 验证调用是否在 Apply 上下文之外
- 如果检测到在 Apply 上下文中调用,会触发 panic
实现如下:
1
2
3
4func (t *batchTx) LockOutsideApply() {
ValidateCalledOutSideApply(t.backend.lg)
t.lock() // 只获取锁,不执行钩子函数
}主要用于系统初始化、恢复、维护操作等不需要更新一致性索引的操作 例如:存储初始化、数据清理、压缩等
Unlock
到此,再重新审视下 batchTxBuffered::Unlock
1 | func (t *batchTxBuffered) Unlock() { |
这里核心是 t.buf.writeback 和 t.batchTx.Unlock 两个功能。后者是为了和 t.lock() 配对,释放锁资源。**writeback 的作用是将写缓冲区的数据同步到读缓冲区,来确保新写入的数据对后续读操作可见,进而保证 etcd 的线性一致性语义**
ReadTx
上面阐述了写事物 BatchTx 如何进行写操作,下面来阐述读事物 ConcurrentReadTx。
txReadBufferCache
缓存策略
bufVersion版本控制:通过版本号检测缓存有效性- 延迟更新:只在必要时更新缓存
- 并发安全:使用锁保护缓存操作
1 | type txReadBufferCache struct { |
TxReadBuffer
txReadBufferCache.bufVersion 缓存的是 txReadBuffer.bufVersion 的值: 只有当写事物提交时(BatchTx.commit)并在 writeback 函数中才会自增 txReadBuffer.bufVersion++ 因此可以用两个 bufVersion 进行比较来确定缓存 b.txReadBufferCache 的有效性。
1 | // txReadBuffer accesses buffered updates. |
下图展示了一个 txReadBufferCache.buf demo: 如果在 bufVersion 有效期间创建 ConcurrentReadTx,则只会复制一次 b.readTx.buf,减少了多次 copy 行为,降低了集群压力。
1 | type backend struct { |
ConcurrentReadTx
ConcurrentReadT 的核心设计目标:
- 非阻塞读操作
- 并发安全:多个读操作可以同时进行
- 无锁设计:读操作不需要获取锁, 多个读操作可以并行执行
- 性能优化:避免读操作被写操作阻塞
1
2
3
4
5
6
7func (rt *concurrentReadTx) Lock() {} // 空实现
func (rt *concurrentReadTx) RLock() {} // 空实现
func (rt *concurrentReadTx) Unlock() {} // 空实现
// 创建时:txWg.Add(1) 增加计数
// 完成时:txWg.Done() 减少计数
// 保护机制: 在 batchTx.commit 时,先等待所有并发读事物结束, 防止 BoltDB 事务过早回滚,
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() } // 引用计数,管理多个读事物生命周期
- 数据一致性保证
- 快照隔离:每个读事务看到特定时间点的数据
- 缓冲区同步:确保读操作能看到最新的写入数据
实现分析
第一阶段: 获取 readTx 读锁和事务保护。这里和 batchTxBuffered.commit 形成对比,当 BatchTx 在执行 commit 时会导致 backend.ConcurrentReadTx 函数中第一阶段 readTx.RLock() 阻塞。
1 | func (b *backend) ConcurrentReadTx() ReadTx { |
第二阶段: 智能缓存管理
存策略: 通过 bufVersion 检测缓存是否过期。三种情况处理:空缓存、过期缓存、有效缓存。
由于 commit 中存在 writeback 策略,因此在 commit 结束使得 b.readTx.RLock() 解除阻塞后,b.readTx.buf 中总是缓存着当前最新的数据,那么 ConcurrentReadTx 得到 ReadTx 即基于最新的 snapshot, 保证了数据一致性。
1 | func (b *backend) ConcurrentReadTx() ReadTx { |
第三阶段:缓冲区复制优化策略
- 延迟复制:只在必要时才复制缓冲区
- 并发优化:在复制时释放锁,提高并发性
- 版本检查:防止并发复制导致的数据不一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24func (b *backend) ConcurrentReadTx() ReadTx {
//... above
var buf *txReadBuffer
switch {
case isEmptyCache:
// 首次创建缓存
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// 过期缓存,需要更新
b.txReadBufferCache.mu.Unlock() // 释放锁进行复制
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock() // 重新获取锁
buf = &curBuf
default:
// 缓存有效,直接使用
buf = curCache
}
//...below
}
第四阶段: 缓存更新和版本控制机制
- 原子性检查:确保缓存版本没有被其他线程修改
- 安全更新:只在版本一致时更新缓存
- 失败处理:版本不一致时跳过更新,下次调用会重新复制
1
2
3
4
5if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// 更新缓存
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}
第五阶段: 创建并发读事务
1 | return &concurrentReadTx{ |