Milvus Flush 与 Checkpoint 机制详解

概述

Flush(刷新)和 Checkpoint(检查点)是 Milvus DataNode 中确保数据可靠性和一致性的两个核心机制。它们协同工作,实现数据从内存到持久化存储的完整流程,并保证故障恢复时的数据一致性。

Flush 操作全链路影响

Flush 触发方式

1. 手动刷新(Manual Flush)

1
2
3
4
// 通过 DDL 消息触发
case commonpb.MsgType_ManualFlush:
manualFlushMsg := msg.(*adaptor.ManualFlushMessageBody)
ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage)

流程:

1
DataCoord → ManualFlush 消息 → ddNode → MsgHandler → WriteBufferManager.FlushChannel

2. 自动刷新(Auto Flush)

触发条件:

  • 内存超过阈值(MemoryForceSyncWatermark
  • 缓冲区满(GetFullBufferPolicy
  • 时间策略(GetSyncStaleBufferPolicy
  • 刷新时间戳(GetFlushTsPolicy

Flush 执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
┌─────────────────────────────────────────────────────────────┐
│ 1. FlushChannel 设置 flushTs │
│ WriteBufferManager.FlushChannel(channel, flushTs) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 2. WriteBuffer 应用刷新策略 │
│ GetFlushTsPolicy: ts >= flushTs 时选择段 │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 3. 创建 SyncTask(isFlush = true) │
│ SyncPack.WithFlush() │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 4. SyncTask.Run() 执行 │
│ - 写入数据到对象存储 │
│ - 更新元数据到 DataCoord │
│ - 更新本地 MetaCache(状态 → Flushed) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 5. 从 MetaCache 移除段 │
│ metaCache.RemoveSegments(segmentID) │
└─────────────────────────────────────────────────────────────┘

Flush 对系统的影响

1. 数据持久化

  • 对象存储: 数据写入到对象存储(S3/MinIO 等)
  • Binlog 生成: 生成 InsertLogs、StatsLogs、DeltaLogs
  • Manifest: StorageV2 模式下生成 Manifest 文件

2. 元数据更新

  • DataCoord: 通过 SaveBinlogPaths RPC 更新段元数据
  • MetaCache: 更新段状态为 Flushed,并移除段

3. 内存释放

  • WriteBuffer: 释放段缓冲区内存
  • MetaCache: 移除段元数据

4. 监控指标

  • DataNodeFlushBufferCount: 刷新计数
  • DataNodeFlushedSize: 刷新数据大小
  • DataNodeFlushedRows: 刷新行数
  • DataNodeSave2StorageLatency: 存储延迟

Checkpoint 机制

Checkpoint 的作用

  1. 数据消费位置: 记录通道的数据消费位置
  2. 故障恢复: 节点重启后从检查点恢复
  3. 数据一致性: 确保数据不丢失、不重复

Checkpoint 更新流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌─────────────────────────────────────────────────────────────┐
│ 1. ttNode 获取检查点 │
│ channelPos, needUpdate := GetCheckpoint(channel) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 2. 判断是否需要更新 │
│ needUpdate = (flushTs != 0 && cp.Timestamp >= flushTs) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 3. 添加到 CheckpointUpdater │
│ cpUpdater.AddTask(channelPos, flush=true, callback) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 4. 异步更新到 DataCoord │
│ broker.UpdateChannelCheckpoint(ctx, channelCPs) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 5. 更新成功回调 │
│ NotifyCheckpointUpdated(channel, ts) │
│ if ts > flushTs: reset flushTs │
└─────────────────────────────────────────────────────────────┘

Checkpoint 获取逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
// 1. 如果有非空缓冲区,返回最早的起始位置
var earliest *checkpointCandidate
for _, buf := range wb.segmentBuffers {
if buf.insertBuffer.rows > 0 || buf.deltaBuffer.Size() > 0 {
if earliest == nil || buf.startPosition.GetTimestamp() < earliest.position.GetTimestamp() {
earliest = &checkpointCandidate{
segmentID: buf.segmentID,
position: buf.startPosition,
}
}
}
}

// 2. 否则返回最新检查点
if earliest != nil {
return earliest.position
}
return wb.checkpoint
}

设计原理:

  • 有未同步数据时,检查点不能超过最早未同步数据的起始位置
  • 确保故障恢复时不会丢失数据

Flush 与 Checkpoint 的关系

协同工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
┌─────────────────────────────────────────────────────────────┐
│ Flush 操作 │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 1. 设置 flushTs = T1 │
│ FlushChannel(channel, flushTs=T1) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 2. 触发数据同步 │
│ SyncTask 执行,数据写入对象存储 │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 3. 检查点推进到 T2 (T2 >= T1) │
│ GetCheckpoint() 返回 checkpoint = T2 │
│ needUpdate = (T2 >= T1) = true │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 4. 更新检查点到 DataCoord │
│ UpdateChannelCheckpoint(T2) │
└──────────────────────┬──────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ 5. 重置 flushTs │
│ NotifyCheckpointUpdated(channel, T2) │
│ if T2 > T1: flushTs = 0 │
└─────────────────────────────────────────────────────────────┘

关键条件:ts > flushTs

1
2
3
4
5
6
7
8
9
10
11
12
func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return
}
flushTs := buf.GetFlushTimestamp()

// 关键条件:严格大于
if flushTs != nonFlushTS && ts > flushTs {
buf.SetFlushTimestamp(nonFlushTS)
}
}

为什么需要 ts > flushTs(严格大于)?

1. 异步操作的时序问题

1
2
3
4
5
6
时间线:
T1: FlushChannel(flushTs=T1) 设置刷新时间戳
T2: SyncTask 开始执行(异步)
T3: SyncTask 完成,数据已写入
T4: Checkpoint 更新到 T1(刚好等于 flushTs)
T5: Checkpoint 更新到 T2(大于 flushTs)

如果使用 ts >= flushTs

  • 在 T4 时刻,T1 >= T1 为真,会重置 flushTs
  • 但此时 SyncTask 可能还未完成,导致数据不一致

使用 ts > flushTs

  • 在 T4 时刻,T1 > T1 为假,不会重置
  • 在 T5 时刻,T2 > T1 为真,此时 SyncTask 已完成,安全重置

2. 边界情况处理

场景:检查点时间戳等于 flushTs

1
2
flushTs = 1000
checkpoint = 1000 // 刚好等于

问题: 此时数据可能还在同步中,不能确定刷新已完成

解决: 等待检查点超过 flushTs,确保刷新已完成

3. 数据一致性保证

使用严格大于的好处:

  • 确保刷新操作完全完成
  • 避免过早重置 flushTs
  • 保证数据一致性

对比:

  • GetFlushTsPolicy 使用 ts >= flushTs:触发刷新(包含边界)
  • NotifyCheckpointUpdated 使用 ts > flushTs:确认完成(排除边界)

Checkpoint 失败处理

失败场景

  1. 网络故障: RPC 调用失败
  2. DataCoord 不可用: 服务暂时不可用
  3. 超时: RPC 超时

处理机制

1
2
3
4
5
err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs)
if err != nil {
log.Warn("update channel checkpoint failed", zap.Error(err))
return // 不删除任务,等待重试
}

特点:

  • 失败时不删除任务
  • 任务保留在 ccu.tasks
  • 下次 execute()notifyChan 触发时自动重试

任务合并

1
2
3
4
5
6
7
8
// 合并逻辑:取最大时间戳,保留刷新标志
if task.pos.GetTimestamp() < channelPos.GetTimestamp() || (flush && !task.flush) {
ccu.tasks[channel] = &channelCPUpdateTask{
pos: max(channelPos, task.pos),
callback: callback,
flush: flush || task.flush,
}
}

优势:

  • 避免重复更新
  • 确保使用最新的检查点
  • 保留刷新触发标志

数据一致性保证

1. 刷新完成确认

1
2
3
4
5
6
7
8
9
FlushChannel(flushTs=T1)

SyncTask 执行(异步)

Checkpoint 更新到 T2 (T2 > T1)

NotifyCheckpointUpdated(T2)

重置 flushTs(确认刷新完成)

2. 故障恢复

1
2
3
4
5
6
7
节点重启

从 DataCoord 获取检查点

从检查点位置恢复数据消费

确保数据不丢失、不重复

3. 检查点推进规则

  • 有未同步数据: 检查点不能超过最早未同步数据的起始位置
  • 无未同步数据: 检查点推进到最新位置

监控与调试

关键指标

  • DataNodeFlushBufferCount: 刷新计数
  • DataNodeFlushedSize: 刷新数据大小
  • DataNodeSave2StorageLatency: 存储延迟
  • UpdateChannelCheckpoint: 检查点更新次数

日志关键点

1
2
3
4
5
6
7
8
// Flush 触发
log.Info("receive manual flush message", zap.Uint64("flushTs", flushTs))

// 检查点更新
log.Info("reset channel flushTs", zap.String("channel", channel))

// 检查点更新成功
log.Debug("UpdateChannelCheckpoint success", zap.Uint64("cpTs", ts))

最佳实践

1. Flush 时机

  • 手动刷新: 数据导入完成后
  • 自动刷新: 基于内存阈值和时间策略

2. Checkpoint 更新频率

  • 定期更新: 避免过于频繁的 RPC
  • 刷新触发: 立即更新,加速刷新完成确认

3. 故障处理

  • 重试机制: 自动重试失败的检查点更新
  • 任务合并: 避免重复更新

相关文档