内存分配器之 concurrent_arena

concurrent_arena

在上一期的 存分配器之 arena 中,讲解了 RocksDB 是如何混合new、mmap设计一个高效的内存分配器。这一期,我们继续深入探索 RocksDB 是如何设计出一个高效的多线程内存分配器 concurrent_arena

let’s go

多线程程序的性能关键,有两个关键因素:

  • 减少竞争

    减少竞争,有诸多方式,比如使用原子变量、细粒度锁(fine-grained mutex),threadlocal。

  • False Sharing

    降低false sharing,一般解决办法是将线程间共享的数据大小与cacheline大小对齐(align)。

而 concurrent_arena 即在 arena 的基础上,增加了一些多线程间的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class ConcurrentArena : public Allocator {
public:
/// @brief 参数 @c block_size 与 @c huge_page_size 含义同 Arena 中
explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
size_t huge_page_size = 0);


/// @brief 分配无须对齐的内存
char* Allocate(size_t bytes) override;

/// @brief 分配需要对齐的内存
char* AllocateAligned(size_t bytes,
size_t huge_page_size = 0,
Logger* logger = nullptr) override;

//...

private:
struct Shard {
char padding[40] ROCKSDB_FIELD_UNUSED;
mutable SpinMutex mutex;
char* free_begin_;
std::atomic<size_t> allocated_and_unused_;

Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
};

static thread_local size_t tls_cpuid;

char padding0[56] ROCKSDB_FIELD_UNUSED;

size_t shard_block_size_;

CoreLocalArray<Shard> shards_; // 存储数据

Arena arena_; // 内存分配器
mutable SpinMutex arena_mutex_; // 保证使用 arena 分配内存时线程安全
std::atomic<size_t> arena_allocated_and_unused_; // arena_ 剩余可使用的内存
std::atomic<size_t> memory_allocated_bytes_; // arena 总的分配内存
std::atomic<size_t> irregular_block_num_;

char padding1[56] ROCKSDB_FIELD_UNUSED;
};

ConcurrentArena::Shard

为避免竞争,concurrent_arena 使用将每个线程所需的内存分配在线程所属的cpu核上,如此每个线程在分配内存时都会使用自己的内存,如此就避免了竞争。这也是一种空间换时间的策略,类似于threadlocal思想。

结构体 Shard 记录了每个核上的内存分配、使用情况。字段 shards_ 记录了所有核上shard。

CoreLocalArray 本质上是个数组,为了提高多线程的访问效率,要将Shard的大小对齐到cacheline大小,以阻止false sharing现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct Shard {
// 填充字节以对齐到 cacheline, 避免 false sharding
char padding[40] ROCKSDB_FIELD_UNUSED;
// 用于保护 free_begin_ 指向的数据
mutable SpinMutex mutex;
// 每个core上分配内存地址
char* free_begin_;
// 每个core剩余可用内存
std::atomic<size_t> allocated_and_unused_;

Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
};

CoreLocalArray<Shard> shards_;

CoreLocalArray

CoreLocalArray 只用于存储数据,本身不是线程安全的,因此需要 Shard::mutex 字段保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <typename T>
class CoreLocalArray {
public:
CoreLocalArray();

/// 数组元素个数
size_t Size() const {
return static_cast<size_t>(1) << size_shift_;
}

/// @return 返回当前线程所在core上的数据地址 ptr
T* Access() const {
return AccessElementAndIndex().first;
}

/// @return 不仅返回当前线程所在core上的数据地址 ptr,还返回该core的index
std::pair<T*, size_t> AccessElementAndIndex() const;

/// @brief 通过 core_idx 来获取该 core 的数据
T* AccessAtCore(size_t core_idx) const;

private:
std::unique_ptr<T[]> data_; // 数组
int size_shift_;
};
CoreLocalArray::CoreLocalArray

C++11中提供了std::thread::hardware_concurrency 函数,来获取CPU的核数num_cpus,将线程数设置为 num_cpus,如此就能避免在一个进程中线程频繁的在不同的core上来回切换,降低线程切换上下文的开销。

  • 如果num_cpus <= 8,则启动8个线程;
  • 否则,开启num_cpus个线程(num_cpus 一般也是2的幂)。

代码简洁如下。

1
2
3
4
5
6
7
8
9
10
template <typename T>
CoreLocalArray<T>::CoreLocalArray() {
int num_cpus = static_cast<int>(std::thread::hardware_concurrency());
size_shift_ = 3;
while (1 << size_shift_ < num_cpus) {
++size_shift_;
}
// 每个core上都有类型 T 的数据
data_.reset(new T[static_cast<size_t>(1) << size_shift_]);
}
CoreLocalArray::AccessAtCore

AccessAtCore 函数,用于获取核core_idx上的数据,即获取数组data_[core_idx]中数据。

在这解释下为啥类型T大小需要对齐到 cacheline大小。

1
2
3
4
5
template <typename T>
T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
assert(core_idx < static_cast<size_t>(1) << size_shift_);
return &data_[core_idx];
}
CoreLocalArray::AccessElementAndIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
template <typename T>
std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
int cpuid = port::PhysicalCoreID(); // 获取cpu的物理id
size_t core_idx;
if (UNLIKELY(cpuid < 0)) {
// cpu id unavailable, just pick randomly
core_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);
} else {
// 将 cpuid 映射到 1 << size_shift_ 以内
core_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));
}
return {AccessAtCore(core_idx), core_idx};
}

ConcurrentArena::ShardAllocatedAndUnused

ShardAllocatedAndUnused 则用于记录,shards中各个shard中剩余可用内存大小。

1
2
3
4
5
6
7
8
9
// 所有已分配但未使用的内存
size_t ConcurrentArena::ShardAllocatedAndUnused() const {
size_t total = 0;
for (size_t i = 0; i < shards_.Size(); ++i) {
total +=
shards_.AccessAtCore(i)->allocated_and_unused_.load(std::memory_order_relaxed);
}
return total;
}

ConcurrentArena::ConcurrentArena

ConcurrentArena 的构造函数比较简单:

  1. 初始化每个shard 需要分配的内存大小 shard_block_size_
  2. 初始化 shards_ 数组;
  3. 初始化arena
  4. 调用Fixup函数,初始化内存使用情况;

简单如下。

1
2
3
4
5
6
7
8
ConcurrentArena::ConcurrentArena(size_t block_size,
AllocTracker* tracker,
size_t huge_page_size)
: shard_block_size_(std::min(kMaxShardBlockSize, block_size / 8)),
shards_(),
arena_(block_size, tracker, huge_page_size) {
Fixup();
}

ConcurrentArena::Fixup

FixUp 函数用于记录当前内存情况。

下面三个原子变量写入新值时,使用::std::memory_order_relaxed即能满足。关于memory order,花了两个周末的时候重温了下,有时间会再出博客深度讲解。

1
2
3
4
5
6
7
void ConcurrentArena::Fixup() {
// arena_ 所分配的内存中剩余可使用的内存
arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(), std::memory_order_relaxed);
// arena 分配的内存总量
memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(), std::memory_order_relaxed);
irregular_block_num_.store(arena_.IrregularBlockNum(), std::memory_order_relaxed);
}

ConcurrentArena::Repick

Repick 函数,用来初始化线程。把一个线程绑定到所属的核tls_cpuid。这样下次以后这个线程需要内存,直接去对应的shard获取内存。

注意,tls_cpuid 默认为0,因此对于core_idx为0的线程需要映射到num_cpus,这样就能通过tls_cpuid != 0来判断这个线程是否初始化过。

1
2
3
4
5
6
ConcurrentArena::Shard* ConcurrentArena::Repick() {
auto shard_and_index = shards_.AccessElementAndIndex();
// 用于将 cord_idx 为 0 时 映射为 num_cpus,来体现此线程已初始化
tls_cpuid = shard_and_index.second | shards_.Size();
return shard_and_index.first;
}

ConcurrentArena::AllocateImpl

整个流程如下:

  1. 先判断此次分配内存,是否需要从arena直接分配即可

    • 超过一个shards内存大小的1/4
    • 上层强制使用 arena
    • 该线程首次调用AllocateImpl 函数,此时shards中的各个shard->free_begin尚未指向有效内存地址;

    如果要使用 arena_,则需要使用arena_mutex_保护。

  2. 尝试从shard中获取内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
template <typename Func>
char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
size_t cpu;
// 直接使用arena分配内存,有两种情况:
// 1. 待分配的内存比较大。
// 2. Repick 尚未调用过,并且当前 arena_mutex_ 能直接获得
std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
if (bytes > shard_block_size_ / 4
// 使用 huge_page
|| force_arena
// 判断条件执行到此,说明 所需的内存 bytes 并不大,比较小
// 原则上是不需要使用 arena,但是如果同时满足以下二个条件:
// 1. 没有执行过 Repick 函数
// 2. 成功获得锁
|| ((cpu = tls_cpuid) == 0 // 线程尚未初始化
&& !shards_.AccessAtCore(0)->allocated_and_unused_.load(std::memory_order_relaxed)
&& arena_lock.try_lock())) {
if (!arena_lock.owns_lock()) {
// 阻塞等待
arena_lock.lock();
// 获得锁
}
auto rv = func(); // 分配内存的回调函数
Fixup();
return rv;
}

// 2. 选择一个 shard 来满足此次内存分配
Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
if (!s->mutex.try_lock()) {
s = Repick();
s->mutex.lock();
}
std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
if (avail < bytes) {
// reload
// 此时需要使用 arnea 来分配内存
// 1. 该 core 所剩的内存不足
// 2. 第一次执行(本质还是第一种情况)
std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
assert(exact == arena_.AllocatedAndUnused());

if (exact >= bytes && arena_.IsInInlineBlock()) {
// 如果还没有消耗完 arena 的inline block的内存,则直接使用该内存
// 这样就可以避免分配新的block,
auto rv = func(); // 执行回调函数分配
Fixup();
return rv;
}

avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
? exact
: shard_block_size_;
// 重新分配内存
// 首次调用即在此分配内存
s->free_begin_ = arena_.AllocateAligned(avail);
Fixup();
}
// 可用内存减少
s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

char* rv;
if ((bytes % sizeof(void*)) == 0) {
// aligned allocation from the beginning
rv = s->free_begin_;
s->free_begin_ += bytes;
} else {
// unaligned from the end
rv = s->free_begin_ + avail - bytes;
}
return rv;
}