剖析 ETCD.mvcc

MVCC

mvcc 的核心组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────────┐
│ etcd MVCC 系统 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ KV Store │ │ Index │ │ Backend │ │
│ │ │ │ │ │ │ │
│ │ • 事务管理 │ │ • B+树索引 │ │ • BoltDB存储引擎 │ │
│ │ • 版本控制 │ │ • 版本管理 │ │ • 事务缓冲 │ │
│ │ • 并发控制 │ │ • 压缩管理 │ │ • 快照管理 │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Watcher │ │ Compaction │ │ Hash │ │
│ │ │ │ │ │ │ │
│ │ • 事件监听 │ │ • 版本清理 │ │ • 数据完整性校验 │ │
│ │ • 事件分发 │ │ • 空间回收 │ │ • 快照验证 │ │
│ │ • 同步机制 │ │ • 压缩调度 │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

mvcc 的数据流转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Client │ │ KV Store │ │ Backend │
│ Request │───▶│ │───▶│ (BoltDB) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Index │ │ Buffer │
│ (B+Tree) │ │ (txBuffer) │
└─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Watcher │ │ Compaction │
│ Events │ │ Cleanup │
└─────────────┘ └─────────────┘

Compact

Compact 也是一种两阶段设计思想

1
2
3
4
5
调用 Compact(rev)

第一阶段: updateCompactRev(rev) - 更新压缩版本号

第二阶段: compact() - 异步调度压缩任务

updateCompactRev

Cpmpact 任务的第一阶段。

在 ETCD 中,存在一些单独的 Bukcet 用于存储一些元数据,比如 Buckets.Meta 存储当前需要调度的 compaction 任务的 revision 。

因此写入后,需要 ForceCommit 来保证元数据不会丢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
s.revMu.Lock()

// 1. 版本号有效性检查
if rev <= s.compactMainRev {
// 已经压缩过,返回错误
return ch, 0, ErrCompacted
}

if rev > s.currentRev {
// 版本号过大,返回错误
return nil, 0, ErrFutureRev
}

// 2. 更新压缩版本号
compactMainRev := s.compactMainRev
s.compactMainRev = rev

// 3. 持久化压缩计划到数据库
rbytes := newRevBytes()
revToBytes(revision{main: rev}, rbytes)

tx := s.b.BatchTx()
tx.LockInsideApply()
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
tx.Unlock()

// 4. 强制提交,确保压缩计划被持久化
s.b.ForceCommit()

s.revMu.Unlock()
return nil, compactMainRev, nil
}

scheduleCompaction

scheduleCompaction 两阶段压缩:先压缩内存索引,再压缩数据库:

  • 批量处理:分批处理数据,避免长时间锁定

    每次在最多处理的kv数由 s.cfg.CompactionBatchLimit 确定

  • 保留策略:根据内存索引的压缩结果决定保留哪些数据

    由 treeIndex Compact 完后的结果 keep 来决定DB中哪些数据是否保留

  • 进度控制:每批处理后休眠,避免阻塞其他操作

  • 完成标记:设置压缩完成标记,用于状态检查

    更新 buckets.Meta 中的 key finishedCompactKeyName 的值,用于后续状态查询

整体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
totalStart := time.Now()

// 第一步:压缩内存索引
keep := s.kvindex.Compact(compactMainRev)
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))

// 准备压缩范围
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchNum := s.cfg.CompactionBatchLimit
h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8)

// 批量处理压缩
for {
var rev revision

start := time.Now()

tx := s.b.BatchTx()
tx.LockOutsideApply()

// 获取一批需要处理的数据
keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))

for i := range keys {
rev = bytesToRev(keys[i])
if _, ok := keep[rev]; !ok {
// 删除不需要保留的数据
tx.UnsafeDelete(buckets.Key, keys[i])
keyCompactions++
}
h.WriteKeyValue(keys[i], values[i])
}

// 检查是否处理完成
if len(keys) < batchNum {
// 设置压缩完成标记
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
tx.Unlock()

// 记录压缩完成信息
hash := h.Hash()
size, sizeInUse := s.b.Size(), s.b.SizeInUse()
s.lg.Info("finished scheduled compaction", ...)
return hash, nil
}

// 更新处理位置
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
tx.Unlock()

// 立即提交压缩删除操作
s.b.ForceCommit()
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

// 一个 batch 结束, 休眠一段时间,避免阻塞其他操作
select {
case <-time.After(s.cfg.CompactionSleepInterval):
case <-s.stopc:
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
}
}
}

checkPrevCompactionCompleted

根据 Buckets.Metakey:finishedCompactKeyNamekey:scheduledCompactKeyName 的最新值,来判断上次 Compaction 是否完成。

1
2
3
4
5
6
7
8
9
10
11
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()

scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)

// 检查上一次压缩是否完成
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}

Restore

RestoreNewStorestore.restore 提供统一实现,即是创建一个新的 KV 实例和从已有的数据恢复成一个 KV 实例,提供了一个统一实现:

  • 恢复 compactMainRev 和 currentRev 版本号

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 如果先恢复 currentRev, 再恢复 compactMainRev:
    // 1. currentRev 可能小于 compactMainRev
    // 2. 导致数据不一致
    // 3. 可能触发错误的压缩操作

    // 正确的顺序确保:
    // 1. 压缩版本号先确定
    // 2. 键值对按版本顺序恢复
    // 3. 当前版本号最后确定,确保一致性
  • 并发恢复 treeIndex 中的索引数据

  • 恢复 lease 租约信息

  • 处理未完成的 compaction 任务

完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func (s *store) restore() error {
// 1. 设置监控报告器
s.setupMetricsReporter()

// 2. 准备恢复范围
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min) // 从版本 1 开始
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) // 到最大版本

// 3. 初始化租约映射
keyToLease := make(map[string]lease.LeaseID)

// 4. 恢复压缩版本号
tx := s.b.ReadTx()
tx.Lock()

// 恢复已完成的压缩版本号: finishedCompact
finishedCompact, found := UnsafeReadFinishedCompact(tx)
if found {
s.revMu.Lock()
s.compactMainRev = finishedCompact
s.lg.Info("restored last compact revision",
zap.Int64("restored-compact-revision", s.compactMainRev))
s.revMu.Unlock()
}

// 读取计划的压缩版本号
scheduledCompact, _ := UnsafeReadScheduledCompact(tx)

// 5. 并发恢复 treeIndex 索引数据
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)

// 6. 分块读取和恢复数据
for {
keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}

// 处理当前块的数据
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)

if len(keys) < restoreChunkKeys {
// 部分集合意味着最终集合
break
}

// 更新下一个块的起始位置
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
close(rkvc)

// 7. 恢复当前版本号
{
s.revMu.Lock()
s.currentRev = <-revc

// 处理压缩后的版本号调整
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}

// 处理未完成压缩的情况
if s.currentRev < scheduledCompact {
s.currentRev = scheduledCompact
}
s.revMu.Unlock()
}

// 8. 处理未完成的压缩
if scheduledCompact <= s.compactMainRev {
scheduledCompact = 0
}

// 9. 恢复租约信息
for key, lid := range keyToLease {
if s.le == nil {
tx.Unlock()
panic("no lessor to attach lease")
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
s.lg.Error("failed to attach a lease", zap.Error(err))
}
}

tx.Unlock()

s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))

// 10. 处理未完成的压缩任务
if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
s.lg.Warn("compaction encountered error", zap.Error(err))
}
}

return nil
}

restoreIntoIndex

restoreIntoIndex 是创建一个新的消费者 goroutine,来 restore treeIndex 与主线程 restoreChunk 一起并发 restore.

  • 生产者-消费者模式:使用通道进行数据传输
  • 缓存优化:使用 LRU 风格的缓存策略
  • 并发安全:在单独的 goroutine 中处理索引更新
  • 内存控制:通过缓存大小限制内存使用

完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
type revKeyValue struct {
key []byte // 版本化的键(包含 revision 信息)
kv mvccpb.KeyValue // 键值对数据 (User's Key Value)
kstr string // kv.Key 的字符串形式(用于缓存查找)
}

func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)

go func() {
currentRev := int64(1)
defer func() { revc <- currentRev }()

// 创建键索引缓存
kiCache := make(map[string]*keyIndex, restoreChunkKeys)

for rkv := range rkvc {
ki, ok := kiCache[rkv.kstr]

// 缓存清理策略:当缓存满且未命中时,清理部分缓存
if !ok && len(kiCache) >= restoreChunkKeys {
i := 10
// kiCache 是 unordered, range 是随机访问
// 因此 delete 是随机删除
for k := range kiCache {
delete(kiCache, k)
if i--; i == 0 {
break
}
}
}

// 缓存未命中,从树索引中获取
if !ok {
ki = &keyIndex{key: rkv.kv.Key}
if idxKey := idx.KeyIndex(ki); idxKey != nil {
kiCache[rkv.kstr], ki = idxKey, idxKey
ok = true
}
}

rev := bytesToRev(rkv.key) // revision_byte to rev
currentRev = rev.main

if ok {
/// rkv.kv.Key 在 treeIndex 中已存在索引

if isTombstone(rkv.key) {
// 处理墓碑标记
if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
lg.Warn("tombstone encountered error", zap.Error(err))
}
continue
}
// 插入
ki.put(lg, rev.main, rev.sub)
} else {
// 键索引不存在,创建新的
if isTombstone(rkv.key) {
ki.restoreTombstone(lg, rev.main, rev.sub)
} else {
ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
}
idx.Insert(ki)
kiCache[rkv.kstr] = ki // update cache
}
}
}()

return rkvc, revc
}

restoreChunk

restoreChunk 在 restore 主线程中处理,将从 backend 中获取到的数据进行租约 lease 信息处理,处理完将这个 batch 的数据集 rkv 通过 channel 传递给消费者 goroutine: restoreIntoIndex 去恢复 treeIndex。

restoreChunk:

  • 一次最多处理 restoreChunkKeys 个 {key value}
  • etcdServer.Restore 需要先恢复 leasor 信息

store.restore 这部分关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    // func restore
//...
for {
keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
// rkvc blocks if the total pending keys exceeds the restore
// chunk size to keep keys from consuming too much memory.
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
if len(keys) < restoreChunkKeys {
// partial set implies final set
break
}
// next set begins after where this one ended
newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}

func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
for i, key := range keys {
rkv := revKeyValue{key: key}

// 反序列化键值对
if err := rkv.kv.Unmarshal(vals[i]); err != nil {
lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}

rkv.kstr = string(rkv.kv.Key)

// 处理租约信息
if isTombstone(key) {
// 墓碑标记,删除租约关联
delete(keyToLease, rkv.kstr)
} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
// 有租约,记录租约关联
keyToLease[rkv.kstr] = lid
} else {
// 无租约,删除租约关联
delete(keyToLease, rkv.kstr)
}

// 发送到索引恢复通道
kvc <- rkv
}
}

版本一致性

在完成上述操作后,即可获得最大 revision,但是仍需要调整下版本号:

  • 压缩后调整:确保 currentRev >= compactMainRev
  • 未完成压缩:使用 scheduledCompact 调整版本号
  • 墓碑版本处理:正确处理删除操作的版本号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 版本号恢复逻辑

finishedCompact, found := UnsafeReadFinishedCompact(tx)
if found {
s.compactMainRev = finishedCompact
}
//..

{
s.revMu.Lock()
s.currentRev = <-revc // 从 treeIndex 索引恢复过程中获取最大版本号

// 情况1:处理压缩后的版本号调整
// 在压缩过程中,某些版本可能被删除,需要确保当前版本号不小于压缩版本号
if s.currentRev < s.compactMainRev {
s.currentRev = s.compactMainRev
}

// 情况2:处理未完成压缩的情况
// 如果 etcd 在压缩过程中崩溃,可能导致版本号不一致
// 使用计划的压缩版本号来调整当前版本号
if s.currentRev < scheduledCompact {
s.currentRev = scheduledCompact
}
s.revMu.Unlock()
}

lease

调整完版本,再恢复租约:

  • 状态重建:重新建立键与租约的关联关系
  • 错误容忍:租约恢复失败时记录错误但不终止恢复过程
  • 完整性检查:确保租约管理器存在
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for key, lid := range keyToLease {
// 需要 etcdServer 先恢复 leasor
if s.le == nil {
tx.Unlock()
panic("no lessor to attach lease")
}

// 重新关联键和租约
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
s.lg.Error("failed to attach a lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(err))
}
}

compactLockfree

compaction 需要作为 restore 的最后一步,设计原因如下:

  1. 数据完整性保证
  • 所有数据都已恢复
  • 索引结构已建立
  • 租约关联已建立

如果 restore 提前 compaction,可能会导致 compactMainRev 变大,导致数据不一致,删除了部分数据。

  1. 系统可用性
  • 即使压缩失败,系统仍可运行
  • 不会因为压缩问题导致启动失败
  1. 错误隔离
    • 压缩错误不会影响数据恢复(这点很重要)
    • 可以独立处理压缩问题

从性能角度考虑: 先恢复核心功能, compaction 作为后台异步任务执行可以减少启动时间

完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 在 restore 过程中检测未完成的压缩
func (s *store) restore() error {
// 1. 首先读取已完成的压缩版本号
finishedCompact, found := UnsafeReadFinishedCompact(tx)
if found {
s.compactMainRev = finishedCompact
}

// 2. 读取计划的压缩版本号
scheduledCompact, _ := UnsafeReadScheduledCompact(tx)
// 3. 恢复上述所有数据...


// 4. 最后处理未完成的压缩
if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
s.lg.Warn("compaction encountered error", zap.Error(err))
} else {
s.lg.Info("resume scheduled compaction",
zap.Int64("scheduled-compact-revision", scheduledCompact))
}
}
}
EtcdServer.applySnapshot

EtcdServer.applySnapshot 具有重要意义,分别应用于进程重启后的状态恢复,以及 follower 加入已有集群后的状态初始化等。实际应用价值如下:

  • 集群同步:确保 Follower 节点能够快速同步到 Leader 状态
  • 故障恢复:系统重启后能够从快照快速恢复
  • 性能优化:避免长时间的重放日志操作
  • 数据一致性:保证分布式系统中的强一致性

核心设计原则

  • 顺序重要性:ConsistentIndex → Lessor → MVCC → 其他组件的恢复顺序
  • 原子性:后端切换和进度更新使用原子操作
  • 资源管理:异步关闭旧资源,避免阻塞主流程
  • 错误处理:关键错误使用 Panic,确保系统一致性

执行程图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
graph TD
O[等待 Raft 节点持久化] -->A[ConsistentIndex]
A[ConsistentIndex] --> B[Lessor]
B --> C[MVCC Store]
C --> D[事务钩子]
D --> E[后端切换]
E --> F[Alarm Store]
F --> G[Auth Store]
G --> H[V2 Store]
H --> I[集群配置]
I --> J[网络配置]

style A fill:#0f3460,stroke:#16213e,stroke-width:2px,color:#ffffff
style B fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff
style C fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff
style D fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff
style E fill:#6a0572,stroke:#8b008b,stroke-width:2px,color:#ffffff
style J fill:#2d5016,stroke:#4a7c59,stroke-width:2px,color:#ffffff

这里核心的 MVCC Restore 之前已经分析过,Lesssor 后续分析,另一个核心的设计就是 ConsistentIndex

ConsistentIndex

consistIndex 存在的必要性是 etcd 确保数据一致性的核心机制:

  • 核心就是防止重复 apply entry: 通过比较 log index, 确保每个 log entry 只 applied 一次
  • 持久化保证: 只有真正持久化的数据才被标记为已应用
  • 重启一致性: 重启后能够正确恢复到一致状态
  • 异常处理: 在各种异常情况(重启、leader changed、网络分区等)下都能保证数据完整性

这种设计使得 etcd 能够在复杂的分布式环境中提供强一致性保证,是 etcd 作为可靠分布式键值存储的重要基础。没有 consistIndex,etcd 就无法正确处理重启、快照恢复、网络分区等场景下的日志重放问题,会导致数据不一致或重复应用的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// position:server/etcdserver/cindex/cindex.go:58-78
type consistentIndex struct {
// consistentIndex 表示一致副本日志中条目的偏移量
// 它缓存了 "consistent_index" 键的值
// 通过原子操作访问,必须 64 位对齐
consistentIndex uint64

// term 表示一致副本日志中已提交条目的 RAFT term
// 通过原子操作访问,必须 64 位对齐
// 从 v3.5 开始持久化到后端
term uint64

// applyingIndex 和 applyingTerm 是 raftpb.Entry.Index 和 raftpb.Entry.Term 的临时缓存
// 它们还没有准备好被持久化,将在 txPostLockInsideApplyHook 中保存到上面的 consistentIndex 和 term
applyingIndex uint64
applyingTerm uint64

// be 用于初始读取 consistentIndex
be Backend
// mutex 保护 be
mutex sync.Mutex
}

在 apply 中的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type ShouldApplyV3 bool

const (
ApplyBoth = ShouldApplyV3(true)
ApplyV2storeOnly = ShouldApplyV3(false)
)

func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)

case raftpb.EntryConfChange:
// 我们需要在 v2store 上应用所有 WAL 条目
// 并且只在后端上应用 'unapplied' (e.Index>backend.ConsistentIndex)
shouldApplyV3 := membership.ApplyV2storeOnly

// 设置当前执行条目的一致性索引
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
// ... 处理配置变更
}
}
}

关键逻辑

  • 对于 raftpb.EntryConfChange,只有当 e.Index > s.consistIndex.ConsistentIndex() 时才应用到 v3Store(即 BlotDB)
  • 这确保了只有新的配置变更才会被处理

applyEntryNormal() 方法中的使用

关键逻辑

  • 同样也只有当 e.Index > s.consistIndex.ConsistentIndex 时才应用到 v3 存储
  • 设置 applyingIndex 作为临时缓存
  • 使用 defer 确保在异常情况下也能更新索引 (比如接收到空包等)

只有当 index 更大,shouldApplyV3 的值是 membership.ApplyBoth,即为 true,才真正 apply 到 backend。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
var ar *applyResult
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// 设置当前执行条目的一致性索引
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
defer func() {
// 在某些情况下 txPostLockInsideApplyHook 不会被调用
// 在这种情况下我们应该直接推进一致性索引
newIndex := s.consistIndex.ConsistentIndex()
if newIndex < e.Index {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
}
// ... 应用条目
}

事务钩子机制

TxPostLockInsideApplyHook 方法

1
2
3
4
5
6
7
8
9
10
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())

func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
}
}
}

分析

  • TxnApplyHook 在事务锁定时调用,因此内部调用的是 Unsafe
  • applyingIndex 提升为 consistentIndex
  • 确保只有真正持久化的数据才被标记为已应用
  • 在服务启动或者恢复存储后端时设置钩子
  • 确保后续的事务操作都会在 Unlock 前调用这个钩子
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
    b.batchTx.lock()
    defer b.batchTx.Unlock() // 智能 Commit
    b.txPostLockInsideApplyHook = hook
    }

    func (t *batchTx) LockInsideApply() {
    t.lock()
    if t.backend.txPostLockInsideApplyHook != nil {
    ValidateCalledInsideApply(t.backend.lg)
    t.backend.txPostLockInsideApplyHook()
    }
    }

applySnapshot 中的应用

关键时机

  • 必须在恢复 lessor 之前设置后端
  • 避免旧的索引值覆盖新的快照值
    1
    2
    3
    4
    5
    6
    7
    // 位置:server/etcdserver/server.go:1340-1345
    // We need to set the backend to consistIndex before recovering the lessor,
    // because lessor.Recover will commit the boltDB transaction, accordingly it
    // will get the old consistent_index persisted into the db in OnPreCommitUnsafe.
    // Eventually the new consistent_index value coming from snapshot is overwritten
    // by the old value.
    s.consistIndex.SetBackend(newbe)

在启动时一致性验证中的使用

验证规则

  • consistIndex <= hardstate.Commit
  • consistIndex >= snapshot.Index
  • 确保启动时状态的一致性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 位置:server/verify/verify.go:110-131
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
index, term := cindex.ReadConsistentIndex(be.ReadTx())

if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}

if index > hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit)
}

if index < snapshot.Index {
return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index)
}

return nil
}

consistIndex 也是批量更新到 blotdb,和事物提交同步

OnPreCommitUnsafe

1
2
3
4
5
6
7
8
9
10
func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.indexer.UnsafeSave(tx)
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
}