concurrent_arena 在上一期的 存分配器之 arena 中,讲解了 RocksDB 是如何混合new、mmap设计一个高效的内存分配器。这一期,我们继续深入探索 RocksDB 是如何设计出一个高效的多线程内存分配器 concurrent_arena
。
let’s go
多线程程序的性能关键,有两个关键因素:
而 concurrent_arena 即在 arena 的基础上,增加了一些多线程间的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class ConcurrentArena : public Allocator { public : explicit ConcurrentArena (size_t block_size = Arena::kMinBlockSize, size_t huge_page_size = 0 ) ; char * Allocate (size_t bytes) override ; char * AllocateAligned (size_t bytes, size_t huge_page_size = 0 , Logger* logger = nullptr ) override ; private : struct Shard { char padding[40 ] ROCKSDB_FIELD_UNUSED; mutable SpinMutex mutex; char * free_begin_; std::atomic<size_t > allocated_and_unused_; Shard () : free_begin_ (nullptr ), allocated_and_unused_ (0 ) {} }; static thread_local size_t tls_cpuid; char padding0[56 ] ROCKSDB_FIELD_UNUSED; size_t shard_block_size_; CoreLocalArray<Shard> shards_; Arena arena_; mutable SpinMutex arena_mutex_; std::atomic<size_t > arena_allocated_and_unused_; std::atomic<size_t > memory_allocated_bytes_; std::atomic<size_t > irregular_block_num_; char padding1[56 ] ROCKSDB_FIELD_UNUSED; };
ConcurrentArena::Shard 为避免竞争,concurrent_arena
使用将每个线程所需的内存分配在线程所属的cpu核上,如此每个线程在分配内存时都会使用自己的内存,如此就避免了竞争。这也是一种空间换时间的策略,类似于threadlocal思想。
结构体 Shard
记录了每个核上的内存分配、使用情况。字段 shards_
记录了所有核上shard。
类CoreLocalArray
本质上是个数组,为了提高多线程的访问效率,要将Shard
的大小对齐到cacheline大小,以阻止false sharing现象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 struct Shard { char padding[40 ] ROCKSDB_FIELD_UNUSED; mutable SpinMutex mutex; char * free_begin_; std::atomic<size_t > allocated_and_unused_; Shard () : free_begin_ (nullptr ), allocated_and_unused_ (0 ) {} }; CoreLocalArray<Shard> shards_;
CoreLocalArray 类 CoreLocalArray
只用于存储数据,本身不是线程安全的,因此需要 Shard::mutex
字段保护。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 template <typename T>class CoreLocalArray { public : CoreLocalArray (); size_t Size () const { return static_cast <size_t >(1 ) << size_shift_; } T* Access () const { return AccessElementAndIndex ().first; } std::pair<T*, size_t > AccessElementAndIndex () const ; T* AccessAtCore (size_t core_idx) const ; private : std::unique_ptr<T[]> data_; int size_shift_; };
CoreLocalArray::CoreLocalArray C++11中提供了std::thread::hardware_concurrency
函数,来获取CPU的核数num_cpus
,将线程数设置为 num_cpus
,如此就能避免在一个进程中线程频繁的在不同的core上来回切换,降低线程切换上下文的开销。
如果num_cpus <= 8
,则启动8个线程;
否则,开启num_cpus
个线程(num_cpus 一般也是2的幂)。
代码简洁如下。
1 2 3 4 5 6 7 8 9 10 template <typename T>CoreLocalArray<T>::CoreLocalArray () { int num_cpus = static_cast <int >(std::thread::hardware_concurrency ()); size_shift_ = 3 ; while (1 << size_shift_ < num_cpus) { ++size_shift_; } data_.reset (new T[static_cast <size_t >(1 ) << size_shift_]); }
CoreLocalArray::AccessAtCore AccessAtCore
函数,用于获取核core_idx
上的数据,即获取数组data_[core_idx]
中数据。
在这解释下为啥类型T
大小需要对齐到 cacheline大小。
1 2 3 4 5 template <typename T>T* CoreLocalArray<T>::AccessAtCore (size_t core_idx) const { assert (core_idx < static_cast <size_t >(1 ) << size_shift_); return &data_[core_idx]; }
CoreLocalArray::AccessElementAndIndex 1 2 3 4 5 6 7 8 9 10 11 12 13 template <typename T>std::pair<T*, size_t > CoreLocalArray<T>::AccessElementAndIndex () const { int cpuid = port::PhysicalCoreID (); size_t core_idx; if (UNLIKELY (cpuid < 0 )) { core_idx = Random::GetTLSInstance ()->Uniform (1 << size_shift_); } else { core_idx = static_cast <size_t >(cpuid & ((1 << size_shift_) - 1 )); } return {AccessAtCore (core_idx), core_idx}; }
ConcurrentArena::ShardAllocatedAndUnused ShardAllocatedAndUnused
则用于记录,shards
中各个shard
中剩余可用内存大小。
1 2 3 4 5 6 7 8 9 size_t ConcurrentArena::ShardAllocatedAndUnused () const { size_t total = 0 ; for (size_t i = 0 ; i < shards_.Size (); ++i) { total += shards_.AccessAtCore (i)->allocated_and_unused_.load (std::memory_order_relaxed); } return total; }
ConcurrentArena::ConcurrentArena ConcurrentArena
的构造函数比较简单:
初始化每个shard
需要分配的内存大小 shard_block_size_
;
初始化 shards_
数组;
初始化arena
调用Fixup
函数,初始化内存使用情况;
简单如下。
1 2 3 4 5 6 7 8 ConcurrentArena::ConcurrentArena (size_t block_size, AllocTracker* tracker, size_t huge_page_size) : shard_block_size_ (std::min (kMaxShardBlockSize, block_size / 8 )), shards_ (), arena_ (block_size, tracker, huge_page_size) { Fixup (); }
ConcurrentArena::Fixup FixUp
函数用于记录当前内存情况。
下面三个原子变量写入新值时,使用::std::memory_order_relaxed
即能满足。关于memory order
,花了两个周末的时候重温了下,有时间会再出博客深度讲解。
1 2 3 4 5 6 7 void ConcurrentArena::Fixup () { arena_allocated_and_unused_.store (arena_.AllocatedAndUnused (), std::memory_order_relaxed); memory_allocated_bytes_.store (arena_.MemoryAllocatedBytes (), std::memory_order_relaxed); irregular_block_num_.store (arena_.IrregularBlockNum (), std::memory_order_relaxed); }
ConcurrentArena::Repick Repick
函数,用来初始化线程。把一个线程绑定到所属的核tls_cpuid
。这样下次以后这个线程需要内存,直接去对应的shard
获取内存。
注意,tls_cpuid
默认为0,因此对于core_idx
为0的线程需要映射到num_cpus
,这样就能通过tls_cpuid != 0
来判断这个线程是否初始化过。
1 2 3 4 5 6 ConcurrentArena::Shard* ConcurrentArena::Repick () { auto shard_and_index = shards_.AccessElementAndIndex (); tls_cpuid = shard_and_index.second | shards_.Size (); return shard_and_index.first; }
ConcurrentArena::AllocateImpl 整个流程如下:
先判断此次分配内存,是否需要从arena直接分配即可
超过一个shards内存大小的1/4
上层强制使用 arena
该线程首次调用AllocateImpl
函数,此时shards
中的各个shard->free_begin
尚未指向有效内存地址;
如果要使用 arena_
,则需要使用arena_mutex_
保护。
尝试从shard中获取内存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 template <typename Func>char * AllocateImpl (size_t bytes, bool force_arena, const Func& func) { size_t cpu; std::unique_lock<SpinMutex> arena_lock (arena_mutex_, std::defer_lock) ; if (bytes > shard_block_size_ / 4 || force_arena || ((cpu = tls_cpuid) == 0 && !shards_.AccessAtCore (0 )->allocated_and_unused_.load (std::memory_order_relaxed) && arena_lock.try_lock ())) { if (!arena_lock.owns_lock ()) { arena_lock.lock (); } auto rv = func (); Fixup (); return rv; } Shard* s = shards_.AccessAtCore (cpu & (shards_.Size () - 1 )); if (!s->mutex.try_lock ()) { s = Repick (); s->mutex.lock (); } std::unique_lock<SpinMutex> lock (s->mutex, std::adopt_lock) ; size_t avail = s->allocated_and_unused_.load (std::memory_order_relaxed); if (avail < bytes) { std::lock_guard<SpinMutex> reload_lock (arena_mutex_) ; auto exact = arena_allocated_and_unused_.load (std::memory_order_relaxed); assert (exact == arena_.AllocatedAndUnused ()); if (exact >= bytes && arena_.IsInInlineBlock ()) { auto rv = func (); Fixup (); return rv; } avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2 ? exact : shard_block_size_; s->free_begin_ = arena_.AllocateAligned (avail); Fixup (); } s->allocated_and_unused_.store (avail - bytes, std::memory_order_relaxed); char * rv; if ((bytes % sizeof (void *)) == 0 ) { rv = s->free_begin_; s->free_begin_ += bytes; } else { rv = s->free_begin_ + avail - bytes; } return rv; }