Milvus 数据写入过程中索引同步流程分析

本文档详细分析了 Milvus 中数据写入后如何同步构建索引的完整流程。

1. 概述

Milvus 采用异步索引构建机制:数据写入后不会立即构建索引,而是在 Segment 完成 Flush 操作后,触发索引构建任务。这种设计可以:

  • 提高写入性能:避免每次写入都构建索引
  • 批量优化:对完整 Segment 构建索引更高效
  • 资源管理:通过任务调度器控制索引构建的资源使用

2. 数据写入和 Flush 流程

2.1 数据写入

文件: internal/datanode/ (DataNode 组件)

数据写入流程:

  1. 客户端通过 Proxy 发送 Insert 请求
  2. DataNode 接收数据并写入内存 Buffer
  3. 当满足条件时(如达到大小阈值、时间阈值),触发 Flush 操作

2.2 Flush 完成通知

文件: internal/datacoord/services.go

方法: Server.SaveBinlogPaths() (line 700-738)

当 DataNode 完成 Segment 的 Flush 操作后,会调用 DataCoord 的 SaveBinlogPaths 方法:

1
2
3
4
5
6
7
8
// notify building index and compaction for "flushing/flushed" level one segment
if req.GetFlushed() {
// notify building index
s.flushCh <- req.SegmentID

// notify compaction
s.compactionTrigger.TriggerCompaction(ctx, ...)
}

关键点

  • 只有当 req.GetFlushed() == true 时才会触发索引构建
  • 通过 flushCh channel 发送 SegmentID
  • 同时也会触发 Compaction 任务

2.3 Flush Channel 转发

文件: internal/datacoord/server.go

方法: Server.postFlush() (line 960-976)

flushCh 中的消息会被转发到全局的 getBuildIndexChSingleton() channel:

1
2
3
4
5
6
7
8
9
10
11
if enableSortCompaction() {
select {
case getStatsTaskChSingleton() <- segmentID:
default:
}
} else {
select {
case getBuildIndexChSingleton() <- segmentID:
default:
}
}

关键点

  • 如果启用了排序压缩(Sort Compaction),先发送到统计任务 channel
  • 否则直接发送到索引构建 channel
  • 使用 select 的非阻塞方式,避免阻塞

3. 索引构建任务创建

3.1 Index Inspector 监听

文件: internal/datacoord/index_inspector.go

方法: indexInspector.createIndexForSegmentLoop() (line 87-132)

Index Inspector 是一个后台守护进程,持续监听以下事件:

  1. 定时检查 (ticker.C):

    • 定期检查是否有未构建索引的 Flushed Segment
    • 调用 getUnIndexTaskSegments() 获取需要构建索引的 Segment
  2. Collection 索引通知 (notifyIndexChan):

    • 当 Collection 创建新索引时触发
    • 为所有已 Flush 的 Segment 创建索引任务
  3. Flush 完成通知 (getBuildIndexChSingleton()):

    • 接收新 Flush 完成的 SegmentID
    • 立即为该 Segment 创建索引任务
1
2
3
4
5
6
7
8
9
10
11
case segID := <-getBuildIndexChSingleton():
log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
segment := i.meta.GetSegment(ctx, segID)
if segment == nil {
log.Warn("segment is not exist, no need to build index", zap.Int64("segmentID", segID))
continue
}
if err := i.createIndexesForSegment(ctx, segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segID))
continue
}

3.2 创建索引任务

方法: indexInspector.createIndexesForSegment() (line 148-170)

为 Segment 创建索引任务的主要步骤:

  1. 检查 Segment 状态

    • 如果启用了排序压缩,需要等待 Segment 排序完成
    • L0 级别的 Segment 不构建索引
  2. 获取 Collection 的所有索引

    1
    indexes := i.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "")
  3. 检查哪些索引还未构建

    1
    2
    3
    4
    5
    6
    7
    indexIDToSegIndexes := i.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
    for _, index := range indexes {
    if _, ok := indexIDToSegIndexes[index.IndexID]; !ok {
    // 为这个索引创建任务
    i.createIndexForSegment(ctx, segment, index.IndexID)
    }
    }

3.3 创建 SegmentIndex 元数据

方法: indexInspector.createIndexForSegment() (line 172-225)

为每个索引创建 SegmentIndex 元数据并加入调度队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. 分配 BuildID
buildID, err := i.allocator.AllocID(context.Background())

// 2. 获取索引参数
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
indexType := GetIndexType(indexParams)

// 3. 计算任务槽位(用于资源调度)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
segSize := segment.getSegmentSize()
taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex)

// 4. 创建 SegmentIndex 元数据
segIndex := &model.SegmentIndex{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
NumRows: segment.NumOfRows,
IndexID: indexID,
BuildID: buildID,
CreatedUTCTime: uint64(time.Now().Unix()),
WriteHandoff: false,
IndexType: indexType,
}

// 5. 保存到元数据存储
if err = i.meta.indexMeta.AddSegmentIndex(ctx, segIndex); err != nil {
return err
}

// 6. 加入调度器队列
i.scheduler.Enqueue(newIndexBuildTask(...))

关键点

  • BuildID 是索引构建任务的唯一标识
  • IndexState 初始为 Unissued
  • 任务槽位(TaskSlot)用于控制并发度,根据 Segment 大小和索引类型计算

4. 索引任务调度和执行

4.1 DataCoord 任务调度

文件: internal/datacoord/task_index.go

方法: indexBuildTask.CreateTaskOnWorker() (line 148-206)

DataCoord 的调度器会选择合适的 DataNode 节点执行索引构建任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 1. 验证任务和 Segment 状态
segIndex, exist := it.meta.indexMeta.GetIndexJob(it.BuildID)
segment := it.meta.GetSegment(ctx, segIndex.SegmentID)

// 2. 检查是否需要构建索引
// - 某些索引类型不需要训练(如 FLAT)
// - 小 Segment 可能不需要索引
if isNoTrainIndex(indexType) || segIndex.NumRows < MinSegmentNumRowsToEnableIndex {
// 标记为已完成(假完成)
it.UpdateStateWithMeta(indexpb.JobState_JobStateFinished, "fake finished")
return
}

// 3. 准备任务请求
req, err := it.prepareJobRequest(ctx, segment, segIndex, indexParams, indexType)

// 4. 选择 DataNode 并发送请求
if err = cluster.CreateIndex(nodeID, req); err != nil {
log.Warn("failed to send job to worker", zap.Error(err))
return
}

// 5. 更新任务状态为 InProgress
it.UpdateStateWithMeta(indexpb.JobState_JobStateInProgress, ...)

4.2 DataNode 接收任务

文件: internal/datanode/index_services.go

方法: DataNode.CreateJob() (line 44-120)

DataNode 接收索引构建请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 创建任务上下文
taskCtx, taskCancel := context.WithCancel(node.ctx)

// 2. 检查任务是否已存在(防止重复)
if oldInfo := node.taskManager.LoadOrStoreIndexTask(...); oldInfo != nil {
return merr.WrapErrIndexDuplicate(...)
}

// 3. 创建存储管理器
cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig())

// 4. 创建索引构建任务
task := index.NewIndexBuildTask(taskCtx, taskCancel, req, cm, node.taskManager, pluginContext)

// 5. 加入任务队列
if err := node.taskScheduler.TaskQueue.Enqueue(task); err != nil {
return merr.Status(err)
}

4.3 索引构建执行

文件: internal/datanode/index/task_index.go

索引构建任务遵循标准的任务执行模式:PreExecute → Execute → PostExecute

4.3.1 PreExecute 阶段

方法: indexBuildTask.PreExecute() (line 146-221)

准备阶段的主要工作:

  1. 构建数据路径

    1
    2
    3
    4
    5
    6
    if len(it.req.DataPaths) == 0 {
    for _, id := range it.req.GetDataIds() {
    path := metautil.BuildInsertLogPath(...)
    it.req.DataPaths = append(it.req.DataPaths, path)
    }
    }
  2. 解析索引参数

    • 从请求中提取 typeParamsindexParams
    • 处理特殊参数(如 mmap_enabled
  3. 填充字段元数据

    • 如果请求中缺少字段信息,从 Binlog 中解析
  4. 设置索引版本

    1
    2
    it.req.CurrentIndexVersion = getCurrentIndexVersion(...)
    it.req.CurrentScalarIndexVersion = getCurrentScalarIndexVersion(...)

4.3.2 Execute 阶段

方法: indexBuildTask.Execute() (line 223-330)

执行阶段的核心工作:

  1. 准备构建参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    buildIndexParams := &indexcgopb.BuildIndexInfo{
    ClusterID: it.req.GetClusterID(),
    BuildID: it.req.GetBuildID(),
    CollectionID: it.req.GetCollectionID(),
    SegmentID: it.req.GetSegmentID(),
    NumRows: it.req.GetNumRows(),
    Dim: it.req.GetDim(),
    InsertFiles: it.req.GetDataPaths(),
    FieldSchema: it.req.GetField(),
    IndexParams: mapToKVPairs(it.newIndexParams),
    StorageConfig: storageConfig,
    // ...
    }
  2. 调用 CGO 接口构建索引

    1
    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)

    这一步会:

    • 从存储中加载 Segment 数据
    • 调用 Knowhere 库构建索引
    • 索引构建完成后序列化
  3. 记录构建指标

    1
    metrics.DataNodeKnowhereBuildIndexLatency.Observe(buildIndexLatency.Seconds())

4.3.3 PostExecute 阶段

方法: indexBuildTask.PostExecute() (line 333-375)

后处理阶段的主要工作:

  1. 上传索引文件到存储

    1
    indexStats, err := it.index.UpLoad()

    上传过程会:

    • 序列化索引数据
    • 分片上传到对象存储(MinIO/S3 等)
    • 返回索引文件路径和大小信息
  2. 清理本地索引数据

    1
    2
    3
    4
    5
    6
    gcIndex := func() {
    if err := it.index.Delete(); err != nil {
    log.Warn("indexBuildTask Execute CIndexDelete failed", zap.Error(err))
    }
    }
    gcIndex() // 早期释放,节省内存
  3. 保存索引元数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    it.manager.StoreIndexFilesAndStatistic(
    it.req.GetClusterID(),
    it.req.GetBuildID(),
    saveFileKeys, // 索引文件路径列表
    serializedSize, // 序列化后的大小
    uint64(indexStats.MemSize), // 内存大小
    it.req.GetCurrentIndexVersion(),
    it.req.GetCurrentScalarIndexVersion(),
    )
  4. 更新任务状态

    • 通过 SetState() 更新任务状态为 Finished
    • 记录完成时间

4.4 索引元数据更新

文件: internal/datacoord/index_meta.go

方法: indexMeta.FinishTask() (line 885-920)

当 DataNode 完成索引构建后,会通知 DataCoord 更新元数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error {
// 1. 获取 SegmentIndex
segIdx, ok := m.segmentBuildInfo.Get(taskInfo.GetBuildID())

// 2. 更新索引状态和文件信息
segIdx.IndexState = taskInfo.GetState()
segIdx.IndexFileKeys = common.CloneStringList(taskInfo.GetIndexFileKeys())
segIdx.IndexSerializedSize = taskInfo.GetSerializedSize()
segIdx.IndexMemSize = taskInfo.GetMemSize()
segIdx.FinishedUTCTime = uint64(time.Now().Unix())

// 3. 持久化到元数据存储
return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
}

关键点

  • IndexState 更新为 Finished
  • 保存索引文件路径列表(IndexFileKeys
  • 记录索引大小信息,用于查询时加载

5. 完整流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
数据写入流程

DataNode 接收数据 → 写入内存 Buffer

触发 Flush(达到阈值或手动触发)

DataNode 完成 Flush → 保存 Binlog 到存储

调用 DataCoord.SaveBinlogPaths()

if req.GetFlushed() == true:
s.flushCh <- segmentID

DataCoord.postFlush() 转发

getBuildIndexChSingleton() <- segmentID

Index Inspector 监听

indexInspector.createIndexesForSegment()
├─ 获取 Collection 的所有索引
├─ 检查哪些索引未构建
└─ 为每个索引创建 SegmentIndex
├─ 分配 BuildID
├─ 创建元数据(状态:Unissued)
└─ 加入调度器队列

DataCoord 调度器选择 DataNode

发送 CreateIndex 请求到 DataNode

DataNode 接收请求
├─ 创建索引构建任务
└─ 加入任务队列

任务执行(PreExecute → Execute → PostExecute)
├─ PreExecute: 准备参数和路径
├─ Execute: 调用 Knowhere 构建索引
└─ PostExecute: 上传索引文件
├─ 上传到对象存储
├─ 保存文件路径到元数据
└─ 通知 DataCoord 更新状态

DataCoord 更新索引元数据
├─ IndexState: Finished
├─ IndexFileKeys: [文件路径列表]
└─ 持久化到元数据存储

索引构建完成,可用于查询

6. 关键组件说明

6.1 Index Inspector

位置: internal/datacoord/index_inspector.go

  • 职责:监听 Flush 完成事件,创建索引构建任务
  • 触发方式
    • 定时检查(默认间隔由 TaskCheckInterval 配置)
    • Flush 完成通知(通过 channel)
    • Collection 索引创建通知

6.2 Index Scheduler

位置: internal/datacoord/task/

  • 职责:调度索引构建任务到合适的 DataNode
  • 调度策略
    • 根据任务槽位(TaskSlot)控制并发度
    • 选择负载较低的 DataNode
    • 支持任务重试和失败处理

6.3 Index Task Manager

位置: internal/datanode/index/

  • 职责:管理 DataNode 上的索引构建任务
  • 功能
    • 任务队列管理
    • 任务状态跟踪
    • 资源清理

6.4 Index Meta

位置: internal/datacoord/index_meta.go

  • 职责:管理索引元数据
  • 存储内容
    • SegmentIndex 信息(BuildID, SegmentID, IndexID)
    • 索引状态(Unissued, InProgress, Finished, Failed)
    • 索引文件路径和大小

7. 关键配置参数

  • DataCoordCfg.TaskCheckInterval: Index Inspector 定时检查间隔(默认 1 秒)
  • DataCoordCfg.MinSegmentNumRowsToEnableIndex: 启用索引的最小行数阈值
  • KnowhereConfig.Enable: 是否启用 Knowhere 索引库
  • 索引参数:index_type, metric_type, nlist, nprobe

8. 索引状态流转

1
2
3
Unissued → InProgress → Finished

Failed (可重试)
  • Unissued: 任务已创建,等待调度
  • InProgress: 任务正在执行
  • Finished: 索引构建完成
  • Failed: 构建失败,可重试

9. 性能优化点

  1. 异步构建:索引构建不影响数据写入性能
  2. 批量处理:对完整 Segment 构建索引,效率更高
  3. 资源控制:通过 TaskSlot 控制并发度,避免资源耗尽
  4. 早期释放:索引构建完成后立即释放内存
  5. 分片上传:大索引文件分片上传,提高可靠性

10. 错误处理

  • Segment 不存在:忽略该任务
  • 索引构建失败:任务状态标记为 Failed,可重试
  • 上传失败:清理本地数据,标记失败
  • 元数据更新失败:记录日志,等待重试

11. 监控指标

  • DataNodeBuildIndexTaskCounter: 索引构建任务计数
  • DataNodeKnowhereBuildIndexLatency: 索引构建延迟
  • DataNodeEncodeIndexFileLatency: 索引文件编码和上传延迟
  • DataNodeSaveIndexFileLatency: 保存索引文件元数据延迟
  • DataCoordStoredIndexFilesSize: 存储的索引文件总大小

12. 注意事项

  1. 索引构建是异步的:数据写入后不会立即有索引,需要等待 Flush 和索引构建完成
  2. 小 Segment 可能不构建索引:如果 Segment 行数小于阈值,可能跳过索引构建
  3. L0 Segment 不构建索引:L0 级别的 Segment 用于实时查询,不构建持久化索引
  4. 排序压缩模式:如果启用了排序压缩,需要等待 Segment 排序完成后再构建索引