Milvus 为什么需要对 Search Result 进行 Reduce
概述
在 Milvus 的搜索流程中,虽然每个 segment 在 segment->Search 中已经完成了 MVCC 过滤、topK 选择、group by 等操作,但仍然需要对多个 segment 的搜索结果进行 Reduce(归约)操作。本文档详细解释为什么需要 Reduce 以及 Reduce 的具体作用。
核心原因:数据分布与全局 TopK
核心问题:一个查询请求通常涉及多个 segments,每个 segment 只返回自己数据范围内的 topK 结果,但用户需要的是全局最优的 topK。
1. 多个 Segments 的场景
在 Milvus 中,一个 collection 的数据可能分布在:
- 多个 Sealed Segments:历史数据被分割成多个 sealed segments
- 多个 Growing Segments:实时写入的数据存储在 growing segments 中
- 多个 Shards:在分布式场景中,数据分布在多个 shards 上
- 多个 QueryNodes:在分布式部署中,不同的 segments 可能位于不同的 QueryNode 上
示例场景
假设一个 collection 有 1000 万条数据,被分割成:
- Segment 1: 300 万条(sealed)
- Segment 2: 300 万条(sealed)
- Segment 3: 300 万条(sealed)
- Segment 4: 100 万条(growing)
当用户请求 topK=10 时,每个 segment 都会返回自己的 topK=10,但我们需要的是从这 4000 万条数据中选出全局最优的 topK=10。
2. 每个 Segment 返回的是局部 TopK
Segment Search 的执行流程
每个 segment 在执行 segment->Search 时:
1 | std::unique_ptr<SearchResult> |
Segment 返回的结果结构
每个 segment 的 SearchResult 包含:
seg_offsets_:该 segment 内的 topK 偏移量(已排序)distances_:该 segment 内的 topK 距离值(已排序)group_by_values_:group by 的值(如果启用)topk_per_nq_prefix_sum_:每个查询的 topK 前缀和
关键点:这些结果是局部最优的,只代表该 segment 内的 topK,不代表全局最优。
3. Reduce 的作用:K-Way Merge 算法
Reduce 操作使用多路归并(K-Way Merge)算法,从多个已排序的结果中选择全局最优的 topK。
算法原理
Reduce 算法的核心思想类似于归并排序的归并阶段:
- 多路指针:为每个 segment 的结果维护一个指针(offset)
- 比较选择:每次比较所有 segment 当前指针位置的值
- 选择最优:选择当前最大的 score,移动对应 segment 的指针
- 去重处理:如果遇到重复的 entity ID,跳过
- 重复直到 topK:重复上述过程直到选出全局 topK 个结果
代码实现
1 | func (scr *SearchCommonReduce) ReduceSearchResultData( |
SelectSearchResultData 函数
1 | func SelectSearchResultData( |
4. 示例说明
假设有 3 个 segments,每个返回 topK=10,用户请求全局 topK=10:
Segment 结果(已排序)
1 | Segment 1: [0.95, 0.90, 0.85, 0.80, 0.75, ...] (10个结果) |
Reduce 过程
| 步骤 | Segment 1 指针 | Segment 2 指针 | Segment 3 指针 | 选择 | 全局结果 |
|---|---|---|---|---|---|
| 1 | 0 (0.95) | 0 (0.92) | 0 (0.89) | Seg1 | [0.95] |
| 2 | 1 (0.90) | 0 (0.92) | 0 (0.89) | Seg2 | [0.95, 0.92] |
| 3 | 1 (0.90) | 1 (0.88) | 0 (0.89) | Seg1 | [0.95, 0.92, 0.90] |
| 4 | 2 (0.85) | 1 (0.88) | 0 (0.89) | Seg3 | [0.95, 0.92, 0.90, 0.89] |
| 5 | 2 (0.85) | 1 (0.88) | 1 (0.87) | Seg2 | [0.95, 0.92, 0.90, 0.89, 0.88] |
| … | … | … | … | … | … |
| 10 | … | … | … | … | [全局 topK=10] |
结果:最终得到全局最优的 topK=10 个结果。
5. Reduce 的其他重要作用
5.1 去重(Deduplication)
同一个 entity 可能同时出现在多个 segments 中:
- Growing + Sealed:新写入的数据在 growing segment,但可能还未 flush 到 sealed segment
- Compaction:compaction 过程中,数据可能同时存在于新旧 segments
- 多副本:在分布式场景中,可能存在数据副本
Reduce 通过 idSet 确保每个 entity 只出现一次:
1 | idSet := make(map[interface{}]struct{}) |
5.2 Group By 处理
如果有 group by 需求,Reduce 阶段会进行全局的 group by 处理:
1 | func (sbr *SearchGroupByReduce) ReduceSearchResultData(...) { |
5.3 成本聚合
Reduce 还会聚合各个 segment 的存储扫描成本:
1 | storageCost := lo.Reduce(results, func(acc segcore.StorageCost, |
6. 性能考虑
时间复杂度
- Segment Search:O(N × log K),其中 N 是 segment 内的数据量,K 是 topK
- Reduce:O(M × K),其中 M 是 segment 数量,K 是 topK
由于 M(segment 数量)通常远小于 N(数据量),Reduce 的开销相对较小。
优化策略
短路优化:如果只有一个 segment 的结果,直接返回,无需 Reduce
1
2
3if len(results) == 1 {
return results[0], nil
}并行处理:对于多个查询(nq > 1),可以并行处理每个查询的 Reduce
内存限制:检查结果大小,避免 OOM
1
2
3if retSize > maxOutputSize {
return nil, fmt.Errorf("search results exceed the maxOutputSize Limit")
}
7. 总结
为什么需要 Reduce?
- 数据分布:数据分布在多个 segments 中
- 局部 vs 全局:每个 segment 返回局部 topK,用户需要全局 topK
- 去重需求:需要去除跨 segment 的重复 entity
- Group By:需要全局的 group by 处理
- 成本聚合:需要聚合各个 segment 的成本信息
关键要点
- ✅ 每个 segment 的
Search已经完成了 MVCC、topK、group by 等操作 - ✅ 但返回的是局部最优结果
- ✅ Reduce 通过 K-Way Merge 算法合并多个 segment 的结果
- ✅ 最终得到全局最优的 topK 结果
- ✅ 同时处理去重、group by、成本聚合等逻辑
类比理解
可以类比为:
- Segment Search:每个班级选出前 10 名
- Reduce:从所有班级的前 10 名中,选出全校前 10 名
即使每个班级已经完成了排名,仍然需要全校级别的 Reduce 操作来选出最终的 topK。