hiredis是为客户端redis-cli
设计的。在sentinel中,也是使用的是hiredis中的 redisAsyncContext
来创建连接、建立通信,而不是普通的client
对象。因此在介绍sentinel
的工作原理之前,先介绍hiredis设计。
redisAsyncContext redisAsyncContext
结构体作用类似于client
结构体,用于实现客户端与服务器、以及服务器之间的数据通信。因此,也需要scoketfd
、与sockfd
相关的发起连接、读写回调函数等。在REDIS中,redisAsyncContext
用于两个地方:
sentinel
与主服务器、从服务器以及其他sentinels通信
客户端与服务器通信,比如实现redis-cli
因此,redisAsyncContext
的基本设计和client
类似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 typedef struct redisAsyncContext { redisContext c; int err; char *errstr; void *data; struct { void *data; void (*addRead)(void *privdata); void (*delRead)(void *privdata); void (*addWrite)(void *privdata); void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); void (*scheduleTimer)(void *privdata, struct timeval tv); } ev; redisDisconnectCallback* onDisconnect; redisConnectCallback *onConnect; redisCallbackList replies; struct sockaddr *saddr; size_t addrlen; struct { redisCallbackList invalid; struct dict *channels; struct dict *patterns; } sub; } redisAsyncContext;
redisContext
redisAsyncContext
是异步环境下,而redisContext
可以简单的看作是同步环境下的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 typedef struct redisContext { const redisContextFuncs* funcs; int err; char errstr[128 ]; redisFD fd; int flags; char * obuf; redisReader* reader; enum redisConnectionType connection_type; struct timeval *timeout; struct { char *host; char *source_addr; int port; } tcp; struct { char *path; } unix_sock; struct sockadr *saddr; size_t addrlen; void *privdata; } redisContext;
funcs
funcs
不仅封装了适用于redisContext
的读写回调函数,也封装了用于异步对象redisAsyncContext
的回调函数。redisContextFuncs
封装如下:
1 2 3 4 5 6 7 typedef struct redisContextFuncs { void (*free_privdata)(void *); void (*async_read)(struct redisAsyncContext *); void (*async_write)(struct redisAsyncContext *); int (*read)(struct redisContext *, char *, size_t ); int (*write)(struct redisContext *); } redisContextFuncs;
hiredis.c
文件中,redisContextFuncs
结构体有一个默认的对象redisContextDefaultFuncs
,初始化如下:
1 2 3 4 5 6 7 static redisContextFuncs redisContextDefaultFuncs = { .free_privdata = NULL , .async_read = redisAsyncRead, .async_write = redisAsyncWrite, .read = redisNetRead, .write = redisNetWrite };
redisAsyncConnectBind 下面开始讲解,创建一个struct redisAsyncContext
对象到建立连接的过程。
每个TCP连接属性都有一些可选参数,这个由struct redisOptions
设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 typedef struct { int type; int options; const struct timeval *timeout ; union { struct { const char *source_addr; const char *ip; int port; } tcp; const char *unix_socket; redisFD fd; } endpoint; } redisOptions;
对于options
中TCP相关参数的设置,由REDIS_OPTIONS_SET_TCP
宏实现:
1 2 3 4 #define REDIS_OPTIONS_SET_TCP(opts, ip_, port_) \ (opts)->type = REDIS_CONN_TCP; \ (opts)->endpoint.tcp.ip = ip_; \ (opts)->endpoint.tcp.port = port_;
redisAsyncConnectBind
函数,如下:
1 2 3 4 5 6 7 redisAsyncContext *redisAsyncConnectBind (const char *ip, int port, const char * source_addr) { redisOptions options = {0 }; REDIS_OPTIONS_SET_TCP (&options, ip, port); options.endpoint.tcp.source_addr = source_addr; return redisAsyncConnectWithOptions (&options); }
redisAsyncConnectWithOptions redisAsyncConnectWithOptions
函数步骤如下:
由options
参数将TCP连接设置为非阻塞连接 ,options
在 redisAsyncConnect
函数中设置。
创建同步环境 c
创建异步环境 ac
将c->err
的设置到ac->err
,方便可以直接访问
由redisAsyncConnectWithOptions
函数获得ac
,默认是非阻塞IO。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 redisAsyncContext* redisAsyncConnectWithOptions (const redisOptions *options) { redisOptions myOptions = *options; redisContext *c; redisAsyncContext *ac; myOptions.options |= REDIS_OPT_NONBLOCK; c = redisConnectWithOptions (&myOptions); if (c == NULL ) { return NULL ; } ac = redisAsyncInitialize (c); if (ac == NULL ) { redisFree (c); return NULL ; } __redisAsyncCopyError(ac); return ac; }
redisConnectWithOptions 设置TCP连接选择,比如常见的是否阻塞、是否地址复用,在REDIS多了个密码认证。然后就是调用redisContextConnectBindTcp
函数来连接并绑定本地地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 redisContext* redisConnectWithOptions (const redisOptions *options) { redisContext* c = redisContextInit(options); if (c == NULL ) return NULL ; if (!(options->options & REDIS_OPT_NONBLOCK)) { c->flags |= REDIS_BLOCK; } if (options->options & REDIS_OPT_REUSEADDR) { c->flags |= REDIS_REUSEADDR; } if (options->options & REDIS_OPT_NOAUTOFREE) { c->flags |= REDIS_NO_AUTO_FREE; } if (options->type == REDIS_CONN_TCP) { redisContextConnectBindTcp(c, options->endpoint.tcp.ip, options->endpoint.tcp.port, options->timeout, options->endpoint.tcp.source_addr); } else if (options->type == REDIS_CONN_UNIX) { redisContextConnectUnix(c, options->endpoint.unix_socket, options->timeout); } else if (options->type == REDIS_CONN_USERFD) { c->fd = options->endpoint.fd; c->flags |= REDIS_CONNECTED; } else { return NULL ; } if (options->timeout != NULL && (c->flags & REDIS_BLOCK) && c->fd != REDIS_INVALID_FD) { redisContextSetTimeout(c, *options->timeout); } return c; }
redisAsyncInitialize redisAsyncInitialize
函数,基于创建的同步对象c
来初始化一个 redisAsyncContext
对象ac
。
由于设置了非阻塞连接options.options |= REDIS_OPT_NONBLOCK;
,因此要监测connect
是否成功连接上对端,需要等待第一次可写事件触发。因此,在此redisAsyncInitialize
函数中需要去除REDIS_CONNECTED
标志位。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static redisAsyncContext* redisAsyncInitialize (redisContext *c) { redisAsyncContext *ac = realloc (c, sizeof (redisAsyncContext)); if (ac == NULL ) return NULL ; c = &(ac->c); c->flags &= ~REDIS_CONNECTED; ac->err = 0 ; ac->errstr = NULL ; ac->data = NULL ; ac->ev.data = NULL ; return ac; }
redisAsyncSetConnectCallback 在sentinel.c
中,调用了redisAsyncConnectBind
函数来连接对端后,就会调用redisAsyncSetConnectCallback
函数,主要有两个作用:
设置连接回调函数fn
。在sentinel.c
中将建立连接的回调函数fn
设置的是sentinelLinkEstablishedCallback
函数,用来检测连接结果是否正确
1 2 3 void sentinelLinkEstablishedCallback (const redisAsyncContext *c, int status) { if (status != C_OK) instanceLinkConnectionError(c); }
重要: 前面说过了,在异步环境下redisAsyncConnectBind
函数执行完毕,此时不知道是否建立成功,只有等第一次可写事件触发才能知晓,因此需要注册可写事件来监听可写事件的触发。
可能有人好奇,为啥这里是注册可写事件,而不是可读事件???
因为,刚建立连接的时候,c->fd
的发送缓冲区是空的,会立即触发可写事件;但是刚建立连接时可不一定就有数据可读取。因此,要为c->fd
注册可写事件。至于具体的可写事件触发流程以及验证是否成功建立连接,见下文分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 int redisAsyncSetConnectCallback (redisAsyncContext *ac, redisConnectCallback *fn) { if (ac->onConnect == NULL ) { ac->onConnect = fn; _EL_ADD_WRITE(ac); return REDIS_OK; } return REDIS_ERR; }
redisAeAttach 当redisAsyncContext*
用于sentinel
中,对于ac
其余未初始化字段在sentinel.c/redisAeAttach
函数中初始化。
redisAeAttach
函数,将异步客户端ac
添加到loop
循环中,使ac
能像client
一样处理各项事情。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static int redisAeAttach (aeEventLoop *loop, redisAsyncContext *ac) { redisContext* c = &(ac->c); redisAeEvents *e; if (ac->ev.data != NULL ) return C_ERR; e = (redisAeEvents*)zmalloc(sizeof (*e)); e->context = ac; e->loop = loop; e->fd = c->fd; e->reading = e->writing = 0 ; ac->ev.addRead = redisAeAddRead; ac->ev.delRead = redisAeDelRead; ac->ev.addWrite = redisAeAddWrite; ac->ev.delWrite = redisAeDelWrite; ac->ev.cleanup = redisAeCleanup; ac->ev.data = e; return C_OK; }
完整地创建redisAsyncContext
客户端,及其运行在sentinel.c/sentinelReconnectInstance
函数实现,这个待下一章讲解sentinel
再讲解。
_EL_ADD_WRITE(ac) 下面从 _EL_ADD_WRITE
宏开始讲解可写事件的触发及其处理流程。
_EL_ADD_WRITE
宏展开如下:
1 2 3 4 5 #define _EL_ADD_WRITE(ctx) \ do { \ refreshTimeout(ctx); \ if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ } while (0)
在sentinel
中,addWrite
指向的函数是redisAeAddWrite
1 ac->ev.addWrite = redisAeAddWrite;
在redisAeAddWrite
函数中,为e->fd
注册可写事件,设置可写事件回调函数redisAeWriteEvent
1 2 3 4 5 6 7 8 static void redisAeAddWrite (void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->writing) { e->writing = 1 ; aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); } }
当e->fd
发送缓冲区有空,则触发可写事件 并调用 redisAeWriteEvent
函数来发送数据,内部是调用 redisAsyncHandleWrite
函数完成的。
1 2 3 4 5 6 7 8 static void redisAeWriteEvent (aeEventLoop *el, int fd, void *privdata, int mask) { ((void )el); ((void )fd); ((void )mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleWrite(e->context); }
redisAsyncHandleWrite 当触发可写事件时,如果是第一次调用redisAsyncHandleWrite
函数不一定是本端有数据待发送,可能是因为连接连接了,整个函数流程如下:
先判断当前是否建立连接。在前文说过,在redisAsyncConnectBind
函数调用过后,可能并没有直接和对端建立连接,在第一次调用redisAsyncHandleWrite
函数时需要通过__redisAsyncHandleConnect
函数对结果进行判断。
成功建立连接后,再调用c->funcs->async_write
向e->fd
发送数据
其中,c->funcs->async_write
指向的是redisAsyncWrite
函数。整个流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 void redisAsyncHandleWrite (redisAsyncContext *ac) { redisContext *c = &(ac->c); if (!(c->flags & REDIS_CONNECTED)) { if (__redisAsyncHandleConnect(ac) != REDIS_OK) return ; if (!(c->flags & REDIS_CONNECTED)) return ; } c->funcs->async_write(ac); }
__redisAsyncHandleConnect 在__redisAsyncHandleConnect
函数中,需要通过redisCheckConnectDone
函数对此次可写事件的状态进行判断,以确认是是否真的成功建立了连接。
方法是,此时直接调用connect
函数向对端发起连接:
如果connect
函数返回0,无论之前是否成功连接,此时调用connect
函数成功建立连接;
如果connect
函数返回-1,需要根据错误标志errno
来判断是否成功连接。
如果 errno == EISCONN
,表示之前已经建立了连接,使得此次调用connect
函数返回-1。因此,redisCheckConnectDone
函数仍返回 REDIS_OK
。
errno
是其他值,都表示没有建立连接。此时,返回REEDIS_ERR
,忽略此次可写事件。
因此在redisCheckConnectDone
函数中完成了异步连接的确认:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int redisCheckConnectDone (redisContext *c, int *completed) { int rc = connect(c->fd, (const struct sockaddr *)c->saddr, c->addrlen); if (rc == 0 ) { *completed = 1 ; return REDIS_OK; } switch (errno) { case EISCONN: *completed = 1 ; return REDIS_OK; case EALREADY: case EINPROGRESS: case EWOULDBLOCK: *completed = 0 ; return REDIS_OK; default : return REDIS_ERR; } }
redisCheckConnectDone
函数,无论返回REDIS_ERR
还是REDIS_OK
,都会调用之前设置的连接回调函数ac->onConnect
,即sentinelLinkEstablishedCallback
函数,不过只在无法建立成功连接时才有效:
1 2 3 void sentinelLinkEstablishedCallback (const redisAsyncContext *c, int status) { if (status != C_OK) instanceLinkConnectionError(c); }
因此,当成功建立连接后,只是会加上 c->flags |= REDIS_CONNECTED
标志位。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int __redisAsyncHandleConnect(redisAsyncContext *ac) { int completed = 0 ; redisContext *c = &(ac->c); if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { redisCheckSocketError(c); if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); __redisAsyncDisconnect(ac); return REDIS_ERR; } if (completed == 1 ) { if (ac->onConnect) ac->onConnect(ac, REDIS_OK); c->flags |= REDIS_CONNECTED; } return REDIS_OK; }
redisAsyncWrite redisAsyncWrite
函数,即c->funcs->async_write
,异步发送数据。
会尝试调用redisBufferWrite
函数直接发送数据
如果c->obuf
中的数据全部发送完毕,则取消可写事件。_EL_DEL_WRITE
宏展开如下:1 2 3 4 5 6 #define _EL_DEL_WRITE(ctx) do { \ if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ } while(0) ac->ev.delWrite = redisAeDelWrite;
通过redisAeDelWrite
调用函数,取消e->fd
上的可写事件1 2 3 4 5 6 7 8 static void redisAeDelWrite (void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (e->writing) { e->writing = 0 ; aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); } }
如果c->obuf
中的数据没有发送完毕,此时_EL_ADD_WRITE(ac)
宏无效,因为已经注册了可写事件。
每次都会注册一次可读事件(如果已经注册则忽略,下一部分专门讲解)
整个流程大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void redisAsyncWrite (redisAsyncContext *ac) { redisContext *c = &(ac->c); int done = 0 ; if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { if (!done) _EL_ADD_WRITE(ac); else _EL_DEL_WRITE(ac); _EL_ADD_READ(ac); } }
redisBufferWrite redisBufferWrite
函数,内部调用c->funcs->write
指向的函数将发送缓冲区c->obuf
中的数据发送至网络。redisBufferWrite
函数,返回REDIS_OK
,表示成功发送。进一步,如果done==1
则表示c->obuf
清空。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 int redisBufferWrite (redisContext *c, int *done) { if (c->err) return REDIS_ERR; if (sdslen(c->obuf) > 0 ) { int nwritten = c->funcs->write(c); if (nwritten < 0 ) return REDIS_ERR; if (nwritten > 0 ) { if (nwritten == (signed )sdslen(c->obuf)) { sdsfree(c->obuf); c->obuf = sdsempty(); } else { sdsrange(c->obuf,nwritten,-1 ); } } } if (done != NULL ) *done = (sdslen(c->obuf) == 0 ); return REDIS_OK; }
redisNetWrite redisNetWrite
函数,即c->funcs->write
, 内部调用send
函数将c->obuf
中数据发送出去。send
函数返回值nwritten
有几种可能:
nwritten < 0
时,以下两种情况不是错误:
errno == EWOULDBLOCK
:在非阻塞IO模式下, errno == EWOULDBLOCK
只是表示当前e->fd
的发送缓冲区是满的,无法将c->obuf
中的数据复制到e->fd
的发送缓冲区。
errno == EINTR
:表示当前发送数据操作被系统中断了。
因此,对这两种情况,只需要直接return,等待下一次的可写事件触发即可。 问题:此时return也是 nwritten < 0
,那么会造成这个连接被关闭???
nwritten >= 0
:正常情况
整个流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 int redisNetWrite (redisContext *c) { int nwritten = send(c->fd, c->obuf, sdslen(c->obuf), 0 ); if (nwritten < 0 ) { if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { } else { __redisSetError(c, REDIS_ERR_IO, NULL ); return -1 ; } } return nwritten; }
redisvAsyncCommand 上面的过程是将c->obuf
的数据发送出去,那么c->obuf
的数据是从何而来?
redisAsyncCommand*
系列函数用于将待发送数据添加到c->obuf
中:主要有以下几个,作用都是相同的:
将ap/.../argv/cmd
中的请求数据添加到ac->c->obuf
中
设置这些请求的回调函数fn
。当接收到对端对这些请求时执行fn
,因此这些回调函数fn
是在可读事件中执行的。
因此下面只选择其中的redisvAsyncCommand
作为讲解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int redisvAsyncCommand (redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) ;int redisAsyncCommand (redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) ;int redisAsyncCommandArgv (redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char ** argv, const size_t *argvlen) ;int redisAsyncFormattedCommand (redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) ;
redisvAsyncCommand
函数,先是调用redisvFormatCommand
函数对不定参数ap
按照format
提供的格式进行格式化,结果存储在cmd
,再调用__redisAsyncCommand
函数将cmd
序列化到c->obuf
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int redisvAsyncCommand (redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { char *cmd; int len; int status; len = redisvFormatCommand(&cmd,format,ap); if (len < 0 ) return REDIS_ERR; status = __redisAsyncCommand(ac,fn,privdata,cmd,len); free (cmd); return status; }
在介绍__redisAsyncCommand
函数之前,先具体介绍下hiredis中回调函数的相关实现。
在struct redisAsyncContext
中有四个回调函数任务队列:订阅指令subscribe channle
、psubscribe pattern
有专门的任务队列,当(p)subscribe
指令的键不正确时,会存储在ac->sub.invalid
中。其他的指令回调函数都存储在 ac->replies
中:
1 2 3 4 5 6 7 8 redisCallbackList replies; struct { redisCallbackList invalid; struct dict *channels ; struct dict *patterns ; } sub;
而每个回调函数节点都是 redisCallback
对象,由redisCallback
对象形成的回调函数链表、任务队列。而这些任务列表,都是从尾部插入,头部取出 ,即先响应的请求先执行。
1 2 3 4 5 6 7 8 9 10 11 typedef struct redisCallback { struct redisCallback * next; redisCallbackFn* fn; int pending_subs; void *privdata; } redisCallback; typedef struct redisCallbackList { redisCallback *head; redisCallback *tail; } redisCallbackList;
__redisPushCallback __redisPushCallback
函数, 在任务列表list
的尾部添加新的节点source
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 static int __redisPushCallback(redisCallbackList* list , redisCallback *source) { redisCallback *cb; cb = malloc (sizeof (*cb)); if (cb == NULL ) return REDIS_ERR_OOM; if (source != NULL ) { memcpy (cb,source,sizeof (*cb)); cb->next = NULL ; } if (list ->head == NULL ) list ->head = cb; if (list ->tail != NULL ) list ->tail->next = cb; list ->tail = cb; return REDIS_OK; }
__redisShiftCallback __redisShiftCallback
函数,从从任务列表list
头部取出任务节点至target
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static int __redisShiftCallback(redisCallbackList *list , redisCallback *target) { redisCallback* cb = list ->head; if (cb != NULL ) { list ->head = cb->next; if (cb == list ->tail) list ->tail = NULL ; if (target != NULL ) memcpy (target,cb,sizeof (*cb)); free (cb); return REDIS_OK; } return REDIS_ERR; }
__redisRunCallback __redisRunCallback
函数,执行回调函数cb->fn
。REDIS_IN_CALLBACK
标志位表示正在执行中。
1 2 3 4 5 6 7 8 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { redisContext *c = &(ac->c); if (cb->fn != NULL ) { c->flags |= REDIS_IN_CALLBACK; cb->fn(ac,reply,cb->privdata); c->flags &= ~REDIS_IN_CALLBACK; } }
__redisAsyncCommand __redisAsyncCommand
函数要完成两个任务:
根据指令cmd
类型,将fn
回调函数设置到ac
相应的队列中。
指令字符串 cmd
格式依然满足之前在剖析REDIS的输出缓冲区 章中所述的字符串格式。 hiredis通过nextArgument
函数解析cmd
指令,来根据指令类型为回调函数fn
选择任务队列。
将指令字符串cmd
写入到c->obuf
中,待发送
整个过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; struct dict *cbdict ; dictEntry *de; redisCallback *existcb; int pvariant, hasnext; const char *cstr, *astr; size_t clen, alen; const char *p; sds sname; int ret; if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; cb.fn = fn; cb.privdata = privdata; cb.pending_subs = 1 ; p = nextArgument(cmd,&cstr,&clen); assert(p != NULL ); hasnext = (p[0 ] == '$' ); pvariant = (tolower (cstr[0 ]) == 'p' ) ? 1 : 0 ; cstr += pvariant; clen -= pvariant; if (hasnext && strncasecmp(cstr,"subscribe\r\n" ,11 ) == 0 ) { c->flags |= REDIS_SUBSCRIBED; while ((p = nextArgument(p,&astr,&alen)) != NULL ) { sname = sdsnewlen(astr,alen); if (pvariant) cbdict = ac->sub.patterns; else cbdict = ac->sub.channels; de = dictFind(cbdict, sname); if (de != NULL ) { existcb = dictGetEntryVal(de); cb.pending_subs = existcb->pending_subs + 1 ; } ret = dictReplace(cbdict,sname,&cb); if (ret == 0 ) sdsfree(sname); } } else if (strncasecmp(cstr,"unsubscribe\r\n" ,13 ) == 0 ) { if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; } else if (strncasecmp(cstr,"monitor\r\n" ,9 ) == 0 ) { c->flags |= REDIS_MONITORING; __redisPushCallback(&ac->replies,&cb); } else { if (c->flags & REDIS_SUBSCRIBED) __redisPushCallback(&ac->sub.invalid,&cb); else __redisPushCallback(&ac->replies,&cb); } __redisAppendCommand(c,cmd,len); _EL_ADD_WRITE(ac); return REDIS_OK; }
发送数据的流程,介绍完毕。
_EL_ADD_READ(ac) 下面从 _EL_ADD_READ
宏讲解可读事件的处理流程。
_EL_ADD_READ
宏展开如下:
1 2 3 4 5 #define _EL_ADD_READ(ctx) \ do { \ refreshTimeout(ctx); \ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ } while (0)
在sentinel
中 addRead
指向的函数是 redisAeAddRead
1 ac->ev.addRead = redisAeAddRead;
而 redisAeAddRead
函数,实现的是为 e->fd
注册可读事件,而可读事件的回调函数是 redisAeReadEvent
。
1 2 3 4 5 6 7 8 static void redisAeAddRead (void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->reading) { e->reading = 1 ; aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); } }
当可读事件触发 ,读取回调函数redisAeReadEvent
,内部调用 redisAsyncHandleRead
函数处理可读事件。
1 2 3 4 5 6 7 8 static void redisAeReadEvent (aeEventLoop *el, int fd, void *privdata, int mask) { ((void )el); ((void )fd); ((void )mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleRead(e->context); }
redisAsyncHandleRead
函数:
判断有没有建立客户端ac
是否已经连接,没有则调用__redisAsyncHandleConnect
函数发起连接请求
如果已经建立连接或者上面成功建立连接,则调用异步读取函数c->funcs->async_read
读取网络中的数据。
其中c->funcs->async_read
指向的是redisAsyncRead
函数。整个流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void redisAsyncHandleRead (redisAsyncContext *ac) { redisContext *c = &(ac->c); if (!(c->flags & REDIS_CONNECTED)) { if (__redisAsyncHandleConnect(ac) != REDIS_OK) return ; if (!(c->flags & REDIS_CONNECTED)) return ; } c->funcs->async_read(ac); }
redisAsyncRead redisAsyncRead
函数, 会先调用redisBufferRead
函数尝试直接读取数据,读取成功后:
如果此时没有注册可读事件,则注册可读事件
对读取到的数据进行处理1 redisProcessCallbacks(ac)
整个过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 void redisAsyncRead (redisAsyncContext *ac) { redisContext *c = &(ac->c); if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { _EL_ADD_READ(ac); redisProcessCallbacks(ac); } }
redisBufferRead redisBufferRead
函数分为两步:
先将数据从网络中读取到buf
。这里使用的 c->funcs->read
同步读函数,即redisNetRead
函数,
如果c->funcs->read
返回值大于0,则成功从网络中读取到数据,那么将读取到的f数据写入到接受缓冲区 c->reader->buf
,这部分由redisReaderFeed
函数实现。
因此,redisBufferRead
函数是直接从当前网络中读取数据,如果有数据 or 对端关闭则返回REDIS_OK
,否则返回REDIS_ERR
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int redisBufferRead (redisContext *c) { char buf[1024 *16 ]; int nread; if (c->err) return REDIS_ERR; nread = c->funcs->read(c, buf, sizeof (buf)); if (nread > 0 ) { if (redisReaderFeed(c->reader, buf, nread) != REDIS_OK) { __redisSetError(c, c->reader->err, c->reader->errstr); return REDIS_ERR; } } else if (nread < 0 ) { return REDIS_ERR; } return REDIS_OK; }
redisNetRead redisNetRead
函数,即 c->funcs->read
,实现从网络读取数据。根据recv
函数的返回值nread
对结果进行判断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 int redisNetRead (redisContext *c, char *buf, size_t bufcap) { int nread = recv(c->fd, buf, bufcap, 0 ); if (nread == -1 ) { if ((errno == EWOULDBLOCK && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { return 0 ; } if (errno == ETIMEDOUT && (c->flags & REDIS_BLOCK)) { __redisSetError(c, REDIS_ERR_TIMEOUT, "recv timeout" ); return -1 ; } __redisSetError(c, REDIS_ERR_IO, NULL ); return -1 ; } if (nread == 0 ) { __redisSetError(c, REDIS_ERR_EOF, "Server closed the connection" ); return -1 ; } return nread; }
redisReaderFeed redisReaderFeed
函数,用于将从网络中读取到数据buf
写入到客户端的接受缓冲区c->reader
中。redisReader
结构如下,本次读取数据即将接受到的数据存储到c->reader->buf
中,但是对c->reader->buf
最大未使用空间有限制,不得超过c->reader->maxbuf
。
1 2 3 4 5 6 7 8 9 10 11 12 13 typedef struct redisReader { int err; char errstr[128 ]; char * buf; size_t pos; size_t len; size_t maxbuf; } redisReader;
因此完整的操作如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int redisReaderFeed (redisReader *r, const char *buf, size_t len) { sds newbuf; if (r->err) return REDIS_ERR; if (buf != NULL && len >= 1 ) { if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0 ; assert(r->buf != NULL ); } newbuf = sdscatlen(r->buf,buf,len); if (newbuf == NULL ) { __redisReaderSetErrorOOM(r); return REDIS_ERR; } r->buf = newbuf; r->len = sdslen(r->buf); } return REDIS_OK; }
再将视线拉回到redisAsyncRead
函数:如果 redisBufferRead(c)
函数将对端请求数据存储至c->reader->buf
没有发生错误,那么就要对c->reader->buf
中的数据进行处理,而处理过程由redisProcessCallbacks
函数完成。
1 2 3 4 5 6 7 if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { _EL_ADD_READ(ac); redisProcessCallbacks(ac); }
然而,c->reader->buf
中的数据就类似于readQueryFromClient
函数读取的c->querybuf
数据,这次是原始的数据,readQueryFromClient
读取到c->quetybuf
后,还需要经过processInputBuffer
解析后才能正常使用。在这里,也是也从此。在被 redisProcessCallbacks
使用前会先经过 redisGetReply
函数处理。
因此,数据从对端传递到本端输入缓冲区ac->c->reader->buf
中后,下面要做的:
解析c->reader->buf
中的数据,并将解析结果保存至reply
。这一部分由redisGetReply
函数实现。
根据reply
,执行相应的回调函数。这一部分由 redisProcessCallbacks
实现,即相当于 processCommand
函数。而每个单独的xxxCallback
就相当于之前的xxxCommand
。
下面依次介绍解析数据到执行回调函数。
redisGetReply redisGetReply
函数,会先调用redisGetReplyFromReader
函数尝试直接从c->reader->buf
中获取reply
,如果读取到了则返回。如果没有读取到并且c
与对端是阻塞连接,再经过两个步骤:
先将发送缓冲区c->obuf
中的数据全部发送至对端
再阻塞等待数据对端发送数据,当对端发送数据了,redisBufferRead
函数将对端发送的数据存储到输入缓冲区c->reader
中,再调用 redisGetReplyFromReader
函数从c->reader
解析。
因此,在非阻塞IO模式下,返回REDIS_OK
也不一定会读取到数据,而在阻塞IO模式下,返回REDIS_OK
即读取到数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int redisGetReply (redisContext *c, void **reply) { int wdone = 0 ; void *aux = NULL ; if (redisGetReplyFromReader(c, &aux) == REDIS_ERR) return REDIS_ERR; if (aux == NULL && c->flags & REDIS_BLOCK) { do { if (redisBufferWrite(c,&wdone) == REDIS_ERR) return REDIS_ERR; } while (!wdone); do { if (redisBufferRead(c) == REDIS_ERR) return REDIS_ERR; if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) return REDIS_ERR; } while (aux == NULL ); } if (reply != NULL ) *reply = aux; return REDIS_OK; }
redisGetReplyFromReader redisGetReplyFromReader
函数,在内部是调用 redisReaderGetReply
函数完成功能。如果发生错误则将在c->reader
中的错误复制至c->err
。
1 2 3 4 5 6 7 int redisGetReplyFromReader (redisContext *c, void **reply) { if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) { __redisSetError(c,c->reader->err,c->reader->errstr); return REDIS_ERR; } return REDIS_OK; }
在讲解redisReaderGetReply
解析过程之前。先熟悉下输入缓冲区redisReader
。redisReader
有两个作用:
输入缓冲区:r->buf
接受对端的数据
存储回应:r->reply
中存储着对r->buf
的解析结果
上面已经完成了第一步,下面就是要进行第二步骤:对r->buf
中的数据进行解析后存储至r->reply
。对端每种请求都会相应的回调函数,r->reply
后续会用于各种请求的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 typedef struct redisReader { redisReadTask rstack[9 ]; int ridx; void * reply; redisReplyObjectFunctions* fn; void *privdata; } redisReader;
fn
之前在剖析REDIS的输出缓冲区 一章中,讲过REDIS传输协议有不同的类型。在此为创建不同的对象都是由 fn
函数指针实现:
1 2 3 4 5 6 7 8 9 typedef struct redisReplyObjectFunctions { void *(*createString)(const redisReadTask*, char *, size_t ); void *(*createArray)(const redisReadTask*, size_t ); void *(*createInteger)(const redisReadTask*, long long ); void *(*createDouble)(const redisReadTask*, double , char *, size_t ); void *(*createNil)(const redisReadTask*); void *(*createBool)(const redisReadTask*, int ); void (*freeObject)(void *); } redisReplyObjectFunctions;
对应的函数指针是defaultFunctions
,如下。
1 2 3 4 5 6 7 8 9 static redisReplyObjectFunctions defaultFunctions = { createStringObject, createArrayObject, createIntegerObject, createDoubleObject, createNilObject, createBoolObject, freeReplyObject };
rstack[9]
由于某些数据类型可能是Map、Set、Array
等复合类型,因此可能发生嵌套,比如Array<Array<Map>>>
等,因此为了解析复合类型依靠 rstack[9]
来追踪深入递归的路径(这是不是很像基于栈实现的函数调用),这里对嵌套层次有规定,最多只能嵌套9层。
那么问题来了,怎么返回到上一层,就像函数如何返回到主函数?下面来看看 redisReadTask
结构体:
1 2 3 4 5 6 7 8 9 typedef struct redisReadTask { int type; int elements; int idx; void *obj; struct redisReadTask *parent ; void *privdata; } redisReadTask;
假设 node
是 redisReadTask*
对象,那么每个node
都包含了一个指针parent
指向上一层,即node->parent
可以用于返回上一层;node->idx
字段指示着node
该位于复合类型的哪个位置。而当前节点从c->reader
解析所得数据存储在 node->obj
,其实际类型是redisReply*
;解析所得数据类型是node->tyoe
,如果是复合类型则node->elements
指示着元素个数。
因此,可总结如下:
redisReadTask[9]
中每个redisReadTask
都可以视为一个单链表,通过parent*
串联各个嵌套层之间关系,而每一层的数据都由obj*
存储。
ridx
变量,表征着当前嵌套的层数,可以在O(1)时间定位当前的层数。
其中,type
可能值如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 #define REDIS_REPLY_NIL 4 #define REDIS_REPLY_STATUS 5 #define REDIS_REPLY_ERROR 6 #define REDIS_REPLY_DOUBLE 7 #define REDIS_REPLY_BOOL 8 #define REDIS_REPLY_MAP 9 #define REDIS_REPLY_SET 10 #define REDIS_REPLY_ATTR 11 #define REDIS_REPLY_PUSH 12 #define REDIS_REPLY_BIGNUM 13 #define REDIS_REPLY_VERB 14
reply
reply
的实际类型是redisReply
结构体,用于保存回复的类型以及数据。其中当type==REDIS_REPLY_DOUBLE
时,使用dval
和str
两个方式来保存数据,当使用str
字段时,也要使用len
来表示str
的长度。
1 2 3 4 5 6 7 8 9 10 11 12 typedef struct redisReply { int type; long long integer; double dval; size_t len; char *str; char vtype[4 ]; size_t elements; struct redisReply ** element ; } redisReply;
createArrayObject 下面以创建Array、Map、Set
类型的createArrayObject
函数来讲解是如何深入递归的。
因此,createArrayObject
函数只是完成了:1)创建elements
个redisReply
节点,提供了一个容器;3)将r
和所在的复合类型建立关系,即放入指定的位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static void *createArrayObject (const redisReadTask* task, size_t elements) { redisReply *r, *parent; r = createReplyObject(task->type); if (r == NULL ) return NULL ; if (elements > 0 ) { r->element = calloc (elements,sizeof (redisReply*)); if (r->element == NULL ) { freeReplyObject(r); return NULL ; } } r->elements = elements; if (task->parent) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || parent->type == REDIS_REPLY_SET); parent->element[task->idx] = r; } return r; }
createIntegerObject createArrayObject
函数调用结束并返回r
,但是此时r
中还没有数据 ,需要等STING
、INTEGER
、DOUBLE
等类型创建好,发现自己是属于某个复合类型r
则将自己放置到r
中。
以整数为例,会调用createIntegerObject
函数创建整数对象,并将值value
存储在 r->integer
。发现自己属于某个复合类型,task->parent !=NULL
,则会自己放置到相应的位置。
最后,会将这个reply
返回给调用者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static void *createIntegerObject (const redisReadTask *task, long long value) { redisReply *r, *parent; r = createReplyObject(REDIS_REPLY_INTEGER); if (r == NULL ) return NULL ; r->integer = value; if (task->parent) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || parent->type == REDIS_REPLY_SET); parent->element[task->idx] = r; } return r; }
processItem processItem
函数, 解析c->reader
数据,主要是两步:
读取一个字节的类型数据,获取当前数据类型cur->type
根据cur->type
,调用相应的函数来解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static int processItem (redisReader *r) { redisReadTask* cur = &(r->rstack[r->ridx]); char *p; if (cur->type < 0 ) { if ((p = readBytes(r,1 )) == NULL ) return REDIS_ERR; switch (p[0 ]) { case '-' : cur->type = REDIS_REPLY_ERROR; break ; case '+' : cur->type = REDIS_REPLY_STATUS;break ; case ':' : cur->type = REDIS_REPLY_INTEGER;break ; case ',' : cur->type = REDIS_REPLY_DOUBLE; break ; case '_' : cur->type = REDIS_REPLY_NIL; break ; case '$' : cur->type = REDIS_REPLY_STRING; break ; case '*' : cur->type = REDIS_REPLY_ARRAY; break ; case '%' : cur->type = REDIS_REPLY_MAP; break ; case '~' : cur->type = REDIS_REPLY_SET; break ; case '#' : cur->type = REDIS_REPLY_BOOL; break ; case '=' : cur->type = REDIS_REPLY_VERB; break ; default : __redisReaderSetErrorProtocolByte(r,*p); return REDIS_ERR; } } switch (cur->type) { case REDIS_REPLY_ERROR : case REDIS_REPLY_STATUS : case REDIS_REPLY_INTEGER: case REDIS_REPLY_DOUBLE : case REDIS_REPLY_NIL : case REDIS_REPLY_BOOL : return processLineItem(r); case REDIS_REPLY_STRING : case REDIS_REPLY_VERB : return processBulkItem(r); case REDIS_REPLY_ARRAY : case REDIS_REPLY_MAP : case REDIS_REPLY_SET : return processAggregateItem(r); default : assert(NULL ); return REDIS_ERR; } }
processLineItem processLineItem
函数,解析非字符串的简单类型。下面是以整数为例,其他流程基本一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static int processLineItem (redisReader *r) { redisReadTask *cur = &(r->rstack[r->ridx]); void *obj; char *p; int len; if ((p = readLine(r,&len)) != NULL ) { if (cur->type == REDIS_REPLY_INTEGER) { if (r->fn && r->fn->createInteger) { long long v; if (string2ll(p, len, &v) == REDIS_ERR) { __redisReaderSetError(r, REDIS_ERR_PROTOCOL, "Bad integer value" ); return REDIS_ERR; } obj = r->fn->createInteger(cur,v); } else { obj = (void *)REDIS_REPLY_INTEGER; } } else if (cur->type == REDIS_REPLY_DOUBLE) { } else if (cur->type == REDIS_REPLY_NIL) { } else if (cur->type == REDIS_REPLY_BOOL) { } else { } if (obj == NULL ) { __redisReaderSetErrorOOM(r); return REDIS_ERR; } if (r->ridx == 0 ) r->reply = obj; moveToNextTask(r); return REDIS_OK; } return REDIS_ERR; }
moveToNextTask moveToNextTask
函数,控制着解析c->reader
的流程。processLineItem
函数将c->reader
数据解析后,将解析结果保存在c->reader->reply
。如果reply->type
是非复合类型,解析就到此结束。但是如果是个复合类型,就需要继续解析:
如果当前嵌套层还没解析完,则向当前嵌套层的下一个位置
如果当前嵌套层解析完,则需要返回上一层
具体地说,对于下面的嵌套层:
1 std::vector<std::vector<std::vector<....>>> var
moveToNextTask
的作用是指定下次解析所得元素的存储位置 ,对于某层var[ridx][idx]
元素:
当idx == var[ridx].size()-1
时,则var[ridx]
已经完全填满了,下一个元素将填充到var[ridx+1]
中的位置。
如果 ridx+1 == var[ridx-1].size()
,那么就继续返回。如果var所有位置全部填了,就直接返回此次解析结果var。否则,var某个位置没有数据,则返回至该位置var[i][j]
,下次元素将复制至此。
当idx < var[ridx].size()-1
时,下次解析所得的参数复制到var[ridx][idx++]
的位置,而不至于覆盖var[ridx][idx]
的元素
无论当前正在解析是个复合类型,还是简单类型。整个流程成功解析完毕后, r->ridx == -1
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 static void moveToNextTask (redisReader *r) { redisReadTask *cur, *prv; while (r->ridx >= 0 ) { if (r->ridx == 0 ) { r->ridx--; return ; } cur = &(r->rstack[r->ridx]); prv = &(r->rstack[r->ridx-1 ]); assert(prv->type == REDIS_REPLY_ARRAY || prv->type == REDIS_REPLY_MAP || prv->type == REDIS_REPLY_SET); if (cur->idx == prv->elements-1 ) { r->ridx--; } else { assert(cur->idx < prv->elements); cur->type = -1 ; cur->elements = -1 ; cur->idx++; return ; } } }
processAggregateItem processAggregateItem
函数,解析复合类型 array/map/set
。当解析到此时,剩下的数据格式是:
1 2 3 4 n\r\n len_1\r\narg_1\r\n ... len_n\r\narg_n\r\n
因为 processItem
函数已经将前缀 *
读取了,那么此时剩下的就是上面的数据。第一行n
即这个复合类型的元素个数,下面的n行是具体参数。这也解释了上面每次调用processLineItem
函数后都需要调用moveToNextTask
函数,因为下一行的解析的参数arg_idx
的位置需要由moveToNextTask
来确定。
processAggregateItem
函数的执行流程如下:
在读取数据之前,先判断当前递归层数r->ridx
,r->ridx
不能大于8
解析元素个数elements
,判断elemengts
是否合理
elements >0
则需要创建容器。并且由于是复合类型,需要为递归进行准备
整个流程如只有在这个复合数据类型解析完毕后,才会返回REDSI_OK
,否则返回的是REDIS_ERR
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 static int processAggregateItem (redisReader *r) { redisReadTask *cur = &(r->rstack[r->ridx]); void *obj; char *p; long long elements; int root = 0 , len; if (r->ridx == 8 ) { __redisReaderSetError(r, REDIS_ERR_PROTOCOL, "No support for nested multi bulk replies with depth > 7" ); return REDIS_ERR; } if ((p = readLine(r,&len)) != NULL ) { if (string2ll(p, len, &elements) == REDIS_ERR) { __redisReaderSetError(r, REDIS_ERR_PROTOCOL, "Bad multi-bulk length" ); return REDIS_ERR; } root = (r->ridx == 0 ); if (elements < -1 || (LLONG_MAX > SIZE_MAX && elements > SIZE_MAX)) { __redisReaderSetError(r,REDIS_ERR_PROTOCOL,"Multi-bulk length out of range" ); return REDIS_ERR; } if (elements == -1 ) { if (r->fn && r->fn->createNil) obj = r->fn->createNil(cur); else obj = (void *)REDIS_REPLY_NIL; if (obj == NULL ) { __redisReaderSetErrorOOM(r); return REDIS_ERR; } moveToNextTask(r); } else { if (cur->type == REDIS_REPLY_MAP) elements *= 2 ; if (r->fn && r->fn->createArray) obj = r->fn->createArray(cur,elements); else obj = (void *)(long )cur->type; if (obj == NULL ) { __redisReaderSetErrorOOM(r); return REDIS_ERR; } if (elements > 0 ) { cur->elements = elements; cur->obj = obj; r->ridx++; r->rstack[r->ridx].type = -1 ; r->rstack[r->ridx].elements = -1 ; r->rstack[r->ridx].idx = 0 ; r->rstack[r->ridx].obj = NULL ; r->rstack[r->ridx].parent = cur; r->rstack[r->ridx].privdata = r->privdata; } else { moveToNextTask(r); } } if (root) r->reply = obj; return REDIS_OK; } return REDIS_ERR; }
redisReaderGetReply 上面讲解了解析r->buf
并将结果存储至r->reply
的过程:
如果 redisBufferRead
函数将对端数据存储至r->buf
发生错误,无法继续执行
如果r->len==0
,说明没有接收到数据、无法解析,则直接返回。
对r->rstack[0]
进行初始化
调用processItem
函数,对r->buf
进行解析
对解析结果进行判断及处理,如果解析正确,则将解析结果设置到传入的参数 reply
中
整个流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 int redisReaderGetReply (redisReader *r, void **reply) { if (reply != NULL ) *reply = NULL ; if (r->err) return REDIS_ERR; if (r->len == 0 ) return REDIS_OK; if (r->ridx == -1 ) { r->rstack[0 ].type = -1 ; r->rstack[0 ].elements = -1 ; r->rstack[0 ].idx = -1 ; r->rstack[0 ].obj = NULL ; r->rstack[0 ].parent = NULL ; r->rstack[0 ].privdata = r->privdata; r->ridx = 0 ; } while (r->ridx >= 0 ) if (processItem(r) != REDIS_OK) break ; if (r->err) return REDIS_ERR; if (r->pos >= 1024 ) { sdsrange(r->buf,r->pos,-1 ); r->pos = 0 ; r->len = sdslen(r->buf); } if (r->ridx == -1 ) { if (reply != NULL ) { *reply = r->reply; } else if (r->reply != NULL && r->fn && r->fn->freeObject) { r->fn->freeObject(r->reply); } r->reply = NULL ; } return REDIS_OK; }
redisProcessCallbacks 再将视线拉回到redisAsyncRead
函数,下面可以调用redisProcessCallbacks
函数处理对端请求了。
调用redisGetReply
函数,对ac->c->reader->buf
进行解析获得reply
如果reply ==NULL
,有几种可能
ac
即将要关闭,则关闭连接再return
处于监视模式
确实没有数据,则等待下次调用redisProcessCallbacks
从ac->replies
中取出回调函数节点至cb
并执行
任务队列ac->replies
和ac->pubsubs/channels
都是先来先服务,eventloop
线程又是个单线程,因此不会发生错乱。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 void redisProcessCallbacks (redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb = {NULL , NULL , 0 , NULL }; void *reply = NULL ; int status; while ((status = redisGetReply(c,&reply)) == REDIS_OK) { if (reply == NULL ) { if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0 && ac->replies.head == NULL ) { __redisAsyncDisconnect(ac); return ; } if (c->flags & REDIS_MONITORING) { __redisPushCallback(&ac->replies, &cb); } break ; } if (__redisShiftCallback(&ac->replies, &cb) != REDIS_OK) { if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { c->err = REDIS_ERR_OTHER; snprintf (c->errstr, sizeof (c->errstr), "%s" ,((redisReply*)reply)->str); c->reader->fn->freeObject(reply); __redisAsyncDisconnect(ac); return ; } assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); if (c->flags & REDIS_SUBSCRIBED) __redisGetSubscribeCallback(ac, reply, &cb); } if (cb.fn != NULL ) { __redisRunCallback(ac,&cb,reply); c->reader->fn->freeObject(reply); if (c->flags & REDIS_FREEING) { __redisAsyncFree(ac); return ; } } else { c->reader->fn->freeObject(reply); } } if (status != REDIS_OK) __redisAsyncDisconnect(ac); }
到此,整个可读事件的处理过程完结。下面再介绍下上面提到但是没有使用的断开连接操作。
__redisAsyncDisconnect 详见代码注释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void __redisAsyncDisconnect(redisAsyncContext *ac) { redisContext *c = &(ac->c); __redisAsyncCopyError(ac); if (ac->err == 0 ) { int ret = __redisShiftCallback(&ac->replies, NULL ); assert(ret == REDIS_ERR); } else { c->flags |= REDIS_DISCONNECTING; } _EL_CLEANUP(ac); if (!(c->flags & REDIS_NO_AUTO_FREE)) { __redisAsyncFree(ac); } }
__redisAsyncFree 在完成redisFree(ac)
之前,需要执行完ac
的四个任务队列中的回调函数节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; dictIterator *it; dictEntry *de; while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL ); while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL ); it = dictGetIterator(ac->sub.channels); while ((de = dictNext(it)) != NULL ) __redisRunCallback(ac, dictGetEntryVal(de), NULL ); dictReleaseIterator(it); dictRelease(ac->sub.channels); it = dictGetIterator(ac->sub.patterns); while ((de = dictNext(it)) != NULL ) __redisRunCallback(ac,dictGetEntryVal(de),NULL ); dictReleaseIterator(it); dictRelease(ac->sub.patterns); _EL_CLEANUP(ac); if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { if (c->flags & REDIS_FREEING) { ac->onDisconnect(ac,REDIS_OK); } else { ac->onDisconnect(ac,(ac->err == 0 ) ? REDIS_OK : REDIS_ERR); } } redisFree(c); }
sentinelDisconnectCallback 在 __redisAsyncFree
函数的最后几步,对断开连接回调函数的第二个参数有判断:
1 2 3 4 5 6 7 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { if (c->flags & REDIS_FREEING) { ac->onDisconnect(ac,REDIS_OK); } else { ac->onDisconnect(ac,(ac->err == 0 ) ? REDIS_OK : REDIS_ERR); } }
但是在sentinel.c
中设置的断开连接回调sentinelDisconnectCallback
,并没有使用status
,最终都是直接调用instanceLinkConnectionError
函数,这和建立连接失败时的行为一致。
1 2 3 4 void sentinelDisconnectCallback (const redisAsyncContext *c, int status) { UNUSED(status); instanceLinkConnectionError(c); }
redisAsyncFree 在sentinel.c/instanceLinkCloseConnection
函数中调用,用于异步关闭连接。
1 2 3 4 5 6 7 8 9 void redisAsyncFree (redisAsyncContext *ac) { redisContext *c = &(ac->c); c->flags |= REDIS_FREEING; if (!(c->flags & REDIS_IN_CALLBACK)) __redisAsyncFree(ac); }