Milvus 为什么需要对 Search Result 进行 Reduce

概述

在 Milvus 的搜索流程中,虽然每个 segment 在 segment->Search 中已经完成了 MVCC 过滤、topK 选择、group by 等操作,但仍然需要对多个 segment 的搜索结果进行 Reduce(归约)操作。本文档详细解释为什么需要 Reduce 以及 Reduce 的具体作用。

核心原因:数据分布与全局 TopK

核心问题:一个查询请求通常涉及多个 segments,每个 segment 只返回自己数据范围内的 topK 结果,但用户需要的是全局最优的 topK

1. 多个 Segments 的场景

在 Milvus 中,一个 collection 的数据可能分布在:

  • 多个 Sealed Segments:历史数据被分割成多个 sealed segments
  • 多个 Growing Segments:实时写入的数据存储在 growing segments 中
  • 多个 Shards:在分布式场景中,数据分布在多个 shards 上
  • 多个 QueryNodes:在分布式部署中,不同的 segments 可能位于不同的 QueryNode 上

示例场景

假设一个 collection 有 1000 万条数据,被分割成:

  • Segment 1: 300 万条(sealed)
  • Segment 2: 300 万条(sealed)
  • Segment 3: 300 万条(sealed)
  • Segment 4: 100 万条(growing)

当用户请求 topK=10 时,每个 segment 都会返回自己的 topK=10,但我们需要的是从这 4000 万条数据中选出全局最优的 topK=10。

2. 每个 Segment 返回的是局部 TopK

Segment Search 的执行流程

每个 segment 在执行 segment->Search 时:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::unique_ptr<SearchResult>
SegmentInternalInterface::Search(
const query::Plan* plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
const folly::CancellationToken& cancel_token,
int32_t consistency_level,
Timestamp collection_ttl) const {
// ... 执行搜索计划节点树
// 包括:MVCC 过滤、向量搜索、topK 选择、group by 等
*results = visitor.get_moved_result(*plan->plan_node_);
return results;
}

Segment 返回的结果结构

每个 segment 的 SearchResult 包含:

  • seg_offsets_:该 segment 内的 topK 偏移量(已排序)
  • distances_:该 segment 内的 topK 距离值(已排序)
  • group_by_values_:group by 的值(如果启用)
  • topk_per_nq_prefix_sum_:每个查询的 topK 前缀和

关键点:这些结果是局部最优的,只代表该 segment 内的 topK,不代表全局最优。

3. Reduce 的作用:K-Way Merge 算法

Reduce 操作使用多路归并(K-Way Merge)算法,从多个已排序的结果中选择全局最优的 topK。

算法原理

Reduce 算法的核心思想类似于归并排序的归并阶段:

  1. 多路指针:为每个 segment 的结果维护一个指针(offset)
  2. 比较选择:每次比较所有 segment 当前指针位置的值
  3. 选择最优:选择当前最大的 score,移动对应 segment 的指针
  4. 去重处理:如果遇到重复的 entity ID,跳过
  5. 重复直到 topK:重复上述过程直到选出全局 topK 个结果

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (scr *SearchCommonReduce) ReduceSearchResultData(
ctx context.Context,
searchResultData []*schemapb.SearchResultData,
info *reduce.ResultInfo) (*schemapb.SearchResultData, error) {

// 为每个 segment 的结果维护偏移量
offsets := make([]int64, len(searchResultData))
idSet := make(map[interface{}]struct{}) // 用于去重

for i := int64(0); i < info.GetNq(); i++ {
var j int64
for j = 0; j < info.GetTopK(); {
// 从所有 segment 中选择当前最高分
sel := SelectSearchResultData(searchResultData, resultOffsets, offsets, i)
if sel == -1 {
break
}

idx := resultOffsets[sel][i] + offsets[sel]
id := typeutil.GetPK(searchResultData[sel].GetIds(), idx)
score := searchResultData[sel].Scores[idx]

// 去重:跳过已存在的 entity
if _, ok := idSet[id]; !ok {
// 添加到最终结果
retSize += typeutil.AppendFieldData(ret.FieldsData,
searchResultData[sel].FieldsData, idx)
typeutil.AppendPKs(ret.Ids, id)
ret.Scores = append(ret.Scores, score)
idSet[id] = struct{}{}
j++
} else {
skipDupCnt++ // 跳过重复的 entity
}
offsets[sel]++ // 移动指针
}
ret.Topks = append(ret.Topks, j)
}
return ret, nil
}

SelectSearchResultData 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func SelectSearchResultData(
dataArray []*schemapb.SearchResultData,
resultOffsets [][]int64,
offsets []int64,
qi int64) int {

var sel = -1
var maxDistance = -float32(math.MaxFloat32)

// 遍历所有 segment,找到当前最高分
for i, offset := range offsets {
if offset >= dataArray[i].Topks[qi] {
continue // 该 segment 的结果已用完
}

idx := resultOffsets[i][qi] + offset
distance := dataArray[i].Scores[idx]

if distance > maxDistance {
sel = i
maxDistance = distance
}
}
return sel
}

4. 示例说明

假设有 3 个 segments,每个返回 topK=10,用户请求全局 topK=10:

Segment 结果(已排序)

1
2
3
Segment 1: [0.95, 0.90, 0.85, 0.80, 0.75, ...] (10个结果)
Segment 2: [0.92, 0.88, 0.82, 0.78, 0.73, ...] (10个结果)
Segment 3: [0.89, 0.87, 0.80, 0.76, 0.71, ...] (10个结果)

Reduce 过程

步骤 Segment 1 指针 Segment 2 指针 Segment 3 指针 选择 全局结果
1 0 (0.95) 0 (0.92) 0 (0.89) Seg1 [0.95]
2 1 (0.90) 0 (0.92) 0 (0.89) Seg2 [0.95, 0.92]
3 1 (0.90) 1 (0.88) 0 (0.89) Seg1 [0.95, 0.92, 0.90]
4 2 (0.85) 1 (0.88) 0 (0.89) Seg3 [0.95, 0.92, 0.90, 0.89]
5 2 (0.85) 1 (0.88) 1 (0.87) Seg2 [0.95, 0.92, 0.90, 0.89, 0.88]
10 [全局 topK=10]

结果:最终得到全局最优的 topK=10 个结果。

5. Reduce 的其他重要作用

5.1 去重(Deduplication)

同一个 entity 可能同时出现在多个 segments 中:

  • Growing + Sealed:新写入的数据在 growing segment,但可能还未 flush 到 sealed segment
  • Compaction:compaction 过程中,数据可能同时存在于新旧 segments
  • 多副本:在分布式场景中,可能存在数据副本

Reduce 通过 idSet 确保每个 entity 只出现一次:

1
2
3
4
5
6
7
idSet := make(map[interface{}]struct{})
if _, ok := idSet[id]; !ok {
// 添加到结果
idSet[id] = struct{}{}
} else {
skipDupCnt++ // 跳过重复
}

5.2 Group By 处理

如果有 group by 需求,Reduce 阶段会进行全局的 group by 处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (sbr *SearchGroupByReduce) ReduceSearchResultData(...) {
// 1. 按 group by 值分组
groupByValueMap := make(map[interface{}]int64)

// 2. 为每个 group 选择 topK
for j = 0; j < groupBound; {
// 选择当前最高分
sel := SelectSearchResultData(...)
groupByVal := groupByValIterator[sel](idx)

// 3. 检查 group 限制
if groupCount >= groupSize {
// 跳过:该 group 已满
} else {
// 添加到该 group
groupByValueMap[groupByVal]++
}
}
}

5.3 成本聚合

Reduce 还会聚合各个 segment 的存储扫描成本:

1
2
3
4
5
6
storageCost := lo.Reduce(results, func(acc segcore.StorageCost, 
result *internalpb.SearchResults, _ int) segcore.StorageCost {
acc.ScannedRemoteBytes += result.GetScannedRemoteBytes()
acc.ScannedTotalBytes += result.GetScannedTotalBytes()
return acc
}, segcore.StorageCost{})

6. 性能考虑

时间复杂度

  • Segment Search:O(N × log K),其中 N 是 segment 内的数据量,K 是 topK
  • Reduce:O(M × K),其中 M 是 segment 数量,K 是 topK

由于 M(segment 数量)通常远小于 N(数据量),Reduce 的开销相对较小。

优化策略

  1. 短路优化:如果只有一个 segment 的结果,直接返回,无需 Reduce

    1
    2
    3
    if len(results) == 1 {
    return results[0], nil
    }
  2. 并行处理:对于多个查询(nq > 1),可以并行处理每个查询的 Reduce

  3. 内存限制:检查结果大小,避免 OOM

    1
    2
    3
    if retSize > maxOutputSize {
    return nil, fmt.Errorf("search results exceed the maxOutputSize Limit")
    }

7. 总结

为什么需要 Reduce?

  1. 数据分布:数据分布在多个 segments 中
  2. 局部 vs 全局:每个 segment 返回局部 topK,用户需要全局 topK
  3. 去重需求:需要去除跨 segment 的重复 entity
  4. Group By:需要全局的 group by 处理
  5. 成本聚合:需要聚合各个 segment 的成本信息

关键要点

  • ✅ 每个 segment 的 Search 已经完成了 MVCC、topK、group by 等操作
  • ✅ 但返回的是局部最优结果
  • ✅ Reduce 通过 K-Way Merge 算法合并多个 segment 的结果
  • ✅ 最终得到全局最优的 topK 结果
  • ✅ 同时处理去重、group by、成本聚合等逻辑

类比理解

可以类比为:

  • Segment Search:每个班级选出前 10 名
  • Reduce:从所有班级的前 10 名中,选出全校前 10 名

即使每个班级已经完成了排名,仍然需要全校级别的 Reduce 操作来选出最终的 topK。

参考资料