/* Connection object (used to read from socket) */ // socketfd struct { connection* conn; /* Connection */ off_t pos; /* pos in buf that was returned */ sds buf; /* buffered data */ size_t read_limit; /* don't allow to buffer/read more than that */// 读取字节数限制 size_t read_so_far; /* amount of data read from the rio (not buffered) */// 目前已经读取字节数 } conn;
/* FD target (used to write to pipe). */ // 文件fd struct { int fd; /* File descriptor. */ off_t pos; sds buf; } fd; } io; };
/* Flushes any buffer to target device if applicable. * Returns 1 on success and 0 on failures. */ staticintrioBufferFlush(rio *r){ UNUSED(r); return1; /* Nothing to do, our write just appends to the buffer. */ }
rioFileIO
基于文件流实现的rio对象 rioFileIO:
1 2 3 4 5 6 7 8 9 10 11 12
staticconst rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ };
rioInitWithFile
以 rioFileIO 来初始化 rio 对象:
1 2 3 4 5 6 7 8 9 10 11 12
struct { FILE *fp; off_t buffered; /* Bytes written since last fsync. */// 已向 fp 中写入的字节数 off_t autosync; /* fsync after 'autosync' bytes written. */// buffered 达到 autosync,则flush一次 } file;
/* Returns 1 or 0 for success/failure. */ staticsize_trioFileRead(rio *r, void *buf, size_t len){ returnfread(buf,len,1,r->io.file.fp); }
rioConnIO
rioConnIO 是基于socket 实现的 rio对象,也有相应的读写函数
1 2 3 4 5 6 7 8 9 10 11 12
staticconst rio rioConnIO = { rioConnRead, rioConnWrite, rioConnTell, rioConnFlush, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ };
rioInitWithConn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// socketfd struct { connection* conn; /* Connection */ off_t pos; /* pos in buf that was returned */// 当前在buf的读取位置 sds buf; /* buffered data */// 读缓冲区 size_t read_limit; /* don't allow to buffer/read more than that */// 从 rioConnIO 中读取字节数限制 size_t read_so_far; /* amount of data read from the rio (not buffered) */// 从 rioConnIO 中已读取字节数 } conn;
/* If the buffer is too small for the entire request: realloc. */ // 如果 r 缓冲区总的长度都是小于 len // 就需要realloc,使其容量是 len if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len) r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
/* If the remaining unused buffer is not large enough: memmove so that we * can read the rest. * * 如果剩余可读字节数 avail 小于 len,加上 buf 中剩余空间还是小于 len * 则调整 pos, 以及buf的start */ if (avail < len && sdsavail(r->io.conn.buf) < len - avail) { sdsrange(r->io.conn.buf, r->io.conn.pos, -1); r->io.conn.pos = 0; }
/* If we don't already have all the data in the sds, read more */ // while循环连续的从 r->io.conn 中读取数据,直到 r->io.conn.buf 中可取字节数大于等于 len // r->io.conn.pos 不变 while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) { size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos; // 当前剩余可读字节 size_t toread = len - buffered; // 还需要再读取的字节数 /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two. */ if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN; // 按照大的来 // 确保待读取字节数 toread 不会超过 r->io.conn.buf 的剩余空间 if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);
如果 len > PROTO_IOBUF_LEN,则直接先将 io->fd.buf 中的数据发送出去,即flush一次
1 2 3 4 5 6 7
// 如果len很大,先flush if (len > PROTO_IOBUF_LEN) { if (sdslen(r->io.fd.buf)) { if (rioFdWrite(r, NULL, 0) == 0) return0; } }
否则,就将buf数据添加到 io->fd.buf中,再查看是否需要 flush
1 2 3 4 5 6 7 8 9 10 11 12
if (len) { r->io.fd.buf = sdscatlen(r->io.fd.buf, buf, len); // 直接添加到内部缓冲区 if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) // 超过限制,也flush doflush = 1; if (!doflush) // 不 flush,则直接返回 return1; // 说明需要flush // len==0 也是直接到此 p = (unsignedchar*) r->io.fd.buf; len = sdslen(r->io.fd.buf); }
运行至此,说明需要将 io->fd.buf中的数据发送出去了
1 2 3 4 5 6 7 8 9 10 11 12 13 14
size_t nwritten = 0; while(nwritten != len) { retval = write(r->io.fd.fd,p+nwritten,len-nwritten); if (retval <= 0) { /* With blocking io, which is the sole user of this * rio target, EWOULDBLOCK is returned only because of * the SO_SNDTIMEO socket option, so we translate the error * into one more recognizable by the user. */ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; return0; /* error. */ } nwritten += retval; }
/* For small writes, we rather keep the data in user-space buffer, and flush * it only when it grows. however for larger writes, we prefer to flush * any pre-existing buffer, and write the new one directly without reallocs * and memory copying. */ if (len > PROTO_IOBUF_LEN) { /* First, flush any pre-existing buffered data. */ // 如果数据很大,1直接flush if (sdslen(r->io.fd.buf)) { if (rioFdWrite(r, NULL, 0) == 0) return0; } /* Write the new data, keeping 'p' and 'len' from the input. */ } else { if (len) { // 否则是直接将数据添加到缓冲区 r->io.fd.buf r->io.fd.buf = sdscatlen(r->io.fd.buf, buf, len); // 超过限制,也flush if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN) doflush = 1; if (!doflush) return1; } // 说明需要flush // len==0 也是直接到此 /* Flusing the buffered data. set 'p' and 'len' accordintly. */ p = (unsignedchar*) r->io.fd.buf; len = sdslen(r->io.fd.buf); }
size_t nwritten = 0; while(nwritten != len) { retval = write(r->io.fd.fd,p+nwritten,len-nwritten); if (retval <= 0) { /* With blocking io, which is the sole user of this * rio target, EWOULDBLOCK is returned only because of * the SO_SNDTIMEO socket option, so we translate the error * into one more recognizable by the user. */ if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; return0; /* error. */ } nwritten += retval; }
/* Returns 1 or 0 for success/failure. */ staticsize_trioFdRead(rio *r, void *buf, size_t len){ UNUSED(r); UNUSED(buf); UNUSED(len); return0; /* Error, this target does not support reading. */ }
rioFdTell
总的已读字节数
1 2 3 4
/* Returns read/write position in file. */ staticoff_trioFdTell(rio *r){ return r->io.fd.pos; }