本文档详细介绍 Milvus 中 MVCC timestamp 信息在数据写入 Segment 过程中的处理机制,包括正常 Flush 流程和 Compaction 流程。
目录
1. 概述 1.1 核心结论 是的,MVCC timestamp 信息会完整写入到 Segment 中。
Timestamp 作为系统预留字段(FieldID=1),在数据写入过程中会像其他用户字段一样被序列化并持久化到存储中。主要有两个写入路径:
Flush 流程 (主要路径):正常数据写入时的持久化
Compaction 流程 :Segment 合并优化时的重新写入
1.2 Timestamp 的用途
MVCC 版本控制 :支持时间旅行查询(Time Travel Query)
数据可见性判断 :根据 timestamp 过滤不可见的数据
数据过期处理 :基于 TTL 和 timestamp 清理过期数据
查询优化 :通过 timestamp 范围快速过滤 Segment
2. Timestamp 字段定义 2.1 系统预留字段 1 2 3 4 5 6 7 8 9 10 11 12 const ( TimeStampField = 1 RowIDField = 0 TimeStampFieldName = "Timestamp" )
2.2 数据类型 Timestamp 是 int64 类型,在 C++ 和 Go 代码中都被定义为 uint64:
1 2 3 4 using Timestamp = uint64_t ;constexpr auto MAX_TIMESTAMP = std::numeric_limits<Timestamp>::max ();
3. Flush 写入流程(主流程) 3.1 整体流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 用户 Insert/Delete 请求 ↓ WriteBuffer.BufferData() // 缓存数据到内存 ↓ triggerSync() // 根据策略触发同步 ↓ getSegmentsToSync() // 选择需要 sync 的 segments ↓ syncSegments() ↓ getSyncTask() // 创建 SyncTask ├─ yieldBuffer() // 从 buffer 获取数据 └─ NewSyncTask() // 创建同步任务 ↓ SyncManager.SyncData() ↓ SyncTask.Run() ↓ ├─ [StorageV1] BulkPackWriter.Write() │ ├─ writeInserts() │ │ └─ storageV1Serializer.serializeBinlog() │ │ └─ InsertCodec.Serialize() ★ 序列化所有字段(包括 timestamp) │ │ └─ 为每个 field 创建独立的 binlog 文件 │ │ │ ├─ writeStats() ★ 写入统计信息(含 timestamp 范围) │ └─ writeBM25Stats() │ └─ [StorageV2] BulkPackWriterV2.Write() ├─ writeInserts() │ ├─ serializeBinlog() // 转换为 Arrow Record │ ├─ 提取 timestamp 列计算范围 │ └─ PackedRecordWriter.Write() ★ 写入 Parquet 格式 │ └─ writeStats()
3.2 关键代码位置 3.2.1 WriteBuffer 创建 SyncTask 文件 : internal/flushcommon/writebuffer/write_buffer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64 ) (syncmgr.Task, error ) { segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID) if !ok { return nil , merr.WrapErrSegmentNotFound(segmentID) } var batchSize int64 var tsFrom, tsTo uint64 insert, bm25, delta, schema, timeRange, startPos := wb.yieldBuffer(segmentID) if timeRange != nil { tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax } for _, chunk := range insert { batchSize += int64 (chunk.GetRowNum()) } pack := &syncmgr.SyncPack{} pack.WithInsertData(insert). WithDeleteData(delta). WithCollectionID(wb.collectionID). WithPartitionID(segmentInfo.PartitionID()). WithSegmentID(segmentID). WithTimeRange(tsFrom, tsTo). WithBatchRows(batchSize) task := syncmgr.NewSyncTask(). WithMetaCache(wb.metaCache). WithSchema(schema). WithSyncPack(pack) return task, nil }
3.2.2 SyncTask 执行写入 文件 : internal/flushcommon/syncmgr/task.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (t *SyncTask) Run(ctx context.Context) (err error ) { segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID) if !has { return nil } columnGroups := t.getColumnGroups(segmentInfo) switch segmentInfo.GetStorageVersion() { case storage.StorageV2: writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager, t.allocator, 0 , packed.DefaultMultiPartUploadSize, t.storageConfig, columnGroups, t.writeRetryOpts...) t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.manifestPath, t.flushedSize, err = writer.Write(ctx, t.pack) default : writer := NewBulkPackWriter(t.metacache, t.schema, t.chunkManager, t.allocator, t.writeRetryOpts...) t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = writer.Write(ctx, t.pack) } return err }
3.2.3 StorageV1 序列化 Binlog 文件 : internal/flushcommon/syncmgr/storage_serializer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map [int64 ]*storage.Blob, error ) { if len (pack.insertData) == 0 { return make (map [int64 ]*storage.Blob), nil } blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...) if err != nil { return nil , err } result := make (map [int64 ]*storage.Blob) for _, blob := range blobs { fieldID, err := strconv.ParseInt(blob.GetKey(), 10 , 64 ) if err != nil { return nil , err } result[fieldID] = blob } return result, nil }
文件 : internal/storage/data_codec.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data ...*InsertData) ([]*Blob, error ) { blobs := make ([]*Blob, 0 ) var rowNum int64 var startTs, endTs Timestamp startTs, endTs = math.MaxUint64, 0 for _, block := range data { timeFieldData, ok := block.Data[common.TimeStampField] if !ok { return nil , errors.New("data doesn't contains timestamp field" ) } rowNum += int64 (timeFieldData.RowNum()) ts := timeFieldData.(*Int64FieldData).Data for _, t := range ts { if uint64 (t) > endTs { endTs = uint64 (t) } if uint64 (t) < startTs { startTs = uint64 (t) } } } serializeField := func (field *schemapb.FieldSchema) error { writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID, field.GetNullable()) eventWriter, err := writer.NextInsertEventWriter() if err != nil { return err } eventWriter.SetEventTimestamp(startTs, endTs) for _, block := range data { singleData := block.Data[field.FieldID] if err = AddFieldDataToPayload(eventWriter, field.DataType, singleData); err != nil { return err } } buffer, err := writer.GetBuffer() blobs = append (blobs, &Blob{ Key: fmt.Sprintf("%d" , field.FieldID), Value: buffer, RowNum: rowNum, }) return nil } for _, field := range insertCodec.Schema.Schema.Fields { if err := serializeField(field); err != nil { return nil , err } } return blobs, nil }
3.2.4 StorageV2 写入 Parquet 文件 : internal/flushcommon/syncmgr/pack_writer_v2.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (map [int64 ]*datapb.FieldBinlog, string , error ) { if len (pack.insertData) == 0 { return make (map [int64 ]*datapb.FieldBinlog), "" , nil } rec, err := bw.serializeBinlog(ctx, pack) if err != nil { return nil , "" , err } logs := make (map [int64 ]*datapb.FieldBinlog) tsArray := rec.Column(common.TimeStampField).(*array.Int64) rows := rec.Len() var tsFrom uint64 = math.MaxUint64 var tsTo uint64 = 0 for i := 0 ; i < rows; i++ { ts := typeutil.Timestamp(tsArray.Value(i)) if ts < tsFrom { tsFrom = ts } if ts > tsTo { tsTo = ts } } doWrite := func (w storage.RecordWriter) error { if err = w.Write(rec); err != nil { return err } return w.Close() } if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() { w, err := storage.NewPackedRecordManifestWriter(...) if err = doWrite(w); err != nil { return nil , "" , err } manifestPath = w.GetWrittenManifest() } else { w, err := storage.NewPackedRecordWriter(...) if err = doWrite(w); err != nil { return nil , "" , err } } for _, columnGroup := range columnGroups { logs[columnGroup.GroupID] = &datapb.FieldBinlog{ FieldID: columnGroup.GroupID, ChildFields: columnGroup.Fields, Binlogs: []*datapb.Binlog{ { LogPath: path, EntriesNum: rowNum, TimestampFrom: tsFrom, TimestampTo: tsTo, }, }, } } return logs, manifestPath, nil } func (bw *BulkPackWriterV2) serializeBinlog(_ context.Context, pack *SyncPack) (storage.Record, error ) { arrowSchema, err := storage.ConvertToArrowSchema(bw.schema, true ) if err != nil { return nil , err } builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema) defer builder.Release() for _, chunk := range pack.insertData { if err := storage.BuildRecord(builder, chunk, bw.schema); err != nil { return nil , err } } rec := builder.NewRecord() return storage.NewSimpleArrowRecord(rec, field2Col), nil }
4. Compaction 写入流程 Compaction 是对已有 Segment 的合并和优化,也会重新写入 timestamp 信息。
4.1 整体流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Compaction Task 创建 ↓ mixCompactionTask.Compact() ↓ mixCompactionTask.mergeSplit() ↓ NewMultiSegmentWriter() // 创建多 segment 写入器 ↓ 对每个源 Segment: ├─ storage.NewBinlogRecordReader() // 读取原始数据 └─ mixCompactionTask.writeSegment() └─ [循环读取 Record] ├─ reader.Next() // 读取 Arrow Record(含 timestamp) ├─ 过滤已删除/过期数据 └─ mWriter.Write(rec) ★ 写入合并后的数据 └─ MultiSegmentWriter.Write() └─ BinlogValueWriter.Write() └─ BinlogRecordWriter.Write() ├─ [V1] CompositeBinlogRecordWriter.Write() │ ├─ 提取 timestamp 范围 │ └─ 序列化为 binlog │ └─ [V2] PackedBinlogRecordWriter.Write() ├─ 提取 timestamp 范围 └─ 写入 Parquet
4.2 关键代码位置 4.2.1 Mix Compaction 主流程 文件 : internal/datanode/compactor/mix_compactor.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (t *mixCompactionTask) mergeSplit(ctx context.Context) ([]*datapb.CompactionSegment, error ) { segIDAlloc := allocator.NewLocalAllocator( t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd()) logIDAlloc := allocator.NewLocalAllocator( t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd()) compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc) mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096 ) if err != nil { return nil , err } deletedRowCount := int64 (0 ) expiredRowCount := int64 (0 ) for _, seg := range t.plan.GetSegmentBinlogs() { del, exp, err := t.writeSegment(ctx, seg, mWriter, pkField) if err != nil { return nil , err } deletedRowCount += del expiredRowCount += exp } if err := mWriter.Close(); err != nil { return nil , err } return mWriter.GetCompactionSegments(), nil }
4.2.2 写入单个 Segment 文件 : internal/datanode/compactor/mix_compactor.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 func (t *mixCompactionTask) writeSegment(ctx context.Context, seg *datapb.CompactionSegmentBinlogs, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema) (deletedRowCount, expiredRowCount int64 , err error ) { deltaPaths := make ([]string , 0 ) for _, fieldBinlog := range seg.GetDeltalogs() { for _, binlog := range fieldBinlog.GetBinlogs() { deltaPaths = append (deltaPaths, binlog.GetLogPath()) } } delta, err := compaction.ComposeDeleteFromDeltalogs(ctx, t.binlogIO, deltaPaths) if err != nil { return } entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime) var reader storage.RecordReader if seg.GetManifest() != "" { reader, err = storage.NewManifestRecordReader(ctx, seg.GetManifest(), t.plan.GetSchema(), ...) } else { reader, err = storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), ...) } if err != nil { return } defer reader.Close() for { var r storage.Record r, err = reader.Next() if err != nil { if err == sio.EOF { err = nil break } return } var ( pkArray = r.Column(pkField.FieldID) tsArray = r.Column(common.TimeStampField).(*array.Int64) sliceStart = -1 rb *storage.RecordBuilder ) for i := range r.Len() { var pk any switch pkField.DataType { case schemapb.DataType_Int64: pk = pkArray.(*array.Int64).Value(i) case schemapb.DataType_VarChar: pk = pkArray.(*array.String).Value(i) } ts := typeutil.Timestamp(tsArray.Value(i)) if entityFilter.Filtered(pk, ts) { if rb == nil { rb = storage.NewRecordBuilder(t.plan.GetSchema()) } if sliceStart != -1 { rb.Append(r, sliceStart, i) } sliceStart = -1 continue } if sliceStart == -1 { sliceStart = i } } if rb != nil { if sliceStart != -1 { rb.Append(r, sliceStart, r.Len()) } if rb.GetRowNum() > 0 { rec := rb.Build() defer rec.Release() err := mWriter.Write(rec) if err != nil { return 0 , 0 , err } } } else { err := mWriter.Write(r) if err != nil { return 0 , 0 , err } } } return }
4.2.3 MultiSegmentWriter 写入 文件 : internal/datanode/compactor/segment_writer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 type MultiSegmentWriter struct { ctx context.Context binlogIO io.BinlogIO allocator *compactionAlloactor writer *storage.BinlogValueWriter currentSegmentID typeutil.UniqueID maxRows int64 segmentSize int64 schema *schemapb.CollectionSchema partitionID int64 collectionID int64 res []*datapb.CompactionSegment } func (w *MultiSegmentWriter) Write(r storage.Record) error { if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64 (w.segmentSize) { if err := w.rotateWriter(); err != nil { return err } } return w.writer.Write(r) } func (w *MultiSegmentWriter) rotateWriter() error { if err := w.closeWriter(); err != nil { return err } newSegmentID, err := w.allocator.allocSegmentID() if err != nil { return err } w.currentSegmentID = newSegmentID rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, newSegmentID, w.schema, w.allocator.logIDAlloc, chunkSize, w.maxRows, w.rwOption...) if err != nil { return err } w.writer = storage.NewBinlogValueWriter(rw, w.batchSize) return nil }
4.2.4 底层 Record Writer 文件 : internal/storage/binlog_record_writer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (pw *PackedBinlogRecordWriter) Write(r Record) error { if err := pw.initWriters(r); err != nil { return err } tsArray := r.Column(common.TimeStampField).(*array.Int64) rows := r.Len() for i := 0 ; i < rows; i++ { ts := typeutil.Timestamp(tsArray.Value(i)) if ts < pw.tsFrom { pw.tsFrom = ts } if ts > pw.tsTo { pw.tsTo = ts } } if err := pw.pkCollector.Collect(r); err != nil { return err } if err := pw.bm25Collector.Collect(r); err != nil { return err } err := pw.writer.Write(r) if err != nil { return err } pw.writtenUncompressed = pw.writer.GetWrittenUncompressed() return nil }
5. Storage 层的 Timestamp 处理 5.1 Growing Segment 中的 Timestamp 文件 : internal/core/src/segcore/SegmentGrowingImpl.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void SegmentGrowingImpl::load_field_data_common ( FieldId field_id, size_t reserved_offset, const std::vector<FieldDataPtr>& field_data, FieldId primary_field_id, size_t num_rows) { if (field_id == TimestampFieldID) { insert_record_.timestamps_.set_data_raw (reserved_offset, field_data); return ; } }
5.2 Segment Writer 中的 Timestamp 追踪 文件 : internal/datanode/compactor/segment_writer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (w *SegmentWriter) WriteRecord(r storage.Record) error { tsArray := r.Column(common.TimeStampField).(*array.Int64) rows := r.Len() for i := 0 ; i < rows; i++ { ts := typeutil.Timestamp(tsArray.Value(i)) if ts < w.tsFrom { w.tsFrom = ts } if ts > w.tsTo { w.tsTo = ts } switch schemapb.DataType(w.pkstats.PkType) { case schemapb.DataType_Int64: pkArray := r.Column(w.GetPkID()).(*array.Int64) pk := &storage.Int64PrimaryKey{Value: pkArray.Value(i)} w.pkstats.Update(pk) case schemapb.DataType_VarChar: pkArray := r.Column(w.GetPkID()).(*array.String) pk := &storage.VarCharPrimaryKey{Value: pkArray.Value(i)} w.pkstats.Update(pk) } w.rowCount.Inc() } return w.writer.Write(r) } func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange { return writebuffer.NewTimeRange(w.tsFrom, w.tsTo) }
6. 两种存储版本对比 6.1 StorageV1 (Binlog 格式) 特点 :
每个字段一个独立的 binlog 文件
Timestamp 字段对应文件:{segmentID}/insert_log/1/{logID}
使用 InsertCodec 序列化
Protobuf 格式存储
文件结构 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 {collectionID}/{partitionID}/{segmentID}/ ├── insert_log/ │ ├── 0/ # RowID field │ │ └── {logID} │ ├── 1/ # Timestamp field ★ │ │ └── {logID} │ ├── 100/ # User field 1 │ │ └── {logID} │ └── 101/ # User field 2 │ └── {logID} ├── delta_log/ │ └── {logID} └── stats_log/ └── {logID}
代码路径 :
1 2 3 4 BulkPackWriter.Write() └─> storageV1Serializer.serializeBinlog() └─> InsertCodec.Serialize() └─> 为每个 field 创建 binlog
6.2 StorageV2 (Parquet 格式) 特点 :
列式存储,使用 Apache Arrow + Parquet
Timestamp 作为 Arrow Record 的一列
支持列组(Column Group)优化
更高的压缩率和查询性能
文件结构 :
1 2 3 4 5 6 7 8 9 {collectionID}/{partitionID}/{segmentID}/ ├── insert_log/ │ ├── {columnGroupID}/ │ │ └── {logID}.parquet # 包含多个字段(含 timestamp) │ └── manifest.json # 元数据清单 ├── delta_log/ │ └── {logID} └── stats_log/ └── {logID}
代码路径 :
1 2 3 4 BulkPackWriterV2.Write() └─> serializeBinlog() -> Arrow Record └─> PackedRecordWriter.Write() └─> Parquet Writer (通过 FFI 调用 C++)
6.3 对比表
特性
StorageV1 (Binlog)
StorageV2 (Parquet)
文件格式
Protobuf + 自定义编码
Apache Parquet
Timestamp 存储
独立 binlog 文件(FieldID=1)
Arrow Record 的一列
文件数量
多个(每个 field 一个)
少量(按列组组织)
压缩率
中等
高
查询性能
需要读取多个文件
列式访问更高效
元数据
每个 binlog 独立元数据
统一 manifest 管理
序列化类
InsertCodec
Arrow Builder
写入类
BulkPackWriter
BulkPackWriterV2
7. 总结 7.1 核心要点
Timestamp 完整存储
Timestamp 作为 FieldID=1 的系统字段,在所有写入路径中都会完整持久化
无论是 Flush 还是 Compaction,都会保留完整的 timestamp 信息
两种主要写入路径
Flush :正常数据写入的主流程,从 WriteBuffer 触发
Compaction :Segment 合并优化流程,读取旧数据重新写入
存储格式演进
StorageV1 :每个字段独立 binlog 文件
StorageV2 :列式 Parquet 格式,更高效
Timestamp 的多重用途
MVCC 版本控制和可见性判断
TTL 过期数据清理
查询优化(通过 timestamp 范围过滤)
Binlog 元数据(TimestampFrom/TimestampTo)
7.2 数据流向图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 用户数据 Insert/Delete ↓ [内存] WriteBuffer ↓ (Buffer Full / Time Trigger) [Flush] SyncTask ↓ [序列化] InsertCodec / Arrow Builder ↓ [持久化] Binlog / Parquet 文件 ↓ (多个小 Segment) [Compaction] Mix/Clustering/L0 Compactor ↓ [合并] MultiSegmentWriter ↓ [持久化] 新的 Segment 文件
7.3 相关文件索引 Flush 流程 :
internal/flushcommon/writebuffer/write_buffer.go - WriteBuffer 管理
internal/flushcommon/syncmgr/task.go - SyncTask 执行
internal/flushcommon/syncmgr/pack_writer.go - StorageV1 写入
internal/flushcommon/syncmgr/pack_writer_v2.go - StorageV2 写入
internal/flushcommon/syncmgr/storage_serializer.go - 序列化器
Compaction 流程 :
internal/datanode/compactor/mix_compactor.go - Mix Compaction
internal/datanode/compactor/segment_writer.go - Segment Writer
internal/datanode/compactor/clustering_compactor.go - Clustering Compaction
Storage 层 :
internal/storage/data_codec.go - InsertCodec 序列化
internal/storage/binlog_record_writer.go - Binlog 写入
internal/storage/record_writer.go - Parquet 写入
internal/storage/serde_events.go - 事件序列化
C++ 核心 :
internal/core/src/segcore/SegmentGrowingImpl.cpp - Growing Segment
internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp - Sealed Segment
7.4 扩展阅读 相关设计文档:
docs/developer_guides/flush_pipeline_write_buffer_manager.md
docs/developer_guides/flush_pipeline_flush_checkpoint.md
docs/design_docs/segcore/segment_sealed.md
附录:术语表
术语
说明
MVCC
Multi-Version Concurrency Control,多版本并发控制
Timestamp
系统预留字段(FieldID=1),用于 MVCC 版本控制
Flush
将内存中的数据持久化到存储的过程
Compaction
Segment 合并优化,将多个小 Segment 合并为大 Segment
WriteBuffer
数据写入的内存缓冲区
SyncTask
数据同步任务,负责将 buffer 数据持久化
Binlog
Binary Log,二进制日志,Milvus 的数据文件格式之一
Parquet
Apache Parquet,列式存储格式,StorageV2 使用
Arrow Record
Apache Arrow 的数据记录格式,内存中的列式数据表示
Growing Segment
增长中的 Segment,正在接收写入
Sealed Segment
已封存的 Segment,不再接收新数据
InsertCodec
Insert 数据的编解码器,用于 StorageV1
TTL
Time To Live,数据存活时间
文档版本 : v1.1最后更新 : 2025-01-24适用 Milvus 版本 : 2.5.x
附录 A:Segment 内部 Layout A.1 Segment 类型概述 Milvus 中有两种主要的 Segment 类型:
Growing Segment (增长中的 Segment)
状态 : SegmentState_Growing
特点 : 正在接收写入操作
实现类 : SegmentGrowingImpl
数据结构 : 使用 ConcurrentVector 存储,支持并发写入
索引 : 可选的 interim index(临时索引)
Sealed Segment (封存的 Segment)
状态 : SegmentState_Sealed / SegmentState_Flushing / SegmentState_Flushed
特点 : 不再接收写入,数据已持久化
实现类 : ChunkedSegmentSealedImpl
数据结构 : 使用 ChunkedColumnInterface 存储,支持 mmap 和延迟加载
索引 : 可以加载永久索引
A.2 Growing Segment 内部结构 A.2.1 核心组件 文件 : internal/core/src/segcore/SegmentGrowingImpl.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class SegmentGrowingImpl : public SegmentGrowing {private : SchemaPtr schema_; IndexMetaPtr index_meta_; SegcoreConfig segcore_config_; int64_t id_; InsertRecord<false > insert_record_; IndexingRecord indexing_record_; DeletedRecord<false > deleted_record_; SegmentStats stats_{}; mutable std::shared_mutex chunk_mutex_; storage::MmapChunkDescriptorPtr mmap_descriptor_; };
A.2.2 InsertRecord 数据布局 文件 : internal/core/src/segcore/InsertRecord.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class InsertRecordGrowing {public : ConcurrentVector<Timestamp> timestamps_; TimestampIndex timestamp_index_; std::unique_ptr<OffsetMap> pk2offset_; std::atomic<int64_t > reserved = 0 ; AckResponder ack_responder_; private : std::unordered_map<FieldId, std::unique_ptr<VectorBase>> data_{}; std::unordered_map<FieldId, ThreadSafeValidDataPtr> valid_data_{}; mutable std::shared_mutex shared_mutex_{}; };
A.2.3 数据组织方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Growing Segment Memory Layout: ┌─────────────────────────────────────────────────┐ │ SegmentGrowingImpl │ ├─────────────────────────────────────────────────┤ │ insert_record_ (InsertRecordGrowing) │ │ ├─ timestamps_: ConcurrentVector<Timestamp> │ ★ Timestamp 列 │ ├─ timestamp_index_: TimestampIndex │ ★ Timestamp 索引 │ ├─ pk2offset_: OffsetMap │ ★ PK 索引 │ └─ data_: map<FieldId, VectorBase> │ ★ 字段数据 │ ├─ FieldID=100: ConcurrentVector<int64> │ (用户字段1) │ ├─ FieldID=101: ConcurrentVector<float> │ (用户字段2) │ ├─ FieldID=102: ConcurrentVector<vector> │ (向量字段) │ └─ ... │ ├─────────────────────────────────────────────────┤ │ indexing_record_ (IndexingRecord) │ │ ├─ Chunk-level indexes │ ★ 分块索引 │ └─ Interim vector indexes │ ★ 临时向量索引 ├─────────────────────────────────────────────────┤ │ deleted_record_ (DeletedRecord) │ │ └─ Deleted PKs with timestamps │ ★ 删除记录 └─────────────────────────────────────────────────┘
A.2.4 Chunk 机制 Growing Segment 使用 Chunk 机制组织数据:
Chunk Size : 默认 32768 行(可配置)
目的 : 支持大规模数据的高效存储和查询
索引 : 每个 chunk 可以独立建立索引
1 2 3 4 int64_t chunk_rows = segcore_config_.get_chunk_rows (); auto chunk_id = offset / chunk_rows;auto offset_in_chunk = offset % chunk_rows;
A.3 Sealed Segment 内部结构 A.3.1 核心组件 文件 : internal/core/src/segcore/ChunkedSegmentSealedImpl.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class ChunkedSegmentSealedImpl : public SegmentSealed {private : SchemaPtr schema_; IndexMetaPtr col_index_meta_; int64_t id_; SegcoreConfig segcore_config_; std::optional<int64_t > num_rows_; BitsetType field_data_ready_bitset_; BitsetType index_ready_bitset_; BitsetType binlog_index_bitset_; std::atomic<int > system_ready_count_; folly::Synchronized<std::unordered_map< FieldId, std::shared_ptr<ChunkedColumnInterface> >> fields_; folly::Synchronized<std::unordered_map< FieldId, index::CacheIndexBasePtr >> scalar_indexings_; SealedIndexingRecord vector_indexings_; folly::Synchronized<std::unordered_map< FieldId, std::unordered_map<std::string, index::CacheIndexBasePtr> >> ngram_indexings_; InsertRecord<true > insert_record_; DeletedRecord<true > deleted_record_; std::unordered_set<FieldId> mmap_field_ids_; SegmentStats stats_{}; bool is_sorted_by_pk_ = false ; std::unique_ptr<milvus_storage::api::Reader> reader_; };
A.3.2 数据布局 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Sealed Segment Memory Layout: ┌────────────────────────────────────────────────┐ │ ChunkedSegmentSealedImpl │ ├────────────────────────────────────────────────┤ │ insert_record_ (InsertRecordSealed) │ │ ├─ timestamps_: ConcurrentVector<Timestamp> │ ★ 仅系统字段 │ ├─ timestamp_index_: TimestampIndex │ │ └─ pk2offset_: OffsetOrderedArray │ ★ PK 索引(已排序) ├────────────────────────────────────────────────┤ │ fields_: map<FieldId, ChunkedColumnInterface>│ ★ 列数据 │ ├─ FieldID=100: ChunkedColumn<int64> │ (Scalar 字段) │ │ ├─ Chunk 0: [data...] │ │ │ ├─ Chunk 1: [data...] │ │ │ └─ ... │ │ ├─ FieldID=102: ChunkedColumn<vector> │ (向量字段) │ │ └─ 可能被索引替代 │ │ └─ ... │ ├────────────────────────────────────────────────┤ │ scalar_indexings_: map<FieldId, Index> │ ★ Scalar 索引 │ └─ FieldID=100: ScalarIndex │ ├────────────────────────────────────────────────┤ │ vector_indexings_: SealedIndexingRecord │ ★ Vector 索引 │ ├─ FieldID=102: VectorIndex (HNSW/IVF/...) │ │ └─ index_has_raw_data_: bool │ (是否保留原始数据) ├────────────────────────────────────────────────┤ │ deleted_record_: DeletedRecord │ ★ 删除记录 └────────────────────────────────────────────────┘
A.3.3 ChunkedColumn 结构 Sealed Segment 使用分块列存储:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class ChunkedColumnInterface {public : virtual size_t num_chunks () const = 0 ; virtual size_t chunk_size (size_t chunk_id) const = 0 ; virtual SpanBase chunk_data (size_t chunk_id) const = 0 ; virtual bool is_mmaped () const = 0 ; }; auto column = fields_[field_id];for (size_t chunk_id = 0 ; chunk_id < column->num_chunks (); ++chunk_id) { auto data = column->chunk_data (chunk_id); }
A.4 文件系统中的 Layout A.4.1 StorageV1 (Binlog) 布局 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 {root_path}/{collectionID}/{partitionID}/{segmentID}/ ├── insert_log/ # 插入数据 │ ├── 0/ # RowID field │ │ └── {logID} # Binlog 文件 │ ├── 1/ # ★ Timestamp field │ │ └── {logID} │ ├── 100/ # User field 1 │ │ └── {logID} │ ├── 101/ # User field 2 │ │ └── {logID} │ └── 102/ # Vector field │ └── {logID} │ ├── delta_log/ # 删除数据 │ └── {logID} │ ├── stats_log/ # 统计数据(PK stats) │ └── {logID} │ └── index/ # 索引文件 └── {fieldID}/ ├── {indexBuildID}/ │ ├── index_params │ ├── index_info │ └── index_data_* └── ...
A.4.2 StorageV2 (Parquet) 布局 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 {root_path}/{collectionID}/{partitionID}/{segmentID}/ ├── insert_log/ │ ├── {columnGroupID}/ │ │ └── {logID}.parquet # Parquet 文件(包含多个字段) │ │ # ★ Timestamp 作为其中一列 │ ├── manifest.json # 元数据清单 │ │ ├── schema │ │ ├── column_groups │ │ │ ├─ group_id: 1 │ │ │ │ └─ fields: [0, 1, 100] # RowID, Timestamp, Field1 │ │ │ └─ group_id: 2 │ │ │ └─ fields: [101, 102] # Field2, VectorField │ │ └── files │ └── ... │ ├── delta_log/ │ └── {logID} │ ├── stats_log/ │ └── {logID} │ └── index/ └── ... (同 V1)
A.4.3 Binlog 文件内部结构 每个 binlog 文件包含:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Binlog File Structure: ┌─────────────────────────────┐ │ Magic Number │ 4 bytes ├─────────────────────────────┤ │ Descriptor Event │ │ ├─ CollectionID │ │ ├─ PartitionID │ │ ├─ SegmentID │ │ ├─ FieldID │ │ ├─ StartTimestamp │ ★ Min timestamp │ ├─ EndTimestamp │ ★ Max timestamp │ └─ PayloadDataType │ ├─────────────────────────────┤ │ Insert Event 1 │ │ ├─ StartTimestamp │ │ ├─ EndTimestamp │ │ └─ Payload (field data) │ ★ 实际字段数据 ├─────────────────────────────┤ │ Insert Event 2 │ │ └─ ... │ ├─────────────────────────────┤ │ ... │ └─────────────────────────────┘
A.5 查询时的数据访问 A.5.1 访问路径 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Query Request ↓ Plan Node ↓ Segment Interface ↓ ├─ [Growing Segment] │ └─> InsertRecord.data_[fieldID] │ └─> ConcurrentVector::get(offset) │ └─ [Sealed Segment] ├─> [Has Index?] │ ├─ Yes → vector_indexings_[fieldID] │ │ └─> Index Search │ └─ No → fields_[fieldID] │ └─> ChunkedColumn::chunk_data(chunk_id) │ └─> [Filter by Timestamp] └─> insert_record_.timestamps_ └─> TimestampIndex (Binary Search)
A.5.2 Timestamp 过滤示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void SegmentGrowingImpl::mask_with_timestamps ( BitsetTypeView& bitset_chunk, Timestamp timestamp, Timestamp ttl) const { auto timestamps_data_ptr = insert_record_.timestamps_.data (); auto size = insert_record_.size (); for (int64_t i = 0 ; i < size; ++i) { auto ts = timestamps_data_ptr[i]; if (ts > timestamp) { bitset_chunk[i] = true ; } if (ttl > 0 && timestamp - ts > ttl) { bitset_chunk[i] = true ; } } }
A.6 内存管理 A.6.1 Growing Segment 内存管理
动态增长 : ConcurrentVector 按需分配内存
Chunk 管理 : 数据按 chunk 组织,支持增量加载
Mmap 支持 : 可选地使用 mmap 减少内存占用
1 2 3 4 5 6 int64_t chunk_rows = segcore_config_.get_chunk_rows (); size_t memory_per_row = EstimateRowSize (schema);size_t total_memory = num_rows * memory_per_row;
A.6.2 Sealed Segment 内存管理
延迟加载 : 字段和索引按需加载
Mmap : 支持 mmap 模式,减少内存拷贝
缓存管理 : 使用 LRU 缓存管理索引和字段数据
1 2 3 4 5 6 bool CanQuery () { return system_ready_count_ == 2 && AllRequiredFieldsReady () && (HasIndex () || HasRawData ()); }
A.7 关键数据结构总结
组件
Growing Segment
Sealed Segment
说明
Timestamp
ConcurrentVector<Timestamp>
ConcurrentVector<Timestamp>
两者都存储完整 timestamp
TimestampIndex
TimestampIndex
TimestampIndex
用于时间旅行查询
PK Index
OffsetOrderedMap
OffsetOrderedArray
Growing 用 Map,Sealed 用 Array
Field Data
map<FieldId, ConcurrentVector>
map<FieldId, ChunkedColumn>
列式存储
Vector Index
IndexingRecord (interim)
SealedIndexingRecord (permanent)
索引类型不同
Scalar Index
可选
map<FieldId, Index>
Sealed 支持更多索引
Delete Record
DeletedRecord<false>
DeletedRecord<true>
删除记录
并发控制
std::shared_mutex
folly::Synchronized
不同的同步机制
文档版本 : v1.1最后更新 : 2025-01-24适用 Milvus 版本 : 2.5.x