Milvus Query Request 处理流程分析

本文档详细分析了 Milvus 中 Query Request 从接收到返回的完整处理流程。

1. 请求入口层

1.1 HTTP 接口入口

文件: internal/distributed/proxy/httpserver/handler_v1.go, handler_v2.go

  • V1 API: HandlersV1.query() (line 496)

    • 接收 HTTP JSON 请求
    • 解析参数:collectionName, filter, outputFields, limit, offset
    • 转换为 milvuspb.QueryRequest
    • 调用 proxy.Query()
  • V2 API: HandlersV2.query() (line 912)

    • 支持更多参数:partitionNames, consistencyLevel, exprParams
    • 处理时区转换和表达式模板

1.2 gRPC 接口入口

文件: internal/distributed/proxy/httpserver/handler.go

  • handleQuery() (line 384)
    • 直接接收 milvuspb.QueryRequest
    • 调用 proxy.Query()

2. Proxy 层处理

2.1 Query 方法入口

文件: internal/proxy/impl.go

方法: Proxy.Query() (line 3554)

主要步骤:

  1. 创建 queryTask 对象
  2. 初始化 RetrieveRequest 基础信息
  3. 记录指标(metrics)
  4. 调用 query() 方法执行实际查询
1
2
3
4
5
6
7
8
qt := &queryTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
RetrieveRequest: &internalpb.RetrieveRequest{...},
request: request,
...
}
res, storageCost, err := node.query(ctx, qt, sp)

2.2 query 方法执行

文件: internal/proxy/impl.go

方法: Proxy.query() (line 3447)

主要步骤:

  1. 健康检查
  2. 记录日志和追踪信息
  3. 将任务加入队列: node.sched.dqQueue.Enqueue(qt)
  4. 等待任务完成: qt.WaitToFinish()
  5. 记录慢查询和指标

2.3 QueryTask 执行流程

文件: internal/proxy/task_query.go

QueryTask 遵循标准的任务执行模式:PreExecute → Execute → PostExecute

2.3.1 PreExecute 阶段

方法: queryTask.PreExecute() (line 369)

主要职责:

  1. 验证和获取元数据

    • 验证 collection 名称
    • 获取 collection ID 和 schema
    • 验证 partition 名称
    • 检查 partition key 模式
  2. 解析查询参数

    • 解析 limit, offset, timezone, extractTimeFields
    • 处理 iterator 模式
    • 验证最大查询窗口
  3. 创建查询计划

    • 调用 createPlanArgs() (line 299)
    • 解析表达式:planparserv2.CreateRetrievePlanArgs()
    • 处理 count(*) 特殊逻辑
    • 转换输出字段名称为字段 ID
  4. 设置时间戳

    • 根据一致性级别计算 guaranteeTimestamp
    • 处理 mvccTimestamp
    • 处理 collection TTL
    • 设置超时时间戳
  5. 序列化查询计划

    • 将 PlanNode 序列化为 SerializedExprPlan

2.3.2 Execute 阶段

方法: queryTask.Execute() (line 587)

主要职责:

  1. 初始化结果缓冲区

    1
    t.resultBuf = typeutil.NewConcurrentSet[*internalpb.RetrieveResults]()
  2. 负载均衡执行

    • 调用 t.lb.Execute() (line 595)
    • 对每个 shard 调用 queryShard() 方法
    • 使用 shardclient.CollectionWorkLoad 管理负载
  3. queryShard 方法 (line 704)

    • 构建 querypb.QueryRequest
    • 设置目标节点 ID
    • 处理 MVCC 时间戳覆盖
    • 调用 qn.Query() 发送到 QueryNode
    • 收集结果到 resultBuf

2.3.3 PostExecute 阶段

方法: queryTask.PostExecute() (line 611)

主要职责:

  1. 收集所有结果

    • resultBuf 收集所有 RetrieveResults
    • 统计总查询数量和数据大小
  2. 结果合并(Reduce)

    • 创建 reducer: createMilvusReducer()
    • 调用 reducer.Reduce() 合并多个 shard 的结果
    • 应用 limit 和 offset
    • 处理排序
  3. 结果后处理

    • 验证 geometry 字段
    • 重构 struct 字段数据
    • 时区转换(timestamp 转 ISO 字符串)
    • 提取时间字段(如果指定)
  4. 设置最终结果

    • 设置输出字段名称
    • 设置主键字段名称
    • 处理 iterator session timestamp

3. QueryNode 层处理

3.1 Query 方法入口

文件: internal/querynodev2/services.go

方法: QueryNode.Query() (line 972)

主要步骤:

  1. 健康检查

    • 检查节点生命周期状态
    • 检查 collection 是否已加载
  2. 并发处理多个 Channel

    • 为每个 DML channel 创建独立的 goroutine
    • 调用 queryChannel() 处理每个 channel
    • 使用 errgroup 管理并发
  3. 结果合并

    • 收集所有 channel 的结果
    • 使用 segments.CreateInternalReducer 合并结果
    • 返回最终结果

3.2 queryChannel 方法

文件: internal/querynodev2/handlers.go

方法: QueryNode.queryChannel() (line 218)

主要步骤:

  1. 获取 Shard Delegator

    1
    sd, ok := node.delegators.Get(channel)
  2. 调用 Delegator 查询

    1
    results, err := sd.Query(queryCtx, req)
  3. 结果合并

    • 使用 segments.ReduceRetrieveResults() 合并结果
    • 处理 collection 引用计数

3.3 Delegator Query 方法

文件: internal/querynodev2/delegator/delegator.go

方法: shardDelegator.Query() (line 577)

主要步骤:

  1. 验证请求

    • 检查 channel 是否匹配
    • 检查 delegator 是否可用
  2. 处理时间戳

    • 调用 speedupGuranteeTS() 优化 guarantee timestamp
    • 等待 tSafe(根据 partialResultRequiredDataRatio 配置)
    • 设置 MVCC timestamp
  3. 获取可读 Segment

    1
    2
    sealed, growing, sealedRowCount, version, err := 
    sd.distribution.PinReadableSegments(partialResultRequiredDataRatio, ...)
    • 获取 sealed segments 和 growing segments
    • Pin 操作防止 segment 被卸载
  4. Segment 剪枝(可选)

    • 如果启用 EnableSegmentPrune
    • 根据统计信息剪枝不相关的 segments
  5. 组织子任务

    1
    tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, ...)
    • 将 segments 组织成查询任务
    • 分配任务到不同的 worker
  6. 执行子任务

    1
    2
    results, err := executeSubTasks(ctx, tasks, evaluator, 
    func(ctx, req, worker) { worker.QuerySegments(ctx, req) }, ...)
    • 并发执行所有子任务
    • 使用 RowCountBasedEvaluator 评估部分结果
    • 处理失败情况(部分结果模式)
  7. 返回结果

    • 返回所有成功的查询结果
    • Unpin segments

4. 数据流图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Client Request (HTTP/gRPC)

Proxy Handler (handler.go/handler_v1.go/handler_v2.go)

Proxy.Query() [impl.go:3554]

Proxy.query() [impl.go:3447]

Enqueue to dqQueue

QueryTask.PreExecute() [task_query.go:369]
├─ 验证请求
├─ 获取元数据
├─ 创建查询计划
└─ 设置时间戳

QueryTask.Execute() [task_query.go:587]
├─ Load Balancer 选择节点
└─ queryShard() [task_query.go:704]

QueryNode.Query() [services.go:972]
├─ 并发处理多个 Channel
└─ queryChannel() [handlers.go:218]

Delegator.Query() [delegator.go:577]
├─ 等待 tSafe
├─ Pin Readable Segments
├─ 组织子任务
└─ executeSubTasks()

Worker.QuerySegments()

实际查询 Segment 数据

QueryTask.PostExecute() [task_query.go:611]
├─ 收集结果
├─ Reduce 合并结果
└─ 后处理(时区转换等)

返回结果给 Client

5. 关键组件说明

5.1 Load Balancer

位置: internal/proxy/shardclient

  • 负责选择 QueryNode 节点
  • 支持多种负载均衡策略
  • 管理 shard client 连接

5.2 Query Plan Parser

位置: internal/parser/planparserv2

  • 解析查询表达式(如 id > 100
  • 生成执行计划(PlanNode)
  • 优化查询计划

5.3 Reducer

位置: internal/util/reduce

  • 合并多个 shard 的查询结果
  • 应用 limit 和 offset
  • 处理排序和去重

5.4 Delegator

位置: internal/querynodev2/delegator

  • 管理 shard 级别的查询逻辑
  • 组织 segment 查询任务
  • 处理部分结果和容错

5.5 Segment Manager

位置: internal/querynodev2/segments

  • 管理 segment 的生命周期
  • 提供 segment 查询接口
  • 处理 segment 的加载和卸载

6. 关键配置参数

  • ProxyCfg.SlowLogSpanInSeconds: 慢查询阈值
  • QueryNodeCfg.PartialResultRequiredDataRatio: 部分结果所需数据比例
  • QueryNodeCfg.EnableSegmentPrune: 是否启用 segment 剪枝
  • QueryNodeCfg.DefaultSegmentFilterRatio: 默认 segment 过滤比例

7. 性能优化点

  1. 并发处理: 多个 channel 和 segment 并发查询
  2. Segment 剪枝: 根据统计信息跳过不相关的 segments
  3. 部分结果: 支持返回部分结果以降低延迟
  4. 负载均衡: 智能选择 QueryNode 节点
  5. 结果缓存: 某些场景下可以缓存查询结果

8. 错误处理

  • Collection 未加载: 返回 CollectionNotLoaded 错误
  • Channel 不匹配: Delegator 验证 channel 是否属于自己
  • Segment 未加载: 标记 segment 为 offline,可能触发重试
  • 部分失败: 根据 partialResultRequiredDataRatio 决定是否返回部分结果

9. 监控指标

  • ProxyReceivedNQ: Proxy 接收的查询请求数
  • ProxySQLatency: Proxy 查询延迟
  • QueryNodeSQCount: QueryNode 查询计数
  • QueryNodeSQLatencyWaitTSafe: 等待 tSafe 的延迟
  • ProxyReduceResultLatency: 结果合并延迟