1. 1. 目录
  2. 2. 概述
    1. 2.1. 搜索流程概览
  3. 3. 搜索任务入口
    1. 3.1. 1. SearchTask.Execute
      1. 3.1.1. 函数签名
      2. 3.1.2. 执行流程
      3. 3.1.3. 关键步骤
    2. 3.2. 2. StreamingSearchTask.Execute
      1. 3.2.1. 执行流程
      2. 3.2.2. 流式搜索的优势
  4. 4. Segment 搜索执行
    1. 4.1. 1. searchSegments 函数
      1. 4.1.1. 函数签名
      2. 4.1.2. 执行流程
      3. 4.1.3. 关键特性
    2. 4.2. 2. SearchHistorical 和 SearchStreaming
      1. 4.2.1. SearchHistorical
      2. 4.2.2. SearchStreaming
  5. 5. SearchRequest 和 Plan 参数
    1. 5.1. 1. SearchRequest 结构
      1. 5.1.1. 结构定义
      2. 5.1.2. 创建过程
    2. 5.2. 2. Plan 参数的作用
      1. 5.2.1. Plan 结构(C++)
      2. 5.2.2. Plan 的核心作用
        1. 5.2.2.1. 1. 搜索前验证(check_search)
        2. 5.2.2.2. 2. 执行计划节点树
        3. 5.2.2.3. 3. 填充返回字段
      3. 5.2.3. Plan 的创建流程
  6. 6. Segment Search 内部执行
    1. 6.1. 1. segment->Search 执行流程
      1. 6.1.1. 函数签名
      2. 6.1.2. 执行流程
    2. 6.2. 2. ExecPlanNodeVisitor 执行计划节点树
      1. 6.2.1. visit(VectorPlanNode& node)
    3. 6.3. 3. 计划节点执行顺序
      1. 6.3.1. MvccNode - MVCC 过滤
      2. 6.3.2. VectorSearchNode - 向量搜索 + TopK
      3. 6.3.3. GroupByNode - Group By 操作
    4. 6.4. 4. Segment Search 完成的操作总结
  7. 7. 结果归约(Reduce)
    1. 7.1. 1. 为什么需要 Reduce?
    2. 7.2. 2. ReduceSearchResults 函数
      1. 7.2.1. 函数签名
      2. 7.2.2. 执行流程
    3. 7.3. 3. K-Way Merge 算法
      1. 7.3.1. 算法原理
      2. 7.3.2. SelectSearchResultData 函数
    4. 7.4. 4. Reduce 的其他作用
      1. 7.4.1. 去重(Deduplication)
      2. 7.4.2. Group By 处理
      3. 7.4.3. 成本聚合
  8. 8. 高级搜索(Advanced Search)
    1. 8.1. 1. ResultInfo.isAdvance 的作用
      1. 8.1.1. 核心作用
      2. 8.1.2. 两种归约策略
        1. 8.1.2.1. 1. 普通搜索(isAdvance = false)
        2. 8.1.2.2. 2. 高级搜索(isAdvance = true)
      3. 8.1.3. ReduceAdvancedSearchResults 实现
  9. 9. 完整流程总结
    1. 9.1. 1. 搜索流程时序图
    2. 9.2. 2. 关键数据流
      1. 9.2.1. SearchRequest 创建
      2. 9.2.2. Segment Search 执行
      3. 9.2.3. Result Reduce
    3. 9.3. 3. 性能考虑
      1. 9.3.1. 时间复杂度
      2. 9.3.2. 优化策略
    4. 9.4. 4. 关键要点总结
      1. 9.4.1. Segment Search 层面
      2. 9.4.2. Reduce 层面
      3. 9.4.3. 类比理解
  10. 10. 参考资料

Milvus 搜索流程全面分析

目录

  1. 概述
  2. 搜索任务入口
  3. Segment 搜索执行
  4. SearchRequest 和 Plan 参数
  5. Segment Search 内部执行
  6. 结果归约(Reduce)
  7. 高级搜索(Advanced Search)
  8. 完整流程总结

概述

本文档全面分析 Milvus 的搜索流程,从 QueryNode 接收搜索请求开始,到返回最终结果为止。涵盖搜索任务的执行、segment 搜索、计划节点执行、结果归约等各个环节。

搜索流程概览

1
Proxy → QueryNode → SearchTask → SearchSegments → Segment.Search → Reduce → Result

搜索任务入口

1. SearchTask.Execute

SearchTask.Execute 是普通搜索任务的执行入口,负责在 QueryNode 上执行搜索并归约结果。

函数签名

1
func (t *SearchTask) Execute() error

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (t *SearchTask) Execute() error {
// 1. 准备搜索请求
err := t.combinePlaceHolderGroups()
searchReq, err := segcore.NewSearchRequest(t.collection.GetCCollection(), req, t.placeholderGroup)

// 2. 根据 Scope 选择搜索历史数据或流式数据
if req.GetScope() == querypb.DataScope_Historical {
results, searchedSegments, err = segments.SearchHistorical(...)
} else if req.GetScope() == querypb.DataScope_Streaming {
results, searchedSegments, err = segments.SearchStreaming(...)
}

// 3. 归约多个 segment 的结果
reducedResult, err := segcore.ReduceSearchResultsAndFillData(...)

// 4. 填充主键和目标字段
err = segments.FillPrimaryKeys(...)
err = segments.FillTargetEntry(...)

// 5. 编码并返回结果
t.result = EncodeSearchResults(...)
return nil
}

关键步骤

  1. 合并 Placeholder Groups:将多个查询向量合并成一个 PlaceholderGroup
  2. 创建 SearchRequest:封装搜索计划、PlaceholderGroup、时间戳等信息
  3. 搜索 Segments:根据 Scope 搜索历史或流式 segments
  4. 归约结果:将多个 segment 的结果合并成全局 topK
  5. 填充字段:填充主键和目标字段数据
  6. 编码返回:将结果编码为 protobuf 格式

2. StreamingSearchTask.Execute

StreamingSearchTask.Execute 是流式搜索任务的执行入口,支持流式归约,可以边搜索边归约,降低内存占用。

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *StreamingSearchTask) Execute() error {
// 1. 准备搜索请求
t.combinePlaceHolderGroups()
searchReq, err := segcore.NewSearchRequest(...)

// 2. 流式搜索和归约
if req.GetScope() == querypb.DataScope_Historical {
streamReduceFunc := func(result *segments.SearchResult) error {
return t.streamReduce(t.ctx, searchReq.Plan(), result, ...)
}
pinnedSegments, err := segments.SearchHistoricalStreamly(
t.ctx, t.segmentManager, searchReq, ..., streamReduceFunc)

// 3. 获取流式归约结果
t.resultBlobs, err = segcore.GetStreamReduceResult(t.ctx, t.streamReducer)
}

return nil
}

流式搜索的优势

  • 内存效率:不需要等待所有 segment 搜索完成,边搜索边归约
  • 延迟优化:可以更早返回部分结果
  • 适合大数据量:当 segment 数量很多时,流式处理可以避免内存峰值

Segment 搜索执行

1. searchSegments 函数

searchSegments 函数在多个 segment 上并发执行搜索,并收集结果。

函数签名

1
2
3
4
5
6
func searchSegments(
ctx context.Context,
mgr *Manager,
segments []Segment,
segType SegmentType,
searchReq *SearchRequest) ([]*SearchResult, error)

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, 
segType SegmentType, searchReq *SearchRequest) ([]*SearchResult, error) {

// 1. 设置指标标签
searchLabel := metrics.SealedSegmentLabel
if segType == commonpb.SegmentState_Growing {
searchLabel = metrics.GrowingSegmentLabel
}

// 2. 并发搜索所有 segments
results := make([]*SearchResult, 0, len(segments))
var wg sync.WaitGroup
var mu sync.Mutex

for _, s := range segments {
wg.Add(1)
go func(seg Segment) {
defer wg.Done()

// Pin segment 防止被释放
if err := seg.PinIfNotReleased(); err != nil {
return
}
defer seg.Unpin()

// 检查是否需要懒加载
if seg.IsLazyLoad() {
mgr.DiskCache.Do(seg, func() error {
return seg.Load(ctx)
})
}

// 执行搜索
result, err := seg.Search(ctx, searchReq)
if err != nil {
return
}

mu.Lock()
results = append(results, result)
mu.Unlock()
}(s)
}

wg.Wait()
return results, nil
}

关键特性

  1. 并发执行:使用 goroutine 并发搜索多个 segments
  2. Segment Pin:Pin segment 防止在搜索过程中被释放
  3. 懒加载支持:如果 segment 需要懒加载,通过 DiskCache 加载
  4. 错误处理:单个 segment 搜索失败不影响其他 segments

2. SearchHistorical 和 SearchStreaming

SearchHistorical

1
2
3
4
5
6
7
8
9
10
11
12
13
func SearchHistorical(ctx context.Context, manager *Manager, searchReq *SearchRequest, 
collID int64, partIDs []int64, segIDs []int64) ([]*SearchResult, []Segment, error) {

// 验证并获取历史 segments
segments, err := validateOnHistorical(ctx, manager, collID, partIDs, segIDs)
if err != nil {
return nil, nil, err
}

// 搜索 sealed segments
searchResults, err := searchSegments(ctx, manager, segments, SegmentTypeSealed, searchReq)
return searchResults, segments, err
}

搜索范围

  • 如果 segIDs 未指定:搜索 partIDs 指定的所有历史 segments
  • 如果 segIDs 指定了:只搜索指定的 segments
  • 如果 partIDs 为空:搜索已加载 collection 的所有分区

SearchStreaming

1
2
3
4
5
6
7
8
9
10
11
12
13
func SearchStreaming(ctx context.Context, manager *Manager, searchReq *SearchRequest, 
collID int64, partIDs []int64, segIDs []int64) ([]*SearchResult, []Segment, error) {

// 验证并获取流式 segments
segments, err := validateOnStream(ctx, manager, collID, partIDs, segIDs)
if err != nil {
return nil, nil, err
}

// 搜索 growing segments
searchResults, err := searchSegments(ctx, manager, segments, SegmentTypeGrowing, searchReq)
return searchResults, segments, err
}

SearchRequest 和 Plan 参数

1. SearchRequest 结构

SearchRequest 封装了搜索请求的所有信息。

结构定义

1
2
3
4
5
6
7
8
9
type SearchRequest struct {
plan *SearchPlan // 搜索计划
cPlaceholderGroup C.CPlaceholderGroup // C++ 层的 PlaceholderGroup
msgID int64 // 消息 ID
searchFieldID int64 // 搜索字段 ID
mvccTimestamp typeutil.Timestamp // MVCC 时间戳
consistencyLevel commonpb.ConsistencyLevel // 一致性级别
collectionTTL typeutil.Timestamp // Collection TTL
}

创建过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func NewSearchRequest(collection *CCollection, req *querypb.SearchRequest, 
placeholderGrp []byte) (*SearchRequest, error) {

// 1. 从序列化的 Plan 创建 SearchPlan
expr := req.Req.SerializedExprPlan
plan, err := createSearchPlanByExpr(collection, expr)

// 2. 解析 PlaceholderGroup
var cPlaceholderGroup C.CPlaceholderGroup
status := C.ParsePlaceholderGroup(plan.cSearchPlan, blobPtr, blobSize, &cPlaceholderGroup)

// 3. 验证 MetricType
metricTypeInPlan := plan.GetMetricType()
if len(metricType) != 0 && metricType != metricTypeInPlan {
return nil, merr.WrapErrParameterInvalid(...)
}

// 4. 获取搜索字段 ID
var fieldID C.int64_t
status = C.GetFieldID(plan.cSearchPlan, &fieldID)

return &SearchRequest{
plan: plan,
cPlaceholderGroup: cPlaceholderGroup,
searchFieldID: int64(fieldID),
mvccTimestamp: req.GetReq().GetMvccTimestamp(),
consistencyLevel: req.GetReq().GetConsistencyLevel(),
collectionTTL: req.GetReq().GetCollectionTtlTimestamps(),
}, nil
}

2. Plan 参数的作用

Plan 参数是 segment->Search 的核心参数,包含了搜索执行所需的所有信息。

Plan 结构(C++)

1
2
3
4
5
6
7
8
struct Plan {
SchemaPtr schema_; // Schema 信息
std::unique_ptr<VectorPlanNode> plan_node_; // 计划节点树
std::map<std::string, FieldId> tag2field_; // PlaceholderName -> FieldId
std::vector<FieldId> target_entries_; // 需要返回的字段 ID
std::vector<std::string> target_dynamic_fields_; // 动态字段列表
std::optional<ExtractedPlanInfo> extra_info_opt_; // 额外信息(涉及的字段)
};

Plan 的核心作用

1. 搜索前验证(check_search)
1
2
3
4
5
6
7
8
9
10
void ChunkedSegmentSealedImpl::check_search(const query::Plan* plan) const {
// 检查涉及的字段是否已加载
auto& request_fields = plan->extra_info_opt_.value().involved_fields_;
auto field_ready_bitset = field_data_ready_bitset_ | index_ready_bitset_ | binlog_index_bitset_;

auto absent_fields = request_fields - field_ready_bitset;
if (absent_fields.any()) {
// 抛出 FieldNotLoaded 错误
}
}
2. 执行计划节点树

plan->plan_node_VectorPlanNode,包含:

1
2
3
4
5
struct VectorPlanNode : PlanNode {
SearchInfo search_info_; // 搜索信息(topk, metric_type, field_id 等)
std::string placeholder_tag_; // Placeholder 标签
std::shared_ptr<milvus::plan::PlanNode> plannodes_; // 执行计划节点树
};

计划节点树通常包括:

  • MvccNode:MVCC 过滤(时间戳过滤和删除过滤)
  • VectorSearchNode:向量搜索(包含 topK)
  • FilterNode:表达式过滤
  • GroupByNode:Group by 操作(如果启用)
  • RescoresNode:重排序(如果启用)
3. 填充返回字段
1
2
3
4
5
6
7
8
9
10
void SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, 
SearchResult& results) const {

// 遍历 plan->target_entries_,填充每个字段的数据
for (auto field_id : plan->target_entries_) {
auto& field_meta = plan->schema_->operator[](field_id);
field_data = bulk_subscript(&op_ctx, field_id, results.seg_offsets_.data(), size);
results.output_fields_data_[field_id] = std::move(field_data);
}
}

Plan 的创建流程

  1. Proxy 层:解析 DSL 表达式,生成 PlanNode
  2. 设置 SearchInfo:从 search_params 中提取 topK、metric_type 等
  3. 设置 target_entries:从 output_fields 中提取需要返回的字段
  4. 序列化:序列化为 protobuf,发送到 QueryNode
  5. QueryNode:反序列化并创建 C++ Plan 对象

Segment Search 内部执行

1. segment->Search 执行流程

segment->Search 是 C++ 层的搜索执行入口,通过执行计划节点树完成搜索。

函数签名

1
2
3
4
5
6
7
8
std::unique_ptr<SearchResult>
SegmentInternalInterface::Search(
const query::Plan* plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
const folly::CancellationToken& cancel_token,
int32_t consistency_level,
Timestamp collection_ttl) const

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::unique_ptr<SearchResult>
SegmentInternalInterface::Search(...) const {
std::shared_lock lck(mutex_);

// 1. 验证 Plan
check_search(plan);

// 2. 创建执行访问者
query::ExecPlanNodeVisitor visitor(*this, timestamp, placeholder_group,
cancel_token, consistency_level, collection_ttl);

// 3. 执行计划节点树
auto results = std::make_unique<SearchResult>();
*results = visitor.get_moved_result(*plan->plan_node_);
results->segment_ = (void*)this;

return results;
}

2. ExecPlanNodeVisitor 执行计划节点树

visit(VectorPlanNode& node)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ExecPlanNodeVisitor::visit(VectorPlanNode& node) {
// 1. 获取活跃数据量
auto active_count = segment->get_active_count(timestamp_);
if (active_count == 0) {
search_result_opt_ = empty_search_result(...);
return;
}

// 2. 构建计划片段
auto plan = plan::PlanFragment(node.plannodes_);

// 3. 创建查询上下文
auto query_context = std::make_shared<milvus::exec::QueryContext>(...);
query_context->set_search_info(node.search_info_);
query_context->set_placeholder_group(placeholder_group_);

// 4. 执行任务
auto result = ExecuteTask(plan, query_context);

// 5. 获取搜索结果
search_result_opt_ = std::move(query_context->get_search_result());
}

3. 计划节点执行顺序

MvccNode - MVCC 过滤

1
2
3
4
5
6
7
8
9
10
11
RowVectorPtr PhyMvccNode::GetOutput() {
TargetBitmapView data(col_input->GetRawData(), col_input->size());

// 时间戳过滤(MVCC)
segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_timestamp_);

// 删除标记过滤
segment_->mask_with_delete(data, active_count_, query_timestamp_);

return std::make_shared<RowVector>(std::vector<VectorPtr>{col_input});
}

作用

  • mask_with_timestamps:过滤掉时间戳大于查询时间戳的数据
  • mask_with_delete:过滤掉已删除的数据

VectorSearchNode - 向量搜索 + TopK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
RowVectorPtr PhyVectorSearchNode::GetOutput() {
// 执行向量搜索
segment_->vector_search(search_info_,
src_data,
src_offsets,
num_queries,
query_timestamp_,
final_view,
op_context,
search_result);

// 设置搜索结果(已包含 topK)
query_context_->set_search_result(std::move(search_result));
}

作用

  • 执行向量搜索,使用 search_info_.topk_ 作为 topK
  • 返回的结果已包含 topK 的 seg_offsets_distances_

GroupByNode - Group By 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
RowVectorPtr PhyGroupByNode::GetOutput() {
auto search_result = query_context_->get_search_result();

// 执行 Group By
milvus::exec::SearchGroupBy(op_context,
search_result.vector_iterators_.value(),
search_info_,
group_by_values,
*segment_,
search_result.seg_offsets_,
search_result.distances_,
search_result.topk_per_nq_prefix_sum_);

search_result.group_by_values_ = std::move(group_by_values);
search_result.group_size_ = search_info_.group_size_;

query_context_->set_search_result(std::move(search_result));
}

作用

  • 对搜索结果进行 group by 处理
  • 更新 search_result.group_by_values_group_size_

4. Segment Search 完成的操作总结

segment->Search 中,已经完成了以下操作:

MVCC 过滤:通过 MvccNode 执行时间戳和删除过滤
TopK 选择:通过 VectorSearchNode 执行向量搜索,结果已包含 topK
Group By:通过 GroupByNode 执行(如果启用)
表达式过滤:通过 FilterNode 执行(如果有过滤表达式)
重排序:通过 RescoresNode 执行(如果启用)

关键点:每个 segment 返回的结果是局部最优的,只代表该 segment 内的 topK,不代表全局最优。


结果归约(Reduce)

1. 为什么需要 Reduce?

虽然每个 segment 已经完成了 MVCC、topK、group by 等操作,但仍然需要对多个 segment 的结果进行 Reduce,原因如下:

  1. 数据分布:数据分布在多个 segments 中
  2. 局部 vs 全局:每个 segment 返回局部 topK,用户需要全局 topK
  3. 去重需求:需要去除跨 segment 的重复 entity
  4. Group By:需要全局的 group by 处理
  5. 成本聚合:需要聚合各个 segment 的成本信息

2. ReduceSearchResults 函数

函数签名

1
2
func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, 
info *reduce.ResultInfo) (*internalpb.SearchResults, error)

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResults, 
info *reduce.ResultInfo) (*internalpb.SearchResults, error) {

// 1. 过滤空结果
results = lo.Filter(results, func(result *internalpb.SearchResults, _ int) bool {
return result != nil && result.GetSlicedBlob() != nil
})

// 2. 短路优化:如果只有一个结果,直接返回
if len(results) == 1 {
return results[0], nil
}

// 3. 解码搜索结果
searchResultData, err := DecodeSearchResults(ctx, results)

// 4. 初始化归约器
searchReduce := InitSearchReducer(info)

// 5. 归约搜索结果
reducedResultData, err := searchReduce.ReduceSearchResultData(ctx, searchResultData, info)

// 6. 编码结果
searchResults, err := EncodeSearchResultData(ctx, reducedResultData, ...)

// 7. 聚合成本信息
searchResults.CostAggregation = mergeRequestCost(requestCosts)

return searchResults, nil
}

3. K-Way Merge 算法

Reduce 使用多路归并(K-Way Merge)算法,从多个已排序的结果中选择全局最优的 topK。

算法原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (scr *SearchCommonReduce) ReduceSearchResultData(
ctx context.Context,
searchResultData []*schemapb.SearchResultData,
info *reduce.ResultInfo) (*schemapb.SearchResultData, error) {

// 为每个 segment 的结果维护偏移量
offsets := make([]int64, len(searchResultData))
idSet := make(map[interface{}]struct{}) // 用于去重

for i := int64(0); i < info.GetNq(); i++ {
var j int64
for j = 0; j < info.GetTopK(); {
// 从所有 segment 中选择当前最高分
sel := SelectSearchResultData(searchResultData, resultOffsets, offsets, i)
if sel == -1 {
break
}

idx := resultOffsets[sel][i] + offsets[sel]
id := typeutil.GetPK(searchResultData[sel].GetIds(), idx)
score := searchResultData[sel].Scores[idx]

// 去重:跳过已存在的 entity
if _, ok := idSet[id]; !ok {
// 添加到最终结果
retSize += typeutil.AppendFieldData(ret.FieldsData,
searchResultData[sel].FieldsData, idx)
typeutil.AppendPKs(ret.Ids, id)
ret.Scores = append(ret.Scores, score)
idSet[id] = struct{}{}
j++
} else {
skipDupCnt++ // 跳过重复的 entity
}
offsets[sel]++ // 移动指针
}
ret.Topks = append(ret.Topks, j)
}
return ret, nil
}

SelectSearchResultData 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func SelectSearchResultData(
dataArray []*schemapb.SearchResultData,
resultOffsets [][]int64,
offsets []int64,
qi int64) int {

var sel = -1
var maxDistance = -float32(math.MaxFloat32)

// 遍历所有 segment,找到当前最高分
for i, offset := range offsets {
if offset >= dataArray[i].Topks[qi] {
continue // 该 segment 的结果已用完
}

idx := resultOffsets[i][qi] + offset
distance := dataArray[i].Scores[idx]

if distance > maxDistance {
sel = i
maxDistance = distance
}
}
return sel
}

4. Reduce 的其他作用

去重(Deduplication)

同一个 entity 可能同时出现在多个 segments 中:

  • Growing + Sealed:新写入的数据在 growing segment,但可能还未 flush 到 sealed segment
  • Compaction:compaction 过程中,数据可能同时存在于新旧 segments

Group By 处理

如果有 group by 需求,Reduce 阶段会进行全局的 group by 处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (sbr *SearchGroupByReduce) ReduceSearchResultData(...) {
groupByValueMap := make(map[interface{}]int64)

for j = 0; j < groupBound; {
sel := SelectSearchResultData(...)
groupByVal := groupByValIterator[sel](idx)

// 检查 group 限制
if groupCount >= groupSize {
// 跳过:该 group 已满
} else {
// 添加到该 group
groupByValueMap[groupByVal]++
}
}
}

成本聚合

Reduce 还会聚合各个 segment 的存储扫描成本:

1
2
3
4
5
6
storageCost := lo.Reduce(results, func(acc segcore.StorageCost, 
result *internalpb.SearchResults, _ int) segcore.StorageCost {
acc.ScannedRemoteBytes += result.GetScannedRemoteBytes()
acc.ScannedTotalBytes += result.GetScannedTotalBytes()
return acc
}, segcore.StorageCost{})

高级搜索(Advanced Search)

1. ResultInfo.isAdvance 的作用

ResultInfo.isAdvance 用于标识是否是高级搜索(Hybrid Search),决定在 QueryNode 层的结果归约策略。

核心作用

1
2
3
4
5
6
7
func ReduceSearchOnQueryNode(ctx context.Context, results []*internalpb.SearchResults, 
info *reduce.ResultInfo) (*internalpb.SearchResults, error) {
if info.GetIsAdvance() {
return ReduceAdvancedSearchResults(ctx, results)
}
return ReduceSearchResults(ctx, results, info)
}

两种归约策略

1. 普通搜索(isAdvance = false)

使用 ReduceSearchResults

  • 立即归约所有结果
  • 返回最终的 topK 结果
  • 适用于单一向量搜索
2. 高级搜索(isAdvance = true)

使用 ReduceAdvancedSearchResults

  • 不立即归约,而是将子结果保存到 SubResults
  • 延迟到 Proxy 层进行归约
  • 适用于混合搜索(Hybrid Search),包含多个子搜索请求
  • 需要在 Proxy 层进行特殊的归约逻辑(如 rerank)

ReduceAdvancedSearchResults 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func ReduceAdvancedSearchResults(ctx context.Context, 
results []*internalpb.SearchResults) (*internalpb.SearchResults, error) {

searchResults := &internalpb.SearchResults{
IsAdvanced: true,
}

// 不归约,直接追加子结果
for index, result := range results {
subResult := &internalpb.SubSearchResults{
MetricType: result.GetMetricType(),
NumQueries: result.GetNumQueries(),
TopK: result.GetTopK(),
SlicedBlob: result.GetSlicedBlob(),
SlicedNumCount: result.GetSlicedNumCount(),
SlicedOffset: result.GetSlicedOffset(),
ReqIndex: int64(index),
}
searchResults.SubResults = append(searchResults.SubResults, subResult)
}

return searchResults, nil
}

完整流程总结

1. 搜索流程时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
┌─────────┐
│ Proxy │
└────┬────┘
│ 1. SearchRequest

┌─────────────┐
│ QueryNode │
└─────┬───────┘
│ 2. SearchTask.Execute

┌─────────────────┐
│ SearchTask │
│ - combinePlaceHolderGroups
│ - NewSearchRequest
└─────┬───────────┘
│ 3. SearchHistorical/SearchStreaming

┌─────────────────┐
│ searchSegments │
│ - 并发搜索多个 segments
└─────┬───────────┘
│ 4. Segment.Search

┌─────────────────┐
│ Segment.Search │
│ - check_search
│ - ExecPlanNodeVisitor
└─────┬───────────┘
│ 5. ExecuteTask

┌─────────────────┐
│ Plan Nodes │
│ - MvccNode (MVCC 过滤)
│ - VectorSearchNode (向量搜索 + TopK)
│ - FilterNode (表达式过滤)
│ - GroupByNode (Group By)
└─────┬───────────┘
│ 6. SearchResult (局部 topK)

┌─────────────────┐
│ ReduceSearchResults │
│ - K-Way Merge
│ - 去重
│ - Group By
│ - 成本聚合
└─────┬───────────┘
│ 7. 全局 topK 结果

┌─────────┐
│ Result │
└─────────┘

2. 关键数据流

SearchRequest 创建

1
2
3
4
5
6
7
8
9
QueryRequest (protobuf)

SerializedExprPlan (bytes)

createSearchPlanByExpr

SearchPlan (C++)

SearchRequest (Go wrapper)

Segment Search 执行

1
2
3
4
5
6
7
8
9
10
11
SearchRequest

Plan (包含 plan_node_)

ExecPlanNodeVisitor.visit(VectorPlanNode)

PlanFragment (执行计划节点树)

ExecuteTask

SearchResult (局部 topK)

Result Reduce

1
2
3
4
5
6
7
8
9
多个 SearchResult (局部 topK)

DecodeSearchResults

ReduceSearchResultData (K-Way Merge)

EncodeSearchResultData

SearchResults (全局 topK)

3. 性能考虑

时间复杂度

  • Segment Search:O(N × log K),其中 N 是 segment 内的数据量,K 是 topK
  • Reduce:O(M × K),其中 M 是 segment 数量,K 是 topK

由于 M(segment 数量)通常远小于 N(数据量),Reduce 的开销相对较小。

优化策略

  1. 短路优化:如果只有一个 segment 的结果,直接返回
  2. 并发搜索:多个 segments 并发搜索
  3. 流式处理:使用 StreamingSearchTask 进行流式归约
  4. 内存限制:检查结果大小,避免 OOM

4. 关键要点总结

Segment Search 层面

✅ 每个 segment 的 Search 已经完成了:

  • MVCC 过滤(时间戳和删除过滤)
  • TopK 选择(向量搜索)
  • Group By(如果启用)
  • 表达式过滤(如果有)

✅ 但返回的是局部最优结果,只代表该 segment 内的 topK

Reduce 层面

✅ Reduce 通过 K-Way Merge 算法合并多个 segment 的结果:

  • 选择全局最优的 topK
  • 去除跨 segment 的重复 entity
  • 处理全局的 group by
  • 聚合成本信息

✅ 最终得到全局最优的 topK 结果

类比理解

  • Segment Search:每个班级选出前 10 名
  • Reduce:从所有班级的前 10 名中,选出全校前 10 名

即使每个班级已经完成了排名,仍然需要全校级别的 Reduce 操作来选出最终的 topK。


参考资料