Milvus 数据消费 DataSyncService 详解

概述

DataSyncService 是 Milvus DataNode 中控制单个通道(Channel)数据同步的核心服务。它负责组装和管理 FlowGraph,协调各个组件完成数据从消息流到持久化存储的完整流程。

架构设计

核心结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type DataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
metacache metacache.MetaCache
opID int64
collectionID typeutil.UniqueID
vchannelName string
serverID typeutil.UniqueID

fg *flowgraph.TimeTickedFlowGraph // 内部 FlowGraph

broker broker.Broker
timetickSender util.StatsUpdater
dispClient msgdispatcher.Client
chunkManager storage.ChunkManager

stopOnce sync.Once
}

关键字段说明:

  • fg: TimeTickedFlowGraph,消息处理管道
  • metacache: 段元数据缓存
  • broker: 与 DataCoord 通信的代理
  • timetickSender: 时间戳发送器
  • dispClient: 消息分发客户端
  • chunkManager: 对象存储管理器

FlowGraph 组装流程

节点配置

1
2
3
4
5
6
7
8
type nodeConfig struct {
msFactory msgstream.Factory
collectionID typeutil.UniqueID
vChannelName string
metacache metacache.MetaCache
serverID typeutil.UniqueID
dropCallback func()
}

节点创建与组装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func getServiceWithChannel(...) (*DataSyncService, error) {
// 1. 创建 FlowGraph
fg := flowgraph.NewTimeTickedFlowGraph(params.Ctx)
nodeList := []flowgraph.Node{}

// 2. 创建输入节点
dmStreamNode := newDmInputNode(config, input)
nodeList = append(nodeList, dmStreamNode)

// 3. 创建 DD Node(数据分发节点)
ddNode := newDDNode(
params.Ctx,
collectionID,
channelName,
info.GetVchan().GetDroppedSegmentIds(),
flushed,
unflushed,
params.MsgHandler,
)
nodeList = append(nodeList, ddNode)

// 4. 创建 Embedding Node(如果启用)
if len(info.GetSchema().GetFunctions()) > 0 {
emNode, err := newEmbeddingNode(channelName, config.metacache)
if err != nil {
return nil, err
}
nodeList = append(nodeList, emNode)
}

// 5. 创建 Write Node
writeNode, err := newWriteNode(params.Ctx, params.WriteBufferManager, ds.timetickSender, config)
if err != nil {
return nil, err
}
nodeList = append(nodeList, writeNode)

// 6. 创建 TT Node(时间戳节点)
ttNode := newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater)
nodeList = append(nodeList, ttNode)

// 7. 组装节点
if err := fg.AssembleNodes(nodeList...); err != nil {
return nil, err
}
ds.fg = fg

// 8. 注册通道到 WriteBufferManager
err = params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator),
writebuffer.WithTaskObserverCallback(wbTaskObserverCallback))

return ds, nil
}

FlowGraph 节点链路

1
2
3
4
5
6
7
8
9
输入节点 (dmStreamNode)

DD Node (ddNode) - 消息过滤和 DDL 处理

Embedding Node (emNode) - 可选,向量嵌入处理

Write Node (writeNode) - 数据缓冲

TT Node (ttNode) - Checkpoint 更新

MetaCache 初始化

从 Checkpoint 恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func initMetaCache(...) (metacache.MetaCache, error) {
// 1. 加载段统计信息(Bloom Filter)
loadSegmentStats := func(segType string, segments []*datapb.SegmentInfo) {
for _, item := range segments {
future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
stats, err := compaction.LoadStats(initCtx, chunkManager, info.GetSchema(),
segment.GetID(), segment.GetStatslogs())
if err != nil {
return nil, err
}
segmentPks.Insert(segment.GetID(), pkoracle.NewBloomFilterSet(stats...))

// 加载 BM25 统计(仅 Growing 段)
if segType == "growing" && len(segment.GetBm25Statslogs()) > 0 {
bm25stats, err := compaction.LoadBM25Stats(...)
segmentBm25.Insert(segment.GetID(), bm25stats)
}
return struct{}{}, nil
})
futures = append(futures, future)
}
}

// 2. 加载 Growing 和 Sealed 段的统计
loadSegmentStats("growing", unflushed)
if !streamingutil.IsStreamingServiceEnabled() {
loadSegmentStats("sealed", flushed)
}

// 3. 等待所有统计加载完成
if err := conc.AwaitAll(futures...); err != nil {
return nil, err
}

// 4. 创建 MetaCache
pkStatsFactory := func(segment *datapb.SegmentInfo) pkoracle.PkStat {
pkStat, _ := segmentPks.Get(segment.GetID())
return pkStat
}

metacache := metacache.NewMetaCache(info, pkStatsFactory, bm25StatsFactor, schemaManager)
return metacache, nil
}

生命周期管理

启动 FlowGraph

1
2
3
4
5
6
7
8
func (dsService *DataSyncService) Start() {
if dsService.fg != nil {
log.Info("dataSyncService starting flow graph",
zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
dsService.fg.Start()
}
}

优雅关闭

1
2
3
4
5
6
7
func (dsService *DataSyncService) GracefullyClose() {
if dsService.fg != nil {
log.Info("dataSyncService gracefully closing flowgraph")
dsService.fg.SetCloseMethod(flowgraph.CloseGracefully)
dsService.close()
}
}

关闭流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (dsService *DataSyncService) close() {
dsService.stopOnce.Do(func() {
// 1. 注销消息分发客户端
if dsService.dispClient != nil {
dsService.dispClient.Deregister(dsService.vchannelName)
}

// 2. 关闭 FlowGraph
if dsService.fg != nil {
dsService.fg.Close()
}

// 3. 取消上下文
dsService.cancelFn()

// 4. 清理监控指标
pChan := funcutil.ToPhysicalChannel(dsService.vchannelName)
metrics.CleanupDataNodeCollectionMetrics(paramtable.GetNodeID(),
dsService.collectionID, pChan)
})
}

创建方式

标准 DataNode 模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, 
info *datapb.ChannelWatchInfo, tickler *util.Tickler) (*DataSyncService, error) {

// 1. 获取段信息
unflushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(...)
flushedSegmentInfos, err := pipelineParams.Broker.GetSegmentInfo(...)

// 2. 初始化 MetaCache
metaCache, err := getMetaCacheWithTickler(initCtx, pipelineParams, info,
tickler, unflushedSegmentInfos, flushedSegmentInfos)

// 3. 创建消息输入
input, err := createNewInputFromDispatcher(initCtx, pipelineParams.DispClient, ...)

// 4. 创建服务
ds, err := getServiceWithChannel(initCtx, pipelineParams, info, metaCache,
unflushedSegmentInfos, flushedSegmentInfos, input, nil, nil)

return ds, nil
}

Streaming Node 模式

1
2
3
4
5
6
func NewStreamingNodeDataSyncService(...) (*DataSyncService, error) {
// 使用简化的 MetaCache 初始化(不加载 Sealed 段的统计)
metaCache, err := getMetaCacheForStreaming(...)

return getServiceWithChannel(..., input, wbTaskObserverCallback, dropCallback)
}

组件依赖关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DataSyncService

├── FlowGraph
│ ├── dmStreamNode (输入)
│ ├── ddNode (过滤)
│ ├── emNode (可选,嵌入)
│ ├── writeNode (缓冲)
│ └── ttNode (检查点)

├── MetaCache
│ ├── Segment 元数据
│ ├── Bloom Filter
│ └── BM25 统计

├── Broker
│ ├── 与 DataCoord 通信
│ └── 元数据更新

├── WriteBufferManager
│ └── 数据缓冲管理

└── ChunkManager
└── 对象存储访问

关键设计点

  1. 单通道单服务

每个 channel 对应一个独立的 DataSyncService,实现隔离和并发处理。

  1. 组件聚合

    DataSyncService 聚合了 FlowGraph、MetaCache、Broker 等核心组件,作为统一入口。

  2. 优雅关闭

    使用 sync.Once 确保关闭操作只执行一次,避免资源泄漏。

  3. 监控集成

    自动清理通道相关的监控指标,避免指标泄漏。

相关文档