剖析 ETCD.kvstore

KV

etcd 采用双层存储架构:

  • TreeIndex: 内存中的 B-tree 索引,存储 user_keyrevision 的映射, 管理 user_key 的所有历史生命周期
  • BoltDB: 持久化存储,存储 revisionKeyValue 的映射

传统设计: key → KeyValue 只能存储最新值, ETCD 的 ‘{revision, KeyValue}’ 映射支持历史版本存储,revision 作为全局单调递增的版本。

View 设计

先介绍下 KV 中的两个 VIEW Inteface: ReadView、WriteView 。这两个 VIEW 提供了基本的读写接口,既是 KV Store 需要实现的接口,也是应用层向 KV Store 写入数据的接口。因此,出于最小职责原理,将这公共接口抽象成 ReadView、WriteView。 上层无需要知道完整的 KV Store Interface,只需要使用 ReadView、WriteView Inteface 中提供的函数就行。

这里的 VIEWStore 是一种”引用”、”指针” 的语义。

1
2
3
4
5
type ReadView interface {
FirstRev() int64
Rev() int64
Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
1
2
3
4
type WriteView interface {
DeleteRange(key, end []byte) (n, rev int64)
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}

因此 ReadView、WriteView 的实现 readViewwriteVie 也是 KV Stroe 的 wrapper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type readView struct{ kv KV }
type writeView struct{ kv KV }


type KV interface {
ReadView
WriteView

// Read creates a read transaction.
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite

// HashStorage returns HashStorage interface for KV storage.
HashStorage() HashStorage

// Compact frees all superseded keys with revisions less than rev.
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)

// Commit commits outstanding txns into the underlying backend.
Commit()

// Restore restores the KV store from a backend.
Restore(b backend.Backend) error
Close() error
}

这么设计就能良好的隔离应用层和 KV Store,只暴露给应用层基本都读写接口。

Txn

ETCD 中任何读写操作都需要先基于 KV Store 的 ReadWrite Interface 创建 Transaction,然后在 Transaction 中完成具体的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
End()
}

type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}

// txnReadWrite coerces a read txn to a write, panicking on any write operation.
type txnReadWrite struct{ TxnRead }

storeTxnRead

TxnRead 的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────────┐
│ 读事务生命周期 │
├─────────────────────────────────────────────────────────────┤
│ 1. kvstore.Read() │
│ ├─ s.mu.RLock() │
│ ├─ s.revMu.RLock() │
│ ├─ 选择ReadTx模式 │
│ ├─ tx.RLock() │
│ └─ 返回storeTxnRead │
│ │
│ 2. 执行读操作 │
│ ├─ tr.Range() │
│ ├─ tr.Rev() │
│ └─ tr.FirstRev() │
│ │
│ 3. tr.End() │
│ ├─ tr.tx.RUnlock() │
│ └─ tr.s.mu.RUnlock() │
└─────────────────────────────────────────────────────────────┘

step-1, step-3 具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type storeTxnRead struct {
s *store
tx backend.ReadTx

firstRev int64 // 第一个版本
rev int64 // 当前版本

trace *traceutil.Trace
}

func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()

var tx backend.ReadTx
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx() // 并发读事务
} else {
tx = s.b.ReadTx() // 共享缓冲读事务
}

tx.RLock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
}

func (tr *storeTxnRead) End() {
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
tr.s.mu.RUnlock()
}

storeTxnWrite

TxnWrite 生命周期如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
┌─────────────────────────────────────────────────────────────┐
│ 写事务生命周期 │
├─────────────────────────────────────────────────────────────┤
│ 1. store.Write() │
│ ├─ s.mu.RLock() │
│ ├─ tx.LockInsideApply() │
│ └─ 返回storeTxnWrite │
│ │
│ 2. 执行写操作 │
│ ├─ tw.Put() │
│ ├─ tw.DeleteRange() │
│ ├─ tw.Range() (读操作) │
│ └─ 记录变更到changes │
│ │
│ 3. tw.End() │
│ ├─ 如果有变更: │
│ │ ├─ tw.s.revMu.Lock() │
│ │ └─ tw.s.currentRev++ │
│ ├─ tw.tx.Unlock() │
│ ├─ 如果有变更:tw.s.revMu.Unlock() │
│ └─ tw.s.mu.RUnlock() │
└─────────────────────────────────────────────────────────────┘

step-1, step-3 具体实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type storeTxnWrite struct {
storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
changes []mvccpb.KeyValue
}

func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}

func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++ // 存在变更则 version++
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}

lock

store 中新增了两个锁:murecvMu:

  • mu 是个 RWMutex, 写锁是为了 stroe 的状态变更,读锁是用于事物
    -recvMu 是为了版本控制线程安全。

注意: store.Readstore.Write 在创建事物时,都是用的读锁 s.mu.RLock(), 配合 recvMu 进行细粒度的控制版本变更,使得 Transaction 的创建行为不会有相互堵塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
type store struct {
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex

// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex

currentRev int64 // 当前版本
compactMainRev int64 // 压缩版本
//...
}

Lock 层次如下:

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────────────────┐
│ 锁层次结构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ store.mu │ │ store.revMu│ │ backend.tx │ │
│ │ (RWMutex) │ │ (RWMutex) │ │ (Mutex/RWMutex) │ │
│ │ │ │ │ │ │ │
│ │ • 保护store │ │ • 保护版本 │ │ • 保护后端事务 │ │
│ │ • 读写分离 │ │ • 版本控制 │ │ • 事务隔离 │ │
│ │ • 全局状态 │ │ • 原子更新 │ │ • 数据一致性 │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Revision

etcd 的 Revision 设计基于 MVCC 模型,主要包含以下核心概念:

  • Main Revision: 全局递增的事务版本号,每次写事务都会递增
  • Sub Revision: 同一事务内多个操作的子版本号,从 0 开始递增
  • Generation: 键的生命周期,从创建到删除为一个 generation
  • Tombstone: 删除标记,用于标记键的删除状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type revision struct {
main int64 // 主版本号,全局递增
sub int64 // 子版本号,事务内递增
}

// 编码格式:8字节 main + 1字节分隔符'_' + 8字节sub
const revBytesLen = 8 + 1 + 8

func revToBytes(rev revision, bytes []byte) {
binary.BigEndian.PutUint64(bytes, uint64(rev.main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
}

// 比较逻辑
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub // main相等时比较sub
}

Store 中的 Revision 管理

1
2
3
4
5
6
7
8
9
10
type store struct {
// 保护 currentRev 和 compactMainRev 的锁
revMu sync.RWMutex

// 当前最新的 revision (main部分)
currentRev int64

// 最后一次压缩的 revision (main部分) , FirstRev
compactMainRev int64
}
读事务中的 Revision

读事务特点:

  • 使用 firstRev 和 rev 确定可读的版本范围
  • 不会修改 currentRev
  • 支持历史版本查询
1
2
3
4
5
6
7
8
9
10
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()

// 获取当前 revision 信息
firstRev, rev := s.compactMainRev, s.currentRev

s.revMu.RUnlock()
return &storeTxnRead{s, tx, firstRev, rev, trace}
}
写事务中的 Revision

写事务开始: beginRev = currentRev

1
2
3
4
5
6
7
8
9
10
11
12
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()

// 记录事务开始时的 revision
tw := &storeTxnWrite{
beginRev: s.currentRev, // 事务开始时的 revision
// ...
}
return tw
}

操作执行:

  • 每个操作使得 beginRev + 1 作为 Main revision
  • Sub revision: 同一事务内从 0 开始递增
  • 事务提交: currentRev++,更新全局版本号

在整个写事物生命周期内,Main revision 都是 beginRev + 1, storeTxnWrite.Put 会每次只会自增一次 Sub Revision。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return tw.beginRev + 1
}

func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, tw.beginRev + 1
}
return 0, tw.beginRev
}

func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++ // 更新全局 revision
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}

KeyIndex

ETCD 中 treeIndex 是一个 B-Tree,存储着所有 Key 的历史版本信息,用于 MVCC. keyIndex 则存储着单个 key 的版本历史信息,他们的层次关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌────────────────────────────────────────────────────────────┐
│ treeIndex (B-tree) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ keyIndex │ │ keyIndex │ │ keyIndex │ │
│ │ "foo" │ │ "bar" │ │ "baz" │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└────────────────────────────────────────────────────────────┘


┌────────────────────────────────────────────────────────────┐
│ keyIndex "foo" │
│ key: "foo" │
│ modified: 103.0 │
│ generations: [gen0, gen1, gen2] │
└────────────────────────────────────────────────────────────┘


┌────────────────────────────────────────────────────────────┐
│ generation │
│ ver: 3 │
│ created: 100.0 │
│ revs: [100.0, 101.0, 102.0] │
└────────────────────────────────────────────────────────────┘

Generation

Generation 表示一个 key 的生命周期,从 创建删除 为一个完整的 generation。 keyIndexGeneration 关系如下:

1
2
3
4
5
6
7
8
9
10
11
type keyIndex struct {
key []byte // 用户键
modified revision // 最后修改的 revision
generations []generation // 该键的所有 generation
}

type generation struct {
ver int64 // key 在这个 generation 的修改次数
created revision // generation 创建时的 revision
revs []revision // 该 generation 中的所有 revision
}

Generation 生命周期示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
键 "foo" 的操作序列:
put(1.0) → put(2.0) → delete(3.0) → put(4.0) → delete(5.0)

keyIndex 结构:
┌─────────────────────────────────────────────────────────────┐
│ key: "foo" │
│ modified: 5.0 │
│ generations: │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Generation 0: {empty} │ │
│ │ ver: 0, created: {}, revs: [] │ │
│ └───────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Generation 1: {4.0, 5.0(t)} │ │
│ │ ver: 2, created: 4.0, revs: [4.0, 5.0] │ │
│ └───────────────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Generation 2: {1.0, 2.0, 3.0(t)} │ │
│ │ ver: 3, created: 1.0, revs: [1.0, 2.0, 3.0] │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

put

在 keyIndex 中 put 一个 key 新的版本信息 {main, sub},行为如下:

  • 通过 ki.modified 来确保 revision 的单调递增
  • 无论这个 key 是首次 put,或者是 delete 之后又 put, generation 的第一个 revision 都会更新generation.created 字段
  • 更新 ki.modified 为当前的 rev,用于保证后续 put 的单调性
  • g.ver 记录这个 generation 的 put 次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// put puts a revision to the keyIndex.
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}

if !rev.GreaterThan(ki.modified) {
panic()
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
}
g := &ki.generations[len(ki.generations)-1]
if len(g.revs) == 0 { // create a new key
keysGauge.Inc()
g.created = rev // 'new key' or 'delete -> put' 都会更新 g.created
}
g.revs = append(g.revs, rev) // 所有历史信息
g.ver++
ki.modified = rev
}

tombstone

删除一个 key, 在 keyIndex 中是先写入一个 {main,sub},再紧跟着一个 marker(空 generation)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
// 不能 delete 之后再次 delete
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}

get

get 是 ETCD 用于查询 key, value 历史版本的核心方法。算法设计:

  • findGeneration: 从最新 generation 开始向前搜索,找到包含指定 revision 的 generation

  • walk: 在 generation 内从最新 revision 开始向前遍历,找到符合条件的 revision

  • 版本号计算: 通过公式 ver = g.ver - (len(g.revs) - n - 1) 精确计算版本号

    这个公式可以简单理解为 ver = n + 1,表征 generation 第几次修改

优势:

  • 具有高效性: O(g + r) 的时间复杂度,其中 g 和 r 通常都很小。
  • 一致性: 确保查询结果与指定 revision 时刻一致
  • 可扩展性: 支持大规模数据和高并发访问
  • 空间效率: 通过 generation 结构优化存储
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
if ki.isEmpty() {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
// 先找到 atRev 所在的 generation
g := ki.findGeneration(atRev)
if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound
}

n := g.walk(func(rev revision) bool { return rev.main > atRev })
if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}

return revision{}, revision{}, 0, ErrRevisionNotFound
}
findGeneration

利用时间局部性原理,从最新 generation 到最旧的 generation 的策略进行搜索,以此来支持历史版本查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (ki *keyIndex) findGeneration(rev int64) *generation {
lastg := len(ki.generations) - 1
cg := lastg

for cg >= 0 {
// 跳过空的 generation
if len(ki.generations[cg].revs) == 0 {
cg--
continue
}
g := ki.generations[cg]
if cg != lastg {
// Tombstone 检查: 说明 rev 在两个 generation 的间隙
if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
return nil
}
}
// 找到所属的 generation
if g.revs[0].main <= rev {
return &ki.generations[cg]
}
cg--
}
return nil
}
walk

walk 函数也是根据 时间局部性原理 进行遍历,返回符合条件的位置索引 l-i-1,那么上层 keyIndex.get 返回的 ver = g.ver - (l - i - 1)

1
2
3
4
5
6
7
8
9
10
func (g *generation) walk(f func(rev revision) bool) int {
l := len(g.revs)
for i := range g.revs {
ok := f(g.revs[l-i-1])
if !ok {
return l - i - 1
}
}
return -1
}

mvccpb.KeyValue

写入 BlotDB 的 value 是 mvccpb.KeyValue:

  • CreateRevision:全局作用域,记录键的创建时间。只在键首次创建时设置,之后保持不变, 用于历史查询和键的生命周期管理
  • ModRevision:全局作用域,记录键的最后修改时间。每次修改都会更新为当前 revision, 用于版本比较、Watch 机制和压缩
  • Version:generation 作用域,记录键在 generation 内的修改次数。每次修改递增,删除后重置为 0, 用于版本一致性检查和压缩优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;
// create_revision is the revision of last creation on this key.
int64 create_revision = 2;
// mod_revision is the revision of last modification on this key.
int64 mod_revision = 3;
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
int64 version = 4;
// value is the value held by the key, in bytes.
bytes value = 5;
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
int64 lease = 6;
}

storeTxnWrite.put 写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease

// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
tw.trace.Step("get key's previous created_revision and leaseID")
}
ibytes := newRevBytes()
idxRev := revision{
main: rev,
sub: int64(len(tw.changes)),
}
revToBytes(idxRev, ibytes)

kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver + 1,
Lease: int64(leaseID),
}

d, err := kv.Marshal()
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}

tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")

if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
tw.storeTxnRead.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
tw.trace.Step("attach lease to kv pair")
}