本文档深入分析 organizeSubTask 函数的实现细节,这是 QueryNode delegator 中负责组织和分发子任务的核心函数。
函数签名
1 2 3 4 5 6 7 8 9
| func organizeSubTask[T any]( ctx context.Context, req T, sealed []SnapshotItem, growing []SegmentEntry, sd *shardDelegator, skipEmpty bool, modify func(T, querypb.DataScope, []int64, int64) T, ) ([]subTask[T], error)
|
位置: internal/querynodev2/delegator/delegator.go:751
函数职责
organizeSubTask 是一个泛型函数,负责将查询/搜索请求组织成多个子任务,每个子任务对应一组需要在特定节点上查询的 segments。
参数说明
| 参数 |
类型 |
说明 |
ctx |
context.Context |
上下文,用于取消和超时控制 |
req |
T (泛型) |
原始请求,可以是 QueryRequest、SearchRequest 或 GetStatisticsRequest |
sealed |
[]SnapshotItem |
sealed segments 分布快照,按节点分组 |
growing |
[]SegmentEntry |
growing segments 列表(通常在本地节点) |
sd |
*shardDelegator |
shard delegator 实例,提供 workerManager 等资源 |
skipEmpty |
bool |
是否跳过空的子任务(没有 segments) |
modify |
func(...) |
修改请求的回调函数,为每个子任务定制请求参数 |
返回值
[]subTask[T]: 子任务列表,每个子任务包含请求、目标节点 ID 和 worker 客户端
error: 错误信息(当前实现总是返回 nil)
核心数据结构
subTask
1 2 3 4 5
| type subTask[T any] struct { req T targetID int64 worker cluster.Worker }
|
SnapshotItem
1 2 3 4
| type SnapshotItem struct { NodeID int64 Segments []SegmentEntry }
|
说明: SnapshotItem 表示某个节点上的 sealed segments 分组。
SegmentEntry
1 2 3 4 5 6 7 8 9
| type SegmentEntry struct { NodeID int64 SegmentID UniqueID PartitionID UniqueID Version int64 TargetVersion int64 Level datapb.SegmentLevel Offline bool }
|
执行流程详解
1. 初始化结果列表 (Line 760)
1 2
| log := sd.getLogger(ctx) result := make([]subTask[T], 0, len(sealed)+1)
|
- 容量预分配:
len(sealed)+1
len(sealed): 每个 sealed snapshot 一个任务
+1: growing segments 一个任务
2. 定义 packSubTask 闭包函数 (Lines 762-788)
packSubTask 是核心的任务打包函数,负责为一组 segments 创建子任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| packSubTask := func(segments []SegmentEntry, workerID int64, scope querypb.DataScope) error { segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 { return item.SegmentID }) if skipEmpty && len(segmentIDs) == 0 { return nil } req := modify(req, scope, segmentIDs, workerID) worker, err := sd.workerManager.GetWorker(ctx, workerID) if err != nil { log.Warn("failed to get worker for sub task", zap.Int64("nodeID", workerID), zap.Int64s("segments", segmentIDs), zap.Error(err), ) } result = append(result, subTask[T]{ req: req, targetID: workerID, worker: worker, }) return nil }
|
packSubTask 的关键特性
- Segment IDs 提取: 使用
lo.Map 提取所有 segment IDs
- 空任务处理: 根据
skipEmpty 决定是否跳过没有 segments 的任务
- 请求定制: 通过
modify 函数为每个子任务定制请求参数
- 容错设计: 即使 worker 获取失败,仍然创建任务(worker 为 nil),留给后续的
executeSubTasks 处理
3. 处理 Sealed Segments (Lines 790-795)
1 2 3 4 5 6
| for _, entry := range sealed { err := packSubTask(entry.Segments, entry.NodeID, querypb.DataScope_Historical) if err != nil { return nil, err } }
|
逻辑:
- 遍历每个
SnapshotItem(每个代表一个节点上的 sealed segments)
- 为每个节点创建一个子任务
- Scope:
DataScope_Historical(历史数据)
- Worker ID:
entry.NodeID(远程节点)
示例:
1 2 3 4 5 6 7 8 9 10
| sealed = [ {NodeID: 1, Segments: [seg1, seg2, seg3]}, {NodeID: 2, Segments: [seg4, seg5]}, {NodeID: 3, Segments: [seg6]}, ]
生成 3 个子任务: - Task 1: NodeID=1, SegmentIDs=[seg1, seg2, seg3], Scope=Historical - Task 2: NodeID=2, SegmentIDs=[seg4, seg5], Scope=Historical - Task 3: NodeID=3, SegmentIDs=[seg6], Scope=Historical
|
4. 处理 Growing Segments (Line 797)
1
| packSubTask(growing, paramtable.GetNodeID(), querypb.DataScope_Streaming)
|
逻辑:
- 所有 growing segments 在一个子任务中
- Scope:
DataScope_Streaming(流式数据)
- Worker ID:
paramtable.GetNodeID()(本地节点)
- Growing segments 总是在当前节点(leader)上
为什么在本地节点:
- Growing segments 是正在写入的数据
- 只有 shard leader(当前 delegator 所在节点)负责 growing segments
- 不会分布到其他节点
5. 返回结果 (Line 799)
返回所有创建的子任务。
modify 函数详解
modify 函数是一个回调函数,用于为每个子任务定制请求参数。不同的操作类型有不同的实现。
Query 操作的 modify 函数
函数: shardDelegator.modifyQueryRequest (line 292)
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (sd *shardDelegator) modifyQueryRequest( req *querypb.QueryRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64, ) *querypb.QueryRequest { nodeReq := proto.Clone(req).(*querypb.QueryRequest) nodeReq.Scope = scope nodeReq.Req.Base.TargetID = targetID nodeReq.SegmentIDs = segmentIDs nodeReq.DmlChannels = []string{sd.vchannelName} return nodeReq }
|
Search 操作的 modify 函数
函数: shardDelegator.modifySearchRequest (line 280)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (sd *shardDelegator) modifySearchRequest( req *querypb.SearchRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64, ) *querypb.SearchRequest { nodeReq := &querypb.SearchRequest{ DmlChannels: []string{sd.vchannelName}, SegmentIDs: segmentIDs, Scope: scope, Req: sd.shallowCopySearchRequest(req.GetReq(), targetID), FromShardLeader: req.FromShardLeader, TotalChannelNum: req.TotalChannelNum, } return nodeReq }
|
GetStatistics 操作的 modify 函数
使用: 内联定义 (line 717)
1 2 3 4 5 6 7 8
| func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest { nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest) nodeReq.GetReq().GetBase().TargetID = targetID nodeReq.Scope = scope nodeReq.SegmentIDs = segmentIDs nodeReq.FromShardLeader = true return nodeReq }
|
Worker 获取机制
WorkerManager.GetWorker
位置: internal/querynodev2/cluster/manager.go:47
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (m *grpcWorkerManager) GetWorker(ctx context.Context, nodeID int64) (Worker, error) { worker, ok := m.workers.Get(nodeID) var err error if !ok { worker, err, _ = m.sf.Do(strconv.FormatInt(nodeID, 10), func() (Worker, error) { worker, err = m.builder(ctx, nodeID) if err != nil { return nil, err } old, exist := m.workers.GetOrInsert(nodeID, worker) if exist { worker.Stop() worker = old } return worker, nil }) if err != nil { return nil, err } } if !worker.IsHealthy() { return nil, fmt.Errorf("node is not healthy: %d", nodeID) } return worker, nil }
|
特性:
- 缓存机制: 复用已创建的 worker
- Singleflight: 防止并发创建同一个 worker
- 健康检查: 确保 worker 可用
Worker 类型
本地 Worker (LocalWorker)
1 2 3 4
| if nodeID == node.GetNodeID() { return NewLocalWorker(node), nil }
|
- 直接调用本地 QueryNode 方法
- 无网络开销
远程 Worker (RemoteWorker)
1 2 3 4
| return cluster.NewPoolingRemoteWorker(func() (types.QueryNodeClient, error) { return grpcquerynodeclient.NewClient(node.ctx, addr, nodeID) })
|
- 通过 gRPC 调用远程 QueryNode
- 支持连接池(
WorkerPoolingSize 配置)
- 使用 round-robin 选择连接
Worker 接口:
1 2 3 4 5 6 7 8 9 10 11
| type Worker interface { LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) IsHealthy() bool Stop() }
|
使用场景
1. Query 操作
1
| tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
|
- 用于普通查询操作
skipEmpty = true: 跳过没有 segments 的任务
2. QueryStream 操作
1
| tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
|
3. Search 操作
1
| tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifySearchRequest)
|
- 用于向量搜索操作
- 使用
modifySearchRequest 定制请求
4. GetStatistics 操作
1 2 3 4
| tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest { })
|
5. UpdateSchema 操作
1 2 3 4 5
| tasks, err := organizeSubTask(ctx, &querypb.UpdateSchemaRequest{...}, sealed, growing, sd, false, func(...) *querypb.UpdateSchemaRequest { })
|
- 用于更新 schema
skipEmpty = false: 即使没有 segments 也创建任务(需要通知所有节点)
设计优势
1. 泛型设计
1
| func organizeSubTask[T any](...)
|
- 类型安全: 编译时类型检查
- 代码复用: 同一函数支持多种请求类型
- 灵活性: 通过
modify 函数定制不同类型的请求
2. 容错设计
1 2 3 4 5
| worker, err := sd.workerManager.GetWorker(ctx, workerID) if err != nil { log.Warn("failed to get worker for sub task", ...) }
|
- 即使 worker 不可用,仍然创建任务
- 让后续的
executeSubTasks 决定如何处理(部分结果 vs 完全失败)
3. 关注点分离
- organizeSubTask: 只负责组织任务,不执行
- executeSubTasks: 负责执行任务和结果收集
- modify 函数: 负责请求定制
4. 批量操作
- 将同一节点上的所有 segments 组织到一个任务中
- 减少 RPC 调用次数
- 提高网络效率
性能考虑
1. 预分配容量
1
| result := make([]subTask[T], 0, len(sealed)+1)
|
减少内存重新分配。
2. Worker 缓存
- WorkerManager 缓存已创建的 worker
- 避免重复建立 gRPC 连接
3. 连接池
1
| poolSize := paramtable.Get().QueryNodeCfg.WorkerPoolingSize.GetAsInt()
|
- 每个远程 worker 维护多个 gRPC 连接
- Round-robin 选择连接,提高并发性能
4. 本地优化
1 2 3
| if nodeID == node.GetNodeID() { return NewLocalWorker(node), nil }
|
错误处理
Worker 获取失败
1 2 3 4
| if err != nil { log.Warn("failed to get worker for sub task", ...) }
|
策略: 延迟错误处理
- 在
organizeSubTask 阶段只记录警告
- 在
executeSubTasks 阶段根据部分结果策略决定是否失败
空 Segment 列表
1 2 3
| if skipEmpty && len(segmentIDs) == 0 { return nil }
|
场景:
- 某个节点上没有需要查询的 segments
- 根据
skipEmpty 参数决定是否创建任务
完整示例
输入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| sealed = [ {NodeID: 1, Segments: [ {SegmentID: 101, PartitionID: 1}, {SegmentID: 102, PartitionID: 1}, ]}, {NodeID: 2, Segments: [ {SegmentID: 103, PartitionID: 2}, ]}, ]
growing = [ {SegmentID: 201, NodeID: 0, PartitionID: 1}, {SegmentID: 202, NodeID: 0, PartitionID: 2}, ]
currentNodeID = 0
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| tasks = [ { req: QueryRequest{ SegmentIDs: [101, 102], Scope: DataScope_Historical, TargetID: 1, DmlChannels: ["channel-1"], }, targetID: 1, worker: RemoteWorker{nodeID: 1}, }, { req: QueryRequest{ SegmentIDs: [103], Scope: DataScope_Historical, TargetID: 2, DmlChannels: ["channel-1"], }, targetID: 2, worker: RemoteWorker{nodeID: 2}, }, { req: QueryRequest{ SegmentIDs: [201, 202], Scope: DataScope_Streaming, TargetID: 0, DmlChannels: ["channel-1"], }, targetID: 0, worker: LocalWorker{}, }, ]
|
总结
organizeSubTask 是一个设计精巧的泛型函数,具有以下特点:
- 高度抽象: 通过泛型和回调函数支持多种请求类型
- 容错健壮: 即使部分 worker 不可用也能创建任务
- 性能优化: 批量操作、连接池、本地优化
- 关注点分离: 只负责组织任务,不负责执行
- 易于扩展: 添加新的请求类型只需提供新的 modify 函数
该函数是 Milvus 查询系统中任务分发机制的核心组件,确保了查询能够高效、可靠地分发到多个节点并行执行。