剖析 ETCD.backend

backend

Overview

backend 的接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx

Snapshot() Snapshot
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
Size() int64
SizeInUse() int64
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
SetTxPostLockInsideApplyHook(func())
}

所有的读写操作都需要通过 func BatchTx(for write) 和 func ConcurrentReadTx (for read) 来实现,这符合DB的读写操作:读写都需要在一个 transaction 中完成。

除了读写操作, backend 还需要提供一些 ForceCommit 、Close 等保证数据安全的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Backend 架构 │
└─────────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────────┐
│ Backend Interface │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │ BatchTx() │ │ ReadTx() │ │ ConcurrentReadTx│ │ Snapshot() │ │
│ │ │ │ │ │ │ │ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────────────┐
│ Backend 实现层 │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ backend struct │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ mu │ │ db │ │ batchTx │ │ readTx │ │ │
│ │ │(RWMutex) │ │(*bolt.DB) │ │(*batchTxBuf)│ │(*readTx) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │txReadBuffer │ │ hooks │ │ stopc │ │ lg │ │ │
│ │ │ Cache │ │ (Hooks) │ │ (chan) │ │(*zap.Logger)│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘

┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐
│ BatchTx 系统 │ │ ReadTx 系统 │ │ 其他核心操作 │
│ │ │ │ │ │
│ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │
│ │ batchTxBuffered │ │ │ │ readTx │ │ │ │ Snapshot │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ ┌─────────────────┐ │ │ │ │ ┌─────────────────┐ │ │ │ │ ┌─────────────────┐ │ │
│ │ │ txWriteBuffer │ │ │ │ │ │ baseReadTx │ │ │ │ │ │ bolt.Tx │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ (read-only) │ │ │
│ │ │ ┌─────────────┐ │ │ │ │ │ │ ┌─────────────┐ │ │ │ │ │ └─────────────────┘ │ │
│ │ │ │bucketBuffer │ │ │ │ │ │ │ │txReadBuffer │ │ │ │ │ └─────────────────────┘ │
│ │ │ │ (sorted) │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ └─────────────┘ │ │ │ │ │ │ └─────────────┘ │ │ │ │ ┌─────────────────────┐ │
│ │ └─────────────────┘ │ │ │ │ └─────────────────┘ │ │ │ │ Defrag │ │
│ └─────────────────────┘ │ │ └─────────────────────┘ │ │ │ │ │
│ │ │ │ │ │ ┌─────────────────┐ │ │
│ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ │ │ Temp DB │ │ │
│ │ Lock 机制 │ │ │ │ concurrentReadTx │ │ │ │ │ (defrag) │ │ │
│ │ │ │ │ │ │ │ │ │ └─────────────────┘ │ │
│ │ ┌─────────────────┐ │ │ │ │ ┌─────────────────┐ │ │ │ └─────────────────────┘ │
│ │ │LockInsideApply │ │ │ │ │ │ baseReadTx │ │ │ │ │
│ │ │LockOutsideApply│ │ │ │ │ │ (no locks) │ │ │ │ ┌─────────────────────┐ │
│ │ │ + Hook │ │ │ │ │ └─────────────────┘ │ │ │ │ Hash │ │
│ │ └─────────────────┘ │ │ │ └─────────────────────┘ │ │ │ │ │
│ └─────────────────────┘ │ └─────────────────────────┘ │ │ ┌─────────────────┐ │ │
└─────────────────────────┘ │ │ │ CRC32 Hash │ │ │
│ │ └─────────────────┘ │ │
│ └─────────────────────┘ │
└─────────────────────────┘

BatchTx

写入流程

1
2
3
4
5
6
7
8
9
10
11
Client Request


┌─────────────────┐
│ BatchTx() │ ──► batchTxBuffered
│ │
│ LockInsideApply │ ──► 1. 获取锁
│ │ 2. 执行 Hook (更新一致性索引)
│ │ 3. 写入 txWriteBuffer
│ │ 4. 批量提交到 BoltDB
└─────────────────┘

写缓冲区:批量写入,减少磁盘 I/O

1
2
3
4
5
6

type batchTxBuffered struct {
batchTx
buf txWriteBuffer
pendingDeleteOperations int
}

ReadTx

读缓存:避免重复复制,支持并发读取

分离读写:读写事务独立,提高并发性

1
2
3
4
5
6
7
8
9
10
11
Client Request


┌─────────────────┐
│ ReadTx() │ ──► readTx (单线程)
│ │
│ ConcurrentReadTx│ ──► concurrentReadTx (多线程)
│ │ 1. 从 txReadBufferCache 获取数据
│ │ 2. 如果缓存过期,从 readTx 复制
│ │ 3. 无锁并发读取
└─────────────────┘

缓冲区同步机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────┐    writeback    ┌─────────────────┐
│ txWriteBuffer │ ──────────────► │ txReadBuffer │
│ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │bucketBuffer │ │ │ │bucketBuffer │ │
│ │ (sorted) │ │ │ │ (sorted) │ │
│ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ BoltDB │ │ Read Cache │
│ (持久化) │ │ (内存缓存) │
└─────────────────┘ └─────────────────┘

Lock 流程

1
2
3
4
5
6
7
┌─────────────────────────────────────────────────────────┐
│ 锁的获取顺序 │
├─────────────────────────────────────────────────────────┤
│ 1. batchTx.LockInsideApply() / LockOutsideApply() │
│ 2. backend.mu.RLock() / Lock() │
│ 3. readTx.Lock() / RLock() │
└─────────────────────────────────────────────────────────┘

Detail

batchTxBuffered

1
2
3
4
5
type batchTxBuffered struct {
batchTx // base
buf txWriteBuffer // 写缓冲区
pendingDeleteOperations int // 待删除操作计数
}

batchTxBuffered 继承 batchTx, 进行双重写入(同时写入 BoltDB 和内存缓冲区)的核心设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Client Write Request


┌─────────────────┐
│ batchTxBuffered │
│ │
│ ┌─────────────┐ │ ┌─────────────────┐
│ │ batchTx │ │───►│ BoltDB │ (持久化)
│ │ (直接写入) │ │ │ (最终存储) │
│ └─────────────┘ │ └─────────────────┘
│ │
│ ┌─────────────┐ │ ┌─────────────────┐
│ │ buf │ │───►│ txWriteBuffer │ (内存缓冲)
│ │ (缓冲写入) │ │ │ (性能优化) │
│ └─────────────┘ │ └─────────────────┘
└─────────────────┘

Unlock

Unlock 函数在 unlock 中进行智能commit,进行 commit 的条件是:

  • 批量限制: pending >= batchLimit(默认 10000)
  • 删除操作: pendingDeleteOperations > 0(立即提交)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
// 1. 同步缓冲区到读缓存
t.backend.readTx.Lock()
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()

// 2. 智能提交判断
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
t.batchTx.Unlock()
}

为什么删除操作需要立即提交?线性一致性保证:

1
2
时间线: T1 ────── T2 ────── T3
操作: PUT A DELETE A READ A

如果删除操作不立即提交:

  • T2 删除操作在缓冲区中
  • T3 读取可能从 BoltDB 读到旧数据
  • 违反线性一致性

unsafeCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1. 写操作完成,触发 commit


2. 检查是否有旧的读事务


3. 启动异步 goroutine 处理旧事务

├─→ 等待所有读操作完成 (wg.Wait())

├─→ 回滚旧读事务 (tx.Rollback())

└─→ 释放资源


4. 立即重置 readTx 状态


5. 提交写事务到 BoltDB


6. 开始新的读事务

unsafeCommit 中存在一个异步优化:

  • 非阻塞提交:读事务回滚在 goroutine 中异步进行

    BlotDB 的 Read Transaction 有几个关键点:

    1
    2
    // BoltDB 读事务是只读的,不能提交
    tx, err := b.db.Begin(false) // false = 只读事务
    • 只读事务:BoltDB 的读事务是只读的,不能调用 Commit()
    • 必须关闭:读事务必须通过 Rollback() 来关闭
    • 资源管理:未关闭的事务会占用数据库资源
  • 无缝切换:新读事务立即开始,不等待旧事务完成

    原因:

    • 数据一致性:新提交的数据需要对新读事务可见, 基于当前 snapshot 创建新的读事物
    • 快照隔离:每个读事务看到的是特定时间点的数据快照
    • 资源管理:避免长时间持有旧事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (t *batchTxBuffered) unsafeCommit(stop bool) {
// 1. 异步处理旧读事务
if t.backend.readTx.tx != nil {
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait() // 异步等待所有读操作完成
tx.Rollback()
}(t.backend.readTx.tx, t.backend.readTx.txWg)
// 立即重置读事物
t.backend.readTx.reset()
}

// 2. 提交写事务
t.batchTx.commit(stop)
t.pendingDeleteOperations = 0

// 3. 开始新读事务,保证新提交的数据对新读事务可见
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
}
}
WaitGroup 的保护机制

ConcurrentReadTx 中有个 txWg,在创建并发读事物时会增加一个引用计数:

1
2
3
4
5
6
7
// 在 ConcurrentReadTx 中
b.readTx.txWg.Add(1) // 增加计数

// 在 concurrentReadTx.RUnlock() 中
func (rt *concurrentReadTx) RUnlock() {
rt.txWg.Done() // 减少计数
}

WaitGroup 在 unsafeCommit 中的异步保护机制:

  • 引用计数:每个活跃的读操作增加计数 b.readTx.txWg.Add(1)
  • 安全关闭:只有当所有读操作完成后才回滚事务 RUnlock: txWg.Done()
  • 避免崩溃:防止在活跃读操作时关闭事务 wg.Wait(); tx.Rollback()

commit

流程如下:

1
2
3
4
5
6
7
8
9
10
11
batchTxBuffered.commit()

├─→ t.backend.readTx.Lock() // 1. 锁定读事务

├─→ t.unsafeCommit() // 2. 执行提交逻辑
│ │
│ ├─→ 异步回滚旧读事务
│ ├─→ 提交写事务到 BoltDB
│ └─→ 创建新读事务

└─→ t.backend.readTx.Unlock() // 3. 释放读事务锁

commit 函数中先执行 t.backend.readTx.Lock() 的作用:

  • 阻止新读事务: 防止在提交期间启动新的读事务
  • 等待活跃读事务: 等待所有正在进行的读事务完成
  • 保护事务切换: 确保读事务的安全切换
    1
    2
    3
    4
    5
    6
    func (t *batchTxBuffered) commit(stop bool) {
    // all read txs must be closed to acquire boltdb commit rwlock
    t.backend.readTx.Lock()
    t.unsafeCommit(stop)
    t.backend.readTx.Unlock()
    }

这里的核心是 t.backend.readTx.Lock() 的作用。

首先,这里使用的 Lock 而不是 RLOCK(),由于写锁会阻塞所有的读锁,即阻塞 ConcurrentReadTx 函数创建新的 ReadTx, 需要等待当前所有活跃的 ReadTxs 完成,并确保写操作提交完成,再释放 Lock,才允许创建新的 ReadTx。

1
2
3
4
5
6
7
8
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock() // 获取读锁
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done.
// Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// ...
}

无写操作

1
2
3
时间线: T1 ────── T2 ────── T3
操作: 读事务1 读事务2 读事务3
锁状态: RLock() RLock() RLock() (并发)

有写操作

1
2
3
时间线: T1 ────── T2 ────── T3 ────── T4
操作: 读事务1 写提交 新读事务 写提交完成
锁状态: RLock() Lock() 阻塞 Unlock()

如果没有 readTx.Lock() 的情况:

1
2
3
时间线: T1 ────── T2 ────── T3 ────── T4
操作: 写事务提交 新读事务开始 旧读事务回滚 新读事务读取
状态: 准备提交 创建新事务 回滚旧事务 可能读到不一致数据

问题:

  • 数据不一致:新读事务可能读到部分更新的数据
  • 竞态条件:读事务创建和旧事务回滚的竞态
  • 资源冲突:多个读事务同时访问 BoltDB

存在 readTx.Lock() 的情况:

1
2
3
时间线: T1 ────── T2 ────── T3 ────── T4
操作: 获取读锁 等待读事务完成 提交写事务 释放锁
状态: 阻塞新读 等待活跃读完成 原子提交 允许新读事务

Commit

Commit 函数相比较 commit 函数多了一个 lock() 函数,这是继承自 BatchTxlock 函数:

1
2
3
4
5
6
7
8
9
10
11
12
func (t *batchTxBuffered) Commit() {
t.lock() // 第一层:batchTx 的互斥锁
t.commit(false) // 内部调用
t.Unlock()
}

type batchTx struct {
sync.Mutex // for lcok()
tx *bolt.Tx // BoltDB 事务对象
backend *backend // 后端引用
pending int // 待处理操作计数
}

lock 主要是保护 BatchTx 的共享状态,与 readTx.Lock() 的协同作用:

  • t.lock():保护 batchTx 内部状态,防止并发写操作
  • readTx.Lock():保护 BoltDB 事务提交,防止读事务干扰

这么设计设计,优点是确保数据一致性和事务完整性, 缺点是在提交期间阻塞所有写操作,因此通过批量提交减少锁的持有时间来优化。

UsafePut/UnsafeDelete

batchTxBuffered 有几个 Unsafe 函数,Unsafe 的意思这些函数都需要在 t.lock() 的保护下执行,分别由 BatchTx 接口中的 LockInsideApplyLockOutsideApply 封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafePut(bucket, key, value)
t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
t.batchTx.UnsafeDelete(bucketType, key)
t.pendingDeleteOperations++
}

func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
t.batchTx.UnsafeDeleteBucket(bucket)
t.pendingDeleteOperations++
}

BatchTxLockOutsideApplyLockInsideApply 区别就是在于是否在 Log Apply 的过程。

  • LockInsideApply

    • 验证调用是否在 Apply 上下文中(即 Raft 日志应用过程中)
    • 通过检查调用栈是否包含 .applyEntries 来判断
    • 如果设置了 txPostLockInsideApplyHook,会额外验证调用栈

    实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
       func (t *batchTx) LockInsideApply() {
    t.lock()
    if t.backend.txPostLockInsideApplyHook != nil {
    ValidateCalledInsideApply(t.backend.lg)
    t.backend.txPostLockInsideApplyHook() // 执行钩子函数
    }
    }

    // HOOK: 这个钩子确保在 Apply 过程中,consistIndex 能够正确更新
    func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
    return func() {
    applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
    if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
    s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
    }
    }
    }
  • LockOutsideApply():

    • 验证调用是否在 Apply 上下文之外
    • 如果检测到在 Apply 上下文中调用,会触发 panic

    实现如下:

    1
    2
    3
    4
    func (t *batchTx) LockOutsideApply() {
    ValidateCalledOutSideApply(t.backend.lg)
    t.lock() // 只获取锁,不执行钩子函数
    }

    主要用于系统初始化、恢复、维护操作等不需要更新一致性索引的操作 例如:存储初始化、数据清理、压缩等

Unlock

到此,再重新审视下 batchTxBuffered::Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
// 1. 同步缓冲区到读缓存
t.backend.readTx.Lock()
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()

// 2. 智能提交判断
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
t.batchTx.Unlock()
}

这里核心是 t.buf.writebackt.batchTx.Unlock 两个功能。后者是为了和 t.lock() 配对,释放锁资源。**writeback 的作用是将写缓冲区的数据同步到读缓冲区,来确保新写入的数据对后续读操作可见,进而保证 etcd 的线性一致性语义**

ReadTx

上面阐述了写事物 BatchTx 如何进行写操作,下面来阐述读事物 ConcurrentReadTx

txReadBufferCache

缓存策略

  • bufVersion 版本控制:通过版本号检测缓存有效性
  • 延迟更新:只在必要时更新缓存
  • 并发安全:使用锁保护缓存操作
1
2
3
4
5
type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
bufVersion uint64
}

TxReadBuffer

txReadBufferCache.bufVersion 缓存的是 txReadBuffer.bufVersion 的值: 只有当写事物提交时(BatchTx.commit)并在 writeback 函数中才会自增 txReadBuffer.bufVersion++ 因此可以用两个 bufVersion 进行比较来确定缓存 b.txReadBufferCache 的有效性。

1
2
3
4
5
6
7
8
9
10
11
12
13
// txReadBuffer accesses buffered updates.
type txReadBuffer struct {
txBuffer
// bufVersion is used to check if the buffer is modified recently
bufVersion uint64
}

func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
// ... 数据合并逻辑 ...
txw.reset()
// increase the buffer version
txr.bufVersion++ // 只有每次 writeback 后版本号 +1
}

下图展示了一个 txReadBufferCache.buf demo: 如果在 bufVersion 有效期间创建 ConcurrentReadTx,则只会复制一次 b.readTx.buf,减少了多次 copy 行为,降低了集群压力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type backend struct {
readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required

// Without Cache:
// readTx.baseReadTx.buf → copy → concurrentReadTx1
// readTx.baseReadTx.buf → copy → concurrentReadTx2
// readTx.baseReadTx.buf → copy → concurrentReadTx3

// With Cache:
// readTx.baseReadTx.buf → copy → cache → concurrentReadTx1
// ............................. cache → concurrentReadTx2
// ............................. cache → concurrentReadTx3
txReadBufferCache txReadBufferCache
}

ConcurrentReadTx

ConcurrentReadT 的核心设计目标:

  • 非阻塞读操作
    • 并发安全:多个读操作可以同时进行
    • 无锁设计:读操作不需要获取锁, 多个读操作可以并行执行
    • 性能优化:避免读操作被写操作阻塞
      1
      2
      3
      4
      5
      6
      7
        	func (rt *concurrentReadTx) Lock()   {}  // 空实现
      func (rt *concurrentReadTx) RLock() {} // 空实现
      func (rt *concurrentReadTx) Unlock() {} // 空实现
      // 创建时:txWg.Add(1) 增加计数
      // 完成时:txWg.Done() 减少计数
      // 保护机制: 在 batchTx.commit 时,先等待所有并发读事物结束, 防止 BoltDB 事务过早回滚,
      func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() } // 引用计数,管理多个读事物生命周期
  • 数据一致性保证
    • 快照隔离:每个读事务看到特定时间点的数据
    • 缓冲区同步:确保读操作能看到最新的写入数据

实现分析

第一阶段: 获取 readTx 读锁和事务保护。这里和 batchTxBuffered.commit 形成对比,当 BatchTx 在执行 commit 时会导致 backend.ConcurrentReadTx 函数中第一阶段 readTx.RLock() 阻塞。

1
2
3
4
5
6
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock() // 1. 获取读锁: 如果在 commit 则等待 readTx.UnLock
defer b.readTx.RUnlock() // 2. 确保释放读锁
b.readTx.txWg.Add(1) // 3. 增加等待组计数,用于生命周期管理,跟踪活跃的读事务
// ...below
}

第二阶段: 智能缓存管理

存策略: 通过 bufVersion 检测缓存是否过期。三种情况处理:空缓存、过期缓存、有效缓存。

由于 commit 中存在 writeback 策略,因此在 commit 结束使得 b.readTx.RLock() 解除阻塞后,b.readTx.buf 中总是缓存着当前最新的数据,那么 ConcurrentReadTx 得到 ReadTx 即基于最新的 snapshot, 保证了数据一致性。

1
2
3
4
5
6
7
8
9
10
11
12
   func (b *backend) ConcurrentReadTx() ReadTx {
//... above

b.txReadBufferCache.mu.Lock() // 4. 获取缓存锁
curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion

isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer
//...below
}

第三阶段:缓冲区复制优化策略

  • 延迟复制:只在必要时才复制缓冲区
  • 并发优化:在复制时释放锁,提高并发性
  • 版本检查:防止并发复制导致的数据不一致
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
       func (b *backend) ConcurrentReadTx() ReadTx {
    //... above

    var buf *txReadBuffer
    switch {
    case isEmptyCache:
    // 首次创建缓存
    curBuf := b.readTx.buf.unsafeCopy()
    buf = &curBuf

    case isStaleCache:
    // 过期缓存,需要更新
    b.txReadBufferCache.mu.Unlock() // 释放锁进行复制
    curBuf := b.readTx.buf.unsafeCopy()
    b.txReadBufferCache.mu.Lock() // 重新获取锁
    buf = &curBuf

    default:
    // 缓存有效,直接使用
    buf = curCache
    }

    //...below
    }

第四阶段: 缓存更新和版本控制机制

  • 原子性检查:确保缓存版本没有被其他线程修改
  • 安全更新:只在版本一致时更新缓存
  • 失败处理:版本不一致时跳过更新,下次调用会重新复制
    1
    2
    3
    4
    5
    if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
    // 更新缓存
    b.txReadBufferCache.buf = buf
    b.txReadBufferCache.bufVersion = curBufVer
    }

第五阶段: 创建并发读事务

1
2
3
4
5
6
7
8
9
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: *buf, // 复制的缓冲区
txMu: b.readTx.txMu, // 共享的事务锁
tx: b.readTx.tx, // 共享的 BoltDB 事务
buckets: b.readTx.buckets, // 共享的桶缓存
txWg: b.readTx.txWg, // 共享的等待组
},
}