剖析REDIS的 hiredis

hiredis是为客户端redis-cli设计的。在sentinel中,也是使用的是hiredis中的 redisAsyncContext 来创建连接、建立通信,而不是普通的client对象。因此在介绍sentinel的工作原理之前,先介绍hiredis设计。

redisAsyncContext

redisAsyncContext 结构体作用类似于client结构体,用于实现客户端与服务器、以及服务器之间的数据通信。因此,也需要scoketfd、与sockfd相关的发起连接、读写回调函数等。在REDIS中,redisAsyncContext用于两个地方:

  • sentinel与主服务器、从服务器以及其他sentinels通信
  • 客户端与服务器通信,比如实现redis-cli

因此,redisAsyncContext的基本设计和client类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
typedef struct redisAsyncContext {
redisContext c; // 同步环境下的

int err; // c->err
char *errstr; // c->errstr
void *data; // 用于 sentinel 中时, 保存 instanceLink*
struct {
void *data; // 用于 sentinel 中时,保存 redisAeEvents*
// 读写回调函数
void (*addRead)(void *privdata);
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
void (*scheduleTimer)(void *privdata, struct timeval tv);
} ev;

redisDisconnectCallback* onDisconnect; // 断开连接的回调函数
redisConnectCallback *onConnect; // 连接上的回调函数
redisCallbackList replies; // 常规指令的回调函数

// 用于connect()地址
struct sockaddr *saddr;
size_t addrlen;

// 订阅回调函数
struct {
redisCallbackList invalid; // 无效指令的回调函数
struct dict *channels; // 订阅通道: channel --> 回复
struct dict *patterns; // 订阅模式: pattern --> 回复
} sub;
} redisAsyncContext;

redisContext

redisAsyncContext是异步环境下,而redisContext可以简单的看作是同步环境下的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 为客户端创建环境
typedef struct redisContext {
const redisContextFuncs* funcs; // 读写回调函数

int err; // 错误,err ==0 时即无错误
char errstr[128]; // 错误的字符串表示

redisFD fd; // socketfd
int flags;
char * obuf; // 发送缓冲区
redisReader* reader; // 读取缓冲区

enum redisConnectionType connection_type; // 连接类型
struct timeval *timeout; // 超时时间

// tcp连接
struct {
char *host;
char *source_addr;
int port;
} tcp;

struct {
char *path;
} unix_sock;

// 连接地址
struct sockadr *saddr;
size_t addrlen;

void *privdata; // 私有数据
} redisContext;

funcs

funcs不仅封装了适用于redisContext的读写回调函数,也封装了用于异步对象redisAsyncContext的回调函数。redisContextFuncs 封装如下:

1
2
3
4
5
6
7
typedef struct redisContextFuncs {
void (*free_privdata)(void *); // 释放 privdata
void (*async_read)(struct redisAsyncContext *); // 异步读取
void (*async_write)(struct redisAsyncContext *); // 异步发送
int (*read)(struct redisContext *, char *, size_t); // 同步读取
int (*write)(struct redisContext *); // 同步发送
} redisContextFuncs;

hiredis.c文件中,redisContextFuncs结构体有一个默认的对象redisContextDefaultFuncs,初始化如下:

1
2
3
4
5
6
7
static redisContextFuncs redisContextDefaultFuncs = {
.free_privdata = NULL,
.async_read = redisAsyncRead,
.async_write = redisAsyncWrite,
.read = redisNetRead,
.write = redisNetWrite
};

redisAsyncConnectBind

下面开始讲解,创建一个struct redisAsyncContext对象到建立连接的过程。

每个TCP连接属性都有一些可选参数,这个由struct redisOptions设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct {
int type; // 连接类型:REDIS_CONN_TCP 、REDIS_CONN_UNIX
int options; // TCP选项: REDIS_OPT_xxx
const struct timeval *timeout; // 超时时间
union {
// 用于 TCP 连接
struct {
const char *source_addr; // 绑定的本地地址
const char *ip; // 对端ip
int port; // 对端port
} tcp;
// 用于 unix-domain
const char *unix_socket;
// hireds 用于操作一个已打开的fd
redisFD fd;
} endpoint;
} redisOptions;

对于options中TCP相关参数的设置,由REDIS_OPTIONS_SET_TCP宏实现:

1
2
3
4
#define REDIS_OPTIONS_SET_TCP(opts, ip_, port_) \
(opts)->type = REDIS_CONN_TCP; \ // 设置TCP连接
(opts)->endpoint.tcp.ip = ip_; \ // 对端 ip
(opts)->endpoint.tcp.port = port_; // 对端 port

redisAsyncConnectBind函数,如下:

1
2
3
4
5
6
7
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char* source_addr) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, ip, port);
options.endpoint.tcp.source_addr = source_addr;
// 非阻塞连接并绑定到 source_addr
return redisAsyncConnectWithOptions(&options);
}

redisAsyncConnectWithOptions

redisAsyncConnectWithOptions函数步骤如下:

  • options参数将TCP连接设置为非阻塞连接optionsredisAsyncConnect函数中设置。
  • 创建同步环境 c
  • 创建异步环境 ac
  • c->err的设置到ac->err,方便可以直接访问

redisAsyncConnectWithOptions函数获得ac,默认是非阻塞IO。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
redisAsyncContext* redisAsyncConnectWithOptions(const redisOptions *options) {
redisOptions myOptions = *options;
redisContext *c;
redisAsyncContext *ac;
// 1) 设置为非阻塞连接
myOptions.options |= REDIS_OPT_NONBLOCK;

// 2) 将c->e->fd设置为非阻塞IO,并客户端对象C
c = redisConnectWithOptions(&myOptions);
if (c == NULL) {
return NULL;
}
// 3) 基于同步客户端c,初始化异步对象 ac
ac = redisAsyncInitialize(c);
if (ac == NULL) {
redisFree(c);
return NULL;
}
// 4) 将 c->err 复制到 ac->err
__redisAsyncCopyError(ac);
return ac;
}

redisConnectWithOptions

设置TCP连接选择,比如常见的是否阻塞、是否地址复用,在REDIS多了个密码认证。然后就是调用redisContextConnectBindTcp函数来连接并绑定本地地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
redisContext* redisConnectWithOptions(const redisOptions *options) {
redisContext* c = redisContextInit(options);
if (c == NULL) return NULL;
// 设置了 REDIS_OPT_NONBLOCK,则为非阻塞IO
if (!(options->options & REDIS_OPT_NONBLOCK)) {
c->flags |= REDIS_BLOCK;
}
// 地址复用
if (options->options & REDIS_OPT_REUSEADDR) {
c->flags |= REDIS_REUSEADDR;
}
// 认证
if (options->options & REDIS_OPT_NOAUTOFREE) {
c->flags |= REDIS_NO_AUTO_FREE;
}
// tcp连接
if (options->type == REDIS_CONN_TCP) {
redisContextConnectBindTcp(c,
options->endpoint.tcp.ip,
options->endpoint.tcp.port,
options->timeout,
options->endpoint.tcp.source_addr);
}
else if (options->type == REDIS_CONN_UNIX) {
redisContextConnectUnix(c,
options->endpoint.unix_socket,
options->timeout);
}
else if (options->type == REDIS_CONN_USERFD) {
c->fd = options->endpoint.fd;
c->flags |= REDIS_CONNECTED;
}
else {
// Unknown type - FIXME - FREE
return NULL;
}

// 仅对阻塞连接才有效
/// 设置超时时间
if (options->timeout != NULL &&
(c->flags & REDIS_BLOCK) &&
c->fd != REDIS_INVALID_FD)
{
redisContextSetTimeout(c, *options->timeout);
}

return c;
}

redisAsyncInitialize

redisAsyncInitialize函数,基于创建的同步对象c来初始化一个 redisAsyncContext 对象ac

由于设置了非阻塞连接options.options |= REDIS_OPT_NONBLOCK;,因此要监测connect是否成功连接上对端,需要等待第一次可写事件触发。因此,在此redisAsyncInitialize函数中需要去除REDIS_CONNECTED 标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static redisAsyncContext* redisAsyncInitialize(redisContext *c) {
// c 是 redisAsyncContext对象的第一个字段,且是栈对象
// 因此可以调用 realloc 是为了继续在c后面创建数据
redisAsyncContext *ac = realloc(c, sizeof(redisAsyncContext));
if (ac == NULL) return NULL;

c = &(ac->c);

/* 同步connect函数返回,则成功连接,可以直接设置 REDIS_CONNECT 标志,
* 异步模式下,connect函数返回并不一定就成功连接,
* 直到第一个写事件触发后才算是成功连接,那个时候再设置 REDIS_CONNECT 标志,
* 因此在这里重置它
*/
c->flags &= ~REDIS_CONNECTED;

// 其他字段初始化需要根据应用场景
// 应用于 sentinel ,则在 sentinel.c/redisAeAttach 函数中初始化
ac->err = 0;
ac->errstr = NULL;
ac->data = NULL;
ac->ev.data = NULL;

//...
return ac;
}

redisAsyncSetConnectCallback

sentinel.c中,调用了redisAsyncConnectBind函数来连接对端后,就会调用redisAsyncSetConnectCallback函数,主要有两个作用:

  1. 设置连接回调函数fn。在sentinel.c中将建立连接的回调函数fn设置的是sentinelLinkEstablishedCallback函数,用来检测连接结果是否正确

    1
    2
    3
    void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
    if (status != C_OK) instanceLinkConnectionError(c);
    }
  2. 重要:前面说过了,在异步环境下redisAsyncConnectBind函数执行完毕,此时不知道是否建立成功,只有等第一次可写事件触发才能知晓,因此需要注册可写事件来监听可写事件的触发。

可能有人好奇,为啥这里是注册可写事件,而不是可读事件???

因为,刚建立连接的时候,c->fd的发送缓冲区是空的,会立即触发可写事件;但是刚建立连接时可不一定就有数据可读取。因此,要为c->fd注册可写事件。至于具体的可写事件触发流程以及验证是否成功建立连接,见下文分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
if (ac->onConnect == NULL) {
ac->onConnect = fn;

/* The common way to detect an established connection is to wait for
* the first write event to be fired. This assumes the related event
* library functions are already set. */
// 检测连接建立,最常用的方法是为其注册可写事件,第一个可写事件触发时,则连接建立
_EL_ADD_WRITE(ac);
return REDIS_OK;
}
return REDIS_ERR;
}

redisAeAttach

redisAsyncContext*用于sentinel中,对于ac其余未初始化字段在sentinel.c/redisAeAttach函数中初始化。

redisAeAttach函数,将异步客户端ac添加到loop循环中,使ac能像client一样处理各项事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
redisContext* c = &(ac->c);
redisAeEvents *e;

if (ac->ev.data != NULL) return C_ERR;

/* Create container for context and r/w events */
e = (redisAeEvents*)zmalloc(sizeof(*e));
e->context = ac;
e->loop = loop;
e->fd = c->fd; // 与对端通信的 sockfd !!!
e->reading = e->writing = 0;

// 设置各类回调函数
ac->ev.addRead = redisAeAddRead;
ac->ev.delRead = redisAeDelRead;
ac->ev.addWrite = redisAeAddWrite;
ac->ev.delWrite = redisAeDelWrite;
ac->ev.cleanup = redisAeCleanup;
ac->ev.data = e;

return C_OK;
}

完整地创建redisAsyncContext客户端,及其运行在sentinel.c/sentinelReconnectInstance函数实现,这个待下一章讲解sentinel再讲解。

_EL_ADD_WRITE(ac)

下面从 _EL_ADD_WRITE 宏开始讲解可写事件的触发及其处理流程。

_EL_ADD_WRITE 宏展开如下:

1
2
3
4
5
#define _EL_ADD_WRITE(ctx)                                          \
do { \
refreshTimeout(ctx); \
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
} while (0)

sentinel中,addWrite指向的函数是redisAeAddWrite

1
ac->ev.addWrite = redisAeAddWrite;

redisAeAddWrite函数中,为e->fd注册可写事件,设置可写事件回调函数redisAeWriteEvent

1
2
3
4
5
6
7
8
static void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->writing) {
e->writing = 1;
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
}
}

e->fd发送缓冲区有空,则触发可写事件并调用 redisAeWriteEvent函数来发送数据,内部是调用 redisAsyncHandleWrite 函数完成的。

1
2
3
4
5
6
7
8
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el);
((void)fd);
((void)mask);

redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleWrite(e->context);
}

redisAsyncHandleWrite

当触发可写事件时,如果是第一次调用redisAsyncHandleWrite函数不一定是本端有数据待发送,可能是因为连接连接了,整个函数流程如下:

  • 先判断当前是否建立连接。在前文说过,在redisAsyncConnectBind函数调用过后,可能并没有直接和对端建立连接,在第一次调用redisAsyncHandleWrite函数时需要通过__redisAsyncHandleConnect函数对结果进行判断。
  • 成功建立连接后,再调用c->funcs->async_writee->fd发送数据

其中,c->funcs->async_write指向的是redisAsyncWrite函数。整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);

// 如果没有建立连接,则先建立
if (!(c->flags & REDIS_CONNECTED)) {
if (__redisAsyncHandleConnect(ac) != REDIS_OK) return;
if (!(c->flags & REDIS_CONNECTED)) return;
}

// 发送数据
c->funcs->async_write(ac);
}

__redisAsyncHandleConnect

__redisAsyncHandleConnect函数中,需要通过redisCheckConnectDone函数对此次可写事件的状态进行判断,以确认是是否真的成功建立了连接。

方法是,此时直接调用connect函数向对端发起连接:

  • 如果connect函数返回0,无论之前是否成功连接,此时调用connect函数成功建立连接;
  • 如果connect函数返回-1,需要根据错误标志errno来判断是否成功连接。
    • 如果 errno == EISCONN,表示之前已经建立了连接,使得此次调用connect函数返回-1。因此,redisCheckConnectDone函数仍返回 REDIS_OK
    • errno是其他值,都表示没有建立连接。此时,返回REEDIS_ERR,忽略此次可写事件。

因此在redisCheckConnectDone函数中完成了异步连接的确认:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int redisCheckConnectDone(redisContext *c, int *completed) {
int rc = connect(c->fd, (const struct sockaddr *)c->saddr, c->addrlen);
if (rc == 0) {
*completed = 1;
return REDIS_OK;
}
switch (errno) {
case EISCONN:
*completed = 1;
return REDIS_OK;
case EALREADY:
case EINPROGRESS:
case EWOULDBLOCK:
*completed = 0;
return REDIS_OK;
default:
return REDIS_ERR;
}
}

redisCheckConnectDone函数,无论返回REDIS_ERR还是REDIS_OK,都会调用之前设置的连接回调函数ac->onConnect,即sentinelLinkEstablishedCallback函数,不过只在无法建立成功连接时才有效:

1
2
3
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
if (status != C_OK) instanceLinkConnectionError(c);
}

因此,当成功建立连接后,只是会加上 c->flags |= REDIS_CONNECTED 标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
int completed = 0;
redisContext *c = &(ac->c);
if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
redisCheckSocketError(c); // 设置 c->err
if (ac->onConnect) // 连接回调函数
ac->onConnect(ac, REDIS_ERR);
__redisAsyncDisconnect(ac); // 断开连接
return REDIS_ERR;
}
// 成功建立连接
if (completed == 1) {
if (ac->onConnect)
ac->onConnect(ac, REDIS_OK);
// 加上标志位:REDIS_CONNECTED
c->flags |= REDIS_CONNECTED;
}
return REDIS_OK;
}

redisAsyncWrite

redisAsyncWrite函数,即c->funcs->async_write,异步发送数据。

  • 会尝试调用redisBufferWrite函数直接发送数据
  • 如果c->obuf中的数据全部发送完毕,则取消可写事件。
    _EL_DEL_WRITE 宏展开如下:
    1
    2
    3
    4
    5
    6
    #define _EL_DEL_WRITE(ctx) do {                                     \
    if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)

    // 指向的函数
    ac->ev.delWrite = redisAeDelWrite;
    通过redisAeDelWrite调用函数,取消e->fd上的可写事件
    1
    2
    3
    4
    5
    6
    7
    8
    static void redisAeDelWrite(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (e->writing) {
    e->writing = 0;
    aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
    }
    }
  • 如果c->obuf中的数据没有发送完毕,此时_EL_ADD_WRITE(ac)宏无效,因为已经注册了可写事件。
  • 每次都会注册一次可读事件(如果已经注册则忽略,下一部分专门讲解)

整个流程大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void redisAsyncWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
int done = 0;

// 直接发送数据
if (redisBufferWrite(c,&done) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
// 没有写完,继续注册可写事件
if (!done)
_EL_ADD_WRITE(ac);
else
// 写完,则删除可写事件
_EL_DEL_WRITE(ac);

/* Always schedule reads after writes */
// 若ac为注册可读事件,则注册
_EL_ADD_READ(ac);
}
}

redisBufferWrite

redisBufferWrite函数,内部调用c->funcs->write指向的函数将发送缓冲区c->obuf中的数据发送至网络。redisBufferWrite函数,返回REDIS_OK,表示成功发送。进一步,如果done==1则表示c->obuf清空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int redisBufferWrite(redisContext *c, int *done) {
if (c->err) return REDIS_ERR;

if (sdslen(c->obuf) > 0) {
int nwritten = c->funcs->write(c);
if (nwritten < 0) return REDIS_ERR;

if (nwritten > 0) {
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
}
else {
sdsrange(c->obuf,nwritten,-1);
}
}
}

// 完全发送完毕,则done置1
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
}

redisNetWrite

redisNetWrite函数,即c->funcs->write, 内部调用send函数将c->obuf中数据发送出去。send函数返回值nwritten有几种可能:

  1. nwritten < 0 时,以下两种情况不是错误:

    • errno == EWOULDBLOCK:在非阻塞IO模式下, errno == EWOULDBLOCK 只是表示当前e->fd的发送缓冲区是满的,无法将c->obuf中的数据复制到e->fd的发送缓冲区。
    • errno == EINTR:表示当前发送数据操作被系统中断了。

    因此,对这两种情况,只需要直接return,等待下一次的可写事件触发即可。 问题:此时return也是 nwritten < 0,那么会造成这个连接被关闭???

  2. nwritten >= 0:正常情况

整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int redisNetWrite(redisContext *c) {
int nwritten = send(c->fd, c->obuf, sdslen(c->obuf), 0);
if (nwritten < 0) {
// 非阻塞IO下的错误
if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
// 难到不缺少一个 return 0;
}
else {
__redisSetError(c, REDIS_ERR_IO, NULL);
return -1;
}
}
return nwritten;
}

redisvAsyncCommand

上面的过程是将c->obuf的数据发送出去,那么c->obuf的数据是从何而来?

redisAsyncCommand*系列函数用于将待发送数据添加到c->obuf中:主要有以下几个,作用都是相同的:

  • ap/.../argv/cmd中的请求数据添加到ac->c->obuf
  • 设置这些请求的回调函数fn。当接收到对端对这些请求时执行fn,因此这些回调函数fn是在可读事件中执行的。

因此下面只选择其中的redisvAsyncCommand作为讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int redisvAsyncCommand(redisAsyncContext *ac, 
redisCallbackFn *fn,
void *privdata,
const char *format,
va_list ap);
int redisAsyncCommand(redisAsyncContext *ac,
redisCallbackFn *fn,
void *privdata,
const char *format,
...);
int redisAsyncCommandArgv(redisAsyncContext *ac,
redisCallbackFn *fn,
void *privdata,
int argc,
const char** argv,
const size_t *argvlen);
int redisAsyncFormattedCommand(redisAsyncContext *ac,
redisCallbackFn *fn,
void *privdata,
const char *cmd,
size_t len);

redisvAsyncCommand函数,先是调用redisvFormatCommand函数对不定参数ap按照format提供的格式进行格式化,结果存储在cmd,再调用__redisAsyncCommand函数将cmd序列化到c->obuf中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int redisvAsyncCommand(redisAsyncContext *ac, 
redisCallbackFn *fn,
void *privdata,
const char *format,
va_list ap)
{
char *cmd;
int len;
int status;
// 1 按照format格式解析ap参数,输出至cmd,参数个数返回到len
len = redisvFormatCommand(&cmd,format,ap);

/* We don't want to pass -1 or -2 to future functions as a length. */
if (len < 0)
return REDIS_ERR;

// 2 将 fn 添加到任务队列,并将 cmd 添加到 c->obuf 中
status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
free(cmd);
return status;
}

在介绍__redisAsyncCommand 函数之前,先具体介绍下hiredis中回调函数的相关实现。

struct redisAsyncContext中有四个回调函数任务队列:订阅指令subscribe channlepsubscribe pattern有专门的任务队列,当(p)subscribe指令的键不正确时,会存储在ac->sub.invalid中。其他的指令回调函数都存储在 ac->replies中:

1
2
3
4
5
6
7
8
// 常规指令的回调函数
redisCallbackList replies;
// 订阅指令的回调函数
struct {
redisCallbackList invalid; // 指令名不正确时回调函数
struct dict *channels; // channel_name ---> callback
struct dict *patterns; // pattern_name ---> callback
} sub;

而每个回调函数节点都是 redisCallback对象,由redisCallback对象形成的回调函数链表、任务队列。而这些任务列表,都是从尾部插入,头部取出,即先响应的请求先执行。

1
2
3
4
5
6
7
8
9
10
11
typedef struct redisCallback {
struct redisCallback* next; // 单链表
redisCallbackFn* fn; // 回调函数
int pending_subs; // 对应的通道/模式订阅次数
void *privdata;
} redisCallback;

typedef struct redisCallbackList {
redisCallback *head; // 链表头部
redisCallback *tail; // 链表尾部
} redisCallbackList;

__redisPushCallback

__redisPushCallback函数, 在任务列表list的尾部添加新的节点source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int __redisPushCallback(redisCallbackList* list, redisCallback *source) {
redisCallback *cb;

/* Copy callback from stack to heap */
cb = malloc(sizeof(*cb));
if (cb == NULL)
return REDIS_ERR_OOM;

if (source != NULL) {
memcpy(cb,source,sizeof(*cb));
cb->next = NULL; // 单独一个节点
}

// 将这个节点 cb 插入在在 list 的尾部
if (list->head == NULL)
list->head = cb;
if (list->tail != NULL)
list->tail->next = cb;
list->tail = cb;
return REDIS_OK;
}

__redisShiftCallback

__redisShiftCallback函数,从从任务列表list头部取出任务节点至target

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
redisCallback* cb = list->head;
if (cb != NULL) {
list->head = cb->next;
if (cb == list->tail)
list->tail = NULL;

if (target != NULL)
memcpy(target,cb,sizeof(*cb));
free(cb);
return REDIS_OK;
}
return REDIS_ERR;
}

__redisRunCallback

__redisRunCallback函数,执行回调函数cb->fnREDIS_IN_CALLBACK标志位表示正在执行中。

1
2
3
4
5
6
7
8
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
redisContext *c = &(ac->c);
if (cb->fn != NULL) {
c->flags |= REDIS_IN_CALLBACK; // 在执行中
cb->fn(ac,reply,cb->privdata);
c->flags &= ~REDIS_IN_CALLBACK; // 执行完毕
}
}

__redisAsyncCommand

__redisAsyncCommand函数要完成两个任务:

  1. 根据指令cmd类型,将fn回调函数设置到ac相应的队列中。

    指令字符串 cmd 格式依然满足之前在剖析REDIS的输出缓冲区章中所述的字符串格式。 hiredis通过nextArgument函数解析cmd指令,来根据指令类型为回调函数fn选择任务队列。

  2. 将指令字符串cmd写入到c->obuf中,待发送

整个过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
static int __redisAsyncCommand(redisAsyncContext *ac,
redisCallbackFn *fn,
void *privdata,
const char *cmd,
size_t len)
{
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
dictEntry *de;
redisCallback *existcb;
int pvariant, hasnext;
const char *cstr, *astr;
size_t clen, alen;
const char *p;
sds sname;
int ret;

// 如果连接即将关闭,则并不再接受新的指令
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;

// 设置回调函数节点
cb.fn = fn;
cb.privdata = privdata;
cb.pending_subs = 1;

// 解析指令字符串:
// cstr 存储是指令名字,clen 是指令长度,
// p 指向指令参数的起始位置
p = nextArgument(cmd,&cstr,&clen);
assert(p != NULL);
hasnext = (p[0] == '$');
pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
cstr += pvariant;
clen -= pvariant;

/*************** 1 将回调函数 fn 加入到相应的任务队列 ************************/

// 1.1) 发出去的指令是:subscribe channel/pattern
if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
c->flags |= REDIS_SUBSCRIBED;

// 获取订阅的 pattern/channel 参数
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen); // pattern_name / channel_name
// pvariant =1,订阅模式
// pvariant =0,订阅通道
if (pvariant)
cbdict = ac->sub.patterns;
else
cbdict = ac->sub.channels;

de = dictFind(cbdict, sname); // name --> 该通道的回调函数

// 如果订阅的通道/模式已经存在
if (de != NULL) {
existcb = dictGetEntryVal(de);
cb.pending_subs = existcb->pending_subs + 1;
}
// 将 sname 的回调函数重新设置为 cb
ret = dictReplace(cbdict,sname,&cb);

if (ret == 0) sdsfree(sname);
}
}
// 1.2) 发出去指令是 : unsubscribe
else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
// 只有之前执行过 subscribed, 现在执行 unsubscribe 才有用
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;

// UNSUBSCRIBE 指令不设置回调函数
}
// 1.3) 发出去的指令是 : monitor
else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
c->flags |= REDIS_MONITORING; // 设置标志位
__redisPushCallback(&ac->replies,&cb); // 加入到 ac->replies 队列
}
// 1.4) 其他指令
else {
// 如果已经处于被订阅,在取消订阅之前无法指令其他指令
if (c->flags & REDIS_SUBSCRIBED)
__redisPushCallback(&ac->sub.invalid,&cb); // 因为加入无效队列
else
// 否则就是常规的请求,加入到 ac->replies
__redisPushCallback(&ac->replies,&cb);
}

/********* 2 将格式化指令字符串 cmd 写入 c->obuf **********/

// 再将 cmd 加入到 c 的发送缓冲区
__redisAppendCommand(c,cmd,len);

// 注册可写事件,并设置回调函数
// 此时注册可写事件,只要发送缓冲区不是满的,则立马触发
_EL_ADD_WRITE(ac);

return REDIS_OK;
}

发送数据的流程,介绍完毕。

_EL_ADD_READ(ac)

下面从 _EL_ADD_READ 宏讲解可读事件的处理流程。

_EL_ADD_READ宏展开如下:

1
2
3
4
5
#define _EL_ADD_READ(ctx)                                         \
do { \
refreshTimeout(ctx); \
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
} while (0)

sentineladdRead 指向的函数是 redisAeAddRead

1
ac->ev.addRead = redisAeAddRead;

redisAeAddRead 函数,实现的是为 e->fd注册可读事件,而可读事件的回调函数是 redisAeReadEvent

1
2
3
4
5
6
7
8
static void redisAeAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->reading) {
e->reading = 1;
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
}
}

当可读事件触发,读取回调函数redisAeReadEvent,内部调用 redisAsyncHandleRead函数处理可读事件。

1
2
3
4
5
6
7
8
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el);
((void)fd);
((void)mask);

redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleRead(e->context);
}

redisAsyncHandleRead函数:

  • 判断有没有建立客户端ac是否已经连接,没有则调用__redisAsyncHandleConnect函数发起连接请求
  • 如果已经建立连接或者上面成功建立连接,则调用异步读取函数c->funcs->async_read读取网络中的数据。

其中c->funcs->async_read指向的是redisAsyncRead函数。整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);

// 如果没有建立连接,则先建立
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
/* Try again later when the context is still not connected. */
if (!(c->flags & REDIS_CONNECTED))
return;
}

// 异步读
c->funcs->async_read(ac);
}

redisAsyncRead

redisAsyncRead函数, 会先调用redisBufferRead函数尝试直接读取数据,读取成功后:

  • 如果此时没有注册可读事件,则注册可读事件
    1
    _EL_ADD_READ(ac); 
  • 对读取到的数据进行处理
    1
    redisProcessCallbacks(ac)

整个过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void redisAsyncRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);

if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
// 如果没有注册可读事件,则注册
_EL_ADD_READ(ac);
// 读取到数据,则执行相应的回调函数
redisProcessCallbacks(ac);
}
}

redisBufferRead

redisBufferRead 函数分为两步:

  • 先将数据从网络中读取到buf。这里使用的 c->funcs->read同步读函数,即redisNetRead函数,
  • 如果c->funcs->read返回值大于0,则成功从网络中读取到数据,那么将读取到的f数据写入到接受缓冲区 c->reader->buf,这部分由redisReaderFeed函数实现。

因此,redisBufferRead函数是直接从当前网络中读取数据,如果有数据 or 对端关闭则返回REDIS_OK,否则返回REDIS_ERR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int redisBufferRead(redisContext *c) {
char buf[1024*16];
int nread;
// 有错误则直接返回
if (c->err) return REDIS_ERR;
// 1)将数据从网络中读取到 buf
// 内部还是调用 c->funcs->read,
nread = c->funcs->read(c, buf, sizeof(buf)); // 同步读取
if (nread > 0) {
// 2) 将buf中数据写入到 c->reader 中
if (redisReaderFeed(c->reader, buf, nread) != REDIS_OK) {
// 将 1)或 2)中发生的错误记录到 c-err 中
__redisSetError(c, c->reader->err, c->reader->errstr);
return REDIS_ERR;
}
} else if (nread < 0) {
return REDIS_ERR;
}
return REDIS_OK;
}

redisNetRead

redisNetRead函数,即 c->funcs->read,实现从网络读取数据。根据recv函数的返回值nread对结果进行判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int redisNetRead(redisContext *c, char *buf, size_t bufcap) {
int nread = recv(c->fd, buf, bufcap, 0);
if (nread == -1) {
// 非阻塞IO模式下的,errno == EWOULDBLOCK 表示当前没有可读数据
// errno == EINTR 表示读取操作被中断
// 以上不是错误,稍后再试
if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
return 0;
}
// 阻塞模式下,读取数据超时是错误
if(errno == ETIMEDOUT && (c->flags & REDIS_BLOCK)) {
__redisSetError(c, REDIS_ERR_TIMEOUT, "recv timeout");
return -1;
}
// 其他错误标志,
__redisSetError(c, REDIS_ERR_IO, NULL);
return -1;
}
// 对端关闭
if (nread == 0) {
__redisSetError(c, REDIS_ERR_EOF, "Server closed the connection");
return -1;
}

// 你read > 0:真正读取到数据
return nread;
}

redisReaderFeed

redisReaderFeed函数,用于将从网络中读取到数据buf写入到客户端的接受缓冲区c->reader中。redisReader结构如下,本次读取数据即将接受到的数据存储到c->reader->buf中,但是对c->reader->buf最大未使用空间有限制,不得超过c->reader->maxbuf

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct redisReader {
int err;
char errstr[128];

// 存储接受到的数据
char* buf; /* Read buffer 读取缓冲区 */
size_t pos; /* Buffer cursor 当前指针位置 */
size_t len; /* Buffer length 读取缓冲区长度*/
size_t maxbuf; /* Max length of unused buffer 最大未使用长度*/

// 下面字段用于解析接收到的数据
//...
} redisReader;

因此完整的操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
sds newbuf;

// 如果前面从网络中读取数据发生错误,直接返回
if (r->err) return REDIS_ERR;

if (buf != NULL && len >= 1) {
// 如果 r->buf 是空的,并且 r->buf 的容量大于 r->maxbuf
// 则释放 r->buf
if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;

assert(r->buf != NULL);
}

// 将 buf 数据添加到 r->buf 后
newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}

// 更新参数
r->buf = newbuf;
r->len = sdslen(r->buf);
}
return REDIS_OK;
}

再将视线拉回到redisAsyncRead函数:如果 redisBufferRead(c)函数将对端请求数据存储至c->reader->buf没有发生错误,那么就要对c->reader->buf中的数据进行处理,而处理过程由redisProcessCallbacks函数完成。

1
2
3
4
5
6
7
if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
}
else {
_EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}

然而,c->reader->buf中的数据就类似于readQueryFromClient函数读取的c->querybuf数据,这次是原始的数据,readQueryFromClient读取到c->quetybuf后,还需要经过processInputBuffer解析后才能正常使用。在这里,也是也从此。在被 redisProcessCallbacks 使用前会先经过 redisGetReply函数处理。

因此,数据从对端传递到本端输入缓冲区ac->c->reader->buf中后,下面要做的:

  • 解析c->reader->buf中的数据,并将解析结果保存至reply。这一部分由redisGetReply 函数实现。
  • 根据reply,执行相应的回调函数。这一部分由 redisProcessCallbacks 实现,即相当于 processCommand函数。而每个单独的xxxCallback就相当于之前的xxxCommand

下面依次介绍解析数据到执行回调函数。

redisGetReply

redisGetReply函数,会先调用redisGetReplyFromReader函数尝试直接从c->reader->buf中获取reply,如果读取到了则返回。如果没有读取到并且c与对端是阻塞连接,再经过两个步骤:

  • 先将发送缓冲区c->obuf中的数据全部发送至对端
  • 再阻塞等待数据对端发送数据,当对端发送数据了,redisBufferRead函数将对端发送的数据存储到输入缓冲区c->reader中,再调用 redisGetReplyFromReader函数从c->reader解析。

因此,在非阻塞IO模式下,返回REDIS_OK也不一定会读取到数据,而在阻塞IO模式下,返回REDIS_OK即读取到数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
// 尝试直接读取
if (redisGetReplyFromReader(c, &aux) == REDIS_ERR)
return REDIS_ERR;

// 对于阻塞类型,
// 1)就将 c->obuf 中的数据全部发送出去
// 2)一直阻塞等待回应
if (aux == NULL && c->flags & REDIS_BLOCK) {
// 一直向c中写数据,直到将 c->obuf 发送完毕
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);

// 阻塞等待读取数据
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}

if (reply != NULL) *reply = aux;
return REDIS_OK;
}

redisGetReplyFromReader

redisGetReplyFromReader函数,在内部是调用 redisReaderGetReply函数完成功能。如果发生错误则将在c->reader中的错误复制至c->err

1
2
3
4
5
6
7
int redisGetReplyFromReader(redisContext *c, void **reply) {
if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
return REDIS_OK;
}

在讲解redisReaderGetReply解析过程之前。先熟悉下输入缓冲区redisReaderredisReader有两个作用:

  • 输入缓冲区:r->buf 接受对端的数据
  • 存储回应:r->reply中存储着对r->buf的解析结果

上面已经完成了第一步,下面就是要进行第二步骤:对r->buf中的数据进行解析后存储至r->reply。对端每种请求都会相应的回调函数,r->reply后续会用于各种请求的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisReader {
//...
//上面字段用于接受对端数据

// 解析接收到的数据
redisReadTask rstack[9]; // 用于递归解析
int ridx; // 递归层次
void* reply;

redisReplyObjectFunctions* fn; // 创建REDIS对象
void *privdata;
} redisReader;
  1. fn

之前在剖析REDIS的输出缓冲区一章中,讲过REDIS传输协议有不同的类型。在此为创建不同的对象都是由 fn 函数指针实现:

1
2
3
4
5
6
7
8
9
typedef struct redisReplyObjectFunctions {
void *(*createString)(const redisReadTask*, char*, size_t); // 创建 string 类型对象
void *(*createArray)(const redisReadTask*, size_t); // 创建 Array, Map, Set 类型对象
void *(*createInteger)(const redisReadTask*, long long); // 创建整数类型对象
void *(*createDouble)(const redisReadTask*, double, char*, size_t); // 创建 double 类型对象
void *(*createNil)(const redisReadTask*); // 创建 NULL 对象
void *(*createBool)(const redisReadTask*, int); // 创建 bool 类型对象
void (*freeObject)(void*); // 是释放对象内存
} redisReplyObjectFunctions;

对应的函数指针是defaultFunctions,如下。

1
2
3
4
5
6
7
8
9
static redisReplyObjectFunctions defaultFunctions = {
createStringObject,
createArrayObject,
createIntegerObject,
createDoubleObject,
createNilObject,
createBoolObject,
freeReplyObject
};
  1. rstack[9]

由于某些数据类型可能是Map、Set、Array等复合类型,因此可能发生嵌套,比如Array<Array<Map>>>等,因此为了解析复合类型依靠 rstack[9]来追踪深入递归的路径(这是不是很像基于栈实现的函数调用),这里对嵌套层次有规定,最多只能嵌套9层。

那么问题来了,怎么返回到上一层,就像函数如何返回到主函数?下面来看看 redisReadTask 结构体:

1
2
3
4
5
6
7
8
9
typedef struct redisReadTask {
int type; // 回复类型
// 针对符合类型
int elements; /* 元素个数 */
int idx; /* 如果复合类型,idx 表示当前元素的index*/
void *obj; /* 实际类型是 redisReply* ,即父节点的reply */
struct redisReadTask *parent; /* 指向上一层,即父节点 */
void *privdata;
} redisReadTask;

假设 noderedisReadTask* 对象,那么每个node都包含了一个指针parent 指向上一层,即node->parent可以用于返回上一层;node->idx字段指示着node该位于复合类型的哪个位置。而当前节点从c->reader解析所得数据存储在 node->obj,其实际类型是redisReply*;解析所得数据类型是node->tyoe,如果是复合类型则node->elements指示着元素个数。

因此,可总结如下:

  • redisReadTask[9]中每个redisReadTask都可以视为一个单链表,通过parent*串联各个嵌套层之间关系,而每一层的数据都由obj*存储。
  • ridx 变量,表征着当前嵌套的层数,可以在O(1)时间定位当前的层数。

其中,type可能值如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define REDIS_REPLY_STRING  1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
#define REDIS_REPLY_DOUBLE 7
#define REDIS_REPLY_BOOL 8
#define REDIS_REPLY_MAP 9
#define REDIS_REPLY_SET 10
#define REDIS_REPLY_ATTR 11
#define REDIS_REPLY_PUSH 12
#define REDIS_REPLY_BIGNUM 13
#define REDIS_REPLY_VERB 14
  1. reply

reply的实际类型是redisReply结构体,用于保存回复的类型以及数据。其中当type==REDIS_REPLY_DOUBLE时,使用dvalstr两个方式来保存数据,当使用str字段时,也要使用len来表示str的长度。

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisReply {
int type; /* REDIS_REPLY_* */
// 下面是针对不同 type 的值
long long integer; /* type == REDIS_REPLY_INTEGER 时的整数值*/
double dval; /* type == REDIS_REPLY_DOUBLE 时的 double 值 */
size_t len; /* 字符串的长度*/
char *str; /* type == REDIS_REPLY_ERROR | REDIS_REPLY_STRING | REDIS_REPLY_DOUBLE */
char vtype[4]; /* type == REDIS_REPLY_VERB, 最后一个字节 vtype[3]=='\0' */

size_t elements; /* type == REDIS_REPLY_ARRAY,元素个数 */
struct redisReply** element; /* type == REDIS_REPLY_ARRAY,数据实体 */
} redisReply;

createArrayObject

下面以创建Array、Map、Set类型的createArrayObject函数来讲解是如何深入递归的。

  • 先调用createReplyObject函数创建一个回复redisReply对象,此时只具有类型数据r->type=type

    1
    2
    3
    4
    5
    6
    7
    8
    9
    static redisReply *createReplyObject(int type) {
    redisReply *r = calloc(1, sizeof(*r));

    if (r == NULL)
    return NULL;

    r->type = type;
    return r;
    }
  • elements个元素分配内存,每个元素都是都是使用redisReply对象存储。

  • 将当前回复节点r放入到复合类型指定的索引位置

因此,createArrayObject函数只是完成了:1)创建elementsredisReply节点,提供了一个容器;3)将r和所在的复合类型建立关系,即放入指定的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static void *createArrayObject(const redisReadTask* task, size_t elements) {
redisReply *r, *parent;
// 1) 创建reply节点
r = createReplyObject(task->type);
if (r == NULL)
return NULL;
// 2) 设置元素个数
if (elements > 0) {
r->element = calloc(elements,sizeof(redisReply*));
if (r->element == NULL) {
freeReplyObject(r);
return NULL;
}
}

r->elements = elements;

// 3) 判断是不是复合类型
// 如果 task->parent != NULL
// 则说明 r 是某个复合类型(Map,Set,Array)的一个节点
if (task->parent) {
parent = task->parent->obj; // 父节点的回应
// 父节点肯定某个复合类型
assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET);
// 将自己(r)加入到父节点当前的索引位置
parent->element[task->idx] = r;
}
// 返回此次回应
return r;
}

createIntegerObject

createArrayObject函数调用结束并返回r,但是此时r 中还没有数据 ,需要等STINGINTEGERDOUBLE等类型创建好,发现自己是属于某个复合类型r则将自己放置到r中。

以整数为例,会调用createIntegerObject 函数创建整数对象,并将值value存储在 r->integer。发现自己属于某个复合类型,task->parent !=NULL,则会自己放置到相应的位置。

最后,会将这个reply返回给调用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static void *createIntegerObject(const redisReadTask *task, long long value) {
redisReply *r, *parent;

r = createReplyObject(REDIS_REPLY_INTEGER);
if (r == NULL)
return NULL;

r->integer = value;

if (task->parent) {
parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET);
parent->element[task->idx] = r;
}

return r;
}

processItem

processItem函数, 解析c->reader数据,主要是两步:

  • 读取一个字节的类型数据,获取当前数据类型cur->type
  • 根据cur->type,调用相应的函数来解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int processItem(redisReader *r) {
redisReadTask* cur = &(r->rstack[r->ridx]); // 当前读取任务
char *p;

// 检测是否需要读取类型
if (cur->type < 0) {
// 读取一个字节的类型数据
if ((p = readBytes(r,1)) == NULL) return REDIS_ERR;

// 不同类型的回复,其前缀不同
switch (p[0]) {
case '-': cur->type = REDIS_REPLY_ERROR; break;
case '+': cur->type = REDIS_REPLY_STATUS;break;
case ':': cur->type = REDIS_REPLY_INTEGER;break;
case ',': cur->type = REDIS_REPLY_DOUBLE; break;
case '_': cur->type = REDIS_REPLY_NIL; break;
case '$': cur->type = REDIS_REPLY_STRING; break;
case '*': cur->type = REDIS_REPLY_ARRAY; break;
case '%': cur->type = REDIS_REPLY_MAP; break;
case '~': cur->type = REDIS_REPLY_SET; break;
case '#': cur->type = REDIS_REPLY_BOOL; break;
case '=': cur->type = REDIS_REPLY_VERB; break;
// 默认协议错误
default: __redisReaderSetErrorProtocolByte(r,*p); return REDIS_ERR;
}
}

// 不同前缀,调用不同函数对c->reader进行解析
switch(cur->type) {
case REDIS_REPLY_ERROR :
case REDIS_REPLY_STATUS :
case REDIS_REPLY_INTEGER:
case REDIS_REPLY_DOUBLE :
case REDIS_REPLY_NIL :
case REDIS_REPLY_BOOL : return processLineItem(r);
case REDIS_REPLY_STRING :
case REDIS_REPLY_VERB : return processBulkItem(r);
case REDIS_REPLY_ARRAY :
case REDIS_REPLY_MAP :
case REDIS_REPLY_SET : return processAggregateItem(r);
default: assert(NULL); return REDIS_ERR; /* Avoid warning. */
}
}

processLineItem

processLineItem函数,解析非字符串的简单类型。下面是以整数为例,其他流程基本一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int processLineItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj;
char *p;
int len;

if ((p = readLine(r,&len)) != NULL) {
// 整数类型
if (cur->type == REDIS_REPLY_INTEGER) {
// 判断是不是有对应的函数
if (r->fn && r->fn->createInteger) {
long long v;
if (string2ll(p, len, &v) == REDIS_ERR) {
__redisReaderSetError(r,
REDIS_ERR_PROTOCOL,
"Bad integer value");
return REDIS_ERR;
}
// 创建对象
obj = r->fn->createInteger(cur,v);
}
else {
obj = (void*)REDIS_REPLY_INTEGER;
}
}
else if (cur->type == REDIS_REPLY_DOUBLE) { /*****/ }
else if (cur->type == REDIS_REPLY_NIL) { /*****/ }
else if (cur->type == REDIS_REPLY_BOOL) { /*****/ }
else { /* Type will be error or status. */ }

if (obj == NULL) {
__redisReaderSetErrorOOM(r); // error: out of memory
return REDIS_ERR;
}

// r->ridx ==0,则说明处于最上层,
if (r->ridx == 0) r->reply = obj;
moveToNextTask(r); // !!! 重要
return REDIS_OK;
}

return REDIS_ERR;
}

moveToNextTask

moveToNextTask 函数,控制着解析c->reader的流程。processLineItem函数将c->reader数据解析后,将解析结果保存在c->reader->reply。如果reply->type是非复合类型,解析就到此结束。但是如果是个复合类型,就需要继续解析:

  • 如果当前嵌套层还没解析完,则向当前嵌套层的下一个位置
  • 如果当前嵌套层解析完,则需要返回上一层

具体地说,对于下面的嵌套层:

1
std::vector<std::vector<std::vector<....>>>  var // 最多只能有8层嵌套

moveToNextTask的作用是指定下次解析所得元素的存储位置,对于某层var[ridx][idx]元素:

  • idx == var[ridx].size()-1时,则var[ridx]已经完全填满了,下一个元素将填充到var[ridx+1]中的位置。

如果 ridx+1 == var[ridx-1].size(),那么就继续返回。如果var所有位置全部填了,就直接返回此次解析结果var。否则,var某个位置没有数据,则返回至该位置var[i][j],下次元素将复制至此。

  • idx < var[ridx].size()-1时,下次解析所得的参数复制到var[ridx][idx++]的位置,而不至于覆盖var[ridx][idx]的元素

无论当前正在解析是个复合类型,还是简单类型。整个流程成功解析完毕后, r->ridx == -1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static void moveToNextTask(redisReader *r) {
redisReadTask *cur, *prv;
while (r->ridx >= 0) {
// 解析完毕,则直接返回
if (r->ridx == 0) {
r->ridx--;
return;
}

cur = &(r->rstack[r->ridx]); // 当前嵌套层
prv = &(r->rstack[r->ridx-1]); // 上一个嵌套层

assert(prv->type == REDIS_REPLY_ARRAY ||
prv->type == REDIS_REPLY_MAP ||
prv->type == REDIS_REPLY_SET);

if (cur->idx == prv->elements-1) {
// 说明当前嵌套层的元素个数,已经把上一层容器的坑位都填满了
// 就好比定义了 : std::vector<int> vec(3);
// 现在vec已经有3个元素了,那么就需要返回到上一层了,
// 即 r->ridx--;
r->ridx--;

// 注意:
// 这里可能会逐层递归,即如果每一层的元素都满了,那么会在此逐层递归
// 一直到返回最上层,即 r->ridx==0,那么整个复合类型的解析完毕
// 在上面的 if(r->ridx==0) 处返回
} else {
// 否则就移动到当前层的下一个坑位

assert(cur->idx < prv->elements);
// 先将下一个坑位状态初始化,
cur->type = -1; // 在读取下一个reply时,先读取reply类型
cur->elements = -1;
cur->idx++; // 移动到下一个坑位
return;
}
}
}

processAggregateItem

processAggregateItem函数,解析复合类型 array/map/set。当解析到此时,剩下的数据格式是:

1
2
3
4
n\r\n
len_1\r\narg_1\r\n
...
len_n\r\narg_n\r\n

因为 processItem 函数已经将前缀 * 读取了,那么此时剩下的就是上面的数据。第一行n即这个复合类型的元素个数,下面的n行是具体参数。这也解释了上面每次调用processLineItem函数后都需要调用moveToNextTask函数,因为下一行的解析的参数arg_idx的位置需要由moveToNextTask来确定。

processAggregateItem函数的执行流程如下:

  • 在读取数据之前,先判断当前递归层数r->ridxr->ridx不能大于8
  • 解析元素个数elements,判断elemengts是否合理
  • elements >0则需要创建容器。并且由于是复合类型,需要为递归进行准备

整个流程如只有在这个复合数据类型解析完毕后,才会返回REDSI_OK,否则返回的是REDIS_ERR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
static int processAggregateItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj;
char *p;
long long elements;
int root = 0, len;

// 递归到第9层,报错
if (r->ridx == 8) {
__redisReaderSetError(r,
REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 7");
return REDIS_ERR;
}

/*** 下面要调用 createArrayObject 函数来创建容器 ****/

if ((p = readLine(r,&len)) != NULL) {
// 1) 先读取array/map/set中的元素个数 elements
if (string2ll(p, len, &elements) == REDIS_ERR) {
__redisReaderSetError(r, REDIS_ERR_PROTOCOL, "Bad multi-bulk length");
return REDIS_ERR;
}

// 用于递归解析:当前是不是最顶层
root = (r->ridx == 0);

// 元素个数越界
if (elements < -1 || (LLONG_MAX > SIZE_MAX && elements > SIZE_MAX)) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,"Multi-bulk length out of range");
return REDIS_ERR;
}

// 如果元素,则设置 NULL
if (elements == -1) {
if (r->fn && r->fn->createNil)
obj = r->fn->createNil(cur);
else
obj = (void*)REDIS_REPLY_NIL;

// 内存不足
if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}
// 移动到下一个任务
moveToNextTask(r);
}
else {
// 多个元素
if (cur->type == REDIS_REPLY_MAP) elements *= 2;

// 2) 创建容器
if (r->fn && r->fn->createArray)
obj = r->fn->createArray(cur,elements);
else
obj = (void*)(long)cur->type;

// 内存不足
if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}

// 3) 为递归准备
/* Modify task stack when there are more than 0 elements. */
if (elements > 0) {
cur->elements = elements; // 当前层的元素个数
cur->obj = obj; // 当前层的值:此时仅是个容器,里面还没有值
r->ridx++; // 递归,进入下一层:来填充容器obj

// 初始化下一层
r->rstack[r->ridx].type = -1; // 强迫:每次调用 processItem 前都先读取reply类型
r->rstack[r->ridx].elements = -1;
r->rstack[r->ridx].idx = 0;
r->rstack[r->ridx].obj = NULL;
r->rstack[r->ridx].parent = cur; // parent 指针指向上一层
r->rstack[r->ridx].privdata = r->privdata;
}
else {
moveToNextTask(r);
}
}

/* Set reply if this is the root object. */
// 如果 root ==1,则说明 r 已全部解析完毕
// 将结构存储在 r->reply
if (root) r->reply = obj;
return REDIS_OK;
}

return REDIS_ERR;
}

redisReaderGetReply

上面讲解了解析r->buf并将结果存储至r->reply的过程:

  1. 如果 redisBufferRead 函数将对端数据存储至r->buf发生错误,无法继续执行
  2. 如果r->len==0,说明没有接收到数据、无法解析,则直接返回。
  3. r->rstack[0]进行初始化
  4. 调用processItem函数,对r->buf进行解析
  5. 对解析结果进行判断及处理,如果解析正确,则将解析结果设置到传入的参数 reply

整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
int redisReaderGetReply(redisReader *r, void **reply) {
/* Default target pointer to NULL. */
if (reply != NULL) *reply = NULL;

if (r->err) return REDIS_ERR;

/* When the buffer is empty, there will never be a reply. */
if (r->len == 0)
return REDIS_OK;

/* Set first item to process when the stack is empty. */
// 初始化
if (r->ridx == -1) {
r->rstack[0].type = -1; // 强迫 processItem() 函数中先读取类型数据
r->rstack[0].elements = -1;
r->rstack[0].idx = -1;
r->rstack[0].obj = NULL;
r->rstack[0].parent = NULL;
r->rstack[0].privdata = r->privdata;
r->ridx = 0;
}

// 此次reply读取成功时, processItem 返回 REDIS_OK
// processItem(r) 成功解析,返回 REDIS_OK,
// 并且 r->ridx==-1
while (r->ridx >= 0)
if (processItem(r) != REDIS_OK)
break;
// 发生错误
if (r->err) return REDIS_ERR;

/* 如果 processItem() 函数中对 r->buf 解析的字节数超过 1K,
* 则丢弃已解析数据,将 r->pos 设置到未解析的起始数据位置
*/
if (r->pos >= 1024) {
sdsrange(r->buf,r->pos,-1);
r->pos = 0;
r->len = sdslen(r->buf);
}

// 成功解析完毕,assert(r->ridx == -1);
if (r->ridx == -1) {
// 将 reply 传递给输入参数
if (reply != NULL) {
*reply = r->reply;
}
else if (r->reply != NULL && r->fn && r->fn->freeObject) {
// 如果 reply ==NULL,即不需要解析结果 r->reply
// 则释放 r-reply
r->fn->freeObject(r->reply);
}
r->reply = NULL;
}

return REDIS_OK;
}

redisProcessCallbacks

再将视线拉回到redisAsyncRead函数,下面可以调用redisProcessCallbacks函数处理对端请求了。

  1. 调用redisGetReply函数,对ac->c->reader->buf进行解析获得reply
  2. 如果reply ==NULL,有几种可能
    • ac即将要关闭,则关闭连接再return
    • 处于监视模式
    • 确实没有数据,则等待下次调用redisProcessCallbacks
  3. ac->replies中取出回调函数节点至cb并执行

任务队列ac->repliesac->pubsubs/channels都是先来先服务,eventloop线程又是个单线程,因此不会发生错乱。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL;
int status;

// 逐次从输入缓冲区中读取reply
while((status = redisGetReply(c,&reply)) == REDIS_OK) {
/******* 1 没有回复 ************/

// 可能是reply读取完毕,则直接break
if (reply == NULL) {
// 如果客户端正在与服务器处理断开连接的状态,
// 那么就直接开始断开
if (c->flags & REDIS_DISCONNECTING &&
sdslen(c->obuf) == 0 &&
ac->replies.head == NULL)
{
__redisAsyncDisconnect(ac);
return;
}

// 监视模式,则将 cb 放入队列
if(c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies, &cb);
}

// 如果没有获取到reply,则等待下一次
break;
}

/******* 2 取出任务节点 ************/

// 2.1) 先尝试从待执行的任务列表ac-replies,取出任务至 cb
if (__redisShiftCallback(&ac->replies, &cb) != REDIS_OK) {
// 解析错误
if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
c->err = REDIS_ERR_OTHER;
snprintf(c->errstr, sizeof(c->errstr), "%s",((redisReply*)reply)->str);
c->reader->fn->freeObject(reply); // 释放此次回应
__redisAsyncDisconnect(ac); // 断开连接
return;
}

// 2.2) 从 ac->replies 没有取出任务,可能是在订阅任务队列中
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
// 从 订阅任务列表中取出任务至 cb
if(c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac, reply, &cb);
}

/******* 3 执行回调函数并free(reply) ************/
if (cb.fn != NULL) {
__redisRunCallback(ac,&cb,reply);
c->reader->fn->freeObject(reply);

// 由于回调函数 cb 已经执行,那么之前待释放的ac
// 此刻可以安全地free'd
// 更多信息,可以参考后面的断开连接介绍
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
}
else {
// 没有回调函数,则直接忽略此次reply
c->reader->fn->freeObject(reply);
}
} // while

// 发生读取错误则断开连接
if (status != REDIS_OK)
__redisAsyncDisconnect(ac);
}

到此,整个可读事件的处理过程完结。下面再介绍下上面提到但是没有使用的断开连接操作。

__redisAsyncDisconnect

详见代码注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
// 将 c->err 复制到 ac->err,便于直接使用
__redisAsyncCopyError(ac);

if (ac->err == 0) {
// 如果这个断开连接操作,不是错误导致,而是主动清除连接
// 得确保没有回调函数节点了
int ret = __redisShiftCallback(&ac->replies, NULL);
assert(ret == REDIS_ERR);
} else {
// 如果调用此函数是由于发生错误导致的,
// 加上 REDIS_DISCONNECTING 标志
// 是为了不再接受对端的新指令
c->flags |= REDIS_DISCONNECTING;
}

// 清理所有的可读可写事件
_EL_CLEANUP(ac);

// 释放
if (!(c->flags & REDIS_NO_AUTO_FREE)) {
__redisAsyncFree(ac);
}
}

__redisAsyncFree

在完成redisFree(ac)之前,需要执行完ac的四个任务队列中的回调函数节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
dictIterator *it;
dictEntry *de;

// 将回调函数中的 reply参数传入 NULL
// 再执行 ac->replies 中所有的回调函数
while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);

// 再执行所有无效指令 &ac->sub.invalid 中所有的回调函数
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);

// 以 reply=NULL 的方式
// 执行订阅的所有回调函数 ac->sub.channels
it = dictGetIterator(ac->sub.channels);
while ((de = dictNext(it)) != NULL)
__redisRunCallback(ac, dictGetEntryVal(de), NULL);
dictReleaseIterator(it);
dictRelease(ac->sub.channels);

// 以 reply=NULL 的方式
// 执行订阅模式的所有回调函数 ac->sub.pattern
it = dictGetIterator(ac->sub.patterns);
while ((de = dictNext(it)) != NULL)
__redisRunCallback(ac,dictGetEntryVal(de),NULL);
dictReleaseIterator(it);
dictRelease(ac->sub.patterns);

// 清理所有的可读可写事件
_EL_CLEANUP(ac);

// 执行断开连接回调函数
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
// 如果 redisAsyncFree() 中设置了 REDIS_FREEING 标志位
// 则 ac->onDisconnect 第二个参数传入 REDIS_OK
if (c->flags & REDIS_FREEING) {
ac->onDisconnect(ac,REDIS_OK);
} else {
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
}
}

/* Cleanup self */
redisFree(c);
}

sentinelDisconnectCallback

__redisAsyncFree 函数的最后几步,对断开连接回调函数的第二个参数有判断:

1
2
3
4
5
6
7
 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
if (c->flags & REDIS_FREEING) {
ac->onDisconnect(ac,REDIS_OK);
} else {
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
}
}

但是在sentinel.c中设置的断开连接回调sentinelDisconnectCallback,并没有使用status,最终都是直接调用instanceLinkConnectionError函数,这和建立连接失败时的行为一致。

1
2
3
4
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
UNUSED(status);
instanceLinkConnectionError(c);
}

redisAsyncFree

sentinel.c/instanceLinkCloseConnection函数中调用,用于异步关闭连接。

1
2
3
4
5
6
7
8
9
void redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_FREEING; // 表示正在freeing
if (!(c->flags & REDIS_IN_CALLBACK)) // 目前正在执行某个回调函数,那么不能此刻free
__redisAsyncFree(ac); // 不然怎么通信,只能等此次通信结束再free'd
// 可以看看 redisProcessCallbacks 函数
// 在执行完callback之后,发现如果有 REDIS_FREEING
// 则立马调用 __redisAsyncFree 函数释放 ac
}