Dissecting Redis's rio

rio provides a unified interface over different read/write streams:

  • it reads and writes RDB and AOF files, implementing persistence
  • it also reads and writes sockets and pipes, implementing master-replica synchronization

RIO

rio has to drive several different kinds of streams, so it exposes one uniform set of operations.

  • Stream operations: read, write, current position (tell), and flush

    size_t (*read) (struct _rio *, void *buf,       size_t len);
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    off_t (*tell) (struct _rio *);
    int (*flush)(struct _rio *);
  • Checksum computation

    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    uint64_t cksum;
  • Backend-specific state; since only one backend is active at a time, a union keeps the rio object small

    union {
        // in-memory buffer
        struct { /*...*/ } buffer; // supports read and write

        // stdio file stream (FILE *)
        struct { /*...*/ } file;   // supports read and write, used for persistence

        // socket connection
        struct { /*...*/ } conn;   // read-only, used for master-replica replication

        // raw file descriptor
        struct { /*...*/ } fd;     // write-only, used for master-replica synchronization
    } io;

The complete rio structure:

struct _rio {
    /* These functions return 0 on error, non-zero on success. */
    size_t (*read) (struct _rio *, void *buf, size_t len);
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    off_t (*tell) (struct _rio *);
    int (*flush)(struct _rio *);

    /* Checksum update function */
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum and flags (see RIO_FLAG_*) */
    uint64_t cksum, flags;

    size_t processed_bytes;      /* bytes read or written so far */

    size_t max_processing_chunk; /* max bytes per single read/write call */

    /* Backend-specific vars. */
    union {
        /* In-memory buffer target. */
        struct {
            sds ptr;    /* buffer */
            off_t pos;  /* current position */
        } buffer;

        /* Stdio file pointer target. */
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;

        /* Connection object (used to read from socket) */
        struct {
            connection *conn;   /* Connection */
            off_t pos;          /* pos in buf that was returned */
            sds buf;            /* buffered data */
            size_t read_limit;  /* don't allow to buffer/read more than that */
            size_t read_so_far; /* amount of data read from the rio (not buffered) */
        } conn;

        /* FD target (used to write to pipe). */
        struct {
            int fd;    /* File descriptor. */
            off_t pos;
            sds buf;
        } fd;
    } io;
};

The four backends below each supply their own write/read/tell/flush implementations.

rioBufferIO

In buffer mode all data lives in memory. The backend struct:

// in-memory buffer
struct {
    sds ptr;   // buffer
    off_t pos; // current position
} buffer;
  • ptr: the sds buffer being written to or read from
  • pos: the current read/write position

The buffer-backed rio template object:

// Global template, copied into every buffer-backed rio
static const rio rioBufferIO = {
    rioBufferRead,   // read
    rioBufferWrite,  // write
    rioBufferTell,   // tell
    rioBufferFlush,  // flush
    NULL,            /* update_checksum */
    0,               /* current checksum */
    0,               /* flags */
    0,               /* bytes read or written */
    0,               /* read/write chunk size */
    { { NULL, 0 } }  /* union for io-specific vars */
};

rioInitWithBuffer

Initialize a buffer-backed rio object:

void rioInitWithBuffer(rio *r, sds s) {
    *r = rioBufferIO;
    r->io.buffer.ptr = s;
    r->io.buffer.pos = 0;
}

The four callbacks follow.

rioBufferWrite

Writing to r->io.buffer.ptr simply appends to the sds:

static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
    r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr, (char*)buf, len);
    r->io.buffer.pos += len;
    return 1;
}

rioBufferRead

Reading copies data out of r->io.buffer.ptr into buf.

Note: a buffer rio cannot be used for reading and writing at the same time; both operations share the single cursor and advance it with r->io.buffer.pos += len.

static size_t rioBufferRead(rio *r, void *buf, size_t len) {
    if (sdslen(r->io.buffer.ptr) - r->io.buffer.pos < len)
        return 0; /* not enough buffer to return len bytes. */
    memcpy(buf, r->io.buffer.ptr + r->io.buffer.pos, len);
    r->io.buffer.pos += len;
    return 1;
}

rioBufferTell

Simply return the current position:

/* Returns the current read/write position. */
static off_t rioBufferTell(rio *r) {
    return r->io.buffer.pos;
}

rioBufferFlush

In buffer mode everything already lives in memory, so flush has nothing to do:

/* Flushes any buffer to target device if applicable.
 * Returns 1 on success and 0 on failures. */
static int rioBufferFlush(rio *r) {
    UNUSED(r);
    return 1; /* Nothing to do, our write just appends to the buffer. */
}
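
As a quick illustration (not from the Redis source; a minimal sketch that assumes only the functions shown above plus the rioWrite/rioRead wrappers covered later), the following serializes a value into an in-memory sds and reads it back with a second rio, since a single buffer rio cannot be used in both directions:

#include <stdint.h>
#include "rio.h"
#include "sds.h"

/* Illustrative only: write with one buffer-backed rio, read back with another. */
void buffer_rio_roundtrip(void) {
    rio w, rd;
    uint32_t value = 42, parsed = 0;

    rioInitWithBuffer(&w, sdsempty());       /* writer appends to an empty sds */
    rioWrite(&w, &value, sizeof(value));     /* w.io.buffer.pos advances to 4 */

    rioInitWithBuffer(&rd, w.io.buffer.ptr); /* reader gets its own cursor at 0 */
    rioRead(&rd, &parsed, sizeof(parsed));   /* parsed == 42 */

    sdsfree(w.io.buffer.ptr);                /* both rios share this one sds */
}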

rioFileIO

rioFileIO is the rio object backed by a stdio file stream:

static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

rioInitWithFile

Use rioFileIO to initialize the rio object:

struct {
    FILE *fp;
    off_t buffered; /* Bytes written since last fsync. */
    off_t autosync; /* fsync after 'autosync' bytes written: once buffered
                       reaches autosync, flush and fsync once. */
} file;

void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp; // the file stream
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

rioSetAutoSync

If everything were synced only once after the whole write had finished, the server could block for a long time. rioSetAutoSync enables incremental syncing: while writing through the rio, every time roughly 'bytes' additional bytes have accumulated, the file is flushed and fsync'ed once.

void rioSetAutoSync(rio *r, off_t bytes) {
    if (r->write != rioFileIO.write) // only meaningful for the file backend
        return;
    r->io.file.autosync = bytes;
}

rioFileWrite

Write the data in buf to r->io.file.fp:

/* Write len bytes from buf to the file stream. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf, len, 1, r->io.file.fp);
    r->io.file.buffered += len;

    // If enough bytes have accumulated, flush and fsync once
    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        fflush(r->io.file.fp);
        redis_fsync(fileno(r->io.file.fp)); // fdatasync/fsync, so the data really reaches the disk
        r->io.file.buffered = 0;            // reset the counter
    }
    return retval;
}

rioFileRead

Reading simply delegates to fread:

/* Returns 1 or 0 for success/failure. */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    return fread(buf, len, 1, r->io.file.fp);
}
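
To see the file backend end to end, here is a minimal sketch (not Redis code; the 32 MB autosync threshold is an arbitrary example value, and it relies only on the init/write/autosync functions shown in this section):

#include <stdio.h>
#include "rio.h"

/* Illustrative only: dump a payload to disk through a file-backed rio. */
int dump_payload_to_file(const char *path, const void *payload, size_t len) {
    FILE *fp = fopen(path, "w");
    if (fp == NULL) return -1;

    rio rdb;
    rioInitWithFile(&rdb, fp);
    rioSetAutoSync(&rdb, 32 * 1024 * 1024);  /* fsync roughly every 32 MB written */

    if (rioWrite(&rdb, payload, len) == 0) { /* dispatches to rioFileWrite */
        fclose(fp);
        return -1;
    }
    fflush(fp);                              /* push the stdio buffer to the kernel */
    fclose(fp);
    return 0;
}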

rioConnIO

rioConnIO is the socket-backed rio object, again with its own set of callbacks:

static const rio rioConnIO = {
    rioConnRead,
    rioConnWrite,
    rioConnTell,
    rioConnFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

rioInitWithConn

// socket connection
struct {
    connection *conn;   /* Connection */
    off_t pos;          /* current read position inside buf */
    sds buf;            /* data buffered from the connection */
    size_t read_limit;  /* don't allow to buffer/read more than that */
    size_t read_so_far; /* amount of data already handed out by the rio (not counting buffered) */
} conn;

void rioInitWithConn(rio *r, connection *conn, size_t read_limit) {
    *r = rioConnIO;
    r->io.conn.conn = conn;
    r->io.conn.pos = 0;
    r->io.conn.read_limit = read_limit;
    r->io.conn.read_so_far = 0;
    r->io.conn.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
    sdsclear(r->io.conn.buf);
}

rioFreeConn

Releases the rio's buffer; the remaining argument decides whether any unread bytes left in r->io.conn.buf are handed back to the caller.

void rioFreeConn(rio *r, sds *remaining) {
    if (remaining && (size_t)r->io.conn.pos < sdslen(r->io.conn.buf)) {
        if (r->io.conn.pos > 0)
            sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        // hand the unread bytes back to the caller via remaining
        *remaining = r->io.conn.buf;
    } else {
        // otherwise just free the buffer
        sdsfree(r->io.conn.buf);
        if (remaining)
            *remaining = NULL;
    }
    r->io.conn.buf = NULL;
}
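
A small hedged sketch of how a caller might wind down a conn-backed rio (the helper name is hypothetical):

/* Illustrative only: drain a conn-backed rio and return any bytes that were
 * buffered from the socket but never consumed by the caller. */
static sds finish_conn_read(rio *r) {
    sds leftover = NULL;
    rioFreeConn(r, &leftover); /* passing NULL instead would simply discard them */
    return leftover;           /* may be NULL when nothing was left unread */
}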

rioConnWrite

// Writing is not implemented for this target
static size_t rioConnWrite(rio *r, const void *buf, size_t len) {
    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return 0; /* Error, this target does not yet support writing. */
}

rioConnRead

Reading len bytes through the conn backend goes through the following steps:

  1. Check whether the total capacity of r->io.conn.buf (used plus free space) can hold len bytes; if not, grow it:

    if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
        r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));
  2. If the unread bytes (avail) are fewer than len, and even the free tail space cannot make up the difference, compact the buffer: drop the already-consumed prefix and reset r->io.conn.pos, making room for the data about to be received from r->io.conn:

    if (avail < len && sdsavail(r->io.conn.buf) < len - avail) {
        sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        r->io.conn.pos = 0;
    }
  3. While r->io.conn.buf still holds fewer than len readable bytes, keep receiving data from r->io.conn and appending it to r->io.conn.buf:

    while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) { /****/ }
  4. Once the readable bytes sdslen(r->io.conn.buf) - r->io.conn.pos reach len, copy the data out to buf:

    // enough data is buffered: copy len bytes from r->io.conn.buf into buf
    memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
    r->io.conn.read_so_far += len;
    r->io.conn.pos += len;

The complete implementation:

#define PROTO_IOBUF_LEN         (1024*16)  /* Generic I/O buffer size */

static size_t rioConnRead(rio *r, void *buf, size_t len) {
    size_t avail = sdslen(r->io.conn.buf) - r->io.conn.pos; // unread bytes currently buffered

    /* If the buffer is too small for the entire request: realloc. */
    // If the total capacity of r->io.conn.buf is smaller than len,
    // grow it so it can hold len bytes.
    if (sdslen(r->io.conn.buf) + sdsavail(r->io.conn.buf) < len)
        r->io.conn.buf = sdsMakeRoomFor(r->io.conn.buf, len - sdslen(r->io.conn.buf));

    /* If the remaining unused buffer is not large enough: memmove so that we
     * can read the rest.
     *
     * If the unread bytes (avail) are fewer than len, and even the free tail
     * space cannot make up the difference, compact the buffer and reset pos. */
    if (avail < len && sdsavail(r->io.conn.buf) < len - avail) {
        sdsrange(r->io.conn.buf, r->io.conn.pos, -1);
        r->io.conn.pos = 0;
    }

    /* If we don't already have all the data in the sds, read more */
    // Keep reading from r->io.conn until the buffer holds at least len
    // readable bytes; r->io.conn.pos does not change inside this loop.
    while (len > sdslen(r->io.conn.buf) - r->io.conn.pos) {
        size_t buffered = sdslen(r->io.conn.buf) - r->io.conn.pos; // bytes already buffered
        size_t toread = len - buffered;                            // bytes still missing

        /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two. */
        if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN;
        // never read more than the free space left in r->io.conn.buf
        if (toread > sdsavail(r->io.conn.buf)) toread = sdsavail(r->io.conn.buf);

        if (r->io.conn.read_limit != 0 &&
            r->io.conn.read_so_far + buffered + toread > r->io.conn.read_limit)
        {
            // the limit has not been reached yet: clamp toread to the
            // maximum that can still be read without exceeding it
            if (r->io.conn.read_limit >= r->io.conn.read_so_far + buffered)
                toread = r->io.conn.read_limit - r->io.conn.read_so_far - buffered;
            else {
                // otherwise the request would exceed the limit: overflow error
                errno = EOVERFLOW;
                return 0;
            }
        }
        // read from the connection and append to r->io.conn.buf
        int retval = connRead(r->io.conn.conn,
                              (char*)r->io.conn.buf + sdslen(r->io.conn.buf),
                              toread);
        if (retval <= 0) {
            if (errno == EWOULDBLOCK)
                errno = ETIMEDOUT;
            return 0;
        }
        sdsIncrLen(r->io.conn.buf, retval);
    }

    // enough data is buffered: copy len bytes from r->io.conn.buf into buf
    memcpy(buf, (char*)r->io.conn.buf + r->io.conn.pos, len);
    r->io.conn.read_so_far += len; // total bytes handed out by this rio
    r->io.conn.pos += len;         // advance the read position in the buffer
    return len;
}
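
For context, a hedged sketch of how a caller could read a payload of known size from a socket-backed rio (the function and its arguments are illustrative; conn is assumed to be an already-established connection object):

/* Illustrative only: read a payload of known size from a connection, capping
 * total reads at exactly that size via read_limit. */
int read_exact_payload(connection *conn, void *dst, size_t payload_len) {
    rio r;
    rioInitWithConn(&r, conn, payload_len);   /* never buffer/read past the payload */

    if (rioRead(&r, dst, payload_len) == 0) { /* loops over rioConnRead internally */
        rioFreeConn(&r, NULL);
        return -1;
    }
    rioFreeConn(&r, NULL);                    /* nothing can be left over here */
    return 0;
}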

rioFdIO

Unlike rioFileIO, this backend works directly on a file descriptor fd.

static const rio rioFdIO = {
    rioFdRead,
    rioFdWrite,
    rioFdTell,
    rioFdFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* flags */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

rioInitWithFd

// raw file descriptor
struct {
    int fd;    /* File descriptor. */
    off_t pos; // not the position inside buf, but the total bytes written to fd so far
    sds buf;   // internal write buffer
} fd;

void rioInitWithFd(rio *r, int fd) {
    *r = rioFdIO;
    r->io.fd.fd = fd;
    r->io.fd.pos = 0;
    r->io.fd.buf = sdsempty();
}

With rioFdIO, writes are staged in the internal buffer io->fd.buf first, which effectively gives the raw descriptor the buffering behavior of a FILE* stream.

rioFreeFd

/* release the rio stream. */
void rioFreeFd(rio *r) {
    sdsfree(r->io.fd.buf);
}

rioFdWrite

Write the data in buf to the fd target. Because writes are buffered, a write may also have to flush.

  1. Writes first go into the internal buffer io->fd.buf. Depending on the length len, the function may need to flush io->fd.buf first.

    • If len > PROTO_IOBUF_LEN, flush whatever is already in io->fd.buf, then write the new payload directly:

      // for large writes, flush the pre-existing buffered data first
      if (len > PROTO_IOBUF_LEN) {
          if (sdslen(r->io.fd.buf)) {
              if (rioFdWrite(r, NULL, 0) == 0)
                  return 0;
          }
      }
    • Otherwise, append buf to io->fd.buf and then check whether a flush is needed:

      if (len) {
          r->io.fd.buf = sdscatlen(r->io.fd.buf, buf, len); // append to the internal buffer
          if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)       // buffer grew past the limit: flush
              doflush = 1;
          if (!doflush)                                     // nothing to flush: done
              return 1;
      }
      // reaching this point means a flush is needed
      // (a call with len == 0 also lands here)
      p = (unsigned char*) r->io.fd.buf;
      len = sdslen(r->io.fd.buf);
  2. At this point the buffered data (or the large payload) is written out to the file descriptor:

    size_t nwritten = 0;
    while (nwritten != len) {
        retval = write(r->io.fd.fd, p + nwritten, len - nwritten);
        if (retval <= 0) {
            /* With blocking io, which is the sole user of this
             * rio target, EWOULDBLOCK is returned only because of
             * the SO_SNDTIMEO socket option, so we translate the error
             * into one more recognizable by the user. */
            if (retval == -1 && errno == EWOULDBLOCK)
                errno = ETIMEDOUT;
            return 0; /* error. */
        }
        nwritten += retval;
    }
  3. Finally, update the counters:

    r->io.fd.pos += len;    // total bytes written to fd so far
    sdsclear(r->io.fd.buf); // the internal buffer is now empty

The complete implementation:

static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
    ssize_t retval;
    unsigned char *p = (unsigned char*) buf;
    int doflush = (buf == NULL && len == 0); /* a NULL/0 call means "flush" */

    /* For small writes, we rather keep the data in user-space buffer, and flush
     * it only when it grows. however for larger writes, we prefer to flush
     * any pre-existing buffer, and write the new one directly without reallocs
     * and memory copying. */
    if (len > PROTO_IOBUF_LEN) {
        /* First, flush any pre-existing buffered data. */
        // the payload is large: flush the buffer right away
        if (sdslen(r->io.fd.buf)) {
            if (rioFdWrite(r, NULL, 0) == 0)
                return 0;
        }
        /* Write the new data, keeping 'p' and 'len' from the input. */
    } else {
        if (len) {
            // small write: just append to the internal buffer r->io.fd.buf
            r->io.fd.buf = sdscatlen(r->io.fd.buf, buf, len);
            // if the buffer grew past the limit, flush it as well
            if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
                doflush = 1;
            if (!doflush)
                return 1;
        }
        /* Flushing the buffered data. Set 'p' and 'len' accordingly
         * (an explicit flush call with len == 0 also lands here). */
        p = (unsigned char*) r->io.fd.buf;
        len = sdslen(r->io.fd.buf);
    }

    size_t nwritten = 0;
    while (nwritten != len) {
        retval = write(r->io.fd.fd, p + nwritten, len - nwritten);
        if (retval <= 0) {
            /* With blocking io, which is the sole user of this
             * rio target, EWOULDBLOCK is returned only because of
             * the SO_SNDTIMEO socket option, so we translate the error
             * into one more recognizable by the user. */
            if (retval == -1 && errno == EWOULDBLOCK)
                errno = ETIMEDOUT;
            return 0; /* error. */
        }
        nwritten += retval;
    }

    r->io.fd.pos += len;    // total bytes written to fd so far
    sdsclear(r->io.fd.buf); // the buffer has been flushed
    return 1;
}
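
A hedged usage sketch for the fd backend (not Redis code; it relies only on the init, write and free functions shown in this section, and triggers the flush through the rio's own function pointer, which per the doflush path above boils down to a write with buf == NULL and len == 0):

/* Illustrative only: stream several small chunks to a pipe through an
 * fd-backed rio; small writes accumulate in io.fd.buf until flushed. */
int stream_chunks_to_pipe(int pipe_wfd, char **chunks, size_t *lens, int n) {
    rio r;
    rioInitWithFd(&r, pipe_wfd);

    for (int i = 0; i < n; i++) {
        if (rioWrite(&r, chunks[i], lens[i]) == 0) { /* buffered by rioFdWrite */
            rioFreeFd(&r);
            return -1;
        }
    }
    /* Push whatever is still buffered to the descriptor. */
    if (r.flush(&r) == 0) {
        rioFreeFd(&r);
        return -1;
    }
    rioFreeFd(&r);
    return 0;
}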

rioFdRead

Reading is not implemented; this target is write-only.

/* Returns 1 or 0 for success/failure. */
static size_t rioFdRead(rio *r, void *buf, size_t len) {
    UNUSED(r);
    UNUSED(buf);
    UNUSED(len);
    return 0; /* Error, this target does not support reading. */
}

rioFdTell

Returns the total number of bytes written so far:

/* Returns read/write position in file. */
static off_t rioFdTell(rio *r) {
    return r->io.fd.pos;
}

rioGenericUpdateChecksum

The checksum callback:

// CRC64 checksum update; works with any backend
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
    r->cksum = crc64(r->cksum, buf, len);
}
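
To use it, a caller points update_cksum at this function; rioWrite and rioRead (below) then feed every chunk through it. A hedged sketch, loosely in the spirit of how RDB files carry a trailing CRC64 (the helper and the trailer layout are illustrative, not the exact RDB format):

#include <stdint.h>
#include "rio.h"
#include "sds.h"

/* Illustrative only: build an in-memory blob followed by its CRC64. */
sds build_checksummed_blob(const void *payload, size_t len) {
    rio r;
    rioInitWithBuffer(&r, sdsempty());
    r.update_cksum = rioGenericUpdateChecksum; /* every rioWrite now updates r.cksum */

    rioWrite(&r, payload, len);

    uint64_t crc = r.cksum;                    /* checksum over everything written so far */
    rioWrite(&r, &crc, sizeof(crc));           /* append it as a trailer */
    return r.io.buffer.ptr;                    /* caller owns the resulting sds */
}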

rioWrite & rioRead

Error

Two error flags exist; they are set when rioWrite or rioRead fails.

#define RIO_FLAG_READ_ERROR  (1<<0) // read error
#define RIO_FLAG_WRITE_ERROR (1<<1) // write error

Helpers to query and clear the error flags:

static inline int rioGetReadError(rio *r) {
    return (r->flags & RIO_FLAG_READ_ERROR) != 0;
}

/* Like rioGetReadError() but for write errors. */
static inline int rioGetWriteError(rio *r) {
    return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

static inline void rioClearErrors(rio *r) {
    r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
}
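
Because the error flag is sticky and rioWrite (below) returns 0 immediately once it is set, a caller can batch many writes and check for failure once at the end. A hedged sketch (the helper is illustrative):

/* Illustrative only: write a batch of records and report failure once. */
int write_records(rio *r, sds *records, int n) {
    for (int i = 0; i < n; i++)
        rioWrite(r, records[i], sdslen(records[i])); /* becomes a no-op after an error */

    if (rioGetWriteError(r)) {
        rioClearErrors(r); /* reset the flag if the rio will be reused */
        return -1;
    }
    return 0;
}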

rioWrite

A higher-level wrapper around writes; r->write dispatches to one of the four backend implementations above.

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;

    while (len) {
        // cap each underlying write at max_processing_chunk bytes, if set
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ?
                                r->max_processing_chunk :
                                len;
        // update the checksum first, if a checksum function is installed
        if (r->update_cksum)
            r->update_cksum(r, buf, bytes_to_write);
        if (r->write(r, buf, bytes_to_write) == 0) {
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }

        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        r->processed_bytes += bytes_to_write;
    }

    return 1;
}
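
The max_processing_chunk field splits a single logical write into bounded chunks, so processed_bytes and the checksum advance incrementally rather than in one huge call. A hedged sketch (the chunk size and payload size are arbitrary example values):

#include <stdlib.h>
#include <string.h>
#include "rio.h"
#include "sds.h"

/* Illustrative only: one 1 MB rioWrite becomes 64 calls of 16 KB each. */
void chunked_write_demo(void) {
    rio r;
    rioInitWithBuffer(&r, sdsempty());
    r.max_processing_chunk = 16 * 1024; /* each r.write() call sees at most 16 KB */

    size_t total = 1024 * 1024;
    char *big = malloc(total);
    memset(big, 'x', total);

    rioWrite(&r, big, total);           /* loops internally, 64 chunks */

    free(big);
    sdsfree(r.io.buffer.ptr);
}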

rdbWriteRaw

rdbWriteRaw, used throughout rdb.c, is a thin wrapper around rioWrite:

static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb, p, len) == 0)
        return -1;
    return len;
}
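
Higher-level RDB helpers build on top of it; a minimal sketch in that spirit (shown for illustration, not quoted from rdb.c) writes a one-byte opcode and propagates the error:

static int rdbSaveType(rio *rdb, unsigned char type) {
    return rdbWriteRaw(rdb, &type, 1); /* 1 on success, -1 on write error */
}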

rioRead

Reads get the same kind of high-level wrapper:

static inline size_t rioRead(rio *r, void *buf, size_t len) {
    if (r->flags & RIO_FLAG_READ_ERROR) return 0;

    while (len) {
        size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ?
                               r->max_processing_chunk :
                               len;

        if (r->read(r, buf, bytes_to_read) == 0) {
            r->flags |= RIO_FLAG_READ_ERROR;
            return 0;
        }

        if (r->update_cksum)
            r->update_cksum(r, buf, bytes_to_read);

        buf = (char*)buf + bytes_to_read; // advance past the bytes just read
        len -= bytes_to_read;
        r->processed_bytes += bytes_to_read;
    }

    return 1;
}