异步通信

异步通信

这一期来讲下服务器是如何与客户端进行异步通信的

  • 通信,无外乎两点:1)服务器读取对端发送过来的数据;2)向对端发送数据。
  • 异步,目前常用的是epoll + 回调函数。

下面从非阻塞IO开始,逐步构建异步通信框架。

Non-Blocking IO

在服务端常用的IO读写操作,主要是readwrite及其衍生函数。

所谓阻塞模式,也是readwrite等操作的默认行为,比如read函数从stdin读取输入,如果用户没有从终端输入,则会一直阻塞在read函数处。

在非阻塞模式下,read继续从终端读取数据,如果用户没有在终端输入数据,read函数并不会阻塞等待,而是立即返回-1,并将错误码errno 设置为EAGAIN 或者 EWOULDBLOCK。此时read函数返回值n有三种可能:

  • n > 0:读取到n个字节;
  • n == 0:对端关闭、文件末尾;
  • n == -1:表示遇到问题,
    • errno == EAGAIN/EWOULDBLOCK :在非阻塞IO模式下,表示没有数据可读,可忽略本次read操作;
    • errno == EINTR:表示被信号中断,重新读取一次即可。
    • 其他错误类型。

写操作write函数也基本类似,更加详细可以man 2 read/write 查看。

因此,非阻塞IO非常适合服务器设计,不会在read/write处发生堵塞。

将文件描述符fd设置为非阻塞模式,有如下两种方式(来自muduo)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void setNonBlockAndCloseOnExec(int sockfd)
{
int flags = ::fcntl(sockfd, F_GETFL, 0);
flags |= O_NONBLOCK;
int ret = ::fcntl(sockfd, F_SETFL, flags);

// close-on-exec
flags = ::fcntl(sockfd, F_GETFD, 0);
flags |= FD_CLOEXEC;
ret = ::fcntl(sockfd, F_SETFD, flags);

(void)ret;
}

int sockets::createNonblockingOrDie(sa_family_t family)
{
#if VALGRIND
// 方式1
int sockfd = ::socket(family, SOCK_STREAM, IPPROTO_TCP);
if (sockfd < 0)
{
LOG_SYSFATAL << "sockets::createNonblockingOrDie";
}

setNonBlockAndCloseOnExec(sockfd);
#else
// 方式2:系统内核支持 SOCK_NONBLOCK 标志,更加安全。
int sockfd = ::socket(family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
if (sockfd < 0)
{
LOG_SYSFATAL << "sockets::createNonblockingOrDie";
}
#endif
return sockfd;
}

ET & LT

众所周知,epoll有两种工作模式:ET & LT。但是在讲解 ET & LT 之前,先讲解下「高电平」和「低电平」的概念。

高低电平

对于可读事件:

  • 内核中socketrecv_buff 为空,此时为「低电平」状态,即无数据可读
  • 内核中socketrecv_buff 不空,此时为「高电平」状态,此时有数据可读

对于可写事件:

  • 内核中socketsend_buff为满 ,此时为「低电平」状态,不可发送数据
  • 内核中socketsend_buff不满,此时为「高电平」状态,可发送数据

简而言之,「低电平」状态下不能进行读、写操作,「高电平」则可以读、写。

为便于描述,socket的接受缓冲区定义为recv_buffsocket的发送缓冲区定义为send_buffconnfd是服务端与对端建立连接的文件描述符。

LT(Level Triggered)

LT,电平触发,即socket缓冲区处于高电平时触发事件。此外,epoll_wait的默认工作模式也是LT。

可读事件

为接受对端connfd发送的数据,服务器首先要为connfd注册可读事件EPOLLIN

如此,当对端发送数据过来,服务器connfdrecv_buff 不为空,则epoll_wait上的可读事件触发,在读回调函数HandleRead中从socketrecv_buff 中读取数据。

然而,即使服务端本次没有将scoketrecv_buff 中的数据全部读取,下次调用epoll_wait时也依然会触发可读事件。

为啥?

因为,只要recv_buff 中的数据没有读取完,即为「高电平」状态,那么就会一直触发epoll_waitEPOLLIN 事件,直到recv_buff 变空,即为「低电平」状态。

因此LT模式下,不用担心数据漏读的问题。

可写事件

然而,可写事件与可读事件不同:可读事件是被动触发的,即服务端不知道对端何时发送数据。因此服务端与对端建立连接之后,要立即为connfd注册可读事件,然后在epoll_wait上阻塞等待客户端发送数据过来。

那么,可写事件呢?是服务端主动触发的。

服务端与对端建立连接后,connfdsend_buff是空的,即处于高电平,是可以直接发送数据的。

如果此时为connfd注册可写事件,这就会导致epoll_wait一直检测到connfd上的可写事件触发,但实际上服务端又没有数据可以发送给对端,造成服务端的CPU资源无端被消耗。

因此,在LT模式下,当服务端要向对端发送数据时,不需要先通过epoll_ctl注册可写事件,然后阻塞在epoll_wait上等待可写事件的发生。 正确的做法如下

  • 直接发送数据 n = write(connfd, outbuffer, sizeof(outbuffer))
  • 如果此次数据没有发送完毕,即n != sizeof(outbuffer),则为connfd注册可写事件;
  • epoll_wait上等待可写事件触发;
  • 可写事件触发,则将剩余的数据发送完毕;
  • 如果没有发送完毕,则等待下次的可写事件;发送完毕,则取消关注可写事件。

注意:当数据发送完毕,一定要取消可写事件,否则当connfdsend_buff变空,后面又没有可发的数据,则又会导致epoll_wait一直触发可写事件。

ET(Edge Triggered)

ET,所谓边缘触发,即只有在电平状态发生变化时才会触发。开启ET模式,需要设置如下:

1
2
3
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // ET 模式
epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev);

可读事件

注册了可读事件后,阻塞于epoll_wait等待对端发送数据,再触发可读事件。

注意:如果服务端没有将recv_buff 中的数据全部读取,那么进入下一轮循环并阻塞在epoll_wait后,可读事件就再也不会触发。

为啥?

ET模式下,只有connfdrecv_buffer电平状态发生变化才会触发可读事件,但只要recv_buff 中还有数据,则一直为高电平状态,那么即便下次对端又发送数据过来,并不会改recv_buff 的电平状态,这就导致epoll_wait就无法再检测到connfd上的可读事件。

那么对端发送了数据,服务端迟迟无法给出回应。如果是listenfd,那么这个服务器就不再能处理新的连接请求了。

因此,如果不熟悉ET模式的正确使用方法,很可能导致整个服务器无法使用。

ET模式下,怎么处理可读事件?

一旦检测到connfd上的可读事件,需要不停地从recv_buff 中读取数据,直到read函数返回-1,且错误码errnoEAGAIN标志。

这标志着recv_buff 中的数据已经读取完(已处于低电平),下次对端再发送数据过来(变为高电平),就能再次触发可读事件。

1
2
3
4
5
6
7
8
for(; ; ;) { 
ret = read(connfd, buffer, sizeof(buffer));
if (ret == -1 && errno == EAGAIN) {
LOG(NOTICE) << "READ DONE";
break;
}
//...
}

但是又有一个问题:

  • read函数,什么情况下才会返回EAGAIN
  • 要是read 函数不返回EAGAIN,那么岂不是一直在while/for循环?

仅在connfd设置为非阻塞模式时,read函数无法从空的recv_buff继续读取到数据,此时错误码errno就会被设置为EAGAIN

可写事件

ET模式下,在与客户端建立连接后,可以为connfd注册可写事件,因为此时connfdsend_buff是空的,处于高电平,不会触发可写事件。

当服务端要通过connfd向客户端发送数据时,直接发送即可:

  • 如果应用层缓冲区outbuffer的数据大小小于send_buff大小,则无须任何操作;

    换言之,connfdsend_buff能容纳outbuffer中的全部数据,那么send_buff依然未满,即处于高电平状态。下次应用层有待发送的数据,直接发送即可;

  • 如果outbuffer中的数据大小大于send_buff大小,那么write(connfd, outbuffer, size)返回-1 且 errnoEAGAIN

    由于send_buff满了,即处于「低电平」状态,表示不可再接受来自应用层的数据。为了将outbuffer中剩余的数据也发送到对端,此时需要为connfd注册可写事件。

    send_buff的数据发送出去,则会变为「高电平」状态,此时就会触发connfd上的可写事件,进而就能继续发送outbuffer中剩下的数据了。

总结下,写操作要一直写到应用层outBuffer为空,或者write函数返回EAGAIN

上面的 epoll LT 模式注:如果数据未发送完毕,需要注册可写事件;可写事件触发后,尝试发送outBuffer中的剩余数据,如果数据此时还不能全部发送完,不用再次注册可写事件,若全部发送完毕,需要取消注册可写事件

如果是 epoll ET 模:如果数据未发送完毕,注册可写事件;可写事件触发后,尝试发送剩余数据,如果数据此时还不能全部发送完,需要再次注册可写事件,以便让可写事件下次再次触发,数据全部发送完毕,不用取消注册可写事件

LT or ET ?

说了这么多,那自己设计一个服务器,到底是选 ET 还是 LT ?

从个人的目前经验来说,看到的大多数都是LT

对于可读事件,ET模式只会触发一次epoll_wait,而LT模式下,如果不能一次性读取完recv_buff 中的数据,则会多次触发epoll_wait,增加系统调用开销。

如果我使用LT模式,且一次就将recv_buff 中的数据全部读取出来,那不也就只调用一次epoll_wait,不就和ET模式一样了?

此外,LT模式下的read函数可以少一次系统调用,因为ET模式下的read操作必须读取到返回EAGAIN,就多了一次系统调用开销。

这就是muduo设计了一个InputBuffer的原因,而在redis中也有个输入缓冲区。

muduo、libuv、redis等都是采用LT模式,其他库不太清楚。

更为重要的是,ET模式操作不当,容易造成数据漏读、甚至服务器阻塞等问题,而良好的设计的LT模式效率也依然很高。

下面我们从muduo源码角度还原上述过程。

muduo源码展示

确定好大的方向是「LT模式的epoll + 非阻塞IO」来设计异步通信之后。下面,我们就根据「网络编程」的前三期大致梳理下muduo服务端的源码。

监听客户端连接请求

当muduo的服务器TcpServer运行时,会先在Acceptor中创建一个非阻塞的listenfd,用于监听客户端的连接请求,即muduo中的acceptSocket_字段, 并为acceptSocket_注册可读事件、设置可读回调函数 Acceptor::handleRead

这样,服务端就能监听客户端cli的连接请求:

  • 当监听到客户端的连接请求后,在读取回调函数Acceptor::handleRead中为请求连接的客户端cli创建TcpConnection对象conn
  • 在众多sub-eventloops线程中,选择一个sub-loop线程,将conn分发到该sub-loop线程;
  • 以后服务端与该cli的通信,都在sub-loop线程中完成,而Acceptor所在的main-eventloop线程,只是负责监听客户端的连接请求。

如此,Accptor的作用即任务分发器dispatcher

整个框架逻辑如图。

1
2
3
4
5
6
7
8
9
10
11
12
13
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listening_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport); // 设置端口复用
acceptSocket_.bindAddress(listenAddr); // 绑定地址
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); // 可读事件回调函数
}

自然,分发任务的操作就是在可读事件的回调函数Acceptor::handleRead中完成的,其核心就是newConnectionCallback_函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Acceptor::handleRead() {
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr); // 获取客户端的 ip:port
if (connfd >= 0) {
if (newConnectionCallback_) {
newConnectionCallback_(connfd, peerAddr);
}
else {
sockets::close(connfd);
}
}
else {
LOG_SYSERR << "in Acceptor::handleRead";
if (errno == EMFILE) {
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}

而回调函数newConnectionCallback_最终初始化为TcpServer类中的TcpServer::newConnection函数:

  • 创建TcpConnection对象 conn
  • TcpServer中的connections_记录着每个客户端,因此要把新创建的客户端记录在connections_中;
  • conn设置一些回调函数;
  • conn放到sub-eventloop中运行,以后服务器与该客户端的通讯就在ioLoop中进行了。

整个逻辑如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop(); // 从子线程中选择一个
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// 创建TcpConnection对象
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
// 设置相关的回调函数
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));
// 放到 sub-eventloop 中运行
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

TcpConnection::connectEstablished回调函数中,为每个刚建立连接的TcpConnection对象注册可读事件,这是为了监听等待客户端的发送数据。

1
2
3
4
5
6
7
8
void TcpConnection::connectEstablished() {
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 注册可读事件
connectionCallback_(shared_from_this());
}

而这个conn的可读、可写等事件的回调函数在TcpConnection构造函数中就完成了初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1)); // 读回调
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this)); // 写回调
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this)); // 关闭回调
channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this)); // 错误处理回调
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

因此,当conn对应的客户端发送过来数据时,触发可读事件后,会调用TcpConnection::handleRead来进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

到此,启动服务器到和客户端建立连接请求的过程、接受数据的流程大致结束了。

可写事件

在前面说过LT模式下的可写事件需要注意的点,下面顺着代码注释去看就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
void TcpConnection::sendInLoop(const void* data, size_t len) {
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}

/// 如果没有关注可写事件 且 outbuffer_ 中无待发送数据,说明之前的数据都已经写完
/// 对于 用户来说是向 outbuffer_ 中写,
/// 对于 socket 来说是从 outbuffer_ 中读取
if (!channel_->isWriting() && outbuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
// 全部写完了,那么就执行写完成回调
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
/// socket 的发送缓冲已经满,无法将 outbuffer_ 的数据都复制到 socket 的 send_buff
/// 非阻塞模式下,函数返回 -1 && errno == EWOULDBLOCK
/// 如果错误码不是EWOULDBLOCK,那么是真的产生错误了,需要关闭连接
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}

assert(remaining <= len);
/// @brief: 运行到这有两种可能:
/// 1 之前没有注册可写事件,且 outbuffer_ 中没有可读取数据,
/// 运行到此是因为此次数据 data 没有发送完
/// 2 之前的数据没有发送完,又来了新的数据
/// 无论哪种情况,处理方式:
/// 将数据复制到 outbuffer_ 中,关注 EPOLLOUT 事件,等待可写事件触发,发送数据
if (!faultError && remaining > 0)
{
size_t oldLen = outbuffer_.readableBytes();
// 待发送的内容已经超过标志位了,就调用高水位这个函数
if (oldLen + remaining >= highWaterMark_ &&
oldLen < highWaterMark_ &&
highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_,
shared_from_this(),
oldLen + remaining));
}
/// 将剩余的内容加如 outbuffer_
outbuffer_.append(static_cast<const char*>(data)+nwrote, remaining);

/// 在LT 模式下不需要重复关注可写事件,即使运行到此的第二种情况不需要再关注可写事件
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}

/// @brief: 写回调函数
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
/// 触发写回调函数
/// 将 @b outbuffer_ 中的数据复制到 @b socket 的`send_buff`
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outbuffer_.peek(),
outbuffer_.readableBytes());
if (n > 0)
{
outbuffer_.retrieve(n);
/// 如果 @b outbuffer_ 中的数据全部写完,
/// 那么就可以取消关注 @b EPOLLOUT ,防止出现 busy loop
/// 并且调用写完成回调函数
/// 如果全部写完,此时就需要取消关注可写事件
/// 如果 @b outbuffer_ 中还有数据,就继续等待下次可写事件的触发
if (outbuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}

到此,就差不多了。