/* Create the epoll backend state for `eventLoop`.
 *
 * Allocates an aeApiState, an array of eventLoop->setsize epoll_event
 * entries, and an epoll instance. On success the state is stored in
 * eventLoop->apidata and 0 is returned. On any failure everything that
 * was allocated so far is released and -1 is returned. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    /* The epoll fd lives inside the state hung off the event loop. */
    eventLoop->apidata = state;
    return 0;
}
// 让 fd 关注事件 mask staticintaeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask){ aeApiState *state = eventLoop->apidata; structepoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ // 如果之前就关注了一些事件,则此次就是修改,否则就是添加 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */// 本次调用后,总共需要关注的事件 if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return-1; return0; }
aeApiDelEvent
使文件描述符 fd 不再关注 delmask 事件。
staticvoidaeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask){ aeApiState *state = eventLoop->apidata; structepoll_event ee = {0}; /* avoid valgrind warning */ int mask = eventLoop->events[fd].mask & (~delmask); // 取消对于 delmask 的关注
staticint _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) { int s = -1, rv; char _port[6]; /* strlen("65535") */ // 调用 getadddrinfo 函数的前提准备 structaddrinfo hints, *servinfo, *p;
/* Poll for file events, dispatch the ones that fired, then run time events.
 *
 * `flags` is a bitmask:
 *   - If neither AE_TIME_EVENTS nor AE_FILE_EVENTS is set the function
 *     returns 0 immediately.
 *   - AE_TIME_EVENTS: the poll timeout is bounded by the nearest timer
 *     (unless AE_DONT_WAIT is set) and processTimeEvents() is called
 *     after the file events.
 *   - AE_DONT_WAIT: poll with a zero timeout instead of blocking.
 *   - AE_CALL_AFTER_SLEEP: invoke eventLoop->aftersleep (when non-NULL)
 *     right after the poll returns.
 *
 * Returns the number of processed file/time events. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire.
     *
     * 1. eventLoop->maxfd == -1 means no fd is registered, so there are
     *    no file events to wait for.
     * 2. (flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT): time events
     *    are requested and blocking is allowed, so we sleep in aeApiPoll()
     *    until the nearest timer is due or a file event fires. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        /* When time events are wanted and we may block, look up the
         * timer that expires first. */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* The timer is already due: do not sleep at all. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* No timer to wait for: either AE_DONT_WAIT forbids blocking,
             * or we are free to block until a file event fires. */

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Dispatch the fired events one by one. */
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't process, so we check if the event is still valid.
             *
             * fe->mask is what the fd is registered for, mask is what
             * actually fired; the handler runs only when both have the bit.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. The handler is skipped when it is the
             * same function that was just called for the readable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    /* File events are done: check time events. */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
/* Process time events */ staticintprocessTimeEvents(aeEventLoop *eventLoop){ int processed = 0; aeTimeEvent *te; longlong maxId; time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the * right value, time events may be delayed in a random way. Often this * means that scheduled operations will not be performed soon enough. * * Here we try to detect system clock skews, and force all the time * events to be processed ASAP when this happens: the idea is that * processing events earlier is less dangerous than delaying them * indefinitely, and practice suggests it is. */ if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } } eventLoop->lastTime = now;
te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; longlong id;
/* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ if (te->refcount) { te = next; continue; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); te = next; continue; }
/* Make sure we don't process time events created by time events in * this iteration. Note that this check is currently useless: we always * add new timers on the head, however if we change the implementation * detail, this check may be useful again: we keep it here for future * defense. */ if (te->id > maxId) { te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval;