剖析REDIS的输入缓冲区
本校主要讲述REDIS处理接受客户端指令的过程
- 单线程下,读取并解析客户端读
- 多线程读
client
中与输入缓冲区有关的字段如下:
1 | struct client { |
单线程-读
readQueryFromClient
这个函数是REdis处理可读事件的回调函数,负责读取客户端的请求数据
1 | client *createClient(connection *conn) { |
readQueryFromClient
函数,主要有如下几个步骤:
基于
postponeClientRead
函数,判断是否需要将这个读取操作延迟到子线程中执行判断此次数据是否是上一个指令的后续部分,详细可以参考后面的
processMultibulkBuffer
函数中分析。1
2
3
4
5
6
7
8
9
10if (c->reqtype == PROTO_REQ_MULTIBULK &&
c->multibulklen &&
c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (0 < remaining && remaining < readlen) readlen = remaining;
}调用
connRead
函数读取数据- 根据返回值及
conn
状态,判断connRead
结果 - 如果读取成功,且
c->querybuf
中数据超过限制,也要关闭客户端( 应用层的流量控制)
- 根据返回值及
调用
processInputBuffer
函数,解析并执行指令
1 | void readQueryFromClient(connection *conn) { |
postponeClientRead
如果开启了读子线程,postponeClientRead
函数将暂停EventLoop
中的读取操作并添加到任务队列 server.clients_pending_read
中,延迟线程中去执行。
1 | int postponeClientRead(client *c) { |
processInputBuffer
processInputBuffer
函数,根据请求格式调用processInlineBuffer
函数或 processMultibulkBuffer
函数来解析指令和指令参数,最后调用,最后调用 processCommandAndResetClient
函数执行指令。
确定客户端的请求类型
c->reqtype
1
2
3/* Client request types */
根据
c->reqtype
不同,调用不同的函数来解析输出。c->reqtype == PROTO_REQ_INLINE
, 用processInlineBuffer
解析指令和参数,填充c->cmd
、c->argv
和c->argc
c->reqtype == PROTO_REQ_MULTIBULK
,用processMultibulkBuffer
解析指令和参数,填充c->cmd
、c->argv
和c->argc
解析完成后
如果这个客户端被加上标志位
CLIENT_PENDING_READ
, 则此次不处理读取任务,再次加上标志位CLIENT_PENDING_COMMAND
, 跳while
循环,等到函数handleClientsWithPendingReadsUsingThreads
中去执行。f如果没有加上标志位
CLIENT_PENDING_READ
,则直接调用processCommandAndResetClient
函数执行请求的命令。
1 | void processInputBuffer(client *c) { |
processInlineBuffer
processInlineBuffer
函数用于内部交流,不涉及服务器和客户端的通讯。
processMultibulkBuffer
processMultibulkBuffer
函数用于处理RESP协议,接收到的数据格式基本如下:数据与数据之间都是以 \r\n
间隔。整个数据的前缀是 *
, 表示下面有multibulklen
个参数,下面每个参数前都有个以 $
为前缀的字符串,标志着该参数的字节数bulklen
。
1 | *总的个数 // multibulklen |
processMultibulkBuffer
函数,每次最多处理 multibulklen
个参数,刚好是一条指令及其参数。但是由于单次最多读取PROTO_IOBUF_LEN
字节,可能某一条指令的参数很大,无法一次处理就需要等到下次接着处理。整个过程如下:
先解析出
multibulklen
1
2
3if (c->multibulklen == 0) {
// 解析multibulklen的工作
}那什么时候
c->multibulklen!=0
? ,看下面。尝试解析
multibulklen
个参数,第一个是指令名,第二个开始是参数名(位方便表达,都统称参数)解析参数的字节数
bulklen
。如果
bulklen
较大,c->querybuf
剩余字节数不足bulklen
,则如下处理:1
2
3
4
5
6
7
8if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
/* Hint the sds library about the amount of bytes this string is
* going to contain. */
// 使 c->querybuf 扩容到 ll+2,接受数据
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
}那么就等待下次可读事件触发将剩余数据发送过来。本次解析在下面的判断中直接
break
,processMultibulkBuffer
函数返回C_ERR
.1
2
3
4if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
/* Not enough data (+2 == trailing \r\n) */
break;
}当终于,一个指令的所有参数都接受完整后,使用下面的分支来创建对象
1
2
3
4
5
6
7
8
9
10if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf);
sdsIncrLen(c->querybuf, -2); /* remove CRLF */
/* Assume that if we saw a fat argument we'll see another one likely... */
c->querybuf = sdsnewlen(SDS_NOINIT, c->bulklen+2);
sdsclear(c->querybuf);
}如果
bulklen
正常,则使用下面的函数来创建参数1
2c->argv[c->argc++] = createStringObject(c->querybuf+c->qb_pos, c->bulklen);
c->qb_pos += c->bulklen+2; // 移动到下一行
重复第3步骤,直至解析完
multibulklen
个参数,即完成的一个指令解析完,返回C_OK
。1
if (c->multibulklen == 0) return C_OK;
整个函数执行流程如下:
1 | int processMultibulkBuffer(client *c) { |
processCommandAndResetClient
processCommandAndResetClient
函数调用processCommand
函数执行客户端请求的命令。commandProcessed
一些校验和重置工作
1 | int processCommandAndResetClient(client *c) { |
多线程-读
handleClientsWithPendingReadsUsingThreads
handleClientsWithPendingReadsUsingThreads
函数是在 beforeSleep
中执行,用于将 server.clients_pending_read
中的读任务分到各个线程去执行,最后由主线程来执行之前加上标志位 CLIENT_PENDING_COMMAND
的读任务。5个基本步骤流程如下:
是否先启动读线程:第一个判断条件
server.io_threads_do_reads
是个bool
变量,也是在conf.c
中配置,默认是0,因此默认下,不使用多线程读取操作。1
2
3
4
5createBoolConfig("io-threads-do-reads",
NULL,
IMMUTABLE_CONFIG,
server.io_threads_do_reads, 0,
NULL, NULL), /* Read + parse from threads? */将任务列表
server.clients_pending_read
中所有的任务分发到各个线程先让子线程去执行任务
主线程去执行任务,并等待子线程完成任务
执行具有标志位
CLIENT_PENDING_COMMAND
的任务
整个代码如下:
1 | int handleClientsWithPendingReadsUsingThreads(void) { |