cached_store 分析
cached_store
核心职责: 实现 JuiceFS 的带缓存的块存储(ChunkStore),提供数据读写、缓存管理、压缩、写回(writeback)等功能。
文件结构
1 | cached_store.go |
常量定义
chunkSize
1 | const chunkSize = 1 << 26 // 64M |
作用: 定义 Chunk 的大小(64MB)
使用场景: 写入时检查边界,计算 Chunk 索引
pageSize
1 | const pageSize = 1 << 16 // 64K |
作用: 定义 Page 的大小(64KB)
使用场景: Page 池分配,内存管理
SlowRequest
1 | const SlowRequest = time.Second * time.Duration(10) |
作用: 定义慢请求阈值(10秒)
使用场景: 记录慢请求日志
核心结构体
1. pendingItem - 待上传项
1 | type pendingItem struct { |
作用: 表示待上传到对象存储的本地文件
字段说明:
| 字段 | 类型 | 作用 |
|---|---|---|
key |
string |
对象存储的键 |
fpath |
string |
本地文件路径 |
ts |
time.Time |
添加时间戳 |
uploading |
bool |
是否正在上传 |
使用场景: 写回(writeback)模式下的延迟上传
2. rSlice - 读取切片
1 | type rSlice struct { |
作用: 实现 Reader 接口,用于读取数据块
字段说明:
| 字段 | 类型 | 作用 |
|---|---|---|
id |
uint64 |
切片 ID |
length |
int |
切片长度 |
store |
*cachedStore |
所属的存储 |
核心方法:
ReadAt: 从指定偏移读取数据Remove: 删除切片的所有块key: 生成对象存储的键blockSize: 计算块大小
3. wSlice - 写入切片
1 | type wSlice struct { |
作用: 实现 Writer 接口,用于写入数据块
字段说明:
| 字段 | 类型 | 作用 |
|---|---|---|
rSlice |
rSlice |
嵌入的读取切片 |
pages |
[][]*Page |
页面数组(按块组织) |
uploaded |
int |
已上传的字节数 |
errors |
chan error |
上传错误通道 |
uploadError |
error |
上传错误 |
pendings |
int |
待上传的块数 |
writeback |
bool |
是否启用写回 |
核心方法:
WriteAt: 写入数据FlushTo: 刷新到指定偏移Finish: 完成写入Abort: 中止写入upload: 上传块
4. Config - 配置结构
1 | type Config struct { |
作用: 存储缓存和存储的配置参数
关键字段:
- 缓存相关:
CacheDir,CacheSize,CacheEviction,CacheExpire - 写回相关:
Writeback,WritebackThresholdSize,UploadDelay,UploadHours - 性能相关:
MaxUpload,UploadLimit,DownloadLimit,Prefetch - 压缩相关:
Compress,BlockSize
5. cachedStore - 带缓存的存储
1 | type cachedStore struct { |
作用: 实现 ChunkStore 接口,提供带缓存的块存储功能
字段说明:
| 字段 | 类型 | 作用 |
|---|---|---|
storage |
object.ObjectStorage |
底层对象存储 |
bcache |
CacheManager |
缓存管理器 |
fetcher |
*prefetcher |
预取器 |
conf |
Config |
配置 |
group |
*Controller |
单飞控制器(去重) |
currentUpload |
chan bool |
当前上传通道(限流) |
pendingCh |
chan *pendingItem |
待上传通道 |
pendingKeys |
map[string]*pendingItem |
待上传键映射 |
pendingMutex |
sync.Mutex |
保护待上传映射 |
startHour |
int |
上传开始时间(小时) |
endHour |
int |
上传结束时间(小时) |
compressor |
compress.Compressor |
压缩器 |
seekable |
bool |
是否支持随机访问 |
upLimit |
*ratelimit.Bucket |
上传限流器 |
downLimit |
*ratelimit.Bucket |
下载限流器 |
指标字段: Prometheus 指标(缓存命中、对象请求等)
核心方法分析
rSlice 方法
1. key - 生成对象存储键
1 | func (s *rSlice) key(indx int) string { |
功能: 根据切片 ID、块索引和块大小生成对象存储的键
键格式:
- HashPrefix:
chunks/{hash}/{id/1000/1000}/{id}_{indx}_{size} - 普通:
chunks/{id/1000/1000}/{id/1000}/{id}_{indx}_{size}
设计目的:
- 分散对象存储的负载
- 支持按 ID 范围查找
2. ReadAt - 读取数据
1 | func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err error) { |
功能: 从指定偏移读取数据
执行流程:
边界检查:
- 如果
off >= length,返回EOF - 如果跨块,递归读取
- 如果
缓存查找:
- 如果启用缓存,从缓存加载
- 如果命中,更新指标,返回数据
- 如果失败,移除缓存项
部分读取优化(如果支持随机访问):
- 尝试从正在进行的完整读取中”搭便车”(piggyback)
- 如果失败,使用 Range 请求读取部分数据
- 如果成功,触发预取
完整读取(回退):
- 使用单飞(singleflight)去重
- 调用
load方法加载完整块 - 如果偏移不为 0,复制数据
关键优化:
- 缓存优先: 先查缓存,减少网络请求
- 部分读取: 支持 Range 请求,减少数据传输
- 单飞去重: 避免重复请求
- 搭便车: 从正在进行的读取中获取数据
3. Remove - 删除切片
1 | func (s *rSlice) Remove() error { |
功能: 删除切片的所有块
流程:
- 计算最后一个块索引
- 遍历所有块:
- 移除待上传项
- 移除缓存项
- 删除对象存储中的对象
并发安全: 允许多个客户端同时删除,只要有一个成功即可
wSlice 方法
1. WriteAt - 写入数据
1 | func (s *wSlice) WriteAt(p []byte, off int64) (n int, err error) { |
功能: 写入数据到指定偏移
执行流程:
边界检查:
- 检查是否超出 Chunk 边界
- 检查是否覆盖已上传的数据
填充空洞:
- 如果偏移大于当前长度,用零填充
分块写入:
- 计算块索引和块内偏移
- 分配或获取 Page
- 复制数据到 Page
- 更新长度
关键点:
- 支持稀疏写入(用零填充空洞)
- 按块组织数据(
pages[indx]) - 支持跨块写入
2. upload - 上传块
1 | func (s *wSlice) upload(indx int) { |
功能: 异步上传块到对象存储
执行流程:
合并页面:
- 如果只有一个页面,直接使用
- 否则合并多个页面为一个块
写回模式(如果启用且块大小小于阈值):
- 先写入本地缓存(
stage) - 如果成功且满足上传条件,立即上传
- 否则添加到延迟上传列表
- 先写入本地缓存(
直接上传(回退):
- 获取上传槽位(限流)
- 调用
store.upload上传
关键点:
- 异步执行: 使用 goroutine 不阻塞写入
- 写回优化: 小块先写本地,延迟上传
- 限流控制: 使用
currentUpload通道限流
3. FlushTo - 刷新到指定偏移
1 | func (s *wSlice) FlushTo(offset int) error { |
功能: 刷新指定范围内的所有块
流程:
- 检查偏移有效性
- 遍历所有块,如果块在范围内且未上传,上传它
- 更新已上传偏移
4. Finish - 完成写入
1 | func (s *wSlice) Finish(length int) error { |
功能: 完成写入,等待所有上传完成
流程:
- 检查长度匹配
- 刷新所有块
- 等待所有上传完成(从
errors通道读取) - 如果有错误,返回错误
cachedStore 方法
1. NewCachedStore - 创建带缓存的存储
1 | func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.Registerer) ChunkStore { |
功能: 创建并初始化带缓存的存储
初始化流程:
- 创建压缩器: 根据配置创建压缩器
- 设置默认值: 重试次数、超时时间
- 创建存储结构: 初始化所有字段
- 设置限流器: 上传和下载限流
- 初始化指标: Prometheus 指标
- 解析上传时间: 如果启用写回
- 创建缓存管理器: 带上传回调
- 启动后台任务:
- 检查缓存是否为空(如果为空,切换到内存缓存)
- 启动预取器(如果启用)
- 启动上传器(如果启用写回)
- 启动延迟上传扫描器
关键设计:
- 自动降级: 如果缓存目录不可用,自动切换到内存缓存
- 后台任务: 多个 goroutine 处理缓存和上传
- 单飞去重: 预取时使用单飞避免重复请求
2. load - 加载块
1 | func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) { |
功能: 从对象存储加载块
执行流程:
- 压缩检查: 判断是否需要解压
- 限流: 如果未压缩,先限流
- 分配页面: 如果压缩,分配更大的页面
- 下载数据: 从对象存储获取完整数据
- 限流: 如果压缩,下载后限流
- 解压: 如果压缩,解压数据
- 缓存: 如果启用缓存,写入缓存
关键点:
- 超时控制: 使用
WithTimeout控制超时 - 压缩处理: 支持压缩和解压
- 错误处理: 记录错误指标
3. upload - 上传块
1 | func (store *cachedStore) upload(key string, block *Page, s *wSlice) error { |
功能: 上传块到对象存储
执行流程:
- 压缩: 压缩数据块
- 缓存: 如果同步且满足条件,写入缓存
- 重试上传: 使用指数退避重试
- 错误处理: 如果取消或达到最大重试次数,返回错误
关键点:
- 压缩: 支持数据压缩
- 重试: 同步上传使用更多重试次数
- 指数退避: 重试间隔递增
4. put - 上传数据
1 | func (store *cachedStore) put(key string, p *Page) error { |
功能: 上传数据到对象存储
流程:
- 限流
- 超时控制
- 上传数据
- 记录指标
5. uploader - 上传器
1 | func (store *cachedStore) uploader() { |
功能: 后台上传器,从 pendingCh 通道读取待上传项并上传
使用场景: 写回模式下的延迟上传
6. uploadStagingFile - 上传暂存文件
1 | func (store *cachedStore) uploadStagingFile(key string, stagingPath string) { |
功能: 上传暂存文件到对象存储
流程:
- 获取上传槽位(限流)
- 检查待上传项是否有效
- 检查是否允许上传(时间窗口)
- 读取暂存文件
- 上传数据
- 清理暂存文件和待上传项
关键点:
- 并发安全: 检查项是否仍然有效
- 错误处理: 如果项已无效,删除已上传的对象
数据流分析
读取流程
1 | 用户请求 |
写入流程
1 | 用户请求 |
写回流程
1 | 写入数据 |
设计模式和特点
1. 缓存策略
多级缓存:
- 内存缓存: 热数据在内存中
- 磁盘缓存: 温数据在磁盘上
- 对象存储: 冷数据在对象存储中
缓存查找顺序:
- 内存缓存
- 磁盘缓存
- 对象存储
2. 写回(Writeback)机制
特点:
- 小数据块先写入本地缓存
- 延迟上传到对象存储
- 支持时间窗口控制
优势:
- 提高写入性能
- 减少网络请求
- 支持批量上传
3. 单飞(Singleflight)去重
实现: group.Execute(key, func() (*Page, error))
作用: 避免重复请求相同的数据块
使用场景:
- 读取时的完整块加载
- 预取时的数据加载
4. 搭便车(Piggyback)优化
实现: group.TryPiggyback(key)
作用: 从正在进行的完整读取中获取部分数据
优势: 减少网络请求,提高效率
5. 限流控制
上传限流: upLimit (ratelimit.Bucket)
下载限流: downLimit (ratelimit.Bucket)
实现: 使用令牌桶算法
6. 压缩支持
压缩时机: 上传时压缩
解压时机: 下载时解压
判断: compressor.CompressBound(0) == 0 判断是否支持随机访问
7. 预取(Prefetch)
实现: fetcher.fetch(key)
作用: 后台预取可能访问的数据
触发: 读取时自动触发
性能优化
1. 缓存优先
策略: 先查缓存,减少网络请求
实现: ReadAt 中先调用 bcache.load
2. 部分读取优化
策略: 如果支持随机访问,使用 Range 请求
实现: seekable && boff > 0 && len(p) <= blockSize/4
3. 单飞去重
策略: 避免重复请求相同数据
实现: group.Execute 确保同一 key 只请求一次
4. 搭便车优化
策略: 从正在进行的读取中获取数据
实现: group.TryPiggyback
5. 写回优化
策略: 小数据块先写本地,延迟上传
实现: writeback && blen < WritebackThresholdSize
6. 批量上传
策略: 延迟上传,批量处理
实现: pendingCh 通道 + uploader goroutine
7. 预取
策略: 后台预取可能访问的数据
实现: fetcher.fetch 在读取后触发
并发控制
1. 锁保护
pendingMutex: 保护 pendingKeys 映射
使用场景:
addDelayedStagingremovePendingisPendingValidscanDelayedStaging
2. 通道限流
currentUpload: 限制并发上传数
实现: make(chan bool, config.MaxUpload)
3. 单飞去重
group: 使用单飞控制器去重请求
实现: group.Execute 确保同一 key 只执行一次
错误处理
1. 重试机制
上传重试: 指数退避,最多 MaxRetries + 1 次
实现: for try < max; try++ { time.Sleep(time.Second * time.Duration(try*try)) }
2. 超时控制
读取超时: GetTimeout (默认 60 秒)
上传超时: PutTimeout (默认 60 秒)
实现: utils.WithTimeout
3. 错误指标
指标:
objectReqErrors: 对象请求错误数stageBlockErrors: 暂存块错误数
4. 错误恢复
缓存错误: 移除缓存项,回退到对象存储
上传错误: 重试,如果失败返回错误
内存管理
1. Page 池
实现: pagePool 通道
作用: 复用 Page,减少内存分配
2. 引用计数
Page.Acquire/Release: 管理 Page 的生命周期
3. 内存限制
缓存大小: CacheSize 限制缓存总大小
实现: 缓存管理器负责淘汰
关键算法
1. 键生成算法
HashPrefix 模式:
1 | chunks/{id%256}/{(id/1000)/1000}/{id}_{indx}_{size} |
普通模式:
1 | chunks/{(id/1000)/1000}/{id/1000}/{id}_{indx}_{size} |
设计目的: 分散对象存储负载
2. 块大小计算
1 | func (s *rSlice) blockSize(indx int) int { |
功能: 计算指定索引的块大小
特点: 最后一个块可能小于 BlockSize
3. 上传时间窗口
1 | func (store *cachedStore) canUpload() bool { |
功能: 判断当前时间是否允许上传
支持: 跨天时间窗口(如 22:00-02:00)
数据结构关系图
1 | cachedStore |
关键设计决策
1. 为什么使用多级缓存?
原因:
- 内存缓存:最快,但容量有限
- 磁盘缓存:较快,容量较大
- 对象存储:最慢,容量无限
优势:
- 平衡性能和成本
- 提高缓存命中率
2. 为什么使用写回机制?
原因:
- 提高写入性能
- 减少网络请求
- 支持批量上传
实现:
- 小数据块先写本地
- 延迟上传到对象存储
- 支持时间窗口控制
3. 为什么使用单飞去重?
原因:
- 避免重复请求相同数据
- 减少网络请求
- 提高效率
实现:
group.Execute确保同一 key 只执行一次- 其他请求等待结果
4. 为什么使用搭便车优化?
原因:
- 从正在进行的读取中获取数据
- 减少网络请求
- 提高效率
实现:
group.TryPiggyback尝试获取部分数据- 如果成功,直接返回
5. 为什么使用部分读取优化?
原因:
- 减少数据传输
- 提高效率
- 支持随机访问
实现:
- 如果支持随机访问且读取范围小,使用 Range 请求
- 否则读取完整块
潜在问题和改进
1. 缓存一致性
问题: 多个客户端可能缓存不同版本的数据
改进: 使用版本号或校验和
2. 缓存淘汰
问题: 缓存淘汰策略可能不够优化
改进: 使用更智能的淘汰策略(如 LRU)
3. 错误恢复
问题: 某些错误(如网络错误)可能导致数据不一致
改进: 实现更完善的错误恢复机制
4. 性能优化
问题:
- 部分读取可能不够优化
- 预取策略可能不够智能
改进:
- 优化部分读取逻辑
- 实现更智能的预取策略
总结
cached_store.go 文件实现了 JuiceFS 的带缓存的块存储功能,具有以下特点:
- 多级缓存: 内存、磁盘、对象存储三级缓存
- 写回机制: 小数据块先写本地,延迟上传
- 性能优化: 单飞去重、搭便车、部分读取、预取
- 限流控制: 上传和下载限流
- 压缩支持: 支持数据压缩和解压
- 并发安全: 使用锁和通道保护并发访问
- 错误处理: 重试机制、超时控制、错误指标
- 内存管理: Page 池、引用计数、缓存淘汰
它是 JuiceFS 存储性能的关键组件,实现了高效的缓存管理和数据访问优化。