MVCC mvcc 的核心组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ┌─────────────────────────────────────────────────────────────┐ │ etcd MVCC 系统 │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ KV Store │ │ Index │ │ Backend │ │ │ │ │ │ │ │ │ │ │ │ • 事务管理 │ │ • B+树索引 │ │ • BoltDB存储引擎 │ │ │ │ • 版本控制 │ │ • 版本管理 │ │ • 事务缓冲 │ │ │ │ • 并发控制 │ │ • 压缩管理 │ │ • 快照管理 │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ Watcher │ │ Compaction │ │ Hash │ │ │ │ │ │ │ │ │ │ │ │ • 事件监听 │ │ • 版本清理 │ │ • 数据完整性校验 │ │ │ │ • 事件分发 │ │ • 空间回收 │ │ • 快照验证 │ │ │ │ • 同步机制 │ │ • 压缩调度 │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
mvcc 的数据流转
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Client │ │ KV Store │ │ Backend │ │ Request │───▶│ │───▶│ (BoltDB) │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ Index │ │ Buffer │ │ (B+Tree) │ │ (txBuffer) │ └─────────────┘ └─────────────┘ │ │ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ Watcher │ │ Compaction │ │ Events │ │ Cleanup │ └─────────────┘ └─────────────┘
Compact Compact 也是一种两阶段设计思想
1 2 3 4 5 调用 Compact(rev) ↓ 第一阶段: updateCompactRev(rev) - 更新压缩版本号 ↓ 第二阶段: compact() - 异步调度压缩任务
updateCompactRev Cpmpact 任务的第一阶段。
在 ETCD 中,存在一些单独的 Bukcet 用于存储一些元数据,比如 Buckets.Meta 存储当前需要调度的 compaction 任务的 revision 。
因此写入后,需要 ForceCommit 来保证元数据不会丢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (s *store) updateCompactRev(rev int64 ) (<-chan struct {}, int64 , error ) { s.revMu.Lock() if rev <= s.compactMainRev { return ch, 0 , ErrCompacted } if rev > s.currentRev { return nil , 0 , ErrFutureRev } compactMainRev := s.compactMainRev s.compactMainRev = rev rbytes := newRevBytes() revToBytes(revision{main: rev}, rbytes) tx := s.b.BatchTx() tx.LockInsideApply() tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) tx.Unlock() s.b.ForceCommit() s.revMu.Unlock() return nil , compactMainRev, nil }
scheduleCompaction scheduleCompaction 两阶段压缩:先压缩内存索引,再压缩数据库:
批量处理:分批处理数据,避免长时间锁定
每次在最多处理的kv数由 s.cfg.CompactionBatchLimit 确定
保留策略:根据内存索引的压缩结果决定保留哪些数据
由 treeIndex Compact 完后的结果 keep 来决定DB中哪些数据是否保留
进度控制:每批处理后休眠,避免阻塞其他操作
完成标记:设置压缩完成标记,用于状态检查
更新 buckets.Meta 中的 key finishedCompactKeyName 的值,用于后续状态查询
整体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64 ) (KeyValueHash, error ) { totalStart := time.Now() keep := s.kvindex.Compact(compactMainRev) indexCompactionPauseMs.Observe(float64 (time.Since(totalStart) / time.Millisecond)) end := make ([]byte , 8 ) binary.BigEndian.PutUint64(end, uint64 (compactMainRev+1 )) batchNum := s.cfg.CompactionBatchLimit h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make ([]byte , 8 +1 +8 ) for { var rev revision start := time.Now() tx := s.b.BatchTx() tx.LockOutsideApply() keys, values := tx.UnsafeRange(buckets.Key, last, end, int64 (batchNum)) for i := range keys { rev = bytesToRev(keys[i]) if _, ok := keep[rev]; !ok { tx.UnsafeDelete(buckets.Key, keys[i]) keyCompactions++ } h.WriteKeyValue(keys[i], values[i]) } if len (keys) < batchNum { rbytes := make ([]byte , 8 +1 +8 ) revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) tx.Unlock() hash := h.Hash() size, sizeInUse := s.b.Size(), s.b.SizeInUse() s.lg.Info("finished scheduled compaction" , ...) return hash, nil } revToBytes(revision{main: rev.main, sub: rev.sub + 1 }, last) tx.Unlock() s.b.ForceCommit() dbCompactionPauseMs.Observe(float64 (time.Since(start) / time.Millisecond)) select { case <-time.After(s.cfg.CompactionSleepInterval): case <-s.stopc: return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal" ) } } }
checkPrevCompactionCompleted 根据 Buckets.Meta 中 key:finishedCompactKeyName 和 key:scheduledCompactKeyName 的最新值,来判断上次 Compaction 是否完成。
1 2 3 4 5 6 7 8 9 10 11 func (s *store) checkPrevCompactionCompleted() bool { tx := s.b.ReadTx() tx.Lock() defer tx.Unlock() scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx) finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx) return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound }
Restore Restore 为 NewStore 和 store.restore 提供统一实现,即是创建一个新的 KV 实例和从已有的数据恢复成一个 KV 实例,提供了一个统一实现:
完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 func (s *store) restore() error { s.setupMetricsReporter() min, max := newRevBytes(), newRevBytes() revToBytes(revision{main: 1 }, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) keyToLease := make (map [string ]lease.LeaseID) tx := s.b.ReadTx() tx.Lock() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { s.revMu.Lock() s.compactMainRev = finishedCompact s.lg.Info("restored last compact revision" , zap.Int64("restored-compact-revision" , s.compactMainRev)) s.revMu.Unlock() } scheduledCompact, _ := UnsafeReadScheduledCompact(tx) keysGauge.Set(0 ) rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) for { keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64 (restoreChunkKeys)) if len (keys) == 0 { break } restoreChunk(s.lg, rkvc, keys, vals, keyToLease) if len (keys) < restoreChunkKeys { break } newMin := bytesToRev(keys[len (keys)-1 ][:revBytesLen]) newMin.sub++ revToBytes(newMin, min) } close (rkvc) { s.revMu.Lock() s.currentRev = <-revc if s.currentRev < s.compactMainRev { s.currentRev = s.compactMainRev } if s.currentRev < scheduledCompact { s.currentRev = scheduledCompact } s.revMu.Unlock() } if scheduledCompact <= s.compactMainRev { scheduledCompact = 0 } for key, lid := range keyToLease { if s.le == nil { tx.Unlock() panic ("no lessor to attach lease" ) } err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) if err != nil { s.lg.Error("failed to attach a lease" , zap.Error(err)) } } tx.Unlock() s.lg.Info("kvstore restored" , zap.Int64("current-rev" , s.currentRev)) if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { s.lg.Warn("compaction encountered error" , zap.Error(err)) } } return nil }
restoreIntoIndex restoreIntoIndex 是创建一个新的消费者 goroutine,来 restore treeIndex 与主线程 restoreChunk 一起并发 restore.
生产者-消费者模式:使用通道进行数据传输
缓存优化:使用 LRU 风格的缓存策略
并发安全:在单独的 goroutine 中处理索引更新
内存控制:通过缓存大小限制内存使用
完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 type revKeyValue struct { key []byte kv mvccpb.KeyValue kstr string } func restoreIntoIndex (lg *zap.Logger, idx index) (chan <- revKeyValue, <-chan int64 ) { rkvc, revc := make (chan revKeyValue, restoreChunkKeys), make (chan int64 , 1 ) go func () { currentRev := int64 (1 ) defer func () { revc <- currentRev }() kiCache := make (map [string ]*keyIndex, restoreChunkKeys) for rkv := range rkvc { ki, ok := kiCache[rkv.kstr] if !ok && len (kiCache) >= restoreChunkKeys { i := 10 for k := range kiCache { delete (kiCache, k) if i--; i == 0 { break } } } if !ok { ki = &keyIndex{key: rkv.kv.Key} if idxKey := idx.KeyIndex(ki); idxKey != nil { kiCache[rkv.kstr], ki = idxKey, idxKey ok = true } } rev := bytesToRev(rkv.key) currentRev = rev.main if ok { if isTombstone(rkv.key) { if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { lg.Warn("tombstone encountered error" , zap.Error(err)) } continue } ki.put(lg, rev.main, rev.sub) } else { if isTombstone(rkv.key) { ki.restoreTombstone(lg, rev.main, rev.sub) } else { ki.restore(lg, revision{rkv.kv.CreateRevision, 0 }, rev, rkv.kv.Version) } idx.Insert(ki) kiCache[rkv.kstr] = ki } } }() return rkvc, revc }
restoreChunk restoreChunk 在 restore 主线程中处理,将从 backend 中获取到的数据进行租约 lease 信息处理,处理完将这个 batch 的数据集 rkv 通过 channel 传递给消费者 goroutine: restoreIntoIndex 去恢复 treeIndex。
restoreChunk:
一次最多处理 restoreChunkKeys 个 {key value}
etcdServer.Restore 需要先恢复 leasor 信息
store.restore 这部分关系如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 for { keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64 (restoreChunkKeys)) if len (keys) == 0 { break } restoreChunk(s.lg, rkvc, keys, vals, keyToLease) if len (keys) < restoreChunkKeys { break } newMin := bytesToRev(keys[len (keys)-1 ][:revBytesLen]) newMin.sub++ revToBytes(newMin, min) } func restoreChunk (lg *zap.Logger, kvc chan <- revKeyValue, keys, vals [][]byte , keyToLease map [string ]lease.LeaseID) { for i, key := range keys { rkv := revKeyValue{key: key} if err := rkv.kv.Unmarshal(vals[i]); err != nil { lg.Fatal("failed to unmarshal mvccpb.KeyValue" , zap.Error(err)) } rkv.kstr = string (rkv.kv.Key) if isTombstone(key) { delete (keyToLease, rkv.kstr) } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease { keyToLease[rkv.kstr] = lid } else { delete (keyToLease, rkv.kstr) } kvc <- rkv } }
版本一致性 在完成上述操作后,即可获得最大 revision,但是仍需要调整下版本号:
压缩后调整:确保 currentRev >= compactMainRev
未完成压缩:使用 scheduledCompact 调整版本号
墓碑版本处理:正确处理删除操作的版本号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { s.compactMainRev = finishedCompact } { s.revMu.Lock() s.currentRev = <-revc if s.currentRev < s.compactMainRev { s.currentRev = s.compactMainRev } if s.currentRev < scheduledCompact { s.currentRev = scheduledCompact } s.revMu.Unlock() }
lease 调整完版本,再恢复租约:
状态重建:重新建立键与租约的关联关系
错误容忍:租约恢复失败时记录错误但不终止恢复过程
完整性检查:确保租约管理器存在
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for key, lid := range keyToLease { if s.le == nil { tx.Unlock() panic ("no lessor to attach lease" ) } err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) if err != nil { s.lg.Error("failed to attach a lease" , zap.String("lease-id" , fmt.Sprintf("%016x" , lid)), zap.Error(err)) } }
compactLockfree compaction 需要作为 restore 的最后一步,设计原因如下:
数据完整性保证
如果 restore 提前 compaction,可能会导致 compactMainRev 变大,导致数据不一致,删除了部分数据。
系统可用性
即使压缩失败,系统仍可运行
不会因为压缩问题导致启动失败
错误隔离
压缩错误不会影响数据恢复(这点很重要)
可以独立处理压缩问题
从性能角度考虑: 先恢复核心功能, compaction 作为后台异步任务执行可以减少启动时间
完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (s *store) restore() error { finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { s.compactMainRev = finishedCompact } scheduledCompact, _ := UnsafeReadScheduledCompact(tx) if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { s.lg.Warn("compaction encountered error" , zap.Error(err)) } else { s.lg.Info("resume scheduled compaction" , zap.Int64("scheduled-compact-revision" , scheduledCompact)) } } }
EtcdServer.applySnapshot EtcdServer.applySnapshot 具有重要意义,分别应用于进程重启后的状态恢复,以及 follower 加入已有集群后的状态初始化等。实际应用价值如下:
集群同步:确保 Follower 节点能够快速同步到 Leader 状态
故障恢复:系统重启后能够从快照快速恢复
性能优化:避免长时间的重放日志操作
数据一致性:保证分布式系统中的强一致性
核心设计原则
顺序重要性:ConsistentIndex → Lessor → MVCC → 其他组件的恢复顺序
原子性:后端切换和进度更新使用原子操作
资源管理:异步关闭旧资源,避免阻塞主流程
错误处理:关键错误使用 Panic,确保系统一致性
执行程图如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 graph TD O[等待 Raft 节点持久化] -->A[ConsistentIndex] A[ConsistentIndex] --> B[Lessor] B --> C[MVCC Store] C --> D[事务钩子] D --> E[后端切换] E --> F[Alarm Store] F --> G[Auth Store] G --> H[V2 Store] H --> I[集群配置] I --> J[网络配置] style A fill:#0f3460,stroke:#16213e,stroke-width:2px,color:#ffffff style B fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff style C fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff style D fill:#0d7377,stroke:#14a085,stroke-width:2px,color:#ffffff style E fill:#6a0572,stroke:#8b008b,stroke-width:2px,color:#ffffff style J fill:#2d5016,stroke:#4a7c59,stroke-width:2px,color:#ffffff
这里核心的 MVCC Restore 之前已经分析过,Lesssor 后续分析,另一个核心的设计就是 ConsistentIndex。
ConsistentIndex consistIndex 存在的必要性是 etcd 确保数据一致性的核心机制:
核心就是防止重复 apply entry: 通过比较 log index, 确保每个 log entry 只 applied 一次
持久化保证: 只有真正持久化的数据才被标记为已应用
重启一致性: 重启后能够正确恢复到一致状态
异常处理: 在各种异常情况(重启、leader changed、网络分区等)下都能保证数据完整性
这种设计使得 etcd 能够在复杂的分布式环境中提供强一致性保证,是 etcd 作为可靠分布式键值存储的重要基础。没有 consistIndex,etcd 就无法正确处理重启、快照恢复、网络分区等场景下的日志重放问题,会导致数据不一致或重复应用的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type consistentIndex struct { consistentIndex uint64 term uint64 applyingIndex uint64 applyingTerm uint64 be Backend mutex sync.Mutex }
在 apply 中的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 type ShouldApplyV3 bool const ( ApplyBoth = ShouldApplyV3(true ) ApplyV2storeOnly = ShouldApplyV3(false ) ) func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64 , appliedi uint64 , shouldStop bool ) { for i := range es { e := es[i] switch e.Type { case raftpb.EntryNormal: s.applyEntryNormal(&e) s.setAppliedIndex(e.Index) s.setTerm(e.Term) case raftpb.EntryConfChange: shouldApplyV3 := membership.ApplyV2storeOnly if e.Index > s.consistIndex.ConsistentIndex() { s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } } } }
关键逻辑 :
对于 raftpb.EntryConfChange,只有当 e.Index > s.consistIndex.ConsistentIndex() 时才应用到 v3Store(即 BlotDB)
这确保了只有新的配置变更才会被处理
applyEntryNormal() 方法中的使用 关键逻辑 :
同样也只有当 e.Index > s.consistIndex.ConsistentIndex 时才应用到 v3 存储
设置 applyingIndex 作为临时缓存
使用 defer 确保在异常情况下也能更新索引 (比如接收到空包等)
只有当 index 更大,shouldApplyV3 的值是 membership.ApplyBoth,即为 true,才真正 apply 到 backend。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly var ar *applyResult index := s.consistIndex.ConsistentIndex() if e.Index > index { s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth defer func () { newIndex := s.consistIndex.ConsistentIndex() if newIndex < e.Index { s.consistIndex.SetConsistentIndex(e.Index, e.Term) } }() } }
事务钩子机制 TxPostLockInsideApplyHook 方法 1 2 3 4 5 6 7 8 9 10 newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) func (s *EtcdServer) getTxPostLockInsideApplyHook() func () { return func () { applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) } } }
分析 :
TxnApplyHook 在事务锁定时调用,因此内部调用的是 Unsafe
将 applyingIndex 提升为 consistentIndex
确保只有真正持久化的数据才被标记为已应用
在服务启动或者恢复存储后端时设置钩子
确保后续的事务操作都会在 Unlock 前调用这个钩子1 2 3 4 5 6 7 8 9 10 11 12 13 func (b *backend) SetTxPostLockInsideApplyHook(hook func () ) { b.batchTx.lock() defer b.batchTx.Unlock() b.txPostLockInsideApplyHook = hook } func (t *batchTx) LockInsideApply() { t.lock() if t.backend.txPostLockInsideApplyHook != nil { ValidateCalledInsideApply(t.backend.lg) t.backend.txPostLockInsideApplyHook() } }
applySnapshot 中的应用 关键时机 :
必须在恢复 lessor 之前设置后端
避免旧的索引值覆盖新的快照值1 2 3 4 5 6 7 s.consistIndex.SetBackend(newbe)
在启动时一致性验证中的使用 验证规则 :
consistIndex <= hardstate.Commit
consistIndex >= snapshot.Index
确保启动时状态的一致性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func validateConsistentIndex (cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { index, term := cindex.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)" , index, hardstate.Commit) } if index > hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)" , index, hardstate.Commit) } if index < snapshot.Index { return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)" , index, snapshot.Index) } return nil }
consistIndex 也是批量更新到 blotdb,和事物提交同步
OnPreCommitUnsafe 1 2 3 4 5 6 7 8 9 10 func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { bh.indexer.UnsafeSave(tx) bh.confStateLock.Lock() defer bh.confStateLock.Unlock() if bh.confStateDirty { membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) bh.confStateDirty = false } }