Milvus TTNode(时间戳节点)详解

概述

ttNode(TimeTick Node)是 FlowGraph 中的最后一个节点,负责管理和更新通道检查点(Checkpoint)。它定期或基于触发条件将检查点异步更新到 DataCoord,确保数据消费位置的持久化。

架构设计

核心结构

1
2
3
4
5
6
7
8
9
10
11
type ttNode struct {
BaseNode

vChannelName string
metacache metacache.MetaCache
writeBufferManager writebuffer.BufferManager
lastUpdateTime *atomic.Time
cpUpdater *util.ChannelCheckpointUpdater
dropMode *atomic.Bool
dropCallback func()
}

关键字段说明:

  • writeBufferManager: WriteBuffer 管理器,用于获取检查点
  • lastUpdateTime: 上次更新时间,用于控制更新频率
  • cpUpdater: 检查点更新器,异步更新检查点
  • dropMode: 删除模式标志

消息处理流程

Operate 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*FlowGraphMsg)

// 1. 处理删除集合消息
if fgMsg.dropCollection {
ttn.dropMode.Store(true)
if ttn.dropCallback != nil {
defer ttn.dropCallback()
}
}

// 2. 删除模式下跳过检查点更新
if ttn.dropMode.Load() {
return []Msg{}
}

// 3. 处理关闭消息
if fgMsg.IsCloseMsg() {
if ttn.dropMode.Load() {
return in
}
// 强制更新检查点
if len(fgMsg.EndPositions) > 0 {
channelPos, _, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName)
if err != nil {
return []Msg{}
}
ttn.updateChannelCP(channelPos, curTs, false)
}
return in
}

curTs, _ := tsoutil.ParseTS(fgMsg.TimeRange.TimestampMax)

// 4. 获取检查点
channelPos, needUpdate, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName)
if err != nil {
return []Msg{}
}

// 5. 定期更新(基于时间间隔)
if curTs.Sub(ttn.lastUpdateTime.Load()) >= paramtable.Get().DataNodeCfg.UpdateChannelCheckpointInterval.GetAsDuration(time.Second) {
ttn.updateChannelCP(channelPos, curTs, false)
return []Msg{}
}

// 6. 触发更新(基于 flushTs 条件)
if needUpdate {
ttn.updateChannelCP(channelPos, curTs, true)
}

return []Msg{}
}

检查点更新机制

updateChannelCP 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time, flush bool) {
callBack := func() {
channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp())
// 重置 flushTs,防止频繁刷新
ttn.writeBufferManager.NotifyCheckpointUpdated(ttn.vChannelName, channelPos.GetTimestamp())
log.Debug("UpdateChannelCheckpoint success",
zap.String("channel", ttn.vChannelName),
zap.Uint64("cpTs", channelPos.GetTimestamp()),
zap.Time("cpTime", channelCPTs))
}

// 添加到更新器
ttn.cpUpdater.AddTask(channelPos, flush, callBack)
ttn.lastUpdateTime.Store(curTs)
}

关键逻辑:

  • flush 参数:标记是否由刷新操作触发
  • callBack:更新成功后的回调,用于重置 flushTs

ChannelCheckpointUpdater 详解

核心结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type ChannelCheckpointUpdater struct {
broker broker.Broker

mu sync.RWMutex
tasks map[string]*channelCPUpdateTask
notifyChan chan struct{}

closeCh chan struct{}
closeOnce sync.Once
updateDoneCallback func(*msgpb.MsgPosition)
}

type channelCPUpdateTask struct {
pos *msgpb.MsgPosition
callback func()
flush bool // 是否由刷新触发
}

启动循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (ccu *ChannelCheckpointUpdater) Start() {
ticker := time.NewTicker(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second))
defer ticker.Stop()

for {
select {
case <-ccu.closeCh:
return
case <-ccu.notifyChan:
// 处理刷新触发的更新
var tasks []*channelCPUpdateTask
ccu.mu.Lock()
for _, task := range ccu.tasks {
if task.flush {
task.flush = false // 重置标志
tasks = append(tasks, task)
}
}
ccu.mu.Unlock()
if len(tasks) > 0 {
ccu.updateCheckpoints(tasks)
}
case <-ticker.C:
// 定期执行更新
ccu.execute()
}
}
}

添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (ccu *ChannelCheckpointUpdater) AddTask(channelPos *msgpb.MsgPosition, flush bool, callback func()) {
if flush {
// 刷新触发时,立即触发更新尝试
defer ccu.trigger()
}

channel := channelPos.GetChannelName()
task, ok := ccu.getTask(channel)

if !ok {
// 新任务
ccu.mu.Lock()
defer ccu.mu.Unlock()
ccu.tasks[channel] = &channelCPUpdateTask{
pos: channelPos,
callback: callback,
flush: flush,
}
return
}

// 合并任务
max := func(a, b *msgpb.MsgPosition) *msgpb.MsgPosition {
if a.GetTimestamp() > b.GetTimestamp() {
return a
}
return b
}

// 更新条件:
// 1. 位置更新了(时间戳更大)
// 2. 刷新触发但任务未标记为刷新
if task.pos.GetTimestamp() < channelPos.GetTimestamp() || (flush && !task.flush) {
ccu.mu.Lock()
defer ccu.mu.Unlock()
ccu.tasks[channel] = &channelCPUpdateTask{
pos: max(channelPos, task.pos), // 取最大时间戳
callback: callback,
flush: flush || task.flush, // 保留刷新标志
}
}
}

更新检查点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (ccu *ChannelCheckpointUpdater) updateCheckpoints(tasks []*channelCPUpdateTask) {
// 1. 分批处理(每批最大数量限制)
taskGroups := lo.Chunk(tasks, paramtable.Get().DataNodeCfg.MaxChannelCheckpointsPerRPC.GetAsInt())

// 2. 并行处理(最大并行数限制)
updateChanCPMaxParallel := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointMaxParallel.GetAsInt()
rpcGroups := lo.Chunk(taskGroups, updateChanCPMaxParallel)

finished := typeutil.NewConcurrentMap[string, *channelCPUpdateTask]()

for _, groups := range rpcGroups {
wg := &sync.WaitGroup{}
for _, tasks := range groups {
wg.Add(1)
go func(tasks []*channelCPUpdateTask) {
defer wg.Done()

// 3. 构建 RPC 请求
timeout := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

channelCPs := lo.Map(tasks, func(t *channelCPUpdateTask, _ int) *msgpb.MsgPosition {
return t.pos
})

// 4. 发送 RPC
err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs)
if err != nil {
log.Warn("update channel checkpoint failed", zap.Error(err))
return // 失败时不删除任务,等待重试
}

// 5. 执行回调
for _, task := range tasks {
task.callback()
finished.Insert(task.pos.GetChannelName(), task)
if ccu.updateDoneCallback != nil {
ccu.updateDoneCallback(task.pos)
}
}
}(tasks)
}
wg.Wait()
}

// 6. 清理已完成的任务
ccu.mu.Lock()
defer ccu.mu.Unlock()
finished.Range(func(_ string, task *channelCPUpdateTask) bool {
channel := task.pos.GetChannelName()
// 只有当任务位置 >= 当前任务位置时才删除(避免删除更新的任务)
if ccu.tasks[channel].pos.GetTimestamp() <= task.pos.GetTimestamp() {
delete(ccu.tasks, channel)
}
return true
})
}

检查点更新触发条件

1. 定期更新

1
2
3
if curTs.Sub(ttn.lastUpdateTime.Load()) >= UpdateChannelCheckpointInterval {
ttn.updateChannelCP(channelPos, curTs, false)
}

触发条件: 距离上次更新时间超过配置的间隔

2. 刷新触发

1
2
3
4
channelPos, needUpdate, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName)
if needUpdate {
ttn.updateChannelCP(channelPos, curTs, true)
}

触发条件: GetCheckpoint 返回 needUpdate = true

needUpdate 的计算:

1
2
flushTs := buf.GetFlushTimestamp()
return cp, flushTs != nonFlushTS && cp.GetTimestamp() >= flushTs, nil

含义: 当检查点时间戳 >= flushTs 时,需要更新检查点

检查点失败处理

失败重试机制

1
2
3
4
5
err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs)
if err != nil {
log.Warn("update channel checkpoint failed", zap.Error(err))
return // 不删除任务,等待下次重试
}

特点:

  • 失败时不删除任务
  • 任务保留在 ccu.tasks
  • 下次 execute()notifyChan 触发时会重试

任务合并

1
2
3
4
5
6
// 合并逻辑:取最大时间戳,保留刷新标志
ccu.tasks[channel] = &channelCPUpdateTask{
pos: max(channelPos, task.pos),
callback: callback,
flush: flush || task.flush,
}

优势:

  • 避免重复更新
  • 确保使用最新的检查点
  • 保留刷新触发标志

设计特点

1. 异步更新

通过 ChannelCheckpointUpdater 实现异步更新,不阻塞 FlowGraph。

2. 批量处理

支持批量更新多个通道的检查点,提高效率。

3. 并行执行

支持并行处理多个批次,充分利用系统资源。

4. 失败重试

失败的任务保留在队列中,自动重试。

5. 刷新加速

刷新触发的更新会立即尝试,不等待定时器。

配置参数

  • UpdateChannelCheckpointInterval: 定期更新间隔
  • ChannelCheckpointUpdateTickInSeconds: 更新器定时器间隔
  • MaxChannelCheckpointsPerRPC: 每次 RPC 最大检查点数
  • UpdateChannelCheckpointMaxParallel: 最大并行数
  • UpdateChannelCheckpointRPCTimeout: RPC 超时时间

相关文档