Milvus SyncManager 与 SyncTask 详解

概述

SyncManager 是 Milvus DataNode 中管理异步数据同步的核心组件,负责将内存中的数据持久化到对象存储。SyncTask 是执行同步的具体任务单元,封装了数据写入、元数据更新等完整流程。

SyncManager 架构

核心结构

1
2
3
4
5
6
7
8
type syncManager struct {
*keyLockDispatcher[int64]
chunkManager storage.ChunkManager

tasks *typeutil.ConcurrentMap[string, Task]
taskStats *expirable.LRU[string, Task]
handler config.EventHandler
}

关键字段说明:

  • keyLockDispatcher: 基于段 ID 的任务分发器,确保同一段的任务串行执行
  • chunkManager: 对象存储管理器
  • tasks: 当前执行中的任务映射
  • taskStats: 任务统计信息(LRU 缓存,15分钟过期)

接口定义

1
2
3
4
5
6
type SyncManager interface {
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)
SyncDataWithChunkManager(ctx context.Context, task Task, chunkManager storage.ChunkManager, callbacks ...func(error) error) (*conc.Future[struct{}], error)
Close() error
TaskStatsJSON() string
}

SyncManager 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
params := paramtable.Get()
cpuNum := hardware.GetCPUNum()

// 初始化工作池大小:CPU 核心数 × 每核心任务数
initPoolSize := cpuNum * params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.GetAsInt()
dispatcher := newKeyLockDispatcher[int64](initPoolSize)

syncMgr := &syncManager{
keyLockDispatcher: dispatcher,
chunkManager: chunkManager,
tasks: typeutil.NewConcurrentMap[string, Task](),
taskStats: expirable.NewLRU[string, Task](64, nil, time.Minute*15),
}

// 监听配置变更,动态调整工作池大小
handler := config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)
syncMgr.handler = handler
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.Key, handler)

return syncMgr
}

任务提交流程

SyncData 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, errors.New("sync manager is closed")
}

// 为 SyncTask 设置 ChunkManager
switch t := task.(type) {
case *SyncTask:
t.WithChunkManager(mgr.chunkManager)
}

return mgr.safeSubmitTask(ctx, task, callbacks...), nil
}

safeSubmitTask 方法

1
2
3
4
5
6
7
8
9
10
11
12
func (mgr *syncManager) safeSubmitTask(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
// 生成任务键:segmentID-timestamp
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())

// 记录任务
mgr.tasks.Insert(taskKey, task)
mgr.taskStats.Add(taskKey, task)

// 使用段 ID 作为分发键,确保同一段的任务串行执行
key := task.SegmentID()
return mgr.submit(ctx, key, task, callbacks...)
}

submit 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
handler := func(err error) error {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
defer func() {
mgr.tasks.Remove(taskKey) // 任务完成后移除
}()

if err == nil {
return nil
}

task.HandleError(err) // 处理错误
return err
}

callbacks = append([]func(error) error{handler}, callbacks...)
return mgr.Submit(ctx, key, task, callbacks...)
}

SyncTask 详解

核心结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type SyncTask struct {
chunkManager storage.ChunkManager
allocator allocator.Interface

collectionID int64
partitionID int64
segmentID int64
channelName string
startPosition *msgpb.MsgPosition
checkpoint *msgpb.MsgPosition
dataSource string
batchRows int64
level datapb.SegmentLevel

tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp

metacache metacache.MetaCache
metaWriter MetaWriter
schema *schemapb.CollectionSchema

pack *SyncPack

insertBinlogs map[int64]*datapb.FieldBinlog
statsBinlogs map[int64]*datapb.FieldBinlog
bm25Binlogs map[int64]*datapb.FieldBinlog
deltaBinlog *datapb.FieldBinlog

manifestPath string

writeRetryOpts []retry.Option
failureCallback func(err error)

tr *timerecord.TimeRecorder

flushedSize int64
execTime time.Duration

storageConfig *indexpb.StorageConfig
}

SyncPack(数据包)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type SyncPack struct {
metacache metacache.MetaCache
metawriter MetaWriter

// 数据
insertData []*storage.InsertData
deltaData *storage.DeleteData
bm25Stats map[int64]*storage.BM25Stats

// 统计信息
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
startPosition *msgpb.MsgPosition
checkpoint *msgpb.MsgPosition
batchRows int64
dataSource string

// 标志
isFlush bool
isDrop bool

// 元数据
collectionID int64
partitionID int64
segmentID int64
channelName string
level datapb.SegmentLevel

errHandler func(err error)
}

SyncTask 执行流程

Run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func (t *SyncTask) Run(ctx context.Context) (err error) {
t.tr = timerecord.NewTimeRecorder("syncTask")
log := t.getLogger()

defer func() {
if err != nil {
t.HandleError(err)
}
}()

// 1. 检查段是否存在
segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
if !has {
if t.pack.isDrop {
log.Info("segment dropped, discard sync task")
return nil
}
log.Warn("segment not found in metacache, may be already synced")
return nil
}

// 2. 获取列分组信息(StorageV2)
columnGroups := t.getColumnGroups(segmentInfo)

// 3. 写入数据到对象存储
switch segmentInfo.GetStorageVersion() {
case storage.StorageV2:
writer := NewBulkPackWriterV2(...)
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs,
t.manifestPath, t.flushedSize, err = writer.Write(ctx, t.pack)
default:
writer := NewBulkPackWriter(...)
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs,
t.flushedSize, err = writer.Write(ctx, t.pack)
}

if err != nil {
return err
}

// 4. 记录监控指标
metrics.DataNodeWriteDataCount.Add(float64(t.batchRows))
metrics.DataNodeFlushedSize.Add(float64(t.flushedSize))
metrics.DataNodeSave2StorageLatency.Observe(float64(t.tr.RecordSpan().Milliseconds()))

// 5. 更新元数据
if t.metaWriter != nil {
err = t.writeMeta(ctx)
if err != nil {
return err
}
}

// 6. 释放数据
t.pack.ReleaseData()

// 7. 更新 MetaCache
actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
if columnGroups != nil {
actions = append(actions, metacache.UpdateCurrentSplit(columnGroups))
}
if t.pack.isFlush {
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
}
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...),
metacache.WithSegmentIDs(t.segmentID))

// 8. 处理删除段
if t.pack.isDrop {
t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segmentID))
}

t.execTime = t.tr.ElapseSpan()
log.Info("task done", zap.Int64("flushedSize", t.flushedSize),
zap.Duration("timeTaken", t.execTime))

return nil
}

执行流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────┐
│ SyncTask.Run() │
└──────────────┬──────────────────────────┘


┌──────────────────────┐
│ 检查段是否存在 │
└──────────┬───────────┘


┌──────────────────────┐
│ 获取列分组信息 │
└──────────┬───────────┘


┌──────────────────────┐
│ 写入数据到对象存储 │
│ (BulkPackWriter) │
└──────────┬───────────┘


┌──────────────────────┐
│ 记录监控指标 │
└──────────┬───────────┘


┌──────────────────────┐
│ 更新元数据到 DataCoord│
│ (MetaWriter) │
└──────────┬───────────┘


┌──────────────────────┐
│ 释放数据包 │
└──────────┬───────────┘


┌──────────────────────┐
│ 更新本地 MetaCache │
└──────────┬───────────┘


┌──────────────────────┐
│ 任务完成 │
└──────────────────────┘

元数据更新(writeMeta)

1
2
3
func (t *SyncTask) writeMeta(ctx context.Context) error {
return t.metaWriter.UpdateSync(ctx, t)
}

MetaWriter 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error {
// 1. 构建 SaveBinlogPathsRequest
req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SaveBinlogPaths,
},
SegmentID: pack.segmentID,
CollectionID: pack.collectionID,
Field2BinlogPaths: pack.insertBinlogs,
Field2StatslogPaths: pack.statsBinlogs,
Field2BM25StatslogPaths: pack.bm25Binlogs,
Deltalogs: []*datapb.FieldBinlog{pack.deltaBinlog},
CheckPoints: checkPoints,
Flushed: pack.pack.isFlush, // 刷新标志
Dropped: pack.pack.isDrop, // 删除标志
// ...
}

// 2. 重试机制发送 RPC
err := retry.Handle(ctx, func() (bool, error) {
err := b.broker.SaveBinlogPaths(ctx, req)
return err != nil, err
})

return err
}

错误处理

HandleError 方法

1
2
3
4
5
6
7
8
9
10
11
12
func (t *SyncTask) HandleError(err error) {
// 调用失败回调
if t.failureCallback != nil {
t.failureCallback(err)
}

// 记录失败指标
metrics.DataNodeFlushBufferCount.WithLabelValues(metrics.FailLabel, t.level.String()).Inc()
if !t.pack.isFlush {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(metrics.FailLabel, t.level.String()).Inc()
}
}

Builder 模式创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建 SyncPack
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insertData).
WithDeleteData(deltaData).
WithCollectionID(collectionID).
WithSegmentID(segmentID).
WithCheckpoint(checkpoint).
WithFlush() // 标记为刷新

// 创建 SyncTask
task := syncmgr.NewSyncTask().
WithSyncPack(pack).
WithAllocator(allocator).
WithMetaWriter(metaWriter).
WithMetaCache(metaCache).
WithSchema(schema)

// 提交到 SyncManager
future, err := syncMgr.SyncData(ctx, task, callback)

设计特点

1. 异步执行

通过工作池实现异步任务执行,不阻塞主流程。

2. 串行保证

使用 keyLockDispatcher 确保同一段的任务串行执行,避免并发冲突。

3. 存储版本支持

支持 StorageV1 和 StorageV2 两种存储格式。

4. 列分组优化(StorageV2)

根据列统计信息动态分组,优化查询性能。

5. 完善的监控

记录详细的执行指标和性能数据。

相关文档