Milvus StorageV2 写入流程分析

概述

StorageV2 是 Milvus 的新一代存储格式,使用 Apache Arrow + Parquet 技术栈,相比 StorageV1(Binlog 格式)具有更高的压缩率和查询性能。本文档详细分析 StorageV2 的完整写入流程。

一、整体架构

1.1 数据流向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
用户 Insert/Delete 请求

WriteBuffer.BufferData() // 缓存数据到内存

triggerSync() // 触发同步

getSyncTask() // 创建 SyncTask
├─ yieldBuffer() // 从 buffer 获取数据
└─ NewSyncTask() // 创建同步任务

SyncTask.Run()

BulkPackWriterV2.Write() // StorageV2 写入
├─ serializeBinlog() // InsertData → Arrow Record
├─ writeInserts() // Arrow Record → Parquet
├─ writeStats() // 写入统计信息
├─ writeDelta() // 写入删除数据
└─ writeBM25Stasts() // 写入 BM25 统计

PackedRecordWriter.Write() // 写入 Parquet 文件

对象存储 (MinIO/S3/Local)

1.2 关键组件

组件 职责 位置
WriteBuffer 内存缓冲区,缓存写入数据 internal/flushcommon/writebuffer/
SyncTask 同步任务,负责将数据写入存储 internal/flushcommon/syncmgr/task.go
BulkPackWriterV2 StorageV2 写入器 internal/flushcommon/syncmgr/pack_writer_v2.go
PackedRecordWriter Arrow Record → Parquet 转换器 internal/storage/record_writer.go
PackedWriter 底层 Parquet 写入器(C++ FFI) internal/storagev2/packed/

二、详细流程分析

2.1 数据准备阶段(WriteBuffer → SyncPack)

文件: internal/flushcommon/writebuffer/write_buffer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
// 1. 从 WriteBuffer 中提取数据
insert, bm25, delta, schema, timeRange, startPos := wb.yieldBuffer(segmentID)

// 2. 构建 SyncPack
pack := &syncmgr.SyncPack{}
pack.WithInsertData(insert). // InsertData 数组
WithDeleteData(delta). // DeleteData
WithBM25Stats(bm25). // BM25 统计
WithCollectionID(wb.collectionID).
WithPartitionID(segmentInfo.PartitionID()).
WithSegmentID(segmentID).
WithTimeRange(tsFrom, tsTo). // Timestamp 范围
WithBatchRows(batchSize)

// 3. 创建 SyncTask
task := syncmgr.NewSyncTask(...)
return task, nil
}

关键数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// InsertData 包含所有字段的数据
type InsertData struct {
Data map[FieldID]FieldData // map[fieldID]FieldData
RowNum() int
GetMemorySize() int64
}

// SyncPack 包含一次同步的所有数据
type SyncPack struct {
insertData []*storage.InsertData // 插入数据
deltaData *storage.DeleteData // 删除数据
bm25Stats map[int64]*storage.BM25Stats
tsFrom, tsTo typeutil.Timestamp // Timestamp 范围
collectionID, partitionID, segmentID int64
// ...
}

2.2 SyncTask 执行阶段

文件: internal/flushcommon/syncmgr/task.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (t *SyncTask) Run(ctx context.Context) (err error) {
// 1. 获取段信息和列组配置
segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
columnGroups := t.getColumnGroups(segmentInfo)

// 2. 根据 StorageVersion 选择写入器
switch segmentInfo.GetStorageVersion() {
case storage.StorageV2:
writer := NewBulkPackWriterV2(
t.metacache, t.schema, t.chunkManager, t.allocator,
0, packed.DefaultMultiPartUploadSize,
t.storageConfig, columnGroups, t.writeRetryOpts...)

// 3. 执行写入
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs,
t.bm25Binlogs, t.manifestPath, t.flushedSize, err =
writer.Write(ctx, t.pack)
}

return err
}

2.3 BulkPackWriterV2 写入阶段

文件: internal/flushcommon/syncmgr/pack_writer_v2.go

2.3.1 Write 方法主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (bw *BulkPackWriterV2) Write(ctx context.Context, pack *SyncPack) (
inserts map[int64]*datapb.FieldBinlog,
deltas *datapb.FieldBinlog,
stats map[int64]*datapb.FieldBinlog,
bm25Stats map[int64]*datapb.FieldBinlog,
manifest string,
size int64,
err error,
) {
// 1. 预分配 ID
err = bw.prefetchIDs(pack)

// 2. 写入插入数据(核心步骤)
if inserts, manifest, err = bw.writeInserts(ctx, pack); err != nil {
return
}

// 3. 写入统计信息
if stats, err = bw.writeStats(ctx, pack); err != nil {
return
}

// 4. 写入删除数据
if deltas, err = bw.writeDelta(ctx, pack); err != nil {
return
}

// 5. 写入 BM25 统计
if bm25Stats, err = bw.writeBM25Stasts(ctx, pack); err != nil {
return
}

size = bw.sizeWritten
return
}

2.3.2 serializeBinlog:InsertData → Arrow Record

关键步骤:将内存中的 InsertData 转换为 Arrow Record

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (bw *BulkPackWriterV2) serializeBinlog(_ context.Context, pack *SyncPack) (storage.Record, error) {
if len(pack.insertData) == 0 {
return nil, nil
}

// 1. 将 Milvus Schema 转换为 Arrow Schema
arrowSchema, err := storage.ConvertToArrowSchema(bw.schema, true)
if err != nil {
return nil, err
}

// 2. 创建 Arrow RecordBuilder
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
defer builder.Release()

// 3. 遍历所有 InsertData chunk,构建 Arrow Record
for _, chunk := range pack.insertData {
if err := storage.BuildRecord(builder, chunk, bw.schema); err != nil {
return nil, err
}
}

// 4. 创建 Arrow Record
rec := builder.NewRecord()

// 5. 构建 FieldID 到列索引的映射
allFields := typeutil.GetAllFieldSchemas(bw.schema)
field2Col := make(map[storage.FieldID]int, len(allFields))
for c, field := range allFields {
field2Col[field.FieldID] = c
}

// 6. 返回 SimpleArrowRecord(包含 FieldID 映射)
return storage.NewSimpleArrowRecord(rec, field2Col), nil
}

BuildRecord 详细过程internal/storage/serde.go):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func BuildRecord(b *array.RecordBuilder, data *InsertData, schema *schemapb.CollectionSchema) error {
idx := 0

serializeField := func(field *schemapb.FieldSchema) error {
fBuilder := b.Field(idx) // 获取对应字段的 Builder
idx++

// 获取字段数据
fieldData, exists := data.Data[field.FieldID]
if !exists {
return merr.WrapErrFieldNotFound(field.FieldID, ...)
}

// 根据字段类型获取序列化函数
typeEntry := serdeMap[field.DataType]

// 逐行序列化数据到 Arrow Builder
for j := 0; j < fieldData.RowNum(); j++ {
ok = typeEntry.serialize(fBuilder, fieldData.GetRow(j), elementType)
if !ok {
return merr.WrapErrServiceInternal(...)
}
}
return nil
}

// 序列化所有字段(包括 timestamp field)
for _, field := range schema.GetFields() {
if err := serializeField(field); err != nil {
return err
}
}

// 序列化结构体数组字段
for _, structField := range schema.GetStructArrayFields() {
for _, field := range structField.GetFields() {
if err := serializeField(field); err != nil {
return err
}
}
}

return nil
}

类型映射示例

Milvus 类型 Arrow 类型 说明
DataType_Int64 arrow.Int64Type 64 位整数
DataType_FloatVector arrow.FixedSizeBinaryType{ByteWidth: dim * 4} 固定大小二进制(向量)
DataType_Timestamp arrow.Int64Type Timestamp 作为 Int64 存储
DataType_JSON arrow.BinaryType JSON 作为二进制存储

2.3.3 writeInserts:Arrow Record → Parquet 文件

核心流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (
map[int64]*datapb.FieldBinlog, string, error) {

// 1. 序列化为 Arrow Record
rec, err := bw.serializeBinlog(ctx, pack)
if err != nil {
return nil, "", err
}

// 2. 提取 Timestamp 列并计算范围
tsArray := rec.Column(common.TimeStampField).(*array.Int64)
rows := rec.Len()
var tsFrom uint64 = math.MaxUint64
var tsTo uint64 = 0
for i := 0; i < rows; i++ {
ts := typeutil.Timestamp(tsArray.Value(i))
if ts < tsFrom {
tsFrom = ts
}
if ts > tsTo {
tsTo = ts
}
}

// 3. 准备写入函数
doWrite := func(w storage.RecordWriter) error {
if err = w.Write(rec); err != nil { // 写入 Arrow Record
return err
}
return w.Close() // 关闭并刷新到磁盘
}

// 4. 根据配置选择写入模式
if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() {
// Manifest 模式:使用 FFI 写入器
basePath := path.Join(bw.getRootPath(), common.SegmentInsertLogPath, k)
w, err := storage.NewPackedRecordManifestWriter(
bucketName, basePath, bw.schema,
bw.bufferSize, bw.multiPartUploadSize,
columnGroups, bw.storageConfig, pluginContextPtr)
if err = doWrite(w); err != nil {
return nil, "", err
}
manifestPath = w.GetWrittenManifest()
} else {
// 普通模式:每个列组一个文件
paths := make([]string, 0)
for _, columnGroup := range columnGroups {
path := metautil.BuildInsertLogPath(
bw.getRootPath(), pack.collectionID,
pack.partitionID, pack.segmentID,
columnGroup.GroupID, bw.nextID())
paths = append(paths, path)
}

w, err := storage.NewPackedRecordWriter(
bucketName, paths, bw.schema,
bw.bufferSize, bw.multiPartUploadSize,
columnGroups, bw.storageConfig, pluginContextPtr)
if err = doWrite(w); err != nil {
return nil, "", err
}
}

// 5. 构建 Binlog 元数据
logs := make(map[int64]*datapb.FieldBinlog)
for _, columnGroup := range columnGroups {
logs[columnGroupID] = &datapb.FieldBinlog{
FieldID: columnGroupID,
ChildFields: columnGroup.Fields,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(w.GetColumnGroupWrittenCompressed(columnGroup.GroupID)),
MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)),
LogPath: w.GetWrittenPaths(columnGroupID),
EntriesNum: w.GetWrittenRowNum(),
TimestampFrom: tsFrom,
TimestampTo: tsTo,
},
},
}
}

return logs, manifestPath, nil
}

2.4 PackedRecordWriter:Arrow → Parquet 转换

文件: internal/storage/record_writer.go

2.4.1 Write 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (pw *packedRecordWriter) Write(r Record) error {
// 1. 将 Record 转换为 Arrow Record
var rec arrow.Record
if sar, ok := r.(*simpleArrowRecord); ok {
rec = sar.r // 直接使用 Arrow Record
} else {
// 从 Record 接口构建 Arrow Record
allFields := typeutil.GetAllFieldSchemas(pw.schema)
arrays := make([]arrow.Array, len(allFields))
for i, field := range allFields {
arrays[i] = r.Column(field.FieldID)
}
rec = array.NewRecord(pw.arrowSchema, arrays, int64(r.Len()))
}

// 2. 统计写入大小(按列组)
pw.rowNum += int64(r.Len())
for col, arr := range rec.Columns() {
size := calculateActualDataSize(arr)
pw.writtenUncompressed += size
for _, columnGroup := range pw.columnGroups {
if lo.Contains(columnGroup.Columns, col) {
pw.columnGroupUncompressed[columnGroup.GroupID] += size
break
}
}
}

// 3. 调用底层 PackedWriter 写入
defer rec.Release()
return pw.writer.WriteRecordBatch(rec) // 写入 Arrow RecordBatch
}

2.4.2 PackedWriter(C++ FFI)

文件: internal/storagev2/packed/packed_writer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
// 1. 导出 Arrow Array 和 Schema 到 C 结构
cArrays := make([]CArrowArray, recordBatch.NumCols())
cSchemas := make([]CArrowSchema, recordBatch.NumCols())

for i := range recordBatch.NumCols() {
var caa cdata.CArrowArray
var cas cdata.CArrowSchema
cdata.ExportArrowArray(recordBatch.Column(int(i)), &caa, &cas)
cArrays[i] = *(*CArrowArray)(unsafe.Pointer(&caa))
cSchemas[i] = *(*CArrowSchema)(unsafe.Pointer(&cas))
}

// 2. 导出 Arrow Schema
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(recordBatch.Schema(), &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))

// 3. 调用 C++ 函数写入 Parquet
status := C.WriteRecordBatch(pw.cPackedWriter, &cArrays[0], &cSchemas[0], cSchema)
if err := ConsumeCStatusIntoError(&status); err != nil {
return err
}

return nil
}

底层 C++ 实现internal/core/src/segcore/packed_writer_c.cpp):

1
2
3
4
5
6
7
8
9
10
11
// C++ 端接收 Arrow RecordBatch,写入 Parquet 文件
Status WriteRecordBatch(PackedWriter* writer,
ArrowArray* arrays,
ArrowSchema* schemas,
ArrowSchema* schema) {
// 1. 导入 Arrow 数据到 C++ Arrow 对象
auto record_batch = ImportRecordBatch(arrays, schemas, schema);

// 2. 调用 PackedRecordBatchWriter 写入 Parquet
return writer->packed_writer_->Write(record_batch);
}

三、列组(Column Group)机制

3.1 列组概念

列组是 StorageV2 的重要优化机制,将相关字段组织在一起,减少文件数量,提高查询效率。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 列组配置示例
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 100, // 主键组
Fields: []int64{0}, // 主键字段
Columns: []int{0}, // Arrow 列索引
},
{
GroupID: 101, // 向量组
Fields: []int64{100}, // 向量字段
Columns: []int{1}, // Arrow 列索引
},
{
GroupID: 102, // 标量字段组
Fields: []int64{1, 2, 3}, // 多个标量字段
Columns: []int{2, 3, 4}, // Arrow 列索引
},
}

3.2 列组的作用

  1. 减少文件数量:多个字段可以写入同一个 Parquet 文件
  2. 优化查询:相关字段在一起,减少 I/O
  3. 支持列剪枝:查询时只读取需要的列组

3.3 文件路径结构

普通模式

1
2
3
4
5
{rootPath}/{collectionID}/{partitionID}/{segmentID}/
├── insert_log/
│ ├── {columnGroupID1}/{logID}.parquet
│ ├── {columnGroupID2}/{logID}.parquet
│ └── {columnGroupID3}/{logID}.parquet

Manifest 模式

1
2
3
4
5
6
7
{rootPath}/{collectionID}/{partitionID}/{segmentID}/
├── insert_log/
│ ├── {columnGroupID1}/
│ │ └── {logID}.parquet
│ ├── {columnGroupID2}/
│ │ └── {logID}.parquet
│ └── manifest.json # 统一元数据清单

四、关键优化点

4.1 零拷贝转换

  • Arrow Record 在内存中直接使用,无需序列化
  • Go → C++ 通过 FFI 传递 Arrow C Data Interface,零拷贝
  • Arrow → Parquet 使用 Arrow 的 Parquet 写入器,高效转换

4.2 批量写入

  • 使用 RecordBatch 批量写入,减少 I/O 次数
  • 支持缓冲写入,提高吞吐量

4.3 压缩优化

  • Parquet 使用列式压缩,压缩率高
  • 支持多种压缩算法(Zstd, Snappy, Gzip 等)
  • 在 Milvus 中默认使用 Zstd 压缩

4.4 元数据管理

  • Timestamp 范围:在写入时计算并记录
  • 统计信息:记录压缩前后大小、行数等
  • Manifest:统一管理文件元数据(Manifest 模式)

五、与 StorageV1 的对比

特性 StorageV1 (Binlog) StorageV2 (Parquet)
文件格式 Protobuf + 自定义编码 Apache Parquet
文件组织 每个字段一个文件 按列组组织,多个字段一个文件
压缩率 中等 高(3-10倍)
查询性能 需要读取多个文件 列式访问,更高效
元数据 每个文件独立元数据 统一 Manifest 管理
Timestamp 存储 独立 binlog 文件 Arrow Record 的一列
类型支持 基础类型 支持复杂类型(JSON、嵌套结构等)

六、总结

6.1 核心流程

  1. 数据准备:WriteBuffer → SyncPack
  2. 数据转换:InsertData → Arrow Record(serializeBinlog
  3. 数据写入:Arrow Record → Parquet 文件(writeInserts
  4. 底层存储:通过 C++ FFI 调用 Parquet 写入器

6.2 关键技术

  • Apache Arrow:内存中的列式数据格式
  • Apache Parquet:持久化的列式存储格式
  • 列组机制:优化文件组织和查询性能
  • 零拷贝转换:Arrow → Parquet 高效转换
  • FFI 集成:Go 和 C++ 无缝协作

6.3 优势

  1. 高压缩率:节省 50-80% 存储空间
  2. 高性能:列式访问减少 60-90% I/O
  3. 灵活性:支持复杂数据类型和 Schema Evolution
  4. 兼容性:与其他大数据工具无缝集成

StorageV2 通过 Arrow + Parquet 技术栈,为 Milvus 提供了高效、灵活、可扩展的存储解决方案。