1. 1. cached_store
    1. 1.1. 文件结构
    2. 1.2. 常量定义
      1. 1.2.1. chunkSize
      2. 1.2.2. pageSize
      3. 1.2.3. SlowRequest
    3. 1.3. 核心结构体
      1. 1.3.1. 1. pendingItem - 待上传项
      2. 1.3.2. 2. rSlice - 读取切片
      3. 1.3.3. 3. wSlice - 写入切片
      4. 1.3.4. 4. Config - 配置结构
      5. 1.3.5. 5. cachedStore - 带缓存的存储
    4. 1.4. 核心方法分析
      1. 1.4.1. rSlice 方法
        1. 1.4.1.1. 1. key - 生成对象存储键
        2. 1.4.1.2. 2. ReadAt - 读取数据
        3. 1.4.1.3. 3. Remove - 删除切片
      2. 1.4.2. wSlice 方法
        1. 1.4.2.1. 1. WriteAt - 写入数据
        2. 1.4.2.2. 2. upload - 上传块
        3. 1.4.2.3. 3. FlushTo - 刷新到指定偏移
        4. 1.4.2.4. 4. Finish - 完成写入
      3. 1.4.3. cachedStore 方法
        1. 1.4.3.1. 1. NewCachedStore - 创建带缓存的存储
        2. 1.4.3.2. 2. load - 加载块
        3. 1.4.3.3. 3. upload - 上传块
        4. 1.4.3.4. 4. put - 上传数据
        5. 1.4.3.5. 5. uploader - 上传器
        6. 1.4.3.6. 6. uploadStagingFile - 上传暂存文件
    5. 1.5. 数据流分析
      1. 1.5.1. 读取流程
      2. 1.5.2. 写入流程
      3. 1.5.3. 写回流程
    6. 1.6. 设计模式和特点
      1. 1.6.1. 1. 缓存策略
      2. 1.6.2. 2. 写回(Writeback)机制
      3. 1.6.3. 3. 单飞(Singleflight)去重
      4. 1.6.4. 4. 搭便车(Piggyback)优化
      5. 1.6.5. 5. 限流控制
      6. 1.6.6. 6. 压缩支持
      7. 1.6.7. 7. 预取(Prefetch)
    7. 1.7. 性能优化
      1. 1.7.1. 1. 缓存优先
      2. 1.7.2. 2. 部分读取优化
      3. 1.7.3. 3. 单飞去重
      4. 1.7.4. 4. 搭便车优化
      5. 1.7.5. 5. 写回优化
      6. 1.7.6. 6. 批量上传
      7. 1.7.7. 7. 预取
    8. 1.8. 并发控制
      1. 1.8.1. 1. 锁保护
      2. 1.8.2. 2. 通道限流
      3. 1.8.3. 3. 单飞去重
    9. 1.9. 错误处理
      1. 1.9.1. 1. 重试机制
      2. 1.9.2. 2. 超时控制
      3. 1.9.3. 3. 错误指标
      4. 1.9.4. 4. 错误恢复
    10. 1.10. 内存管理
      1. 1.10.1. 1. Page 池
      2. 1.10.2. 2. 引用计数
      3. 1.10.3. 3. 内存限制
    11. 1.11. 关键算法
      1. 1.11.1. 1. 键生成算法
      2. 1.11.2. 2. 块大小计算
      3. 1.11.3. 3. 上传时间窗口
    12. 1.12. 数据结构关系图
    13. 1.13. 关键设计决策
      1. 1.13.1. 1. 为什么使用多级缓存?
      2. 1.13.2. 2. 为什么使用写回机制?
      3. 1.13.3. 3. 为什么使用单飞去重?
      4. 1.13.4. 4. 为什么使用搭便车优化?
      5. 1.13.5. 5. 为什么使用部分读取优化?
    14. 1.14. 潜在问题和改进
      1. 1.14.1. 1. 缓存一致性
      2. 1.14.2. 2. 缓存淘汰
      3. 1.14.3. 3. 错误恢复
      4. 1.14.4. 4. 性能优化
    15. 1.15. 总结

cached_store 分析

cached_store

核心职责: 实现 JuiceFS 的带缓存的块存储(ChunkStore),提供数据读写、缓存管理、压缩、写回(writeback)等功能。

文件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cached_store.go
├── 常量定义
│ ├── chunkSize (64MB)
│ ├── pageSize (64KB)
│ └── SlowRequest (10秒)

├── 辅助结构体
│ ├── pendingItem (待上传项)
│ ├── rSlice (读取切片)
│ └── wSlice (写入切片)

├── 核心结构体
│ ├── Config (配置)
│ └── cachedStore (带缓存的存储)

└── 方法实现
├── rSlice 方法
├── wSlice 方法
├── cachedStore 方法
└── 辅助函数

常量定义

chunkSize

1
const chunkSize = 1 << 26 // 64M

作用: 定义 Chunk 的大小(64MB)

使用场景: 写入时检查边界,计算 Chunk 索引

pageSize

1
const pageSize = 1 << 16  // 64K

作用: 定义 Page 的大小(64KB)

使用场景: Page 池分配,内存管理

SlowRequest

1
const SlowRequest = time.Second * time.Duration(10)

作用: 定义慢请求阈值(10秒)

使用场景: 记录慢请求日志

核心结构体

1. pendingItem - 待上传项

1
2
3
4
5
6
type pendingItem struct {
key string
fpath string // full path of local file corresponding to the key
ts time.Time // timestamp when this item is added
uploading bool
}

作用: 表示待上传到对象存储的本地文件

字段说明:

字段 类型 作用
key string 对象存储的键
fpath string 本地文件路径
ts time.Time 添加时间戳
uploading bool 是否正在上传

使用场景: 写回(writeback)模式下的延迟上传

2. rSlice - 读取切片

1
2
3
4
5
type rSlice struct {
id uint64
length int
store *cachedStore
}

作用: 实现 Reader 接口,用于读取数据块

字段说明:

字段 类型 作用
id uint64 切片 ID
length int 切片长度
store *cachedStore 所属的存储

核心方法:

  • ReadAt: 从指定偏移读取数据
  • Remove: 删除切片的所有块
  • key: 生成对象存储的键
  • blockSize: 计算块大小

3. wSlice - 写入切片

1
2
3
4
5
6
7
8
9
type wSlice struct {
rSlice
pages [][]*Page
uploaded int
errors chan error
uploadError error
pendings int
writeback bool
}

作用: 实现 Writer 接口,用于写入数据块

字段说明:

字段 类型 作用
rSlice rSlice 嵌入的读取切片
pages [][]*Page 页面数组(按块组织)
uploaded int 已上传的字节数
errors chan error 上传错误通道
uploadError error 上传错误
pendings int 待上传的块数
writeback bool 是否启用写回

核心方法:

  • WriteAt: 写入数据
  • FlushTo: 刷新到指定偏移
  • Finish: 完成写入
  • Abort: 中止写入
  • upload: 上传块

4. Config - 配置结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Config struct {
CacheDir string
CacheMode os.FileMode
CacheSize uint64
CacheItems int64
CacheChecksum string
CacheEviction string
CacheScanInterval time.Duration
CacheExpire time.Duration
OSCache bool
FreeSpace float32
AutoCreate bool
Compress string
MaxUpload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
DownloadLimit int64 // bytes per second
Writeback bool
WritebackThresholdSize int
UploadDelay time.Duration
UploadHours string
HashPrefix bool
BlockSize int
GetTimeout time.Duration
PutTimeout time.Duration
CacheFullBlock bool
CacheLargeWrite bool
BufferSize uint64
Readahead int
Prefetch int
}

作用: 存储缓存和存储的配置参数

关键字段:

  • 缓存相关: CacheDir, CacheSize, CacheEviction, CacheExpire
  • 写回相关: Writeback, WritebackThresholdSize, UploadDelay, UploadHours
  • 性能相关: MaxUpload, UploadLimit, DownloadLimit, Prefetch
  • 压缩相关: Compress, BlockSize

5. cachedStore - 带缓存的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
fetcher *prefetcher
conf Config
group *Controller
currentUpload chan bool
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
downLimit *ratelimit.Bucket

cacheHits prometheus.Counter
cacheMiss prometheus.Counter
cacheHitBytes prometheus.Counter
cacheMissBytes prometheus.Counter
cacheReadHist prometheus.Histogram
objectReqsHistogram *prometheus.HistogramVec
objectReqErrors prometheus.Counter
objectDataBytes *prometheus.CounterVec
stageBlockDelay prometheus.Counter
stageBlockErrors prometheus.Counter
}

作用: 实现 ChunkStore 接口,提供带缓存的块存储功能

字段说明:

字段 类型 作用
storage object.ObjectStorage 底层对象存储
bcache CacheManager 缓存管理器
fetcher *prefetcher 预取器
conf Config 配置
group *Controller 单飞控制器(去重)
currentUpload chan bool 当前上传通道(限流)
pendingCh chan *pendingItem 待上传通道
pendingKeys map[string]*pendingItem 待上传键映射
pendingMutex sync.Mutex 保护待上传映射
startHour int 上传开始时间(小时)
endHour int 上传结束时间(小时)
compressor compress.Compressor 压缩器
seekable bool 是否支持随机访问
upLimit *ratelimit.Bucket 上传限流器
downLimit *ratelimit.Bucket 下载限流器

指标字段: Prometheus 指标(缓存命中、对象请求等)

核心方法分析

rSlice 方法

1. key - 生成对象存储键

1
2
3
4
5
6
func (s *rSlice) key(indx int) string {
if s.store.conf.HashPrefix {
return fmt.Sprintf("chunks/%02X/%v/%v_%v_%v", s.id%256, s.id/1000/1000, s.id, indx, s.blockSize(indx))
}
return fmt.Sprintf("chunks/%v/%v/%v_%v_%v", s.id/1000/1000, s.id/1000, s.id, indx, s.blockSize(indx))
}

功能: 根据切片 ID、块索引和块大小生成对象存储的键

键格式:

  • HashPrefix: chunks/{hash}/{id/1000/1000}/{id}_{indx}_{size}
  • 普通: chunks/{id/1000/1000}/{id/1000}/{id}_{indx}_{size}

设计目的:

  • 分散对象存储的负载
  • 支持按 ID 范围查找

2. ReadAt - 读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err error) {
p := page.Data
if len(p) == 0 {
return 0, nil
}
if off >= s.length {
return 0, io.EOF
}

indx := s.index(off)
boff := off % s.store.conf.BlockSize
blockSize := s.blockSize(indx)
if boff+len(p) > blockSize {
// read beyond current page
var got int
for got < len(p) {
// aligned to current page
l := min(len(p)-got, s.blockSize(s.index(off))-off%s.store.conf.BlockSize)
pp := page.Slice(got, l)
n, err = s.ReadAt(ctx, pp, off)
pp.Release()
if err != nil {
return got + n, err
}
if n == 0 {
return got, io.EOF
}
got += n
off += n
}
return got, nil
}

key := s.key(indx)
if s.store.conf.CacheEnabled() {
start := time.Now()
r, err := s.store.bcache.load(key)
if err == nil {
n, err = r.ReadAt(p, int64(boff))
if !s.store.conf.OSCache {
dropOSCache(r)
}
_ = r.Close()
if err == nil {
s.store.cacheHits.Add(1)
s.store.cacheHitBytes.Add(float64(n))
s.store.cacheReadHist.Observe(time.Since(start).Seconds())
return n, nil
}
logger.Warnf("remove partial cached block %s: %d %s", key, n, err)
s.store.bcache.remove(key, false)
}
}

s.store.cacheMiss.Add(1)
s.store.cacheMissBytes.Add(float64(len(p)))

if s.store.seekable &&
(!s.store.conf.CacheEnabled() || (boff > 0 && len(p) <= blockSize/4)) {
if s.store.downLimit != nil {
s.store.downLimit.Wait(int64(len(p)))
}
fullPage, err := s.store.group.TryPiggyback(key)
if fullPage != nil {
defer fullPage.Release()
if err == nil { // piggybacked a full read
n = copy(p, fullPage.Data[boff:])
return n, nil
}
}
// partial read
st := time.Now()
var (
reqID string
sc = object.DefaultStorageClass
)
page.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer page.Release()
in, err := s.store.storage.Get(cCtx, key, int64(boff), int64(len(p)), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p)
_ = in.Close()
}
return err
}, s.store.conf.GetTimeout)
used := time.Since(st)
logRequest("GET", key, fmt.Sprintf("RANGE(%d,%d) ", boff, len(p)), reqID, err, used)
if errors.Is(err, context.Canceled) {
return 0, err
}
s.store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
s.store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err == nil {
s.store.fetcher.fetch(key)
return n, nil
} else {
s.store.objectReqErrors.Add(1)
// fall back to full read
}
}

block, err := s.store.group.Execute(key, func() (*Page, error) {
tmp := page
if boff > 0 || len(p) < blockSize {
tmp = NewOffPage(blockSize)
} else {
tmp.Acquire()
}
err = s.store.load(ctx, key, tmp, s.store.shouldCache(blockSize), false)
return tmp, err
})
defer block.Release()
if err != nil {
return 0, err
}
if block != page {
copy(p, block.Data[boff:])
}
return len(p), nil
}

功能: 从指定偏移读取数据

执行流程:

  1. 边界检查:

    • 如果 off >= length,返回 EOF
    • 如果跨块,递归读取
  2. 缓存查找:

    • 如果启用缓存,从缓存加载
    • 如果命中,更新指标,返回数据
    • 如果失败,移除缓存项
  3. 部分读取优化(如果支持随机访问):

    • 尝试从正在进行的完整读取中”搭便车”(piggyback)
    • 如果失败,使用 Range 请求读取部分数据
    • 如果成功,触发预取
  4. 完整读取(回退):

    • 使用单飞(singleflight)去重
    • 调用 load 方法加载完整块
    • 如果偏移不为 0,复制数据

关键优化:

  • 缓存优先: 先查缓存,减少网络请求
  • 部分读取: 支持 Range 请求,减少数据传输
  • 单飞去重: 避免重复请求
  • 搭便车: 从正在进行的读取中获取数据

3. Remove - 删除切片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *rSlice) Remove() error {
if s.length == 0 {
// no block
return nil
}

lastIndx := (s.length - 1) / s.store.conf.BlockSize
for i := 0; i <= lastIndx; i++ {
// there could be multiple clients try to remove the same chunk in the same time,
// any of them should succeed if any blocks is removed
key := s.key(i)
s.store.removePending(key)
s.store.bcache.remove(key, true)
}

var err error
for i := 0; i <= lastIndx; i++ {
if e := s.delete(i); e != nil {
err = e
}
}
return err
}

功能: 删除切片的所有块

流程:

  1. 计算最后一个块索引
  2. 遍历所有块:
    • 移除待上传项
    • 移除缓存项
  3. 删除对象存储中的对象

并发安全: 允许多个客户端同时删除,只要有一个成功即可

wSlice 方法

1. WriteAt - 写入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *wSlice) WriteAt(p []byte, off int64) (n int, err error) {
if int(off)+len(p) > chunkSize {
return 0, fmt.Errorf("write out of chunk boudary: %d > %d", int(off)+len(p), chunkSize)
}
if off < int64(s.uploaded) {
return 0, fmt.Errorf("Cannot overwrite uploaded block: %d < %d", off, s.uploaded)
}

// Fill previous blocks with zeros
if s.length < int(off) {
zeros := make([]byte, int(off)-s.length)
_, _ = s.WriteAt(zeros, int64(s.length))
}

for n < len(p) {
indx := s.index(int(off) + n)
boff := (int(off) + n) % s.store.conf.BlockSize
var bs = pageSize
if indx > 0 || bs > s.store.conf.BlockSize {
bs = s.store.conf.BlockSize
}
bi := boff / bs
bo := boff % bs
var page *Page
if bi < len(s.pages[indx]) {
page = s.pages[indx][bi]
} else {
page = allocPage(bs)
page.Data = page.Data[:0]
s.pages[indx] = append(s.pages[indx], page)
}
left := len(p) - n
if bo+left > bs {
page.Data = page.Data[:bs]
} else if len(page.Data) < bo+left {
page.Data = page.Data[:bo+left]
}
n += copy(page.Data[bo:], p[n:])
}
if int(off)+n > s.length {
s.length = int(off) + n
}
return n, nil
}

功能: 写入数据到指定偏移

执行流程:

  1. 边界检查:

    • 检查是否超出 Chunk 边界
    • 检查是否覆盖已上传的数据
  2. 填充空洞:

    • 如果偏移大于当前长度,用零填充
  3. 分块写入:

    • 计算块索引和块内偏移
    • 分配或获取 Page
    • 复制数据到 Page
    • 更新长度

关键点:

  • 支持稀疏写入(用零填充空洞)
  • 按块组织数据(pages[indx]
  • 支持跨块写入

2. upload - 上传块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (s *wSlice) upload(indx int) {
blen := s.blockSize(indx)
key := s.key(indx)
pages := s.pages[indx]
s.pages[indx] = nil
s.pendings++

go func() {
var block *Page
var off int
if len(pages) == 1 {
block = pages[0]
off = len(block.Data)
} else {
block = NewOffPage(blen)
for _, b := range pages {
off += copy(block.Data[off:], b.Data)
freePage(b)
}
}
if off != blen {
panic(fmt.Sprintf("block length does not match: %v != %v", off, blen))
}
if s.writeback && blen < s.store.conf.WritebackThresholdSize {
stagingPath := "unknown"
stageFailed := false
block.Acquire()
err := utils.WithTimeout(context.TODO(), func(context.Context) (err error) { // In case it hangs for more than 5 minutes(see fileWriter.flush), fallback to uploading directly to avoid `EIO`
defer block.Release()
stagingPath, err = s.store.bcache.stage(key, block.Data)
if err == nil && stageFailed { // upload thread already marked me as failed because of timeout
_ = s.store.bcache.removeStage(key)
}
return err
}, s.store.conf.PutTimeout)
if err != nil {
stageFailed = true
if !errors.Is(err, errStageConcurrency) {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", key, err)
}
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
select {
case s.store.currentUpload <- true:
defer func() { <-s.store.currentUpload }()
if err = s.store.upload(key, block, nil); err == nil {
s.store.bcache.uploaded(key, blen)
if err := s.store.bcache.removeStage(key); err != nil {
logger.Warnf("failed to remove stage %s in upload", stagingPath)
}
} else { // add to delay list and wait for later scanning
s.store.addDelayedStaging(key, stagingPath, time.Now(), false)
}
return
default:
}
}
block.Release()
s.store.addDelayedStaging(key, stagingPath, time.Now(), false)
return
}
}
s.store.currentUpload <- true
defer func() { <-s.store.currentUpload }()
s.errors <- s.store.upload(key, block, s)
}()
}

功能: 异步上传块到对象存储

执行流程:

  1. 合并页面:

    • 如果只有一个页面,直接使用
    • 否则合并多个页面为一个块
  2. 写回模式(如果启用且块大小小于阈值):

    • 先写入本地缓存(stage
    • 如果成功且满足上传条件,立即上传
    • 否则添加到延迟上传列表
  3. 直接上传(回退):

    • 获取上传槽位(限流)
    • 调用 store.upload 上传

关键点:

  • 异步执行: 使用 goroutine 不阻塞写入
  • 写回优化: 小块先写本地,延迟上传
  • 限流控制: 使用 currentUpload 通道限流

3. FlushTo - 刷新到指定偏移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *wSlice) FlushTo(offset int) error {
if offset < s.uploaded {
panic(fmt.Sprintf("Invalid offset: %d < %d", offset, s.uploaded))
}
for i, block := range s.pages {
start := i * s.store.conf.BlockSize
end := start + s.store.conf.BlockSize
if start >= s.uploaded && end <= offset {
if block != nil {
s.upload(i)
}
s.uploaded = end
}
}

return nil
}

功能: 刷新指定范围内的所有块

流程:

  1. 检查偏移有效性
  2. 遍历所有块,如果块在范围内且未上传,上传它
  3. 更新已上传偏移

4. Finish - 完成写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *wSlice) Finish(length int) error {
if s.length != length {
return fmt.Errorf("Length mismatch: %v != %v", s.length, length)
}

n := (length-1)/s.store.conf.BlockSize + 1
if err := s.FlushTo(n * s.store.conf.BlockSize); err != nil {
return err
}
for i := 0; i < s.pendings; i++ {
if err := <-s.errors; err != nil {
s.uploadError = err
return err
}
}
return nil
}

功能: 完成写入,等待所有上传完成

流程:

  1. 检查长度匹配
  2. 刷新所有块
  3. 等待所有上传完成(从 errors 通道读取)
  4. 如果有错误,返回错误

cachedStore 方法

1. NewCachedStore - 创建带缓存的存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.Registerer) ChunkStore {
compressor := compress.NewCompressor(config.Compress)
if compressor == nil {
logger.Fatalf("unknown compress algorithm: %s", config.Compress)
}
if config.MaxRetries == 0 {
config.MaxRetries = 10
}
if config.GetTimeout == 0 {
config.GetTimeout = time.Second * 60
}
if config.PutTimeout == 0 {
config.PutTimeout = time.Second * 60
}
store := &cachedStore{
storage: storage,
conf: config,
currentUpload: make(chan bool, config.MaxUpload),
compressor: compressor,
seekable: compressor.CompressBound(0) == 0,
pendingCh: make(chan *pendingItem, 100*config.MaxUpload),
pendingKeys: make(map[string]*pendingItem),
group: NewController(),
}
if config.UploadLimit > 0 {
// there are overheads coming from HTTP/TCP/IP
store.upLimit = ratelimit.NewBucketWithRate(float64(config.UploadLimit)*0.85, config.UploadLimit/10)
}
if config.DownloadLimit > 0 {
store.downLimit = ratelimit.NewBucketWithRate(float64(config.DownloadLimit)*0.85, config.DownloadLimit/10)
}
store.initMetrics()
if store.conf.Writeback {
store.startHour, store.endHour, _ = config.parseHours()
if store.startHour != store.endHour {
logger.Infof("background upload at %d:00 ~ %d:00", store.startHour, store.endHour)
}
}
store.bcache = newCacheManager(&config, reg, func(key, fpath string, force bool) bool {
if fi, err := os.Stat(fpath); err == nil {
return store.addDelayedStaging(key, fpath, fi.ModTime(), force)
} else {
logger.Warnf("Stat staging block %s: %s", fpath, err)
return false
}
})

go func() {
for {
if store.bcache.isEmpty() {
logger.Warn("cache store is empty, use memory cache")
config.CacheSize = 100 << 20
config.CacheDir = "memory"
store.bcache = newMemStore(&config, store.bcache.getMetrics())
}
time.Sleep(time.Second)
}
}()

if !config.CacheEnabled() {
config.Prefetch = 0 // disable prefetch if cache is disabled
}
store.fetcher = newPrefetcher(config.Prefetch, func(key string) {
size := parseObjOrigSize(key)
if size == 0 || size > store.conf.BlockSize {
return
}
p := NewOffPage(size)
defer p.Release()
block, err := store.group.Execute(key, func() (*Page, error) { // dedup requests with full read
p.Acquire()
err := store.load(context.TODO(), key, p, false, false) // delay writing cache until singleflight ends to prevent blocking waiters
return p, err
})
defer block.Release()
if err == nil && block == p {
store.bcache.cache(key, block, true, !store.conf.OSCache)
}
})

if store.conf.Writeback {
for i := 0; i < store.conf.MaxUpload; i++ {
go store.uploader()
}
interval := time.Minute
if d := store.conf.UploadDelay; d > 0 {
if d < time.Minute {
interval = d
logger.Warnf("delay uploading by %s (this value is too small, and is not recommended)", d)
} else {
logger.Infof("delay uploading by %s", d)
}
}
go func() {
for {
time.Sleep(interval)
store.scanDelayedStaging()
}
}()
}
store.regMetrics(reg)
return store
}

功能: 创建并初始化带缓存的存储

初始化流程:

  1. 创建压缩器: 根据配置创建压缩器
  2. 设置默认值: 重试次数、超时时间
  3. 创建存储结构: 初始化所有字段
  4. 设置限流器: 上传和下载限流
  5. 初始化指标: Prometheus 指标
  6. 解析上传时间: 如果启用写回
  7. 创建缓存管理器: 带上传回调
  8. 启动后台任务:
    • 检查缓存是否为空(如果为空,切换到内存缓存)
    • 启动预取器(如果启用)
    • 启动上传器(如果启用写回)
    • 启动延迟上传扫描器

关键设计:

  • 自动降级: 如果缓存目录不可用,自动切换到内存缓存
  • 后台任务: 多个 goroutine 处理缓存和上传
  • 单飞去重: 预取时使用单飞避免重复请求

2. load - 加载块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
func (store *cachedStore) load(ctx context.Context, key string, page *Page, cache bool, forceCache bool) (err error) {
defer func() {
e := recover()
if e != nil {
err = fmt.Errorf("recovered from %s", e)
}
}()
needed := store.compressor.CompressBound(len(page.Data))
compressed := needed > len(page.Data)
// we don't know the actual size for compressed block
if store.downLimit != nil && !compressed {
store.downLimit.Wait(int64(len(page.Data)))
}
var (
in io.ReadCloser
n int
p *Page
reqID string
sc = object.DefaultStorageClass
start = time.Now()
)
if compressed {
c := NewOffPage(needed)
defer c.Release()
p = c
} else {
p = page
}
p.Acquire()
err = utils.WithTimeout(ctx, func(cCtx context.Context) error {
defer p.Release()
// it will be retried in the upper layer.
in, err = store.storage.Get(cCtx, key, 0, -1, object.WithRequestID(&reqID), object.WithStorageClass(&sc))
if err == nil {
n, err = io.ReadFull(in, p.Data)
_ = in.Close()
}
if compressed && err == io.ErrUnexpectedEOF {
err = nil
}
return err
}, store.conf.GetTimeout)
if errors.Is(err, context.Canceled) {
return err
}
used := time.Since(start)
logRequest("GET", key, "", reqID, err, used)
if store.downLimit != nil && compressed {
store.downLimit.Wait(int64(n))
}
store.objectDataBytes.WithLabelValues("GET", sc).Add(float64(n))
store.objectReqsHistogram.WithLabelValues("GET", sc).Observe(used.Seconds())
if err != nil {
store.objectReqErrors.Add(1)
return fmt.Errorf("get %s: %s", key, err)
}
if compressed {
n, err = store.compressor.Decompress(page.Data, p.Data[:n])
}
if err != nil || n < len(page.Data) {
return fmt.Errorf("read %s fully: %v (%d < %d) after %s", key, err, n, len(page.Data), used)
}
if cache {
store.bcache.cache(key, page, forceCache, !store.conf.OSCache)
}
return nil
}

功能: 从对象存储加载块

执行流程:

  1. 压缩检查: 判断是否需要解压
  2. 限流: 如果未压缩,先限流
  3. 分配页面: 如果压缩,分配更大的页面
  4. 下载数据: 从对象存储获取完整数据
  5. 限流: 如果压缩,下载后限流
  6. 解压: 如果压缩,解压数据
  7. 缓存: 如果启用缓存,写入缓存

关键点:

  • 超时控制: 使用 WithTimeout 控制超时
  • 压缩处理: 支持压缩和解压
  • 错误处理: 记录错误指标

3. upload - 上传块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (store *cachedStore) upload(key string, block *Page, s *wSlice) error {
sync := s != nil
blen := len(block.Data)
bufSize := store.compressor.CompressBound(blen)
var buf *Page
if bufSize > blen {
buf = NewOffPage(bufSize)
} else {
buf = block
buf.Acquire()
}
defer buf.Release()
if sync && (blen < store.conf.BlockSize || store.conf.CacheLargeWrite) {
// block will be freed after written into disk
store.bcache.cache(key, block, false, false)
}
n, err := store.compressor.Compress(buf.Data, block.Data)
block.Release()
if err != nil {
return fmt.Errorf("Compress block key %s: %s", key, err)
}
buf.Data = buf.Data[:n]

try, max := 0, 3
if sync {
max = store.conf.MaxRetries + 1
}
for ; try < max; try++ {
time.Sleep(time.Second * time.Duration(try*try))
if s != nil && s.uploadError != nil {
err = fmt.Errorf("(cancelled) upload block %s: %s (after %d tries)", key, err, try)
break
}
if err = store.put(key, buf); err == nil {
break
}
logger.Debugf("Upload %s: %s (try %d)", key, err, try+1)
}
if err != nil && try >= max {
err = fmt.Errorf("(max tries) upload block %s: %s (after %d tries)", key, err, try)
}
return err
}

功能: 上传块到对象存储

执行流程:

  1. 压缩: 压缩数据块
  2. 缓存: 如果同步且满足条件,写入缓存
  3. 重试上传: 使用指数退避重试
  4. 错误处理: 如果取消或达到最大重试次数,返回错误

关键点:

  • 压缩: 支持数据压缩
  • 重试: 同步上传使用更多重试次数
  • 指数退避: 重试间隔递增

4. put - 上传数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (store *cachedStore) put(key string, p *Page) error {
if store.upLimit != nil {
store.upLimit.Wait(int64(len(p.Data)))
}
p.Acquire()
var (
reqID string
sc = object.DefaultStorageClass
)
return utils.WithTimeout(context.TODO(), func(ctx context.Context) error {
defer p.Release()
st := time.Now()
err := store.storage.Put(ctx, key, bytes.NewReader(p.Data), object.WithRequestID(&reqID), object.WithStorageClass(&sc))
used := time.Since(st)
logRequest("PUT", key, "", reqID, err, used)
store.objectDataBytes.WithLabelValues("PUT", sc).Add(float64(len(p.Data)))
store.objectReqsHistogram.WithLabelValues("PUT", sc).Observe(used.Seconds())
if err != nil {
store.objectReqErrors.Add(1)
}
return err
}, store.conf.PutTimeout)
}

功能: 上传数据到对象存储

流程:

  1. 限流
  2. 超时控制
  3. 上传数据
  4. 记录指标

5. uploader - 上传器

1
2
3
4
5
func (store *cachedStore) uploader() {
for it := range store.pendingCh {
store.uploadStagingFile(it.key, it.fpath)
}
}

功能: 后台上传器,从 pendingCh 通道读取待上传项并上传

使用场景: 写回模式下的延迟上传

6. uploadStagingFile - 上传暂存文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
store.currentUpload <- true
defer func() {
<-store.currentUpload
}()

store.pendingMutex.Lock()
item, ok := store.pendingKeys[key]
store.pendingMutex.Unlock()
if !ok {
logger.Debugf("Key %s is not needed, drop it", key)
return
}
defer func() {
item.uploading = false
}()

if !store.canUpload() {
return
}

blen := parseObjOrigSize(key)
f, err := openCacheFile(stagingPath, blen, store.conf.CacheChecksum)
if err != nil {
if store.isPendingValid(key) {
logger.Errorf("Open staging file %s: %s", stagingPath, err)
} else {
logger.Debugf("Key %s is not needed, drop it", key)
}
return
}
block := NewOffPage(blen)
_, err = f.ReadAt(block.Data, 0)
_ = f.Close()
if err != nil {
block.Release()
logger.Errorf("Read staging file %s: %s", stagingPath, err)
return
}
if !store.isPendingValid(key) {
block.Release()
logger.Debugf("Key %s is not needed, drop it", key)
return
}

store.stageBlockDelay.Add(time.Since(item.ts).Seconds())
if err = store.upload(key, block, nil); err == nil {
if !store.isPendingValid(key) { // Delete leaked objects if it's already deleted by other goroutines
err := store.delete(key)
logger.Infof("Key %s is not needed, abandoned, err: %v", key, err)
} else {
store.bcache.uploaded(key, blen)
store.removePending(key)
if err := store.bcache.removeStage(key); err != nil {
logger.Warnf("failed to remove stage %s, in upload staging file", stagingPath)
}
}
}
}

功能: 上传暂存文件到对象存储

流程:

  1. 获取上传槽位(限流)
  2. 检查待上传项是否有效
  3. 检查是否允许上传(时间窗口)
  4. 读取暂存文件
  5. 上传数据
  6. 清理暂存文件和待上传项

关键点:

  • 并发安全: 检查项是否仍然有效
  • 错误处理: 如果项已无效,删除已上传的对象

数据流分析

读取流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
用户请求

├─> rSlice.ReadAt(ctx, page, off)
│ │
│ ├─> 检查缓存 (bcache.load)
│ │ │
│ │ ├─> 命中: 返回数据,更新指标
│ │ └─> 未命中: 继续
│ │
│ ├─> 部分读取优化 (如果支持随机访问)
│ │ │
│ │ ├─> 尝试搭便车 (TryPiggyback)
│ │ │ │
│ │ │ └─> 成功: 返回数据,触发预取
│ │ │
│ │ └─> Range 请求 (storage.Get with range)
│ │ │
│ │ └─> 成功: 返回数据,触发预取
│ │
│ └─> 完整读取 (回退)
│ │
│ ├─> 单飞去重 (group.Execute)
│ │
│ ├─> load(ctx, key, page, cache, forceCache)
│ │ │
│ │ ├─> 限流 (downLimit.Wait)
│ │ │
│ │ ├─> 下载数据 (storage.Get)
│ │ │
│ │ ├─> 解压 (如果压缩)
│ │ │
│ │ └─> 写入缓存 (bcache.cache)
│ │
│ └─> 返回数据

└─> 返回给用户

写入流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
用户请求

├─> wSlice.WriteAt(p, off)
│ │
│ ├─> 边界检查
│ │
│ ├─> 填充空洞 (如果需要)
│ │
│ ├─> 分块写入
│ │ │
│ │ ├─> 计算块索引和偏移
│ │ │
│ │ ├─> 分配或获取 Page
│ │ │
│ │ └─> 复制数据
│ │
│ └─> 更新长度

├─> wSlice.FlushTo(offset) 或 Finish(length)
│ │
│ └─> wSlice.upload(indx)
│ │
│ ├─> 合并页面
│ │
│ ├─> 写回模式 (如果启用)
│ │ │
│ │ ├─> 写入本地缓存 (bcache.stage)
│ │ │
│ │ ├─> 立即上传 (如果满足条件)
│ │ │ │
│ │ │ └─> store.upload(key, block, nil)
│ │ │ │
│ │ │ ├─> 压缩数据
│ │ │ │
│ │ │ ├─> 写入缓存 (如果同步)
│ │ │ │
│ │ │ └─> store.put(key, buf)
│ │ │ │
│ │ │ ├─> 限流 (upLimit.Wait)
│ │ │ │
│ │ │ └─> storage.Put
│ │ │
│ │ └─> 延迟上传 (添加到 pendingCh)
│ │ │
│ │ └─> uploader() → uploadStagingFile()
│ │
│ └─> 直接上传 (回退)
│ │
│ └─> store.upload(key, block, s)

└─> 等待上传完成 (Finish)

写回流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
写入数据

├─> wSlice.upload(indx)
│ │
│ └─> 写回模式 && 块大小 < 阈值
│ │
│ ├─> bcache.stage(key, data)
│ │ │
│ │ └─> 写入本地缓存文件
│ │
│ ├─> 立即上传 (如果满足条件)
│ │ │
│ │ ├─> store.upload(key, block, nil)
│ │ │
│ │ └─> bcache.uploaded(key, blen)
│ │ │
│ │ └─> bcache.removeStage(key)
│ │
│ └─> 延迟上传
│ │
│ ├─> addDelayedStaging(key, path, time, force)
│ │ │
│ │ └─> pendingCh <- item
│ │
│ └─> uploader()
│ │
│ └─> uploadStagingFile(key, path)
│ │
│ ├─> 读取暂存文件
│ │
│ ├─> store.upload(key, block, nil)
│ │
│ └─> bcache.removeStage(key)

设计模式和特点

1. 缓存策略

多级缓存:

  • 内存缓存: 热数据在内存中
  • 磁盘缓存: 温数据在磁盘上
  • 对象存储: 冷数据在对象存储中

缓存查找顺序:

  1. 内存缓存
  2. 磁盘缓存
  3. 对象存储

2. 写回(Writeback)机制

特点:

  • 小数据块先写入本地缓存
  • 延迟上传到对象存储
  • 支持时间窗口控制

优势:

  • 提高写入性能
  • 减少网络请求
  • 支持批量上传

3. 单飞(Singleflight)去重

实现: group.Execute(key, func() (*Page, error))

作用: 避免重复请求相同的数据块

使用场景:

  • 读取时的完整块加载
  • 预取时的数据加载

4. 搭便车(Piggyback)优化

实现: group.TryPiggyback(key)

作用: 从正在进行的完整读取中获取部分数据

优势: 减少网络请求,提高效率

5. 限流控制

上传限流: upLimit (ratelimit.Bucket)
下载限流: downLimit (ratelimit.Bucket)

实现: 使用令牌桶算法

6. 压缩支持

压缩时机: 上传时压缩
解压时机: 下载时解压

判断: compressor.CompressBound(0) == 0 判断是否支持随机访问

7. 预取(Prefetch)

实现: fetcher.fetch(key)

作用: 后台预取可能访问的数据

触发: 读取时自动触发

性能优化

1. 缓存优先

策略: 先查缓存,减少网络请求

实现: ReadAt 中先调用 bcache.load

2. 部分读取优化

策略: 如果支持随机访问,使用 Range 请求

实现: seekable && boff > 0 && len(p) <= blockSize/4

3. 单飞去重

策略: 避免重复请求相同数据

实现: group.Execute 确保同一 key 只请求一次

4. 搭便车优化

策略: 从正在进行的读取中获取数据

实现: group.TryPiggyback

5. 写回优化

策略: 小数据块先写本地,延迟上传

实现: writeback && blen < WritebackThresholdSize

6. 批量上传

策略: 延迟上传,批量处理

实现: pendingCh 通道 + uploader goroutine

7. 预取

策略: 后台预取可能访问的数据

实现: fetcher.fetch 在读取后触发

并发控制

1. 锁保护

pendingMutex: 保护 pendingKeys 映射

使用场景:

  • addDelayedStaging
  • removePending
  • isPendingValid
  • scanDelayedStaging

2. 通道限流

currentUpload: 限制并发上传数

实现: make(chan bool, config.MaxUpload)

3. 单飞去重

group: 使用单飞控制器去重请求

实现: group.Execute 确保同一 key 只执行一次

错误处理

1. 重试机制

上传重试: 指数退避,最多 MaxRetries + 1

实现: for try < max; try++ { time.Sleep(time.Second * time.Duration(try*try)) }

2. 超时控制

读取超时: GetTimeout (默认 60 秒)
上传超时: PutTimeout (默认 60 秒)

实现: utils.WithTimeout

3. 错误指标

指标:

  • objectReqErrors: 对象请求错误数
  • stageBlockErrors: 暂存块错误数

4. 错误恢复

缓存错误: 移除缓存项,回退到对象存储

上传错误: 重试,如果失败返回错误

内存管理

1. Page 池

实现: pagePool 通道

作用: 复用 Page,减少内存分配

2. 引用计数

Page.Acquire/Release: 管理 Page 的生命周期

3. 内存限制

缓存大小: CacheSize 限制缓存总大小

实现: 缓存管理器负责淘汰

关键算法

1. 键生成算法

HashPrefix 模式:

1
chunks/{id%256}/{(id/1000)/1000}/{id}_{indx}_{size}

普通模式:

1
chunks/{(id/1000)/1000}/{id/1000}/{id}_{indx}_{size}

设计目的: 分散对象存储负载

2. 块大小计算

1
2
3
4
5
6
7
func (s *rSlice) blockSize(indx int) int {
bsize := s.length - indx*s.store.conf.BlockSize
if bsize > s.store.conf.BlockSize {
bsize = s.store.conf.BlockSize
}
return bsize
}

功能: 计算指定索引的块大小

特点: 最后一个块可能小于 BlockSize

3. 上传时间窗口

1
2
3
4
5
6
7
8
func (store *cachedStore) canUpload() bool {
if store.startHour == store.endHour {
return true
}
h := time.Now().Hour()
return store.startHour < store.endHour && h >= store.startHour && h < store.endHour ||
store.startHour > store.endHour && (h >= store.startHour || h < store.endHour)
}

功能: 判断当前时间是否允许上传

支持: 跨天时间窗口(如 22:00-02:00)

数据结构关系图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
cachedStore

├─> storage: object.ObjectStorage

├─> bcache: CacheManager
│ │
│ └─> cacheStore (多个)
│ │
│ ├─> pages: map[string]*Page
│ └─> keys: KeyIndex

├─> fetcher: *prefetcher
│ │
│ └─> prefetch queue

├─> group: *Controller
│ │
│ └─> singleflight map

├─> currentUpload: chan bool (限流)

├─> pendingCh: chan *pendingItem

├─> pendingKeys: map[string]*pendingItem

├─> upLimit: *ratelimit.Bucket

└─> downLimit: *ratelimit.Bucket

关键设计决策

1. 为什么使用多级缓存?

原因:

  • 内存缓存:最快,但容量有限
  • 磁盘缓存:较快,容量较大
  • 对象存储:最慢,容量无限

优势:

  • 平衡性能和成本
  • 提高缓存命中率

2. 为什么使用写回机制?

原因:

  • 提高写入性能
  • 减少网络请求
  • 支持批量上传

实现:

  • 小数据块先写本地
  • 延迟上传到对象存储
  • 支持时间窗口控制

3. 为什么使用单飞去重?

原因:

  • 避免重复请求相同数据
  • 减少网络请求
  • 提高效率

实现:

  • group.Execute 确保同一 key 只执行一次
  • 其他请求等待结果

4. 为什么使用搭便车优化?

原因:

  • 从正在进行的读取中获取数据
  • 减少网络请求
  • 提高效率

实现:

  • group.TryPiggyback 尝试获取部分数据
  • 如果成功,直接返回

5. 为什么使用部分读取优化?

原因:

  • 减少数据传输
  • 提高效率
  • 支持随机访问

实现:

  • 如果支持随机访问且读取范围小,使用 Range 请求
  • 否则读取完整块

潜在问题和改进

1. 缓存一致性

问题: 多个客户端可能缓存不同版本的数据

改进: 使用版本号或校验和

2. 缓存淘汰

问题: 缓存淘汰策略可能不够优化

改进: 使用更智能的淘汰策略(如 LRU)

3. 错误恢复

问题: 某些错误(如网络错误)可能导致数据不一致

改进: 实现更完善的错误恢复机制

4. 性能优化

问题:

  • 部分读取可能不够优化
  • 预取策略可能不够智能

改进:

  • 优化部分读取逻辑
  • 实现更智能的预取策略

总结

cached_store.go 文件实现了 JuiceFS 的带缓存的块存储功能,具有以下特点:

  1. 多级缓存: 内存、磁盘、对象存储三级缓存
  2. 写回机制: 小数据块先写本地,延迟上传
  3. 性能优化: 单飞去重、搭便车、部分读取、预取
  4. 限流控制: 上传和下载限流
  5. 压缩支持: 支持数据压缩和解压
  6. 并发安全: 使用锁和通道保护并发访问
  7. 错误处理: 重试机制、超时控制、错误指标
  8. 内存管理: Page 池、引用计数、缓存淘汰

它是 JuiceFS 存储性能的关键组件,实现了高效的缓存管理和数据访问优化。