Milvus organizeSubTask 函数详细分析

本文档深入分析 organizeSubTask 函数的实现细节,这是 QueryNode delegator 中负责组织和分发子任务的核心函数。

函数签名

1
2
3
4
5
6
7
8
9
func organizeSubTask[T any](
ctx context.Context,
req T,
sealed []SnapshotItem,
growing []SegmentEntry,
sd *shardDelegator,
skipEmpty bool,
modify func(T, querypb.DataScope, []int64, int64) T,
) ([]subTask[T], error)

位置: internal/querynodev2/delegator/delegator.go:751

函数职责

organizeSubTask 是一个泛型函数,负责将查询/搜索请求组织成多个子任务,每个子任务对应一组需要在特定节点上查询的 segments。

参数说明

参数 类型 说明
ctx context.Context 上下文,用于取消和超时控制
req T (泛型) 原始请求,可以是 QueryRequestSearchRequestGetStatisticsRequest
sealed []SnapshotItem sealed segments 分布快照,按节点分组
growing []SegmentEntry growing segments 列表(通常在本地节点)
sd *shardDelegator shard delegator 实例,提供 workerManager 等资源
skipEmpty bool 是否跳过空的子任务(没有 segments)
modify func(...) 修改请求的回调函数,为每个子任务定制请求参数

返回值

  • []subTask[T]: 子任务列表,每个子任务包含请求、目标节点 ID 和 worker 客户端
  • error: 错误信息(当前实现总是返回 nil)

核心数据结构

subTask

1
2
3
4
5
type subTask[T any] struct {
req T // 修改后的请求(包含特定的 segmentIDs 和 targetID)
targetID int64 // 目标节点 ID
worker cluster.Worker // Worker 客户端(可能为 nil 如果节点不可用)
}

SnapshotItem

1
2
3
4
type SnapshotItem struct {
NodeID int64 // 节点 ID
Segments []SegmentEntry // 该节点上的 segments
}

说明: SnapshotItem 表示某个节点上的 sealed segments 分组。

SegmentEntry

1
2
3
4
5
6
7
8
9
type SegmentEntry struct {
NodeID int64 // 节点 ID
SegmentID UniqueID // Segment ID
PartitionID UniqueID // Partition ID
Version int64 // Segment 版本
TargetVersion int64 // 目标版本
Level datapb.SegmentLevel // Segment 级别(L0, L1 等)
Offline bool // 是否离线
}

执行流程详解

1. 初始化结果列表 (Line 760)

1
2
log := sd.getLogger(ctx)
result := make([]subTask[T], 0, len(sealed)+1)
  • 容量预分配: len(sealed)+1
    • len(sealed): 每个 sealed snapshot 一个任务
    • +1: growing segments 一个任务

2. 定义 packSubTask 闭包函数 (Lines 762-788)

packSubTask 是核心的任务打包函数,负责为一组 segments 创建子任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
packSubTask := func(segments []SegmentEntry, workerID int64, scope querypb.DataScope) error {
// 步骤 1: 提取 segment IDs
segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 {
return item.SegmentID
})

// 步骤 2: 跳过空任务(可选)
if skipEmpty && len(segmentIDs) == 0 {
return nil
}

// 步骤 3: 修改请求
req := modify(req, scope, segmentIDs, workerID)

// 步骤 4: 获取 worker(容错)
worker, err := sd.workerManager.GetWorker(ctx, workerID)
if err != nil {
log.Warn("failed to get worker for sub task",
zap.Int64("nodeID", workerID),
zap.Int64s("segments", segmentIDs),
zap.Error(err),
)
// 注意:即使获取 worker 失败,仍然创建任务
}

// 步骤 5: 创建并添加子任务
result = append(result, subTask[T]{
req: req,
targetID: workerID,
worker: worker, // 可能为 nil
})
return nil
}

packSubTask 的关键特性

  1. Segment IDs 提取: 使用 lo.Map 提取所有 segment IDs
  2. 空任务处理: 根据 skipEmpty 决定是否跳过没有 segments 的任务
  3. 请求定制: 通过 modify 函数为每个子任务定制请求参数
  4. 容错设计: 即使 worker 获取失败,仍然创建任务(worker 为 nil),留给后续的 executeSubTasks 处理

3. 处理 Sealed Segments (Lines 790-795)

1
2
3
4
5
6
for _, entry := range sealed {
err := packSubTask(entry.Segments, entry.NodeID, querypb.DataScope_Historical)
if err != nil {
return nil, err
}
}

逻辑:

  • 遍历每个 SnapshotItem(每个代表一个节点上的 sealed segments)
  • 为每个节点创建一个子任务
  • Scope: DataScope_Historical(历史数据)
  • Worker ID: entry.NodeID(远程节点)

示例:

1
2
3
4
5
6
7
8
9
10
sealed = [
{NodeID: 1, Segments: [seg1, seg2, seg3]},
{NodeID: 2, Segments: [seg4, seg5]},
{NodeID: 3, Segments: [seg6]},
]

生成 3 个子任务:
- Task 1: NodeID=1, SegmentIDs=[seg1, seg2, seg3], Scope=Historical
- Task 2: NodeID=2, SegmentIDs=[seg4, seg5], Scope=Historical
- Task 3: NodeID=3, SegmentIDs=[seg6], Scope=Historical

4. 处理 Growing Segments (Line 797)

1
packSubTask(growing, paramtable.GetNodeID(), querypb.DataScope_Streaming)

逻辑:

  • 所有 growing segments 在一个子任务中
  • Scope: DataScope_Streaming(流式数据)
  • Worker ID: paramtable.GetNodeID()(本地节点)
  • Growing segments 总是在当前节点(leader)上

为什么在本地节点:

  • Growing segments 是正在写入的数据
  • 只有 shard leader(当前 delegator 所在节点)负责 growing segments
  • 不会分布到其他节点

5. 返回结果 (Line 799)

1
return result, nil

返回所有创建的子任务。

modify 函数详解

modify 函数是一个回调函数,用于为每个子任务定制请求参数。不同的操作类型有不同的实现。

Query 操作的 modify 函数

函数: shardDelegator.modifyQueryRequest (line 292)

1
2
3
4
5
6
7
8
9
10
11
12
13
func (sd *shardDelegator) modifyQueryRequest(
req *querypb.QueryRequest,
scope querypb.DataScope,
segmentIDs []int64,
targetID int64,
) *querypb.QueryRequest {
nodeReq := proto.Clone(req).(*querypb.QueryRequest)
nodeReq.Scope = scope // 设置 scope
nodeReq.Req.Base.TargetID = targetID // 设置目标节点
nodeReq.SegmentIDs = segmentIDs // 设置要查询的 segments
nodeReq.DmlChannels = []string{sd.vchannelName}
return nodeReq
}

Search 操作的 modify 函数

函数: shardDelegator.modifySearchRequest (line 280)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (sd *shardDelegator) modifySearchRequest(
req *querypb.SearchRequest,
scope querypb.DataScope,
segmentIDs []int64,
targetID int64,
) *querypb.SearchRequest {
nodeReq := &querypb.SearchRequest{
DmlChannels: []string{sd.vchannelName},
SegmentIDs: segmentIDs,
Scope: scope,
Req: sd.shallowCopySearchRequest(req.GetReq(), targetID),
FromShardLeader: req.FromShardLeader,
TotalChannelNum: req.TotalChannelNum,
}
return nodeReq
}

GetStatistics 操作的 modify 函数

使用: 内联定义 (line 717)

1
2
3
4
5
6
7
8
func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest {
nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest)
nodeReq.GetReq().GetBase().TargetID = targetID
nodeReq.Scope = scope
nodeReq.SegmentIDs = segmentIDs
nodeReq.FromShardLeader = true
return nodeReq
}

Worker 获取机制

WorkerManager.GetWorker

位置: internal/querynodev2/cluster/manager.go:47

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *grpcWorkerManager) GetWorker(ctx context.Context, nodeID int64) (Worker, error) {
// 1. 尝试从缓存获取
worker, ok := m.workers.Get(nodeID)
var err error

// 2. 如果不存在,使用 singleflight 创建(防止重复创建)
if !ok {
worker, err, _ = m.sf.Do(strconv.FormatInt(nodeID, 10), func() (Worker, error) {
// 使用 builder 创建 worker
worker, err = m.builder(ctx, nodeID)
if err != nil {
return nil, err
}

// 插入缓存(如果已存在则使用已有的)
old, exist := m.workers.GetOrInsert(nodeID, worker)
if exist {
worker.Stop()
worker = old
}
return worker, nil
})
if err != nil {
return nil, err
}
}

// 3. 检查 worker 健康状态
if !worker.IsHealthy() {
return nil, fmt.Errorf("node is not healthy: %d", nodeID)
}

return worker, nil
}

特性:

  • 缓存机制: 复用已创建的 worker
  • Singleflight: 防止并发创建同一个 worker
  • 健康检查: 确保 worker 可用

Worker 类型

本地 Worker (LocalWorker)

1
2
3
4
// 当 nodeID == 当前节点 ID 时创建
if nodeID == node.GetNodeID() {
return NewLocalWorker(node), nil
}
  • 直接调用本地 QueryNode 方法
  • 无网络开销

远程 Worker (RemoteWorker)

1
2
3
4
// 远程节点
return cluster.NewPoolingRemoteWorker(func() (types.QueryNodeClient, error) {
return grpcquerynodeclient.NewClient(node.ctx, addr, nodeID)
})
  • 通过 gRPC 调用远程 QueryNode
  • 支持连接池(WorkerPoolingSize 配置)
  • 使用 round-robin 选择连接

Worker 接口:

1
2
3
4
5
6
7
8
9
10
11
type Worker interface {
LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error
ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error
SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error)
QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error)
QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error
GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error)
// ...
IsHealthy() bool
Stop()
}

使用场景

1. Query 操作

1
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
  • 用于普通查询操作
  • skipEmpty = true: 跳过没有 segments 的任务

2. QueryStream 操作

1
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
  • 用于流式查询操作
  • 与普通查询使用相同的逻辑

3. Search 操作

1
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifySearchRequest)
  • 用于向量搜索操作
  • 使用 modifySearchRequest 定制请求

4. GetStatistics 操作

1
2
3
4
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, 
func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest {
// 内联定制函数
})
  • 用于获取统计信息
  • 使用内联函数定制请求

5. UpdateSchema 操作

1
2
3
4
5
tasks, err := organizeSubTask(ctx, &querypb.UpdateSchemaRequest{...}, 
sealed, growing, sd, false, // skipEmpty = false
func(...) *querypb.UpdateSchemaRequest {
// 定制函数
})
  • 用于更新 schema
  • skipEmpty = false: 即使没有 segments 也创建任务(需要通知所有节点)

设计优势

1. 泛型设计

1
func organizeSubTask[T any](...)
  • 类型安全: 编译时类型检查
  • 代码复用: 同一函数支持多种请求类型
  • 灵活性: 通过 modify 函数定制不同类型的请求

2. 容错设计

1
2
3
4
5
worker, err := sd.workerManager.GetWorker(ctx, workerID)
if err != nil {
log.Warn("failed to get worker for sub task", ...)
// 继续创建任务,worker 为 nil
}
  • 即使 worker 不可用,仍然创建任务
  • 让后续的 executeSubTasks 决定如何处理(部分结果 vs 完全失败)

3. 关注点分离

  • organizeSubTask: 只负责组织任务,不执行
  • executeSubTasks: 负责执行任务和结果收集
  • modify 函数: 负责请求定制

4. 批量操作

  • 将同一节点上的所有 segments 组织到一个任务中
  • 减少 RPC 调用次数
  • 提高网络效率

性能考虑

1. 预分配容量

1
result := make([]subTask[T], 0, len(sealed)+1)

减少内存重新分配。

2. Worker 缓存

  • WorkerManager 缓存已创建的 worker
  • 避免重复建立 gRPC 连接

3. 连接池

1
poolSize := paramtable.Get().QueryNodeCfg.WorkerPoolingSize.GetAsInt()
  • 每个远程 worker 维护多个 gRPC 连接
  • Round-robin 选择连接,提高并发性能

4. 本地优化

1
2
3
if nodeID == node.GetNodeID() {
return NewLocalWorker(node), nil
}
  • 本地 worker 直接调用,无网络开销

错误处理

Worker 获取失败

1
2
3
4
if err != nil {
log.Warn("failed to get worker for sub task", ...)
// 不返回错误,继续创建任务
}

策略: 延迟错误处理

  • organizeSubTask 阶段只记录警告
  • executeSubTasks 阶段根据部分结果策略决定是否失败

空 Segment 列表

1
2
3
if skipEmpty && len(segmentIDs) == 0 {
return nil // 跳过任务
}

场景:

  • 某个节点上没有需要查询的 segments
  • 根据 skipEmpty 参数决定是否创建任务

完整示例

输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sealed = [
{NodeID: 1, Segments: [
{SegmentID: 101, PartitionID: 1},
{SegmentID: 102, PartitionID: 1},
]},
{NodeID: 2, Segments: [
{SegmentID: 103, PartitionID: 2},
]},
]

growing = [
{SegmentID: 201, NodeID: 0, PartitionID: 1},
{SegmentID: 202, NodeID: 0, PartitionID: 2},
]

currentNodeID = 0

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
tasks = [
{
req: QueryRequest{
SegmentIDs: [101, 102],
Scope: DataScope_Historical,
TargetID: 1,
DmlChannels: ["channel-1"],
},
targetID: 1,
worker: RemoteWorker{nodeID: 1},
},
{
req: QueryRequest{
SegmentIDs: [103],
Scope: DataScope_Historical,
TargetID: 2,
DmlChannels: ["channel-1"],
},
targetID: 2,
worker: RemoteWorker{nodeID: 2},
},
{
req: QueryRequest{
SegmentIDs: [201, 202],
Scope: DataScope_Streaming,
TargetID: 0,
DmlChannels: ["channel-1"],
},
targetID: 0,
worker: LocalWorker{},
},
]

总结

organizeSubTask 是一个设计精巧的泛型函数,具有以下特点:

  1. 高度抽象: 通过泛型和回调函数支持多种请求类型
  2. 容错健壮: 即使部分 worker 不可用也能创建任务
  3. 性能优化: 批量操作、连接池、本地优化
  4. 关注点分离: 只负责组织任务,不负责执行
  5. 易于扩展: 添加新的请求类型只需提供新的 modify 函数

该函数是 Milvus 查询系统中任务分发机制的核心组件,确保了查询能够高效、可靠地分发到多个节点并行执行。