剖析 ETCD.lessor

Lessor

Lessor 是 ETCD 中负责管理租约 (Lease) 的核心组件。租约机制是 ETCD 实现分布式锁、会话管理和自动过期清理的重要基础。Lessor 提供了租约的创建、续期、撤销、过期处理等完整功能。

2. 核心接口设计

2.1 Lessor 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Lessor interface {
// 设置范围删除器和检查点器
SetRangeDeleter(rd RangeDeleter)
SetCheckpointer(cp Checkpointer)

// 租约生命周期管理
Grant(id LeaseID, ttl int64) (*Lease, error) // 创建租约
Revoke(id LeaseID) error // 撤销租约
Renew(id LeaseID) (int64, error) // 续期租约
Checkpoint(id LeaseID, remainingTTL int64) error // 检查点

// 租约项管理
Attach(id LeaseID, items []LeaseItem) error // 附加项到租约
Detach(id LeaseID, items []LeaseItem) error // 从租约分离项
GetLease(item LeaseItem) LeaseID // 获取项的租约ID

// 主从管理 callback
Promote(extend time.Duration) // 提升为主节点时的 callback
Demote() // 降级为从节点的 callback

// 查询操作
Lookup(id LeaseID) *Lease // 查找租约
Leases() []*Lease // 获取所有租约
ExpiredLeasesC() <-chan []*Lease // 过期租约通道

// 恢复和停止
Recover(b backend.Backend, rd RangeDeleter) // 从后端恢复
Stop() // 停止服务
}

2.2 关键数据结构

Lease 结构体

1
2
3
4
5
6
7
8
9
10
type Lease struct {
ID LeaseID // 租约ID
ttl int64 // 生存时间(秒)
remainingTTL int64 // 剩余生存时间
expiryMu sync.RWMutex // 保护过期时间的互斥锁
expiry time.Time // 过期时间
mu sync.RWMutex // 保护项集合的互斥锁
itemSet map[LeaseItem]struct{} // 附加的项集合
revokec chan struct{} // 撤销完成信号
}

lessor 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type lessor struct {
mu sync.RWMutex
demotec chan struct{} // 降级信号通道
leaseMap map[LeaseID]*Lease // 租约映射
leaseExpiredNotifier *LeaseExpiredNotifier // 过期通知器
leaseCheckpointHeap LeaseQueue // 检查点堆
itemMap map[LeaseItem]LeaseID // 项到租约的映射
rd RangeDeleter // 范围删除器
cp Checkpointer // 检查点器
b backend.Backend // 后端存储
minLeaseTTL int64 // 最小TTL
leaseRevokeRate int // 撤销速率限制
expiredC chan []*Lease // 过期租约通道
stopC chan struct{} // 停止信号
doneC chan struct{} // 完成信号
checkpointInterval time.Duration // 检查点间隔
expiredLeaseRetryInterval time.Duration // 过期重试间隔
checkpointPersist bool // 是否持久化检查点
cluster cluster // 集群版本信息
}

3. 核心功能设计

LeaseExpiredNotifier

LeaseQueue 是 lessor 中的一个核心数据结构,queue 是个基于堆排序的优先级队列,堆顶的元素是最先超时的 lease。 m 用于快速查找某个 lease 是否存在。在 grant 时,会在更新 leaseExpiredNotifier

1
2
3
4
5
6
// LeaseExpiredNotifier is a queue used to notify lessor to revoke expired lease.
// Only save one item for a lease, `Register` will update time of the corresponding lease.
type LeaseExpiredNotifier struct {
m map[LeaseID]*LeaseWithTime
queue LeaseQueue
}

3.1 租约创建(Grant)

设计要点:

  • 支持最小TTL限制,防止过短的租约, 用户 TTL 只能在 [le.minLeaseTTl, MaxLeaseTTL] 范围

  • 主从节点采用不同的过期策略

  • 自动注册到 expired notifier 和 checkpoint heap,由后台 goroutine 进行超 ttl 超时检测以及 checkpoint 同步。

    checkpoint 机制后面统一阐述,目的是为了保存 ttl 进度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
// 1. 参数验证
if id == NoLease || ttl > MaxLeaseTTL {
return nil, ErrLeaseNotFound/ErrLeaseTTLTooLarge
}

// 2. 创建租约对象
l := NewLease(id, ttl)

// 3. 设置最小TTL
if l.ttl < le.minLeaseTTL {
l.ttl = le.minLeaseTTL
}

// 4. 根据主从状态设置过期时间
if le.isPrimary() {
l.refresh(0) // 主节点:设置正常过期时间
} else {
l.forever() // 从节点:设置为永不过期
}

// 5. 持久化到后端
le.leaseMap[id] = l
l.persistTo(le.b)

// 6. 注册到 expired notifier 和 checkpoint heap
if le.isPrimary() {
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}

return l, nil
}

3.2 租约续期(Renew)

设计要点:

  • 只有主节点可以处理续期请求
  • 对已过期租约有特殊的等待机制
  • 支持检查点清除,减少RAFT日志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (le *lessor) Renew(id LeaseID) (int64, error) {
// 1. 检查主节点状态
if !le.isPrimary() {
return -1, ErrNotPrimary
}

// 2. 查找租约
l := le.leaseMap[id]
if l == nil {
return -1, ErrLeaseNotFound
}

// 3. 处理已过期租约
if l.expired() {
// 等待撤销完成或主节点变更
select {
case <-l.revokec:
return -1, ErrLeaseNotFound
case <-demotec:
return -1, ErrNotPrimary
}
}

// 4. 清除剩余TTL(如果设置了检查点)
clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{
Checkpoints: []*pb.LeaseCheckpoint{
{ID: int64(l.ID), Remaining_TTL: 0}
}
})
}

// 5. 刷新过期时间
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)

return l.ttl, nil
}

3.3 租约撤销(Revoke)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (le *lessor) Revoke(id LeaseID) error {
// 1. 查找租约
l := le.leaseMap[id]
if l == nil {
return ErrLeaseNotFound
}

// 2. 删除附加的键值对
if le.rd != nil {
txn := le.rd()
keys := l.Keys()
sort.StringSlice(keys).Sort() // 保证删除顺序一致
for _, key := range keys {
txn.DeleteRange([]byte(key), nil)
}
txn.End()
}

// 3. 从内存和持久化存储中删除
delete(le.leaseMap, l.ID)
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))

// 4. 通知撤销完成
close(l.revokec)

return nil
}

设计要点:

  • 原子性删除:键值对删除和租约删除在同一事务中
  • 保证删除顺序一致性,避免不同节点产生不同的哈希值
  • 通过通道通知撤销完成

3.4 主从切换机制

Promote(提升为主节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()

// 1. 设置主节点标志
le.demotec = make(chan struct{})

// 2. 刷新所有租约的过期时间
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}

// 3. 处理租约堆积问题
if len(le.leaseMap) >= le.leaseRevokeRate {
le.adjustExpiriesForPileup(leases)
}
}

Demote(降级为从节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (le *lessor) Demote() {
le.mu.Lock()
defer le.mu.Unlock()

// 1. 将所有租约设置为永不过期
for _, l := range le.leaseMap {
l.forever()
}

// 2. 清理检查点和过期通知器
le.clearScheduledLeasesCheckpoints()
le.clearLeaseExpiredNotifier()

// 3. 关闭降级信号通道
if le.demotec != nil {
close(le.demotec)
le.demotec = nil
}
}

设计要点:

  • 主节点负责租约过期管理和续期
  • 从节点将租约设置为永不过期,避免误删
  • 支持租约堆积的智能调整,防止大量租约同时过期

3.5 过期处理机制

主循环(runLoop)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (le *lessor) runLoop() {
defer close(le.doneC)

for {
le.revokeExpiredLeases() // 处理过期租约
le.checkpointScheduledLeases() // 处理检查点

select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}

过期租约处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (le *lessor) revokeExpiredLeases() {
// 1. 速率限制
revokeLimit := le.leaseRevokeRate / 2

// 2. 查找过期租约
le.mu.RLock()
if le.isPrimary() {
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()

// 3. 发送到过期通道
if len(ls) != 0 {
select {
case le.expiredC <- ls:
default:
// 通道满时跳过,下次再处理
}
}
}

设计要点:

  • 500ms 的固定检查间隔,平衡性能和及时性
  • 速率限制防止大量租约同时过期造成系统压力
  • 非阻塞的过期通道,避免阻塞主循环

3.6 检查点机制

检查点调度

1
2
3
4
5
6
7
8
9
10
11
12
13
func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
if le.cp == nil {
return
}

// 只有当剩余TTL大于检查点间隔时才调度
if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
id: lease.ID,
time: time.Now().Add(le.checkpointInterval),
})
}
}

检查点处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (le *lessor) checkpointScheduledLeases() {
// 速率限制
for i := 0; i < leaseCheckpointRate/2; i++ {
var cps []*pb.LeaseCheckpoint
le.mu.Lock()
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
}
le.mu.Unlock()

if len(cps) != 0 {
// 提交到RAFT
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
}
}
}

设计要点:

  • 使用最小堆管理检查点调度
  • 批量处理减少RAFT日志数量
  • 速率限制防止检查点过于频繁

4. 数据结构设计

4.1 过期通知器(LeaseExpiredNotifier)

1
2
3
4
type LeaseExpiredNotifier struct {
m map[LeaseID]*LeaseWithTime // 租约ID到时间项的映射
queue LeaseQueue // 基于过期时间的最小堆
}

设计要点:

  • 使用最小堆按过期时间排序
  • 支持O(log N)的插入、删除和查找
  • 映射表提供O(1)的租约查找

4.2 租约队列(LeaseQueue)

1
2
3
4
5
6
7
type LeaseQueue []*LeaseWithTime

type LeaseWithTime struct {
id LeaseID
time time.Time
index int // 堆中的索引
}

设计要点:

  • 实现标准堆接口
  • 支持堆修复操作
  • 索引字段支持O(log N)的堆修复

5. 持久化设计

5.1 后端存储

租约信息持久化到 buckets.Lease 桶中:

1
2
3
4
5
type Lease struct {
ID int64 // 租约ID
TTL int64 // 生存时间
RemainingTTL int64 // 剩余时间(v3.6+支持)
}

5.2 恢复机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.LockOutsideApply()

tx.UnsafeCreateBucket(buckets.Lease)
lpbs := unsafeGetAllLeases(tx)
tx.Unlock()

// 从持久化数据恢复租约
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
itemSet: make(map[LeaseItem]struct{}),
expiry: forever, // 初始设置为永不过期
revokec: make(chan struct{}),
remainingTTL: lpb.RemainingTTL,
}
}

le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
}

设计要点:

  • 租约项(itemSet)不持久化,通过遍历KV恢复
  • 初始过期时间设为永不过期,等待Promote时刷新
  • 支持RemainingTTL的版本兼容性

6. 性能优化设计

6.1 速率限制

  • 撤销速率限制leaseRevokeRate 控制每秒最大撤销数量
  • 检查点速率限制leaseCheckpointRate 控制每秒最大检查点数量
  • 批量处理:检查点支持批量提交,减少RAFT日志

6.2 内存优化

  • 最小堆:O(log N)的过期时间管理
  • 映射表:O(1)的租约查找
  • 缓冲通道:避免过期处理阻塞主循环

6.3 并发安全

  • 读写锁:支持并发读操作
  • 细粒度锁:租约级别的互斥锁
  • 通道通信:避免共享状态竞争

7. 容错设计

7.1 主从切换容错

  • 短暂双主:Raft允许短暂的双主状态,通过term保证正确性
  • 降级保护:从节点将租约设为永不过期,避免误删
  • 重试机制:过期租约的重试间隔机制

7.2 网络分区容错

  • 检查点机制:保存剩余TTL,支持跨重启恢复
  • 版本兼容:支持不同版本的检查点行为
  • 优雅降级:网络问题时自动降级为从节点

8. 监控和指标

8.1 Prometheus指标

1
2
3
4
5
6
var (
leaseGranted = prometheus.NewCounter(...) // 租约创建总数
leaseRevoked = prometheus.NewCounter(...) // 租约撤销总数
leaseRenewed = prometheus.NewCounter(...) // 租约续期总数
leaseTotalTTLs = prometheus.NewHistogram(...) // TTL分布直方图
)

8.2 日志记录

  • 调试日志:检查点调度和处理的详细日志
  • 错误日志:租约操作失败的错误信息
  • 性能日志:关键操作的性能指标

9. 总结

etcd 的 Lessor 设计体现了以下核心特点:

  1. 高可用性:通过主从切换和检查点机制保证服务连续性
  2. 高性能:使用最小堆和映射表优化查找性能,支持速率限制
  3. 强一致性:通过RAFT协议保证租约操作的一致性
  4. 可扩展性:支持大量租约的高效管理
  5. 容错性:完善的错误处理和恢复机制

Lessor 的设计充分考虑了分布式环境下的各种挑战,是一个经过充分验证的租约管理实现。