/* Try to append 'len' bytes starting at 's' to the client's fixed-size
 * static output buffer (c->buf).
 *
 * Returns:
 *   C_OK  - the bytes were copied into c->buf, OR the client is flagged
 *           CLIENT_CLOSE_AFTER_REPLY, in which case the data is silently
 *           discarded: the client is going to be closed once its pending
 *           output has been written, so it accepts no new data.
 *   C_ERR - nothing was copied; the caller is expected to append the bytes
 *           to the c->reply list instead (e.g. via _addReplyProtoToList()).
 *           This happens when the reply list is already non-empty or when
 *           c->buf does not have room for all 'len' bytes (the data is
 *           never split between c->buf and the list). */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf) - c->bufpos; /* free space in c->buf */

    /* The client will be closed after the current output is flushed:
     * accept (and drop) the data without queueing it. */
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer: c->buf is written out
     * before the list, so appending here would reorder the output. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return C_OK;
}
/* Note that 'tail' may be NULL even if we have a tail node, because when
 * addReplyDeferredLen() is used, it sets a dummy node to NULL just
 * to fill it later, when the size of the bulk length is set.
 *
 * tail 可能是 NULL,因为在 addReplyDeferredLen() 函数中会创建一个 dummy node,其值是 NULL,
 * 因此,即使存在尾部节点,其值也可能是 NULL */
/* Append to tail string when possible. */ // 如果不是NULL, 则尝试直接在尾部存储 if (tail) { /* Copy the part we can fit into the tail, and leave the rest for new node */ size_t avail = tail->size - tail->used; size_t copy = avail >= len? len: avail; // 取最小 memcpy(tail->buf + tail->used, s, copy); tail->used += copy; s += copy; len -= copy; } /* 注意:在使用了 addReplyDeferredLen() 函数后,tail 是 NULL,因此上面的 if(tail) 不会执行, * 而是直接在下面的 if(len) 分支中创建新的节点接在 tail 后面,而 tail 依然是个 dummy node */
// 如果 tail是 NULL 或者无法完全存储数据,就需要创建一个新的节点 if (len) { /* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES */ size_t size = len < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : len; //取最大 tail = zmalloc(size + sizeof(clientReplyBlock)); // 创建新的节点 /* take over the allocation's internal fragmentation */ tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); tail->used = len; memcpy(tail->buf, s, len); listAddNodeTail(c->reply, tail); // 新节点添加在 tail 尾 c->reply_bytes += tail->size; // 更新总的已经分配字节数 }
intcheckClientOutputBufferLimits(client *c){ int soft = 0, hard = 0, class; unsignedlong used_mem = getClientOutputBufferMemoryUsage(c); // 见下面分析
class = getClientType(c); /* For the purpose of output buffer limiting, masters are handled * like normal clients. */ if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL; // 硬限制 if (server.client_obuf_limits[class].hard_limit_bytes && used_mem >= server.client_obuf_limits[class].hard_limit_bytes) hard = 1; // 软限制 if (server.client_obuf_limits[class].soft_limit_bytes && used_mem >= server.client_obuf_limits[class].soft_limit_bytes) soft = 1;
/* We need to check if the soft limit is reached continuously for the * specified amount of seconds. */ // 对于这个软限制,进行综合考虑,给予客户端一次机会,防止直接就关闭客户端 if (soft) { if (c->obuf_soft_limit_reached_time == 0) { // 如果是首次达到 soft limit,则记录首次达到的时间,然后忽略这次 c->obuf_soft_limit_reached_time = server.unixtime; soft = 0; /* First time we see the soft limit reached */ } else { time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; // 如果第二次达到,并且两次之间的时间差小于阈值 soft_limit_seconds // 则认为是第一次 soft limit if (elapsed <= server.client_obuf_limits[class].soft_limit_seconds) { soft = 0; /* The client still did not reached the max number of seconds for the soft limit to be considered reached. */ } } } else { c->obuf_soft_limit_reached_time = 0; }
/* Queue the string object 'obj' as reply data for client 'c'.
 *
 * Nothing is queued if prepareClientToWrite() refuses the write (for example
 * CLIENT REPLY OFF/SKIP, or a master client without
 * CLIENT_MASTER_FORCE_REPLY). Otherwise the bytes are first offered to the
 * static buffer (c->buf) and, if that fails, appended to the c->reply list.
 *
 * 'obj' must be either an SDS-encoded string or an integer-encoded string
 * (OBJ_ENCODING_INT); any other encoding is a programming error and panics. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        /* String payload: queue the SDS bytes as they are, static buffer
         * first, reply list as fallback. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. The integer value itself is stored in
         * obj->ptr, hence the cast. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
/* Client flags (subset). Each flag is a distinct bit of client->flags;
 * every #define must sit on its own line for the preprocessor to see it. */
#define CLIENT_LUA (1<<8)       /* This is a non connected client used by Lua (fake client) */
#define CLIENT_REPLY_OFF (1<<22)  /* Don't send replies to client. */
#define CLIENT_REPLY_SKIP (1<<24) /* Don't send just this reply. */
#define CLIENT_MASTER (1<<1)    /* This client is a master */
#define CLIENT_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
/* Decide whether reply data may be queued for client 'c', and make sure the
 * client is scheduled to have its output written.
 *
 * Returns C_OK if the caller may append to the client's output buffers,
 * C_ERR if the reply must be dropped. C_ERR is returned when:
 *   - CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP is set (CLIENT REPLY OFF/SKIP);
 *   - the client is a master and CLIENT_MASTER_FORCE_REPLY is not set;
 *   - the client has no connection (fake client used for AOF loading).
 *
 * If the client had no pending output yet, clientInstallWriteHandler() is
 * called so the client gets scheduled for writing. */
int prepareClientToWrite(client *c) {
    /* If it's the Lua client (or a module client) we always return ok without
     * installing any handler since there is no socket at all: the caller still
     * appends the reply to the output buffers, but no write is scheduled. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
     * is set. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */

    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     * "Installing the handler" here only flags the client with
     * CLIENT_PENDING_WRITE and queues it in server.clients_pending_write. */
    if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);

    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}
/* Schedule client 'c' to have its output buffers written to the socket.
 *
 * No write handler is registered here. The client is only flagged with
 * CLIENT_PENDING_WRITE and pushed onto server.clients_pending_write, so that
 * before re-entering the event loop we can try to write to the client
 * socket directly, avoiding a system call. The real write handler is only
 * installed if the whole reply could not be written at once.
 *
 * Nothing happens if the client is already flagged CLIENT_PENDING_WRITE, or
 * if its replication state does not allow writes at this stage: it must be
 * REPL_STATE_NONE, or SLAVE_STATE_ONLINE with repl_put_online_on_ack unset. */
void clientInstallWriteHandler(client *c) {
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Flag the client and queue it instead of installing a handler. */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}
// 直接添加 sds 格式的字符串到输出缓冲区,其中 s 会被释放 voidaddReplySds(client *c, sds s){ if (prepareClientToWrite(c) != C_OK) { /* The caller expects the sds to be free'd. */ sdsfree(s); return; } if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) _addReplyProtoToList(c,s,sdslen(s)); sdsfree(s); }
voidaddReplyErrorLength(client *c, constchar *s, size_t len){ /* If the string already starts with "-..." then the error code * is provided by the caller. Otherwise we use "-ERR". */ // 如果s中没有包含错误码,则使用-ERR替代,总体格式:-ERR <errorMsg> \r\n if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); addReplyProto(c,s,len); addReplyProto(c,"\r\n",2); // ... }
void* addReplyDeferredLen(client *c){ /* Note that we install the write event here even if the object is not * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) returnNULL;
/* Note that 'tail' may be NULL even if we have a tail node, becuase when * addReplyDeferredLen() is used */ if (!tail) return;
/* We only try to trim the space if it is relatively high (more than 1/4 of the allocation),
 * otherwise there's a high chance realloc will be a no-op.
 * Also, to avoid a large memmove, which happens as part of realloc, we only do
 * that if the used part is small. */
// 回复给客户端的第一行: c len \r\n size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
/* Abort when *node is NULL: when the client should not accept writes * we return NULL in addReplyDeferredLen() */ // node 是NULL 表示不应该发数据给客户端, 比如 prepareClientToWrite 返回 C__ERR if (node == NULL) return; serverAssert(!listNodeValue(ln)); // 填充 dummy node,其值是NULL,现在为其分配值 /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(), * with a new buffer structure containing the protocol needed to specify * the length of the array following. However sometimes when there is * little memory to move, we may instead remove this NULL node, and prefix * our protocol in the node immediately after to it, in order to save a * write(2) syscall later. Conditions needed to do it: * * - The next node is non-NULL, * - It has enough room already allocated * - And not too large (avoid large memmove) */ if (ln->next != NULL && (next = listNodeValue(ln->next)) && lenstr_len <= (next->size - next->used) && // 下一个节点可用大小可容纳 lenser next->used < PROTO_REPLY_CHUNK_BYTES * 4) { // 1 那么就可以将 lenstr 插入 next->buf 的头部 memmove(next->buf + lenstr_len, next->buf, next->used); memcpy(next->buf, lenstr, lenstr_len); next->used += lenstr_len; listDelNode(c->reply, ln); // 将 dummy node删除,那么ln->next的上一个节点是 // 被 trimReplyUnusedTailSpace 裁剪过的节点 } else { /* Create a new node */ // 2 否则就创建一个新的缓冲区来存储 len_str, clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); /* Take over the allocation's internal fragmentation */ buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); buf->used = lenstr_len; memcpy(buf->buf, lenstr, lenstr_len); listNodeValue(ln) = buf; // 让这个缓冲区成为这个dummy node的值 c->reply_bytes += buf->size; } // 查看 c->reply 是否超过限制 asyncCloseClientOnOutputBufferLimitReached(c); }
voidaddReplyDouble(client *c, double d){ if (isinf(d)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a * different way, so better to handle it in an explicit way. */ if (c->resp == 2) { addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); } else { addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", d > 0 ? 6 : 7); } } else { char dbuf[MAX_LONG_DOUBLE_CHARS+3], sbuf[MAX_LONG_DOUBLE_CHARS+32];
voidaddReplyLongLongWithPrefix(client *c, longlong ll, char prefix){ char buf[128]; int len;
/* Things like $3\r\n or *2\r\n are emitted very often by the protocol * so we have a few shared objects to use if the integer is small * like it is most of the times. */ if (prefix == '*' && 0 <= ll && ll < OBJ_SHARED_BULKHDR_LEN) { // 数组格式 addReply(c, shared.mbulkhdr[ll]); // 回复给客户端的是:*ll\r\n return; } elseif (prefix == '$' && 0 <= ll && ll < OBJ_SHARED_BULKHDR_LEN) { // 批量字符串格式 addReply(c, shared.bulkhdr[ll]); // 回复给客户端的是: $llr\n return; }
/* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { // 再发送动态缓冲区中的数据: // 从头部节点开始发送依次发送,每次发送完一个节点的数据,就将其从 c->reply 中删除 // 当 c->reply 中的所有数据都发送完毕,此时 c->reply_bytes 也应该是0 o = listNodeValue(listFirst(c->reply)); objlen = o->used; //当前节点的待发送数据长度
/* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) serverAssert(c->reply_bytes == 0); } }
/** Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT bytes, * in a single threaded server it's a good idea to serve other clients as well, * even if a very large request comes from super fast link that is always able to accept data * (in real world scenario think about 'KEYS *' against the loopback interface). * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. * * Moreover, we also send as much as possible if the client is * a slave or a monitor (otherwise, on high-speed traffic, the * replication/output buffer will grow indefinitely) */ // 避免单次发送过多的数据 if (totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) { break; } } // while-end
if (totwritten > 0) { /* For clients representing masters we don't count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; } // 如果客户端缓冲区已经没有数据了 if (!clientHasPendingReplies(c)) { c->sentlen = 0; /* Note that writeToClient() is called in a threaded way, but * adDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ if (handler_installed) connSetWriteHandler(c->conn, NULL); // handler_installed 设置为1, // 则在 output buffer 发送完时, // 取消注册可写事件,防止busy loop
/* Close connection after entire reply has been sent. */ // 缓冲区中的数据都被发送了,并且设置了标志位 CLIENT_CLOSE_AFTER_REPLY, 就关闭客户端 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClientAsync(c); return C_ERR; } }
/* Write event handler. Just send data to the client.
 *
 * Invoked when the connection becomes writable. The second argument of
 * writeToClient() is 'handler_installed': passing 1 tells writeToClient()
 * that a write handler is registered, so it removes the handler once all
 * pending output has been sent. */
void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}
inthandleClientsWithPendingWrites(void){ listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); // 遍历每个缓冲区数据待发送的客户端 listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; // 去掉标志位 CLIENT_PENDING_WRITE,表示数据已经发送 listDelNode(server.clients_pending_write, ln); // 将当前客户端从 // server.clients_pending_write 中删除 /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ if (c->flags & CLIENT_PROTECTED) continue;
/* Try to write buffers to the client socket. */ // 将这个客户端的 output buffer 发送给客户端 if (writeToClient(c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ // 如果还有数据待发送,则需要注册可写事件,等待可写事件的到来 if (clientHasPendingReplies(c)) { int ae_barrier = 0; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, * so that in the middle of receiving the query, and serving it * to the client, we'll call beforeSleep() that will do the * actual fsync of AOF to disk. the write barrier ensures that. */ if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_barrier = 1; }