概述
Flush(刷新)和 Checkpoint(检查点)是 Milvus DataNode 中确保数据可靠性和一致性的两个核心机制。它们协同工作,实现数据从内存到持久化存储的完整流程,并保证故障恢复时的数据一致性。
Flush 操作全链路影响
Flush 触发方式
1. 手动刷新(Manual Flush)
1 2 3 4
| case commonpb.MsgType_ManualFlush: manualFlushMsg := msg.(*adaptor.ManualFlushMessageBody) ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage)
|
流程:
1
| DataCoord → ManualFlush 消息 → ddNode → MsgHandler → WriteBufferManager.FlushChannel
|
2. 自动刷新(Auto Flush)
触发条件:
- 内存超过阈值(
MemoryForceSyncWatermark)
- 缓冲区满(
GetFullBufferPolicy)
- 时间策略(
GetSyncStaleBufferPolicy)
- 刷新时间戳(
GetFlushTsPolicy)
Flush 执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| ┌─────────────────────────────────────────────────────────────┐ │ 1. FlushChannel 设置 flushTs │ │ WriteBufferManager.FlushChannel(channel, flushTs) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 2. WriteBuffer 应用刷新策略 │ │ GetFlushTsPolicy: ts >= flushTs 时选择段 │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 3. 创建 SyncTask(isFlush = true) │ │ SyncPack.WithFlush() │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 4. SyncTask.Run() 执行 │ │ - 写入数据到对象存储 │ │ - 更新元数据到 DataCoord │ │ - 更新本地 MetaCache(状态 → Flushed) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 5. 从 MetaCache 移除段 │ │ metaCache.RemoveSegments(segmentID) │ └─────────────────────────────────────────────────────────────┘
|
Flush 对系统的影响
1. 数据持久化
- 对象存储: 数据写入到对象存储(S3/MinIO 等)
- Binlog 生成: 生成 InsertLogs、StatsLogs、DeltaLogs
- Manifest: StorageV2 模式下生成 Manifest 文件
2. 元数据更新
- DataCoord: 通过
SaveBinlogPaths RPC 更新段元数据
- MetaCache: 更新段状态为
Flushed,并移除段
3. 内存释放
- WriteBuffer: 释放段缓冲区内存
- MetaCache: 移除段元数据
4. 监控指标
DataNodeFlushBufferCount: 刷新计数
DataNodeFlushedSize: 刷新数据大小
DataNodeFlushedRows: 刷新行数
DataNodeSave2StorageLatency: 存储延迟
Checkpoint 机制
Checkpoint 的作用
- 数据消费位置: 记录通道的数据消费位置
- 故障恢复: 节点重启后从检查点恢复
- 数据一致性: 确保数据不丢失、不重复
Checkpoint 更新流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| ┌─────────────────────────────────────────────────────────────┐ │ 1. ttNode 获取检查点 │ │ channelPos, needUpdate := GetCheckpoint(channel) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 2. 判断是否需要更新 │ │ needUpdate = (flushTs != 0 && cp.Timestamp >= flushTs) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 3. 添加到 CheckpointUpdater │ │ cpUpdater.AddTask(channelPos, flush=true, callback) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 4. 异步更新到 DataCoord │ │ broker.UpdateChannelCheckpoint(ctx, channelCPs) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 5. 更新成功回调 │ │ NotifyCheckpointUpdated(channel, ts) │ │ if ts > flushTs: reset flushTs │ └─────────────────────────────────────────────────────────────┘
|
Checkpoint 获取逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { var earliest *checkpointCandidate for _, buf := range wb.segmentBuffers { if buf.insertBuffer.rows > 0 || buf.deltaBuffer.Size() > 0 { if earliest == nil || buf.startPosition.GetTimestamp() < earliest.position.GetTimestamp() { earliest = &checkpointCandidate{ segmentID: buf.segmentID, position: buf.startPosition, } } } } if earliest != nil { return earliest.position } return wb.checkpoint }
|
设计原理:
- 有未同步数据时,检查点不能超过最早未同步数据的起始位置
- 确保故障恢复时不会丢失数据
Flush 与 Checkpoint 的关系
协同工作流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| ┌─────────────────────────────────────────────────────────────┐ │ Flush 操作 │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 1. 设置 flushTs = T1 │ │ FlushChannel(channel, flushTs=T1) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 2. 触发数据同步 │ │ SyncTask 执行,数据写入对象存储 │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 3. 检查点推进到 T2 (T2 >= T1) │ │ GetCheckpoint() 返回 checkpoint = T2 │ │ needUpdate = (T2 >= T1) = true │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 4. 更新检查点到 DataCoord │ │ UpdateChannelCheckpoint(T2) │ └──────────────────────┬──────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 5. 重置 flushTs │ │ NotifyCheckpointUpdated(channel, T2) │ │ if T2 > T1: flushTs = 0 │ └─────────────────────────────────────────────────────────────┘
|
关键条件:ts > flushTs
1 2 3 4 5 6 7 8 9 10 11 12
| func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { buf, loaded := m.buffers.Get(channel) if !loaded { return } flushTs := buf.GetFlushTimestamp() if flushTs != nonFlushTS && ts > flushTs { buf.SetFlushTimestamp(nonFlushTS) } }
|
为什么需要 ts > flushTs(严格大于)?
1. 异步操作的时序问题
1 2 3 4 5 6
| 时间线: T1: FlushChannel(flushTs=T1) 设置刷新时间戳 T2: SyncTask 开始执行(异步) T3: SyncTask 完成,数据已写入 T4: Checkpoint 更新到 T1(刚好等于 flushTs) T5: Checkpoint 更新到 T2(大于 flushTs)
|
如果使用 ts >= flushTs:
- 在 T4 时刻,
T1 >= T1 为真,会重置 flushTs
- 但此时 SyncTask 可能还未完成,导致数据不一致
使用 ts > flushTs:
- 在 T4 时刻,
T1 > T1 为假,不会重置
- 在 T5 时刻,
T2 > T1 为真,此时 SyncTask 已完成,安全重置
2. 边界情况处理
场景:检查点时间戳等于 flushTs
1 2
| flushTs = 1000 checkpoint = 1000 // 刚好等于
|
问题: 此时数据可能还在同步中,不能确定刷新已完成
解决: 等待检查点超过 flushTs,确保刷新已完成
3. 数据一致性保证
使用严格大于的好处:
- 确保刷新操作完全完成
- 避免过早重置 flushTs
- 保证数据一致性
对比:
GetFlushTsPolicy 使用 ts >= flushTs:触发刷新(包含边界)
NotifyCheckpointUpdated 使用 ts > flushTs:确认完成(排除边界)
Checkpoint 失败处理
失败场景
- 网络故障: RPC 调用失败
- DataCoord 不可用: 服务暂时不可用
- 超时: RPC 超时
处理机制
1 2 3 4 5
| err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs) if err != nil { log.Warn("update channel checkpoint failed", zap.Error(err)) return }
|
特点:
- 失败时不删除任务
- 任务保留在
ccu.tasks 中
- 下次
execute() 或 notifyChan 触发时自动重试
任务合并
1 2 3 4 5 6 7 8
| if task.pos.GetTimestamp() < channelPos.GetTimestamp() || (flush && !task.flush) { ccu.tasks[channel] = &channelCPUpdateTask{ pos: max(channelPos, task.pos), callback: callback, flush: flush || task.flush, } }
|
优势:
- 避免重复更新
- 确保使用最新的检查点
- 保留刷新触发标志
数据一致性保证
1. 刷新完成确认
1 2 3 4 5 6 7 8 9
| FlushChannel(flushTs=T1) ↓ SyncTask 执行(异步) ↓ Checkpoint 更新到 T2 (T2 > T1) ↓ NotifyCheckpointUpdated(T2) ↓ 重置 flushTs(确认刷新完成)
|
2. 故障恢复
1 2 3 4 5 6 7
| 节点重启 ↓ 从 DataCoord 获取检查点 ↓ 从检查点位置恢复数据消费 ↓ 确保数据不丢失、不重复
|
3. 检查点推进规则
- 有未同步数据: 检查点不能超过最早未同步数据的起始位置
- 无未同步数据: 检查点推进到最新位置
监控与调试
关键指标
DataNodeFlushBufferCount: 刷新计数
DataNodeFlushedSize: 刷新数据大小
DataNodeSave2StorageLatency: 存储延迟
UpdateChannelCheckpoint: 检查点更新次数
日志关键点
1 2 3 4 5 6 7 8
| log.Info("receive manual flush message", zap.Uint64("flushTs", flushTs))
log.Info("reset channel flushTs", zap.String("channel", channel))
log.Debug("UpdateChannelCheckpoint success", zap.Uint64("cpTs", ts))
|
最佳实践
1. Flush 时机
- 手动刷新: 数据导入完成后
- 自动刷新: 基于内存阈值和时间策略
2. Checkpoint 更新频率
- 定期更新: 避免过于频繁的 RPC
- 刷新触发: 立即更新,加速刷新完成确认
3. 故障处理
- 重试机制: 自动重试失败的检查点更新
- 任务合并: 避免重复更新
相关文档