概述
bufferManager 是 Milvus DataNode 中管理多个 WriteBuffer 实例的核心组件。它负责按通道组织和管理数据缓冲,实现内存监控、自动驱逐和刷新策略。
架构设计
核心结构
1 2 3 4 5 6 7
| type bufferManager struct { syncMgr syncmgr.SyncManager buffers *typeutil.ConcurrentMap[string, WriteBuffer] wg sync.WaitGroup ch lifetime.SafeChan }
|
关键字段说明:
syncMgr: 同步管理器,用于提交同步任务
buffers: 线程安全的映射表,key 为通道名,value 为对应的 WriteBuffer
wg: WaitGroup,用于等待后台 goroutine 退出
ch: 安全通道,用于控制后台检查的启停
接口定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type BufferManager interface { Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error SealSegments(ctx context.Context, channel string, segmentIDs []int64) error FlushChannel(ctx context.Context, channel string, flushTs uint64) error RemoveChannel(channel string) DropChannel(channel string) DropPartitions(channel string, partitionIDs []int64) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) NotifyCheckpointUpdated(channel string, ts uint64) Start() Stop() }
|
核心功能
1. 通道注册
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error { buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...) if err != nil { return err } _, loaded := m.buffers.GetOrInsert(channel, buf) if loaded { buf.Close(context.Background(), false) return merr.WrapErrChannelReduplicate(channel) } return nil }
|
功能:
- 创建新的 WriteBuffer 实例
- 注册到管理器
- 防止重复注册
2. 数据缓冲
1 2 3 4 5 6 7 8 9 10
| func (m *bufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { buf, loaded := m.buffers.Get(channel) if !loaded { return merr.WrapErrChannelNotFound(channel) } return buf.BufferData(insertData, deleteMsgs, startPos, endPos) }
|
3. 段管理
创建 Growing 段
1 2 3 4 5 6 7 8 9 10
| func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) error { buf, loaded := m.buffers.Get(channel) if !loaded { return merr.WrapErrChannelNotFound(channel) } buf.CreateNewGrowingSegment(partitionID, segmentID, nil) return nil }
|
封装段
1 2 3 4 5 6 7 8
| func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { buf, loaded := m.buffers.Get(channel) if !loaded { return merr.WrapErrChannelNotFound(channel) } return buf.SealSegments(ctx, segmentIDs) }
|
功能:
- 将段状态从 Growing 变为 Sealed
- 触发段的同步操作
4. 刷新通道
1 2 3 4 5 6 7 8
| func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { buf, loaded := m.buffers.Get(channel) if !loaded { return merr.WrapErrChannelNotFound(channel) } buf.SetFlushTimestamp(flushTs) return nil }
|
功能:
- 设置通道的刷新时间戳(flushTs)
- 触发基于时间戳的刷新策略
5. 检查点管理
获取检查点
1 2 3 4 5 6 7 8 9 10 11
| func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { buf, loaded := m.buffers.Get(channel) if !loaded { return nil, false, merr.WrapErrChannelNotFound(channel) } cp := buf.GetCheckpoint() flushTs := buf.GetFlushTimestamp() return cp, flushTs != nonFlushTS && cp.GetTimestamp() >= flushTs, nil }
|
返回值说明:
*msgpb.MsgPosition: 通道检查点
bool: 是否需要更新(当检查点时间戳 >= flushTs 时返回 true)
error: 错误信息
通知检查点更新
1 2 3 4 5 6 7 8 9 10 11 12 13
| func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { buf, loaded := m.buffers.Get(channel) if !loaded { return } flushTs := buf.GetFlushTimestamp() if flushTs != nonFlushTS && ts > flushTs { log.Info("reset channel flushTs", zap.String("channel", channel)) buf.SetFlushTimestamp(nonFlushTS) } }
|
关键逻辑:
- 当检查点时间戳严格大于 flushTs 时,重置 flushTs
- 这确保了刷新操作已完成,可以安全重置
内存管理
后台检查机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (m *bufferManager) Start() { m.wg.Add(1) go func() { defer m.wg.Done() m.check() }() }
func (m *bufferManager) check() { timer := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) defer timer.Stop() for { select { case <-timer.C: m.memoryCheck() timer.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) case <-m.ch.CloseCh(): log.Info("buffer manager memory check stopped") return } } }
|
内存检查逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func (m *bufferManager) memoryCheck() { if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() { return } for { var total int64 var candidate WriteBuffer var candiSize int64 var candiChan string m.buffers.Range(func(chanName string, buf WriteBuffer) bool { size := buf.MemorySize() total += size if size > candiSize { candiSize = size candidate = buf candiChan = chanName } return true }) totalMemory := hardware.GetMemoryCount() memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat() if float64(total) < memoryWatermark { return } if candidate != nil { candidate.EvictBuffer(GetOldestBufferPolicy( paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt())) log.Info("notify writebuffer to sync", zap.String("channel", candiChan), zap.Float64("bufferSize(MB)", logutil.ToMB(float64(candiSize)))) } } }
|
内存管理策略:
- 定期检查: 按配置的时间间隔检查内存使用
- 水位线机制: 当总内存超过水位线时触发同步
- 选择策略: 选择内存使用最大的缓冲区进行同步
- 驱逐策略: 使用
GetOldestBufferPolicy 选择最老的段进行同步
通道生命周期管理
移除通道
1 2 3 4 5 6 7 8 9
| func (m *bufferManager) RemoveChannel(channel string) { buf, loaded := m.buffers.GetAndRemove(channel) if !loaded { return } buf.Close(context.Background(), false) }
|
用途: 通道迁移时,丢弃本地缓冲数据
删除通道
1 2 3 4 5 6 7 8 9
| func (m *bufferManager) DropChannel(channel string) { buf, loaded := m.buffers.GetAndRemove(channel) if !loaded { return } buf.Close(context.Background(), true) }
|
用途: 删除集合时,保存所有缓冲数据
删除分区
1 2 3 4 5 6 7 8
| func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) { buf, loaded := m.buffers.Get(channel) if !loaded { return } buf.DropPartitions(partitionIDs) }
|
设计特点
1. 线程安全
使用 ConcurrentMap 实现线程安全的并发访问。
2. 内存保护
通过后台检查机制,防止内存溢出。
3. 灵活的策略
支持多种同步策略(内存、时间、手动刷新等)。
4. 优雅关闭
通过 SafeChan 和 WaitGroup 实现优雅关闭。
与其他组件的关系
1 2 3 4 5 6 7 8 9 10 11 12
| bufferManager │ ├── WriteBuffer (按通道组织) │ │ │ ├── segmentBuffer (按段组织) │ │ ├── InsertBuffer │ │ └── DeltaBuffer │ │ │ └── SyncPolicy (同步策略) │ └── SyncManager └── SyncTask
|
配置参数
MemoryCheckInterval: 内存检查间隔
MemoryForceSyncEnable: 是否启用强制同步
MemoryForceSyncWatermark: 内存强制同步水位线
MemoryForceSyncSegmentNum: 每次强制同步的段数量
相关文档