概述
TSO(Timestamp Oracle)是 Milvus 分布式系统中的核心组件,负责生成全局唯一、单调递增的时间戳。所有数据操作(Insert、Delete、Search 等)都需要从 TSO 获取时间戳,确保分布式环境下的事件顺序一致性。
一、TSO 的基本概念
1.1 为什么需要 TSO?
在分布式系统中,存在以下问题:
- 时钟不同步:不同节点的本地时钟可能不一致
- 网络延迟:消息在网络中传输存在延迟,可能导致乱序
- 事件顺序:需要保证全局事件的有序性
示例场景:
1 2 3 4 5 6 7
| 用户 u1 和 u2 在不同节点上操作: - t0: u1 创建 Collection C0 - t2: u2 在 C0 上搜索(应该看到空集合) - t5: u1 插入数据 A1 - t7: u2 在 C0 上搜索(应该看到 A1) - t15: u1 删除 A1 - t17: u2 在 C0 上搜索(应该看不到 A1)
|
如果没有 TSO,由于时钟不同步和网络延迟,u2 可能看到不一致的数据状态。
1.2 TSO 的解决方案
- 统一时间源:所有组件从 RootCoord 的 TSO 服务获取时间戳,而不是使用本地时钟
- 全局有序:TSO 保证生成的时间戳全局唯一且单调递增
- 时间同步:通过 TimeTick 机制确保消息流的顺序处理
二、TSO 的数据结构
2.1 混合时间戳(Hybrid Timestamp)
TSO 生成的时间戳是 uint64 类型,采用混合结构:
1 2 3 4 5 6
| ┌─────────────────────────────────────────────────────────┐ │ 64 bits (uint64) │ ├──────────────────────────────┬──────────────────────────┤ │ Physical Part (46 bits) │ Logical Part (18 bits) │ │ UTC 时间(毫秒) │ 逻辑计数器 │ └──────────────────────────────┴──────────────────────────┘
|
代码实现(pkg/util/tsoutil/tso.go):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| const ( logicalBits = 18 logicalBitsMask = (1 << logicalBits) - 1 )
func ComposeTS(physical, logical int64) uint64 { return uint64((physical << logicalBits) + logical) }
func ParseTS(ts uint64) (time.Time, uint64) { logical := ts & logicalBitsMask physical := ts >> logicalBits physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) return physicalTime, logical }
|
2.2 各部分的作用
| 部分 |
位数 |
作用 |
范围 |
| Physical Part |
46 bits |
UTC 时间(毫秒) |
约 8925 年 |
| Logical Part |
18 bits |
逻辑计数器 |
0 ~ 262,143 (2^18-1) |
优势:
- 高精度:同一毫秒内可生成最多 262,143 个时间戳
- 时间可读:物理部分可以直接转换为 UTC 时间
- 全局唯一:物理时间 + 逻辑计数器保证唯一性
三、TSO 的生成流程
3.1 整体架构
1 2 3 4 5
| ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ Proxy │────────▶│ RootCoord │────────▶│ etcd │ │ │ Request │ │ Save │ │ │ │◀────────│ TSO Service │◀────────│ │ └─────────────┘ Response└──────────────┘ Load └─────────────┘
|
3.2 RootCoord 中的 TSO 服务
文件: internal/tso/tso.go
3.2.1 timestampOracle 结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type timestampOracle struct { key string txnKV kv.TxnKV saveInterval time.Duration maxResetTSGap func() time.Duration TSO unsafe.Pointer lastSavedTime atomic.Value }
type atomicObject struct { physical time.Time logical int64 }
|
3.2.2 初始化(InitTimestamp)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (t *timestampOracle) InitTimestamp() error { last, err := t.loadTimestamp() if err != nil { return err } next := time.Now() if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { next = last.Add(updateTimestampGuard) } save := next.Add(t.saveInterval) if err := t.saveTimestamp(save); err != nil { return err } current := &atomicObject{ physical: next, logical: 0, } atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) return nil }
|
关键点:
- 从 etcd 恢复上次保存的时间戳
- 提前保存未来 3 秒的时间窗口,减少 etcd 写入
- 使用原子指针保证线程安全
3.2.3 更新时间戳(UpdateTimestamp)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| func (t *timestampOracle) UpdateTimestamp() error { prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) now := time.Now() jetLag := typeutil.SubTimeByWallClock(now, prev.physical) var next time.Time if jetLag > updateTimestampGuard { next = now } else if prevLogical > maxLogical/2 { next = prev.physical.Add(time.Millisecond) } else { return nil } if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { save := next.Add(t.saveInterval) if err := t.saveTimestamp(save); err != nil { return err } } current := &atomicObject{ physical: next, logical: 0, } atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) return nil }
|
更新触发条件:
- 系统时间比物理时间大(时钟同步)
- 逻辑计数器超过
maxLogical/2(131,071),需要增加物理时间
- 时间窗口不足,需要更新 etcd
3.3 生成时间戳(GenerateTSO)
文件: internal/tso/global_allocator.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { var physical, logical int64 maxRetryCount := 10 for i := 0; i < maxRetryCount; i++ { current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) if current == nil || current.physical.Equal(typeutil.ZeroTime) { time.Sleep(200 * time.Millisecond) continue } physical = current.physical.UnixMilli() logical = atomic.AddInt64(¤t.logical, int64(count)) if logical >= maxLogical && gta.LimitMaxLogic { log.Info("logical part outside of max logical interval, please check ntp time") time.Sleep(UpdateTimestampStep) continue } return tsoutil.ComposeTS(physical, logical), nil } return 0, errors.New("can not get timestamp") }
|
关键点:
- 使用原子操作增加逻辑计数器,保证线程安全
- 支持批量分配(
count 个时间戳)
- 如果逻辑计数器溢出,等待并触发
UpdateTimestamp
3.4 RootCoord 的 AllocTimestamp RPC
文件: internal/rootcoord/root_coord.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { return &rootcoordpb.AllocTimestampResponse{Status: merr.Status(err)}, nil } if in.BlockTimestamp > 0 { blockTime, _ := tsoutil.ParseTS(in.BlockTimestamp) lastTime := c.tsoAllocator.GetLastSavedTime() deltaDuration := blockTime.Sub(lastTime) if deltaDuration > 0 { time.Sleep(deltaDuration + time.Millisecond*200) } } ts, err := c.tsoAllocator.GenerateTSO(in.GetCount()) if err != nil { return &rootcoordpb.AllocTimestampResponse{Status: merr.Status(err)}, nil } ts = ts - uint64(in.GetCount()) + 1 return &rootcoordpb.AllocTimestampResponse{ Status: merr.Success(), Timestamp: ts, Count: in.GetCount(), }, nil }
|
Proto 定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| service RootCoord { rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {} }
message AllocTimestampRequest { common.MsgBase base = 1; uint32 count = 3; }
message AllocTimestampResponse { common.Status status = 1; uint64 timestamp = 2; uint32 count = 3; }
|
四、Proxy 如何使用 TSO
4.1 TimestampAllocator
文件: internal/proxy/timestamp.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| type timestampAllocator struct { peerID int64 tso types.RootCoordClient }
func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) { req := &rootcoordpb.AllocTimestampRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), commonpbutil.WithSourceID(ta.peerID), ), Count: count, } resp, err := ta.tso.AllocTimestamp(ctx, req) if err != nil { return nil, err } start, cnt := resp.GetTimestamp(), resp.GetCount() ret := make([]Timestamp, cnt) for i := uint32(0); i < cnt; i++ { ret[i] = start + uint64(i) } return ret, nil }
func (ta *timestampAllocator) AllocOne(ctx context.Context) (Timestamp, error) { ret, err := ta.alloc(ctx, 1) if err != nil { return 0, err } return ret[0], nil }
|
4.2 Insert 操作中的 TSO 使用
文件: internal/proxy/task_insert.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (it *insertTask) insertPreExecute(ctx context.Context) error { rowNums := uint32(it.insertMsg.NRows()) rowIDBegin, rowIDEnd, _ := common.AllocAutoID(it.idAllocator.Alloc, rowNums, clusterID) rowNum := it.insertMsg.NRows() it.insertMsg.Timestamps = make([]uint64, rowNum) timestamps, err := it.timestampAllocator.AllocTimestamp(ctx, uint32(rowNum)) if err != nil { return err } for index := range it.insertMsg.Timestamps { it.insertMsg.Timestamps[index] = timestamps[index] } return nil }
|
4.3 Delete 操作中的 TSO 使用
文件: internal/proxy/task_delete.go
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (dr *deleteRunner) Run(ctx context.Context) error { ts, err := dr.tsoAllocatorIns.AllocOne(ctx) if err != nil { return err } dr.deleteMsg.Timestamps = []uint64{ts} }
|
五、TSO 的作用
5.1 保证全局事件顺序
场景:多个 Proxy 同时写入数据
1 2 3 4
| Proxy1: Insert A (ts=100) Proxy2: Insert B (ts=150) Proxy1: Insert C (ts=200) Proxy2: Delete A (ts=250)
|
所有操作都从同一个 TSO 获取时间戳,保证全局顺序:
- A 在 B 之前插入(100 < 150)
- C 在 B 之后插入(200 > 150)
- Delete A 在 Insert C 之后(250 > 200)
5.2 时间同步(Time Synchronization)
问题:如何确保消息流中所有小于某个时间戳的消息都已处理?
解决方案:TimeTick 机制
Proxy 上报时间戳:
- 每个 Proxy 定期(默认 200ms)向 RootCoord 上报每个消息流的最新时间戳
RootCoord 计算最小时间戳:
- 对于每个消息流,RootCoord 计算所有 Proxy 上报的最小时间戳
插入 TimeTick 消息:
- RootCoord 将最小时间戳作为 TimeTick 消息插入到消息流中
消费者处理:
- 当消费者读取到 TimeTick 消息时,表示所有小于该时间戳的消息都已处理完成
代码(internal/rootcoord/root_coord.go):
1 2 3 4 5 6
| func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { err := c.chanTimeTick.updateTimeTick(in, "gRPC") return merr.Status(err), nil }
|
5.3 数据一致性保证
场景:查询操作需要看到一致的数据快照
1 2 3 4 5
| t1: Insert A (ts=100) t2: Insert B (ts=200) t3: Search (ts=150) // 应该只看到 A,看不到 B t4: Delete A (ts=250) t5: Search (ts=300) // 应该只看到 B
|
通过 TSO 时间戳:
- 查询操作获取时间戳
ts_query
- 只处理时间戳
<= ts_query 的数据
- 保证查询结果的一致性
5.4 故障恢复
场景:系统重启后恢复时间戳
持久化到 etcd:
恢复时间戳:
- 重启后,从 etcd 加载上次保存的时间戳
- 确保新生成的时间戳大于已保存的时间戳
防止时间回退:
- 如果系统时间回退,使用 etcd 中的时间戳
- 保证时间戳单调递增
六、TSO 的性能优化
6.1 批量分配
- 减少 RPC 调用:Proxy 可以一次请求多个时间戳
- 提高吞吐量:减少网络往返次数
1 2
| timestamps, err := tsoAllocator.AllocTimestamp(ctx, 100)
|
6.2 时间窗口预分配
- 减少 etcd 写入:提前保存未来 3 秒的时间窗口
- 提高性能:避免频繁写 etcd
1
| saveInterval = 3 * time.Second
|
6.3 内存分配
- 原子操作:使用原子操作更新逻辑计数器,无锁
- 高性能:避免锁竞争
1
| logical = atomic.AddInt64(¤t.logical, int64(count))
|
6.4 逻辑计数器溢出处理
- 自动增加物理时间:当逻辑计数器超过
maxLogical/2 时,自动增加物理时间
- 保证可用性:不会因为逻辑计数器用完而阻塞
七、TSO 的限制和注意事项
7.1 时钟同步要求
- NTP 同步:建议所有节点使用 NTP 同步时钟
- 时钟偏移检测:如果时钟偏移超过 3 倍
UpdateTimestampStep(150ms),会记录警告日志
7.2 逻辑计数器限制
- 最大逻辑值:
maxLogical = 2^18 = 262,143
- 同一毫秒内最多生成:262,143 个时间戳
- 如果超过:需要等待下一毫秒
7.3 单点故障
- RootCoord 是单点:如果 RootCoord 故障,无法分配时间戳
- 高可用方案:通过 etcd 的 Leader 选举实现高可用
7.4 时间戳范围
- 物理时间范围:
- 最小值:
1546300800000(2019-01-01 00:00:00 UTC)
- 最大值:
253402300799000(9999-12-31 23:59:59 UTC)
八、总结
8.1 TSO 的核心价值
- ✅ 全局唯一时间戳:保证分布式环境下事件顺序
- ✅ 时间同步:解决时钟不同步和网络延迟问题
- ✅ 数据一致性:支持基于时间戳的快照查询
- ✅ 高性能:批量分配、时间窗口预分配等优化
8.2 关键设计
- 混合时间戳:物理时间(46 bits)+ 逻辑计数器(18 bits)
- 持久化:定期保存到 etcd,支持故障恢复
- 原子操作:使用原子操作保证线程安全
- 批量分配:支持一次分配多个时间戳
8.3 使用场景
- Insert/Delete 操作:为每行数据分配时间戳
- 查询操作:获取查询时间戳,保证一致性
- 消息流处理:通过 TimeTick 确保消息顺序
- 故障恢复:从 etcd 恢复时间戳状态
TSO 是 Milvus 分布式系统的核心基础设施,为整个系统提供了全局有序的时间基准,确保了数据一致性和系统可靠性。
九、GenerateTSO 失败处理机制
9.1 GenerateTSO 内部重试机制
GenerateTSO 方法内部实现了重试机制,不会立即失败:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { maxRetryCount := 10 for i := 0; i < maxRetryCount; i++ { current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) if current == nil || current.physical.Equal(typeutil.ZeroTime) { log.Info("sync hasn't completed yet, wait for a while") time.Sleep(200 * time.Millisecond) continue } logical = atomic.AddInt64(¤t.logical, int64(count)) if logical >= maxLogical && gta.LimitMaxLogic { log.Info("logical part outside of max logical interval") time.Sleep(UpdateTimestampStep) continue } return tsoutil.ComposeTS(physical, logical), nil } return 0, errors.New("can not get timestamp") }
|
重试场景:
- TSO 未初始化:等待 200ms 后重试(最多 10 次,总等待时间约 2 秒)
- 逻辑计数器溢出:等待 50ms 后重试(等待
UpdateTimestamp 更新物理时间)
9.2 Proxy 层的超时和重试
Proxy 调用 TSO 时有多层保护:
9.2.1 Context 超时
1 2 3 4 5 6 7 8
| func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resp, err := ta.tso.AllocTimestamp(ctx, req) }
|
超时保护:如果 RootCoord 无响应,10 秒后自动超时,不会永久阻塞。
9.2.2 gRPC 客户端重试
Proxy 的 gRPC 客户端有自动重试机制:
1 2 3 4 5 6 7 8 9 10
| retry.Handle(ctx, func() (bool, error) { ret, err = caller(wrapper.client) return needRetry, err }, retry.Attempts(10), retry.Sleep(200*time.Millisecond), retry.MaxSleepTime(10*time.Second))
|
重试策略:
- 最多重试 10 次
- 初始退避:200ms
- 最大退避:10 秒
- 总耗时:最多约 52.8 秒(如果所有重试都失败)
9.3 任务入队失败处理
当 TSO 分配失败时,任务无法入队,会立即返回错误:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (queue *baseTaskQueue) Enqueue(t task) error { if t.CanSkipAllocTimestamp() { ts = tsoutil.ComposeTS(time.Now().UnixMilli(), 0) } else { ts, err = queue.tsoAllocatorIns.AllocOne(t.TraceCtx()) if err != nil { return err } } }
|
关键点:
- ✅ 不会阻塞:任务入队失败会立即返回错误,不会阻塞其他任务
- ✅ 错误传播:错误会返回给调用者(如 Insert/Delete 请求)
- ✅ 用户可见:用户会收到明确的错误响应
9.4 失败场景分析
场景 1:RootCoord 未启动或不可用
1 2 3 4 5 6 7
| GenerateTSO 内部重试(10次 × 200ms = 2秒) ↓ Proxy gRPC 重试(10次,最多 52.8 秒) ↓ Context 超时(10 秒) ↓ 任务入队失败,返回错误给用户
|
结果:用户请求会在约 10 秒后收到错误响应,不会永久阻塞。
场景 2:TSO 未初始化(RootCoord 刚启动)
1 2 3 4 5 6 7 8 9
| GenerateTSO 检测到 TSO 未初始化 ↓ 等待 200ms ↓ 重试(最多 10 次,总等待约 2 秒) ↓ 如果初始化完成,成功返回 ↓ 如果仍未初始化,返回错误
|
结果:正常情况下,RootCoord 初始化很快(< 1 秒),不会失败。
场景 3:逻辑计数器溢出
1 2 3 4 5 6 7 8 9
| GenerateTSO 检测到逻辑计数器溢出 ↓ 等待 50ms(等待 UpdateTimestamp 更新) ↓ 重试(最多 10 次,总等待约 0.5 秒) ↓ 如果 UpdateTimestamp 成功,继续分配 ↓ 如果仍然溢出,返回错误
|
结果:正常情况下,UpdateTimestamp 会在 50ms 内完成,不会失败。
9.5 总结
GenerateTSO 失败不会永久阻塞处理:
✅ 多层重试机制:
- GenerateTSO 内部重试(10 次)
- gRPC 客户端重试(10 次)
- 总重试次数可达 100 次
✅ 超时保护:
- Context 超时(10 秒)
- 确保不会无限等待
✅ 快速失败:
- 任务入队失败立即返回错误
- 不会阻塞其他任务的处理
- 用户会收到明确的错误响应
✅ 正常情况下的性能:
- TSO 分配通常在毫秒级完成
- 重试机制只在异常情况下触发
- 不会影响正常请求的延迟
最佳实践:
- 确保 RootCoord 高可用(通过 etcd Leader 选举)
- 监控 TSO 分配失败率
- 如果频繁失败,检查 RootCoord 健康状态和网络连接