// 如果没有设置可写事件的处理函数,则直接取消可写事件 if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); // 处理新连接到来 if (!callHandler(conn, conn->conn_handler)) return; conn->conn_handler = NULL; }
/* Normally we execute the readable event first, and the writable * event later. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * 通常我们先执行可读事件,然后执行可写事件。 * 这种方式很有用,因为有时我们可以在处理查询之后立即提供查询的结果。 * However if WRITE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event after the readable. * In such a case, we invert the calls. * * 然后,如果设置了 WRITE_BARRIER, 那么处理程序就反过来:在处理了可读事件之后都不触发可写事件 * * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsync'ing a file to disk, * before replying to a client. * * 这种操作很有用,比如,当我在beforeSleep()函数中,执行一些阻塞操作,类似fsync操作 * */ int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
int call_write = (mask & AE_WRITABLE) && conn->write_handler; int call_read = (mask & AE_READABLE) && conn->read_handler;
/* Handle normal I/O flows */ if (!invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; } /* Fire the writable event. */ if (call_write) { if (!callHandler(conn, conn->write_handler)) return; } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && call_read) { if (!callHandler(conn, conn->read_handler)) return; } }
connSocketWrite
这是REdis中实际完成写操作的最底层的函数,调用write函数完成发送。
1 2 3 4 5 6 7 8 9
staticintconnSocketWrite(connection *conn, constvoid *data, size_t data_len){ int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { // ret ==-1 && errno ==EAGAIN,在非阻塞IO是正常下的,不是错误 conn->last_errno = errno; conn->state = CONN_STATE_ERROR; }
/* If called from within a handler, schedule the close but * keep the connection until the handler returns. */ // 此处,即通过引用计数来控制conn的生命周期 // 此时,尽管已经要关闭conn,但是其引用计数不是0 // 说明处于某个回调函数中,等该回调函数返回之后,这个客户端就可以关闭 if (connHasRefs(conn)) { conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; return; }
/* Admission control will happen before a client is created and connAccept() called, * because we don't want to even start transport-level negotiation if rejected. */ // 如果客户端连接请求超过限制,则直接关闭这个客户端即可, if (listLength(server.clients) >= server.maxclients) { char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors. * Note that for TLS connections, no handshake was done yet so nothing is written * and the connection will just drop. */ // 发送给客户端错误信息 if (connWrite(conn,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; // 用于调试信息 connClose(conn); // 关闭客户端 return; }
/* Create connection and client */ // 创建客户端 if ((c = createClient(conn)) == NULL) { // 创建失败,内存不足,则应该直接关闭这次行为 char conninfo[100]; serverLog(LL_WARNING, "Error registering fd event for the new client: %s (conn: %s)", connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); connClose(conn); /* May be already closed, just ignore errors */ return; }
/* Last chance to keep flags */ c->flags |= flags;
/* Initiate accept. * * Note that connAccept() is free to do two things here: * 1. Call clientAcceptHandler() immediately; * 2. Schedule a future call to clientAcceptHandler(). * * Because of that, we must do nothing else afterwards. */ // connAccept 主要是调用函数 clientAcceptHandler 对获得的客户端状态进行判断 if (connAccept(conn, clientAcceptHandler) == C_ERR) { char conninfo[100]; if (connGetState(conn) == CONN_STATE_ERROR) { serverLog(LL_WARNING, "Error accepting a client connection: %s (conn: %s)", connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); } freeClient(connGetPrivateData(conn)); // 同步关闭 return; } }
/* passing NULL as conn it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ // Lua 脚本环境下,conn是NULL if (conn) { connNonBlock(conn); // 设置客户端为非阻塞 connEnableTcpNoDelay(conn); // 开启 nagle if (server.tcpkeepalive) // 心跳检测 connKeepAlive(conn,server.tcpkeepalive); connSetReadHandler(conn, readQueryFromClient); // 注册可读事件,可读事件的回调函数是 readQueryFromClient connSetPrivateData(conn, c); // conn->private_data = data; } // --- 下面是一些参数初始化
intanetSetBlock(char *err, int fd, int non_block){ int flags;
/* Set the socket blocking (if non_block is zero) or non-blocking. * Note that fcntl(2) for F_GETFL and F_SETFL can't be * interrupted by a signal. */ if ((flags = fcntl(fd, F_GETFL)) == -1) { anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno)); return ANET_ERR; }
if (non_block) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK;
/* Set TCP keep alive option to detect dead peers. The interval option * is only used for Linux as we are using Linux-specific APIs to set * the probe send time, interval, and count. */ intanetKeepAlive(char *err, int fd, int interval) { int val = 1;
#ifdef __linux__ /* Default settings are more or less garbage, with the keepalive time * set to 7200 by default on Linux. Modify settings to make the feature * actually useful. */
/* Send first probe after interval. */ val = interval; if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) { anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno)); return ANET_ERR; }
/* Send next probes after the specified interval. Note that we set the * delay as interval / 3, as we send three probes before detecting * an error (see the next setsockopt call). */ val = interval/3; if (val == 0) val = 1; if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) { anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno)); return ANET_ERR; }
/* Consider the socket in error state after three we send three ACK * probes without getting a reply. */ val = 3; if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) { anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno)); return ANET_ERR; } #else ((void) interval); /* Avoid unused var warning for non Linux systems. */ #endif
if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING, "Error accepting a client connection: %s", connGetLastError(conn)); freeClientAsync(c); // 异步的 return; }
/* If the server is running in protected mode (the default) and there is no password set, * nor a specific interface is bound, we don't accept * requests from non loopback interfaces. Instead we try to explain the * user what to do to fix it if needed. */ if (server.protected_mode && server.bindaddr_count == 0 && DefaultUser->flags & USER_FLAG_NOPASS && !(c->flags & CLIENT_UNIX_SOCKET)) { char cip[NET_IP_STR_LEN+1] = { 0 }; connPeerToString(conn, cip, sizeof(cip)-1, NULL); // 如果 cip 同时满足 127.0.0.1 与 ::1 // 下面 if 中的不可能执行 if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) { char *err = "-DENIED Redis is running in protected mode because protected " "mode is enabled, no bind address was specified, no " "authentication password is requested to clients. In this mode " "connections are only accepted from the loopback interface. " "If you want to connect from external computers to Redis you " "may adopt one of the following solutions: " "1) Just disable protected mode sending the command " "'CONFIG SET protected-mode no' from the loopback interface " "by connecting to Redis from the same host the server is " "running, however MAKE SURE Redis is not publicly accessible " "from internet if you do so. Use CONFIG REWRITE to make this " "change permanent. " "2) Alternatively you can just disable the protected mode by " "editing the Redis configuration file, and setting the protected " "mode option to 'no', and then restarting the server. " "3) If you started the server manually just for testing, restart " "it with the '--protected-mode no' option. " "4) Setup a bind address or an authentication password. " "NOTE: You only need to do one of the above things in order for " "the server to start accepting connections from the outside.\r\n"; if (connWrite(c->conn,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; freeClientAsync(c); return; } }