目录
- 概述
- 搜索任务入口
- Segment 搜索执行
- SearchRequest 和 Plan 参数
- Segment Search 内部执行
- 结果归约(Reduce)
- 高级搜索(Advanced Search)
- 完整流程总结
概述
本文档全面分析 Milvus 的搜索流程,从 QueryNode 接收搜索请求开始,到返回最终结果为止。涵盖搜索任务的执行、segment 搜索、计划节点执行、结果归约等各个环节。
搜索流程概览
1
| Proxy → QueryNode → SearchTask → SearchSegments → Segment.Search → Reduce → Result
|
搜索任务入口
1. SearchTask.Execute
SearchTask.Execute 是普通搜索任务的执行入口,负责在 QueryNode 上执行搜索并归约结果。
函数签名
1
| func (t *SearchTask) Execute() error
|
执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (t *SearchTask) Execute() error { err := t.combinePlaceHolderGroups() searchReq, err := segcore.NewSearchRequest(t.collection.GetCCollection(), req, t.placeholderGroup) if req.GetScope() == querypb.DataScope_Historical { results, searchedSegments, err = segments.SearchHistorical(...) } else if req.GetScope() == querypb.DataScope_Streaming { results, searchedSegments, err = segments.SearchStreaming(...) } reducedResult, err := segcore.ReduceSearchResultsAndFillData(...) err = segments.FillPrimaryKeys(...) err = segments.FillTargetEntry(...) t.result = EncodeSearchResults(...) return nil }
|
关键步骤
- 合并 Placeholder Groups:将多个查询向量合并成一个 PlaceholderGroup
- 创建 SearchRequest:封装搜索计划、PlaceholderGroup、时间戳等信息
- 搜索 Segments:根据 Scope 搜索历史或流式 segments
- 归约结果:将多个 segment 的结果合并成全局 topK
- 填充字段:填充主键和目标字段数据
- 编码返回:将结果编码为 protobuf 格式
2. StreamingSearchTask.Execute
StreamingSearchTask.Execute 是流式搜索任务的执行入口,支持流式归约,可以边搜索边归约,降低内存占用。
执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (t *StreamingSearchTask) Execute() error { t.combinePlaceHolderGroups() searchReq, err := segcore.NewSearchRequest(...) if req.GetScope() == querypb.DataScope_Historical { streamReduceFunc := func(result *segments.SearchResult) error { return t.streamReduce(t.ctx, searchReq.Plan(), result, ...) } pinnedSegments, err := segments.SearchHistoricalStreamly( t.ctx, t.segmentManager, searchReq, ..., streamReduceFunc) t.resultBlobs, err = segcore.GetStreamReduceResult(t.ctx, t.streamReducer) } return nil }
|
流式搜索的优势
- 内存效率:不需要等待所有 segment 搜索完成,边搜索边归约
- 延迟优化:可以更早返回部分结果
- 适合大数据量:当 segment 数量很多时,流式处理可以避免内存峰值
Segment 搜索执行
1. searchSegments 函数
searchSegments 函数在多个 segment 上并发执行搜索,并收集结果。
函数签名
1 2 3 4 5 6
| func searchSegments( ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, searchReq *SearchRequest) ([]*SearchResult, error)
|
执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, searchReq *SearchRequest) ([]*SearchResult, error) { searchLabel := metrics.SealedSegmentLabel if segType == commonpb.SegmentState_Growing { searchLabel = metrics.GrowingSegmentLabel } results := make([]*SearchResult, 0, len(segments)) var wg sync.WaitGroup var mu sync.Mutex for _, s := range segments { wg.Add(1) go func(seg Segment) { defer wg.Done() if err := seg.PinIfNotReleased(); err != nil { return } defer seg.Unpin() if seg.IsLazyLoad() { mgr.DiskCache.Do(seg, func() error { return seg.Load(ctx) }) } result, err := seg.Search(ctx, searchReq) if err != nil { return } mu.Lock() results = append(results, result) mu.Unlock() }(s) } wg.Wait() return results, nil }
|
关键特性
- 并发执行:使用 goroutine 并发搜索多个 segments
- Segment Pin:Pin segment 防止在搜索过程中被释放
- 懒加载支持:如果 segment 需要懒加载,通过 DiskCache 加载
- 错误处理:单个 segment 搜索失败不影响其他 segments
2. SearchHistorical 和 SearchStreaming
SearchHistorical
1 2 3 4 5 6 7 8 9 10 11 12 13
| func SearchHistorical(ctx context.Context, manager *Manager, searchReq *SearchRequest, collID int64, partIDs []int64, segIDs []int64) ([]*SearchResult, []Segment, error) { segments, err := validateOnHistorical(ctx, manager, collID, partIDs, segIDs) if err != nil { return nil, nil, err } searchResults, err := searchSegments(ctx, manager, segments, SegmentTypeSealed, searchReq) return searchResults, segments, err }
|
搜索范围:
- 如果
segIDs 未指定:搜索 partIDs 指定的所有历史 segments
- 如果
segIDs 指定了:只搜索指定的 segments
- 如果
partIDs 为空:搜索已加载 collection 的所有分区
SearchStreaming
1 2 3 4 5 6 7 8 9 10 11 12 13
| func SearchStreaming(ctx context.Context, manager *Manager, searchReq *SearchRequest, collID int64, partIDs []int64, segIDs []int64) ([]*SearchResult, []Segment, error) { segments, err := validateOnStream(ctx, manager, collID, partIDs, segIDs) if err != nil { return nil, nil, err } searchResults, err := searchSegments(ctx, manager, segments, SegmentTypeGrowing, searchReq) return searchResults, segments, err }
|
SearchRequest 和 Plan 参数
1. SearchRequest 结构
SearchRequest 封装了搜索请求的所有信息。
结构定义
1 2 3 4 5 6 7 8 9
| type SearchRequest struct { plan *SearchPlan cPlaceholderGroup C.CPlaceholderGroup msgID int64 searchFieldID int64 mvccTimestamp typeutil.Timestamp consistencyLevel commonpb.ConsistencyLevel collectionTTL typeutil.Timestamp }
|
创建过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, placeholderGrp []byte) (*SearchRequest, error) { expr := req.Req.SerializedExprPlan plan, err := createSearchPlanByExpr(collection, expr) var cPlaceholderGroup C.CPlaceholderGroup status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup) metricTypeInPlan := plan.GetMetricType() if len(metricType) != 0 && metricType != metricTypeInPlan { return nil, merr.WrapErrParameterInvalid(...) } var fieldID C.int64_t status = C.GetFieldID(plan.cSearchPlan, &fieldID) return &SearchRequest{ plan: plan, cPlaceholderGroup: cPlaceholderGroup, searchFieldID: int64(fieldID), mvccTimestamp: req.GetReq().GetMvccTimestamp(), consistencyLevel: req.GetReq().GetConsistencyLevel(), collectionTTL: req.GetReq().GetCollectionTtlTimestamps(), }, nil }
|
2. Plan 参数的作用
Plan 参数是 segment->Search 的核心参数,包含了搜索执行所需的所有信息。
Plan 结构(C++)
1 2 3 4 5 6 7 8
| struct Plan { SchemaPtr schema_; std::unique_ptr<VectorPlanNode> plan_node_; std::map<std::string, FieldId> tag2field_; std::vector<FieldId> target_entries_; std::vector<std::string> target_dynamic_fields_; std::optional<ExtractedPlanInfo> extra_info_opt_; };
|
Plan 的核心作用
1. 搜索前验证(check_search)
1 2 3 4 5 6 7 8 9 10
| void ChunkedSegmentSealedImpl::check_search(const query::Plan* plan) const { auto& request_fields = plan->extra_info_opt_.value().involved_fields_; auto field_ready_bitset = field_data_ready_bitset_ | index_ready_bitset_ | binlog_index_bitset_; auto absent_fields = request_fields - field_ready_bitset; if (absent_fields.any()) { } }
|
2. 执行计划节点树
plan->plan_node_ 是 VectorPlanNode,包含:
1 2 3 4 5
| struct VectorPlanNode : PlanNode { SearchInfo search_info_; std::string placeholder_tag_; std::shared_ptr<milvus::plan::PlanNode> plannodes_; };
|
计划节点树通常包括:
- MvccNode:MVCC 过滤(时间戳过滤和删除过滤)
- VectorSearchNode:向量搜索(包含 topK)
- FilterNode:表达式过滤
- GroupByNode:Group by 操作(如果启用)
- RescoresNode:重排序(如果启用)
3. 填充返回字段
1 2 3 4 5 6 7 8 9 10
| void SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, SearchResult& results) const { for (auto field_id : plan->target_entries_) { auto& field_meta = plan->schema_->operator[](field_id); field_data = bulk_subscript(&op_ctx, field_id, results.seg_offsets_.data(), size); results.output_fields_data_[field_id] = std::move(field_data); } }
|
Plan 的创建流程
- Proxy 层:解析 DSL 表达式,生成 PlanNode
- 设置 SearchInfo:从 search_params 中提取 topK、metric_type 等
- 设置 target_entries:从 output_fields 中提取需要返回的字段
- 序列化:序列化为 protobuf,发送到 QueryNode
- QueryNode:反序列化并创建 C++ Plan 对象
Segment Search 内部执行
1. segment->Search 执行流程
segment->Search 是 C++ 层的搜索执行入口,通过执行计划节点树完成搜索。
函数签名
1 2 3 4 5 6 7 8
| std::unique_ptr<SearchResult> SegmentInternalInterface::Search( const query::Plan* plan, const query::PlaceholderGroup* placeholder_group, Timestamp timestamp, const folly::CancellationToken& cancel_token, int32_t consistency_level, Timestamp collection_ttl) const
|
执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| std::unique_ptr<SearchResult> SegmentInternalInterface::Search(...) const { std::shared_lock lck(mutex_); check_search(plan); query::ExecPlanNodeVisitor visitor(*this, timestamp, placeholder_group, cancel_token, consistency_level, collection_ttl); auto results = std::make_unique<SearchResult>(); *results = visitor.get_moved_result(*plan->plan_node_); results->segment_ = (void*)this; return results; }
|
2. ExecPlanNodeVisitor 执行计划节点树
visit(VectorPlanNode& node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void ExecPlanNodeVisitor::visit(VectorPlanNode& node) { auto active_count = segment->get_active_count(timestamp_); if (active_count == 0) { search_result_opt_ = empty_search_result(...); return; } auto plan = plan::PlanFragment(node.plannodes_); auto query_context = std::make_shared<milvus::exec::QueryContext>(...); query_context->set_search_info(node.search_info_); query_context->set_placeholder_group(placeholder_group_); auto result = ExecuteTask(plan, query_context); search_result_opt_ = std::move(query_context->get_search_result()); }
|
3. 计划节点执行顺序
MvccNode - MVCC 过滤
1 2 3 4 5 6 7 8 9 10 11
| RowVectorPtr PhyMvccNode::GetOutput() { TargetBitmapView data(col_input->GetRawData(), col_input->size()); segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_timestamp_); segment_->mask_with_delete(data, active_count_, query_timestamp_); return std::make_shared<RowVector>(std::vector<VectorPtr>{col_input}); }
|
作用:
mask_with_timestamps:过滤掉时间戳大于查询时间戳的数据
mask_with_delete:过滤掉已删除的数据
VectorSearchNode - 向量搜索 + TopK
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| RowVectorPtr PhyVectorSearchNode::GetOutput() { segment_->vector_search(search_info_, src_data, src_offsets, num_queries, query_timestamp_, final_view, op_context, search_result); query_context_->set_search_result(std::move(search_result)); }
|
作用:
- 执行向量搜索,使用
search_info_.topk_ 作为 topK
- 返回的结果已包含 topK 的
seg_offsets_ 和 distances_
GroupByNode - Group By 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| RowVectorPtr PhyGroupByNode::GetOutput() { auto search_result = query_context_->get_search_result(); milvus::exec::SearchGroupBy(op_context, search_result.vector_iterators_.value(), search_info_, group_by_values, *segment_, search_result.seg_offsets_, search_result.distances_, search_result.topk_per_nq_prefix_sum_); search_result.group_by_values_ = std::move(group_by_values); search_result.group_size_ = search_info_.group_size_; query_context_->set_search_result(std::move(search_result)); }
|
作用:
- 对搜索结果进行 group by 处理
- 更新
search_result.group_by_values_ 和 group_size_
4. Segment Search 完成的操作总结
在 segment->Search 中,已经完成了以下操作:
✅ MVCC 过滤:通过 MvccNode 执行时间戳和删除过滤
✅ TopK 选择:通过 VectorSearchNode 执行向量搜索,结果已包含 topK
✅ Group By:通过 GroupByNode 执行(如果启用)
✅ 表达式过滤:通过 FilterNode 执行(如果有过滤表达式)
✅ 重排序:通过 RescoresNode 执行(如果启用)
关键点:每个 segment 返回的结果是局部最优的,只代表该 segment 内的 topK,不代表全局最优。
结果归约(Reduce)
1. 为什么需要 Reduce?
虽然每个 segment 已经完成了 MVCC、topK、group by 等操作,但仍然需要对多个 segment 的结果进行 Reduce,原因如下:
- 数据分布:数据分布在多个 segments 中
- 局部 vs 全局:每个 segment 返回局部 topK,用户需要全局 topK
- 去重需求:需要去除跨 segment 的重复 entity
- Group By:需要全局的 group by 处理
- 成本聚合:需要聚合各个 segment 的成本信息
2. ReduceSearchResults 函数
函数签名
1 2
| func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, info *reduce.ResultInfo) (*internalpb.SearchResults, error)
|
执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, info *reduce.ResultInfo) (*internalpb.SearchResults, error) { results = lo.Filter(results, func(result *internalpb.SearchResults, _ int) bool { return result != nil && result.GetSlicedBlob() != nil }) if len(results) == 1 { return results[0], nil } searchResultData, err := DecodeSearchResults(ctx, results) searchReduce := InitSearchReducer(info) reducedResultData, err := searchReduce.ReduceSearchResultData(ctx, searchResultData, info) searchResults, err := EncodeSearchResultData(ctx, reducedResultData, ...) searchResults.CostAggregation = mergeRequestCost(requestCosts) return searchResults, nil }
|
3. K-Way Merge 算法
Reduce 使用多路归并(K-Way Merge)算法,从多个已排序的结果中选择全局最优的 topK。
算法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| func (scr *SearchCommonReduce) ReduceSearchResultData( ctx context.Context, searchResultData []*schemapb.SearchResultData, info *reduce.ResultInfo) (*schemapb.SearchResultData, error) { offsets := make([]int64, len(searchResultData)) idSet := make(map[interface{}]struct{}) for i := int64(0); i < info.GetNq(); i++ { var j int64 for j = 0; j < info.GetTopK(); { sel := SelectSearchResultData(searchResultData, resultOffsets, offsets, i) if sel == -1 { break } idx := resultOffsets[sel][i] + offsets[sel] id := typeutil.GetPK(searchResultData[sel].GetIds(), idx) score := searchResultData[sel].Scores[idx] if _, ok := idSet[id]; !ok { retSize += typeutil.AppendFieldData(ret.FieldsData, searchResultData[sel].FieldsData, idx) typeutil.AppendPKs(ret.Ids, id) ret.Scores = append(ret.Scores, score) idSet[id] = struct{}{} j++ } else { skipDupCnt++ } offsets[sel]++ } ret.Topks = append(ret.Topks, j) } return ret, nil }
|
SelectSearchResultData 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func SelectSearchResultData( dataArray []*schemapb.SearchResultData, resultOffsets [][]int64, offsets []int64, qi int64) int { var sel = -1 var maxDistance = -float32(math.MaxFloat32) for i, offset := range offsets { if offset >= dataArray[i].Topks[qi] { continue } idx := resultOffsets[i][qi] + offset distance := dataArray[i].Scores[idx] if distance > maxDistance { sel = i maxDistance = distance } } return sel }
|
4. Reduce 的其他作用
去重(Deduplication)
同一个 entity 可能同时出现在多个 segments 中:
- Growing + Sealed:新写入的数据在 growing segment,但可能还未 flush 到 sealed segment
- Compaction:compaction 过程中,数据可能同时存在于新旧 segments
Group By 处理
如果有 group by 需求,Reduce 阶段会进行全局的 group by 处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (sbr *SearchGroupByReduce) ReduceSearchResultData(...) { groupByValueMap := make(map[interface{}]int64) for j = 0; j < groupBound; { sel := SelectSearchResultData(...) groupByVal := groupByValIterator[sel](idx) if groupCount >= groupSize { } else { groupByValueMap[groupByVal]++ } } }
|
成本聚合
Reduce 还会聚合各个 segment 的存储扫描成本:
1 2 3 4 5 6
| storageCost := lo.Reduce(results, func(acc segcore.StorageCost, result *internalpb.SearchResults, _ int) segcore.StorageCost { acc.ScannedRemoteBytes += result.GetScannedRemoteBytes() acc.ScannedTotalBytes += result.GetScannedTotalBytes() return acc }, segcore.StorageCost{})
|
高级搜索(Advanced Search)
1. ResultInfo.isAdvance 的作用
ResultInfo.isAdvance 用于标识是否是高级搜索(Hybrid Search),决定在 QueryNode 层的结果归约策略。
核心作用
1 2 3 4 5 6 7
| func ReduceSearchOnQueryNode(ctx context.Context, results []*internalpb.SearchResults, info *reduce.ResultInfo) (*internalpb.SearchResults, error) { if info.GetIsAdvance() { return ReduceAdvancedSearchResults(ctx, results) } return ReduceSearchResults(ctx, results, info) }
|
两种归约策略
1. 普通搜索(isAdvance = false)
使用 ReduceSearchResults:
- 立即归约所有结果
- 返回最终的 topK 结果
- 适用于单一向量搜索
2. 高级搜索(isAdvance = true)
使用 ReduceAdvancedSearchResults:
- 不立即归约,而是将子结果保存到
SubResults 中
- 延迟到 Proxy 层进行归约
- 适用于混合搜索(Hybrid Search),包含多个子搜索请求
- 需要在 Proxy 层进行特殊的归约逻辑(如 rerank)
ReduceAdvancedSearchResults 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.SearchResults) (*internalpb.SearchResults, error) { searchResults := &internalpb.SearchResults{ IsAdvanced: true, } for index, result := range results { subResult := &internalpb.SubSearchResults{ MetricType: result.GetMetricType(), NumQueries: result.GetNumQueries(), TopK: result.GetTopK(), SlicedBlob: result.GetSlicedBlob(), SlicedNumCount: result.GetSlicedNumCount(), SlicedOffset: result.GetSlicedOffset(), ReqIndex: int64(index), } searchResults.SubResults = append(searchResults.SubResults, subResult) } return searchResults, nil }
|
完整流程总结
1. 搜索流程时序图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| ┌─────────┐ │ Proxy │ └────┬────┘ │ 1. SearchRequest ▼ ┌─────────────┐ │ QueryNode │ └─────┬───────┘ │ 2. SearchTask.Execute ▼ ┌─────────────────┐ │ SearchTask │ │ - combinePlaceHolderGroups │ - NewSearchRequest └─────┬───────────┘ │ 3. SearchHistorical/SearchStreaming ▼ ┌─────────────────┐ │ searchSegments │ │ - 并发搜索多个 segments └─────┬───────────┘ │ 4. Segment.Search ▼ ┌─────────────────┐ │ Segment.Search │ │ - check_search │ - ExecPlanNodeVisitor └─────┬───────────┘ │ 5. ExecuteTask ▼ ┌─────────────────┐ │ Plan Nodes │ │ - MvccNode (MVCC 过滤) │ - VectorSearchNode (向量搜索 + TopK) │ - FilterNode (表达式过滤) │ - GroupByNode (Group By) └─────┬───────────┘ │ 6. SearchResult (局部 topK) ▼ ┌─────────────────┐ │ ReduceSearchResults │ │ - K-Way Merge │ - 去重 │ - Group By │ - 成本聚合 └─────┬───────────┘ │ 7. 全局 topK 结果 ▼ ┌─────────┐ │ Result │ └─────────┘
|
2. 关键数据流
SearchRequest 创建
1 2 3 4 5 6 7 8 9
| QueryRequest (protobuf) ↓ SerializedExprPlan (bytes) ↓ createSearchPlanByExpr ↓ SearchPlan (C++) ↓ SearchRequest (Go wrapper)
|
Segment Search 执行
1 2 3 4 5 6 7 8 9 10 11
| SearchRequest ↓ Plan (包含 plan_node_) ↓ ExecPlanNodeVisitor.visit(VectorPlanNode) ↓ PlanFragment (执行计划节点树) ↓ ExecuteTask ↓ SearchResult (局部 topK)
|
Result Reduce
1 2 3 4 5 6 7 8 9
| 多个 SearchResult (局部 topK) ↓ DecodeSearchResults ↓ ReduceSearchResultData (K-Way Merge) ↓ EncodeSearchResultData ↓ SearchResults (全局 topK)
|
3. 性能考虑
时间复杂度
- Segment Search:O(N × log K),其中 N 是 segment 内的数据量,K 是 topK
- Reduce:O(M × K),其中 M 是 segment 数量,K 是 topK
由于 M(segment 数量)通常远小于 N(数据量),Reduce 的开销相对较小。
优化策略
- 短路优化:如果只有一个 segment 的结果,直接返回
- 并发搜索:多个 segments 并发搜索
- 流式处理:使用 StreamingSearchTask 进行流式归约
- 内存限制:检查结果大小,避免 OOM
4. 关键要点总结
Segment Search 层面
✅ 每个 segment 的 Search 已经完成了:
- MVCC 过滤(时间戳和删除过滤)
- TopK 选择(向量搜索)
- Group By(如果启用)
- 表达式过滤(如果有)
✅ 但返回的是局部最优结果,只代表该 segment 内的 topK
Reduce 层面
✅ Reduce 通过 K-Way Merge 算法合并多个 segment 的结果:
- 选择全局最优的 topK
- 去除跨 segment 的重复 entity
- 处理全局的 group by
- 聚合成本信息
✅ 最终得到全局最优的 topK 结果
类比理解
- Segment Search:每个班级选出前 10 名
- Reduce:从所有班级的前 10 名中,选出全校前 10 名
即使每个班级已经完成了排名,仍然需要全校级别的 Reduce 操作来选出最终的 topK。
参考资料