Milvus C++ 功能通过 CGO 集成分析

概述

Milvus 采用混合架构设计,核心性能关键路径使用 C++ 实现,通过 CGO(C Go)提供给 Go 代码调用。这种设计既保证了性能,又保持了 Go 代码的简洁性和可维护性。

一、CGO 集成架构

1.1 基本结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────┐
│ Go 代码层 │
│ (Proxy, QueryNode, DataNode, etc.) │
└──────────────┬──────────────────────────┘
│ CGO 调用

┌─────────────────────────────────────────┐
│ C 接口层 (_c.h, _c.cpp) │
│ 封装 C++ 实现,提供 C 兼容接口 │
└──────────────┬──────────────────────────┘


┌─────────────────────────────────────────┐
│ C++ 实现层 │
│ (高性能核心逻辑) │
└─────────────────────────────────────────┘

1.2 CGO 使用方式

典型示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package segcore

/*
#cgo pkg-config: milvus_core

#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
#include "common/type_c.h"
*/
import "C"

// Go 代码调用 C 函数
func CreateCCollection(req *CreateCCollectionRequest) (*CCollection, error) {
var ptr C.CCollection
status := C.NewCollection(
unsafe.Pointer(&schemaBlob[0]),
(C.int64_t)(len(schemaBlob)),
&ptr)
// ...
}

二、主要 C++ 功能模块

2.1 索引构建(Index Building)

位置internal/core/src/indexbuilder/

CGO 接口indexbuilder/index_c.h

Go 包装internal/util/indexcgowrapper/

功能

  • ✅ 向量索引构建(IVF、HNSW、FLAT 等)
  • ✅ 标量索引构建(倒排索引、B+树等)
  • ✅ 文本索引构建(BM25、N-gram 等)
  • ✅ 索引序列化和反序列化
  • ✅ 索引文件管理

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
// internal/util/indexcgowrapper/index.go
type CgoIndex struct {
indexPtr C.CIndex
close bool
}

func CreateIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
var indexPtr C.CIndex
status := C.CreateIndex(&indexPtr,
(*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])),
(C.uint64_t)(len(buildIndexInfoBlob)))
// ...
}

C++ 实现文件

  • index_c.cpp - 索引创建和管理
  • VecIndexCreator.cpp - 向量索引创建
  • ScalarIndexCreator.cpp - 标量索引创建

2.2 段核心(Segcore)

位置internal/core/src/segcore/

CGO 接口:多个 *_c.h 文件

Go 包装internal/util/segcore/

功能模块

2.2.1 Collection 管理

CGO 接口segcore/collection_c.h

功能

  • ✅ Collection 创建和销毁
  • ✅ Schema 管理
  • ✅ Index Meta 管理
  • ✅ 字段加载配置

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// internal/util/segcore/collection.go
type CCollection struct {
ptr C.CCollection
collectionID int64
schema *schemapb.CollectionSchema
indexMeta *segcorepb.CollectionIndexMeta
}

func CreateCCollection(req *CreateCCollectionRequest) (*CCollection, error) {
var ptr C.CCollection
status := C.NewCollection(
unsafe.Pointer(&schemaBlob[0]),
(C.int64_t)(len(schemaBlob)),
&ptr)
// ...
}

2.2.2 Segment 操作

CGO 接口segcore/segment_c.h

功能

  • ✅ Segment 创建(Growing/Sealed)
  • ✅ 数据插入(Insert)
  • ✅ 数据删除(Delete)
  • ✅ 字段数据加载(LoadFieldData)
  • ✅ 索引加载(LoadIndex)
  • ✅ 内存使用统计

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// internal/util/segcore/segment.go
type cSegmentImpl struct {
id int64
ptr C.CSegmentInterface
}

func (s *cSegmentImpl) Insert(ctx context.Context, request *InsertRequest) (*InsertResult, error) {
status := C.Insert(s.ptr,
(*C.uint8_t)(unsafe.Pointer(&insertBlob[0])),
(C.int64_t)(len(insertBlob)),
C.int64_t(request.NumRows),
C.int64_t(request.Timestamp))
// ...
}

C++ 实现文件

  • segment_c.cpp - Segment C 接口
  • SegmentGrowingImpl.cpp - Growing Segment 实现
  • ChunkedSegmentSealedImpl.cpp - Sealed Segment 实现

2.2.3 查询计划(Query Plan)

CGO 接口segcore/plan_c.h

功能

  • ✅ 查询计划生成
  • ✅ 表达式解析
  • ✅ 查询优化

关键代码

1
2
3
4
5
6
7
8
9
// internal/util/segcore/plan.go
func CreatePlan(collection *CCollection, planBlob []byte) (CPlan, error) {
var planPtr C.CPlan
status := C.CreatePlan(collection.rawPointer(),
(*C.uint8_t)(unsafe.Pointer(&planBlob[0])),
(C.int64_t)(len(planBlob)),
&planPtr)
// ...
}

2.2.4 结果归并(Reduce)

CGO 接口segcore/reduce_c.h

功能

  • ✅ 多段查询结果归并
  • ✅ TopK 结果合并
  • ✅ 聚合操作

关键代码

1
2
3
4
5
6
7
8
// internal/util/segcore/reduce.go
func ReduceSearchResults(ctx context.Context,
plan CPlan,
searchResults []*SearchResult,
numSegments int64) (*SearchResult, error) {
// 调用 C++ reduce 函数
// ...
}

C++ 实现文件

  • reduce_c.cpp - Reduce C 接口
  • reduce/Reduce.cpp - Reduce 实现
  • reduce/GroupReduce.cpp - 分组 Reduce

2.2.5 向量索引操作

CGO 接口segcore/vector_index_c.h

功能

  • ✅ 向量索引加载
  • ✅ 向量搜索
  • ✅ 索引管理

关键代码

1
2
3
4
5
6
7
// internal/util/vecindexmgr/vector_index_mgr.go
func LoadVectorIndex(ctx context.Context,
indexBlob []byte,
indexParams map[string]string) (C.CVectorIndex, error) {
// 调用 C++ 加载向量索引
// ...
}

2.2.6 字段数据加载

CGO 接口segcore/load_field_data_c.h

功能

  • ✅ 从存储加载字段数据
  • ✅ 数据格式转换
  • ✅ 内存管理

C++ 实现文件

  • load_field_data_c.cpp - 字段数据加载 C 接口

2.2.7 索引加载

CGO 接口segcore/load_index_c.h

功能

  • ✅ 索引文件加载
  • ✅ 索引内存映射
  • ✅ 索引验证

C++ 实现文件

  • load_index_c.cpp - 索引加载 C 接口

2.3 存储(Storage)

位置internal/core/src/storage/

CGO 接口storage/storage_c.h, segcore/packed_writer_c.h, segcore/packed_reader_c.h

Go 包装internal/storagev2/packed/

功能

2.3.1 Parquet 读写

功能

  • ✅ Arrow RecordBatch → Parquet 文件写入
  • ✅ Parquet 文件 → Arrow RecordBatch 读取
  • ✅ 列组(Column Group)管理
  • ✅ 压缩和编码

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// internal/storagev2/packed/packed_writer.go
func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
// 导出 Arrow 数据到 C 结构
cArrays := make([]CArrowArray, recordBatch.NumCols())
cSchemas := make([]CArrowSchema, recordBatch.NumCols())

for i := range recordBatch.NumCols() {
var caa cdata.CArrowArray
var cas cdata.CArrowSchema
cdata.ExportArrowArray(recordBatch.Column(int(i)), &caa, &cas)
cArrays[i] = *(*CArrowArray)(unsafe.Pointer(&caa))
cSchemas[i] = *(*CArrowSchema)(unsafe.Pointer(&cas))
}

// 调用 C++ 写入 Parquet
status := C.WriteRecordBatch(pw.cPackedWriter,
&cArrays[0],
&cSchemas[0],
cSchema)
// ...
}

C++ 实现文件

  • packed_writer_c.cpp - Parquet 写入 C 接口
  • packed_reader_c.cpp - Parquet 读取 C 接口
  • loon_ffi/ffi_writer_c.cpp - FFI 写入接口
  • loon_ffi/ffi_reader_c.cpp - FFI 读取接口

2.3.2 存储抽象

功能

  • ✅ 文件系统抽象
  • ✅ 对象存储支持(S3、MinIO 等)
  • ✅ 本地文件系统支持

C++ 实现文件

  • storage_c.cpp - 存储 C 接口

2.4 表达式执行(Expression Execution)

位置internal/core/src/exec/expression/

功能

  • ✅ 表达式解析和编译
  • ✅ 标量函数执行
  • ✅ 向量函数执行
  • ✅ JSON 表达式处理
  • ✅ 过滤条件评估

C++ 实现文件

  • function/init_c.cpp - 表达式函数初始化

2.5 聚类分析(Clustering)

位置internal/core/src/clustering/

CGO 接口clustering/analyze_c.h

Go 包装internal/util/analyzecgowrapper/

功能

  • ✅ K-means 聚类
  • ✅ 数据分析和统计

关键代码

1
2
3
4
5
6
// internal/util/analyzecgowrapper/helper.go
func Analyze(ctx context.Context,
analyzeInfo *indexcgopb.AnalyzeInfo) error {
// 调用 C++ 聚类分析
// ...
}

C++ 实现文件

  • analyze_c.cpp - 聚类分析 C 接口
  • KmeansClustering.cpp - K-means 实现

2.6 监控(Monitor)

位置internal/core/src/monitor/

CGO 接口monitor/monitor_c.h

功能

  • ✅ 性能指标收集
  • ✅ 资源使用监控
  • ✅ 指标导出

C++ 实现文件

  • monitor_c.cpp - 监控 C 接口
  • Monitor.cpp - 监控实现

2.7 Tokenizer

位置internal/core/src/segcore/

CGO 接口segcore/tokenizer_c.h, segcore/token_stream_c.h

功能

  • ✅ 文本分词
  • ✅ Token 流处理
  • ✅ 支持多种分词器

C++ 实现文件

  • tokenizer_c.cpp - Tokenizer C 接口
  • token_stream_c.cpp - Token Stream C 接口

2.8 Future(异步操作)

位置internal/core/src/futures/

CGO 接口futures/future_c.h

Go 包装internal/util/cgo/futures.go

功能

  • ✅ 异步操作支持
  • ✅ Future/Promise 模式
  • ✅ 异步查询和检索

关键代码

1
2
3
4
5
6
7
// internal/util/cgo/futures.go
type CFuturePtr = C.CFuturePtr

func WaitForFuture(future CFuturePtr) (*C.CProto, error) {
result := C.WaitAndGetFuture(future)
// ...
}

C++ 实现文件

  • future_c.cpp - Future C 接口
  • Future.cpp - Future 实现

2.9 向量索引检查

位置internal/core/src/segcore/

CGO 接口segcore/check_vec_index_c.h

Go 包装internal/proxy/cgo_util.go

功能

  • ✅ 向量索引存在性检查
  • ✅ 索引类型验证

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// internal/proxy/cgo_util.go
func CheckVecIndexWithDataTypeExist(name string,
dataType schemapb.DataType,
indexType string) (bool, error) {
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))

cIndexType := C.CString(indexType)
defer C.free(unsafe.Pointer(cIndexType))

ret := C.CheckVecIndexWithDataTypeExist(cName,
C.int32_t(int32(dataType)),
cIndexType)
// ...
}

三、CGO 接口设计模式

3.1 错误处理

C 结构

1
2
3
4
typedef struct CStatus {
int32_t error_code;
char* error_msg;
} CStatus;

Go 处理

1
2
3
4
5
6
7
8
9
func ConsumeCStatusIntoError(status *C.CStatus) error {
if status.error_code == 0 {
return nil
}
errorCode := status.error_code
errorMsg := C.GoString(status.error_msg)
C.free(unsafe.Pointer(status.error_msg))
return merr.SegcoreError(int32(errorCode), errorMsg)
}

3.2 内存管理

原则

  • C 分配的内存由 C 释放
  • Go 传递的字符串需要转换为 C 字符串
  • 使用 defer C.free() 确保内存释放

示例

1
2
3
4
5
cStr := C.CString(goString)
defer C.free(unsafe.Pointer(cStr))

// 调用 C 函数
C.SomeFunction(cStr)

3.3 数据转换

Protobuf 转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Go Protobuf → C 内存
schemaBlob, err := proto.Marshal(req.Schema)
status := C.NewCollection(
unsafe.Pointer(&schemaBlob[0]),
(C.int64_t)(len(schemaBlob)),
&ptr)

// C 内存 → Go Protobuf
func UnmarshalProtoLayout(protoLayout any, msg proto.Message) error {
layout := unsafe.Pointer(reflect.ValueOf(protoLayout).Pointer())
cProtoLayout := (*C.ProtoLayout)(layout)
blob := (*(*[math.MaxInt32]byte)(cProtoLayout.blob))[:int(cProtoLayout.size)]
return proto.Unmarshal(blob, msg)
}

Arrow 数据转换

1
2
3
4
5
// Arrow RecordBatch → C Arrow Array
var caa cdata.CArrowArray
var cas cdata.CArrowSchema
cdata.ExportArrowArray(recordBatch.Column(i), &caa, &cas)
cArray := (*CArrowArray)(unsafe.Pointer(&caa))

四、性能优化考虑

4.1 CGO 调用开销

问题

  • CGO 调用有固定开销(约 50-100ns)
  • 跨语言边界数据拷贝
  • 线程切换开销

优化策略

  1. 批量操作:减少 CGO 调用次数
  2. 零拷贝:使用 Arrow C Data Interface
  3. 异步操作:使用 Future 模式
  4. 线程池:复用 CGO 线程

4.2 线程管理

CGO 线程绑定

1
2
3
4
5
6
7
// internal/proxy/cgo_util.go
func initDynamicPool() {
pool := conc.NewPool[any](
hardware.GetCPUNum(),
conc.WithPreHandler(runtime.LockOSThread), // 锁定 OS 线程
)
}

原因

  • CGO 调用需要在同一线程
  • 避免线程切换开销
  • 保证线程安全

4.3 内存管理优化

策略

  • 使用对象池减少分配
  • 预分配缓冲区
  • 及时释放 C 内存

五、主要使用场景

5.1 QueryNode 中的使用

场景

  • Segment 加载和管理
  • 查询执行(Search/Retrieve)
  • 结果归并
  • 索引加载

关键文件

  • internal/querynodev2/segments/segment.go
  • internal/util/segcore/

5.2 DataNode 中的使用

场景

  • 索引构建
  • 字段数据加载
  • 存储读写

关键文件

  • internal/util/indexcgowrapper/
  • internal/datanode/index/

5.3 Proxy 中的使用

场景

  • 向量索引检查
  • 参数验证

关键文件

  • internal/proxy/cgo_util.go

5.4 StorageV2 写入流程

场景

  • Arrow Record → Parquet 文件
  • 列组管理
  • 压缩和编码

关键文件

  • internal/storagev2/packed/packed_writer.go
  • internal/core/src/segcore/packed_writer_c.cpp

六、C++ 核心模块总结

模块 CGO 接口 Go 包装 主要功能
索引构建 indexbuilder/index_c.h internal/util/indexcgowrapper/ 向量/标量/文本索引构建
Collection segcore/collection_c.h internal/util/segcore/collection.go Collection 管理
Segment segcore/segment_c.h internal/util/segcore/segment.go Segment 操作(Insert/Delete/Load)
查询计划 segcore/plan_c.h internal/util/segcore/plan.go 查询计划生成
结果归并 segcore/reduce_c.h internal/util/segcore/reduce.go 多段结果归并
向量索引 segcore/vector_index_c.h internal/util/vecindexmgr/ 向量索引操作
存储 storage/storage_c.h internal/storagev2/packed/ Parquet 读写
Parquet 写入 segcore/packed_writer_c.h internal/storagev2/packed/packed_writer.go Arrow → Parquet
Parquet 读取 segcore/packed_reader_c.h internal/storagev2/packed/packed_reader.go Parquet → Arrow
聚类分析 clustering/analyze_c.h internal/util/analyzecgowrapper/ K-means 聚类
监控 monitor/monitor_c.h - 性能监控
Tokenizer segcore/tokenizer_c.h - 文本分词
Future futures/future_c.h internal/util/cgo/futures.go 异步操作

七、总结

7.1 设计优势

  1. 性能:核心路径使用 C++,充分利用硬件优化
  2. 可维护性:Go 代码简洁,易于维护
  3. 生态:利用 C++ 生态(Arrow、Parquet、Knowhere 等)
  4. 灵活性:可以逐步迁移功能到 C++

7.2 关键设计原则

  1. 清晰的接口边界:C 接口封装 C++ 实现
  2. 统一错误处理:CStatus 统一错误格式
  3. 内存安全:明确的内存管理责任
  4. 性能优化:减少 CGO 调用,使用零拷贝

7.3 未来方向

  • 继续优化 CGO 调用开销
  • 更多功能迁移到 C++
  • 更好的异步支持
  • 改进内存管理策略

Milvus 的 C++/CGO 集成设计为系统提供了高性能和可维护性的良好平衡,是混合语言架构的优秀实践。