Milvus DDNode(数据分发节点)详解

概述

ddNode(Data Distribution Node)是 FlowGraph 中的第一个处理节点,负责从消息流中过滤和分发消息。它处理插入消息、删除消息以及 DDL 消息(如 CreateSegment、Flush、DropCollection),确保只有有效的数据消息传递到后续节点。

架构设计

核心结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type ddNode struct {
BaseNode

ctx context.Context
collectionID typeutil.UniqueID
vChannelName string

dropMode atomic.Value
msgHandler util.MsgHandler

// 段信息缓存(用于过滤)
growingSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo
sealedSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo
droppedSegmentIDs []int64
}

关键字段说明:

  • dropMode: 原子值,标记是否处于删除模式
  • msgHandler: DDL 消息处理器
  • growingSegInfo: Growing 段信息映射
  • sealedSegInfo: Sealed 段信息映射
  • droppedSegmentIDs: 已删除段 ID 列表

消息处理流程

Operate 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (ddn *ddNode) Operate(in []Msg) []Msg {
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
return []Msg{}
}

// 1. 处理关闭消息
if msMsg.IsCloseMsg() {
return []Msg{&FlowGraphMsg{...}}
}

// 2. 检查删除模式
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
return []Msg{}
}

// 3. 处理消息流中的每条消息
fgMsg := FlowGraphMsg{...}
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
// 处理删除集合消息
case commonpb.MsgType_DropPartition:
// 处理删除分区消息
case commonpb.MsgType_Insert:
// 处理插入消息
case commonpb.MsgType_Delete:
// 处理删除消息
case commonpb.MsgType_CreateSegment:
// 处理创建段消息
case commonpb.MsgType_FlushSegment:
// 处理刷新段消息
case commonpb.MsgType_ManualFlush:
// 处理手动刷新消息
case commonpb.MsgType_AddCollectionField:
// 处理 Schema 变更消息
case commonpb.MsgType_AlterCollection:
// 处理修改集合消息
}
}

return []Msg{&fgMsg}
}

消息类型处理

1. Insert 消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
case commonpb.MsgType_Insert:
imsg := msg.(*msgstream.InsertMsg)

// 检查集合 ID 匹配
if imsg.CollectionID != ddn.collectionID {
continue
}

// 过滤段消息
if ddn.tryToFilterSegmentInsertMessages(imsg) {
continue
}

// 记录指标
metrics.DataNodeConsumeMsgCount.Inc()
metrics.DataNodeConsumeMsgRowsCount.Add(float64(imsg.GetNumRows()))

// 添加到输出消息
fgMsg.InsertMessages = append(fgMsg.InsertMessages, imsg)

2. Delete 消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case commonpb.MsgType_Delete:
dmsg := msg.(*msgstream.DeleteMsg)

// 检查集合 ID 匹配
if dmsg.CollectionID != ddn.collectionID {
continue
}

// 记录指标
metrics.DataNodeConsumeMsgCount.Inc()
metrics.DataNodeConsumeMsgRowsCount.Add(float64(dmsg.GetNumRows()))

// 添加到输出消息
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)

3. DDL 消息处理

CreateSegment

1
2
3
4
5
case commonpb.MsgType_CreateSegment:
createSegment := msg.(*adaptor.CreateSegmentMessageBody)
if err := ddn.msgHandler.HandleCreateSegment(ddn.ctx, createSegment.CreateSegmentMessage); err != nil {
log.Warn("handle create segment message failed", zap.Error(err))
}

FlushSegment

1
2
3
4
5
case commonpb.MsgType_FlushSegment:
flushMsg := msg.(*adaptor.FlushMessageBody)
if err := ddn.msgHandler.HandleFlush(flushMsg.FlushMessage); err != nil {
log.Warn("handle flush message failed", zap.Error(err))
}

ManualFlush

1
2
3
4
5
case commonpb.MsgType_ManualFlush:
manualFlushMsg := msg.(*adaptor.ManualFlushMessageBody)
if err := ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage); err != nil {
log.Warn("handle manual flush message failed", zap.Error(err))
}

消息过滤机制

tryToFilterSegmentInsertMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (ddn *ddNode) tryToFilterSegmentInsertMessages(msg *msgstream.InsertMsg) bool {
// 1. 检查 Shard 名称匹配
if msg.GetShardName() != ddn.vChannelName {
return true
}

// 2. 过滤已删除的段
if ddn.isDropped(msg.GetSegmentID()) {
return true
}

// 3. 过滤 Sealed 段(直到当前时间戳超过段的检查点)
for segID, segInfo := range ddn.sealedSegInfo {
if msg.EndTs() > segInfo.GetDmlPosition().GetTimestamp() {
delete(ddn.sealedSegInfo, segID)
}
}
if _, ok := ddn.sealedSegInfo[msg.GetSegmentID()]; ok {
return true
}

// 4. 过滤 Growing 段(直到当前时间戳超过段的 DML 位置)
if si, ok := ddn.growingSegInfo[msg.GetSegmentID()]; ok {
if msg.EndTs() <= si.GetDmlPosition().GetTimestamp() {
return true
}
delete(ddn.growingSegInfo, msg.GetSegmentID())
}

return false
}

过滤逻辑说明:

  1. Shard 名称检查: 确保消息属于当前通道
  2. 删除段过滤: 过滤掉已删除段的消息
  3. Sealed 段过滤: 过滤已封装的段,直到消息时间戳超过段的检查点
  4. Growing 段过滤: 过滤 Growing 段中已处理的消息

消息类型(MsgType)赋值流程

消息创建时的赋值

消息的 MsgType 在创建时通过 commonpbutil.WithMsgType 设置:

1
2
3
4
5
6
7
8
9
10
11
// 在消息创建时
msg := &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Insert),
commonpbutil.WithMsgID(...),
commonpbutil.WithTimeStamp(...),
),
},
InsertRequest: &msgpb.InsertRequest{...},
}

消息类型获取

1
2
3
4
// InsertMsg 实现
func (it *InsertMsg) Type() MsgType {
return it.Base.MsgType // 返回 Base 中的 MsgType
}

消息流中的传递

1
消息创建 → 序列化 → 消息流 → MsgStreamMsg 包装 → ddNode.Operate → msg.Type()

初始化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName string,
droppedSegmentIDs []typeutil.UniqueID,
sealedSegments []*datapb.SegmentInfo,
growingSegments []*datapb.SegmentInfo,
handler util.MsgHandler) *ddNode {

dd := &ddNode{
ctx: ctx,
collectionID: collID,
sealedSegInfo: make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(sealedSegments)),
growingSegInfo: make(map[typeutil.UniqueID]*datapb.SegmentInfo, len(growingSegments)),
droppedSegmentIDs: droppedSegmentIDs,
vChannelName: vChannelName,
msgHandler: handler,
}

// 初始化段信息
for _, s := range sealedSegments {
dd.sealedSegInfo[s.GetID()] = s
}

for _, s := range growingSegments {
dd.growingSegInfo[s.GetID()] = s
}

dd.dropMode.Store(false)
return dd
}

监控指标

消费指标

  • DataNodeConsumeMsgCount: 消费消息数量
  • DataNodeConsumeMsgRowsCount: 消费行数
  • DataNodeConsumeBytesCount: 消费字节数

速率指标

  • InsertConsumeThroughput: 插入消息吞吐量
  • DeleteConsumeThroughput: 删除消息吞吐量

设计特点

1. 早期过滤

在 FlowGraph 的第一个节点就进行过滤,减少无效消息的传递和处理。

2. 状态管理

维护段的 Growing/Sealed/Dropped 状态,实现精确的消息过滤。

3. DDL 委托

将 DDL 消息处理委托给 MsgHandler,实现关注点分离。

4. 线程安全

使用原子操作管理 dropMode,支持并发访问。

相关文档