Milvus 数据写入过程中索引同步流程分析
本文档详细分析了 Milvus 中数据写入后如何同步构建索引的完整流程。
1. 概述
Milvus 采用异步索引构建机制:数据写入后不会立即构建索引,而是在 Segment 完成 Flush 操作后,触发索引构建任务。这种设计可以:
- 提高写入性能:避免每次写入都构建索引
- 批量优化:对完整 Segment 构建索引更高效
- 资源管理:通过任务调度器控制索引构建的资源使用
2. 数据写入和 Flush 流程
2.1 数据写入
文件: internal/datanode/ (DataNode 组件)
数据写入流程:
- 客户端通过 Proxy 发送 Insert 请求
- DataNode 接收数据并写入内存 Buffer
- 当满足条件时(如达到大小阈值、时间阈值),触发 Flush 操作
2.2 Flush 完成通知
文件: internal/datacoord/services.go
方法: Server.SaveBinlogPaths() (line 700-738)
当 DataNode 完成 Segment 的 Flush 操作后,会调用 DataCoord 的 SaveBinlogPaths 方法:
1 | // notify building index and compaction for "flushing/flushed" level one segment |
关键点:
- 只有当
req.GetFlushed() == true时才会触发索引构建 - 通过
flushChchannel 发送 SegmentID - 同时也会触发 Compaction 任务
2.3 Flush Channel 转发
文件: internal/datacoord/server.go
方法: Server.postFlush() (line 960-976)
flushCh 中的消息会被转发到全局的 getBuildIndexChSingleton() channel:
1 | if enableSortCompaction() { |
关键点:
- 如果启用了排序压缩(Sort Compaction),先发送到统计任务 channel
- 否则直接发送到索引构建 channel
- 使用
select的非阻塞方式,避免阻塞
3. 索引构建任务创建
3.1 Index Inspector 监听
文件: internal/datacoord/index_inspector.go
方法: indexInspector.createIndexForSegmentLoop() (line 87-132)
Index Inspector 是一个后台守护进程,持续监听以下事件:
定时检查 (ticker.C):
- 定期检查是否有未构建索引的 Flushed Segment
- 调用
getUnIndexTaskSegments()获取需要构建索引的 Segment
Collection 索引通知 (notifyIndexChan):
- 当 Collection 创建新索引时触发
- 为所有已 Flush 的 Segment 创建索引任务
Flush 完成通知 (getBuildIndexChSingleton()):
- 接收新 Flush 完成的 SegmentID
- 立即为该 Segment 创建索引任务
1 | case segID := <-getBuildIndexChSingleton(): |
3.2 创建索引任务
方法: indexInspector.createIndexesForSegment() (line 148-170)
为 Segment 创建索引任务的主要步骤:
检查 Segment 状态:
- 如果启用了排序压缩,需要等待 Segment 排序完成
- L0 级别的 Segment 不构建索引
获取 Collection 的所有索引:
1
indexes := i.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "")
检查哪些索引还未构建:
1
2
3
4
5
6
7indexIDToSegIndexes := i.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexes {
if _, ok := indexIDToSegIndexes[index.IndexID]; !ok {
// 为这个索引创建任务
i.createIndexForSegment(ctx, segment, index.IndexID)
}
}
3.3 创建 SegmentIndex 元数据
方法: indexInspector.createIndexForSegment() (line 172-225)
为每个索引创建 SegmentIndex 元数据并加入调度队列:
1 | // 1. 分配 BuildID |
关键点:
BuildID是索引构建任务的唯一标识IndexState初始为Unissued- 任务槽位(TaskSlot)用于控制并发度,根据 Segment 大小和索引类型计算
4. 索引任务调度和执行
4.1 DataCoord 任务调度
文件: internal/datacoord/task_index.go
方法: indexBuildTask.CreateTaskOnWorker() (line 148-206)
DataCoord 的调度器会选择合适的 DataNode 节点执行索引构建任务:
1 | // 1. 验证任务和 Segment 状态 |
4.2 DataNode 接收任务
文件: internal/datanode/index_services.go
方法: DataNode.CreateJob() (line 44-120)
DataNode 接收索引构建请求:
1 | // 1. 创建任务上下文 |
4.3 索引构建执行
文件: internal/datanode/index/task_index.go
索引构建任务遵循标准的任务执行模式:PreExecute → Execute → PostExecute
4.3.1 PreExecute 阶段
方法: indexBuildTask.PreExecute() (line 146-221)
准备阶段的主要工作:
构建数据路径:
1
2
3
4
5
6if len(it.req.DataPaths) == 0 {
for _, id := range it.req.GetDataIds() {
path := metautil.BuildInsertLogPath(...)
it.req.DataPaths = append(it.req.DataPaths, path)
}
}解析索引参数:
- 从请求中提取
typeParams和indexParams - 处理特殊参数(如
mmap_enabled)
- 从请求中提取
填充字段元数据:
- 如果请求中缺少字段信息,从 Binlog 中解析
设置索引版本:
1
2it.req.CurrentIndexVersion = getCurrentIndexVersion(...)
it.req.CurrentScalarIndexVersion = getCurrentScalarIndexVersion(...)
4.3.2 Execute 阶段
方法: indexBuildTask.Execute() (line 223-330)
执行阶段的核心工作:
准备构建参数:
1
2
3
4
5
6
7
8
9
10
11
12
13buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.req.GetClusterID(),
BuildID: it.req.GetBuildID(),
CollectionID: it.req.GetCollectionID(),
SegmentID: it.req.GetSegmentID(),
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: it.req.GetField(),
IndexParams: mapToKVPairs(it.newIndexParams),
StorageConfig: storageConfig,
// ...
}调用 CGO 接口构建索引:
1
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
这一步会:
- 从存储中加载 Segment 数据
- 调用 Knowhere 库构建索引
- 索引构建完成后序列化
记录构建指标:
1
metrics.DataNodeKnowhereBuildIndexLatency.Observe(buildIndexLatency.Seconds())
4.3.3 PostExecute 阶段
方法: indexBuildTask.PostExecute() (line 333-375)
后处理阶段的主要工作:
上传索引文件到存储:
1
indexStats, err := it.index.UpLoad()
上传过程会:
- 序列化索引数据
- 分片上传到对象存储(MinIO/S3 等)
- 返回索引文件路径和大小信息
清理本地索引数据:
1
2
3
4
5
6gcIndex := func() {
if err := it.index.Delete(); err != nil {
log.Warn("indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
}
gcIndex() // 早期释放,节省内存保存索引元数据:
1
2
3
4
5
6
7
8
9it.manager.StoreIndexFilesAndStatistic(
it.req.GetClusterID(),
it.req.GetBuildID(),
saveFileKeys, // 索引文件路径列表
serializedSize, // 序列化后的大小
uint64(indexStats.MemSize), // 内存大小
it.req.GetCurrentIndexVersion(),
it.req.GetCurrentScalarIndexVersion(),
)更新任务状态:
- 通过
SetState()更新任务状态为Finished - 记录完成时间
- 通过
4.4 索引元数据更新
文件: internal/datacoord/index_meta.go
方法: indexMeta.FinishTask() (line 885-920)
当 DataNode 完成索引构建后,会通知 DataCoord 更新元数据:
1 | func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error { |
关键点:
IndexState更新为Finished- 保存索引文件路径列表(
IndexFileKeys) - 记录索引大小信息,用于查询时加载
5. 完整流程图
1 | 数据写入流程 |
6. 关键组件说明
6.1 Index Inspector
位置: internal/datacoord/index_inspector.go
- 职责:监听 Flush 完成事件,创建索引构建任务
- 触发方式:
- 定时检查(默认间隔由
TaskCheckInterval配置) - Flush 完成通知(通过 channel)
- Collection 索引创建通知
- 定时检查(默认间隔由
6.2 Index Scheduler
位置: internal/datacoord/task/
- 职责:调度索引构建任务到合适的 DataNode
- 调度策略:
- 根据任务槽位(TaskSlot)控制并发度
- 选择负载较低的 DataNode
- 支持任务重试和失败处理
6.3 Index Task Manager
位置: internal/datanode/index/
- 职责:管理 DataNode 上的索引构建任务
- 功能:
- 任务队列管理
- 任务状态跟踪
- 资源清理
6.4 Index Meta
位置: internal/datacoord/index_meta.go
- 职责:管理索引元数据
- 存储内容:
- SegmentIndex 信息(BuildID, SegmentID, IndexID)
- 索引状态(Unissued, InProgress, Finished, Failed)
- 索引文件路径和大小
7. 关键配置参数
DataCoordCfg.TaskCheckInterval: Index Inspector 定时检查间隔(默认 1 秒)DataCoordCfg.MinSegmentNumRowsToEnableIndex: 启用索引的最小行数阈值KnowhereConfig.Enable: 是否启用 Knowhere 索引库- 索引参数:
index_type,metric_type,nlist,nprobe等
8. 索引状态流转
1 | Unissued → InProgress → Finished |
- Unissued: 任务已创建,等待调度
- InProgress: 任务正在执行
- Finished: 索引构建完成
- Failed: 构建失败,可重试
9. 性能优化点
- 异步构建:索引构建不影响数据写入性能
- 批量处理:对完整 Segment 构建索引,效率更高
- 资源控制:通过 TaskSlot 控制并发度,避免资源耗尽
- 早期释放:索引构建完成后立即释放内存
- 分片上传:大索引文件分片上传,提高可靠性
10. 错误处理
- Segment 不存在:忽略该任务
- 索引构建失败:任务状态标记为 Failed,可重试
- 上传失败:清理本地数据,标记失败
- 元数据更新失败:记录日志,等待重试
11. 监控指标
DataNodeBuildIndexTaskCounter: 索引构建任务计数DataNodeKnowhereBuildIndexLatency: 索引构建延迟DataNodeEncodeIndexFileLatency: 索引文件编码和上传延迟DataNodeSaveIndexFileLatency: 保存索引文件元数据延迟DataCoordStoredIndexFilesSize: 存储的索引文件总大小
12. 注意事项
- 索引构建是异步的:数据写入后不会立即有索引,需要等待 Flush 和索引构建完成
- 小 Segment 可能不构建索引:如果 Segment 行数小于阈值,可能跳过索引构建
- L0 Segment 不构建索引:L0 级别的 Segment 用于实时查询,不构建持久化索引
- 排序压缩模式:如果启用了排序压缩,需要等待 Segment 排序完成后再构建索引