剖析REDIS的多线程
REdis6.0中加入多线程,但这不是One Loop per Thread
模型,而是One Loop + Thread Pool
模型,即增加了一个线程池来处理任务。
在redisServer中,io_threads_num
字段定义了REdis的线程数,
1 | struct redisServer { |
REdis6.0默认还是单线程,可以在配置文件config.c
中修改,REdis6.0的线程数上限是128。
1 | createIntConfig("io-threads", |
线程变量
1 | // in networking.c |
beforeSleep
beforeSleep
函数,是在EventLoop中进入阻塞之前调用(EventLoop
怎么运转可参考EventLoop小节描述)。每次在进入阻塞之前,都会先执行 handleClientsWithPendingReadsUsingThreads
和 handleClientsWithPendingWritesUsingThreads
,使得子线程在阻塞期间也能正常运行。
1 | void beforeSleep(struct aeEventLoop *eventLoop) { |
handleClientsWithPendingReadsUsingThreads
详细分析见InputBuffer
handleClientsWithPendingWritesUsingThreads
在正式进入线程部分之前,先介绍下 handleClientsWithPendingWritesUsingThreads
函数,因为它作为主线程中的生产者,将任务分发到子线程执行。
- 在单线程模式下,
handleClientsWithPendingWritesUsingThreads
函数就是个handleClientsWithPendingWrites
的wrapper。 - 在多线程模式下,有如下6个基本步骤:
- 将
server.clients_pending_write
中待处理的客户端,按照轮询的方式分发到server.io_threads_num
个线程的任务列表io_threads_list[id]
中 - 设置原子变量
io_threads_op
为写操作,并将每个子线程的任务数记录到io_threads_pending[id]
中 - 第2步骤设置完,子线程可以去执行了(如何实现线程间的同步,见后文分析)
- 主线程去执行自己任务列表
io_threads_list[0]
中的任务 - 等待所有的子线程完成写任务
- 如果,还要某个客户端的output buffer中还有数据,则再注册可写事件,并设置写回调函数为
sendReplyToClient
- 将
整个函数流程如下:
1 | // 客户端的可写事件, 在 beforeSleep 中处理 |
在多线程模式下,handleClientsWithPendingWrites/ReadUsingThreads
函数运行和单线程模式下还是有区别:
在单线程中,对于客户端的请求,在
beforeSleep
函数中是先运行handleClientsWithPendingReads
,再处理handleClientsWithPendingWrites
,这对于客户端的简单请求可以直接回复在多线程下,第一次必须先运行
xxx_Write_xxx
,因为先运行xxx_Reads_xxx
会因为第一个if判断条件不满足而直接退出1
if (!io_threads_active || !server.io_threads_do_reads) return 0;
当处理任务较少时,有可能还是使用使用单线程来处理。
在
handleClientsWithPendingWritesUsingThreads
函数中的第二个if
判断分支中,stopThreadedIOIfNeeded
函数判断当前待执行任务server.clients_pending_write
的数量pengding
和线程数server.io_threads_num
之间的关系,若stopThreadedIOIfNeeded
返回1,则继续使用单单线程handleClientsWithPendingWrites
1
2
3if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}- 如果第一次执行
handleClientsWithPendingWritesUsingThreads
,stopThreadedIOIfNeeded
就返回1,子线程不会启动 - 如果非首次执行,
stopThreadedIOIfNeeded
返回1,则会停止所有的子线程,变成单线程工作
- 如果第一次执行
initThreadedIO
创建并初始化 server.io_threads_num-1
个子线程
1 | void initThreadedIO(void) { |
IOThreadMain
子线程函数入口,在 IOThreadMain
里与客户端进行数据发送与接受。执行流程大致如下:
至多循环100w次,等待主线程将任务分配到子线程的任务列表
io_threads_list[id]
中若待处理的任务数
pending
和线程数server.io_threads_num
之间满足pending < (server.io_threads_num*2)
,则停止多线程。检测任务数和多线程之间的关系,是在定时器事件中检测的,1
2
3
4
5
6int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//....
/* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded();
// ...
}若有任务可处理,则通过判断原子变量
io_threads_op
状态,来进行相应的读写操作IO_THREADS_OP_WRITE
:发送 OutBuffer 中的数据IO_THREADS_OP_READ
:读取并处理 InputBuffer 数据
io_threads_list[id]
中的任务执行完,主线程中while(1)
才能跳出。
在主线程和子线程之间,是通过原子变量 io_threads_pending[id]
实现同步关系:
- 在主线程中,计算了每个子线程的任务数
io_threads_pending[id]
后,子线程才去执行,然后就会阻塞在whiile(1)中,等待io_threads_pending[id]
都变为0。 - 在子线程中,while(1)循环体需要等待
io_threads_pending[id] !=0
才能向下执行。执行完任务后,清空io_threads_pending[id]
,主线程中while(1)才会跳出。
关键!!! 变量 io_threads_pending
是个原子变量。因此不用mutex
即可实现同步关系,即这个是基于lock-free
的生产-消费模式的多线程。
1 | void* IOThreadMain(void *myid) { |
线程的生命周期
启动顺序
下面介绍三个函数,关于子线程的启动与停止。问题:主线程和子线程谁先启动?
- 虽然主线程是在
initServer
函数中完成初始化,但是启动还是需要等待aeMain
函数被调用 - 子线程是通过
initThreadedIO()
完成创建,并开始执行子线程入口函数IOThreadMain
,这一切都是在server.c
中的InitServerLast()
中完成REdis服务器的主程序中1
2
3
4
5
6void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}1
2
3
4
5
6
7
8
9
10int main(int argc, char **argv) {
// ...
initServer(); // 主线程完成初始化
// ...
InitServerLast(); // 子线程完成初始化并开始运行
// ...
aeMain(server.el); // 主程序启动
aeDeleteEventLoop(server.el);
return 0;
}
因此,可以得出的结论是先运行子线程,再运行主线程。那么下面开始分析线程的生命周期。
生命周期
在函数 handleClientsWithPendingWritesUsingThreads
中使用函数startThreadedIO
来启动线程,其关键在于 startThreadedIO
函数中用for
循环来逐个解锁。整个流程如下:
子线程先启动,因此
initThreadedIO
函数中对每个子线程先加上锁1
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
由于此时主线程还没启动,没有任务分发给子线程。这会导致在子线程执行函数
IOThreadMain
会进入下whiie(1)循环中的条件分支,并阻塞在再次加锁位置:1
2
3
4
5if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]); // 阻塞于此
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}当主线程启动时,
handleClientsWithPendingWritesUsingThreads
函数第一次会调用startThreadedIO
函数1
2
3
4
5
6if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
// 不满足上面的if分支,才会启动子线程
if (!io_threads_active) startThreadedIO();在
startThreadedIO
函数的for
循环会对每个子线程依次解锁1
2for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);此时,使得子线程执行函数
IOThreadMain
解除阻塞状态 ,能够继续运行 下去,并且往后都不会再进入if (io_threads_pending[id] == 0)
分支。当需要停止子线程时,在子线程停止函数
stopThreadedIO
中又对每个子线程进行了一次加锁操作,结束整个过程。1
2for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
startThreadedIO
1 | void startThreadedIO(void) { |
stopThreadedIOIfNeeded
判断是否要停止多线程、恢复单线程。条件即: pending < (server.io_threads_num*2 && io_threads_active ==1
。
1 | int stopThreadedIOIfNeeded(void) { |
stopThreadedIO
停止所有的子线程
1 | void stopThreadedIO(void) { |