Milvus Query Request 处理流程分析
本文档详细分析了 Milvus 中 Query Request 从接收到返回的完整处理流程。
1. 请求入口层
1.1 HTTP 接口入口
文件: internal/distributed/proxy/httpserver/handler_v1.go, handler_v2.go
V1 API:
HandlersV1.query()(line 496)- 接收 HTTP JSON 请求
- 解析参数:
collectionName,filter,outputFields,limit,offset - 转换为
milvuspb.QueryRequest - 调用
proxy.Query()
V2 API:
HandlersV2.query()(line 912)- 支持更多参数:
partitionNames,consistencyLevel,exprParams - 处理时区转换和表达式模板
- 支持更多参数:
1.2 gRPC 接口入口
文件: internal/distributed/proxy/httpserver/handler.go
handleQuery()(line 384)- 直接接收
milvuspb.QueryRequest - 调用
proxy.Query()
- 直接接收
2. Proxy 层处理
2.1 Query 方法入口
文件: internal/proxy/impl.go
方法: Proxy.Query() (line 3554)
主要步骤:
- 创建
queryTask对象 - 初始化
RetrieveRequest基础信息 - 记录指标(metrics)
- 调用
query()方法执行实际查询
1 | qt := &queryTask{ |
2.2 query 方法执行
文件: internal/proxy/impl.go
方法: Proxy.query() (line 3447)
主要步骤:
- 健康检查
- 记录日志和追踪信息
- 将任务加入队列:
node.sched.dqQueue.Enqueue(qt) - 等待任务完成:
qt.WaitToFinish() - 记录慢查询和指标
2.3 QueryTask 执行流程
文件: internal/proxy/task_query.go
QueryTask 遵循标准的任务执行模式:PreExecute → Execute → PostExecute
2.3.1 PreExecute 阶段
方法: queryTask.PreExecute() (line 369)
主要职责:
验证和获取元数据
- 验证 collection 名称
- 获取 collection ID 和 schema
- 验证 partition 名称
- 检查 partition key 模式
解析查询参数
- 解析
limit,offset,timezone,extractTimeFields - 处理 iterator 模式
- 验证最大查询窗口
- 解析
创建查询计划
- 调用
createPlanArgs()(line 299) - 解析表达式:
planparserv2.CreateRetrievePlanArgs() - 处理 count(*) 特殊逻辑
- 转换输出字段名称为字段 ID
- 调用
设置时间戳
- 根据一致性级别计算
guaranteeTimestamp - 处理
mvccTimestamp - 处理 collection TTL
- 设置超时时间戳
- 根据一致性级别计算
序列化查询计划
- 将 PlanNode 序列化为
SerializedExprPlan
- 将 PlanNode 序列化为
2.3.2 Execute 阶段
方法: queryTask.Execute() (line 587)
主要职责:
初始化结果缓冲区
1
t.resultBuf = typeutil.NewConcurrentSet[*internalpb.RetrieveResults]()
负载均衡执行
- 调用
t.lb.Execute()(line 595) - 对每个 shard 调用
queryShard()方法 - 使用
shardclient.CollectionWorkLoad管理负载
- 调用
queryShard 方法 (line 704)
- 构建
querypb.QueryRequest - 设置目标节点 ID
- 处理 MVCC 时间戳覆盖
- 调用
qn.Query()发送到 QueryNode - 收集结果到
resultBuf
- 构建
2.3.3 PostExecute 阶段
方法: queryTask.PostExecute() (line 611)
主要职责:
收集所有结果
- 从
resultBuf收集所有RetrieveResults - 统计总查询数量和数据大小
- 从
结果合并(Reduce)
- 创建 reducer:
createMilvusReducer() - 调用
reducer.Reduce()合并多个 shard 的结果 - 应用 limit 和 offset
- 处理排序
- 创建 reducer:
结果后处理
- 验证 geometry 字段
- 重构 struct 字段数据
- 时区转换(timestamp 转 ISO 字符串)
- 提取时间字段(如果指定)
设置最终结果
- 设置输出字段名称
- 设置主键字段名称
- 处理 iterator session timestamp
3. QueryNode 层处理
3.1 Query 方法入口
文件: internal/querynodev2/services.go
方法: QueryNode.Query() (line 972)
主要步骤:
健康检查
- 检查节点生命周期状态
- 检查 collection 是否已加载
并发处理多个 Channel
- 为每个 DML channel 创建独立的 goroutine
- 调用
queryChannel()处理每个 channel - 使用
errgroup管理并发
结果合并
- 收集所有 channel 的结果
- 使用
segments.CreateInternalReducer合并结果 - 返回最终结果
3.2 queryChannel 方法
文件: internal/querynodev2/handlers.go
方法: QueryNode.queryChannel() (line 218)
主要步骤:
获取 Shard Delegator
1
sd, ok := node.delegators.Get(channel)
调用 Delegator 查询
1
results, err := sd.Query(queryCtx, req)
结果合并
- 使用
segments.ReduceRetrieveResults()合并结果 - 处理 collection 引用计数
- 使用
3.3 Delegator Query 方法
文件: internal/querynodev2/delegator/delegator.go
方法: shardDelegator.Query() (line 577)
主要步骤:
验证请求
- 检查 channel 是否匹配
- 检查 delegator 是否可用
处理时间戳
- 调用
speedupGuranteeTS()优化 guarantee timestamp - 等待 tSafe(根据
partialResultRequiredDataRatio配置) - 设置 MVCC timestamp
- 调用
获取可读 Segment
1
2sealed, growing, sealedRowCount, version, err :=
sd.distribution.PinReadableSegments(partialResultRequiredDataRatio, ...)- 获取 sealed segments 和 growing segments
- Pin 操作防止 segment 被卸载
Segment 剪枝(可选)
- 如果启用
EnableSegmentPrune - 根据统计信息剪枝不相关的 segments
- 如果启用
组织子任务
1
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, ...)
- 将 segments 组织成查询任务
- 分配任务到不同的 worker
执行子任务
1
2results, err := executeSubTasks(ctx, tasks, evaluator,
func(ctx, req, worker) { worker.QuerySegments(ctx, req) }, ...)- 并发执行所有子任务
- 使用
RowCountBasedEvaluator评估部分结果 - 处理失败情况(部分结果模式)
返回结果
- 返回所有成功的查询结果
- Unpin segments
4. 数据流图
1 | Client Request (HTTP/gRPC) |
5. 关键组件说明
5.1 Load Balancer
位置: internal/proxy/shardclient
- 负责选择 QueryNode 节点
- 支持多种负载均衡策略
- 管理 shard client 连接
5.2 Query Plan Parser
位置: internal/parser/planparserv2
- 解析查询表达式(如
id > 100) - 生成执行计划(PlanNode)
- 优化查询计划
5.3 Reducer
位置: internal/util/reduce
- 合并多个 shard 的查询结果
- 应用 limit 和 offset
- 处理排序和去重
5.4 Delegator
位置: internal/querynodev2/delegator
- 管理 shard 级别的查询逻辑
- 组织 segment 查询任务
- 处理部分结果和容错
5.5 Segment Manager
位置: internal/querynodev2/segments
- 管理 segment 的生命周期
- 提供 segment 查询接口
- 处理 segment 的加载和卸载
6. 关键配置参数
ProxyCfg.SlowLogSpanInSeconds: 慢查询阈值QueryNodeCfg.PartialResultRequiredDataRatio: 部分结果所需数据比例QueryNodeCfg.EnableSegmentPrune: 是否启用 segment 剪枝QueryNodeCfg.DefaultSegmentFilterRatio: 默认 segment 过滤比例
7. 性能优化点
- 并发处理: 多个 channel 和 segment 并发查询
- Segment 剪枝: 根据统计信息跳过不相关的 segments
- 部分结果: 支持返回部分结果以降低延迟
- 负载均衡: 智能选择 QueryNode 节点
- 结果缓存: 某些场景下可以缓存查询结果
8. 错误处理
- Collection 未加载: 返回
CollectionNotLoaded错误 - Channel 不匹配: Delegator 验证 channel 是否属于自己
- Segment 未加载: 标记 segment 为 offline,可能触发重试
- 部分失败: 根据
partialResultRequiredDataRatio决定是否返回部分结果
9. 监控指标
ProxyReceivedNQ: Proxy 接收的查询请求数ProxySQLatency: Proxy 查询延迟QueryNodeSQCount: QueryNode 查询计数QueryNodeSQLatencyWaitTSafe: 等待 tSafe 的延迟ProxyReduceResultLatency: 结果合并延迟