概述
ttNode(TimeTick Node)是 FlowGraph 中的最后一个节点,负责管理和更新通道检查点(Checkpoint)。它定期或基于触发条件将检查点异步更新到 DataCoord,确保数据消费位置的持久化。
架构设计
核心结构
1 2 3 4 5 6 7 8 9 10 11
// ttNode is the flow graph node that reports the checkpoint of one vchannel.
// Operate decides when a report is due and updateChannelCP hands it to
// cpUpdater.
type ttNode struct {
	BaseNode
	// vChannelName is the channel passed to GetCheckpoint and
	// NotifyCheckpointUpdated.
	vChannelName string
	// metacache is not referenced by the methods shown in this document.
	metacache metacache.MetaCache
	// writeBufferManager supplies the current checkpoint (GetCheckpoint) and is
	// told when an update has succeeded (NotifyCheckpointUpdated).
	writeBufferManager writebuffer.BufferManager
	// lastUpdateTime is the curTs stored by the latest updateChannelCP call;
	// Operate compares against it to pace periodic updates.
	lastUpdateTime *atomic.Time
	// cpUpdater receives checkpoint tasks through AddTask.
	cpUpdater *util.ChannelCheckpointUpdater
	// dropMode is set once a message with dropCollection is seen; while it is
	// set Operate returns without scheduling any checkpoint update.
	dropMode *atomic.Bool
	// dropCallback, when non-nil, is deferred by Operate on a dropCollection
	// message.
	dropCallback func()
}
|
关键字段说明:
writeBufferManager: WriteBuffer 管理器,用于获取检查点
lastUpdateTime: 上次更新时间,用于控制更新频率
cpUpdater: 检查点更新器,异步更新检查点
dropMode: 删除模式标志;收到 dropCollection 消息后置为 true,此后 Operate 直接返回,不再更新检查点
消息处理流程
Operate 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| func (ttn *ttNode) Operate(in []Msg) []Msg { fgMsg := in[0].(*FlowGraphMsg) if fgMsg.dropCollection { ttn.dropMode.Store(true) if ttn.dropCallback != nil { defer ttn.dropCallback() } } if ttn.dropMode.Load() { return []Msg{} } if fgMsg.IsCloseMsg() { if ttn.dropMode.Load() { return in } if len(fgMsg.EndPositions) > 0 { channelPos, _, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName) if err != nil { return []Msg{} } ttn.updateChannelCP(channelPos, curTs, false) } return in } curTs, _ := tsoutil.ParseTS(fgMsg.TimeRange.TimestampMax) channelPos, needUpdate, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName) if err != nil { return []Msg{} } if curTs.Sub(ttn.lastUpdateTime.Load()) >= paramtable.Get().DataNodeCfg.UpdateChannelCheckpointInterval.GetAsDuration(time.Second) { ttn.updateChannelCP(channelPos, curTs, false) return []Msg{} } if needUpdate { ttn.updateChannelCP(channelPos, curTs, true) } return []Msg{} }
|
检查点更新机制
updateChannelCP 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time, flush bool) { callBack := func() { channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp()) ttn.writeBufferManager.NotifyCheckpointUpdated(ttn.vChannelName, channelPos.GetTimestamp()) log.Debug("UpdateChannelCheckpoint success", zap.String("channel", ttn.vChannelName), zap.Uint64("cpTs", channelPos.GetTimestamp()), zap.Time("cpTime", channelCPTs)) } ttn.cpUpdater.AddTask(channelPos, flush, callBack) ttn.lastUpdateTime.Store(curTs) }
|
关键逻辑:
flush 参数:标记是否由刷新操作触发
callBack:更新成功后的回调,调用 writeBufferManager.NotifyCheckpointUpdated 通知 WriteBuffer 检查点已更新(用于重置 flushTs),并输出一条调试日志
ChannelCheckpointUpdater 详解
核心结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// ChannelCheckpointUpdater collects at most one pending checkpoint task per
// channel and reports them through broker from the loop run by Start.
type ChannelCheckpointUpdater struct {
	// broker performs the UpdateChannelCheckpoint call.
	broker broker.Broker

	// mu is held while tasks is iterated or modified.
	mu sync.RWMutex
	// tasks holds the pending task of each channel, keyed by channel name.
	tasks map[string]*channelCPUpdateTask
	// notifyChan wakes the Start loop so that tasks marked flush are sent
	// without waiting for the ticker.
	notifyChan chan struct{}

	// closeCh makes the Start loop return once it becomes readable.
	closeCh chan struct{}
	// closeOnce is not used by the methods shown in this document;
	// presumably it guards closing closeCh — TODO confirm.
	closeOnce sync.Once
	// updateDoneCallback, when non-nil, is invoked with the position of every
	// task whose update call succeeded.
	updateDoneCallback func(*msgpb.MsgPosition)
}

// channelCPUpdateTask is the pending checkpoint report of one channel.
type channelCPUpdateTask struct {
	// pos is the checkpoint position to report.
	pos *msgpb.MsgPosition
	// callback is invoked after the position has been reported successfully.
	callback func()
	// flush marks the task for the notifyChan path of Start, which clears the
	// flag when it picks the task up.
	flush bool
}
|
启动循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (ccu *ChannelCheckpointUpdater) Start() { ticker := time.NewTicker(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second)) defer ticker.Stop() for { select { case <-ccu.closeCh: return case <-ccu.notifyChan: var tasks []*channelCPUpdateTask ccu.mu.Lock() for _, task := range ccu.tasks { if task.flush { task.flush = false tasks = append(tasks, task) } } ccu.mu.Unlock() if len(tasks) > 0 { ccu.updateCheckpoints(tasks) } case <-ticker.C: ccu.execute() } } }
|
添加任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| func (ccu *ChannelCheckpointUpdater) AddTask(channelPos *msgpb.MsgPosition, flush bool, callback func()) { if flush { defer ccu.trigger() } channel := channelPos.GetChannelName() task, ok := ccu.getTask(channel) if !ok { ccu.mu.Lock() defer ccu.mu.Unlock() ccu.tasks[channel] = &channelCPUpdateTask{ pos: channelPos, callback: callback, flush: flush, } return } max := func(a, b *msgpb.MsgPosition) *msgpb.MsgPosition { if a.GetTimestamp() > b.GetTimestamp() { return a } return b } if task.pos.GetTimestamp() < channelPos.GetTimestamp() || (flush && !task.flush) { ccu.mu.Lock() defer ccu.mu.Unlock() ccu.tasks[channel] = &channelCPUpdateTask{ pos: max(channelPos, task.pos), callback: callback, flush: flush || task.flush, } } }
|
更新检查点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| func (ccu *ChannelCheckpointUpdater) updateCheckpoints(tasks []*channelCPUpdateTask) { taskGroups := lo.Chunk(tasks, paramtable.Get().DataNodeCfg.MaxChannelCheckpointsPerRPC.GetAsInt()) updateChanCPMaxParallel := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointMaxParallel.GetAsInt() rpcGroups := lo.Chunk(taskGroups, updateChanCPMaxParallel) finished := typeutil.NewConcurrentMap[string, *channelCPUpdateTask]() for _, groups := range rpcGroups { wg := &sync.WaitGroup{} for _, tasks := range groups { wg.Add(1) go func(tasks []*channelCPUpdateTask) { defer wg.Done() timeout := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointRPCTimeout.GetAsDuration(time.Second) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() channelCPs := lo.Map(tasks, func(t *channelCPUpdateTask, _ int) *msgpb.MsgPosition { return t.pos }) err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs) if err != nil { log.Warn("update channel checkpoint failed", zap.Error(err)) return } for _, task := range tasks { task.callback() finished.Insert(task.pos.GetChannelName(), task) if ccu.updateDoneCallback != nil { ccu.updateDoneCallback(task.pos) } } }(tasks) } wg.Wait() } ccu.mu.Lock() defer ccu.mu.Unlock() finished.Range(func(_ string, task *channelCPUpdateTask) bool { channel := task.pos.GetChannelName() if ccu.tasks[channel].pos.GetTimestamp() <= task.pos.GetTimestamp() { delete(ccu.tasks, channel) } return true }) }
|
检查点更新触发条件
1. 定期更新
1 2 3
| if curTs.Sub(ttn.lastUpdateTime.Load()) >= UpdateChannelCheckpointInterval { ttn.updateChannelCP(channelPos, curTs, false) }
|
触发条件: 当前消息时间 curTs(由 fgMsg.TimeRange.TimestampMax 解析得到)与上次更新时间之差达到或超过配置的间隔 UpdateChannelCheckpointInterval
2. 刷新触发
1 2 3 4
| channelPos, needUpdate, err := ttn.writeBufferManager.GetCheckpoint(ttn.vChannelName) if needUpdate { ttn.updateChannelCP(channelPos, curTs, true) }
|
触发条件: GetCheckpoint 返回 needUpdate = true,且本次未命中定期更新(定期更新分支会先以 flush=false 提交任务并提前返回)
needUpdate 的计算:
1 2
| flushTs := buf.GetFlushTimestamp() return cp, flushTs != nonFlushTS && cp.GetTimestamp() >= flushTs, nil
|
含义: 当已设置 flushTs(flushTs != nonFlushTS)且检查点时间戳 >= flushTs 时,需要更新检查点
检查点失败处理
失败重试机制
1 2 3 4 5
| err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs) if err != nil { log.Warn("update channel checkpoint failed", zap.Error(err)) return }
|
特点:
- 失败时不删除任务
- 任务保留在 ccu.tasks 中
- 下次定时器触发 execute() 时会重试;notifyChan 分支只处理 flush 为 true 的任务,并且在发起 RPC 之前就已将其 flush 置为 false,因此失败的任务只有再次被标记为 flush 后才会经由 notifyChan 重试
任务合并
1 2 3 4 5 6
| ccu.tasks[channel] = &channelCPUpdateTask{ pos: max(channelPos, task.pos), callback: callback, flush: flush || task.flush, }
|
优势:
- 避免重复更新
- 确保使用最新的检查点
- 保留刷新触发标志
设计特点
1. 异步更新
通过 ChannelCheckpointUpdater 实现异步更新,不阻塞 FlowGraph。
2. 批量处理
支持批量更新多个通道的检查点,提高效率。
3. 并行执行
支持并行处理多个批次,充分利用系统资源。
4. 失败重试
失败的任务保留在队列中,自动重试。
5. 刷新加速
刷新触发的更新会立即尝试,不等待定时器。
配置参数
UpdateChannelCheckpointInterval: 定期更新间隔
ChannelCheckpointUpdateTickInSeconds: 更新器定时器间隔
MaxChannelCheckpointsPerRPC: 每次 RPC 最大检查点数
UpdateChannelCheckpointMaxParallel: 最大并行数
UpdateChannelCheckpointRPCTimeout: RPC 超时时间
相关文档