剖析REDIS的replication

为保证主从一致,replication主要实现了两件事:

  • 同步:将master的数据同步到slaves,此时主从一致
  • 传播:将导致master的数据库发生改变的操作传播到slaves,使得主从仍旧一致

SYNC

客户端向slave发SLAVEOF IP PORT,使得slave成为由(IP, PORT)指定的master的slave,slave会向master发送SYNC RUN_ID OFFSET指令,请求与master完成数据同步:

  • slave向master发生SYNC命令
  • master收到SYNC命令之后执行BGSAVE命令,生成RDB文件,并使用一个缓冲区记录从此刻开始执行的所有写命令
  • BGSAVE命令结束,master将RDB文件发送给slave,再将缓冲区的数据发送给slave

SYNC命令结束,主从此刻数据保持一致性。

SYNC指令,适用于slave初次复制master,即slave之前没复制过当前maste。但是,对于断线重新连接比较耗时、效率低,因此此时只是想复制断线期间的不同,而不是整个数据库,即,SYNC不支持同步master的部分数据,仅支持同步全部数据。因此,针对SYNC的缺陷,在REDIS2.8之后引入了部分重同步,新的协议是PSYNC

PSYNC

PSYNC具有完全重同步 (Full re-Synchronization)和部分重同步(Partial re-Synchronization)两种能力:

  • 完全重同步:用于slave初次复制master的情况,和SYNC命令一致。
  • 部分重同步:用于断线重连时的复制情况,此时slave应该复制断线期间master累计的写命令,就可以恢复主从数据一致性。

部分重同步,主要是由下面三个部分组成:

  1. server.repl_backlog:是一个固定长度循环数组,但可以看作无限长的数组,可以一直向server.repl_backlog中循环写入数据,只是读取不及时会被覆盖。由三个变量指示着server.repl_backlog的状态:

    +  server.repl_backlog_size:是server.repl_backlog的固定大小,默认是1M,可以由配置文件修改
    +  server.repl_backlog_histlen:是向server.repl_backlog中已经写入的数据长度
    +  server.repl_backlog_idx:是server.repl_backlog当前写指针的位置

    上面三个字段都不会超过server.repl_backloh_size的大小。

  2. 复制偏移量

    +  server.master_repl_offset:是当前写入server.repl_backlog的总共字节数,可以看着是写入的缓冲区的最后一个字节。每次主服务回应从服务器PSYNC请求,在完全重同步时,+FULLSYNC master_replid offset中的offsetserver.master_repl_offset
    +  server.repl_backlog_off:总是等于server.master_repl_offset-server.repl_backlog_histlen + 1,因此可以看作的缓冲区的第一个字节

    因此,当写入字节较多,会出现server.master_repl_offset > server.repl_backlog_size

    feedReplicationBacklog

    feedReplicationBacklog函数向server.repl_backlog中添加长度为len的数据 ptr的过程如下:

    + ` server.master_repl_offset `:表示一共向`server.repl_backlog`添加的字节数,
    + `server.repl_backlog_off`:表示当前待同步偏移量
    

    这两个是一直增长的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len; //缓冲区的最后一个字节

    while(len) {
    size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
    if (thislen > len) thislen = len;
    memcpy(server.repl_backlog+server.repl_backlog_idx, p, thislen);
    server.repl_backlog_idx += thislen;

    // 从头开始写
    if (server.repl_backlog_idx == server.repl_backlog_size)
    server.repl_backlog_idx = 0;
    len -= thislen;
    p += thislen;
    server.repl_backlog_histlen += thislen;
    }

    if (server.repl_backlog_histlen > server.repl_backlog_size)
    server.repl_backlog_histlen = server.repl_backlog_size;

    // 1) 当 server.repl_backlog_histlen == server.repl_backlog_size 不再改变时,
    // 每次向 server.back_log 中写入字节数,总是会增加 server.master_repl_offset
    // 使得 server.repl_backlog_off 也是合理的增加
    //
    // 2) 当 server.repl_backlog_histlen < server.repl_backlog_size,
    // server.repl_backlog_off 总是不变
    server.repl_backlog_off = server.master_repl_offset -
    server.repl_backlog_histlen + 1;
    }

    addReplyReplicationBacklog

    addReplyReplicationBacklog函数,将[offset, server.master_repl_off]的数据添加到发送缓冲区,但是由于offsetserver.master_repl_off会大于server.repl_backlog_size,因此要经过一些转换将将offset转换到隔离范围内。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    long long addReplyReplicationBacklog(client *c, long long offset) {
    long long j, skip, len;

    serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

    // server.repl_backlog 没有待发送数据,则直接返回
    if (server.repl_backlog_histlen == 0) {
    serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
    return 0;
    }

    serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size);
    serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off);
    serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen);
    serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",server.repl_backlog_idx);

    /*** 下面的变化,是为了将 j 转换到 server.repl_backlog_size 范围内 ***/

    // 要跳过的位置
    skip = offset - server.repl_backlog_off;
    serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);

    /* 由于可能 server.repl_backlog_off > server.repl_backlog_size,
    * 因此不能直接 j = server.repl_backlog_off,而需要转换
    *
    * x = (server.repl_backlog_size - server.repl_backlog_histlen)
    * 那么x是 server.repl_backlog 的剩余空闲空间,
    * j = server.repl_backlog_idx + x 则指向了读指针,
    * 即 j 是 server.repl_backlog_off 在 server.repl_backlog_size 范围内的表示
    */
    j = (server.repl_backlog_idx + (server.repl_backlog_size - server.repl_backlog_histlen)) %
    server.repl_backlog_size;
    serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);

    /* j = (j+skip) % server.repl_backlog_size; 使得 j 指向了 offset
    * 在 server.repl_backlog_size 范围的合理表示
    */
    j = (j + skip) % server.repl_backlog_size;

    // [offset, server.master_repl_off]之间的数据长度 len
    // 即 len = server.repl_backlog_histlen - skip
    // len 表示待发送的数据长度
    len = server.repl_backlog_histlen - skip;
    serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);

    /******************将 len个字节发送出去******************/
    // 如果 [j, server.repl_backlog_idx] 的长度 < len
    // 则说明,还有部分数据存在 server.repl_backlog的前面:
    // [0, len - (server.repl_backlog_size - j)]
    // 如果是上述情况,则分两次发送,否则一次发送
    while(len) {
    long long thislen = ((server.repl_backlog_size - j) < len) ?
    (server.repl_backlog_size - j) :
    len;

    serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
    addReplySds(c, sdsnewlen(server.repl_backlog + j, thislen));
    len -= thislen;
    j = 0; // 这是为了重新回到循环缓冲区前面
    }
    // 已发送字节数
    return server.repl_backlog_histlen - skip;
    }
  3. 服务器的运行Id

    每个服务器都有RUN_ID,每个RUN_ID都是由于40个随机的16进制字符。

    + 初次复制的时候或者之前执行过SLAVEOF NO ONE命令,slave不知道master的RUN_ID,此时发送的PSYNC指令是 **PSYNC ? -1**。master会将自己的RUN_ID发送给slave,slave会将其保存在server.replid
    + 断开重连后,slave向master发送PSYNC指令,master会检测PSYNC中的RUN_ID,是否与自己的RUN_ID一致。如果一致则执行部分重同步;如果RUN_ID不一致,则执行完全重同步。

    上面讲解了实现PSYNC指令在slave端的操作,下面讲解下master对PSYNC指令的回应。

    • 如果master判断要执行完全重同步,则回复slave: +FULLRESYNC  RUN_ID OFFSET,其中Run Id是master的运行I的,供Slave保存,OFFSET是主服务器当前的偏移量server.master_repl_offset,从服务器会将其作为自己的初始化偏移量。
    • 如果master判断要执行部分冲同步,则回复slave:+CONTINUE
    • 如果主服务器无法识别PSYNC命令,则回复slave:- ERR ,master的REdis版本低于2.8,slave需要发送SYNC指令。

主从服务器的同步过程大致如下图:

然后,由于在实际中经常会下面情况,在PSYNC之后又实现了PYSNC2指令:

  • 出现从服务器出现故障,使得复制信息丢失
  • master出现故障,导致主从切换,此时需要从多个从服务器中选择一个作为master,那么master的RUN_ID就会发生改变

这个时候,是无法执行部分重新同步,在REDIS4.0又提出了PSYNC2协议。

PYSNC2

PSYNC2主要进行了两点进行优化:

  1. 将主从复制信息持久化。将REDIS的复制信息给持久化到RDB文件中(关于RDB详细可见我的RDB设计分析),那么服务器重启的时候,就能加载RDB中的复制信息,依旧可以继续复制了。

    1
    2
    3
    4
    5
    6
    if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) == -1) 
    return -1;
    if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1)
    return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1)
    return -1;
  2. 存储上一个主服务器的复制信息

    1
    2
    3
    4
    5
    6
    struct redisServer{
    //...
    char replid2[CONFIG_RUN_ID_SIZE+1];
    long long second_replid_offset;
    //...
    };

    当master发生故障,就调用shiftReplicationId函数使用server.replid2server.second_replid_offset来存储该master的复制信息。此外,判断部分同步的条件也会修改,代码下面叙述。

Slave端开始解析

replicaofCommand

在REDIS5.0开始,SLAVEOF指令和REPLAOF指令完全一致,都是调用replicaofCommand函数开始的,replicaofCommand函数,输入有两种参数:

  • SLAVEOF NO ONE:取消从服务器的复制行为
  • SLAVEOF IP PORT:使从服务器slave复制由(IP, PORT)指定主服务器。

SLAVEOF IP PORT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void replicaofCommand(client *c) {
//...
else {
long port;

// 如果这个slaveof指令是由于另一个slave发送的,而不是用户的客户端,
// 则无效
if (c->flags & CLIENT_SLAVE)
{
addReplyError(c, "Command is not valid when client is a replica.");
return;
}

if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;

// 已经是 server.masterhost 的从服务器了,也无效
if (server.masterhost &&
!strcasecmp(server.masterhost, c->argv[1]->ptr) &&
server.masterport == port)
{
serverLog(LL_NOTICE,
"REPLICAOF would result into synchronization "
"with the master we are already connected "
"with. No operation performed.");

addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
return;
}

// 将(ip,port)设置到slave的自己参数里,并断开相关连接
replicationSetMaster(c->argv[1]->ptr, port);
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,
"REPLICAOF %s:%d enabled (user request from '%s')",
server.masterhost,
server.masterport,
client);

sdsfree(client);
}

addReply(c,shared.ok);
}

replicationSetMaster

replicationSetMaster函数的调用者可能是从服务器,也可能是主服务器,但目的都是将调用replicationSetMaster函数的服务器server变成由(IP, PORT)指定服务器的slave。步骤如下:

  1. 将要复制的master的(IP,PORT)信息存储到server.masterhosserver.masterport

  2. 如果server是slave,已经复制了某个主服务器,那么此次SLAVEOF命令使server复制到新的主服务器,则需要释放掉之前存储的server.master 信息,即freeClient(server.master)。进一步,server切换主服务后,那么之前状态下的被阻塞的客户端需要解除阻塞;

  3. 如果server也有子服务器sub-slaves,那么也要断开连接。这是因为之前sub-slaves是与server之间保持同步,现在server更改server.master了,如果sub-slaves也与server继续保持连接,会破坏sub-slaves的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    void disconnectSlaves(void) {
    listIter li;
    listNode *ln;
    // 遍历,与所有的sub-slaves都断开连接
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
    freeClient((client*)ln->value);
    }
    }
  4. 如果server是slave,但是当前正在与之前的主服务器建立连接,那么就需要调用cancelReplicationHandshake函数断开连接,为新的连接准备。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    int cancelReplicationHandshake(void) {
    // 如果server正与主服务器进行同步,则中断
    if (server.repl_state == REPL_STATE_TRANSFER) {
    replicationAbortSyncTransfer();
    server.repl_state = REPL_STATE_CONNECT;
    }
    // 正在建立连接,也断开
    else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) {
    undoConnectWithMaster();
    server.repl_state = REPL_STATE_CONNECT;
    }
    else {
    return 0;
    }
    return 1;
    }
  5. 如果server是master,或者之前没有复制过任何服务器。那么调用replicationSetMaster函数会导致主从切换,即之前server是别的slave的master,现在server要变成(IP,PORT)服务器的slave,则要考虑缓存自己的信息:

    + 先调用replicationDiscardCachedMaster函数丢弃之前缓存在server.cached_master的信息·
    + 调用replicationCacheMasterUsingMyself函数将自己缓存在server.cached_master·

  6. 最后,将自己的状态设置为 REPL_STATE_CONNECT,这是主从建立的第一个状态。

完整的流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;

sdsfree(server.masterhost);

server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
// 因为从一个主服务器变成 server.masterhost 副本,
// 因此要解除原来从服务器中阻塞的客户端,关闭他们
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
cancelReplicationHandshake();

/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
//...

server.repl_state = REPL_STATE_CONNECT;
}

replicationDiscardCachedMaster

丢弃之前缓存在server.cached_master的信息。这个函数会被主服务master在上面调用,但是也会被从服务器slave调用,

待进一步解读

1
2
3
4
5
6
7
8
void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL) return;

serverLog(LL_NOTICE,"Discarding previously cached master state.");
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;
}

replicationCacheMasterUsingMyself

要缓存主服务器的信息,有两种可能,一种是slave将自己的server.master缓存到server.cached_master,另一种是master把自己缓存到server.cached_master。顾名思义,replicationCacheMasterUsingMyself属于后者,只被主服务器master调用,因此调用此函数的server即master,执行步骤如下:

  • 缓存server的复制偏移量server.master_repl_offset

  • 由于主服务器的server.master==NULL,因此需要调用replicationCreateMasterClient函数创建一个server.master

    由于server缓存自己,不需要和自己建立连接通信,因此 replicationCreateMasterClient的第一个参数设置NULL.

    1
    replicationCreateMasterClient(NULL, -1);	 // 创建客户端 server.master
  • server将自己的RUN_ID缓存到server.master->replid

  • server.master断开所有的引用后,将server.master信息转移到server.cached_master。这是因为server变成另一个主服务器的slave,那么之前的引用就不再有效了,应该断开相关引用。

  • 由于master即将也变成slave,因此也要将 server.master=NULL

但是replicationCacheMasterUsingMyself函数会调用只有两种场景:

  1. 对一个从未复制过的REDIS服务器执行SLAVEOF命令,会将一个单机变成slave。此时,相当于对server.cached_master进行了一次初始化:

    1
    2
    3
    4
    5
    server.cached_master->conn =NULL;
    server.cached_master->flags |= CLIENT_MASTER;
    server.cached_master->reploff = 0;
    server.cached_master->read_reploff =0;
    server.cached_master->replid = NULL;
  2. 对一个master执行SLAVEOF命令,此时会将master变成slave。那么此时 replicationCacheMasterUsingMyself函数就是缓存当前master的信息。

完整的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void replicationCacheMasterUsingMyself(void) {
serverLog(LL_NOTICE,
"Before turning into a replica, using my own master parameters "
"to synthesize a cached master: I may be able to synchronize with "
"the new master with just a partial transfer.");

server.master_initial_offset = server.master_repl_offset; // 初始化为0

// 创建 server.master
replicationCreateMasterClient(NULL, -1);

// 将server自己的id作为server.master->replid
memcpy(server.master->replid, server.replid, sizeof(server.replid));

/* Set as cached master. */
unlinkClient(server.master); // 断开所有的连接
server.cached_master = server.master; // 转移到server.cached_master中
server.master = NULL;
}

replicationCreateMasterClient

在介绍 replicationCreateMasterClient函数的实现之前,进一步介绍下server.master的作用。

master和自己的slave之间通信是通过socket实现,在REDIS中封装了网络通信的对象即struct client。因此当建立了主从连接时,在每个slave里都需要创建一个server.master,slave都是通过server.master与主服务器数据交互,相应的主服务中也有个链表结构server.slaves,保存着所有的从服务器的通信接口。

因此,主从服务器建立连接时会调用replicationCreateMasterClient函数,为每个slave创建一个server.master对象,而主服务器的server.master==NULL

replicationCreateMasterClient函数,创建server.master实例的步骤如下:

  1. 调用createClient()函数,创建客户端实例 server.master
  2. 如果调用 replicationCreateMasterClient的函数是个slave,那么传入的参数conn !=NULL,就和往常一样需要为conn注册事件,监听主服务发送给slave的数据,此时的可读事件回调函数依然是 readQueryFromClient。如果调用者是master,那么conn==NULL,就不需要注册可读事件。
  3. 给客户端server.master标志位CLIENT_MASTER
  4. 设置与主从复制相关的三个重要变量:
    • server.master->reploff:记录主服务器的偏移量
    • server.master->read_reploff:偏移量的最后一个字节
    • server.master->replid:主服务器的RUN_ID
  5. 如果初始化下,server.master_initial_offset ==-1, 表示发送的SYNC指令。主服务器版本低于REDIS2.8初始化位-1
  6. 只要dictid != -1,则会为server,master操作的数据库选择一个db

完整的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void replicationCreateMasterClient(connection *conn, int dbid) {
server.master = createClient(conn);

// 为主服务器注册可读事件,可读回调函数也是 readQueryFromClient
if (conn)
connSetReadHandler(server.master->conn, readQueryFromClient);

server.master->flags |= CLIENT_MASTER; // 这个客户端是主服务器
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset; // PSYNC初始时的偏移量
server.master->read_reploff = server.master->reploff;
server.master->user = NULL; /* This client can do everything. */

memcpy(server.master->replid, server.master_replid, sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master, dbid);
}

connectWithMaster

前面的replicaofCommand函数,为后面的主从之间连接做好了准备,下面就要slave就要与master建立连接了。建立连接的任务由connectWithMaster函数完成,但是这个函数在 replicationCron中调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void replicationCron(void) {
//...
// 如果还没有发起连接请求,则发起
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,
"Connecting to MASTER %s:%d",
server.masterhost,
server.masterport);
// 与主服务器建立连接,此时从服务器就是主服务器的客户端
// 因此从服务器要向主服务器发起连接
if (connectWithMaster() == C_OK)
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
//...
}

connectWithMaster函数步骤如下:

  • 建立主从用于主从通信的connection对象server.repl_transfer_s
  • 调用connConnect注册连接回调函数,等待连接成功就调用syncWithMaster 来完成主从连接。
  • 将slave的状态更新为 REPL_STATE_CONNECTING

完整的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int connectWithMaster(void) {
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
if (connConnect(server.repl_transfer_s,
server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR,
syncWithMaster) == C_ERR)
{
serverLog(LL_WARNING,
"Unable to connect to MASTER: %s",
connGetLastError(server.repl_transfer_s));
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
return C_ERR;
}

server.repl_transfer_lastio = server.unixtime;
server.repl_state = REPL_STATE_CONNECTING; // 更新状态
return C_OK;
}

syncWithMaster

REDIS定义了13个slave的与master的连接状态,执行SLAVEOF的slave必须从 REPL_STATE_NONE 状态变成 REPL_STATE_CONNECTED状态才算是成功建立连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#define REPL_STATE_NONE         0 
#define REPL_STATE_CONNECT 1
#define REPL_STATE_CONNECTING 2
/* --- Handshake states, must be ordered --- */
// 下面是主从的校验状态
#define REPL_STATE_RECEIVE_PONG 3
#define REPL_STATE_SEND_AUTH 4
#define REPL_STATE_RECEIVE_AUTH 5
#define REPL_STATE_SEND_PORT 6
#define REPL_STATE_RECEIVE_PORT 7
#define REPL_STATE_SEND_IP 8
#define REPL_STATE_RECEIVE_IP 9
#define REPL_STATE_SEND_CAPA 10
#define REPL_STATE_RECEIVE_CAPA 11
#define REPL_STATE_SEND_PSYNC 12
#define REPL_STATE_RECEIVE_PSYNC 13
/* --- End of handshake states --- */
// 校验成功后,
// 如果是完全重同步,还需要经过 REPL_STATE_TRANSFER 状态
// 如果是部分重同步,直接是 REPL_STATE_CONNECTED 状态
#define REPL_STATE_TRANSFER 14
#define REPL_STATE_CONNECTED 15

syncWithMaster函数,则是将slave从 REPL_STATE_CONNECTING 状态变成 REPL_STATE_CONNECTED,完成连接。下面逐个状态进行分析。

  1. syncWithMaster判断当前状态是 REPL_STATE_ONE,则不是由于上面回调过来的,而是由于SLAVEOF NO ONE触发了syncWithMaster函数则直接返回。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void syncWithMaster(connection *conn) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5; // 最多尝试五次
    int psync_result;

    /* If this event fired after the user turned the instance into a master
    * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
    connClose(conn);
    return;
    }
    //...
    }

    此外,还需要判断connConnect函数是否调用成功,即socket是否连接成功

    1
    2
    3
    4
    5
    6
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
    serverLog(LL_WARNING,
    "Error condition on socket for SYNC: %s",
    connGetLastError(conn));
    goto error;
    }
  2. REPL_STATE_CONNECTING to REPL_STATE_RECEIVE_PONG

当校验完成后,socket确实连接成功。

+ 注册可读事件,回调函数还是syncWithMaster,·等待master对PING的回应
+ 取消可写事件,因为此次要发送的数据只有PING,四个字符,不存在发送不完
+ 将slave的状态转变为 REPL_STATE_RECEIVE_PONG
+ slave基于conn向master发送PING

注意:执行完就直接return了,也就是说下次调用 syncWithMaster 则应该服务器对PING回应

1
2
3
4
5
6
7
8
9
10
11
12
// 发送 PING 给主服务器
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
// 由于要等待服务器的 PONG 回应,因此要注册可读事件,
// 同时要取消可写事件,因为暂时没有可写数据要发送
connSetReadHandler(conn, syncWithMaster);
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PONG;
err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "PING", NULL);
if (err) goto write_error;
return;
}
  1. REPL_STATE_RECEIVE_PONG to REPL_STATE_SEND_AUTH

    slave接受到master对PING的回应后,先校验有效性,两种是有效的

    + +PONG:有效
    + -NOAUTH:有效,不设置认证
    + -ERR operation not permitted:在REDIS低于2.8时回复

    如果没有发生错误,则将状态转为 REPL_STATE_SEND_AUTH

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);

    // 错误回复,
    if (err[0] != '+' &&
    strncmp(err,"-NOAUTH",7) != 0 &&
    strncmp(err,"-ERR operation not permitted",28) != 0)
    {
    serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
    sdsfree(err);
    goto error;
    }
    else {
    serverLog(LL_NOTICE, "Master replied to PING, replication can continue...");
    }

    sdsfree(err);
    // 进入下一个状态
    server.repl_state = REPL_STATE_SEND_AUTH;
    }

    上面的if分支没有return,而是直接向下执行,根绝server.masteruser server.masterauth查看是否需要设置认证。,如果需要则向master发送AUTH信息,进入下一步;否则则直接进入第5步。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
    if (server.masteruser && server.masterauth) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE,
    conn,
    "AUTH", server.masteruser, server.masterauth,
    NULL);
    if (err) goto write_error;
    server.repl_state = REPL_STATE_RECEIVE_AUTH;
    return;
    } else if (server.masterauth) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE,
    conn,
    "AUTH", server.masterauth,
    NULL);
    if (err) goto write_error;
    server.repl_state = REPL_STATE_RECEIVE_AUTH;
    return;
    } else {
    // 否则,如果没有设置密切认证,则进入下一个分支
    server.repl_state = REPL_STATE_SEND_PORT;
    }
    }
  2. REPL_STATE_SEND_AUTH to REPL_STATE_RECEIVE_AUTH

如果设置了密码认证,则会在第4步会验证master的回复。如果没有错误、校验成功后,再将slave状态转换为 REPL_STATE_SEND_PORT

1
2
3
4
5
6
7
8
9
10
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
}
  1. REPL_STATE_SEND_PORT to REPL_STATE_RECEIVE_PORT

在这里,slave向master发送 REPLCONF listening-port PORT 信息,告诉master自己的监听端口。向master发送PORT,以及后面的IP只是为了INFO指令信息。

如果没有发生写错误,将状态转换为REPL_STATE_RECEIVE_PORT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (server.repl_state == REPL_STATE_SEND_PORT) {
int port;
// 设置 port
// server.slave_announce_port 默认是 0
// server.tls_replication 与 server.tls_port 默认也是 0
// port = server.port 默认值是 6379
if (server.slave_announce_port) port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,
conn,
"REPLCONF", "listening-port",portstr,
NULL);
sdsfree(portstr);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
  1. REPL_STATE_RECEIVE_PORT to REPL_STATE_SEND_IP

等待master对 REPLCONF listening-port PORT 的回复,没有错误则进入下一个状态 REPL_STATE_SEND_IP

1
2
3
4
5
6
7
8
9
10
11
12
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Master does not understand REPLCONF listening-port: %s",
err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}

上面是if分支是没有return的,因此直接进入下面的分支。在此判断是否要跳过IP认证阶段,默认情况下,不把ip地址发送master。

如果修改配置文件,server.slave_announce_ip 不是 NULL,则进入下一步,否则进入第9步

1
2
3
4
5
// server.slave_announce_ip 默认值是 NULL
if (server.repl_state == REPL_STATE_SEND_IP && server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}
  1. REPL_STATE_SEND_IP to REPL_STATE_RECEIVE_IP

    向master发送 REPLCONF ip-address ip

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (server.repl_state == REPL_STATE_SEND_IP) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE,
    conn,
    "REPLCONF", "ip-address", server.slave_announce_ip,
    NULL);
    if (err) goto write_error;
    sdsfree(err);
    server.repl_state = REPL_STATE_RECEIVE_IP;
    return;
    }
  2. REPL_STATE_RECEIVE_IP to REPL_STATE_SEND_CAPA

    在这忽略错误,因为不是所有的REDIS版本都支持 REPLCONF ip-address ip 命令。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    /* Ignore the error if any, not all the Redis versions support
    * REPLCONF listening-port. */
    if (err[0] == '-') {
    serverLog(LL_NOTICE,
    "(Non critical) Master does not understand REPLCONF ip-address: %s",
    err);
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_CAPA;
    }
  3. REPL_STATE_SEND_CAPA to REPL_STATE_RECEIVE_CAPA

    CAPAd是单词capabilities的缩写,表示slave具有的能力。slave向master发送 REPLCONF capa eof capa pasync2

    • eof:表示slave支持 dickless 方式的同步数据流程,即master可以直接将server.db中的键值以RDB协议格式向pipe[1]里写,slave从pipe[0]中读取数据,而不需要经过先序列化为RDB文件,再将RDB文件发送到slave的过程。
    • pasync2:即slave支持PSYNC2协议。对于slave的PSYNC请求,主服务器发生故障,发生主从切换后执行部分重同步,支持PYSNC2协议回复的是 +CONTINUE new_repl_id,如果不支持PYSNC2协议回复的是+CONTINUE

    再将slave状态转向为 REPL_STATE_RECEIVE_CAPA

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE,
    conn,
    "REPLCONF", "capa", "eof", "capa", "psync2",
    NULL);
    if (err) goto write_error;
    sdsfree(err);
    server.repl_state = REPL_STATE_RECEIVE_CAPA;
    return;
    }
  4. REPL_STATE_RECEIVE_CAPA to REPL_STATE_SEND_PSYNC

    上面slave告诉master自己具备的功能,但是如果master的REDIS版本较低无法识别这些功能,那么就会回复slave错误,但是这不是致命错误,因此仍旧可以继续向下运行。

    继续将 slave状态设置为 REPL_STATE_SEND_PSYNC

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (err[0] == '-') {
    serverLog(LL_NOTICE,
    "(Non critical) Master does not understand REPLCONF capa: %s",
    err);
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_PSYNC;
    }
  5. REPL_STATE_SEND_PSYNC to REPL_STATE_RECEIVE_PSYNC

    在此:

    • slave调用函数slaveTryPartialResynchronization函数向master发起PSYNC请求,
    • 将状态转变为REPL_STATE_RECEIVE_PSYNC

    这里,是有return返回的,因为需要等待master对PYSNC指令的回应。下次调用syncWithMaster函数状态应该是 REPL_STATE_RECEIVE_PSYNC

    1
    2
    3
    4
    5
    6
    7
    8
    9
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
    // 向主服务器发起 PSYNC 请求
    if (slaveTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
    err = sdsnew("Write error sending the PSYNC command.");
    goto write_error;
    }
    server.repl_state = REPL_STATE_RECEIVE_PSYNC;
    return;
    }

slaveTryPartialResynchronization

先中断下介绍 slaveTryPartialResynchronization函数怎么发起PYSNC请求的,再续上面的状态变化。此函数的read_reply选项,决定着slave从conn里读取master的回复还是向master写数据。

发送PSYNC请求

read_reply ==0 时slave向master发送PSYNC replid offset命令,请求同步数据。

在slave端,根据 server.cached_master 来判断是否发起部分重同步PSYNC请求:

  • server.cached_master !=NULL:说明在此次启动slave之前,slave已复制过一个主服务器,因故障导致下线时将该主服务的信息缓存在serer.cached_master

    • 之前主服务器的replidserver.cached_master->replid
    • 与之前主服务的复制偏移量offsetserver.cached_master->reploff+1

    因此,当slave向新的主服务器master(由(IP,PORT)指定)发送PSYNC replid offset时,master会根据replid以及offset判断,是否能执行部分重同步。如果能,则回复+CONTINUE;不能则执行完全重同步。

  • server.cached_master ==NULL,则直接发送PSYNC ? -1,请求执行完全重同步

如果没有发生错误,则返回 PSYNC_WAIT_REPLY状态,否则返回 PSYNC_WRITE_ERROR 状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;

if (!read_reply) {
// 表示当前的 master_run_id 不是有效的,
server.master_initial_offset = -1;

if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,
"Trying a partial resynchronization (request %s:%s).",
psync_replid, psync_offset);
}
else {
serverLog(LL_NOTICE,
"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}

// 发送 PSYNC 指令
reply = sendSynchronousCommand(SYNC_CMD_WRITE,
conn,
"PSYNC", psync_replid, psync_offset, // PSYNC replid offset
NULL);
if (reply != NULL) {
serverLog(LL_WARNING,
"Unable to send PSYNC to master: %s",
reply);
sdsfree(reply);
connSetReadHandler(conn, NULL); // 取消注册可读事件
return PSYNC_WRITE_ERROR;
}

return PSYNC_WAIT_REPLY; // 成功发送 PSYNC 指令,等待主服务器回复
}

// ...
// 读取
}

syncWtithMaster函数中,发出了PSYNC请求后,如果再次调用syncWtithMaster函数,则会先判断slave的状态,此时应该还是REPL_STATE_RECEIVE_PSYNC

1
2
3
4
5
6
7
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
serverLog(LL_WARNING,
"syncWithMaster(): state machine error,state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
}

校验成功后,slave则进入等待master回应PSYNC请求的阶段

1
psync_result = slaveTryPartialResynchronization(conn, 1);

读取PYSNC请求回复

read_reply ==1时,slave 等待并读取master对 PSYNC命令的回应。返回的标志位有以下几种有效可能:

1)PSYNC_WAIT_REPLY:这种是读取到一个空行

这个返回值和上面的写PYSNC的返回值是一样的,都需要再次调用slaveTryPartialResynchronization函数读取mastet的回应。

1
2
3
4
5
6
7
8
9
10
11
12
13
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
// ....

/*********** 下面是读取 ***************/
reply = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
// 主服务器发送给从服务器的是一个空行
// 则继续等待回应
if (sdslen(reply) == 0) {
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
//...
}

2)PSYNC_FULLRESYNC:说明master经过判断,对slave执行完全重同步。

此时,回应slave的消息是 +FULLRESYNC replid offset,其中replid是master的RUN_ID,offset是复制偏移量server.master_repl_offset

replidoffset解析正确后,函数返回 PSYNC_FULLRESYNC,标志着master即将要和自己(slave)进行完全重同步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
// ....
// 如果已经读取到数据,则取消注册可读事件
connSetReadHandler(conn, NULL);

// 完全重同步回应:FUNLLRESYNC replid offset
// 解析 replid offset
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;

replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}

// 以下两种回复都是语法错误:
// + replid 和 offset 有一个不存在
// + replid 没有40个字符
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax.");
// master回复了FULLRESYNC,说明master是支持PYSNC协议的,但是格式不对
// 则将replid设置为0,使得下次调用PYSNC失败
memset(server.master_replid, 0, CONFIG_RUN_ID_SIZE+1);
}
else {
// 将主服务器的replid填充到 server.master_replid
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
// 将主服务的复制缓冲区偏移量 offset 填充到 server.master_initial_offset
// initial 体现在第一次,即这是个完全重同步操作
server.master_initial_offset = strtoll(offset,NULL,10);

serverLog(LL_NOTICE,
"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}

/* We are going to full resync, discard the cached master structure. */
// 由于将要去执行完全重同步,说明之前的master已经可能失效了
// 因此要丢弃 server.cached_master,等完全重更新完,再重新缓存 server.cached_master
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC; // 表示即将要完全重同步
}
//...
}

3)PSYNC_CONTINUE: master回应slave+CONTINUE,标志master同意进行部分重同步操作。

考虑到PYSNC2协议,判断 CONTINUE 后面是否附带new_replid。如果有,则说明slave当前请求的master是新选出的主服务器,server.cached_master是宕机之前的主服务器,那么slave需要更新下主服务的信息:

  1. server.replid2来存储PSYNC replid offset中的replidserver.second_replid_offset存储offset
  2. server.replidserver.cached_master->replid更新为新主服务器的运行ID:new_replid
  3. slave更改主服务器了,调用disconnectSlaves函数断开之前在旧服务器下建立的sub-slaves

到此,要为开始部分重同步做准备,要完成下面两个关键步骤:

  1. 要将server.cached_master的信息移动到server.master,因为slave后期还要依赖于server.master与master进行数据通信

  2. 如果么有复制缓冲区server.repl_backlog(比如首次调用就没有),要创建复制缓冲区

    最后,返回PSYNC_CONTINUE标志位,告诉slave要开始部分重同步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
// ....
// +CONTINUE replid \r\n
if (!strncmp(reply,"+CONTINUE",9)) {
serverLog(LL_NOTICE, "Successful partial resynchronization with master.");

char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
// 表示支持PSYNC2协议
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1]; // 新的 replid
memcpy(new, start, CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';

// 如果和之前已经缓存的主服务器replid不等
// 即,主服务器发送了变化
if (strcmp(new, server.cached_master->replid)) {

serverLog(LL_WARNING,"Master replication ID changed to %s",new);

/* 1) Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,
server.cached_master->replid,
sizeof(server.replid2));
// 2)
server.second_replid_offset = server.master_repl_offset+1;

/* 3) Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid, new, sizeof(server.replid));
memcpy(server.cached_master->replid, new, sizeof(server.replid));

/* 4) Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
}
}

/* Setup the replication to continue. */
sdsfree(reply);

// 将 server.cached_master 信息移动到 server.master
// 并且为 conn 注册可读事件,
// 如果有带发送的缓冲区,也注册可写事件
replicationResurrectCachedMaster(conn);

// 创建复制缓冲区
if (server.repl_backlog == NULL)
createReplicationBacklog();
// 下面等待部分重同步
return PSYNC_CONTINUE;
}
// ...
}

replicationResurrectCachedMaster

replicationResurrectCachedMaster函数,用于将server.cached_master转移server.master,明显这个函数只会在slave中调用,而调用的时机也是因为要执行数据同步了,slave需要server.master与master进行数据交互,因此要执行的步骤大致如下:

  1. 将之前的server.cached_master转移到server.master中。
  2. 要将slave的状态由REPL_STATE_RECEIVE_PSYNC转为REPL_STATE_CONNECTED
  3. server.master->conn注册可读事件,使用回调函数readQueryFromClient处理master的部分重同步数据,把部分重同步数据解析为普通客户端的写指令,再对slave数据库执行一遍
  4. 如果server.master的发送缓冲区中有数据,则也注册可写事件

经过上述步骤,slave状态也是REPL_STATE_CONNECTED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void replicationResurrectCachedMaster(connection *conn) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->conn = conn;
connSetPrivateData(server.master->conn, server.master);
// 去掉两个标志位,因为这个server.master才开启生命周期
server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime; // 最近的交互时间
server.repl_state = REPL_STATE_CONNECTED; // 已经变成连接状态了
server.repl_down_since = 0; // 上线

// 建立连接后,那么主服务器也是从服务器的客户端
// 因此,将 server.master 添加到当前从服务器的客户端链表 server.clients 中
linkClient(server.master);

// 为客户端(即主服务器)注册可读事件,监听从服务器的请求
if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
serverLog(LL_WARNING,
"Error resurrecting the cached master, impossible to add the readable handler: %s",
strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}

/* We may also need to install the write handler as well if there is
* pending data in the write buffers.
*
* 如果主服务器的发送缓冲区中有数据,则注册可写事件,
* 等待可写事件发送,将其发送给从服务器
*/
if (clientHasPendingReplies(server.master)) {
if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
serverLog(LL_WARNING,
"Error resurrecting the cached master, "
"impossible to add the writable handler: %s",
strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}

PYSNC_CONTINUE

下面将视线再拉回到sysnWithMaster函数中, sysnWithMaster函数需要对 slaveTryPartialResynchronization的读取PSYNC的返回标志位结果进行处理,错误标志不讲解了。

1
2
3
4
5
6
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
//...
return;
}

那么slave是从哪里接收到master的部分重同步的数据? 是在replicationResurrectCachedMaster函数中为serevr.master->conn 注册的可读事件回调函数中。

readQueryFromClient中,如果发现是从master发送过来的数据,那么就增加读取偏移量 c->read_reploff,而数据仍在c->querybuf中,被解析后在processCommandAndResetClient中执行,使得主从一致。

1
2
if (c->flags & CLIENT_MASTER) 
c->read_reploff += nread;

slave端读取到部分重同步数据之后,把数据解析成指令再执行就使得主从数据同步,其具体解析到执行的过程可以参考 剖析REDIS输入缓冲区设计

此外,slave在接收到部分重同步数据,完成了同步之后,会继续commandProcessed函数中,会进一步调用replicationFeedSlavesFromMasterStream函数把这个数据传递给sub-slaves。

1
2
3
if (processCommand(c) == C_OK) {
commandProcessed(c);
}

commandProcessed

commandProcessed函数主要完成三个任务:

  • 每次在接收到部分重同步数据之后,都要更新下复制偏移量server.master->read_reploff,此处的客户端 cserver.master
  • 非阻塞模式,都会resetClient
  • 需要递归传播,即将slave从master接受到的数据传递给sub-slaves
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void commandProcessed(client *c) {
long long prev_offset = c->reploff;
// 更新偏移量
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}

if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);

// 传播
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf,
applied);
sdsrange(c->pending_querybuf,applied, -1);
}
}
}

replicationFeedSlavesFromMasterStream

slave作为自己的sub-slave的主服务,因为将数据传递到sub-slave时原理和主服务器传递给自己类似:

  • 1)写入 server.repl_backlog
  • 2)写入 sub_slave->conn,发送给sub-slave
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
listNode *ln;
listIter li;

// for debug
if (0) {
printf("%zu:",buflen);
for (size_t j = 0; j < buflen; j++) {
printf("%c", isprint(buf[j]) ? buf[j] : '.');
}
printf("\n");
}

// 写入到 serveer.repl_backlog 中
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);

// 发送给所有的sub-slaves
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 发送到sub-slaves
addReplyProto(slave,buf,buflen);
}
}

PSYNC_FULLRESYNC

下面继续讲解syncWithMaster函数中的完全重同步。

  • 在高版本的REDIS中,支持dickles方式的传输RDB数据。如果不支持,则需要在slave端创建一个rdb文件用于接收master的RDB协议格式的数据;
    • server.repl_transfer_tmpfile:表示RDB文件名
    • server.repl_transfer_fd:rdb文件对应的文件描述符
  • conn设置可读事件,等待master的全同步数据,读回调函数是readSyncBulkPayload,完成读取master的RDB数据以及加载到slave中。

全同步的准备如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void syncWithMaster(connection *conn) {
// ...
// useDisklessLoad() 返回0,则主服务器是将当前数据库状态先存储到磁盘,
// 再将rdb文件发送给从服务器,因此从服务器要先创建个本地文件
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)server.unixtime, (long int)getpid());
dfd = open(tmpfile, O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,
"Can't Opening the temp file needed for MASTER <-> REPLICA sync: %s",
strerror(errno));
goto error;
}

server.repl_transfer_tmpfile = zstrdup(tmpfile); // 文件名
server.repl_transfer_fd = dfd; // 对应的fd
}

/* Setup the non blocking download of the bulk file. */
// 设置可读事件,等待rdb数据到来
if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}

server.repl_state = REPL_STATE_TRANSFER; // 传递文件状态
// 下面三个字段适用于slave端选择将数据先存储到rdb中
server.repl_transfer_size = -1; // Rdb文件大小
server.repl_transfer_read = 0; // 已读取字节数
server.repl_transfer_last_fsync_off = 0; // 已经同步字节数

server.repl_transfer_lastio = server.unixtime; // 最近读取操作的时间
return;
//...
}

readSyncBulkPayload

如果这是首次同步数据,比如建立连接时同步数据,master在给slave发送同步数据之前, 会先发送一个slave->replpreamble ,格式是$<length>\r\n

  • 如果是RDB方式则告诉slave即将要发送的RDB文件大小
  • 如果是dickless方式则发送的是$<eofmark>\r\n,其中eofmarkRUN_ID长度一致,都是40个字符

因此,首次调用readSyncBulkPayload函数是为了检测主从同步数据的方式,就return了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
off_t left;

// 在dickless方式下,lastbytes用于检测是否达到rdb数据发送完毕
static char eofmark[CONFIG_RUN_ID_SIZE]; // 结束标志
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;

// server.repl_transfer_size == -1 表示首次读取
if (server.repl_transfer_size == -1) {
if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}

if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
}
else if (buf[0] == '\0') {
// 空行只是为了保持连接,用于心疼检测,
server.repl_transfer_lastio = server.unixtime; // 更改最近一次主从读取时间
return;
}
else if (buf[0] != '$') {
serverLog(LL_WARNING,
"Bad protocol from MASTER, the first byte is not '$' (we received '%s'),"
"are you sure the host and port are right?",
buf);
goto error;
}

/* 主服务器的回应有两种格式:
* 1. 发送的是RDB文件,格式即使 $<count>
* 2. dickless方式,事先不知大文件大小,因此先传输 $EOF:<40 bytes delimiter>
* 在文件的末尾,再次传输预定义的分隔符delimiter,由于分隔符足够长且随机,
* 因此不会有与实际文件内容发生冲突的可能性。
*/
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0, CONFIG_RUN_ID_SIZE);

// 更改 server.repl_transfer_size的值
// 因此 if(server.repl_transfer_size == -1),下次就不会进入
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
use_diskless_load? "to parser":"to disk");
}
else {
usemark = 0;
// 存储rdb文件大小
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
use_diskless_load? "to parser":"to disk");
}

return;
}
// ....
}

再次调用readSyncBulkPayload函数,如果是dickless方式同步,则server.repl_transfer_size==0,否则server.repl_transfer_size是即将要传输的文件大小length,因此不会再陷入 if(repl_transfer_size==-1) 分支中。

master有两种传输同步数据方式,而slave也相应有两种加载方式:

  • 先存储到RDB文件中再加载进内存
  • 直接从socket读取数据然后加载到内存中

这由useDisklessLoad函数确定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY &&
dbTotalServerKeyCount()==0);
/* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
if (enabled && !moduleAllDatatypesHandleErrors()) {
serverLog(LL_WARNING,
"Skipping diskless-load because there are modules "
"that don't handle read errors.");
enabled = 0;
}

return enabled;
}

如果slave端选择先将master发送过来的RDB数据存储到本地RDB文件server.repl_transfer_fd中,流程如下:

  • 设置单次最大读取字节数readlen
  • 读取readlen个字节数据。在usemark==1方式下检测下是否读取到数据流末尾,到了则eof_reached=1
  • 将读取到的数据写入到server.repl_transfer_fd。如果此时eof_reached==1 && usemark==1,则需要在同步之前将写入slave端RDB文件末尾的eofmark给删除掉
  • 每次累计写入了REPL_MAX_WRITTEN_BEFORE_FSYNC个字节,就需要自动同步一次
  • 如果master是发送的RDB文件,判断是否达到了文件末尾。

这个函数,只有在master发送完毕才会向下继续运行,否则每次都是直接return.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void readSyncBulkPayload(connection *conn) {
//...
if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
// 传输的是RDB文件,则直接读取RDB文件数据
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}

//2 将数据读取到 buf 中
nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
}

serverLog(LL_WARNING,
"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}

server.stat_net_input_bytes += nread;

int eof_reached = 0;

// 查看当前读取的nread个数据,是否包含了结束标志 eofmark
if (usemark) {
/* Update the last bytes array, and check if it matches our
* delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
// 将读取的字节中最后40个字节复制到 lastbytes
// 直接比较即可
memcpy(lastbytes, buf+nread-CONFIG_RUN_ID_SIZE, CONFIG_RUN_ID_SIZE);
}
else {
// 将lastbytes中后面字节前移nread个字节,把新读取的添加到lastbyes
// 凑满最新的40个字节,逐次比较
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes, lastbytes+nread, rem);
memcpy(lastbytes+rem,buf,nread);
}
// 判断是否达到末尾
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
eof_reached = 1;
}

server.repl_transfer_lastio = server.unixtime; // 更新最近的从主服务器中读取的时间

//3 将buf中的数据写入slave端RDB文件
if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) {
serverLog(LL_WARNING,
"Write error or short write writing to the DB dump file "
"needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}

server.repl_transfer_read += nread;

// 如果已经到达文件末尾,则将最后的40个字符删除掉
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,
"Error truncating the RDB file received from the master "
"for SYNC: %s",
strerror(errno));
goto error;
}
}

/*4 每次都将数据尝试同步到磁盘中,否则最后统一同步,会产生较大的延时、阻塞
*
* 每当累计写入的字节数大于 REPL_MAX_WRITTEN_BEFORE_FSYNC ,就同步
*/
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off,
sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}

//对于实现知道的传输方式,检测是否达到文件末尾
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}

/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
}
//...
}

下面就是加载数据的方式了:
+ use_diskless_load==1:则slave端可以直接从socket中读取数据直接加载
+ use_diskless_load==0:则slave从server.repl_transfer_fd文件中加载

在正式加载之前需要先检测是否有AO重写子进程。因为下面slave与master同步会对自己的数据库造成很多修改,如果此时AOF在重写,则会破坏copy-on-write使得内存暴涨。AOF重写只有在slave同步master成功后才会重启。

1
if (server.aof_state != AOF_OFF) stopAppendOnly();

dickless 加载有两种类型server.repl_diskless_load

  • REPL_DISKLESS_LOAD_SWAPDB:将加载前的数据库server.db[]备份diskless_load_backup,防止加载失败无法恢复到之前的状态

    1
    2
    3
    if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
    diskless_load_backup = disklessLoadMakeBackups();
    }
  • REPL_DISKLESS_LOAD_WHEN_DB_EMPTY:即使记载失败也不恢复到之前状态

在加载之前,需要释放原来的server.db[]REPL_DISKLESS_LOAD_SWAPDB方式下已经备份,REPL_DISKLESS_LOAD_WHEN_DB_EMPTY方式下不需要备份,因此可以直接清空server.db[]

1
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);

为了简化传输,REDIS将conn设置阻塞IO,并且单次阻塞时间是60s,如果加载错误则选择 server.repl_diskless_load 选择是恢复server.db[]还是直接清空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//...
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);

connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithConn(&rdb, conn, server.repl_transfer_size); // 底层使用socket

connBlock(conn); // 设置其为阻塞模式
connRecvTimeout(conn, server.repl_timeout*1000); // 设置从conn中读取数据最多阻塞时间 60s
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);

if (rdbLoadRio(&rdb, RDBFLAGS_REPLICATION, &rsi) != C_OK) {
// 加载失败的处理
stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB from socket");
cancelReplicationHandshake();
rioFreeConn(&rdb, NULL);

if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
// 加载失败,则回复 server.db 加载之前的数据
disklessLoadRestoreBackups(diskless_load_backup, 1, empty_db_flags);
}
else {
// 否则,直接清空本次加载的数据,下次以一个空的server.db[]开始
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}

return;
}

stopLoading(1);

/******************成功加载********************/
// 加载策略是 REPL_DISKLESS_LOAD_SWAPDB时,要释放 diskless_load_backup
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
disklessLoadRestoreBackups(diskless_load_backup,0,empty_db_flags);
}

// 校验结束标志
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
{
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeConn(&rdb, NULL);
return;
}
}

// 将 socket 恢复非阻塞模式,
rioFreeConn(&rdb, NULL);
connNonBlock(conn);
connRecvTimeout(conn,0);
}

如果slave设置的RDB文件加载方式,则和持久化的加载方式基本一致:

+ 如果此时slave正在执行rdb持久化,直接kill这个进程,因为马上要同步master数据,这个持久化是无意义的
+ 打开slave端rdb持久化的文件server.rdb_filename
+ 将之前存储的server.repl_transfer_tmpfile的rdb文件直接rename为 server.rdb_filename即可
+ rdbLoad函数将server.repl_transfer_tmpfile加载到内存,即可实现数据同步
+ 由于server.repl_transfer_tmpfile是用于数据同步产生,而不是用于持久化,因此需要删除rdb文件
+ 最后关闭server.repl_transfer_tmpfile及其fd,并初始化

到此,数据加载就结束,slave与master实现了完全重同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the master,"
" but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid any race",
(long) server.rdb_child_pid);
// 此时RDB save的文件不重要,因为马上要覆盖它
killRDBChild();
}

/* Rename rdb like renaming rewrite aof asynchronously. */
int old_rdb_fd = open(server.rdb_filename, O_RDONLY|O_NONBLOCK);
if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) {
serverLog(LL_WARNING,
"Failed trying to rename the temp DB into %s "
"in MASTER <-> REPLICA synchronization: %s",
server.rdb_filename,
strerror(errno));
cancelReplicationHandshake();
if (old_rdb_fd != -1) close(old_rdb_fd);
return;
}
// rename success
/* Close old rdb asynchronously. */
if (old_rdb_fd != -1)
bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL);

// 加载
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
// server.rdb_del_sync_files 默认值是 0,
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,
"Removing the RDB file obtained from the master. "
"This replica has persistence disabled");
bg_unlink(server.rdb_filename); // 在后台中删除 server.rdb_filename
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or replica promoted. */
return;
}

/* Cleanup. */
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}

zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd); // 关闭rdb文件fd
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;

// 加载rdb完成
}

加载成功还需要处理啥?

+ 和前面的部分重同步类似,要为连接conn对象server.repl_transfer_s创建server.master实际,为其注册可读事件
+ 将状态设置为 REPL_STATE_CONNECTED状态
+ 完全同步完成,那么可以设置server.replidmaster_repl_offset,并清除缓存的server.replid2等。因为完全重同步成功,说明master已经换了,不需要再保存之前宕机的主服务器信息了。
+ 重启AOF重写子进程,进行一次持久化。

至此,slave的状态变成了REPL_STATE_CONNECTED,标志着slave与master连接成功,并且进行了首次数据同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   /* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);
// 到此,从服务器状态终于变成了 REPL_STATE_CONNECTED
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;

//...

// 全部重同步完成,相当于开启一个新的记录,那么就需要清除 replid2
memcpy(server.replid, server.master->replid, sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();

// 无论是否有sub-slaves,都需要创建 server.repl_backlog
// 当这个slave升级为master时,备用
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");

//...

// 等全部重同步完成,就可以重启AOF重写了
if (server.aof_enabled) restartAOFAfterSYNC();
return;

SLAVEOF NO ONE

SLAVEOF命令,可以让server变成slave,也可以让其从slave恢复到单机。

对一个slave执行SLAVEOF NO ONE命令,即取消其复制行为,对主服务器执行这个命令没有效果。脱离复制的过程由replicationUnsetMaster函数实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void replicaofCommand(client *c) {
// 集群模式下不允许SLAVEOF
if (server.cluster_enabled) {
addReplyError(c,"REPLICAOF not allowed in cluster mode.");
return;
}

// SLAVE NO ONE 指令会关闭当前从服务器的复制行为,使其成为单独的主服务器
if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",client);
sdsfree(client);
}
}
// else .....
}

replicationUnsetMaster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */

/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);

sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master)
freeClient(server.master);
// freeClient() 中会缓存 server.cached_master
// 因此要调用 replicationDiscardCachedMaster() 函数取消缓存
replicationDiscardCachedMaster();

// 如果正在与主服务建立连接,则取消
cancelReplicationHandshake();

// 生成server自己的replid
shiftReplicationId();

// 不再复制主服务器,那么也需要断开自己的 sub-slaves
disconnectSlaves();
server.repl_state = REPL_STATE_NONE; // 连接状态设置初始化状态

server.slaveseldb = -1;

// 当前服务器的下线时间
server.repl_no_slaves_since = server.unixtime;

/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);

// 如果之前手动关闭了AOF进程,现在重启
if (server.aof_enabled && server.aof_state == AOF_OFF)
restartAOFAfterSYNC();
}

shiftReplicationId

调用shiftReplicationId函数的时机,是当的server将要从slave变成master。因此server.replid要替换为自己的replid,同时要把之前复制的主服务器信息存储在server.replid2server.second_replid_offset

1
2
3
4
5
6
7
8
9
10
11
12
13
void shiftReplicationId(void) {
memcpy(server.replid2, server.replid, sizeof(server.replid));

server.second_replid_offset = server.master_repl_offset+1;
// 生成自己的 replid
changeReplicationId();
serverLog(LL_WARNING,
"Setting secondary replication ID to %s, valid up to offset: %lld."
" New replication ID is %s",
server.replid2,
server.second_replid_offset,
server.replid);
}

Master端解析

那么主服务器端是如何回应slave端?下面进行讲解。

replconfCommand

在主从握手过程中,slave端通过server.repl_transfer_s向master端发送REPLCONF option value命令,master在 replconfCommand函数对其进行解析。

这个函数master和slave都会调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
void replconfCommand(client *c) {
int j;
// 参数个数必须是奇数个
if ((c->argc % 2) == 0) {
addReply(c,shared.syntaxerr);
return;
}
/* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) {
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port;
if ((getLongFromObjectOrReply(c,c->argv[j+1],&port,NULL) != C_OK)) // 取出 port
return;
c->slave_listening_port = port;
}
else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
sds ip = c->argv[j+1]->ptr;
// c->slave_ip 有长度限制
if (sdslen(ip) < sizeof(c->slave_ip)) {
memcpy(c->slave_ip,ip,sdslen(ip)+1);
} else {
addReplyErrorFormat(c,
"REPLCONF ip-address provided by replica instance is too long: "
"%zd bytes",
sdslen(ip));
return;
}
}
else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
// 如果REDIS版本过低,无法识别CAPA就直接忽略
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
}
else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
// REPLCONF ACK OFFSET 指令是slave用来通知master到目前为止
// slave 处理的复制流的数量
long long offset;

if (!(c->flags & CLIENT_SLAVE)) // c必须是slave
return;
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
// c->repl_ack_off 是上一次ACK时的偏移量
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
// 心跳检测
c->repl_ack_time = server.unixtime;

if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
return;
}
else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
// master向slave请求ACK
if (server.masterhost && server.master)
replicationSendAck();
return;
}
else {
addReplyErrorFormat(c,
"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
return;
}
}
addReply(c,shared.ok);
}

syncCommand – Partial

当slave端向master发送PYSNC replid offset请求时,master端就会进入syscCommand 函数中。在正式处理PYSNC请求之前,先判断下:

  • 如果当前server是master,c已经是master的slave,那么直接忽略请求
  • 如果当前server是slave,PSYNC是c是用户发出的的而不是sub-slave,如果此时server的状态没有和master连接好,则也忽略此次请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void syncCommand(client *c) {

// 如果 c 已经是master的slave,则忽略请求
if (c->flags & CLIENT_SLAVE) return;

// 如果当前server是slave,但是还没与master建立好连接,
// 但是客户端c在强制发送PSYNC请求,则回复用户客户端:xxx
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}

// 如果slave在此之前master发过指令请求,刚好此刻master要发送给slave
// 那么旧中止,mastet回应SYNC请求需要的一个全新的发送缓冲区,否则slave无法识别
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
//...
}

先处理PSYNC请求,基于masterTryPartialResynchronization函数尝试执行部分重同步,如果经过master判断无法执行部分重同步再执行完全重同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void syncCommand(client *c) {
//...
// 同意slave c 的请求
serverLog(LL_NOTICE,
"Replica %s asks for synchronization",
replicationGetSlaveName(c));

if (!strcasecmp(c->argv[0]->ptr, "psync")) {
// 先尝试执行部分重同步,返回C_ERR则转而执行完全重同步
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++; // for debug
return;
}
else {
// 那么应该执行完全重同步
char *master_replid = c->argv[1]->ptr;
if (master_replid[0] != '?')
server.stat_sync_partial_err++; // for debug
}
}
//...
}

masterTryPartialResynchronization

无论slave发的PSYNC是啥,masterTryPartialResynchronization函数都先尝试能不能执行部分重同步,如果能则直接传输server.repl_backlog中的数据,不能则返回C_ERR,留到上面syncCommand函数中else分支执行:

1
2
3
4
5
6
 if (masterTryPartialResynchronization(c) == C_OK) {
// 部分重同步成功
}
else {
// 需要执行完全重同步
}

而判断是否能部分重同步,对于PSYNC master_replid pysnc_offset请求,必须满足:

1
2
server.replid == master_replid  && 
pysnc_offset > server.repl_backlog_off && pysnc_offset < server.master_repl_offset

如果PYSNC请求的master_replid与当前主服务器的server.replid不一致,则有可能是因为这个server是新被选出来的主服务器,那么就需要判断将master_replid与之前宕机的旧主服务器的server.replid2进行比较,也需要满足

1
master_replid == server.replid2 && psync_offset < server.second_replid_offset

如果经过判断能执行部分重同步,

  • 将客户端c标志位加上 CLIENT_SLAVE,区别于普通的用户客户端
  • 将c的状态设置为 SLAVE_STATE_ONLINE,表示建立连接,可以正常传输数据。
  • master根据slave是否支持PSYNC2协议,回复客户端 +CONTINUE\r\n or +CONTINUE replid \r\n
  • serevr.repl_backlog 中从psync_offset开始积累的数据添加待output, 等待可写事件触发即可发送

函数执行成功,则返回C_OK,返回 syncCommand函数后,也直接return。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// PYSNC replid offset
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_replid = c->argv[1]->ptr; // replid
char buf[128];
int buflen;

// offset
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK)
goto need_full_resync;

/***** 判断是否能执行部分重同步 ******/
// 先判断 master_replid 对不对,不对再检测下 server.replid2对不对
if (strcasecmp(master_replid, server.replid) && (strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
// 不匹配,说明是需要完全重同步

// 如果 master_replid[0] 不是 ?,就不是save强制执行完全重同步
// 那么就判断导致完全重同步的原因,是因为PSYNC的 replid 还是因为 replid2
if (master_replid[0] != '?') {
// PSYNC 请求的 master_replid 不匹配导致无法部分重同步
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,
"Partial resynchronization not accepted:"
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid,
server.replid,
server.replid2);
}
else {
// replid2 不匹配导致的
serverLog(LL_NOTICE,
"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld",
psync_offset,
server.second_replid_offset);
}
}
else {
//master_replid[0] == '?',说明是slave强制请求完全重同步导致的
serverLog(LL_NOTICE,
"Full resync requested by replica %s",
replicationGetSlaveName(c));
}

goto need_full_resync;
}

// 对PSYNC的psync_offset判断,是否能执行部分重同步
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s "
"for lack of backlog (Replica request was: %lld).",
replicationGetSlaveName(c),
psync_offset);

if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC "
"with an offset that is greater than the master replication offset.",
replicationGetSlaveName(c));
}

goto need_full_resync;
}

// 运行到此,说明满足部分重同步的条件
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE; // 开始在线更新数据
c->repl_ack_time = server.unixtime; // 更新心跳检测时间
c->repl_put_online_on_ack = 0; // 是否注册ACK可写事件
listAddNodeTail(server.slaves, c); // 将从服务器加入主服务器的从服务器客户端队列中

if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); // PYSNC2协议
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); // PYSNC 协议
}

// 发送通知:+CONTINUE
if (connWrite(c->conn,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}

// 将 server.repl_backlog 中[pysnc_offset,server.master_offset]区间的数据
// 添加到发送缓冲区,发送给 slave
psync_len = addReplyReplicationBacklog(c, psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. "
"Sending %lld bytes of backlog starting from offset %lld.",
replicationGetSlaveName(c),
psync_len,
psync_offset);

// 如果能执行部分重同步,那么不需要发送select指令

// 更新有效slave个数
refreshGoodSlavesCount();

return C_OK;

need_full_resync:

return C_ERR;
}

syncCommnad – Full

masterTryPartialResynchronization函数如果返回C_ERR,则将执行完全重同步。先简要的一些判断

  • 导致PYSNC部分重同步失败,是否因为slave强行执行完全重同步
  • slave发送如果是SYNC请求,那么之后有些回复不能发送给slave,因为slave版本低无法识别。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
   if (!strcasecmp(c->argv[0]->ptr, "psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++; // for debug
return; /* No full resync needed, return. */
}
else {
// 那么应该执行完全重同步
char *master_replid = c->argv[1]->ptr;
// 不是?,则是部分重同步请求失败了
if (master_replid[0] != '?')
server.stat_sync_partial_err++;
}
}
else {
// 如果slave端发送是SYNC请求,加上 CLIENT_PRE_PSYNC 标志
// 说明slave版本过低,很多能力不支持
c->flags |= CLIENT_PRE_PSYNC;
}

// 运行到此,说明要执行完全重同步
//...

当判断确实要执行完全重同步,即slave从零开始,同步到当前server的状态,需要做一些处理

  • 加上到当前server的从服务器链表server.slaves
  • 也要给slave机上CLIENT_SLAVE以区别于普通的客户端
  • slave的状态设置为SLAVE_STATE_WAIT_BGSAVE_START,表示即将开始启动BGSAVE
  • 如果这个slave是server的第一个从服务器,那么就需要做较多的工作:
    • 创建server.repl_backlog,积累数据,为后面部分重同同步准备。
    • 为当前server创建replid
    • 消除server.replid2。都已经执行完全重同步了,那么之前宕机的主服务器和此时的slave没有关系,不需要缓存server.replid2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
	//...
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */

c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves, c);

// 创建server.repl_backlog
// 在server第一个slave时就创建 server.repl_backlog
// 在之后的slave共享的都是server.repl_backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
// 生成新的 server.replid
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog created, my new replication IDs are '%s' and '%s'",
server.replid,
server.replid2);
}

下面master需要根据传输RDB数据的方式server.rdb_child_type做不同的处理:

  • RDB_CHILD_TYPE_DISK:在master端先将server.db[]以RDB协议格式保存到磁盘,再发送给slave
  • RDB_CHILD_TYPE_SOCKET:master端直接将生成的RDB数据通过socket发送给slave

syncCommnad函数处理如下:

  1. server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK

由于master的所有slaves最终都是要同步到master的状态,那么如果此时子进程正在持久化操作,而当前从服务器c的同步类型RDB_CHILD_TYPE_DISK,那么就是可以直接利用此次持久化产生的rdb文件。

进一步,此刻有另一个slave正在处于SLAVE_STATE_WAIT_BGSAVE_END状态,那么将slave的发送缓存区数据复制给c,再给c开启replicationSetupSlaveForFullResync函数,那么当 server.rdb_child_pid进程产生rdb文件后就可以将此次的c一起同步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) {
client *slave;
listNode *ln;
listIter li;

listRewind(server.slaves, &li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
// 如果c和sslave的能力一致,那么就把slave的缓冲区数据发复制给c
// 然后也给c开启全同步
// c就可以更快的执行同步
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
copyClientOutputBuffer(c, slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
}
else {
serverLog(LL_NOTICE,
"Can't attach the replica to the current BGSAVE."
" Waiting for next BGSAVE for SYNC");
}
}

2)server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET

如果此时有子进程在RDB持久化,但是master设置的传输方式RDB_CHILD_TYPE_SOCKET,那么完全重同步操作只能等待下次BGSAVE。因为当前持久化对同步没有帮助,不能像上面那样可以重复利用。

1
2
3
4
5
6
/* CASE 2: BGSAVE is in progress, with socket target. */
else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE,
"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

}

那么 RDB_CHILD_TYPE_SOCKET类型的完全重同步,延迟到何时执行?

需要等到在replicationCron函数中调用startBgsaveForReplication开启同步操作。为了让更多的slave能处于同步就绪状态,在replicationCron函数中等待server.repl_diskless_sync_delay秒后才能开启dickelss方式复制。过程如下:

  • replicationCron函数中,确认没有子进程在持久化
  • 必须要有处于SLAVE_STATE_WAIT_BGSAVE_START状态的slave,即 slaves_waiting !=0
  • slaves与master最长空闲时间max_idle > server.repl_diskless_sync_delay时,就可以开启dickless复制方式。只要slave与master有数据交互就会更新slave->lastinteraction,因此在开启dickelss方时会阻塞一会儿。
  • 计算这些slaves能力的交集mincapa

以上条件都满足了,才能调用startBgsaveForReplication(mincapa)函数开启RDB_CHILD_TYPE_SOCKET传输方式的完全重同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    //... in replicationCron()
if (!hasActiveChildProcess()) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1; // 求所有从服务器能力的交集,即最小的能力
listNode *ln;
listIter li;

// 1 计算最大的max_idle
// 2 计算处于等待RDB的从服务器数量
// 3 计算从服务器最小的能力 mincapa
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client* slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ?
slave->slave_capa :
(mincapa & slave->slave_capa);
}
}

// 必须要有从服务器处于等待同步状态,再满足下面两个条件之一:
// 要么启动的是 dick_sync
// 要么等待 server.repl_diskless_sync_delay seconds 后再同步
if (slaves_waiting &&
(!server.repl_diskless_sync || max_idle > server.repl_diskless_sync_delay))
{
startBgsaveForReplication(mincapa);
}
}
//...

3) server.rdb_child_pid == -1

此时,没有开启rdb持久化子进程,那么就可以根据 server.repl_diskless_sync方式来处理,

  • server.repl_diskless_sync == RDB_CHILD_TYPE_SOCKET:还是要延迟到replicationCron()函数中执行
  • server.repl_diskless_sync == RDB_CHILD_TYPE_DISK:就直接在此处调用startBgsaveForReplication函数

这个分支表述如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// in syncCommand(client* c)
else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
// dickless 的复制方式
// 由于在 replicationCron() 函数里创建rdb子进程,将数据复制给slave,
// 因此希望让更多的slave处于准备就绪状态
// 那么在此延时等待 server.repl_diskless_sync_delay sec
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE, "Delay next BGSAVE for diskless SYNC");
}
else {
// 如果复制方式配置的是先存储到磁盘再发送,或者slave端不支持 diskless 复制方式
// 那么就启动子进程,生成RDB文件
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa);
}
else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}

startBgsaveForReplication

startBgsaveForReplication 开启完全重同步。根据master端的同步方式server.repl_diskless_sync以及slave支持EOF,才可以开启socket传输rdb数据,否则还是先生成RDB文件,再传输到slave端。

  • socket_target ==1rdbSaveToSlavesSockets函数通过socket传输,此时slave直接在接受了。
  • socket_target ==0 rdbSaveBackground将rdb数据线保存到本底server.rdb_filename,然后调用replicationSetupSlaveForFullResync函数将server.rdb_filename中数据传入到slave。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,
"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
}
else {
serverLog(LL_WARNING,
"BGSAVE for replication: replication information not available,"
" can't generate the RDB file right now. Try later.");
retval = C_ERR;
}

// 是否删除此处生成的 server.rdb_filaname 文件
if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
RDBGeneratedByReplication = 1; // 删除

// 如果发生错误,则取消所有等待master产生rdb数据的slave,断开主从连接
if (retval == C_ERR) {
serverLog(LL_WARNING, "BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves, ln);
addReplyError(slave, "BGSAVE failed, replication can't continue");
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}

return retval;
}

// 给master的每个slave发送rdb数据
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
}
}
}

if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}

rdbSaveToSlavesSockets

rdbSaveToSlavesSockets函数和 rdbSaveBackground函数原理类似,后者是向rdb文件 server.rdb_filename 中写入数据,而前者是向socket中写入rdb数据。但是REDIS并没有直接让产生rdb数据的子进程向socket中写入数据传递给slave,而是让子进程把生成的rdb数据都传递到父进程,让父进程写入socket,这是为了当子进程中止被父进程接管时不会影响TLS的连续状态。

  • 在fork之前,创建管道 server.rdb_pipe_readserver.rdb_pipe_write
  • 将所有处于SLAVE_STATE_WAIT_BGSAVE_START状态的slaves都在server.rdb_pipe_conns 中建立一个引用,使得server.rdb_pipe_conns[i]指向等待BGSAVE启动的slave,再并将他们的状态修改为 SLAVE_STATE_WAIT_BGSAVE_END
  • 在子进程中,创建基于 server.rdb_pipe_write 的rdb对象,使得子进程直接将生成的rdb协议格式的数据写入管道,父进程在server.rdb_pipe_read端读取
  • 在父进程中,需要将server.rdb_pipe_read管道设置为非阻塞模式,为server.rdb_pipe_read注册可读事件,设置可读事件的回调函数rdbPipeReadHandler,等待子进程数据的到来

rdbSaveToSlavesSockets函数结束后,子进程的数据可能尚未完全发送至父进程。需要等子进程结束后,rdb数据全部写完,那个时候就可以将rdb数据写入socket中。因此,真正发送rdb数据,是在进程结束后的处理函数backgroundSaveDoneHandler中。

此外,将rdb数据先存储到磁盘再同步的方式也是在backgroundSaveDoneHandler函数中完成发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
int pipefds[2];

if (hasActiveChildProcess()) return C_ERR;
if (server.rdb_pipe_conns) return C_ERR;

/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over.
*/
if (pipe(pipefds) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0];
server.rdb_pipe_write = pipefds[1];
anetNonBlock(NULL, server.rdb_pipe_read); // 读设置为非阻塞IO

// server.rdb_pipe_conns 保存着所有处于 SLAVE_STATE_WAIT_BGSAVE_START 状态的 slave
server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
// 对于等待master开启BGSAVE的slave,进行处理
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
// 将 SLAVE_STATE_WAIT_BGSAVE_START 状态的slave 保存在 server.rdb_pipe_conns
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
// 将slave状态设置为 SLAVE_STATE_WAIT_BGSAVE_END
// 发送slave: +FULLSYNC master_repliId master_replioff
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
}
}

openChildInfoPipe();

// 创建子进程
if ((childpid = redisFork()) == 0) {
/* Child */
int retval;
rio rdb;

// 使用 server.rdb_pipe_write 管道初始化rdb对象
// 那么就是向父进程写rdb数据
rioInitWithFd(&rdb, server.rdb_pipe_write);

redisSetProcTitle("redis-rdb-to-slaves");
redisSetCpuAffinity(server.bgsave_cpulist);

// 数据流式的写:前后都会有 eofmark
retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) // rioFlush(&rdb) ==0 表示失败
retval = C_ERR;

if (retval == C_OK) {
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
}

// 写完毕
rioFreeFd(&rdb);
close(server.rdb_pipe_write); /* wake up the reader, tell it we're done. */
exitFromChild((retval == C_OK) ? 0 : 1);
}
else {
/* Parent */
if (childpid == -1) {
// 如果子进程没有启动
serverLog(LL_WARNING,
"Can't save in background: fork: %s",
strerror(errno));

/* 需要将修该为 SLAVE_STATE_WAIT_BGSAVE_END 状态的slave
* 都重新恢复到 SLAVE_STATE_WAIT_BGSAVE_START 状态
*/
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client* slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
// 关闭pipe
close(server.rdb_pipe_write);
close(server.rdb_pipe_read);
zfree(server.rdb_pipe_conns); // 释放内存
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
closeChildInfoPipe();
}
else {
// 否则子进程就是启动成功
serverLog(LL_NOTICE,
"Background RDB transfer started by pid %d",
childpid);

server.rdb_save_time_start = time(NULL); // 保存rdb子进程启动时间
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; // rdb类型

// 将父进程中的 server.rdb_pipe_write 关闭,父进程是只读
close(server.rdb_pipe_write);

// 并为 server.rdb_pipe_read 注册可读事件,回调函数是 rdbPipeReadHandler
// 在回调函数中读取rdb数据
if (aeCreateFileEvent(server.el,
server.rdb_pipe_read,
AE_READABLE,
rdbPipeReadHandler,
NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
}

replicationSetupSlaveForFullResync

replicationSetupSlaveForFullResync给slave发送+FULLRESYNC server.replid server.master_repl_offset\r\n,master要积累数据即将要发送自己数据。

  • 设置slave此时的状态是SLAVE_STATE_WAIT_BGSAVE_END,表示从BGSAVE已经启动,要开始积累rdb数据了。
  • server.slaveseldb = -1是为了先发送个SELECT 指令

整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;

slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
server.slaveseldb = -1;

// master接收到的是SYNC指令,则不回复
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", server.replid, offset);
if (connWrite(slave->conn,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}

rdbPipeReadHandler

rdbPipeReadHandler函数,是从子进程读取rdb数据流的回调函数。当子进程向父进程向server.rdb_pipe_write中写入数据时,server.rdb_pipe_read上就会触发可读事件,进而调用rdbPipeReadHandler函数读取。这个函数主要完成了三个任务:

  • server.rdb_pipe_read中将数据读取到server.rdb_pipe_buff
  • server.rdb_pipe_buff中数据发到server.rdb_pipe_conns中等待数据的slave
  • 当子进程写完数据,会调用close(server.rdb_pipe_write),关闭子进程的管道server.rdb_pipe_writerdbPipeReadHandler函数检测到server.rdb_pipe_bufflen == 0时则会进行关闭清理工作。

因此,rdbPipeReadHandler完成的任务即,将从server.rdb_pipe_read读取到的rdb数据发送到server.rdb_pipe_conns还存活的slave中,并负责最后的关闭处理。因此!!!,当子进程结束,父进程也已经全部的RDB数据发送到等待数据的slave。

其他处理细节,见代码讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
UNUSED(mask);
UNUSED(clientData);
UNUSED(eventLoop);
int i;

if (!server.rdb_pipe_buff)
server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
serverAssert(server.rdb_pipe_numconns_writing ==0);

while (1) {
// 1) 从 server.rdb_pipe_read 中读取数据至 server.rdb_pipe_buff
server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);

// 读取错误处理
if (server.rdb_pipe_bufflen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) // 非阻塞IO下这不是错误
return;
serverLog(LL_WARNING,
"Diskless rdb transfer, read error sending DB to replicas: %s",
strerror(errno));
// 释放所有等待数据的 slaves
for (i=0; i < server.rdb_pipe_numconns; i++) {
connection *conn = server.rdb_pipe_conns[i];
if (!conn) continue;
client *slave = connGetPrivateData(conn);
freeClient(slave);
server.rdb_pipe_conns[i] = NULL;
}
// 杀死子进程
killRDBChild();
return;
}

// 子进程调用 close(server.rdb_pipe_write);表示数据已传输完毕
// 子进程即将正确关闭: exit(0)
if (server.rdb_pipe_bufflen == 0) {
int stillUp = 0;
// 需要删除 server.rdb_pipe_read 可读事件
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
for (i=0; i < server.rdb_pipe_numconns; i++) {
connection *conn = server.rdb_pipe_conns[i];
if (!conn) continue;
stillUp++;
}
serverLog(LL_WARNING,
"Diskless rdb transfer, done reading from pipe, %d replicas still up.",
stillUp);
// 清理
RdbPipeCleanup();
return;
}

// 2) 将数据写入到 server.rdb_pipe_conns 中slave
int stillAlive = 0;
for (i=0; i < server.rdb_pipe_numconns; i++) {
int nwritten;
connection* conn = server.rdb_pipe_conns[i];
if (!conn) continue;

// 将 server.rdb_pipe_read 中读取到的rdb数据发送给 slave
client *slave = connGetPrivateData(conn);
if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
// 发送失败
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,
"Diskless rdb transfer, write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
server.rdb_pipe_conns[i] = NULL;
continue;
}
/* An error and still in connected state, is equivalent to EAGAIN */
slave->repldboff = 0;
}
else {
// 发送成功
slave->repldboff = nwritten; // slave->repldboff 表示每次发送的数据
server.stat_net_output_bytes += nwritten;
}

// 如果没有将 server.rdb_pipe_buff 中数据都发送给conn,
// 则为单独为 conn 注册可写事件
if (nwritten != server.rdb_pipe_bufflen) {
server.rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler);
}

stillAlive++;
} // for-end

// 如果 server.rdb_pipe_conns 中所有的slave都掉线了
// 则直接kill子进程
if (stillAlive == 0) {
serverLog(LL_WARNING,
"Diskless rdb transfer, last replica dropped, killing fork child.");
killRDBChild();
RdbPipeCleanup();
}

// 以下两种情况要删除可读事件:
// case1: server.rdb_pipe_numconns_writing != 0
// 如果这一轮从 server.rdb_pipe_read 中读取的数据没有发送完毕,
// 则需要将这个数据发送完,才能继续从 server.rdb_pipe_read 中读取,
// 否则不知道是哪个conn没有接受到完整的数据
// case2: stillAlive == 0: 所有的slave都掉线了
if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
break;
}
}
}

rdbPipeWriteHandler

rdbPipeWriteHandler函数,是将 rdbPipeReadHandler 函数中server.rdb_pipe_buff里的剩余数发送给没有接受到 server.rdb_pipe_bufflen 个字节的slave。因此,当conn上可写事件触发,就将剩余数据发送给slave。

  • 如果此次仍然没有将server.rdb_pipe_buff里的剩余数据发送完,则直接return,等待下次可写事件触发
  • 如果此次将server.rdb_pipe_buff里的剩余数据全部发送至slave,那么取消conn上的可写事件,并且server.rdb_pipe_numconns_writing--
  • 如果server.rdb_pipe_numconns_writing ==0,那么就可以重新开启server.rdb_pipe_read上的可读事件,从子进程继续读取rdb数据

上述最后一个部分是在内部调用rdbPipeWriteHandlerConnRemoved函数完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(server.rdb_pipe_bufflen>0);
client *slave = connGetPrivateData(conn);
int nwritten;
if ((nwritten = connWrite(conn,
server.rdb_pipe_buff + slave->repldboff,
server.rdb_pipe_bufflen - slave->repldboff)) == -1)
{
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,
"Write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
}
return;
}
// 发送成功
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
// 如果待发送数据没有发送完,则等待下次可写事件的触发
if (slave->repldboff < server.rdb_pipe_bufflen)
return;

// slave->repldboff >= server.rdb_pipe_bufflen
// 表示数据发送完毕,则需要取消注册可写事件
rdbPipeWriteHandlerConnRemoved(conn);
}

rdbPipeWriteHandlerConnRemoved

如果主服务器master已经将server.rdb_pipe_buff中的数据全部发送至server.rdb_pipe_conns中的每一个没有掉线的slave,那么将为server.rdb_pipe_read重新注册可读事件,等待子进程的rdb数据到来,再次进入rdbPipeReadHandler函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
if (!connHasWriteHandler(conn))
return;
// 取消注册可写事件
connSetWriteHandler(conn, NULL);
server.rdb_pipe_numconns_writing--;

// 当 server.rdb_pipe_numconns_writing ==0
// 说明所有的conn的数据都发送完毕,
// 那么就需要重新开启可读事件,从子进程继续读取数据
if (server.rdb_pipe_numconns_writing == 0) {
if (aeCreateFileEvent(server.el,
server.rdb_pipe_read,
AE_READABLE,
rdbPipeReadHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}

backgroundSaveDoneHandler

运行到backgroundSaveDoneHandler函数,说明子进程已经结束。那么到此,对于完全重同步任务,master已完成三步:

  • 向所有处于SLAVE_STATE_WAIT_BGSAVE_START状态的slave发送了 +FULLRESYNC server.replid offset\r\n
  • rdbSaveToSlavesSockets函数 or rdbSaveBackground函数中,已经完全生成同步所需的rdb数据
  • dickless传输方式下,当子进程已经结束,master的数据也已完全同步至slaves
  • 不是dickless方式,还需要从server.rdb_filename读取数据再完全进行同步至slave
  • 将完成主从同步的slaves状态都设置为SLAVE_STATE_ONLINE

最后两个任务,backgroundSaveDoneHandler 函数是通过调用 updateSlavesWaitingBgsave 函数来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
switch(server.rdb_child_type) {
case RDB_CHILD_TYPE_DISK:
backgroundSaveDoneHandlerDisk(exitcode,bysignal);
break;
case RDB_CHILD_TYPE_SOCKET:
backgroundSaveDoneHandlerSocket(exitcode,bysignal);
break;
default:
serverPanic("Unknown RDB child type.");
break;
}
}

updateSlavesWaitingBgsave

updateSlavesWaitingBgsave中处理slave对象不仅仅只有 server.rdb_pipe_conns中,而是当前server.slaves

此时serer.slaves中可能有部分slaves处于SLAVE_STATE_WAIT_BGSAVE_START状态,等待开启BGSAVE,因此这部分slaves的处理和之前类似:1)获取他们的能力交集;2)调用 startBgsaveForReplication(mincapa)函数,来开启完全重同步过程。

状态是SLAVE_STATE_WAIT_BGSAVE_END的slave,则是从上面运行至此。对于这些slave,updateSlavesWaitingBgsave函数需要完成剩余的两个任务:

  • 不是dickless方式,从server.rdb_filename读取数据完全重同步至slaves
  • 传输完毕,需要将slaves的状态设置为 SLAVE_STATE_ONLINE

RDB_CHILD_TYPE_DISK类型下,将slave的状态修改为SLAVE_STATE_SEND_BULK,在回调函数sendBulkToSlave完成数据发送,再将slave状态修改为SLAVE_STATE_ONLINE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
int mincapa = -1;
listIter li;

listRewind(server.slaves, &li);

while((ln = listNext(&li))) {
client *slave = ln->value;
// 等待开启BGSAVE的 slave 处理方式
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa : (mincapa & slave->slave_capa);
}
// other
else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
// RDB_CHILD_TYPE_SOCKET 类型,已经同步完数据
// 需要将slave状态设置为 SLAVE_STATE_ONLINE
if (type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket)."
"Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));

slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; // 心跳检测
}
else {
// RDB_CHILD_TYPE_DISK 类型
// 需要判断子进程是否正常退出,正常退出的才有完整的rdb文件
if (bgsaveerr != C_OK) {
freeClient(slave);
serverLog(LL_WARNING,
"SYNC failed. BGSAVE child returned an error");
continue;
}
// 打开rdb文件
if ((slave->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1)
{
freeClient(slave);
serverLog(LL_WARNING,
"SYNC failed. Can't open/stat DB after BGSAVE: %s",
strerror(errno));
continue;
}

slave->repldboff = 0; // rdb文件已发送数据字节数
slave->repldbsize = buf.st_size; // rdb文件大小
slave->replstate = SLAVE_STATE_SEND_BULK; // 发送数据状态
// 发送文件前,先告诉slave文件大小
// 格式:$< slave->repldbsize>\r\n
slave->replpreamble = sdscatprintf(sdsempty(),
"$%lld\r\n",
(unsigned long long) slave->repldbsize);
// 先取消注册可写事件???
connSetWriteHandler(slave->conn,NULL);
// 注册可写事件,将rdb文件中的数据发送至slave
if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) {
freeClient(slave);
continue;
}
}
}
}

if (startbgsave) startBgsaveForReplication(mincapa);
}

sendBulkToSlave

server.rdb_filename中的数据发送至slave,分为四个部分:

  • slave->replpreamble ==1? ,是则先告诉slave即将要发送的rdb文件大小 slave->repldbsize
  • 从rdb文件slave->repldbfd中读取数据至buf
  • buf数据发送至slave
  • 如果rdb文件中的数据发送完毕,则取消注册可写事件、关闭slave->repldbfd,以及将slave状态修改为SLAVE_STATE_ONLINE

细节可见源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void sendBulkToSlave(connection *conn) {
client *slave = connGetPrivateData(conn);
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;

// 0) 发送文件前,先告诉slave文件大小
if (slave->replpreamble) {
nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,
"Write error sending RDB preamble to replica: %s",
connGetLastError(conn));
freeClient(slave);
return;
}

server.stat_net_output_bytes += nwritten;
// 裁剪 slave->replpreamble,为下次写准备
sdsrange(slave->replpreamble,nwritten,-1);
// 如果 slave->replpreamble 中数据没有发送完毕
// 直接返回,等待下次可写事件发送
if (sdslen(slave->replpreamble) != 0) {
return;
}
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
} // if (slave->replpreamble) --end

// 下面开始发送正文数据

// 1) 继续rdb文件读取未发送数据,读取位置是从文件开始偏移 slave->repldboff 个字节
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,
"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
// 2) 将从rdb读取到的数据 buf,发送至slave
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,
"Write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
}
return;
}

// 3) 是否发送完文件数据
slave->repldboff += nwritten; // 更新已发给slave的字节数
server.stat_net_output_bytes += nwritten;

// 如果整个rdb数据发送完毕,取消注册可写事件、关闭 slave->repldbfd
// 并且要把!!!slave状态设置为 SLAVE_STATE_ONLINE
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
connSetWriteHandler(slave->conn,NULL);
putSlaveOnline(slave);
}
}

putSlaveOnline

将slave状态设置为 SLAVE_STATE_ONLINE,表示与slave的完全重同步完成了。

为slave注册可写事件。以后master与slave的数据交互应该就是传播了,即当master的客户端对master执行了写指令writeCmd,导致master的server.dirty++,那么就需要将writeCmd及其参数传播到salve,使得slave与master仍然具有一致性。此时这个写回调函数就应该是剖析REDIS的输出缓冲区中讲解的 sendReplyToClient,这样可以让slave像接受到普通客户端的指令一样去解析并执行这条写命令writeCmd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime; // 心跳检测
// 注册新写回调函数
if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
serverLog(LL_WARNING,
"Unable to register writable event for replica bulk transfer: %s",
strerror(errno));
freeClient(slave);
return;
}
// 更新有效从服务器个数
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
serverLog(LL_NOTICE,
"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}

propagate

上面详细讲解了slave向master发送PSYNC请求的过程,初步使得主从一致,继续维持这个一致性就需要靠propagate

每次调用call()函数都会判断当前指令c->cmd是否对服务器键产生修改,即server.dirty !=0,那么就会调用propagate函数,将c->cmd及其参数传播到slaves。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void call(client *c, int flags) {
// ...
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) {
int propagate_flags = PROPAGATE_NONE;

// 只要这个命令对数据库产生修改,就需要AOF持久化 以及复制行为
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

// 如果设置了强制传播,那么不论 server.dirty 是否改变,
// 都会将call调用的指令传播到所有的 slaves
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

// 但是如果调用 preventCommandPropagation() 函数来阻止传播
// 则需要去掉标志位
if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;

// 传播!!!
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
//...
}

propagate函数中,调用replicationFeedSlaves函数将当前指令及其参数argv传递给所有的server.slaves

1
2
3
4
5
6
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) {
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves, dbid, argv, argc);
}

replicationFeedSlaves

replicationFeedSlaves函数,只能被最顶级的master调用。如果一个slave也有sub-slave要实现这样的功能需要调用replicationFeedSlavesFromMasterStream函数,原因见上文分析。整个过程也是类似:

  • 如果dictId和上次执行写入的指令所在的server.slaveseldb不一致,则需要将SELECT dictid以rdb格式序列化后写入server.repl_backlog和所有的slaves
  • 将导致server.diry++的指令及其参数分别写入server.repl_backlog和所有的slaves
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];

// 不是最顶层的master,则直接返回
if (server.masterhost != NULL) return;

/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

// 不等,则需要先将 SELECT dictid 序列化成rdb格式字符串
// 1 先发送到 server.repl_backlog 中
// 2 再发送给所有的 slaves
if (server.slaveseldb != dictid) {
robj *selectcmd;

if (0 <= dictid && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;
// 序列化 SELECT dictid
dictid_len = ll2string(llstr, sizeof(llstr), dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len,
llstr));
}

// 将selectcmd添加到复制缓冲区
if (server.repl_backlog)
feedReplicationBacklogWithObject(selectcmd);

// 再发送给所有的slaves
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START)
continue;
addReply(slave,selectcmd);
}

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}

server.slaveseldb = dictid;

/* Write the command to the replication backlog if any. */
// 将写指令添加到 sever.repl_backlog
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];

// *<argc>\r\n
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);

// $<len>\r\n<argv[j]\r\n,
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);

//aux : $len\r\n
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}

// 再将写写指令发送给slaves
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
// 连接好的客户端状态都是 ONLINE
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

// *<argc>\r\n
addReplyArrayLen(slave,argc);
// 将每个参数都序列化: $<len>\r\n<argv[j]\r\n,
// 发送给slave
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}

replicationCron

replicationCron函数是个定时触发的函数,除去上述已提到的功能外,主要就是检测超时。

1)检测slave与master在建立连接的过程是否超时

REDIS的超时时间server.repl_timeout默认是60s,每个slave都有个server.repl_transfer_lastio字段,记录着最近一次读取master数据的时间。在slave与master建立连接的过程中,会记录更新每个状态server.repl_state是否超时。

1
2
3
4
5
6
7
if (server.masterhost &&
// (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) && 校验是否超时
// server.repl_state == REPL_STATE_TRANSFER && // 完全重同步超时
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
//....
}

2)超时连接

当主从已经建立好连接,slave的状态是 REPL_STATE_CONNECTED。那么之后slave会在replicationCron函数中检测与master的连接是否超时。

1
2
3
4
5
6
7
if (server.masterhost && 
server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}

连接是否超时,基于server.master->lastinteraction字段判断,而这个字段更新有两处:

  • writeToClient:即 slave向master发送数据会更新

    1
    2
    3
    4
    5
    6
    7
    int writeToClient(client *c, int handler_installed) {
    //...
    if (totwritten > 0) {
    if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    //...
    }
  • readQueryFromClient:即 slave接受到master的数据也会更新

    1
    2
    3
    4
    5
    void readQueryFromClient(connection *conn) { 
    //...
    c->lastinteraction = server.unixtime;
    //...
    }

为了维持主从连接,主服务器会在replicationCron函数中定时向所有的slave发送PING,来让从服务器slave更新server.master->lastinteraction字段。其中server.repl_ping_slave_period默认值是10,大概每10ms发送一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && listLength(server.slaves)) {
// 不是集群模式,就会发送PING
int manual_failover_in_progress = server.cluster_enabled &&
server.cluster->mf_end &&
clientsArePaused();

if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING", 4);

// 将 PING 发送给从服务器
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}

3)心跳检测
而slave也会定时向主服务器发送 REPLCONF ACK offset指令,主要目的:

+ 告诉主服务器自己当前的复制偏移量;
+ 心跳检测,告诉master自己还活着
1
2
3
4
5
6
if (server.masterhost && 
server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
{
replicationSendAck();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
// 向master发送: REPLCONF ACK reploff
void replicationSendAck(void) {
client *c = server.master;

if (c != NULL) {
c->flags |= CLIENT_MASTER_FORCE_REPLY; // CLIENT_MASTER_FORCE_REPLY 表示有数据待发送给 master
addReplyArrayLen(c,3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}
}

通过 slave->repl_ack_time字段,主服务器就可以判断从服务器中是否有slave超时连接,有则调用freeClient(slave)关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
// 超时,则关闭与从服务器的链接
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING,
"Disconnecting timedout replica: %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}