Milvus WriteBuffer 管理器(bufferManager)详解

概述

bufferManager 是 Milvus DataNode 中管理多个 WriteBuffer 实例的核心组件。它负责按通道组织和管理数据缓冲,实现内存监控、自动驱逐和刷新策略。

架构设计

核心结构

1
2
3
4
5
6
7
type bufferManager struct {
syncMgr syncmgr.SyncManager
buffers *typeutil.ConcurrentMap[string, WriteBuffer]

wg sync.WaitGroup
ch lifetime.SafeChan
}

关键字段说明:

  • syncMgr: 同步管理器,用于提交同步任务
  • buffers: 线程安全的映射表,key 为通道名,value 为对应的 WriteBuffer
  • wg: WaitGroup,用于等待后台 goroutine 退出
  • ch: 安全通道,用于控制后台检查的启停

接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type BufferManager interface {
Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error
CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error
SealSegments(ctx context.Context, channel string, segmentIDs []int64) error
FlushChannel(ctx context.Context, channel string, flushTs uint64) error
RemoveChannel(channel string)
DropChannel(channel string)
DropPartitions(channel string, partitionIDs []int64)
BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error)
NotifyCheckpointUpdated(channel string, ts uint64)

Start()
Stop()
}

核心功能

1. 通道注册

1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...)
if err != nil {
return err
}

_, loaded := m.buffers.GetOrInsert(channel, buf)
if loaded {
buf.Close(context.Background(), false)
return merr.WrapErrChannelReduplicate(channel)
}
return nil
}

功能:

  • 创建新的 WriteBuffer 实例
  • 注册到管理器
  • 防止重复注册

2. 数据缓冲

1
2
3
4
5
6
7
8
9
10
func (m *bufferManager) BufferData(channel string, insertData []*InsertData, 
deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {

buf, loaded := m.buffers.Get(channel)
if !loaded {
return merr.WrapErrChannelNotFound(channel)
}

return buf.BufferData(insertData, deleteMsgs, startPos, endPos)
}

3. 段管理

创建 Growing 段

1
2
3
4
5
6
7
8
9
10
func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, 
partitionID int64, segmentID int64) error {

buf, loaded := m.buffers.Get(channel)
if !loaded {
return merr.WrapErrChannelNotFound(channel)
}
buf.CreateNewGrowingSegment(partitionID, segmentID, nil)
return nil
}

封装段

1
2
3
4
5
6
7
8
func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return merr.WrapErrChannelNotFound(channel)
}

return buf.SealSegments(ctx, segmentIDs)
}

功能:

  • 将段状态从 Growing 变为 Sealed
  • 触发段的同步操作

4. 刷新通道

1
2
3
4
5
6
7
8
func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return merr.WrapErrChannelNotFound(channel)
}
buf.SetFlushTimestamp(flushTs)
return nil
}

功能:

  • 设置通道的刷新时间戳(flushTs)
  • 触发基于时间戳的刷新策略

5. 检查点管理

获取检查点

1
2
3
4
5
6
7
8
9
10
11
func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return nil, false, merr.WrapErrChannelNotFound(channel)
}
cp := buf.GetCheckpoint()
flushTs := buf.GetFlushTimestamp()

// 返回检查点和是否需要更新的标志
return cp, flushTs != nonFlushTS && cp.GetTimestamp() >= flushTs, nil
}

返回值说明:

  • *msgpb.MsgPosition: 通道检查点
  • bool: 是否需要更新(当检查点时间戳 >= flushTs 时返回 true)
  • error: 错误信息

通知检查点更新

1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return
}
flushTs := buf.GetFlushTimestamp()

// 关键条件:ts > flushTs
if flushTs != nonFlushTS && ts > flushTs {
log.Info("reset channel flushTs", zap.String("channel", channel))
buf.SetFlushTimestamp(nonFlushTS)
}
}

关键逻辑:

  • 当检查点时间戳严格大于 flushTs 时,重置 flushTs
  • 这确保了刷新操作已完成,可以安全重置

内存管理

后台检查机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *bufferManager) Start() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.check()
}()
}

func (m *bufferManager) check() {
timer := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
defer timer.Stop()

for {
select {
case <-timer.C:
m.memoryCheck()
timer.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
case <-m.ch.CloseCh():
log.Info("buffer manager memory check stopped")
return
}
}
}

内存检查逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (m *bufferManager) memoryCheck() {
if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
return
}

for {
var total int64
var candidate WriteBuffer
var candiSize int64
var candiChan string

// 遍历所有缓冲区,找到最大的
m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
}
return true
})

// 检查是否超过水位线
totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()

if float64(total) < memoryWatermark {
return // 未超过水位线,退出
}

// 触发同步
if candidate != nil {
candidate.EvictBuffer(GetOldestBufferPolicy(
paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()))
log.Info("notify writebuffer to sync",
zap.String("channel", candiChan),
zap.Float64("bufferSize(MB)", logutil.ToMB(float64(candiSize))))
}
}
}

内存管理策略:

  1. 定期检查: 按配置的时间间隔检查内存使用
  2. 水位线机制: 当总内存超过水位线时触发同步
  3. 选择策略: 选择内存使用最大的缓冲区进行同步
  4. 驱逐策略: 使用 GetOldestBufferPolicy 选择最老的段进行同步

通道生命周期管理

移除通道

1
2
3
4
5
6
7
8
9
func (m *bufferManager) RemoveChannel(channel string) {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
return
}

// 丢弃所有缓冲数据
buf.Close(context.Background(), false)
}

用途: 通道迁移时,丢弃本地缓冲数据

删除通道

1
2
3
4
5
6
7
8
9
func (m *bufferManager) DropChannel(channel string) {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
return
}

// 保存所有缓冲数据
buf.Close(context.Background(), true)
}

用途: 删除集合时,保存所有缓冲数据

删除分区

1
2
3
4
5
6
7
8
func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return
}

buf.DropPartitions(partitionIDs)
}

设计特点

1. 线程安全

使用 ConcurrentMap 实现线程安全的并发访问。

2. 内存保护

通过后台检查机制,防止内存溢出。

3. 灵活的策略

支持多种同步策略(内存、时间、手动刷新等)。

4. 优雅关闭

通过 SafeChanWaitGroup 实现优雅关闭。

与其他组件的关系

1
2
3
4
5
6
7
8
9
10
11
12
bufferManager

├── WriteBuffer (按通道组织)
│ │
│ ├── segmentBuffer (按段组织)
│ │ ├── InsertBuffer
│ │ └── DeltaBuffer
│ │
│ └── SyncPolicy (同步策略)

└── SyncManager
└── SyncTask

配置参数

  • MemoryCheckInterval: 内存检查间隔
  • MemoryForceSyncEnable: 是否启用强制同步
  • MemoryForceSyncWatermark: 内存强制同步水位线
  • MemoryForceSyncSegmentNum: 每次强制同步的段数量

相关文档