多线程 HashTable 设计

HashTable<T, Hash, Equal>

RocksDB 为了优化多核下的HashTable性能,放弃了全局锁设计,取而代之是如下分段锁设计:在并发下,将全局锁上的竞争分散到各个bucket上。

对每个桶(bucket)分配一个锁(mutex),当获取某个bucket中的元素时,只使用对应bucket的mutex即可,这样对其他bucket的元素就没有额外的负担。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//                         |<-------- alpha ------------->|
// Buckets Collision list
// ---- +----+ +---+---+--- ...... ---+---+---+
// / | |--->| | | | | |
// / +----+ +---+---+--- ...... ---+---+---+
// / | |
// Locks/ +----+
// +--+/ . .
// | | . .
// +--+ . .
// | | . .
// +--+ . .
// | | . .
// +--+ . .
// \ +----+
// \ | |
// \ +----+
// \ | |
// \---- +----+
//

HashTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
template <class T, class Hash, class Equal>
class HashTable {
public:
explicit HashTable(const size_t capacity = 1024 * 1024,
const float load_factor = 2.0,
const uint32_t nlocks = 256)
: nbuckets_(static_cast<uint32_t>(load_factor ? capacity / load_factor : 0)),
nlocks_(nlocks) {
assert(capacity);
assert(load_factor);
assert(nbuckets_);
assert(nlocks_);

buckets_.reset(new Bucket[nbuckets_]);
// initialize
mlock(buckets_.get(), nbuckets_ * sizeof(Bucket));
mlock(locks_.get(), nlocks_ * sizeof(port::RWMutex));

assert(buckets_);
assert(locks_);
}

virtual ~HashTable() { AssertEmptyBuckets(); }


protected:
struct Bucket {
std::list<T> list_;
};

//...

void AssertEmptyBuckets() {
#ifndef NDEBUG
for (size_t i = 0; i < nbuckets_; ++i) {
WriteLock _(&locks_[i % nlocks_]);
assert(buckets_[i].list_.empty());
}
#endif
}
//...

const uint32_t nbuckets_; // 桶的总数
std::unique_ptr<Bucket[]> buckets_; // 桶
const uint32_t nlocks_; // 锁的总数
std::unique_ptr<port::RWMutex[]> locks_; // 锁
};

Find

每个bucket都指向一个链表,用于解决hash冲突,因此当查找一个key时,需要在链表中顺序查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/// @brief 顺序查找,时间复杂度O(N)
typename std::list<T>::iterator Find(std::list<T>* list, const T& t) {
for (auto it = list->begin(); it != list->end(); ++it) {
if (Equal()(*it, t)) {
return it;
}
}
return list->end();
}

///@brief 在 bucket 中查找 t
///@param ret 就是传出参数,返回true时有效
bool Find(Bucket* bucket, const T& t, T* ret) {
auto it = Find(&bucket->list_, t);
if (it != bucket->list_.end()) {
if (ret) {
*ret = *it;
}
return true;
}
return false;
}

Insert

1
2
3
4
5
6
7
8
9
10
/// 将 t 添加到 bucket 中,不会重复添加
bool Insert(Bucket* bucket, const T& t) {
auto it = Find(&bucket->list_, t);
if (it != bucket->list_.end()) {
return false;
}
// 不存在,则添加
bucket->list_.push_back(t);
return true;
}

Erase

EraseInsert类似。

1
2
3
4
5
6
7
8
9
10
11
12
bool Erase(Bucket* bucket, const T& t, T* ret) {
auto it = Find(&bucket->list_, t);
if (it != bucket->list_.end()) {
if (ret) {
*ret = *it;
}

bucket->list_.erase(it);
return true;
}
return false;
}

GetMutex

GetMutex函数,用于获得一个key对应的mutex。

  • 获得 key 对应的 hash_code
  • 获得 hash_code 映射到 buckets_ 中的索引 bucket_idx
  • 通过 bucket_idx 映射到 locks_ 中的索引 lock_idx

最终,对 key 进行同步操作的锁就是 locks_[lock_idx]

1
2
3
4
5
6
7
port::RWMutex* GetMutex(const T& t) {
const uint64_t h = Hash()(t);
const uint32_t bucket_idx = h % nbuckets_;
const uint32_t lock_idx = bucket_idx % nlocks_;

return &locks_[lock_idx];
}

insert

下面来看看,将一个 key 插入到 HashTable 中的过程。

  • 获得 bucket_idxlock_idx
  • 使用 locks_[lock_idx] 对要修改的 buckets_[bucket_idx] 进行保护,再调用bucket级别的insert方法,将key插入到 buckets_[bucket_idx] 中。

整体表现如下。

1
2
3
4
5
6
7
8
9
bool Insert(const T& t) {
const uint64_t h = Hash()(t);
const uint32_t bucket_idx = h % nbuckets_;
const uint32_t lock_idx = bucket_idx % nlocks_;

WriteLock _(&locks_[lock_idx]);
auto& bucket = buckets_[bucket_idx];
return Insert(&bucket, t);
}

其余方法,和 Insert 类似。