概述
fgManagerImpl 是 Milvus DataNode 中管理多个数据同步服务(DataSyncService)的核心组件。它负责按 Channel 组织和管理 FlowGraph 实例,实现多通道并发数据处理。
架构设计
核心结构
1 2 3 4 5
| type fgManagerImpl struct { ctx context.Context cancelFunc context.CancelFunc flowgraphs *typeutil.ConcurrentMap[string, *DataSyncService] }
|
关键字段说明:
ctx: 上下文,用于控制生命周期
cancelFunc: 取消函数,用于优雅关闭
flowgraphs: 线程安全的映射表,key 为通道名,value 为对应的 DataSyncService
接口定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type FlowgraphManager interface { AddFlowgraph(ds *DataSyncService) RemoveFlowgraph(channel string) ClearFlowgraphs() GetFlowgraphService(channel string) (*DataSyncService, bool) HasFlowgraph(channel string) bool HasFlowgraphWithOpID(channel string, opID int64) bool GetFlowgraphCount() int GetCollectionIDs() []int64 GetChannelsJSON(collectionID int64) string GetSegmentsJSON(collectionID int64) string Close() }
|
核心功能
1. FlowGraph 生命周期管理
添加 FlowGraph
1 2 3 4
| func (fm *fgManagerImpl) AddFlowgraph(ds *DataSyncService) { fm.flowgraphs.Insert(ds.vchannelName, ds) metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() }
|
功能:
- 将 DataSyncService 注册到管理器
- 更新监控指标
移除 FlowGraph
1 2 3 4 5 6 7 8 9
| func (fm *fgManagerImpl) RemoveFlowgraph(channel string) { if fg, loaded := fm.flowgraphs.Get(channel); loaded { fg.close() fm.flowgraphs.Remove(channel) metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() util.GetRateCollector().RemoveFlowGraphChannel(channel) } }
|
功能:
- 关闭 DataSyncService
- 从管理器中移除
- 清理相关资源(指标、速率收集器)
清空所有 FlowGraph
1 2 3 4 5 6 7 8 9 10
| func (fm *fgManagerImpl) ClearFlowgraphs() { log.Info("start drop all flowgraph resources in DataNode") fm.flowgraphs.Range(func(key string, value *DataSyncService) bool { value.GracefullyClose() fm.flowgraphs.GetAndRemove(key) log.Info("successfully dropped flowgraph", zap.String("vChannelName", key)) return true }) }
|
功能:
2. 查询功能
获取 FlowGraph 服务
1 2 3
| func (fm *fgManagerImpl) GetFlowgraphService(channel string) (*DataSyncService, bool) { return fm.flowgraphs.Get(channel) }
|
检查 FlowGraph 是否存在
1 2 3 4
| func (fm *fgManagerImpl) HasFlowgraph(channel string) bool { _, exist := fm.flowgraphs.Get(channel) return exist }
|
按操作 ID 检查
1 2 3 4
| func (fm *fgManagerImpl) HasFlowgraphWithOpID(channel string, opID typeutil.UniqueID) bool { ds, exist := fm.flowgraphs.Get(channel) return exist && ds.opID == opID }
|
用途: 确保 FlowGraph 的操作 ID 匹配,避免使用过期的 FlowGraph
3. 统计信息
获取集合 ID 列表
1 2 3 4 5 6 7 8
| func (fm *fgManagerImpl) GetCollectionIDs() []int64 { collectionSet := typeutil.UniqueSet{} fm.flowgraphs.Range(func(key string, value *DataSyncService) bool { collectionSet.Insert(value.metacache.Collection()) return true }) return collectionSet.Collect() }
|
获取通道 JSON
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (fm *fgManagerImpl) GetChannelsJSON(collectionID int64) string { var channels []*metricsinfo.Channel fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { if collectionID > 0 && ds.metacache.Collection() != collectionID { return true } latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch) channels = append(channels, &metricsinfo.Channel{ Name: ch, WatchState: ds.fg.Status(), LatestTimeTick: tsoutil.PhysicalTimeFormat(latestTimeTick), NodeID: paramtable.GetNodeID(), CollectionID: ds.metacache.Collection(), }) return true }) }
|
返回信息:
- 通道名称
- FlowGraph 状态
- 最新时间戳
- 节点 ID
- 集合 ID
获取段 JSON
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (fm *fgManagerImpl) GetSegmentsJSON(collectionID int64) string { var segments []*metricsinfo.Segment fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { if collectionID > 0 && ds.metacache.Collection() != collectionID { return true } meta := ds.metacache for _, segment := range meta.GetSegmentsBy() { segments = append(segments, &metricsinfo.Segment{ SegmentID: segment.SegmentID(), CollectionID: meta.Collection(), PartitionID: segment.PartitionID(), Channel: ch, State: segment.State().String(), Level: segment.Level().String(), NodeID: paramtable.GetNodeID(), NumOfRows: segment.NumOfRows(), FlushedRows: segment.FlushedRows(), SyncBufferRows: segment.BufferRows(), SyncingRows: segment.SyncingRows(), }) } return true }) }
|
设计特点
1. 线程安全
使用 typeutil.ConcurrentMap 实现线程安全的并发访问,支持多 goroutine 同时操作。
2. 资源管理
- 通过
context.Context 统一管理生命周期
- 提供优雅关闭机制(
GracefullyClose)
- 自动清理监控指标和资源
3. 监控集成
- 集成 Prometheus 指标
- 提供 JSON 格式的统计信息
- 支持按集合过滤
使用场景
场景 1: 通道注册
当 DataCoord 分配新通道给 DataNode 时:
1 2 3 4 5 6 7 8
| ds, err := NewDataSyncService(ctx, params, watchInfo, tickler)
fgManager.AddFlowgraph(ds)
ds.Start()
|
场景 2: 通道释放
当通道需要迁移到其他节点时:
1 2
| fgManager.RemoveFlowgraph(channelName)
|
场景 3: 节点关闭
当 DataNode 关闭时:
1 2 3 4 5
| fgManager.ClearFlowgraphs()
fgManager.Close()
|
与其他组件的关系
1 2 3 4 5 6 7 8 9 10
| fgManagerImpl │ ├── DataSyncService (按通道组织) │ │ │ ├── FlowGraph (消息处理管道) │ ├── MetaCache (段元数据缓存) │ ├── WriteBufferManager (数据缓冲管理器) │ └── Broker (与 DataCoord 通信) │ └── Metrics (监控指标)
|
相关文档