## 概述

WriteBuffer 是 Milvus DataNode 中管理单个通道(channel)数据缓冲的核心组件。它负责接收并缓冲插入/删除数据,根据同步策略触发数据同步,并管理段(segment)的生命周期。
接口定义 1 2 3 4 5 6 7 8 9 10 11 12 13 type WriteBuffer interface { HasSegment(segmentID int64 ) bool CreateNewGrowingSegment(partitionID int64 , segmentID int64 , startPos *msgpb.MsgPosition) BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error SetFlushTimestamp(flushTs uint64 ) GetFlushTimestamp() uint64 SealSegments(ctx context.Context, segmentIDs []int64 ) error DropPartitions(partitionIDs []int64 ) GetCheckpoint() *msgpb.MsgPosition MemorySize() int64 EvictBuffer(policies ...SyncPolicy) Close(ctx context.Context, drop bool ) }
基础实现(writeBufferBase) 核心结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type writeBufferBase struct { channelName string collectionID int64 mut sync.RWMutex segmentBuffers map [int64 ]*segmentBuffer checkpoint *msgpb.MsgPosition flushTimestamp *atomic.Uint64 syncMgr syncmgr.SyncManager metaCache metacache.MetaCache syncPolicies []SyncPolicy }
段缓冲(segmentBuffer) 1 2 3 4 5 6 7 8 9 type segmentBuffer struct { segmentID int64 insertBuffer *InsertBuffer deltaBuffer *DeltaBuffer startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition }
功能:

- insertBuffer:存储插入数据
- deltaBuffer:存储删除数据
- startPosition / checkpoint:维护段的起始位置和检查点
## L0 WriteBuffer 特殊实现

### 为什么需要 L0 WriteBuffer?

在流式服务模式下,已刷新的段不再维护 Bloom Filter,因此无法再借助 Bloom Filter 判断一条删除命中了哪个段。为了高效处理删除操作,Milvus 引入了 L0 段(Level-0 Segment),专门用于存储删除数据。
核心结构 1 2 3 4 5 6 7 8 9 type l0WriteBuffer struct { *writeBufferBase l0Segments map [int64 ]int64 l0partition map [int64 ]int64 syncMgr syncmgr.SyncManager idAllocator allocator.Interface }
关键差异 1. 删除消息分发(不使用 Bloom Filter) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (wb *l0WriteBuffer) dispatchDeleteMsgsWithoutFilter(deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) { for _, msg := range deleteMsgs { l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos) pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys()) pkTss := msg.GetTimestamps() if len (pks) > 0 { wb.bufferDelete(l0SegmentID, pks, pkTss, startPos, endPos) } } }
特点:

- 不进行 Bloom Filter 过滤
- 所有携带主键的删除消息都写入其所在分区的 L0 段
- 简化了删除处理逻辑
2. L0 段管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64 , startPos *msgpb.MsgPosition) int64 { segmentID, ok := wb.l0Segments[partitionID] if !ok { err := retry.Do(context.Background(), func () error { var err error segmentID, err = wb.idAllocator.AllocOne() return err }) wb.l0Segments[partitionID] = segmentID wb.l0partition[segmentID] = partitionID wb.metaCache.AddSegment(&datapb.SegmentInfo{ ID: segmentID, PartitionID: partitionID, CollectionID: wb.collectionID, InsertChannel: wb.channelName, StartPosition: startPos, State: commonpb.SegmentState_Growing, Level: datapb.SegmentLevel_L0, }, ...) } return segmentID }
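特点:

- 同一时刻每个分区对应一个 L0 段(l0Segments:分区 ID → L0 段 ID;l0partition:L0 段 ID → 分区 ID)
- L0 段在创建时被标记为 Growing 状态,Level 为 L0
- L0 段总是需要刷新(isFlush = true)
- L0 段被同步后,BufferData 会清除它与分区之间的映射,该分区的下一条删除会分配一个新的 L0 段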
数据缓冲流程 BufferData 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { wb.mut.Lock() defer wb.mut.Unlock() for _, inData := range insertData { err := wb.bufferInsert(inData, startPos, endPos) if err != nil { return err } } wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) wb.checkpoint = endPos segmentsSync := wb.triggerSync() for _, segment := range segmentsSync { partition, ok := wb.l0partition[segment] if ok { delete (wb.l0partition, segment) delete (wb.l0Segments, partition) } } return nil }
同步策略(SyncPolicy) 策略接口 1 2 3 4 type SyncPolicy interface { SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 Reason() string }
常用策略 1. 满缓冲区策略 1 2 3 4 5 6 7 8 9 10 11 func GetFullBufferPolicy () SyncPolicy { return wrapSelectSegmentFuncPolicy(func (buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { var result []int64 for _, buf := range buffers { if buf.insertBuffer.IsFull() { result = append (result, buf.segmentID) } } return result }, "full buffer" ) }
2. 刷新时间戳策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func GetFlushTsPolicy (flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy { return wrapSelectSegmentFuncPolicy(func (buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { flushTs := flushTimestamp.Load() if flushTs != nonFlushTS && ts >= flushTs { var result []int64 for _, buf := range buffers { if buf.insertBuffer.MinTimestamp() < flushTs { result = append (result, buf.segmentID) } } return result } return nil }, "flush ts" ) }
触发条件:已设置 flushTs(flushTs != nonFlushTS),且当前时间戳 >= flushTs;满足时,选出缓冲数据最小时间戳早于 flushTs 的段进行同步。
3. 最老缓冲区策略 1 2 3 4 5 6 func GetOldestBufferPolicy (maxNum int ) SyncPolicy { return wrapSelectSegmentFuncPolicy(func (buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { }, "oldest buffer" ) }
同步触发机制 EvictBuffer 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) { wb.mut.RLock() buffers := make ([]*segmentBuffer, 0 , len (wb.segmentBuffers)) for _, buf := range wb.segmentBuffers { buffers = append (buffers, buf) } currentTs := wb.checkpoint.GetTimestamp() wb.mut.RUnlock() allPolicies := append (wb.syncPolicies, policies...) segmentIDs := typeutil.NewUniqueSet() for _, policy := range allPolicies { selected := policy.SelectSegments(buffers, currentTs) for _, id := range selected { segmentIDs.Insert(id) } } if segmentIDs.Len() > 0 { wb.syncSegments(context.Background(), segmentIDs.Collect()) } }
syncSegments 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64 ) []*conc.Future[struct {}] { var futures []*conc.Future[struct {}] for _, segmentID := range segmentIDs { task, err := wb.getSyncTask(ctx, segmentID) if err != nil { continue } future, err := wb.syncMgr.SyncData(ctx, task, func (err error ) error { if err == nil && task.IsFlush() { wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(segmentID)) } return err }) futures = append (futures, future) } return futures }
检查点管理 GetCheckpoint 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { wb.mut.RLock() defer wb.mut.RUnlock() var earliest *checkpointCandidate for _, buf := range wb.segmentBuffers { if buf.insertBuffer.rows > 0 || buf.deltaBuffer.Size() > 0 { candidate := &checkpointCandidate{ segmentID: buf.segmentID, position: buf.startPosition, } if earliest == nil || candidate.position.GetTimestamp() < earliest.position.GetTimestamp() { earliest = candidate } } } if earliest != nil { return earliest.position } return wb.checkpoint }
段生命周期管理 SealSegments 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64 ) error { wb.mut.Lock() defer wb.mut.Unlock() wb.metaCache.UpdateSegments( metacache.UpdateState(commonpb.SegmentState_Sealed), metacache.WithSegmentIDs(segmentIDs...), ) wb.syncSegments(ctx, segmentIDs) return nil }
默认实现选择 1 2 3 4 5 6 func NewWriteBuffer (channel string , metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error ) { return NewL0WriteBuffer(channel, metacache, syncMgr, option) }
原因:

- L0WriteBuffer 适配流式服务模式
- 简化删除处理逻辑
- 提高删除操作性能
## 设计特点

### 1. 分层设计

- WriteBuffer 接口:定义抽象
- writeBufferBase:基础实现
- l0WriteBuffer:特殊实现(嵌入 *writeBufferBase,复用其基础逻辑)
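### 2. 策略模式

通过 SyncPolicy 实现灵活的同步策略:每个策略从段缓冲中选出需要同步的段,多个策略的选择结果取并集。

### 3. 线程安全

使用 RWMutex 保护共享状态:只读操作(如 GetCheckpoint)持读锁,修改缓冲的操作(如 BufferData、SealSegments)持写锁。

### 4. 异步同步

通过 SyncManager 实现异步数据同步:syncSegments 提交同步任务并返回 Future;刷新类任务成功完成后,在回调中将该段从 MetaCache 中移除。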
相关文档