1. 1. 概述
  2. 2. 一、TSO 的基本概念
    1. 2.1. 1.1 为什么需要 TSO?
    2. 2.2. 1.2 TSO 的解决方案
  3. 3. 二、TSO 的数据结构
    1. 3.1. 2.1 混合时间戳(Hybrid Timestamp)
    2. 3.2. 2.2 各部分的作用
  4. 4. 三、TSO 的生成流程
    1. 4.1. 3.1 整体架构
    2. 4.2. 3.2 RootCoord 中的 TSO 服务
      1. 4.2.1. 3.2.1 timestampOracle 结构
      2. 4.2.2. 3.2.2 初始化(InitTimestamp)
      3. 4.2.3. 3.2.3 更新时间戳(UpdateTimestamp)
    3. 4.3. 3.3 生成时间戳(GenerateTSO)
    4. 4.4. 3.4 RootCoord 的 AllocTimestamp RPC
  5. 5. 四、Proxy 如何使用 TSO
    1. 5.1. 4.1 TimestampAllocator
    2. 5.2. 4.2 Insert 操作中的 TSO 使用
    3. 5.3. 4.3 Delete 操作中的 TSO 使用
  6. 6. 五、TSO 的作用
    1. 6.1. 5.1 保证全局事件顺序
    2. 6.2. 5.2 时间同步(Time Synchronization)
    3. 6.3. 5.3 数据一致性保证
    4. 6.4. 5.4 故障恢复
  7. 7. 六、TSO 的性能优化
    1. 7.1. 6.1 批量分配
    2. 7.2. 6.2 时间窗口预分配
    3. 7.3. 6.3 内存分配
    4. 7.4. 6.4 逻辑计数器溢出处理
  8. 8. 七、TSO 的限制和注意事项
    1. 8.1. 7.1 时钟同步要求
    2. 8.2. 7.2 逻辑计数器限制
    3. 8.3. 7.3 单点故障
    4. 8.4. 7.4 时间戳范围
  9. 9. 八、总结
    1. 9.1. 8.1 TSO 的核心价值
    2. 9.2. 8.2 关键设计
    3. 9.3. 8.3 使用场景
  10. 10. 九、GenerateTSO 失败处理机制
    1. 10.1. 9.1 GenerateTSO 内部重试机制
    2. 10.2. 9.2 Proxy 层的超时和重试
      1. 10.2.1. 9.2.1 Context 超时
      2. 10.2.2. 9.2.2 gRPC 客户端重试
    3. 10.3. 9.3 任务入队失败处理
    4. 10.4. 9.4 失败场景分析
      1. 10.4.1. 场景 1:RootCoord 未启动或不可用
      2. 10.4.2. 场景 2:TSO 未初始化(RootCoord 刚启动)
      3. 10.4.3. 场景 3:逻辑计数器溢出
    5. 10.5. 9.5 总结

Milvus TSO(Timestamp Oracle)生成及其作用分析

概述

TSO(Timestamp Oracle)是 Milvus 分布式系统中的核心组件,负责生成全局唯一、单调递增的时间戳。所有数据操作(Insert、Delete、Search 等)都需要从 TSO 获取时间戳,确保分布式环境下的事件顺序一致性。

一、TSO 的基本概念

1.1 为什么需要 TSO?

在分布式系统中,存在以下问题:

  1. 时钟不同步:不同节点的本地时钟可能不一致
  2. 网络延迟:消息在网络中传输存在延迟,可能导致乱序
  3. 事件顺序:需要保证全局事件的有序性

示例场景

1
2
3
4
5
6
7
用户 u1 和 u2 在不同节点上操作:
- t0: u1 创建 Collection C0
- t2: u2 在 C0 上搜索(应该看到空集合)
- t5: u1 插入数据 A1
- t7: u2 在 C0 上搜索(应该看到 A1)
- t15: u1 删除 A1
- t17: u2 在 C0 上搜索(应该看不到 A1)

如果没有 TSO,由于时钟不同步和网络延迟,u2 可能看到不一致的数据状态。

1.2 TSO 的解决方案

  • 统一时间源:所有组件从 RootCoord 的 TSO 服务获取时间戳,而不是使用本地时钟
  • 全局有序:TSO 保证生成的时间戳全局唯一且单调递增
  • 时间同步:通过 TimeTick 机制确保消息流的顺序处理

二、TSO 的数据结构

2.1 混合时间戳(Hybrid Timestamp)

TSO 生成的时间戳是 uint64 类型,采用混合结构:

1
2
3
4
5
6
┌─────────────────────────────────────────────────────────┐
│ 64 bits (uint64) │
├──────────────────────────────┬──────────────────────────┤
│ Physical Part (46 bits) │ Logical Part (18 bits) │
│ UTC 时间(毫秒) │ 逻辑计数器 │
└──────────────────────────────┴──────────────────────────┘

代码实现pkg/util/tsoutil/tso.go):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const (
logicalBits = 18
logicalBitsMask = (1 << logicalBits) - 1 // 0x3FFFF
)

// ComposeTS 组合物理时间和逻辑时间
func ComposeTS(physical, logical int64) uint64 {
return uint64((physical << logicalBits) + logical)
}

// ParseTS 解析时间戳
func ParseTS(ts uint64) (time.Time, uint64) {
logical := ts & logicalBitsMask // 提取逻辑部分(低 18 位)
physical := ts >> logicalBits // 提取物理部分(高 46 位)
physicalTime := time.Unix(int64(physical/1000),
int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}

2.2 各部分的作用

部分 位数 作用 范围
Physical Part 46 bits UTC 时间(毫秒) 约 8925 年
Logical Part 18 bits 逻辑计数器 0 ~ 262,143 (2^18-1)

优势

  • 高精度:同一毫秒内可生成最多 262,143 个时间戳
  • 时间可读:物理部分可以直接转换为 UTC 时间
  • 全局唯一:物理时间 + 逻辑计数器保证唯一性

三、TSO 的生成流程

3.1 整体架构

1
2
3
4
5
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│ Proxy │────────▶│ RootCoord │────────▶│ etcd │
│ │ Request │ │ Save │ │
│ │◀────────│ TSO Service │◀────────│ │
└─────────────┘ Response└──────────────┘ Load └─────────────┘

3.2 RootCoord 中的 TSO 服务

文件: internal/tso/tso.go

3.2.1 timestampOracle 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type timestampOracle struct {
key string // etcd 中的 key
txnKV kv.TxnKV // etcd 客户端

saveInterval time.Duration // 保存间隔(默认 3 秒)
maxResetTSGap func() time.Duration

TSO unsafe.Pointer // 当前 TSO(原子指针)
lastSavedTime atomic.Value // 最后保存的时间
}

// atomicObject 存储当前 TSO 状态
type atomicObject struct {
physical time.Time // 物理时间
logical int64 // 逻辑计数器
}

3.2.2 初始化(InitTimestamp)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (t *timestampOracle) InitTimestamp() error {
// 1. 从 etcd 加载上次保存的时间戳
last, err := t.loadTimestamp()
if err != nil {
return err
}

// 2. 获取当前系统时间
next := time.Now()

// 3. 如果系统时间与 etcd 时间差距太小,使用 etcd 时间
if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
next = last.Add(updateTimestampGuard)
}

// 4. 保存未来时间窗口到 etcd(提前保存,避免频繁写 etcd)
save := next.Add(t.saveInterval) // saveInterval = 3 秒
if err := t.saveTimestamp(save); err != nil {
return err
}

// 5. 初始化内存中的 TSO
current := &atomicObject{
physical: next,
logical: 0,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))

return nil
}

关键点

  • 从 etcd 恢复上次保存的时间戳
  • 提前保存未来 3 秒的时间窗口,减少 etcd 写入
  • 使用原子指针保证线程安全

3.2.3 更新时间戳(UpdateTimestamp)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (t *timestampOracle) UpdateTimestamp() error {
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
now := time.Now()

jetLag := typeutil.SubTimeByWallClock(now, prev.physical)

var next time.Time

// 情况 1:系统时间比物理时间大,同步到系统时间
if jetLag > updateTimestampGuard {
next = now
}
// 情况 2:逻辑计数器快用完了,增加物理时间
else if prevLogical > maxLogical/2 {
next = prev.physical.Add(time.Millisecond)
}
// 情况 3:时间窗口足够,不需要更新
else {
return nil
}

// 如果时间窗口不够,需要更新 etcd
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
}

// 更新内存中的 TSO
current := &atomicObject{
physical: next,
logical: 0, // 重置逻辑计数器
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))

return nil
}

更新触发条件

  1. 系统时间比物理时间大(时钟同步)
  2. 逻辑计数器超过 maxLogical/2(131,071),需要增加物理时间
  3. 时间窗口不足,需要更新 etcd

3.3 生成时间戳(GenerateTSO)

文件: internal/tso/global_allocator.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
var physical, logical int64

maxRetryCount := 10

for i := 0; i < maxRetryCount; i++ {
// 1. 获取当前 TSO 对象
current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO))
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
// TSO 未初始化,等待
time.Sleep(200 * time.Millisecond)
continue
}

// 2. 原子增加逻辑计数器
physical = current.physical.UnixMilli()
logical = atomic.AddInt64(&current.logical, int64(count))

// 3. 检查逻辑计数器是否溢出
if logical >= maxLogical && gta.LimitMaxLogic {
log.Info("logical part outside of max logical interval, please check ntp time")
time.Sleep(UpdateTimestampStep) // 等待 50ms,触发 UpdateTimestamp
continue
}

// 4. 组合时间戳
return tsoutil.ComposeTS(physical, logical), nil
}

return 0, errors.New("can not get timestamp")
}

关键点

  • 使用原子操作增加逻辑计数器,保证线程安全
  • 支持批量分配(count 个时间戳)
  • 如果逻辑计数器溢出,等待并触发 UpdateTimestamp

3.4 RootCoord 的 AllocTimestamp RPC

文件: internal/rootcoord/root_coord.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) 
(*rootcoordpb.AllocTimestampResponse, error) {

// 1. 检查健康状态
if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
return &rootcoordpb.AllocTimestampResponse{Status: merr.Status(err)}, nil
}

// 2. 处理阻塞时间戳(用于数据恢复等场景)
if in.BlockTimestamp > 0 {
blockTime, _ := tsoutil.ParseTS(in.BlockTimestamp)
lastTime := c.tsoAllocator.GetLastSavedTime()
deltaDuration := blockTime.Sub(lastTime)
if deltaDuration > 0 {
time.Sleep(deltaDuration + time.Millisecond*200)
}
}

// 3. 生成 TSO
ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
if err != nil {
return &rootcoordpb.AllocTimestampResponse{Status: merr.Status(err)}, nil
}

// 4. 返回第一个可用时间戳
// 注意:GenerateTSO 返回的是最后一个时间戳,需要减去 count-1
ts = ts - uint64(in.GetCount()) + 1

return &rootcoordpb.AllocTimestampResponse{
Status: merr.Success(),
Timestamp: ts, // 第一个可用时间戳
Count: in.GetCount(),
}, nil
}

Proto 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
service RootCoord {
rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {}
}

message AllocTimestampRequest {
common.MsgBase base = 1;
uint32 count = 3; // 请求的时间戳数量
}

message AllocTimestampResponse {
common.Status status = 1;
uint64 timestamp = 2; // 第一个可用时间戳
uint32 count = 3; // 分配的数量
}

四、Proxy 如何使用 TSO

4.1 TimestampAllocator

文件: internal/proxy/timestamp.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type timestampAllocator struct {
peerID int64
tso types.RootCoordClient
}

func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) {
// 1. 构建请求
req := &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithSourceID(ta.peerID),
),
Count: count,
}

// 2. 调用 RootCoord 的 AllocTimestamp
resp, err := ta.tso.AllocTimestamp(ctx, req)
if err != nil {
return nil, err
}

// 3. 生成时间戳数组
start, cnt := resp.GetTimestamp(), resp.GetCount()
ret := make([]Timestamp, cnt)
for i := uint32(0); i < cnt; i++ {
ret[i] = start + uint64(i) // 连续的时间戳
}

return ret, nil
}

// AllocOne 分配单个时间戳
func (ta *timestampAllocator) AllocOne(ctx context.Context) (Timestamp, error) {
ret, err := ta.alloc(ctx, 1)
if err != nil {
return 0, err
}
return ret[0], nil
}

4.2 Insert 操作中的 TSO 使用

文件: internal/proxy/task_insert.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (it *insertTask) insertPreExecute(ctx context.Context) error {
// 1. 分配主键 ID
rowNums := uint32(it.insertMsg.NRows())
rowIDBegin, rowIDEnd, _ := common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID)

// 2. 为每行分配时间戳
rowNum := it.insertMsg.NRows()
it.insertMsg.Timestamps = make([]uint64, rowNum)

// 3. 批量分配时间戳(性能优化)
timestamps, err := it.timestampAllocator.AllocTimestamp(ctx, uint32(rowNum))
if err != nil {
return err
}

// 4. 为每行设置时间戳
for index := range it.insertMsg.Timestamps {
it.insertMsg.Timestamps[index] = timestamps[index]
}

return nil
}

4.3 Delete 操作中的 TSO 使用

文件: internal/proxy/task_delete.go

1
2
3
4
5
6
7
8
9
10
11
12
13
func (dr *deleteRunner) Run(ctx context.Context) error {
// 1. 分配时间戳
ts, err := dr.tsoAllocatorIns.AllocOne(ctx)
if err != nil {
return err
}

// 2. 设置删除消息的时间戳
dr.deleteMsg.Timestamps = []uint64{ts}

// 3. 发送删除消息到消息流
// ...
}

五、TSO 的作用

5.1 保证全局事件顺序

场景:多个 Proxy 同时写入数据

1
2
3
4
Proxy1: Insert A (ts=100)
Proxy2: Insert B (ts=150)
Proxy1: Insert C (ts=200)
Proxy2: Delete A (ts=250)

所有操作都从同一个 TSO 获取时间戳,保证全局顺序:

  • A 在 B 之前插入(100 < 150)
  • C 在 B 之后插入(200 > 150)
  • Delete A 在 Insert C 之后(250 > 200)

5.2 时间同步(Time Synchronization)

问题:如何确保消息流中所有小于某个时间戳的消息都已处理?

解决方案:TimeTick 机制

  1. Proxy 上报时间戳

    • 每个 Proxy 定期(默认 200ms)向 RootCoord 上报每个消息流的最新时间戳
  2. RootCoord 计算最小时间戳

    • 对于每个消息流,RootCoord 计算所有 Proxy 上报的最小时间戳
  3. 插入 TimeTick 消息

    • RootCoord 将最小时间戳作为 TimeTick 消息插入到消息流中
  4. 消费者处理

    • 当消费者读取到 TimeTick 消息时,表示所有小于该时间戳的消息都已处理完成

代码internal/rootcoord/root_coord.go):

1
2
3
4
5
6
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) 
(*commonpb.Status, error) {
// 更新每个 Channel 的时间戳
err := c.chanTimeTick.updateTimeTick(in, "gRPC")
return merr.Status(err), nil
}

5.3 数据一致性保证

场景:查询操作需要看到一致的数据快照

1
2
3
4
5
t1: Insert A (ts=100)
t2: Insert B (ts=200)
t3: Search (ts=150) // 应该只看到 A,看不到 B
t4: Delete A (ts=250)
t5: Search (ts=300) // 应该只看到 B

通过 TSO 时间戳:

  • 查询操作获取时间戳 ts_query
  • 只处理时间戳 <= ts_query 的数据
  • 保证查询结果的一致性

5.4 故障恢复

场景:系统重启后恢复时间戳

  1. 持久化到 etcd

    • RootCoord 定期将时间戳保存到 etcd
  2. 恢复时间戳

    • 重启后,从 etcd 加载上次保存的时间戳
    • 确保新生成的时间戳大于已保存的时间戳
  3. 防止时间回退

    • 如果系统时间回退,使用 etcd 中的时间戳
    • 保证时间戳单调递增

六、TSO 的性能优化

6.1 批量分配

  • 减少 RPC 调用:Proxy 可以一次请求多个时间戳
  • 提高吞吐量:减少网络往返次数
1
2
// 一次分配 100 个时间戳
timestamps, err := tsoAllocator.AllocTimestamp(ctx, 100)

6.2 时间窗口预分配

  • 减少 etcd 写入:提前保存未来 3 秒的时间窗口
  • 提高性能:避免频繁写 etcd
1
saveInterval = 3 * time.Second  // 提前保存 3 秒

6.3 内存分配

  • 原子操作:使用原子操作更新逻辑计数器,无锁
  • 高性能:避免锁竞争
1
logical = atomic.AddInt64(&current.logical, int64(count))

6.4 逻辑计数器溢出处理

  • 自动增加物理时间:当逻辑计数器超过 maxLogical/2 时,自动增加物理时间
  • 保证可用性:不会因为逻辑计数器用完而阻塞

七、TSO 的限制和注意事项

7.1 时钟同步要求

  • NTP 同步:建议所有节点使用 NTP 同步时钟
  • 时钟偏移检测:如果时钟偏移超过 3 倍 UpdateTimestampStep(150ms),会记录警告日志

7.2 逻辑计数器限制

  • 最大逻辑值maxLogical = 2^18 = 262,143
  • 同一毫秒内最多生成:262,143 个时间戳
  • 如果超过:需要等待下一毫秒

7.3 单点故障

  • RootCoord 是单点:如果 RootCoord 故障,无法分配时间戳
  • 高可用方案:通过 etcd 的 Leader 选举实现高可用

7.4 时间戳范围

  • 物理时间范围
    • 最小值:1546300800000(2019-01-01 00:00:00 UTC)
    • 最大值:253402300799000(9999-12-31 23:59:59 UTC)

八、总结

8.1 TSO 的核心价值

  1. 全局唯一时间戳:保证分布式环境下事件顺序
  2. 时间同步:解决时钟不同步和网络延迟问题
  3. 数据一致性:支持基于时间戳的快照查询
  4. 高性能:批量分配、时间窗口预分配等优化

8.2 关键设计

  • 混合时间戳:物理时间(46 bits)+ 逻辑计数器(18 bits)
  • 持久化:定期保存到 etcd,支持故障恢复
  • 原子操作:使用原子操作保证线程安全
  • 批量分配:支持一次分配多个时间戳

8.3 使用场景

  • Insert/Delete 操作:为每行数据分配时间戳
  • 查询操作:获取查询时间戳,保证一致性
  • 消息流处理:通过 TimeTick 确保消息顺序
  • 故障恢复:从 etcd 恢复时间戳状态

TSO 是 Milvus 分布式系统的核心基础设施,为整个系统提供了全局有序的时间基准,确保了数据一致性和系统可靠性。

九、GenerateTSO 失败处理机制

9.1 GenerateTSO 内部重试机制

GenerateTSO 方法内部实现了重试机制,不会立即失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
maxRetryCount := 10 // 最多重试 10 次

for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO))

// 情况 1:TSO 未初始化,等待 200ms 后重试
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}

// 情况 2:逻辑计数器溢出,等待 50ms 后重试
logical = atomic.AddInt64(&current.logical, int64(count))
if logical >= maxLogical && gta.LimitMaxLogic {
log.Info("logical part outside of max logical interval")
time.Sleep(UpdateTimestampStep) // 50ms
continue
}

return tsoutil.ComposeTS(physical, logical), nil
}

// 所有重试都失败,返回错误
return 0, errors.New("can not get timestamp")
}

重试场景

  1. TSO 未初始化:等待 200ms 后重试(最多 10 次,总等待时间约 2 秒)
  2. 逻辑计数器溢出:等待 50ms 后重试(等待 UpdateTimestamp 更新物理时间)

9.2 Proxy 层的超时和重试

Proxy 调用 TSO 时有多层保护:

9.2.1 Context 超时

1
2
3
4
5
6
7
8
func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) {
// 设置 10 秒超时
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

resp, err := ta.tso.AllocTimestamp(ctx, req)
// ...
}

超时保护:如果 RootCoord 无响应,10 秒后自动超时,不会永久阻塞。

9.2.2 gRPC 客户端重试

Proxy 的 gRPC 客户端有自动重试机制:

1
2
3
4
5
6
7
8
9
10
// internal/util/grpcclient/client.go
retry.Handle(ctx, func() (bool, error) {
// 执行 RPC 调用
ret, err = caller(wrapper.client)
// 检查错误,决定是否重试
return needRetry, err
},
retry.Attempts(10), // 最多重试 10 次
retry.Sleep(200*time.Millisecond), // 初始退避 200ms
retry.MaxSleepTime(10*time.Second)) // 最大退避 10 秒

重试策略

  • 最多重试 10 次
  • 初始退避:200ms
  • 最大退避:10 秒
  • 总耗时:最多约 52.8 秒(如果所有重试都失败)

9.3 任务入队失败处理

当 TSO 分配失败时,任务无法入队,会立即返回错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (queue *baseTaskQueue) Enqueue(t task) error {
// ...
if t.CanSkipAllocTimestamp() {
// 某些任务可以跳过 TSO 分配
ts = tsoutil.ComposeTS(time.Now().UnixMilli(), 0)
} else {
// 必须分配 TSO 的任务
ts, err = queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil {
return err // 直接返回错误,任务不入队
}
}
// ...
}

关键点

  • 不会阻塞:任务入队失败会立即返回错误,不会阻塞其他任务
  • 错误传播:错误会返回给调用者(如 Insert/Delete 请求)
  • 用户可见:用户会收到明确的错误响应

9.4 失败场景分析

场景 1:RootCoord 未启动或不可用

1
2
3
4
5
6
7
GenerateTSO 内部重试(10次 × 200ms = 2秒)

Proxy gRPC 重试(10次,最多 52.8 秒)

Context 超时(10 秒)

任务入队失败,返回错误给用户

结果:用户请求会在约 10 秒后收到错误响应,不会永久阻塞。

场景 2:TSO 未初始化(RootCoord 刚启动)

1
2
3
4
5
6
7
8
9
GenerateTSO 检测到 TSO 未初始化

等待 200ms

重试(最多 10 次,总等待约 2 秒)

如果初始化完成,成功返回

如果仍未初始化,返回错误

结果:正常情况下,RootCoord 初始化很快(< 1 秒),不会失败。

场景 3:逻辑计数器溢出

1
2
3
4
5
6
7
8
9
GenerateTSO 检测到逻辑计数器溢出

等待 50ms(等待 UpdateTimestamp 更新)

重试(最多 10 次,总等待约 0.5 秒)

如果 UpdateTimestamp 成功,继续分配

如果仍然溢出,返回错误

结果:正常情况下,UpdateTimestamp 会在 50ms 内完成,不会失败。

9.5 总结

GenerateTSO 失败不会永久阻塞处理

  1. 多层重试机制

    • GenerateTSO 内部重试(10 次)
    • gRPC 客户端重试(10 次)
    • 总重试次数可达 100 次
  2. 超时保护

    • Context 超时(10 秒)
    • 确保不会无限等待
  3. 快速失败

    • 任务入队失败立即返回错误
    • 不会阻塞其他任务的处理
    • 用户会收到明确的错误响应
  4. 正常情况下的性能

    • TSO 分配通常在毫秒级完成
    • 重试机制只在异常情况下触发
    • 不会影响正常请求的延迟

最佳实践

  • 确保 RootCoord 高可用(通过 etcd Leader 选举)
  • 监控 TSO 分配失败率
  • 如果频繁失败,检查 RootCoord 健康状态和网络连接