1. 1. 目录
  2. 2. 1. 概述
    1. 2.1. 1.1 核心结论
    2. 2.2. 1.2 Timestamp 的用途
  3. 3. 2. Timestamp 字段定义
    1. 3.1. 2.1 系统预留字段
    2. 3.2. 2.2 数据类型
  4. 4. 3. Flush 写入流程(主流程)
    1. 4.1. 3.1 整体流程图
    2. 4.2. 3.2 关键代码位置
      1. 4.2.1. 3.2.1 WriteBuffer 创建 SyncTask
      2. 4.2.2. 3.2.2 SyncTask 执行写入
      3. 4.2.3. 3.2.3 StorageV1 序列化 Binlog
      4. 4.2.4. 3.2.4 StorageV2 写入 Parquet
  5. 5. 4. Compaction 写入流程
    1. 5.1. 4.1 整体流程图
    2. 5.2. 4.2 关键代码位置
      1. 5.2.1. 4.2.1 Mix Compaction 主流程
      2. 5.2.2. 4.2.2 写入单个 Segment
      3. 5.2.3. 4.2.3 MultiSegmentWriter 写入
      4. 5.2.4. 4.2.4 底层 Record Writer
  6. 6. 5. Storage 层的 Timestamp 处理
    1. 6.1. 5.1 Growing Segment 中的 Timestamp
    2. 6.2. 5.2 Segment Writer 中的 Timestamp 追踪
  7. 7. 6. 两种存储版本对比
    1. 7.1. 6.1 StorageV1 (Binlog 格式)
    2. 7.2. 6.2 StorageV2 (Parquet 格式)
    3. 7.3. 6.3 对比表
  8. 8. 7. 总结
    1. 8.1. 7.1 核心要点
    2. 8.2. 7.2 数据流向图
    3. 8.3. 7.3 相关文件索引
    4. 8.4. 7.4 扩展阅读
  9. 9. 附录:术语表
  • 附录 A:Segment 内部 Layout
    1. 1. A.1 Segment 类型概述
      1. 1.1. Growing Segment (增长中的 Segment)
      2. 1.2. Sealed Segment (封存的 Segment)
    2. 2. A.2 Growing Segment 内部结构
      1. 2.1. A.2.1 核心组件
      2. 2.2. A.2.2 InsertRecord 数据布局
      3. 2.3. A.2.3 数据组织方式
      4. 2.4. A.2.4 Chunk 机制
    3. 3. A.3 Sealed Segment 内部结构
      1. 3.1. A.3.1 核心组件
      2. 3.2. A.3.2 数据布局
      3. 3.3. A.3.3 ChunkedColumn 结构
    4. 4. A.4 文件系统中的 Layout
      1. 4.1. A.4.1 StorageV1 (Binlog) 布局
      2. 4.2. A.4.2 StorageV2 (Parquet) 布局
      3. 4.3. A.4.3 Binlog 文件内部结构
    5. 5. A.5 查询时的数据访问
      1. 5.1. A.5.1 访问路径
      2. 5.2. A.5.2 Timestamp 过滤示例
    6. 6. A.6 内存管理
      1. 6.1. A.6.1 Growing Segment 内存管理
      2. 6.2. A.6.2 Sealed Segment 内存管理
    7. 7. A.7 关键数据结构总结
  • Milvus 数据写入流程中的 MVCC Timestamp 处理

    本文档详细介绍 Milvus 中 MVCC timestamp 信息在数据写入 Segment 过程中的处理机制,包括正常 Flush 流程和 Compaction 流程。

    目录


    1. 概述

    1.1 核心结论

    是的,MVCC timestamp 信息会完整写入到 Segment 中。

    Timestamp 作为系统预留字段(FieldID=1),在数据写入过程中会像其他用户字段一样被序列化并持久化到存储中。主要有两个写入路径:

    1. Flush 流程(主要路径):正常数据写入时的持久化
    2. Compaction 流程:Segment 合并优化时的重新写入

    1.2 Timestamp 的用途

    • MVCC 版本控制:支持时间旅行查询(Time Travel Query)
    • 数据可见性判断:根据 timestamp 过滤不可见的数据
    • 数据过期处理:基于 TTL 和 timestamp 清理过期数据
    • 查询优化:通过 timestamp 范围快速过滤 Segment

    2. Timestamp 字段定义

    2.1 系统预留字段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // pkg/common/common.go

    const (
    // TimeStampField is the ID of the Timestamp field reserved by the system
    TimeStampField = 1

    // RowIDField is the ID of the RowID field reserved by the system
    RowIDField = 0

    // TimeStampFieldName defines the name of the Timestamp field
    TimeStampFieldName = "Timestamp"
    )

    2.2 数据类型

    Timestamp 是 int64 类型,在 C++ 和 Go 代码中都被定义为 uint64

    1
    2
    3
    4
    // internal/core/src/common/Types.h

    using Timestamp = uint64_t;
    constexpr auto MAX_TIMESTAMP = std::numeric_limits<Timestamp>::max();

    3. Flush 写入流程(主流程)

    3.1 整体流程图

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    用户 Insert/Delete 请求

    WriteBuffer.BufferData() // 缓存数据到内存

    triggerSync() // 根据策略触发同步

    getSegmentsToSync() // 选择需要 sync 的 segments

    syncSegments()

    getSyncTask() // 创建 SyncTask
    ├─ yieldBuffer() // 从 buffer 获取数据
    └─ NewSyncTask() // 创建同步任务

    SyncManager.SyncData()

    SyncTask.Run()

    ├─ [StorageV1] BulkPackWriter.Write()
    │ ├─ writeInserts()
    │ │ └─ storageV1Serializer.serializeBinlog()
    │ │ └─ InsertCodec.Serialize() ★ 序列化所有字段(包括 timestamp)
    │ │ └─ 为每个 field 创建独立的 binlog 文件
    │ │
    │ ├─ writeStats() ★ 写入统计信息(含 timestamp 范围)
    │ └─ writeBM25Stats()

    └─ [StorageV2] BulkPackWriterV2.Write()
    ├─ writeInserts()
    │ ├─ serializeBinlog() // 转换为 Arrow Record
    │ ├─ 提取 timestamp 列计算范围
    │ └─ PackedRecordWriter.Write() ★ 写入 Parquet 格式

    └─ writeStats()

    3.2 关键代码位置

    3.2.1 WriteBuffer 创建 SyncTask

    文件: internal/flushcommon/writebuffer/write_buffer.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {
    segmentInfo, ok := wb.metaCache.GetSegmentByID(segmentID)
    if !ok {
    return nil, merr.WrapErrSegmentNotFound(segmentID)
    }

    var batchSize int64
    var tsFrom, tsTo uint64

    // 从 buffer 中获取数据,包括 insert data 和 timestamp 范围
    insert, bm25, delta, schema, timeRange, startPos := wb.yieldBuffer(segmentID)
    if timeRange != nil {
    tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax
    }

    for _, chunk := range insert {
    batchSize += int64(chunk.GetRowNum())
    }

    // 创建 SyncPack,包含所有需要持久化的数据
    pack := &syncmgr.SyncPack{}
    pack.WithInsertData(insert). // ★ InsertData 包含 timestamp 字段
    WithDeleteData(delta).
    WithCollectionID(wb.collectionID).
    WithPartitionID(segmentInfo.PartitionID()).
    WithSegmentID(segmentID).
    WithTimeRange(tsFrom, tsTo). // ★ Timestamp 范围
    WithBatchRows(batchSize)

    task := syncmgr.NewSyncTask().
    WithMetaCache(wb.metaCache).
    WithSchema(schema).
    WithSyncPack(pack)
    return task, nil
    }

    3.2.2 SyncTask 执行写入

    文件: internal/flushcommon/syncmgr/task.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (t *SyncTask) Run(ctx context.Context) (err error) {
    segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
    if !has {
    return nil
    }

    columnGroups := t.getColumnGroups(segmentInfo)

    // 根据 StorageVersion 选择不同的写入方式
    switch segmentInfo.GetStorageVersion() {
    case storage.StorageV2:
    writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager,
    t.allocator, 0, packed.DefaultMultiPartUploadSize,
    t.storageConfig, columnGroups, t.writeRetryOpts...)
    t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs,
    t.manifestPath, t.flushedSize, err = writer.Write(ctx, t.pack)

    default: // StorageV1
    writer := NewBulkPackWriter(t.metacache, t.schema,
    t.chunkManager, t.allocator, t.writeRetryOpts...)
    t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs,
    t.flushedSize, err = writer.Write(ctx, t.pack)
    }

    return err
    }

    3.2.3 StorageV1 序列化 Binlog

    文件: internal/flushcommon/syncmgr/storage_serializer.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
    if len(pack.insertData) == 0 {
    return make(map[int64]*storage.Blob), nil
    }

    // 调用 InsertCodec 序列化所有字段
    blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...)
    if err != nil {
    return nil, err
    }

    // 返回 map[fieldID]*Blob,包括 timestamp field (fieldID=1)
    result := make(map[int64]*storage.Blob)
    for _, blob := range blobs {
    fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
    if err != nil {
    return nil, err
    }
    result[fieldID] = blob
    }
    return result, nil
    }

    文件: internal/storage/data_codec.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data ...*InsertData) ([]*Blob, error) {
    blobs := make([]*Blob, 0)

    var rowNum int64
    var startTs, endTs Timestamp
    startTs, endTs = math.MaxUint64, 0

    // 1. 首先从 timestamp 字段提取时间范围
    for _, block := range data {
    timeFieldData, ok := block.Data[common.TimeStampField]
    if !ok {
    return nil, errors.New("data doesn't contains timestamp field")
    }

    rowNum += int64(timeFieldData.RowNum())
    ts := timeFieldData.(*Int64FieldData).Data

    for _, t := range ts {
    if uint64(t) > endTs {
    endTs = uint64(t)
    }
    if uint64(t) < startTs {
    startTs = uint64(t)
    }
    }
    }

    // 2. 为每个字段创建 binlog writer(包括 timestamp)
    serializeField := func(field *schemapb.FieldSchema) error {
    // 创建字段对应的 binlog writer
    writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID,
    partitionID, segmentID, field.FieldID, field.GetNullable())

    eventWriter, err := writer.NextInsertEventWriter()
    if err != nil {
    return err
    }

    // 设置 timestamp 范围到 binlog 元数据
    eventWriter.SetEventTimestamp(startTs, endTs)

    // 将字段数据写入 payload
    for _, block := range data {
    singleData := block.Data[field.FieldID]
    if err = AddFieldDataToPayload(eventWriter, field.DataType, singleData); err != nil {
    return err
    }
    }

    // 完成并获取 blob
    buffer, err := writer.GetBuffer()
    blobs = append(blobs, &Blob{
    Key: fmt.Sprintf("%d", field.FieldID),
    Value: buffer,
    RowNum: rowNum,
    })
    return nil
    }

    // 3. 遍历所有字段进行序列化(包括 timestamp field)
    for _, field := range insertCodec.Schema.Schema.Fields {
    if err := serializeField(field); err != nil {
    return nil, err
    }
    }

    return blobs, nil
    }

    3.2.4 StorageV2 写入 Parquet

    文件: internal/flushcommon/syncmgr/pack_writer_v2.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, string, error) {
    if len(pack.insertData) == 0 {
    return make(map[int64]*datapb.FieldBinlog), "", nil
    }

    // 1. 序列化为 Arrow Record
    rec, err := bw.serializeBinlog(ctx, pack)
    if err != nil {
    return nil, "", err
    }

    logs := make(map[int64]*datapb.FieldBinlog)

    // 2. 从 Record 中提取 timestamp 列并计算范围
    tsArray := rec.Column(common.TimeStampField).(*array.Int64)
    rows := rec.Len()
    var tsFrom uint64 = math.MaxUint64
    var tsTo uint64 = 0
    for i := 0; i < rows; i++ {
    ts := typeutil.Timestamp(tsArray.Value(i))
    if ts < tsFrom {
    tsFrom = ts
    }
    if ts > tsTo {
    tsTo = ts
    }
    }

    // 3. 写入 Parquet 文件(包含所有字段,含 timestamp)
    doWrite := func(w storage.RecordWriter) error {
    if err = w.Write(rec); err != nil { // ★ Arrow Record 包含 timestamp 列
    return err
    }
    return w.Close()
    }

    // 4. 创建 PackedRecordWriter 并写入
    if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() {
    // Manifest 模式
    w, err := storage.NewPackedRecordManifestWriter(...)
    if err = doWrite(w); err != nil {
    return nil, "", err
    }
    manifestPath = w.GetWrittenManifest()
    } else {
    // 普通模式
    w, err := storage.NewPackedRecordWriter(...)
    if err = doWrite(w); err != nil {
    return nil, "", err
    }
    }

    // 5. 记录 binlog 信息(含 timestamp 范围)
    for _, columnGroup := range columnGroups {
    logs[columnGroup.GroupID] = &datapb.FieldBinlog{
    FieldID: columnGroup.GroupID,
    ChildFields: columnGroup.Fields,
    Binlogs: []*datapb.Binlog{
    {
    LogPath: path,
    EntriesNum: rowNum,
    TimestampFrom: tsFrom, // ★ Timestamp 范围
    TimestampTo: tsTo,
    },
    },
    }
    }

    return logs, manifestPath, nil
    }

    // 序列化为 Arrow Record
    func (bw *BulkPackWriterV2) serializeBinlog(_ context.Context, pack *SyncPack) (storage.Record, error) {
    // 转换 schema 为 Arrow Schema
    arrowSchema, err := storage.ConvertToArrowSchema(bw.schema, true)
    if err != nil {
    return nil, err
    }

    builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
    defer builder.Release()

    // 构建 Arrow Record(包含所有字段)
    for _, chunk := range pack.insertData {
    if err := storage.BuildRecord(builder, chunk, bw.schema); err != nil {
    return nil, err
    }
    }

    rec := builder.NewRecord()
    return storage.NewSimpleArrowRecord(rec, field2Col), nil
    }

    4. Compaction 写入流程

    Compaction 是对已有 Segment 的合并和优化,也会重新写入 timestamp 信息。

    4.1 整体流程图

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    Compaction Task 创建

    mixCompactionTask.Compact()

    mixCompactionTask.mergeSplit()

    NewMultiSegmentWriter() // 创建多 segment 写入器

    对每个源 Segment:
    ├─ storage.NewBinlogRecordReader() // 读取原始数据
    └─ mixCompactionTask.writeSegment()
    └─ [循环读取 Record]
    ├─ reader.Next() // 读取 Arrow Record(含 timestamp)
    ├─ 过滤已删除/过期数据
    └─ mWriter.Write(rec) ★ 写入合并后的数据
    └─ MultiSegmentWriter.Write()
    └─ BinlogValueWriter.Write()
    └─ BinlogRecordWriter.Write()
    ├─ [V1] CompositeBinlogRecordWriter.Write()
    │ ├─ 提取 timestamp 范围
    │ └─ 序列化为 binlog

    └─ [V2] PackedBinlogRecordWriter.Write()
    ├─ 提取 timestamp 范围
    └─ 写入 Parquet

    4.2 关键代码位置

    4.2.1 Mix Compaction 主流程

    文件: internal/datanode/compactor/mix_compactor.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    func (t *mixCompactionTask) mergeSplit(ctx context.Context) ([]*datapb.CompactionSegment, error) {
    // 创建 ID 分配器
    segIDAlloc := allocator.NewLocalAllocator(
    t.plan.GetPreAllocatedSegmentIDs().GetBegin(),
    t.plan.GetPreAllocatedSegmentIDs().GetEnd())
    logIDAlloc := allocator.NewLocalAllocator(
    t.plan.GetPreAllocatedLogIDs().GetBegin(),
    t.plan.GetPreAllocatedLogIDs().GetEnd())
    compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)

    // 创建 MultiSegmentWriter
    mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc,
    t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams,
    t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
    if err != nil {
    return nil, err
    }

    deletedRowCount := int64(0)
    expiredRowCount := int64(0)

    // 处理每个源 segment
    for _, seg := range t.plan.GetSegmentBinlogs() {
    del, exp, err := t.writeSegment(ctx, seg, mWriter, pkField)
    if err != nil {
    return nil, err
    }
    deletedRowCount += del
    expiredRowCount += exp
    }

    // 关闭 writer 完成写入
    if err := mWriter.Close(); err != nil {
    return nil, err
    }

    return mWriter.GetCompactionSegments(), nil
    }

    4.2.2 写入单个 Segment

    文件: internal/datanode/compactor/mix_compactor.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    func (t *mixCompactionTask) writeSegment(ctx context.Context,
    seg *datapb.CompactionSegmentBinlogs,
    mWriter *MultiSegmentWriter,
    pkField *schemapb.FieldSchema) (deletedRowCount, expiredRowCount int64, err error) {

    // 1. 读取 delta log(删除记录)
    deltaPaths := make([]string, 0)
    for _, fieldBinlog := range seg.GetDeltalogs() {
    for _, binlog := range fieldBinlog.GetBinlogs() {
    deltaPaths = append(deltaPaths, binlog.GetLogPath())
    }
    }
    delta, err := compaction.ComposeDeleteFromDeltalogs(ctx, t.binlogIO, deltaPaths)
    if err != nil {
    return
    }
    entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

    // 2. 创建 RecordReader 读取原始数据
    var reader storage.RecordReader
    if seg.GetManifest() != "" {
    reader, err = storage.NewManifestRecordReader(ctx, seg.GetManifest(),
    t.plan.GetSchema(), ...)
    } else {
    reader, err = storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(),
    t.plan.GetSchema(), ...)
    }
    if err != nil {
    return
    }
    defer reader.Close()

    // 3. 循环读取并写入数据
    for {
    var r storage.Record
    r, err = reader.Next()
    if err != nil {
    if err == sio.EOF {
    err = nil
    break
    }
    return
    }

    // 4. 过滤删除和过期的数据
    var (
    pkArray = r.Column(pkField.FieldID)
    tsArray = r.Column(common.TimeStampField).(*array.Int64) // ★ 读取 timestamp
    sliceStart = -1
    rb *storage.RecordBuilder
    )

    for i := range r.Len() {
    // 获取 PK 和 Timestamp
    var pk any
    switch pkField.DataType {
    case schemapb.DataType_Int64:
    pk = pkArray.(*array.Int64).Value(i)
    case schemapb.DataType_VarChar:
    pk = pkArray.(*array.String).Value(i)
    }
    ts := typeutil.Timestamp(tsArray.Value(i))

    // 根据 timestamp 和删除记录过滤
    if entityFilter.Filtered(pk, ts) {
    if rb == nil {
    rb = storage.NewRecordBuilder(t.plan.GetSchema())
    }
    if sliceStart != -1 {
    rb.Append(r, sliceStart, i)
    }
    sliceStart = -1
    continue
    }

    if sliceStart == -1 {
    sliceStart = i
    }
    }

    // 5. 写入过滤后的数据
    if rb != nil {
    if sliceStart != -1 {
    rb.Append(r, sliceStart, r.Len())
    }
    if rb.GetRowNum() > 0 {
    rec := rb.Build()
    defer rec.Release()
    err := mWriter.Write(rec) // ★ 写入 Record(包含 timestamp)
    if err != nil {
    return 0, 0, err
    }
    }
    } else {
    err := mWriter.Write(r)
    if err != nil {
    return 0, 0, err
    }
    }
    }

    return
    }

    4.2.3 MultiSegmentWriter 写入

    文件: internal/datanode/compactor/segment_writer.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    type MultiSegmentWriter struct {
    ctx context.Context
    binlogIO io.BinlogIO
    allocator *compactionAlloactor

    writer *storage.BinlogValueWriter // 底层 writer
    currentSegmentID typeutil.UniqueID

    maxRows int64
    segmentSize int64

    schema *schemapb.CollectionSchema
    partitionID int64
    collectionID int64

    res []*datapb.CompactionSegment
    }

    func (w *MultiSegmentWriter) Write(r storage.Record) error {
    // 检查是否需要轮转到新 segment
    if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
    if err := w.rotateWriter(); err != nil {
    return err
    }
    }

    // 调用底层 writer 写入 Record
    return w.writer.Write(r) // ★ Record 包含所有字段(含 timestamp)
    }

    func (w *MultiSegmentWriter) rotateWriter() error {
    // 关闭当前 writer
    if err := w.closeWriter(); err != nil {
    return err
    }

    // 分配新的 segment ID
    newSegmentID, err := w.allocator.allocSegmentID()
    if err != nil {
    return err
    }
    w.currentSegmentID = newSegmentID

    // 创建新的 BinlogRecordWriter
    rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID,
    w.partitionID, newSegmentID, w.schema, w.allocator.logIDAlloc,
    chunkSize, w.maxRows, w.rwOption...)
    if err != nil {
    return err
    }

    // 包装为 BinlogValueWriter
    w.writer = storage.NewBinlogValueWriter(rw, w.batchSize)
    return nil
    }

    4.2.4 底层 Record Writer

    文件: internal/storage/binlog_record_writer.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    func (pw *PackedBinlogRecordWriter) Write(r Record) error {
    if err := pw.initWriters(r); err != nil {
    return err
    }

    // 1. 提取 timestamp 范围
    tsArray := r.Column(common.TimeStampField).(*array.Int64)
    rows := r.Len()
    for i := 0; i < rows; i++ {
    ts := typeutil.Timestamp(tsArray.Value(i))
    if ts < pw.tsFrom {
    pw.tsFrom = ts
    }
    if ts > pw.tsTo {
    pw.tsTo = ts
    }
    }

    // 2. 收集统计信息
    if err := pw.pkCollector.Collect(r); err != nil {
    return err
    }
    if err := pw.bm25Collector.Collect(r); err != nil {
    return err
    }

    // 3. 写入数据(包含所有字段)
    err := pw.writer.Write(r) // ★ 完整的 Arrow Record
    if err != nil {
    return err
    }

    pw.writtenUncompressed = pw.writer.GetWrittenUncompressed()
    return nil
    }

    5. Storage 层的 Timestamp 处理

    5.1 Growing Segment 中的 Timestamp

    文件: internal/core/src/segcore/SegmentGrowingImpl.cpp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    void SegmentGrowingImpl::load_field_data_common(
    FieldId field_id,
    size_t reserved_offset,
    const std::vector<FieldDataPtr>& field_data,
    FieldId primary_field_id,
    size_t num_rows) {

    // 特殊处理 timestamp 字段
    if (field_id == TimestampFieldID) {
    // query node already guarantees that the timestamp is ordered
    // fill into Segment.ConcurrentVector
    insert_record_.timestamps_.set_data_raw(reserved_offset, field_data);
    return;
    }

    // 处理其他字段...
    }

    5.2 Segment Writer 中的 Timestamp 追踪

    文件: internal/datanode/compactor/segment_writer.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    func (w *SegmentWriter) WriteRecord(r storage.Record) error {
    // 1. 提取 timestamp 并更新范围
    tsArray := r.Column(common.TimeStampField).(*array.Int64)
    rows := r.Len()
    for i := 0; i < rows; i++ {
    ts := typeutil.Timestamp(tsArray.Value(i))
    if ts < w.tsFrom {
    w.tsFrom = ts
    }
    if ts > w.tsTo {
    w.tsTo = ts
    }

    // 2. 更新 PK 统计信息
    switch schemapb.DataType(w.pkstats.PkType) {
    case schemapb.DataType_Int64:
    pkArray := r.Column(w.GetPkID()).(*array.Int64)
    pk := &storage.Int64PrimaryKey{Value: pkArray.Value(i)}
    w.pkstats.Update(pk)
    case schemapb.DataType_VarChar:
    pkArray := r.Column(w.GetPkID()).(*array.String)
    pk := &storage.VarCharPrimaryKey{Value: pkArray.Value(i)}
    w.pkstats.Update(pk)
    }

    w.rowCount.Inc()
    }

    // 3. 写入完整的 Record(包含 timestamp)
    return w.writer.Write(r)
    }

    func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
    return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
    }

    6. 两种存储版本对比

    6.1 StorageV1 (Binlog 格式)

    特点

    • 每个字段一个独立的 binlog 文件
    • Timestamp 字段对应文件:{segmentID}/insert_log/1/{logID}
    • 使用 InsertCodec 序列化
    • Protobuf 格式存储

    文件结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    {collectionID}/{partitionID}/{segmentID}/
    ├── insert_log/
    │ ├── 0/ # RowID field
    │ │ └── {logID}
    │ ├── 1/ # Timestamp field ★
    │ │ └── {logID}
    │ ├── 100/ # User field 1
    │ │ └── {logID}
    │ └── 101/ # User field 2
    │ └── {logID}
    ├── delta_log/
    │ └── {logID}
    └── stats_log/
    └── {logID}

    代码路径

    1
    2
    3
    4
    BulkPackWriter.Write()
    └─> storageV1Serializer.serializeBinlog()
    └─> InsertCodec.Serialize()
    └─> 为每个 field 创建 binlog

    6.2 StorageV2 (Parquet 格式)

    特点

    • 列式存储,使用 Apache Arrow + Parquet
    • Timestamp 作为 Arrow Record 的一列
    • 支持列组(Column Group)优化
    • 更高的压缩率和查询性能

    文件结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {collectionID}/{partitionID}/{segmentID}/
    ├── insert_log/
    │ ├── {columnGroupID}/
    │ │ └── {logID}.parquet # 包含多个字段(含 timestamp)
    │ └── manifest.json # 元数据清单
    ├── delta_log/
    │ └── {logID}
    └── stats_log/
    └── {logID}

    代码路径

    1
    2
    3
    4
    BulkPackWriterV2.Write()
    └─> serializeBinlog() -> Arrow Record
    └─> PackedRecordWriter.Write()
    └─> Parquet Writer (通过 FFI 调用 C++)

    6.3 对比表

    特性 StorageV1 (Binlog) StorageV2 (Parquet)
    文件格式 Protobuf + 自定义编码 Apache Parquet
    Timestamp 存储 独立 binlog 文件(FieldID=1) Arrow Record 的一列
    文件数量 多个(每个 field 一个) 少量(按列组组织)
    压缩率 中等
    查询性能 需要读取多个文件 列式访问更高效
    元数据 每个 binlog 独立元数据 统一 manifest 管理
    序列化类 InsertCodec Arrow Builder
    写入类 BulkPackWriter BulkPackWriterV2

    7. 总结

    7.1 核心要点

    1. Timestamp 完整存储

      • Timestamp 作为 FieldID=1 的系统字段,在所有写入路径中都会完整持久化
      • 无论是 Flush 还是 Compaction,都会保留完整的 timestamp 信息
    2. 两种主要写入路径

      • Flush:正常数据写入的主流程,从 WriteBuffer 触发
      • Compaction:Segment 合并优化流程,读取旧数据重新写入
    3. 存储格式演进

      • StorageV1:每个字段独立 binlog 文件
      • StorageV2:列式 Parquet 格式,更高效
    4. Timestamp 的多重用途

      • MVCC 版本控制和可见性判断
      • TTL 过期数据清理
      • 查询优化(通过 timestamp 范围过滤)
      • Binlog 元数据(TimestampFrom/TimestampTo)

    7.2 数据流向图

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    用户数据 Insert/Delete

    [内存] WriteBuffer
    ↓ (Buffer Full / Time Trigger)
    [Flush] SyncTask

    [序列化] InsertCodec / Arrow Builder

    [持久化] Binlog / Parquet 文件
    ↓ (多个小 Segment)
    [Compaction] Mix/Clustering/L0 Compactor

    [合并] MultiSegmentWriter

    [持久化] 新的 Segment 文件

    7.3 相关文件索引

    Flush 流程

    • internal/flushcommon/writebuffer/write_buffer.go - WriteBuffer 管理
    • internal/flushcommon/syncmgr/task.go - SyncTask 执行
    • internal/flushcommon/syncmgr/pack_writer.go - StorageV1 写入
    • internal/flushcommon/syncmgr/pack_writer_v2.go - StorageV2 写入
    • internal/flushcommon/syncmgr/storage_serializer.go - 序列化器

    Compaction 流程

    • internal/datanode/compactor/mix_compactor.go - Mix Compaction
    • internal/datanode/compactor/segment_writer.go - Segment Writer
    • internal/datanode/compactor/clustering_compactor.go - Clustering Compaction

    Storage 层

    • internal/storage/data_codec.go - InsertCodec 序列化
    • internal/storage/binlog_record_writer.go - Binlog 写入
    • internal/storage/record_writer.go - Parquet 写入
    • internal/storage/serde_events.go - 事件序列化

    C++ 核心

    • internal/core/src/segcore/SegmentGrowingImpl.cpp - Growing Segment
    • internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp - Sealed Segment

    7.4 扩展阅读

    相关设计文档:

    • docs/developer_guides/flush_pipeline_write_buffer_manager.md
    • docs/developer_guides/flush_pipeline_flush_checkpoint.md
    • docs/design_docs/segcore/segment_sealed.md

    附录:术语表

    术语 说明
    MVCC Multi-Version Concurrency Control,多版本并发控制
    Timestamp 系统预留字段(FieldID=1),用于 MVCC 版本控制
    Flush 将内存中的数据持久化到存储的过程
    Compaction Segment 合并优化,将多个小 Segment 合并为大 Segment
    WriteBuffer 数据写入的内存缓冲区
    SyncTask 数据同步任务,负责将 buffer 数据持久化
    Binlog Binary Log,二进制日志,Milvus 的数据文件格式之一
    Parquet Apache Parquet,列式存储格式,StorageV2 使用
    Arrow Record Apache Arrow 的数据记录格式,内存中的列式数据表示
    Growing Segment 增长中的 Segment,正在接收写入
    Sealed Segment 已封存的 Segment,不再接收新数据
    InsertCodec Insert 数据的编解码器,用于 StorageV1
    TTL Time To Live,数据存活时间

    文档版本: v1.1
    最后更新: 2025-01-24
    适用 Milvus 版本: 2.5.x


    附录 A:Segment 内部 Layout

    A.1 Segment 类型概述

    Milvus 中有两种主要的 Segment 类型:

    Growing Segment (增长中的 Segment)

    • 状态: SegmentState_Growing
    • 特点: 正在接收写入操作
    • 实现类: SegmentGrowingImpl
    • 数据结构: 使用 ConcurrentVector 存储,支持并发写入
    • 索引: 可选的 interim index(临时索引)

    Sealed Segment (封存的 Segment)

    • 状态: SegmentState_Sealed / SegmentState_Flushing / SegmentState_Flushed
    • 特点: 不再接收写入,数据已持久化
    • 实现类: ChunkedSegmentSealedImpl
    • 数据结构: 使用 ChunkedColumnInterface 存储,支持 mmap 和延迟加载
    • 索引: 可以加载永久索引

    A.2 Growing Segment 内部结构

    A.2.1 核心组件

    文件: internal/core/src/segcore/SegmentGrowingImpl.h

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    class SegmentGrowingImpl : public SegmentGrowing {
    private:
    // 1. Schema 和元数据
    SchemaPtr schema_; // Collection schema
    IndexMetaPtr index_meta_; // Index metadata
    SegcoreConfig segcore_config_; // Segment 配置
    int64_t id_; // Segment ID

    // 2. 插入数据记录 ★ 核心数据结构
    InsertRecord<false> insert_record_;

    // 3. 索引记录 (Growing Index)
    IndexingRecord indexing_record_;

    // 4. 删除记录
    DeletedRecord<false> deleted_record_;

    // 5. 统计信息
    SegmentStats stats_{};

    // 6. 并发控制
    mutable std::shared_mutex chunk_mutex_;

    // 7. mmap 管理
    storage::MmapChunkDescriptorPtr mmap_descriptor_;
    };

    A.2.2 InsertRecord 数据布局

    文件: internal/core/src/segcore/InsertRecord.h

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class InsertRecordGrowing {
    public:
    // ★ Timestamp 列 (系统字段)
    ConcurrentVector<Timestamp> timestamps_;

    // ★ Timestamp 索引(用于时间旅行查询)
    TimestampIndex timestamp_index_;

    // ★ PK 到 Offset 的映射(用于快速查找)
    std::unique_ptr<OffsetMap> pk2offset_;

    // ★ 预分配的空间
    std::atomic<int64_t> reserved = 0;

    // ★ 响应器(用于并发控制)
    AckResponder ack_responder_;

    private:
    // ★ 字段数据存储 (map[FieldID] -> Vector)
    std::unordered_map<FieldId, std::unique_ptr<VectorBase>> data_{};

    // ★ Nullable 字段的 valid 数据
    std::unordered_map<FieldId, ThreadSafeValidDataPtr> valid_data_{};

    // 并发保护
    mutable std::shared_mutex shared_mutex_{};
    };

    A.2.3 数据组织方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    Growing Segment Memory Layout:
    ┌─────────────────────────────────────────────────┐
    │ SegmentGrowingImpl │
    ├─────────────────────────────────────────────────┤
    │ insert_record_ (InsertRecordGrowing) │
    │ ├─ timestamps_: ConcurrentVector<Timestamp> │ ★ Timestamp 列
    │ ├─ timestamp_index_: TimestampIndex │ ★ Timestamp 索引
    │ ├─ pk2offset_: OffsetMap │ ★ PK 索引
    │ └─ data_: map<FieldId, VectorBase> │ ★ 字段数据
    │ ├─ FieldID=100: ConcurrentVector<int64> │ (用户字段1)
    │ ├─ FieldID=101: ConcurrentVector<float> │ (用户字段2)
    │ ├─ FieldID=102: ConcurrentVector<vector> │ (向量字段)
    │ └─ ... │
    ├─────────────────────────────────────────────────┤
    │ indexing_record_ (IndexingRecord) │
    │ ├─ Chunk-level indexes │ ★ 分块索引
    │ └─ Interim vector indexes │ ★ 临时向量索引
    ├─────────────────────────────────────────────────┤
    │ deleted_record_ (DeletedRecord) │
    │ └─ Deleted PKs with timestamps │ ★ 删除记录
    └─────────────────────────────────────────────────┘

    A.2.4 Chunk 机制

    Growing Segment 使用 Chunk 机制组织数据:

    • Chunk Size: 默认 32768 行(可配置)
    • 目的: 支持大规模数据的高效存储和查询
    • 索引: 每个 chunk 可以独立建立索引
    1
    2
    3
    4
    // 代码示例
    int64_t chunk_rows = segcore_config_.get_chunk_rows(); // 默认 32768
    auto chunk_id = offset / chunk_rows;
    auto offset_in_chunk = offset % chunk_rows;

    A.3 Sealed Segment 内部结构

    A.3.1 核心组件

    文件: internal/core/src/segcore/ChunkedSegmentSealedImpl.h

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    class ChunkedSegmentSealedImpl : public SegmentSealed {
    private:
    // 1. Schema 和元数据
    SchemaPtr schema_;
    IndexMetaPtr col_index_meta_;
    int64_t id_;
    SegcoreConfig segcore_config_;

    // 2. 行数
    std::optional<int64_t> num_rows_;

    // 3. 加载状态
    BitsetType field_data_ready_bitset_; // 字段数据是否已加载
    BitsetType index_ready_bitset_; // 索引是否已加载
    BitsetType binlog_index_bitset_; // Binlog 索引标记
    std::atomic<int> system_ready_count_; // 系统字段计数器

    // 4. ★ 字段数据存储(列式存储)
    folly::Synchronized<std::unordered_map<
    FieldId,
    std::shared_ptr<ChunkedColumnInterface>
    >> fields_;

    // 5. ★ 索引存储
    // 5.1 Scalar 索引
    folly::Synchronized<std::unordered_map<
    FieldId,
    index::CacheIndexBasePtr
    >> scalar_indexings_;

    // 5.2 Vector 索引
    SealedIndexingRecord vector_indexings_;

    // 5.3 N-gram 索引(用于文本)
    folly::Synchronized<std::unordered_map<
    FieldId,
    std::unordered_map<std::string, index::CacheIndexBasePtr>
    >> ngram_indexings_;

    // 6. ★ InsertRecord (仅包含系统字段和 PK)
    InsertRecord<true> insert_record_;

    // 7. 删除记录
    DeletedRecord<true> deleted_record_;

    // 8. Mmap 字段
    std::unordered_set<FieldId> mmap_field_ids_;

    // 9. 统计信息
    SegmentStats stats_{};

    // 10. PK 排序标记
    bool is_sorted_by_pk_ = false;

    // 11. Storage V2 Reader
    std::unique_ptr<milvus_storage::api::Reader> reader_;
    };

    A.3.2 数据布局

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    Sealed Segment Memory Layout:
    ┌────────────────────────────────────────────────┐
    │ ChunkedSegmentSealedImpl │
    ├────────────────────────────────────────────────┤
    │ insert_record_ (InsertRecordSealed) │
    │ ├─ timestamps_: ConcurrentVector<Timestamp> │ ★ 仅系统字段
    │ ├─ timestamp_index_: TimestampIndex │
    │ └─ pk2offset_: OffsetOrderedArray │ ★ PK 索引(已排序)
    ├────────────────────────────────────────────────┤
    │ fields_: map<FieldId, ChunkedColumnInterface>│ ★ 列数据
    │ ├─ FieldID=100: ChunkedColumn<int64> │ (Scalar 字段)
    │ │ ├─ Chunk 0: [data...] │
    │ │ ├─ Chunk 1: [data...] │
    │ │ └─ ... │
    │ ├─ FieldID=102: ChunkedColumn<vector> │ (向量字段)
    │ │ └─ 可能被索引替代 │
    │ └─ ... │
    ├────────────────────────────────────────────────┤
    │ scalar_indexings_: map<FieldId, Index> │ ★ Scalar 索引
    │ └─ FieldID=100: ScalarIndex │
    ├────────────────────────────────────────────────┤
    │ vector_indexings_: SealedIndexingRecord │ ★ Vector 索引
    │ ├─ FieldID=102: VectorIndex (HNSW/IVF/...) │
    │ └─ index_has_raw_data_: bool │ (是否保留原始数据)
    ├────────────────────────────────────────────────┤
    │ deleted_record_: DeletedRecord │ ★ 删除记录
    └────────────────────────────────────────────────┘

    A.3.3 ChunkedColumn 结构

    Sealed Segment 使用分块列存储:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 列接口
    class ChunkedColumnInterface {
    public:
    virtual size_t num_chunks() const = 0;
    virtual size_t chunk_size(size_t chunk_id) const = 0;
    virtual SpanBase chunk_data(size_t chunk_id) const = 0;

    // 支持 mmap
    virtual bool is_mmaped() const = 0;
    };

    // 使用示例
    auto column = fields_[field_id];
    for (size_t chunk_id = 0; chunk_id < column->num_chunks(); ++chunk_id) {
    auto data = column->chunk_data(chunk_id);
    // 处理 chunk 数据
    }

    A.4 文件系统中的 Layout

    A.4.1 StorageV1 (Binlog) 布局

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    {root_path}/{collectionID}/{partitionID}/{segmentID}/
    ├── insert_log/ # 插入数据
    │ ├── 0/ # RowID field
    │ │ └── {logID} # Binlog 文件
    │ ├── 1/ # ★ Timestamp field
    │ │ └── {logID}
    │ ├── 100/ # User field 1
    │ │ └── {logID}
    │ ├── 101/ # User field 2
    │ │ └── {logID}
    │ └── 102/ # Vector field
    │ └── {logID}

    ├── delta_log/ # 删除数据
    │ └── {logID}

    ├── stats_log/ # 统计数据(PK stats)
    │ └── {logID}

    └── index/ # 索引文件
    └── {fieldID}/
    ├── {indexBuildID}/
    │ ├── index_params
    │ ├── index_info
    │ └── index_data_*
    └── ...

    A.4.2 StorageV2 (Parquet) 布局

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    {root_path}/{collectionID}/{partitionID}/{segmentID}/
    ├── insert_log/
    │ ├── {columnGroupID}/
    │ │ └── {logID}.parquet # Parquet 文件(包含多个字段)
    │ │ # ★ Timestamp 作为其中一列
    │ ├── manifest.json # 元数据清单
    │ │ ├── schema
    │ │ ├── column_groups
    │ │ │ ├─ group_id: 1
    │ │ │ │ └─ fields: [0, 1, 100] # RowID, Timestamp, Field1
    │ │ │ └─ group_id: 2
    │ │ │ └─ fields: [101, 102] # Field2, VectorField
    │ │ └── files
    │ └── ...

    ├── delta_log/
    │ └── {logID}

    ├── stats_log/
    │ └── {logID}

    └── index/
    └── ... (同 V1)

    A.4.3 Binlog 文件内部结构

    每个 binlog 文件包含:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    Binlog File Structure:
    ┌─────────────────────────────┐
    │ Magic Number │ 4 bytes
    ├─────────────────────────────┤
    │ Descriptor Event │
    │ ├─ CollectionID │
    │ ├─ PartitionID │
    │ ├─ SegmentID │
    │ ├─ FieldID │
    │ ├─ StartTimestamp │ ★ Min timestamp
    │ ├─ EndTimestamp │ ★ Max timestamp
    │ └─ PayloadDataType │
    ├─────────────────────────────┤
    │ Insert Event 1 │
    │ ├─ StartTimestamp │
    │ ├─ EndTimestamp │
    │ └─ Payload (field data) │ ★ 实际字段数据
    ├─────────────────────────────┤
    │ Insert Event 2 │
    │ └─ ... │
    ├─────────────────────────────┤
    │ ... │
    └─────────────────────────────┘

    A.5 查询时的数据访问

    A.5.1 访问路径

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    Query Request

    Plan Node

    Segment Interface

    ├─ [Growing Segment]
    │ └─> InsertRecord.data_[fieldID]
    │ └─> ConcurrentVector::get(offset)

    └─ [Sealed Segment]
    ├─> [Has Index?]
    │ ├─ Yes → vector_indexings_[fieldID]
    │ │ └─> Index Search
    │ └─ No → fields_[fieldID]
    │ └─> ChunkedColumn::chunk_data(chunk_id)

    └─> [Filter by Timestamp]
    └─> insert_record_.timestamps_
    └─> TimestampIndex (Binary Search)

    A.5.2 Timestamp 过滤示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // 文件: internal/core/src/segcore/SegmentGrowingImpl.cpp

    void SegmentGrowingImpl::mask_with_timestamps(
    BitsetTypeView& bitset_chunk,
    Timestamp timestamp,
    Timestamp ttl) const {

    auto timestamps_data_ptr = insert_record_.timestamps_.data();
    auto size = insert_record_.size();

    // 使用 timestamp 过滤数据
    for (int64_t i = 0; i < size; ++i) {
    auto ts = timestamps_data_ptr[i];

    // 过滤未来的数据(ts > query_timestamp)
    if (ts > timestamp) {
    bitset_chunk[i] = true; // mask out
    }

    // 过滤过期的数据(根据 TTL)
    if (ttl > 0 && timestamp - ts > ttl) {
    bitset_chunk[i] = true; // mask out
    }
    }
    }

    A.6 内存管理

    A.6.1 Growing Segment 内存管理

    1. 动态增长: ConcurrentVector 按需分配内存
    2. Chunk 管理: 数据按 chunk 组织,支持增量加载
    3. Mmap 支持: 可选地使用 mmap 减少内存占用
    1
    2
    3
    4
    5
    6
    // Chunk 大小配置
    int64_t chunk_rows = segcore_config_.get_chunk_rows(); // 默认 32768

    // 估算内存使用
    size_t memory_per_row = EstimateRowSize(schema);
    size_t total_memory = num_rows * memory_per_row;

    A.6.2 Sealed Segment 内存管理

    1. 延迟加载: 字段和索引按需加载
    2. Mmap: 支持 mmap 模式,减少内存拷贝
    3. 缓存管理: 使用 LRU 缓存管理索引和字段数据
    1
    2
    3
    4
    5
    6
    // 加载状态检查
    bool CanQuery() {
    return system_ready_count_ == 2 // RowID & Timestamp loaded
    && AllRequiredFieldsReady() // Query fields loaded
    && (HasIndex() || HasRawData()); // Index or raw data available
    }

    A.7 关键数据结构总结

    组件 Growing Segment Sealed Segment 说明
    Timestamp ConcurrentVector<Timestamp> ConcurrentVector<Timestamp> 两者都存储完整 timestamp
    TimestampIndex TimestampIndex TimestampIndex 用于时间旅行查询
    PK Index OffsetOrderedMap OffsetOrderedArray Growing 用 Map,Sealed 用 Array
    Field Data map<FieldId, ConcurrentVector> map<FieldId, ChunkedColumn> 列式存储
    Vector Index IndexingRecord (interim) SealedIndexingRecord (permanent) 索引类型不同
    Scalar Index 可选 map<FieldId, Index> Sealed 支持更多索引
    Delete Record DeletedRecord<false> DeletedRecord<true> 删除记录
    并发控制 std::shared_mutex folly::Synchronized 不同的同步机制

    文档版本: v1.1
    最后更新: 2025-01-24
    适用 Milvus 版本: 2.5.x