Milvus ShardDelegator.Query 方法详细分析

本文档深入分析 shardDelegator.Query 方法的实现细节,这是 QueryNode 中处理查询请求的核心方法。

方法签名

1
func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error)

位置: internal/querynodev2/delegator/delegator.go:577

方法职责

shardDelegator.Query 负责在 shard 级别执行查询操作,它协调多个 segment 的查询,处理时间戳同步、segment 管理、任务分发和结果收集。

执行流程详解

1. 生命周期管理 (Lines 579-582)

1
2
3
4
if err := sd.lifetime.Add(sd.IsWorking); err != nil {
return nil, err
}
defer sd.lifetime.Done()
  • 目的: 确保 delegator 处于工作状态
  • 机制: 使用 lifetime 管理器跟踪组件状态
  • 作用: 防止在 delegator 关闭或不可用时执行查询

2. Channel 验证 (Lines 584-589)

1
2
3
4
if !funcutil.SliceContain(req.GetDmlChannels(), sd.vchannelName) {
log.Warn("delegator received query request not belongs to it", ...)
return nil, fmt.Errorf("dml channel not match, ...")
}
  • 目的: 验证请求的 channel 是否属于当前 delegator
  • 原因: 每个 delegator 只负责一个特定的 virtual channel
  • 错误处理: 如果不匹配,立即返回错误

3. 优化 Guarantee Timestamp (Lines 591-597)

1
2
3
4
5
6
7
req.Req.GuaranteeTimestamp = sd.speedupGuranteeTS(
ctx,
req.Req.GetConsistencyLevel(),
req.Req.GetGuaranteeTimestamp(),
req.Req.GetMvccTimestamp(),
req.Req.GetIsIterator(),
)

speedupGuranteeTS 方法 (line 918):

1
2
3
4
5
6
7
8
9
10
11
12
func (sd *shardDelegator) speedupGuranteeTS(...) uint64 {
// 如果不是 Strong 一致性或已设置 mvccTS,直接返回原值
if isIterator || cl != commonpb.ConsistencyLevel_Strong || mvccTS != 0 {
return guaranteeTS
}
// 使用 WAL 的 MVCC timestamp 来加速 Strong 一致性查询
if mvcc, err := streaming.WAL().Local().GetLatestMVCCTimestampIfLocal(ctx, sd.vchannelName);
err == nil && mvcc < guaranteeTS {
return mvcc
}
return guaranteeTS
}
  • 优化原理: 对于 Strong 一致性查询,如果 WAL 的 MVCC timestamp 小于 guarantee timestamp,可以使用更小的值来加速查询
  • 适用场景:
    • 一致性级别为 Strong
    • 不是 iterator 模式
    • 未设置 mvccTS
  • 效果: 减少等待时间,提高查询性能

4. 等待 tSafe (Lines 599-621)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
partialResultRequiredDataRatio := paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat()
waitTr := timerecord.NewTimeRecorder("wait tSafe")
var tSafe uint64
var err error
if partialResultRequiredDataRatio >= 1.0 {
// 需要完整结果,必须等待 tSafe
tSafe, err = sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
if err != nil {
return nil, err
}
if req.GetReq().GetMvccTimestamp() == 0 {
req.Req.MvccTimestamp = tSafe
}
} else {
// 允许部分结果,使用当前 tSafe 即可
if req.GetReq().GetMvccTimestamp() == 0 {
req.Req.MvccTimestamp = sd.GetTSafe()
}
}

waitTSafe 方法 (line 939):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, error) {
// 如果已经满足条件,直接返回
latestTSafe := sd.latestTsafe.Load()
if latestTSafe >= ts {
return latestTSafe, nil
}

// 检查是否启用降级模式
if paramtable.Get().QueryNodeCfg.DowngradeTsafe.GetAsBool() {
return latestTSafe, nil
}

// 检查时间戳延迟是否过大
lag := gt.Sub(st)
maxLag := paramtable.Get().QueryNodeCfg.MaxTimestampLag.GetAsDuration(time.Second)
if lag > maxLag {
return 0, WrapErrTsLagTooLarge(lag, maxLag)
}

// 等待 tSafe 更新
ch := make(chan struct{})
go func() {
sd.tsCond.L.Lock()
defer sd.tsCond.L.Unlock()
for sd.latestTsafe.Load() < ts && ctx.Err() == nil && sd.Serviceable() {
sd.tsCond.Wait()
}
close(ch)
}()

// 监听超时或完成信号
select {
case <-ctx.Done():
sd.tsCond.Broadcast()
return 0, ctx.Err()
case <-ch:
return sd.latestTsafe.Load(), nil
}
}

关键点:

  • tSafe (Time Safe): 表示数据已经安全可读的时间戳
  • 等待机制: 使用条件变量 (tsCond) 等待 tSafe 更新
  • 部分结果模式:
    • partialResultRequiredDataRatio >= 1.0: 必须等待完整数据,保证强一致性
    • partialResultRequiredDataRatio < 1.0: 允许部分结果,使用当前 tSafe,降低延迟
  • 超时处理: 检查时间戳延迟,如果超过 MaxTimestampLag 则返回错误
  • 降级模式: 如果启用 DowngradeTsafe,直接返回当前 tSafe,不等待

5. Pin Readable Segments (Lines 623-628)

1
2
3
4
5
6
7
sealed, growing, sealedRowCount, version, err := sd.distribution.PinReadableSegments(
partialResultRequiredDataRatio,
req.GetReq().GetPartitionIDs()...)
if err != nil {
return nil, err
}
defer sd.distribution.Unpin(version)

PinReadableSegments 方法 (distribution.go:162):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (d *distribution) PinReadableSegments(requiredLoadRatio float64, partitions ...int64) 
(sealed []SnapshotItem, growing []SegmentEntry, sealedRowCount map[int64]int64, version int64, err error) {

requireFullResult := requiredLoadRatio >= 1.0
loadRatioSatisfy := d.queryView.GetLoadedRatio() >= requiredLoadRatio

var isServiceable bool
if requireFullResult {
isServiceable = d.queryView.Serviceable()
} else {
isServiceable = loadRatioSatisfy
}

if !isServiceable {
return nil, nil, nil, -1, merr.WrapErrChannelNotAvailable(...)
}

// 验证 partition 是否已加载
for _, partition := range partitions {
if !current.partitions.Contain(partition) {
return nil, nil, nil, -1, merr.WrapErrPartitionNotLoaded(partition)
}
}

// 获取 segments
sealed, growing = current.Get(partitions...)
version = current.version

// 根据加载比例过滤 segments
if d.queryView.GetLoadedRatio() == 1.0 {
// 完全加载:使用 target version 过滤
targetVersion := current.GetTargetVersion()
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
} else {
// 部分加载:只返回已加载的 segments
sealed = lo.Map(sealed, func(item SnapshotItem, _ int) SnapshotItem {
return SnapshotItem{
NodeID: item.NodeID,
Segments: lo.Filter(item.Segments, func(entry SegmentEntry, _ int) bool {
return d.queryView.sealedSegmentRowCount[entry.SegmentID] > 0
}),
}
})
// 类似处理 growing segments
}

// Pin snapshot,防止被卸载
snapshot, _ := d.snapshots.GetOrInsert(version, func() *snapshot {
return current.Clone()
})
snapshot.AddRef()

return sealed, growing, sealedRowCount, version, nil
}

关键概念:

  • Pin 操作: 增加 snapshot 的引用计数,防止 segment 在查询过程中被卸载
  • Snapshot: 某个时间点的 segment 分布快照
  • Sealed Segments: 已封存的 segments,数据不再变化
  • Growing Segments: 正在增长的 segments,数据可能还在变化
  • Serviceable: 检查 channel 是否可服务(所有必需的 segments 都已加载)
  • Load Ratio: 已加载的 segments 比例,用于部分结果模式

6. 处理 IgnoreGrowing 标志 (Lines 630-632)

1
2
3
if req.Req.IgnoreGrowing {
growing = []SegmentEntry{}
}
  • 目的: 如果请求指定忽略 growing segments,则清空 growing 列表
  • 场景: 某些查询只需要查询历史数据,不需要最新的增量数据

7. Segment 剪枝 (Lines 634-640)

1
2
3
4
5
6
7
8
9
if paramtable.Get().QueryNodeCfg.EnableSegmentPrune.GetAsBool() {
func() {
sd.partitionStatsMut.RLock()
defer sd.partitionStatsMut.RUnlock()
PruneSegments(ctx, sd.partitionStats, nil, req.GetReq(),
sd.collection.Schema(), sealed,
PruneInfo{paramtable.Get().QueryNodeCfg.DefaultSegmentFilterRatio.GetAsFloat()})
}()
}
  • 目的: 根据统计信息剪枝不相关的 segments
  • 原理: 使用 partition 统计信息(min/max 值)判断 segment 是否可能包含查询结果
  • 效果: 减少不必要的 segment 查询,提高性能
  • 线程安全: 使用读锁保护 partitionStats

8. 组织子任务 (Lines 648-652)

1
2
3
4
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, true, sd.modifyQueryRequest)
if err != nil {
return nil, err
}

organizeSubTask 方法 (line 751):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func organizeSubTask[T any](ctx context.Context,
req T,
sealed []SnapshotItem,
growing []SegmentEntry,
sd *shardDelegator,
skipEmpty bool,
modify func(T, querypb.DataScope, []int64, int64) T,
) ([]subTask[T], error) {

result := make([]subTask[T], 0, len(sealed)+1)

packSubTask := func(segments []SegmentEntry, workerID int64, scope querypb.DataScope) error {
segmentIDs := lo.Map(segments, func(item SegmentEntry, _ int) int64 {
return item.SegmentID
})
if skipEmpty && len(segmentIDs) == 0 {
return nil
}

// 修改请求,设置 scope、segmentIDs、targetID
req := modify(req, scope, segmentIDs, workerID)

// 获取 worker
worker, err := sd.workerManager.GetWorker(ctx, workerID)
if err != nil {
log.Warn("failed to get worker for sub task", ...)
}

result = append(result, subTask[T]{
req: req,
targetID: workerID,
worker: worker,
})
return nil
}

// 为每个 sealed snapshot 创建任务
for _, entry := range sealed {
packSubTask(entry.Segments, entry.NodeID, querypb.DataScope_Historical)
}

// 为 growing segments 创建任务(本地节点)
packSubTask(growing, paramtable.GetNodeID(), querypb.DataScope_Streaming)

return result, nil
}

modifyQueryRequest 方法 (line 292):

1
2
3
4
5
6
7
8
9
func (sd *shardDelegator) modifyQueryRequest(req *querypb.QueryRequest, 
scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.QueryRequest {
nodeReq := proto.Clone(req).(*querypb.QueryRequest)
nodeReq.Scope = scope // Historical 或 Streaming
nodeReq.Req.Base.TargetID = targetID // 目标节点 ID
nodeReq.SegmentIDs = segmentIDs // 要查询的 segment IDs
nodeReq.DmlChannels = []string{sd.vchannelName}
return nodeReq
}

关键点:

  • 任务分组: 将 segments 按节点分组,每个节点一个任务
  • Scope 区分:
    • DataScope_Historical: sealed segments
    • DataScope_Streaming: growing segments
  • Worker 管理: 通过 workerManager 获取 worker 客户端
  • 容错: 如果 worker 获取失败,任务仍然创建(用于部分结果模式)

9. 执行子任务 (Lines 654-661)

1
2
3
4
5
6
7
8
9
10
11
results, err := executeSubTasks(ctx, tasks, 
NewRowCountBasedEvaluator(sealedRowCount),
func(ctx context.Context, req *querypb.QueryRequest, worker cluster.Worker) (*internalpb.RetrieveResults, error) {
resp, err := worker.QuerySegments(ctx, req)
status, ok := status.FromError(err)
if ok && status.Code() == codes.Unavailable {
sd.markSegmentOffline(req.GetSegmentIDs()...)
}
return resp, err
},
"Query", log)

executeSubTasks 方法 (line 802):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func executeSubTasks[T any, R interface{ GetStatus() *commonpb.Status }](
ctx context.Context,
tasks []subTask[T],
evaluator PartialResultEvaluator,
execute func(context.Context, T, cluster.Worker) (R, error),
taskType string,
log *log.MLogger,
) ([]R, error) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// 确定部分结果要求
var partialResultRequiredDataRatio float64
if taskType == "Query" || taskType == "Search" {
partialResultRequiredDataRatio = paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat()
} else {
partialResultRequiredDataRatio = 1.0
}

// 并发执行所有任务
wg, ctx := errgroup.WithContext(ctx)
resultCh := make(chan channelResult, len(tasks))

for _, task := range tasks {
task := task // capture loop variable
wg.Go(func() error {
var result R
var err error

if task.targetID == -1 || task.worker == nil {
// Worker 不可用
err = fmt.Errorf("segments not loaded in any worker: %v", ...)
} else {
// 执行任务
result, err = execute(ctx, task.req, task.worker)
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = fmt.Errorf("worker(%d) query failed: %s", task.targetID, result.GetStatus().GetReason())
}
}

if err != nil {
log.Warn("failed to execute sub task", ...)
// 如果禁用部分结果,立即失败
if partialResultRequiredDataRatio == 1 {
return err
}
}

// 发送结果到 channel
resultCh <- channelResult{
nodeID: task.targetID,
result: result,
err: err,
segments: req.GetSegmentIDs(),
}
return nil
})
}

// 等待所有任务完成
if err := wg.Wait(); err != nil {
return nil, err
}
close(resultCh)

// 收集结果
successSegmentList := typeutil.NewSet[int64]()
failureSegmentList := make([]int64, 0)
var errors []error
results := make([]R, 0, len(tasks))

for item := range resultCh {
if item.err == nil {
successSegmentList.Insert(item.segments...)
results = append(results, item.result)
} else {
failureSegmentList = append(failureSegmentList, item.segments...)
errors = append(errors, item.err)
}
}

// 如果全部成功,直接返回
if len(errors) == 0 {
return results, nil
}

// 使用 evaluator 判断是否返回部分结果
if evaluator != nil {
shouldReturnPartial, accessedDataRatio := evaluator(
taskType, successSegmentList, failureSegmentList, errors)
if shouldReturnPartial {
log.Info("partial result executed successfully",
zap.Float64("accessedDataRatio", accessedDataRatio),
zap.Int64s("failureSegmentList", failureSegmentList),
)
return results, nil
}
}

return nil, merr.Combine(errors...)
}

关键机制:

  • 并发执行: 使用 errgroup 并发执行所有子任务
  • 结果收集: 使用 buffered channel 收集结果
  • 错误处理:
    • 如果 partialResultRequiredDataRatio == 1.0,任何失败都会导致整体失败
    • 否则,使用 evaluator 评估是否可以返回部分结果
  • Segment 标记: 如果 worker 不可用,标记 segment 为 offline
  • Partial Result Evaluator: 根据成功访问的数据比例决定是否返回部分结果

RowCountBasedEvaluator:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type RowCountBasedEvaluator struct {
sealedRowCount map[int64]int64
}

func (e *RowCountBasedEvaluator) Evaluate(
taskType string,
successSegments typeutil.Set[int64],
failureSegments []int64,
errors []error,
) (shouldReturn bool, accessedDataRatio float64) {
// 计算成功访问的行数
successRows := 0
for segID := range successSegments {
successRows += e.sealedRowCount[segID]
}

// 计算总行数
totalRows := 0
for _, count := range e.sealedRowCount {
totalRows += count
}

accessedDataRatio = float64(successRows) / float64(totalRows)

// 判断是否满足部分结果要求
requiredRatio := paramtable.Get().QueryNodeCfg.PartialResultRequiredDataRatio.GetAsFloat()
shouldReturn = accessedDataRatio >= requiredRatio

return shouldReturn, accessedDataRatio
}

10. 返回结果 (Lines 667-685)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
log.Debug("Delegator Query done")
if log.Core().Enabled(zap.DebugLevel) {
// 记录查询的 segment IDs
sealedIDs := lo.FlatMap(sealed, func(item SnapshotItem, _ int) []int64 {
return lo.Map(item.Segments, func(segment SegmentEntry, _ int) int64 {
return segment.SegmentID
})
})
growingIDs := lo.Map(growing, func(item SegmentEntry, _ int) int64 {
return item.SegmentID
})
log.Debug("execute count on segments...",
zap.Int64s("sealedIDs", sealedIDs),
zap.Int64s("growingIDs", growingIDs),
)
}

return results, nil
  • 结果: 返回所有成功的查询结果列表
  • 日志: 在 debug 模式下记录查询的 segment IDs

关键数据结构

subTask

1
2
3
4
5
type subTask[T any] struct {
req T // 查询请求
targetID int64 // 目标节点 ID
worker cluster.Worker // Worker 客户端
}

SnapshotItem

1
2
3
4
type SnapshotItem struct {
NodeID int64 // 节点 ID
Segments []SegmentEntry // Segment 列表
}

SegmentEntry

1
2
3
4
5
6
7
8
type SegmentEntry struct {
SegmentID int64
NodeID int64
PartitionID int64
Version int64
Offline bool
// ...
}

性能优化点

  1. Guarantee Timestamp 优化: 使用 WAL MVCC timestamp 加速 Strong 一致性查询
  2. 部分结果模式: 允许在数据未完全加载时返回部分结果,降低延迟
  3. Segment 剪枝: 根据统计信息跳过不相关的 segments
  4. 并发执行: 多个 segment 查询并发执行
  5. Pin 机制: 防止查询过程中 segment 被卸载

错误处理

  1. Delegator 不可用: 生命周期检查失败
  2. Channel 不匹配: 请求的 channel 不属于当前 delegator
  3. tSafe 等待超时: 时间戳延迟过大
  4. Distribution 不可服务: Segments 未完全加载且不允许部分结果
  5. Worker 不可用: 节点离线,根据部分结果策略决定是否失败
  6. 查询失败: Segment 查询失败,根据 evaluator 决定是否返回部分结果

配置参数

  • QueryNodeCfg.PartialResultRequiredDataRatio: 部分结果所需数据比例(默认 1.0)
  • QueryNodeCfg.EnableSegmentPrune: 是否启用 segment 剪枝
  • QueryNodeCfg.DefaultSegmentFilterRatio: 默认 segment 过滤比例
  • QueryNodeCfg.DowngradeTsafe: 是否降级 tSafe(不等待)
  • QueryNodeCfg.MaxTimestampLag: 最大时间戳延迟

调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
QueryNode.Query()

QueryNode.queryChannel()

shardDelegator.Query() ← 本文档分析的方法

organizeSubTask()

executeSubTasks()

worker.QuerySegments()

实际查询 segment 数据

总结

shardDelegator.Query 是一个复杂的协调方法,它:

  1. 管理时间戳: 优化 guarantee timestamp,等待 tSafe
  2. 管理 Segments: Pin segments,防止卸载
  3. 优化查询: Segment 剪枝,跳过不相关的数据
  4. 分发任务: 将查询任务分发到不同的节点
  5. 并发执行: 并发查询多个 segments
  6. 容错处理: 支持部分结果,提高可用性

该方法的设计充分考虑了性能、一致性和可用性的平衡,是 Milvus 查询系统的核心组件之一。