Milvus WriteBuffer Implementation Explained

Overview

WriteBuffer is the core component in the Milvus DataNode that manages data buffering for a single channel. It receives and buffers insert/delete data, triggers data synchronization according to sync policies, and manages the segment lifecycle.

Interface Definition

type WriteBuffer interface {
    HasSegment(segmentID int64) bool
    CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition)
    BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
    SetFlushTimestamp(flushTs uint64)
    GetFlushTimestamp() uint64
    SealSegments(ctx context.Context, segmentIDs []int64) error
    DropPartitions(partitionIDs []int64)
    GetCheckpoint() *msgpb.MsgPosition
    MemorySize() int64
    EvictBuffer(policies ...SyncPolicy)
    Close(ctx context.Context, drop bool)
}
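
To make the interface concrete, here is a minimal sketch of how a consumer (for example, the write node of a channel's flowgraph) might drive a WriteBuffer for one message pack. The function name and surrounding wiring are illustrative assumptions, not Milvus code; only the interface methods shown above are relied on.

// Sketch: drive a WriteBuffer for a single message pack (illustrative only).
func consumeMsgPack(wb WriteBuffer, inserts []*InsertData, deletes []*msgstream.DeleteMsg,
    startPos, endPos *msgpb.MsgPosition) (*msgpb.MsgPosition, error) {

    // Buffer the pack; the WriteBuffer decides internally whether any segment
    // buffers need to be handed off to the sync path.
    if err := wb.BufferData(inserts, deletes, startPos, endPos); err != nil {
        return nil, err
    }

    // Report the channel checkpoint upstream (e.g. for timetick/checkpoint updates).
    return wb.GetCheckpoint(), nil
}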

Base Implementation (writeBufferBase)

Core Structure

type writeBufferBase struct {
    channelName  string
    collectionID int64

    mut sync.RWMutex

    segmentBuffers map[int64]*segmentBuffer
    checkpoint     *msgpb.MsgPosition
    flushTimestamp *atomic.Uint64

    syncMgr   syncmgr.SyncManager
    metaCache metacache.MetaCache

    syncPolicies []SyncPolicy
    // ... other fields
}

Segment Buffer (segmentBuffer)

type segmentBuffer struct {
    segmentID int64

    insertBuffer *InsertBuffer
    deltaBuffer  *DeltaBuffer

    startPosition *msgpb.MsgPosition
    checkpoint    *msgpb.MsgPosition
}

Responsibilities:

  • insertBuffer: stores buffered insert data
  • deltaBuffer: stores buffered delete data
  • maintains the segment's start position and checkpoint; a segment buffer is considered active while either buffer still holds data (see the sketch below)
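
As a quick illustration of how these fields are used together, here is a hypothetical helper (not an actual segmentBuffer method) that mirrors the "buffer still holds data" condition used later in GetCheckpoint; the rows field and Size() accessor are taken from the snippets in this document.

// Hypothetical helper: a segment buffer is "active" while either of its
// buffers still holds data; its startPosition is then a checkpoint candidate.
func segmentBufferHasData(buf *segmentBuffer) bool {
    return buf.insertBuffer.rows > 0 || buf.deltaBuffer.Size() > 0
}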

Special Implementation: L0 WriteBuffer

Why is an L0 WriteBuffer needed?

In streaming-service mode, flushed segments no longer maintain a Bloom Filter. To handle delete operations efficiently, Milvus introduces L0 segments (Level-0 segments) dedicated to storing delete data.

Core Structure

type l0WriteBuffer struct {
    *writeBufferBase

    l0Segments  map[int64]int64 // partitionID => L0 segment ID
    l0partition map[int64]int64 // L0 segment ID => partitionID

    syncMgr     syncmgr.SyncManager
    idAllocator allocator.Interface
}

Key Differences

1. Delete message dispatch (without Bloom Filter)

func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg,
    startPos, endPos *msgpb.MsgPosition) {

    for _, msg := range deleteMsgs {
        // Get or create the L0 segment ID for this partition
        l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)

        // Parse primary keys and timestamps
        pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
        pkTss := msg.GetTimestamps()

        if len(pks) > 0 {
            // Buffer the delete data directly, without Bloom Filter filtering
            wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos)
        }
    }
}

Characteristics:

  • No Bloom Filter filtering is performed
  • All delete messages are written to the partition's L0 segment
  • The delete-handling logic is simplified (a contrasting filter-based sketch follows below)
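
For contrast, a Bloom-Filter-based dispatch (the approach the L0 path avoids) would have to test every primary key against each candidate segment's filter before buffering the delete. The sketch below is purely illustrative: pkFilter, MayContain, and the filters map are assumptions, not the actual Milvus API; bufferDelete is used as in the L0 code above.

// pkFilter stands in for a per-segment Bloom filter (hypothetical).
type pkFilter interface {
    MayContain(pk storage.PrimaryKey) bool
}

// Illustrative sketch of a filter-based dispatch, shown for comparison only.
func dispatchDeleteMsgsWithFilter(wb *writeBufferBase, filters map[int64]pkFilter,
    deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {

    for _, msg := range deleteMsgs {
        pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
        tss := msg.GetTimestamps()

        // Every (segment, primary key) pair must be checked; this is exactly
        // the work the L0 design removes from the write hot path.
        for segmentID, filter := range filters {
            var hitPks []storage.PrimaryKey
            var hitTss []uint64
            for i, pk := range pks {
                if filter.MayContain(pk) { // possible hit: buffer the delete for this segment
                    hitPks = append(hitPks, pk)
                    hitTss = append(hitTss, tss[i])
                }
            }
            if len(hitPks) > 0 {
                wb.bufferDelete(segmentID, hitPks, hitTss, startPos, endPos)
            }
        }
    }
}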

2. L0 segment management

func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPosition) int64 {
    segmentID, ok := wb.l0Segments[partitionID]
    if !ok {
        // Allocate a new L0 segment ID
        err := retry.Do(context.Background(), func() error {
            var err error
            segmentID, err = wb.idAllocator.AllocOne()
            return err
        })
        if err != nil {
            // error handling omitted in this excerpt
        }

        wb.l0Segments[partitionID] = segmentID
        wb.l0partition[segmentID] = partitionID

        // Register with the MetaCache, marked as an L0 growing segment
        wb.metaCache.AddSegment(&datapb.SegmentInfo{
            ID:            segmentID,
            PartitionID:   partitionID,
            CollectionID:  wb.collectionID,
            InsertChannel: wb.channelName,
            StartPosition: startPos,
            State:         commonpb.SegmentState_Growing,
            Level:         datapb.SegmentLevel_L0, // L0 level
        }, ...)
    }
    return segmentID
}

Characteristics:

  • Each partition maps to exactly one L0 segment
  • L0 segments are always marked as Growing
  • L0 segments are always flushed when synced (isFlush = true)

Data Buffering Flow

The BufferData Method

func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg,
    startPos, endPos *msgpb.MsgPosition) error {

    wb.mut.Lock()
    defer wb.mut.Unlock()

    // 1. Buffer insert data
    for _, inData := range insertData {
        err := wb.bufferInsert(inData, startPos, endPos)
        if err != nil {
            return err
        }
    }

    // 2. Dispatch delete messages (no Bloom Filter)
    wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)

    // 3. Update the checkpoint
    wb.checkpoint = endPos

    // 4. Trigger sync according to the configured policies
    segmentsSync := wb.triggerSync()

    // 5. Clean up the mappings of L0 segments that were handed off to sync
    for _, segment := range segmentsSync {
        partition, ok := wb.l0partition[segment]
        if ok {
            delete(wb.l0partition, segment)
            delete(wb.l0Segments, partition)
        }
    }

    return nil
}

Sync Policies (SyncPolicy)

Policy Interface

type SyncPolicy interface {
    SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
    Reason() string
}
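
Policies are typically built from a plain selection function via wrapSelectSegmentFuncPolicy, as the built-in policies below show. As an illustration, here is a hypothetical extra policy (not one of Milvus' built-in ones) that selects any segment with buffered delete data; it only uses accessors that appear elsewhere in this document.

// Hypothetical example policy: sync every segment that has buffered deletes.
func GetNonEmptyDeltaPolicy() SyncPolicy {
    return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
        var result []int64
        for _, buf := range buffers {
            if buf.deltaBuffer.Size() > 0 { // any buffered delete records
                result = append(result, buf.segmentID)
            }
        }
        return result
    }, "non-empty delta buffer")
}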

Common Policies

1. Full-buffer policy

func GetFullBufferPolicy() SyncPolicy {
    return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
        var result []int64
        for _, buf := range buffers {
            if buf.insertBuffer.IsFull() {
                result = append(result, buf.segmentID)
            }
        }
        return result
    }, "full buffer")
}

2. Flush-timestamp policy

func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy {
    return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
        flushTs := flushTimestamp.Load()
        if flushTs != nonFlushTS && ts >= flushTs {
            var result []int64
            for _, buf := range buffers {
                if buf.insertBuffer.MinTimestamp() < flushTs {
                    result = append(result, buf.segmentID)
                }
            }
            return result
        }
        return nil
    }, "flush ts")
}

Trigger condition: the current timestamp >= flushTs. For example, if flushTs is set to 450 and the checkpoint timestamp reaches 460, every buffer whose minimum insert timestamp is below 450 is selected for sync.

3. Oldest-buffer policy

func GetOldestBufferPolicy(maxNum int) SyncPolicy {
    return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
        // Sort the buffers by timestamp and pick the oldest maxNum of them
        // ...
    }, "oldest buffer")
}
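
The selection body is elided above. Below is a minimal sketch of what an oldest-N selection could look like, assuming a buffer's age is taken from its insert buffer's MinTimestamp() (the accessor used by the flush-timestamp policy) and using the standard sort package; this is not the actual implementation.

// Sketch: pick the IDs of the maxNum buffers with the smallest (oldest) timestamps.
func selectOldest(buffers []*segmentBuffer, maxNum int) []int64 {
    sorted := make([]*segmentBuffer, len(buffers))
    copy(sorted, buffers)
    sort.Slice(sorted, func(i, j int) bool {
        return sorted[i].insertBuffer.MinTimestamp() < sorted[j].insertBuffer.MinTimestamp()
    })

    if maxNum > len(sorted) {
        maxNum = len(sorted)
    }
    result := make([]int64, 0, maxNum)
    for _, buf := range sorted[:maxNum] {
        result = append(result, buf.segmentID)
    }
    return result
}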

Sync Trigger Mechanism

The EvictBuffer Method

func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
    wb.mut.RLock()
    buffers := make([]*segmentBuffer, 0, len(wb.segmentBuffers))
    for _, buf := range wb.segmentBuffers {
        buffers = append(buffers, buf)
    }
    currentTs := wb.checkpoint.GetTimestamp()
    wb.mut.RUnlock()

    // Combine the standing policies with the ones passed in
    allPolicies := append(wb.syncPolicies, policies...)

    // Collect the segments selected by any policy
    segmentIDs := typeutil.NewUniqueSet()
    for _, policy := range allPolicies {
        selected := policy.SelectSegments(buffers, currentTs)
        for _, id := range selected {
            segmentIDs.Insert(id)
        }
    }

    // Trigger sync for the selected segments
    if segmentIDs.Len() > 0 {
        wb.syncSegments(context.Background(), segmentIDs.Collect())
    }
}
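
Besides the standing policies evaluated on every BufferData call, callers can pass extra policies for a one-off eviction, for example under memory pressure. The trigger site below is an assumption for illustration; GetOldestBufferPolicy is the built-in policy shown earlier.

// Sketch: force the channel to sync up to 3 of its oldest segment buffers,
// on top of whatever the standing policies already select.
func evictUnderMemoryPressure(wb WriteBuffer) {
    wb.EvictBuffer(GetOldestBufferPolicy(3))
}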

The syncSegments Method

func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[struct{}] {
    var futures []*conc.Future[struct{}]

    for _, segmentID := range segmentIDs {
        // Build the sync task
        task, err := wb.getSyncTask(ctx, segmentID)
        if err != nil {
            continue
        }

        // Submit the task to the SyncManager
        future, err := wb.syncMgr.SyncData(ctx, task, func(err error) error {
            if err == nil && task.IsFlush() {
                // After a successful flush, remove the segment from the MetaCache
                wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(segmentID))
            }
            return err
        })
        if err != nil {
            continue // error handling simplified in this excerpt
        }

        futures = append(futures, future)
    }

    return futures
}

Checkpoint Management

GetCheckpoint

func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
    wb.mut.RLock()
    defer wb.mut.RUnlock()

    // If any buffer still holds data, the checkpoint is the earliest buffered start position
    var earliest *checkpointCandidate
    for _, buf := range wb.segmentBuffers {
        if buf.insertBuffer.rows > 0 || buf.deltaBuffer.Size() > 0 {
            candidate := &checkpointCandidate{
                segmentID: buf.segmentID,
                position:  buf.startPosition,
            }
            if earliest == nil || candidate.position.GetTimestamp() < earliest.position.GetTimestamp() {
                earliest = candidate
            }
        }
    }

    if earliest != nil {
        return earliest.position
    }

    // Otherwise, return the latest consumed position
    return wb.checkpoint
}

Segment Lifecycle Management

SealSegments

func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) error {
    wb.mut.Lock()
    defer wb.mut.Unlock()

    // Update the MetaCache: Growing -> Sealed
    wb.metaCache.UpdateSegments(
        metacache.UpdateState(commonpb.SegmentState_Sealed),
        metacache.WithSegmentIDs(segmentIDs...),
    )

    // Trigger sync for the sealed segments
    wb.syncSegments(ctx, segmentIDs)

    return nil
}

Choice of Default Implementation

func NewWriteBuffer(channel string, metacache metacache.MetaCache,
    syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {

    // option is assembled from opts (construction omitted in this excerpt)

    // The L0 implementation is used by default
    return NewL0WriteBuffer(channel, metacache, syncMgr, option)
}

Rationale:

  • L0WriteBuffer fits the streaming-service mode
  • It simplifies the delete-handling logic
  • It improves the performance of delete operations

Design Highlights

1. Layered design

  • The WriteBuffer interface defines the abstraction
  • writeBufferBase provides the shared base implementation
  • l0WriteBuffer is the specialized implementation

2. Strategy pattern

Flexible sync triggering is implemented through the SyncPolicy abstraction.

3. Thread safety

Shared state is protected by an RWMutex.

4. Asynchronous sync

Data synchronization is performed asynchronously through the SyncManager.

Related Documents