概述
ddNode(Data Distribution Node)是 FlowGraph 中的第一个处理节点,负责从消息流中过滤和分发消息。它处理插入消息、删除消息以及 DDL 消息(如 CreateSegment、Flush、DropCollection),确保只有有效的数据消息传递到后续节点。
架构设计
核心结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type ddNode struct { BaseNode ctx context.Context collectionID typeutil.UniqueID vChannelName string dropMode atomic.Value msgHandler util.MsgHandler growingSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo sealedSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo droppedSegmentIDs []int64 }
|
关键字段说明:
dropMode: 原子值,标记是否处于删除模式
msgHandler: DDL 消息处理器
growingSegInfo: Growing 段信息映射
sealedSegInfo: Sealed 段信息映射
droppedSegmentIDs: 已删除段 ID 列表
消息处理流程
Operate 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func (ddn *ddNode) Operate(in []Msg) []Msg { msMsg, ok := in[0].(*MsgStreamMsg) if !ok { return []Msg{} } if msMsg.IsCloseMsg() { return []Msg{&FlowGraphMsg{...}} } if load := ddn.dropMode.Load(); load != nil && load.(bool) { return []Msg{} } fgMsg := FlowGraphMsg{...} for _, msg := range msMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_DropCollection: case commonpb.MsgType_DropPartition: case commonpb.MsgType_Insert: case commonpb.MsgType_Delete: case commonpb.MsgType_CreateSegment: case commonpb.MsgType_FlushSegment: case commonpb.MsgType_ManualFlush: case commonpb.MsgType_AddCollectionField: case commonpb.MsgType_AlterCollection: } } return []Msg{&fgMsg} }
|
消息类型处理
1. Insert 消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| case commonpb.MsgType_Insert: imsg := msg.(*msgstream.InsertMsg) if imsg.CollectionID != ddn.collectionID { continue } if ddn.tryToFilterSegmentInsertMessages(imsg) { continue } metrics.DataNodeConsumeMsgCount.Inc() metrics.DataNodeConsumeMsgRowsCount.Add(float64(imsg.GetNumRows())) fgMsg.InsertMessages = append(fgMsg.InsertMessages, imsg)
|
2. Delete 消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| case commonpb.MsgType_Delete: dmsg := msg.(*msgstream.DeleteMsg) if dmsg.CollectionID != ddn.collectionID { continue } metrics.DataNodeConsumeMsgCount.Inc() metrics.DataNodeConsumeMsgRowsCount.Add(float64(dmsg.GetNumRows())) fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)
|
3. DDL 消息处理
CreateSegment
1 2 3 4 5
| case commonpb.MsgType_CreateSegment: createSegment := msg.(*adaptor.CreateSegmentMessageBody) if err := ddn.msgHandler.HandleCreateSegment(ddn.ctx, createSegment.CreateSegmentMessage); err != nil { log.Warn("handle create segment message failed", zap.Error(err)) }
|
FlushSegment
1 2 3 4 5
| case commonpb.MsgType_FlushSegment: flushMsg := msg.(*adaptor.FlushMessageBody) if err := ddn.msgHandler.HandleFlush(flushMsg.FlushMessage); err != nil { log.Warn("handle flush message failed", zap.Error(err)) }
|
ManualFlush
1 2 3 4 5
| case commonpb.MsgType_ManualFlush: manualFlushMsg := msg.(*adaptor.ManualFlushMessageBody) if err := ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage); err != nil { log.Warn("handle manual flush message failed", zap.Error(err)) }
|
消息过滤机制
tryToFilterSegmentInsertMessages
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (ddn *ddNode) tryToFilterSegmentInsertMessages(msg *msgstream.InsertMsg) bool { if msg.GetShardName() != ddn.vChannelName { return true } if ddn.isDropped(msg.GetSegmentID()) { return true } for segID, segInfo := range ddn.sealedSegInfo { if msg.EndTs() > segInfo.GetDmlPosition().GetTimestamp() { delete(ddn.sealedSegInfo, segID) } } if _, ok := ddn.sealedSegInfo[msg.GetSegmentID()]; ok { return true } if si, ok := ddn.growingSegInfo[msg.GetSegmentID()]; ok { if msg.EndTs() <= si.GetDmlPosition().GetTimestamp() { return true } delete(ddn.growingSegInfo, msg.GetSegmentID()) } return false }
|
过滤逻辑说明:
- Shard 名称检查: 确保消息属于当前通道
- 删除段过滤: 过滤掉已删除段的消息
- Sealed 段过滤: 过滤已封装的段,直到消息时间戳超过段的检查点
- Growing 段过滤: 过滤 Growing 段中已处理的消息
消息类型(MsgType)赋值流程
消息创建时的赋值
消息的 MsgType 在创建时通过 commonpbutil.WithMsgType 设置:
1 2 3 4 5 6 7 8 9 10 11
| msg := &msgstream.InsertMsg{ BaseMsg: msgstream.BaseMsg{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Insert), commonpbutil.WithMsgID(...), commonpbutil.WithTimeStamp(...), ), }, InsertRequest: &msgpb.InsertRequest{...}, }
|
消息类型获取
1 2 3 4
| func (it *InsertMsg) Type() MsgType { return it.Base.MsgType }
|
消息流中的传递
1
| 消息创建 → 序列化 → 消息流 → MsgStreamMsg 包装 → ddNode.Operate → msg.Type()
|
初始化流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName string, droppedSegmentIDs []typeutil.UniqueID, sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, handler util.MsgHandler) *ddNode { dd := &ddNode{ ctx: ctx, collectionID: collID, sealedSegInfo: make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(sealedSegments)), growingSegInfo: make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(growingSegments)), droppedSegmentIDs: droppedSegmentIDs, vChannelName: vChannelName, msgHandler: handler, } for _, s := range sealedSegments { dd.sealedSegInfo[s.GetID()] = s } for _, s := range growingSegments { dd.growingSegInfo[s.GetID()] = s } dd.dropMode.Store(false) return dd }
|
监控指标
消费指标
DataNodeConsumeMsgCount: 消费消息数量
DataNodeConsumeMsgRowsCount: 消费行数
DataNodeConsumeBytesCount: 消费字节数
速率指标
InsertConsumeThroughput: 插入消息吞吐量
DeleteConsumeThroughput: 删除消息吞吐量
设计特点
1. 早期过滤
在 FlowGraph 的第一个节点就进行过滤,减少无效消息的传递和处理。
2. 状态管理
维护段的 Growing/Sealed/Dropped 状态,实现精确的消息过滤。
3. DDL 委托
将 DDL 消息处理委托给 MsgHandler,实现关注点分离。
4. 线程安全
使用原子操作管理 dropMode,支持并发访问。
相关文档