Milvus FlowGraph 管理器(fgManagerImpl)

概述

fgManagerImpl 是 Milvus DataNode 中管理多个数据同步服务(DataSyncService)的核心组件。它负责按 Channel 组织和管理 FlowGraph 实例,实现多通道并发数据处理。

架构设计

核心结构

1
2
3
4
5
type fgManagerImpl struct {
ctx context.Context
cancelFunc context.CancelFunc
flowgraphs *typeutil.ConcurrentMap[string, *DataSyncService]
}

关键字段说明:

  • ctx: 上下文,用于控制生命周期
  • cancelFunc: 取消函数,用于优雅关闭
  • flowgraphs: 线程安全的映射表,key 为通道名,value 为对应的 DataSyncService

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type FlowgraphManager interface {
AddFlowgraph(ds *DataSyncService)
RemoveFlowgraph(channel string)
ClearFlowgraphs()

GetFlowgraphService(channel string) (*DataSyncService, bool)
HasFlowgraph(channel string) bool
HasFlowgraphWithOpID(channel string, opID int64) bool
GetFlowgraphCount() int
GetCollectionIDs() []int64

GetChannelsJSON(collectionID int64) string
GetSegmentsJSON(collectionID int64) string
Close()
}

核心功能

1. FlowGraph 生命周期管理

添加 FlowGraph

1
2
3
4
func (fm *fgManagerImpl) AddFlowgraph(ds *DataSyncService) {
fm.flowgraphs.Insert(ds.vchannelName, ds)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
}

功能:

  • 将 DataSyncService 注册到管理器
  • 更新监控指标

移除 FlowGraph

1
2
3
4
5
6
7
8
9
func (fm *fgManagerImpl) RemoveFlowgraph(channel string) {
if fg, loaded := fm.flowgraphs.Get(channel); loaded {
fg.close()
fm.flowgraphs.Remove(channel)

metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
util.GetRateCollector().RemoveFlowGraphChannel(channel)
}
}

功能:

  • 关闭 DataSyncService
  • 从管理器中移除
  • 清理相关资源(指标、速率收集器)

清空所有 FlowGraph

1
2
3
4
5
6
7
8
9
10
func (fm *fgManagerImpl) ClearFlowgraphs() {
log.Info("start drop all flowgraph resources in DataNode")
fm.flowgraphs.Range(func(key string, value *DataSyncService) bool {
value.GracefullyClose()
fm.flowgraphs.GetAndRemove(key)

log.Info("successfully dropped flowgraph", zap.String("vChannelName", key))
return true
})
}

功能:

  • 优雅关闭所有 FlowGraph
  • 清理所有资源

2. 查询功能

获取 FlowGraph 服务

1
2
3
func (fm *fgManagerImpl) GetFlowgraphService(channel string) (*DataSyncService, bool) {
return fm.flowgraphs.Get(channel)
}

检查 FlowGraph 是否存在

1
2
3
4
func (fm *fgManagerImpl) HasFlowgraph(channel string) bool {
_, exist := fm.flowgraphs.Get(channel)
return exist
}

按操作 ID 检查

1
2
3
4
func (fm *fgManagerImpl) HasFlowgraphWithOpID(channel string, opID typeutil.UniqueID) bool {
ds, exist := fm.flowgraphs.Get(channel)
return exist && ds.opID == opID
}

用途: 确保 FlowGraph 的操作 ID 匹配,避免使用过期的 FlowGraph

3. 统计信息

获取集合 ID 列表

1
2
3
4
5
6
7
8
func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
collectionSet := typeutil.UniqueSet{}
fm.flowgraphs.Range(func(key string, value *DataSyncService) bool {
collectionSet.Insert(value.metacache.Collection())
return true
})
return collectionSet.Collect()
}

获取通道 JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (fm *fgManagerImpl) GetChannelsJSON(collectionID int64) string {
var channels []*metricsinfo.Channel
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
channels = append(channels, &metricsinfo.Channel{
Name: ch,
WatchState: ds.fg.Status(),
LatestTimeTick: tsoutil.PhysicalTimeFormat(latestTimeTick),
NodeID: paramtable.GetNodeID(),
CollectionID: ds.metacache.Collection(),
})
return true
})
// ... 序列化为 JSON
}

返回信息:

  • 通道名称
  • FlowGraph 状态
  • 最新时间戳
  • 节点 ID
  • 集合 ID

获取段 JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (fm *fgManagerImpl) GetSegmentsJSON(collectionID int64) string {
var segments []*metricsinfo.Segment
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}

meta := ds.metacache
for _, segment := range meta.GetSegmentsBy() {
segments = append(segments, &metricsinfo.Segment{
SegmentID: segment.SegmentID(),
CollectionID: meta.Collection(),
PartitionID: segment.PartitionID(),
Channel: ch,
State: segment.State().String(),
Level: segment.Level().String(),
NodeID: paramtable.GetNodeID(),
NumOfRows: segment.NumOfRows(),
FlushedRows: segment.FlushedRows(),
SyncBufferRows: segment.BufferRows(),
SyncingRows: segment.SyncingRows(),
})
}
return true
})
// ... 序列化为 JSON
}

设计特点

1. 线程安全

使用 typeutil.ConcurrentMap 实现线程安全的并发访问,支持多 goroutine 同时操作。

2. 资源管理

  • 通过 context.Context 统一管理生命周期
  • 提供优雅关闭机制(GracefullyClose
  • 自动清理监控指标和资源

3. 监控集成

  • 集成 Prometheus 指标
  • 提供 JSON 格式的统计信息
  • 支持按集合过滤

使用场景

场景 1: 通道注册

当 DataCoord 分配新通道给 DataNode 时:

1
2
3
4
5
6
7
8
// 创建 DataSyncService
ds, err := NewDataSyncService(ctx, params, watchInfo, tickler)

// 注册到管理器
fgManager.AddFlowgraph(ds)

// 启动 FlowGraph
ds.Start()

场景 2: 通道释放

当通道需要迁移到其他节点时:

1
2
// 移除 FlowGraph
fgManager.RemoveFlowgraph(channelName)

场景 3: 节点关闭

当 DataNode 关闭时:

1
2
3
4
5
// 清空所有 FlowGraph
fgManager.ClearFlowgraphs()

// 关闭管理器
fgManager.Close()

与其他组件的关系

1
2
3
4
5
6
7
8
9
10
fgManagerImpl

├── DataSyncService (按通道组织)
│ │
│ ├── FlowGraph (消息处理管道)
│ ├── MetaCache (段元数据缓存)
│ ├── WriteBufferManager (数据缓冲管理器)
│ └── Broker (与 DataCoord 通信)

└── Metrics (监控指标)

相关文档