Lessor Lessor 是 ETCD 中负责管理租约 (Lease) 的核心组件。租约机制是 ETCD 实现分布式锁、会话管理和自动过期清理的重要基础。Lessor 提供了租约的创建、续期、撤销、过期处理等完整功能。
2. 核心接口设计 2.1 Lessor 接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 type Lessor interface { SetRangeDeleter(rd RangeDeleter) SetCheckpointer(cp Checkpointer) Grant(id LeaseID, ttl int64 ) (*Lease, error ) Revoke(id LeaseID) error Renew(id LeaseID) (int64 , error ) Checkpoint(id LeaseID, remainingTTL int64 ) error Attach(id LeaseID, items []LeaseItem) error Detach(id LeaseID, items []LeaseItem) error GetLease(item LeaseItem) LeaseID Promote(extend time.Duration) Demote() Lookup(id LeaseID) *Lease Leases() []*Lease ExpiredLeasesC() <-chan []*Lease Recover(b backend.Backend, rd RangeDeleter) Stop() }
2.2 关键数据结构 Lease 结构体 1 2 3 4 5 6 7 8 9 10 type Lease struct { ID LeaseID ttl int64 remainingTTL int64 expiryMu sync.RWMutex expiry time.Time mu sync.RWMutex itemSet map [LeaseItem]struct {} revokec chan struct {} }
lessor 结构体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type lessor struct { mu sync.RWMutex demotec chan struct {} leaseMap map [LeaseID]*Lease leaseExpiredNotifier *LeaseExpiredNotifier leaseCheckpointHeap LeaseQueue itemMap map [LeaseItem]LeaseID rd RangeDeleter cp Checkpointer b backend.Backend minLeaseTTL int64 leaseRevokeRate int expiredC chan []*Lease stopC chan struct {} doneC chan struct {} checkpointInterval time.Duration expiredLeaseRetryInterval time.Duration checkpointPersist bool cluster cluster }
3. 核心功能设计 LeaseExpiredNotifier LeaseQueue 是 lessor 中的一个核心数据结构,queue 是个基于堆排序的优先级队列,堆顶的元素是最先超时的 lease。 m 用于快速查找某个 lease 是否存在。在 grant 时,会在更新 leaseExpiredNotifier。
1 2 3 4 5 6 type LeaseExpiredNotifier struct { m map [LeaseID]*LeaseWithTime queue LeaseQueue }
3.1 租约创建(Grant) 设计要点:
支持最小TTL限制,防止过短的租约, 用户 TTL 只能在 [le.minLeaseTTl, MaxLeaseTTL] 范围
主从节点采用不同的过期策略
自动注册到 expired notifier 和 checkpoint heap,由后台 goroutine 进行超 ttl 超时检测以及 checkpoint 同步。
checkpoint 机制后面统一阐述,目的是为了保存 ttl 进度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (le *lessor) Grant(id LeaseID, ttl int64 ) (*Lease, error ) { if id == NoLease || ttl > MaxLeaseTTL { return nil , ErrLeaseNotFound/ErrLeaseTTLTooLarge } l := NewLease(id, ttl) if l.ttl < le.minLeaseTTL { l.ttl = le.minLeaseTTL } if le.isPrimary() { l.refresh(0 ) } else { l.forever() } le.leaseMap[id] = l l.persistTo(le.b) if le.isPrimary() { item := &LeaseWithTime{id: l.ID, time: l.expiry} le.leaseExpiredNotifier.RegisterOrUpdate(item) le.scheduleCheckpointIfNeeded(l) } return l, nil }
3.2 租约续期(Renew) 设计要点:
只有主节点可以处理续期请求
对已过期租约有特殊的等待机制
支持检查点清除,减少RAFT日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (le *lessor) Renew(id LeaseID) (int64 , error ) { if !le.isPrimary() { return -1 , ErrNotPrimary } l := le.leaseMap[id] if l == nil { return -1 , ErrLeaseNotFound } if l.expired() { select { case <-l.revokec: return -1 , ErrLeaseNotFound case <-demotec: return -1 , ErrNotPrimary } } clearRemainingTTL := le.cp != nil && l.remainingTTL > 0 if clearRemainingTTL { le.cp(context.Background(), &pb.LeaseCheckpointRequest{ Checkpoints: []*pb.LeaseCheckpoint{ {ID: int64 (l.ID), Remaining_TTL: 0 } } }) } l.refresh(0 ) item := &LeaseWithTime{id: l.ID, time: l.expiry} le.leaseExpiredNotifier.RegisterOrUpdate(item) return l.ttl, nil }
3.3 租约撤销(Revoke) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (le *lessor) Revoke(id LeaseID) error { l := le.leaseMap[id] if l == nil { return ErrLeaseNotFound } if le.rd != nil { txn := le.rd() keys := l.Keys() sort.StringSlice(keys).Sort() for _, key := range keys { txn.DeleteRange([]byte (key), nil ) } txn.End() } delete (le.leaseMap, l.ID) le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64 (l.ID))) close (l.revokec) return nil }
设计要点:
原子性删除:键值对删除和租约删除在同一事务中
保证删除顺序一致性,避免不同节点产生不同的哈希值
通过通道通知撤销完成
3.4 主从切换机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (le *lessor) Promote(extend time.Duration) { le.mu.Lock() defer le.mu.Unlock() le.demotec = make (chan struct {}) for _, l := range le.leaseMap { l.refresh(extend) item := &LeaseWithTime{id: l.ID, time: l.expiry} le.leaseExpiredNotifier.RegisterOrUpdate(item) le.scheduleCheckpointIfNeeded(l) } if len (le.leaseMap) >= le.leaseRevokeRate { le.adjustExpiriesForPileup(leases) } }
Demote(降级为从节点) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (le *lessor) Demote() { le.mu.Lock() defer le.mu.Unlock() for _, l := range le.leaseMap { l.forever() } le.clearScheduledLeasesCheckpoints() le.clearLeaseExpiredNotifier() if le.demotec != nil { close (le.demotec) le.demotec = nil } }
设计要点:
主节点负责租约过期管理和续期
从节点将租约设置为永不过期,避免误删
支持租约堆积的智能调整,防止大量租约同时过期
3.5 过期处理机制 主循环(runLoop) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (le *lessor) runLoop() { defer close (le.doneC) for { le.revokeExpiredLeases() le.checkpointScheduledLeases() select { case <-time.After(500 * time.Millisecond): case <-le.stopC: return } } }
过期租约处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (le *lessor) revokeExpiredLeases() { revokeLimit := le.leaseRevokeRate / 2 le.mu.RLock() if le.isPrimary() { ls = le.findExpiredLeases(revokeLimit) } le.mu.RUnlock() if len (ls) != 0 { select { case le.expiredC <- ls: default : } } }
设计要点:
500ms 的固定检查间隔,平衡性能和及时性
速率限制防止大量租约同时过期造成系统压力
非阻塞的过期通道,避免阻塞主循环
3.6 检查点机制 检查点调度 1 2 3 4 5 6 7 8 9 10 11 12 13 func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) { if le.cp == nil { return } if lease.RemainingTTL() > int64 (le.checkpointInterval.Seconds()) { heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{ id: lease.ID, time: time.Now().Add(le.checkpointInterval), }) } }
检查点处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (le *lessor) checkpointScheduledLeases() { for i := 0 ; i < leaseCheckpointRate/2 ; i++ { var cps []*pb.LeaseCheckpoint le.mu.Lock() if le.isPrimary() { cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize) } le.mu.Unlock() if len (cps) != 0 { le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) } } }
设计要点:
使用最小堆管理检查点调度
批量处理减少RAFT日志数量
速率限制防止检查点过于频繁
4. 数据结构设计 4.1 过期通知器(LeaseExpiredNotifier) 1 2 3 4 type LeaseExpiredNotifier struct { m map [LeaseID]*LeaseWithTime queue LeaseQueue }
设计要点:
使用最小堆按过期时间排序
支持O(log N)的插入、删除和查找
映射表提供O(1)的租约查找
4.2 租约队列(LeaseQueue) 1 2 3 4 5 6 7 type LeaseQueue []*LeaseWithTimetype LeaseWithTime struct { id LeaseID time time.Time index int }
设计要点:
实现标准堆接口
支持堆修复操作
索引字段支持O(log N)的堆修复
5. 持久化设计 5.1 后端存储 租约信息持久化到 buckets.Lease 桶中:
1 2 3 4 5 type Lease struct { ID int64 TTL int64 RemainingTTL int64 }
5.2 恢复机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (le *lessor) initAndRecover() { tx := le.b.BatchTx() tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Lease) lpbs := unsafeGetAllLeases(tx) tx.Unlock() for _, lpb := range lpbs { ID := LeaseID(lpb.ID) le.leaseMap[ID] = &Lease{ ID: ID, ttl: lpb.TTL, itemSet: make (map [LeaseItem]struct {}), expiry: forever, revokec: make (chan struct {}), remainingTTL: lpb.RemainingTTL, } } le.leaseExpiredNotifier.Init() heap.Init(&le.leaseCheckpointHeap) }
设计要点:
租约项(itemSet)不持久化,通过遍历KV恢复
初始过期时间设为永不过期,等待Promote时刷新
支持RemainingTTL的版本兼容性
6. 性能优化设计 6.1 速率限制
撤销速率限制 :leaseRevokeRate 控制每秒最大撤销数量
检查点速率限制 :leaseCheckpointRate 控制每秒最大检查点数量
批量处理 :检查点支持批量提交,减少RAFT日志
6.2 内存优化
最小堆 :O(log N)的过期时间管理
映射表 :O(1)的租约查找
缓冲通道 :避免过期处理阻塞主循环
6.3 并发安全
读写锁 :支持并发读操作
细粒度锁 :租约级别的互斥锁
通道通信 :避免共享状态竞争
7. 容错设计 7.1 主从切换容错
短暂双主 :Raft允许短暂的双主状态,通过term保证正确性
降级保护 :从节点将租约设为永不过期,避免误删
重试机制 :过期租约的重试间隔机制
7.2 网络分区容错
检查点机制 :保存剩余TTL,支持跨重启恢复
版本兼容 :支持不同版本的检查点行为
优雅降级 :网络问题时自动降级为从节点
8. 监控和指标 8.1 Prometheus指标 1 2 3 4 5 6 var ( leaseGranted = prometheus.NewCounter(...) leaseRevoked = prometheus.NewCounter(...) leaseRenewed = prometheus.NewCounter(...) leaseTotalTTLs = prometheus.NewHistogram(...) )
8.2 日志记录
调试日志 :检查点调度和处理的详细日志
错误日志 :租约操作失败的错误信息
性能日志 :关键操作的性能指标
9. 总结 etcd 的 Lessor 设计体现了以下核心特点:
高可用性 :通过主从切换和检查点机制保证服务连续性
高性能 :使用最小堆和映射表优化查找性能,支持速率限制
强一致性 :通过RAFT协议保证租约操作的一致性
可扩展性 :支持大量租约的高效管理
容错性 :完善的错误处理和恢复机制
Lessor 的设计充分考虑了分布式环境下的各种挑战,是一个经过充分验证的租约管理实现。