To keep master and slaves consistent, replication implements two things:
Synchronization: copy the master's data to the slaves, after which master and slaves are consistent.
Propagation: forward every operation that changes the master's database to the slaves, so that they remain consistent.
SYNC
A client sends SLAVEOF IP PORT to a server, making it a slave of the master identified by (IP, PORT). The slave then sends the SYNC command to the master, requesting a data synchronization:
the slave sends the SYNC command to the master
on receiving SYNC, the master runs the BGSAVE command to generate an RDB file, and uses a buffer to record every write command executed from that moment on
when BGSAVE finishes, the master sends the RDB file to the slave, followed by the contents of the buffer
once SYNC completes, master and slave hold the same data.
The SYNC command suits the case where a slave replicates a master for the first time, i.e. it has never replicated this master before. For a slave that reconnects after a dropped link, however, SYNC is slow and wasteful: the slave only wants the changes accumulated during the disconnection, not the whole database, and SYNC can only transfer the entire dataset, never a part of it. To address this flaw, Redis 2.8 introduced partial resynchronization through a new command, PSYNC.
PSYNC
PSYNC has two modes, full resynchronization and partial resynchronization:
Full resynchronization: used when a slave replicates a master for the first time; behaves like SYNC.
Partial resynchronization: used when a slave reconnects after a dropped link; replaying just the write commands the master accumulated during the disconnection is enough to restore consistency. For example, if the link dropped at offset 1000 and the master has advanced to 1500 by the time the slave reconnects, only bytes 1001 through 1500 need to be resent.
Partial resynchronization is built from the following three pieces of state:
server.repl_backlog: a fixed-length circular buffer that can be treated as an infinitely long array: data can be written into server.repl_backlog forever, wrapping around, and anything not read in time is simply overwritten. Three variables describe the state of server.repl_backlog:
+ server.repl_backlog_size: the fixed size of server.repl_backlog, 1 MB by default, adjustable in the config file
+ server.repl_backlog_histlen: the length of the data currently held in server.repl_backlog
+ server.repl_backlog_idx: the position of the current write pointer inside server.repl_backlog
None of these three fields ever exceeds server.repl_backlog_size.
Replication offsets
+ server.master_repl_offset: the total number of bytes ever written into server.repl_backlog; it can be read as the logical position of the last byte written. Whenever the master answers a slave's PSYNC request with a full resynchronization, the offset in +FULLRESYNC master_replid offset is exactly server.master_repl_offset.
+ server.repl_backlog_off: always equals server.master_repl_offset - server.repl_backlog_histlen + 1, so it can be read as the logical position of the first byte still held in the buffer.
It follows that once enough bytes have been written, server.master_repl_offset > server.repl_backlog_size.
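To make the index arithmetic concrete, here is a small standalone sketch (illustrative values only, not Redis source) that maps a logical replication offset to a physical position in the ring, using the same formula as addReplyReplicationBacklog below:

```c
#include <stdio.h>

/* Illustrative values: an 8-byte backlog that has seen 13 bytes in total. */
int main(void) {
    long long repl_backlog_size = 8;   /* fixed ring size          */
    long long master_repl_offset = 13; /* total bytes ever written */
    long long histlen = 8;             /* ring is full             */
    long long idx = 13 % 8;            /* next write position: 5   */

    /* First byte still available: 13 - 8 + 1 = 6 */
    long long backlog_off = master_repl_offset - histlen + 1;

    /* Map logical offset 10 to a ring index: locate the start of the
     * valid data, then skip forward, wrapping modulo the ring size. */
    long long offset = 10;
    long long j = (idx + (repl_backlog_size - histlen)) % repl_backlog_size;
    j = (j + (offset - backlog_off)) % repl_backlog_size;

    printf("first byte=%lld, logical %lld -> ring index %lld\n",
           backlog_off, offset, j);   /* prints: first byte=6, 10 -> 1 */
    return 0;
}
```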
feedReplicationBacklog
The feedReplicationBacklog function appends the len bytes at ptr to server.repl_backlog, maintaining:
+ `server.master_repl_offset`: the total number of bytes ever appended to `server.repl_backlog`
+ `server.repl_backlog_off`: the offset of the first byte still awaiting synchronization
Both of these grow monotonically.
```c
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    while (len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog + server.repl_backlog_idx, p, thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;

    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}
```
addReplyReplicationBacklog
The addReplyReplicationBacklog function appends the range [offset, server.master_repl_offset] to the client's output buffer. Since both offset and server.master_repl_offset can exceed server.repl_backlog_size, some arithmetic is needed to map offset back into the ring's valid range.
```c
long long addReplyReplicationBacklog(client *c, long long offset) {
    long long j, skip, len;

    serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

    if (server.repl_backlog_histlen == 0) {
        serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
        return 0;
    }

    serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size);
    serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off);
    serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen);
    serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", server.repl_backlog_idx);

    skip = offset - server.repl_backlog_off;
    serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);

    j = (server.repl_backlog_idx +
        (server.repl_backlog_size - server.repl_backlog_histlen)) %
        server.repl_backlog_size;
    serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);

    j = (j + skip) % server.repl_backlog_size;

    len = server.repl_backlog_histlen - skip;
    serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
    while (len) {
        long long thislen =
            ((server.repl_backlog_size - j) < len) ?
            (server.repl_backlog_size - j) : len;

        serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
        addReplySds(c, sdsnewlen(server.repl_backlog + j, thislen));
        len -= thislen;
        j = 0;
    }
    return server.repl_backlog_histlen - skip;
}
```
Server run ID
Every server has a RUN_ID, made up of 40 random hexadecimal characters.
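As a rough illustration of what such an ID looks like, the following toy sketch prints 40 random hex characters. This is not Redis's actual implementation (Redis uses its own getRandomHexChars helper seeded from a stronger entropy source); it only shows the shape of the value:

```c
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/* Toy stand-in for Redis's run-id generation: 40 random hex characters. */
int main(void) {
    const char *charset = "0123456789abcdef";
    char runid[41];
    srand((unsigned)time(NULL));
    for (int i = 0; i < 40; i++)
        runid[i] = charset[rand() % 16];
    runid[40] = '\0';
    printf("run id: %s\n", runid);
    return 0;
}
```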
+ On first replication, or after SLAVEOF NO ONE has been executed, the slave does not know the master's RUN_ID, so the PSYNC it sends is **PSYNC ? -1**. The master then sends its RUN_ID to the slave, which stores it in server.replid.
+ After a reconnect, the slave sends PSYNC with the saved RUN_ID; the master checks whether that RUN_ID matches its own. If it matches, a partial resynchronization is performed; if not, a full resynchronization is performed.
The above covers the slave side of PSYNC; now for how the master answers a PSYNC.
If the master decides on a full resynchronization, it replies +FULLRESYNC RUN_ID OFFSET, where RUN_ID is the master's run id (for the slave to save) and OFFSET is the master's current replication offset server.master_repl_offset, which the slave adopts as its initial offset.
If the master decides on a partial resynchronization, it replies +CONTINUE.
If the master cannot recognize the PSYNC command at all, it replies -ERR: the master runs a Redis version older than 2.8, and the slave must fall back to SYNC.
The overall master-slave synchronization flow is roughly as follows (the original post showed this as a figure):
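Since the figure is not reproduced here, the exchange is restated below as a wire-level trace; the replid and offset values are illustrative:

```text
slave                                     master
  |---- PSYNC ? -1 ------------------------>|   first sync, no cached master
  |<--- +FULLRESYNC <replid> <offset> ------|   full resync: RDB + buffered writes
  ... link drops, slave reconnects ...
  |---- PSYNC <replid> <offset+1> --------->|   ask only for the missed range
  |<--- +CONTINUE --------------------------|   partial resync: backlog replay
```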
In practice, however, two situations kept arising, which led to the PSYNC2 protocol:
a slave fails, losing its replication state
the master fails, triggering a failover in which one of the slaves is promoted to master, so the master's RUN_ID changes
In both cases a partial resynchronization is impossible, so Redis 4.0 introduced the PSYNC2 protocol.
PSYNC2
PSYNC2 makes two main optimizations:
Persist the replication state. Redis saves its replication state into the RDB file (see my RDB design analysis for details), so that after a restart the server can load that state from the RDB and keep replicating.
```c
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) == -1)
    return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1)
    return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1)
    return -1;
```
Remember the previous master's replication state.
```c
struct redisServer {
    char replid2[CONFIG_RUN_ID_SIZE+1];
    long long second_replid_offset;
};
```
When the master fails, the shiftReplicationId function is called to preserve that master's replication state in server.replid2 and server.second_replid_offset. The condition for accepting a partial resynchronization changes accordingly; the code is covered below.
Slave-side analysis
replicaofCommand
Since Redis 5.0 the SLAVEOF and REPLICAOF commands are identical; both enter through the replicaofCommand function, which accepts two forms of arguments:
SLAVEOF NO ONE: cancel the server's replication
SLAVEOF IP PORT: make the server a slave of the master identified by (IP, PORT).
SLAVEOF IP PORT

```c
void replicaofCommand(client *c) {
    /* ... SLAVEOF NO ONE branch (shown later) elided ... */
    else {
        long port;

        if (c->flags & CLIENT_SLAVE) {
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        if (server.masterhost &&
            !strcasecmp(server.masterhost, c->argv[1]->ptr) &&
            server.masterport == port)
        {
            serverLog(LL_NOTICE, "REPLICAOF would result into synchronization "
                                 "with the master we are already connected "
                                 "with. No operation performed.");
            addReplySds(c, sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }

        replicationSetMaster(c->argv[1]->ptr, port);
        sds client = catClientInfoString(sdsempty(), c);
        serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')",
                  server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c, shared.ok);
}
```
replicationSetMaster
The caller of replicationSetMaster may be a slave or a master, but the goal is the same: turn the calling server into a slave of the server identified by (IP, PORT). The steps:
Store the (IP, PORT) of the master to replicate into server.masterhost and server.masterport.
If the server is a slave already replicating some master, this SLAVEOF points it at a new master, so the old server.master state must be released via freeClient(server.master). Furthermore, after switching masters, any clients blocked under the previous state must be unblocked.
If the server has sub-slaves of its own, their connections must also be dropped. The sub-slaves were kept in sync with this server; now that server.master changes, letting them stay connected would corrupt their data.
```c
void disconnectSlaves(void) {
    listIter li;
    listNode *ln;
    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        freeClient((client*)ln->value);
    }
}
```
If the server is a slave still in the middle of the handshake with its previous master, cancelReplicationHandshake must be called to abort that connection and make room for the new one.
```c
int cancelReplicationHandshake(void) {
    if (server.repl_state == REPL_STATE_TRANSFER) {
        replicationAbortSyncTransfer();
        server.repl_state = REPL_STATE_CONNECT;
    } else if (server.repl_state == REPL_STATE_CONNECTING ||
               slaveIsInHandshakeState()) {
        undoConnectWithMaster();
        server.repl_state = REPL_STATE_CONNECT;
    } else {
        return 0;
    }
    return 1;
}
```
If the server is a master, i.e. it has never replicated any server, then calling replicationSetMaster performs a role switch: a server that used to be the master of other slaves now becomes the slave of (IP, PORT), so it must cache its own state:
+ first call replicationDiscardCachedMaster to drop whatever was previously cached in server.cached_master
+ then call replicationCacheMasterUsingMyself to cache itself into server.cached_master
Finally, set its state to REPL_STATE_CONNECT, the first state of the master-slave handshake.
The complete flow:
```c
void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients();

    disconnectSlaves();
    cancelReplicationHandshake();

    if (was_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    server.repl_state = REPL_STATE_CONNECT;
}
```
replicationDiscardCachedMaster
Drops the state previously cached in server.cached_master. Besides the master-side call shown above, this function is also called on the slave side; that path is analyzed further below.
```c
void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    serverLog(LL_NOTICE, "Discarding previously cached master state.");
    server.cached_master->flags &= ~CLIENT_MASTER;
    freeClient(server.cached_master);
    server.cached_master = NULL;
}
```
replicationCacheMasterUsingMyself
Master state can be cached in two ways: a slave caching its server.master into server.cached_master, or a master caching itself into server.cached_master. As the name suggests, replicationCacheMasterUsingMyself is the latter; it is only ever called by a master, so the calling server is the master. The steps:
Cache the server's replication offset server.master_repl_offset.
Since a master has server.master == NULL, call replicationCreateMasterClient to create a server.master.
Because the server is caching itself and does not need to connect to itself, the first argument to replicationCreateMasterClient is NULL:
```c
replicationCreateMasterClient(NULL, -1);
```
The server copies its own RUN_ID into server.master->replid.
After detaching server.master from all its references, its state is moved into server.cached_master. The server is about to become another master's slave, so the previous references are no longer valid and must be unlinked.
Since this master is about to become a slave, server.master is also reset to NULL.
replicationCacheMasterUsingMyself is only called in two scenarios:
Running SLAVEOF on a Redis server that has never replicated anyone, turning a standalone instance into a slave. In that case the call amounts to initializing server.cached_master:
```c
server.cached_master->conn = NULL;
server.cached_master->flags |= CLIENT_MASTER;
server.cached_master->reploff = 0;
server.cached_master->read_reploff = 0;
server.cached_master->replid = NULL;
```
Running SLAVEOF on a master, turning it into a slave. Here replicationCacheMasterUsingMyself really does cache the current master's state.
The complete flow:
```c
void replicationCacheMasterUsingMyself(void) {
    serverLog(LL_NOTICE,
        "Before turning into a replica, using my own master parameters "
        "to synthesize a cached master: I may be able to synchronize with "
        "the new master with just a partial transfer.");

    server.master_initial_offset = server.master_repl_offset;

    replicationCreateMasterClient(NULL, -1);

    memcpy(server.master->replid, server.replid, sizeof(server.replid));

    unlinkClient(server.master);
    server.cached_master = server.master;
    server.master = NULL;
}
```
replicationCreateMasterClient
Before looking at the implementation of replicationCreateMasterClient, a word on what server.master is for.
A master talks to its slaves over sockets, and Redis wraps network communication in the struct client object. So when a master-slave link is established, each slave creates a server.master through which all data exchange with the master flows; symmetrically, the master keeps a list server.slaves holding the communication handles of all its slaves.
Hence, when the link is established, replicationCreateMasterClient is called to create the server.master object on each slave, while on a real master server.master == NULL.
The replicationCreateMasterClient function builds the server.master instance as follows:
Call createClient() to create the client instance server.master.
If the caller is a slave, the conn argument is non-NULL, so as usual a read handler must be registered on conn to listen for data coming from the master; the read callback is still readQueryFromClient. If the caller is a master, conn == NULL, so no read handler is registered.
Tag the client server.master with the CLIENT_MASTER flag.
Set the three replication-related fields:
server.master->reploff: the offset of the master's stream already applied
server.master->read_reploff: the offset of the master's stream read so far
server.master->replid: the master's RUN_ID
If server.master_initial_offset is still -1 (its initial value), the SYNC command was used: masters older than Redis 2.8 leave it at -1, and the client then gets the CLIENT_PRE_PSYNC flag.
Whenever dbid != -1, select that database for server.master to operate on.
The complete flow:
```c
void replicationCreateMasterClient(connection *conn, int dbid) {
    server.master = createClient(conn);
    if (conn)
        connSetReadHandler(server.master->conn, readQueryFromClient);
    server.master->flags |= CLIENT_MASTER;

    server.master->authenticated = 1;

    server.master->reploff = server.master_initial_offset;
    server.master->read_reploff = server.master->reploff;
    server.master->user = NULL;

    memcpy(server.master->replid, server.master_replid,
           sizeof(server.master_replid));

    if (server.master->reploff == -1)
        server.master->flags |= CLIENT_PRE_PSYNC;
    if (dbid != -1) selectDb(server.master, dbid);
}
```
connectWithMaster
The replicaofCommand path above only prepares for the master-slave connection; the slave still has to actually connect to the master. That job belongs to connectWithMaster, which is invoked from replicationCron:
```c
void replicationCron(void) {
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE, "Connecting to MASTER %s:%d",
                  server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK)
            serverLog(LL_NOTICE, "MASTER <-> REPLICA sync started");
    }
}
```
connectWithMaster proceeds as follows:
Create the connection object server.repl_transfer_s used for master-slave communication.
Call connConnect, registering syncWithMaster as the connect callback, so the handshake continues once the connection succeeds.
Move the slave's state to REPL_STATE_CONNECTING.
The complete flow:
```c
int connectWithMaster(void) {
    server.repl_transfer_s = server.tls_replication ?
        connCreateTLS() : connCreateSocket();
    if (connConnect(server.repl_transfer_s, server.masterhost,
                    server.masterport, NET_FIRST_BIND_ADDR,
                    syncWithMaster) == C_ERR) {
        serverLog(LL_WARNING, "Unable to connect to MASTER: %s",
                  connGetLastError(server.repl_transfer_s));
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
```
syncWithMaster
Redis defines a set of slave-to-master connection states; a slave executing SLAVEOF must travel from REPL_STATE_NONE all the way to REPL_STATE_CONNECTED before the connection counts as established.
```c
#define REPL_STATE_NONE 0
#define REPL_STATE_CONNECT 1
#define REPL_STATE_CONNECTING 2
#define REPL_STATE_RECEIVE_PONG 3
#define REPL_STATE_SEND_AUTH 4
#define REPL_STATE_RECEIVE_AUTH 5
#define REPL_STATE_SEND_PORT 6
#define REPL_STATE_RECEIVE_PORT 7
#define REPL_STATE_SEND_IP 8
#define REPL_STATE_RECEIVE_IP 9
#define REPL_STATE_SEND_CAPA 10
#define REPL_STATE_RECEIVE_CAPA 11
#define REPL_STATE_SEND_PSYNC 12
#define REPL_STATE_RECEIVE_PSYNC 13
#define REPL_STATE_TRANSFER 14
#define REPL_STATE_CONNECTED 15
```
The syncWithMaster function drives the slave from REPL_STATE_CONNECTING to REPL_STATE_CONNECTED, completing the connection. The states are examined one by one below.
If syncWithMaster finds the current state is REPL_STATE_NONE, it was not invoked through the callback chain above but triggered by SLAVEOF NO ONE, so it returns immediately.
```c
void syncWithMaster(connection *conn) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int psync_result;

    if (server.repl_state == REPL_STATE_NONE) {
        connClose(conn);
        return;
    }
}
```
It also has to verify that connConnect actually succeeded, i.e. that the socket really is connected:
```c
if (connGetState(conn) != CONN_STATE_CONNECTED) {
    serverLog(LL_WARNING, "Error condition on socket for SYNC: %s",
              connGetLastError(conn));
    goto error;
}
```
REPL_STATE_CONNECTING to REPL_STATE_RECEIVE_PONG
Once validation passes, the socket really is connected:
+ register a read handler, again syncWithMaster, to wait for the master's reply to PING
+ remove the write handler: the only data to send is PING, four characters, which cannot be partially written
+ move the slave's state to REPL_STATE_RECEIVE_PONG
+ have the slave send PING to the master over conn
Note: the function returns right after this, so the next invocation of syncWithMaster should be handling the master's reply to PING.
```c
if (server.repl_state == REPL_STATE_CONNECTING) {
    serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
    connSetReadHandler(conn, syncWithMaster);
    connSetWriteHandler(conn, NULL);
    server.repl_state = REPL_STATE_RECEIVE_PONG;
    err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "PING", NULL);
    if (err) goto write_error;
    return;
}
```
REPL_STATE_RECEIVE_PONG to REPL_STATE_SEND_AUTH
After receiving the master's reply to PING, the slave first checks its validity. The acceptable replies are:
+ +PONG: valid
+ -NOAUTH: valid, authentication not configured
+ -ERR operation not permitted: what Redis versions below 2.8 reply
If no error occurred, the state moves to REPL_STATE_SEND_AUTH.
```c
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);

    if (err[0] != '+' &&
        strncmp(err,"-NOAUTH",7) != 0 &&
        strncmp(err,"-ERR operation not permitted",28) != 0)
    {
        serverLog(LL_WARNING, "Error reply to PING from master: '%s'", err);
        sdsfree(err);
        goto error;
    } else {
        serverLog(LL_NOTICE,
            "Master replied to PING, replication can continue...");
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_AUTH;
}
```
The if branch above does not return; execution falls straight through. Based on server.masteruser and server.masterauth the slave decides whether authentication is needed: if so, it sends the AUTH credentials to the master and enters the next step, otherwise it jumps directly to step 5.
```c
if (server.repl_state == REPL_STATE_SEND_AUTH) {
    if (server.masteruser && server.masterauth) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "AUTH",
                                     server.masteruser, server.masterauth, NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_AUTH;
        return;
    } else if (server.masterauth) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "AUTH",
                                     server.masterauth, NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_AUTH;
        return;
    } else {
        server.repl_state = REPL_STATE_SEND_PORT;
    }
}
```
REPL_STATE_SEND_AUTH to REPL_STATE_RECEIVE_AUTH
If password authentication was configured, the master's reply to AUTH from step 4 is validated here. If it carries no error, the slave's state moves on to REPL_STATE_SEND_PORT.
```c
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (err[0] == '-') {
        serverLog(LL_WARNING, "Unable to AUTH to MASTER: %s", err);
        sdsfree(err);
        goto error;
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_PORT;
}
```
REPL_STATE_SEND_PORT to REPL_STATE_RECEIVE_PORT
Here the slave sends REPLCONF listening-port PORT, telling the master its listening port. The port (and the IP sent later) exist purely so the master can report them through the INFO command.
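For example, this is where the data ends up; an illustrative INFO replication excerpt on the master (all values made up):

```text
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=4213,lag=0
master_replid:3b27b6f1e176bb1011b546d9e57554b4b6177b4a
master_repl_offset:4213
```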
If no write error occurred, the state becomes REPL_STATE_RECEIVE_PORT.
```c
if (server.repl_state == REPL_STATE_SEND_PORT) {
    int port;
    if (server.slave_announce_port)
        port = server.slave_announce_port;
    else if (server.tls_replication && server.tls_port)
        port = server.tls_port;
    else
        port = server.port;
    sds portstr = sdsfromlonglong(port);
    err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "REPLCONF",
                                 "listening-port", portstr, NULL);
    sdsfree(portstr);
    if (err) goto write_error;
    sdsfree(err);
    server.repl_state = REPL_STATE_RECEIVE_PORT;
    return;
}
```
REPL_STATE_RECEIVE_PORT to REPL_STATE_SEND_IP
Wait for the master's reply to REPLCONF listening-port; if there is no fatal error, move on to the next state, REPL_STATE_SEND_IP.
```c
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (err[0] == '-') {
        serverLog(LL_NOTICE,
            "(Non critical) Master does not understand REPLCONF listening-port: %s",
            err);
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_IP;
}
```
The branch above does not return either, so execution continues directly into the next one, which decides whether the IP announcement step can be skipped; by default the IP address is not sent to the master.
If the config file sets server.slave_announce_ip to something non-NULL, proceed to the next step; otherwise jump to step 9.
```c
if (server.repl_state == REPL_STATE_SEND_IP &&
    server.slave_announce_ip == NULL)
{
    server.repl_state = REPL_STATE_SEND_CAPA;
}
```
REPL_STATE_SEND_IP to REPL_STATE_RECEIVE_IP
Send REPLCONF ip-address ip to the master.
```c
if (server.repl_state == REPL_STATE_SEND_IP) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "REPLCONF",
                                 "ip-address", server.slave_announce_ip, NULL);
    if (err) goto write_error;
    sdsfree(err);
    server.repl_state = REPL_STATE_RECEIVE_IP;
    return;
}
```
REPL_STATE_RECEIVE_IP to REPL_STATE_SEND_CAPA
Errors are ignored here, because not every Redis version supports the REPLCONF ip-address command.
```c
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (err[0] == '-') {
        serverLog(LL_NOTICE,
            "(Non critical) Master does not understand REPLCONF ip-address: %s",
            err);
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_CAPA;
}
```
REPL_STATE_SEND_CAPA to REPL_STATE_RECEIVE_CAPA
capa is short for capabilities, the features the slave supports. The slave sends REPLCONF capa eof capa psync2:
eof: the slave supports the diskless synchronization flow, where the master streams the key-value pairs of server.db in RDB protocol format without first serializing them into an RDB file: the master's child process writes the stream into pipe[1], the parent reads it from pipe[0] and forwards it over the socket to the slave (see the minimal pipe sketch after this list).
psync2: the slave supports the PSYNC2 protocol. When the master has failed and a failover has happened, a master that supports PSYNC2 answers an accepted partial resynchronization with +CONTINUE new_repl_id, while one that does not replies just +CONTINUE.
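The fork-plus-pipe pattern behind diskless transfer can be shown in isolation. The sketch below is a toy illustration under assumptions of my own (the payload string and all names are made up, this is not Redis source): a child "serializes" data into the write end of a pipe, and the parent reads the bytes it would then forward to the replica's socket:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void) {
    int pipefds[2];
    if (pipe(pipefds) == -1) { perror("pipe"); return 1; }

    pid_t pid = fork();
    if (pid == 0) {                       /* child: the "RDB producer" */
        close(pipefds[0]);
        const char *payload = "pretend-this-is-an-rdb-stream";
        write(pipefds[1], payload, strlen(payload));
        close(pipefds[1]);
        _exit(0);
    }

    close(pipefds[1]);                    /* parent: the "forwarder" */
    char buf[64];
    ssize_t n;
    while ((n = read(pipefds[0], buf, sizeof(buf))) > 0) {
        /* Redis would write these bytes to each replica's connection;
         * here we just print them to stdout. */
        fwrite(buf, 1, (size_t)n, stdout);
    }
    printf("\n");
    close(pipefds[0]);
    waitpid(pid, NULL, 0);
    return 0;
}
```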
The slave's state then moves to REPL_STATE_RECEIVE_CAPA.
```c
if (server.repl_state == REPL_STATE_SEND_CAPA) {
    err = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "REPLCONF",
                                 "capa", "eof", "capa", "psync2", NULL);
    if (err) goto write_error;
    sdsfree(err);
    server.repl_state = REPL_STATE_RECEIVE_CAPA;
    return;
}
```
REPL_STATE_RECEIVE_CAPA to REPL_STATE_SEND_PSYNC
The slave has announced its capabilities; if the master runs an older Redis that does not recognize them, it replies with an error, but the error is not fatal and execution continues.
The slave's state is then set to REPL_STATE_SEND_PSYNC.
```c
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
    err = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (err[0] == '-') {
        serverLog(LL_NOTICE,
            "(Non critical) Master does not understand REPLCONF capa: %s",
            err);
    }
    sdsfree(err);
    server.repl_state = REPL_STATE_SEND_PSYNC;
}
```
REPL_STATE_SEND_PSYNC to REPL_STATE_RECEIVE_PSYNC
Here:
the slave calls slaveTryPartialResynchronization to issue the PSYNC request to the master,
and moves its state to REPL_STATE_RECEIVE_PSYNC.
This branch does return, because the slave must wait for the master's answer to the PSYNC command; the next invocation of syncWithMaster should find the state REPL_STATE_RECEIVE_PSYNC.
```c
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
    if (slaveTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
        err = sdsnew("Write error sending the PSYNC command.");
        goto write_error;
    }
    server.repl_state = REPL_STATE_RECEIVE_PSYNC;
    return;
}
```
slaveTryPartialResynchronization
Let's pause the state walk-through to look at how slaveTryPartialResynchronization issues the PSYNC request; the state transitions resume afterwards. The function's read_reply flag decides whether the slave reads the master's reply from conn or writes the request to it.
Sending the PSYNC request
With read_reply == 0, the slave sends PSYNC replid offset to the master, requesting synchronization.
On the slave side, server.cached_master determines whether a partial resynchronization can be requested:
server.cached_master != NULL: before this connection the slave had already replicated a master, and cached that master's state in server.cached_master when the link went down:
the previous master's replid, i.e. server.cached_master->replid,
and the previous replication offset, i.e. server.cached_master->reploff + 1.
So when the slave sends PSYNC replid offset to the new master (the one identified by (IP, PORT)), the master uses replid and offset to decide whether a partial resynchronization is possible. If it is, the master replies +CONTINUE; if not, a full resynchronization is performed.
server.cached_master == NULL: the slave simply sends PSYNC ? -1, requesting a full resynchronization.
If nothing went wrong, the function returns the PSYNC_WAIT_REPLY flag, otherwise PSYNC_WRITE_ERROR.
```c
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    if (!read_reply) {
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset, sizeof(psync_offset), "%lld",
                     server.cached_master->reploff+1);
            serverLog(LL_NOTICE,
                "Trying a partial resynchronization (request %s:%s).",
                psync_replid, psync_offset);
        } else {
            serverLog(LL_NOTICE,
                "Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset, "-1", 3);
        }

        reply = sendSynchronousCommand(SYNC_CMD_WRITE, conn, "PSYNC",
                                       psync_replid, psync_offset, NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING, "Unable to send PSYNC to master: %s", reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }
}
```
Back in syncWithMaster: after the PSYNC request has been sent, the next invocation of syncWithMaster first checks the slave's state, which should still be REPL_STATE_RECEIVE_PSYNC:
```c
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
    serverLog(LL_WARNING,
        "syncWithMaster(): state machine error, state should be RECEIVE_PSYNC but is %d",
        server.repl_state);
    goto error;
}
```
Once that check passes, the slave moves on to waiting for the master's answer to the PSYNC request:
```c
psync_result = slaveTryPartialResynchronization(conn, 1);
```
Reading the PSYNC reply
With read_reply == 1, the slave waits for and reads the master's answer to the PSYNC command. The valid return flags are:
1) PSYNC_WAIT_REPLY: an empty line was read.
This is the same value the write path returns; in both cases slaveTryPartialResynchronization must be called again to read the master's real reply.
```c
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    reply = sendSynchronousCommand(SYNC_CMD_READ, conn, NULL);
    if (sdslen(reply) == 0) {
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }
}
```
2) PSYNC_FULLRESYNC: the master decided to perform a full resynchronization with this slave.
In this case the reply to the slave is +FULLRESYNC replid offset, where replid is the master's RUN_ID and offset is its replication offset server.master_repl_offset.
Once replid and offset have been parsed correctly, the function returns PSYNC_FULLRESYNC, signaling that the master is about to run a full resynchronization with this slave.
```c
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    connSetReadHandler(conn, NULL);

    if (!strncmp(reply, "+FULLRESYNC", 11)) {
        char *replid = NULL, *offset = NULL;

        replid = strchr(reply, ' ');
        if (replid) {
            replid++;
            offset = strchr(replid, ' ');
            if (offset) offset++;
        }
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            memset(server.master_replid, 0, CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset, NULL, 10);
            serverLog(LL_NOTICE, "Full resync from master: %s:%lld",
                      server.master_replid, server.master_initial_offset);
        }
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }
}
```
3) PSYNC_CONTINUE: the master replied +CONTINUE, agreeing to a partial resynchronization.
With the PSYNC2 protocol in mind, the function checks whether CONTINUE carries a new_replid. If it does, the master the slave is talking to is a newly elected one, and server.cached_master still describes the master that crashed, so the slave updates its notion of the master:
store the replid from the PSYNC replid offset request into server.replid2, and the offset into server.second_replid_offset
update server.replid and server.cached_master->replid to the new master's run id, new_replid
the slave has changed masters, so call disconnectSlaves to drop the sub-slaves created under the old one
At this point, two key steps prepare for the partial resynchronization:
move the server.cached_master state into server.master, because the slave will rely on server.master for all further data exchange with the master
create the replication backlog server.repl_backlog if it does not exist yet (for example, on first use it does not)
Finally, the function returns the PSYNC_CONTINUE flag, telling the slave that partial resynchronization is about to begin.
```c
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    if (!strncmp(reply, "+CONTINUE", 9)) {
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        char *start = reply+10;
        char *end = reply+9;
        while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) {
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new, start, CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            if (strcmp(new, server.cached_master->replid)) {
                serverLog(LL_WARNING, "Master replication ID changed to %s", new);

                memcpy(server.replid2, server.cached_master->replid,
                       sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                memcpy(server.replid, new, sizeof(server.replid));
                memcpy(server.cached_master->replid, new, sizeof(server.replid));

                disconnectSlaves();
            }
        }

        sdsfree(reply);
        replicationResurrectCachedMaster(conn);

        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }
}
```
replicationResurrectCachedMaster
The replicationResurrectCachedMaster function moves server.cached_master into server.master. Clearly this function only runs on the slave, and it runs exactly when data synchronization is about to start: the slave needs server.master to exchange data with the master. The steps:
move the previous server.cached_master into server.master
move the slave's state from REPL_STATE_RECEIVE_PSYNC to REPL_STATE_CONNECTED
register a read handler on server.master->conn, with readQueryFromClient as the callback, to consume the partial resynchronization stream: the stream is parsed like ordinary client write commands and replayed against the slave's database
if server.master's output buffer still holds pending data, also register a write handler
After these steps the slave's state is REPL_STATE_CONNECTED.
```c
void replicationResurrectCachedMaster(connection *conn) {
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->conn = conn;
    connSetPrivateData(server.master->conn, server.master);
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;

    linkClient(server.master);

    if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
        serverLog(LL_WARNING,
            "Error resurrecting the cached master, impossible to add the readable handler: %s",
            strerror(errno));
        freeClientAsync(server.master);
    }

    if (clientHasPendingReplies(server.master)) {
        if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
            serverLog(LL_WARNING,
                "Error resurrecting the cached master, "
                "impossible to add the writable handler: %s",
                strerror(errno));
            freeClientAsync(server.master);
        }
    }
}
```
PSYNC_CONTINUE
Back to syncWithMaster, which must now act on the flag slaveTryPartialResynchronization returned after reading the PSYNC reply (the error flags are not covered here).
```c
if (psync_result == PSYNC_CONTINUE) {
    serverLog(LL_NOTICE,
        "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
    return;
}
```
So where does the slave actually receive the master's partial resynchronization data? In the read callback registered on server.master->conn inside replicationResurrectCachedMaster.
In readQueryFromClient, data arriving from the master increases the read offset c->read_reploff; the bytes stay in c->querybuf, get parsed, and are executed in processCommandAndResetClient, keeping master and slave consistent.
```c
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
```
Once the slave has read the partial resynchronization data, parsing it into commands and executing them brings its data in sync with the master's; the full path from parsing to execution is covered in my analysis of the Redis input buffer.
In addition, after applying the data, the slave continues into the commandProcessed function, which in turn calls replicationFeedSlavesFromMasterStream to pass the same data on to its sub-slaves.
```c
if (processCommand(c) == C_OK) {
    commandProcessed(c);
}
```
commandProcessed
The commandProcessed function performs three main tasks:
after each chunk of replicated data is applied, update the applied offset c->reploff; the client c here is server.master. For example, if read_reploff is 120, the query buffer holds 20 bytes of which qb_pos is 5, then reploff = 120 - 20 + 5 = 105
in the non-blocked case, call resetClient
propagate recursively, i.e. pass the data the slave received from the master down to its sub-slaves
```c
void commandProcessed(client *c) {
    long long prev_offset = c->reploff;
    if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
        c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
    }

    if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
        resetClient(c);

    if (c->flags & CLIENT_MASTER) {
        long long applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf, applied, -1);
        }
    }
}
```
replicationFeedSlavesFromMasterStream
The slave is the master of its own sub-slaves, so passing data down to them works just like the master feeding this slave:
1) write it into server.repl_backlog
2) write it into each sub_slave->conn, sending it to the sub-slave
```c
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf,
                                           size_t buflen) {
    listNode *ln;
    listIter li;

    if (0) {
        printf("%zu:", buflen);
        for (size_t j = 0; j < buflen; j++) {
            printf("%c", isprint(buf[j]) ? buf[j] : '.');
        }
        printf("\n");
    }

    if (server.repl_backlog) feedReplicationBacklog(buf, buflen);
    listRewind(slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
        addReplyProto(slave, buf, buflen);
    }
}
```
PSYNC_FULLRESYNC
Now back to the full resynchronization path of syncWithMaster.
Recent Redis versions support transferring the RDB data disklessly. If diskless loading is not in use, the slave must create an rdb file to receive the master's RDB-protocol data:
server.repl_transfer_tmpfile: the RDB file name
server.repl_transfer_fd: the file descriptor of that rdb file
A read handler is then registered on conn, waiting for the master's full-sync payload; the read callback is readSyncBulkPayload, which both reads the master's RDB data and loads it into the slave.
The preparation for full synchronization:
```c
void syncWithMaster(connection *conn) {
    if (!useDisklessLoad()) {
        while (maxtries--) {
            snprintf(tmpfile, 256, "temp-%d.%ld.rdb",
                     (int)server.unixtime, (long int)getpid());
            dfd = open(tmpfile, O_CREAT|O_WRONLY|O_EXCL, 0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,
                "Can't Opening the temp file needed for MASTER <-> REPLICA sync: %s",
                strerror(errno));
            goto error;
        }
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        server.repl_transfer_fd = dfd;
    }

    if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)",
                  strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;
}
```
readSyncBulkPayload
When this is the first synchronization, for example right after the connection is established, the master precedes the payload with slave->replpreamble, whose format is $<length>\r\n:
in the RDB-file case it tells the slave the size of the RDB file about to be sent
in the diskless case the preamble is $EOF:<eofmark>\r\n, where eofmark has the same length as a RUN_ID, 40 characters
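Concretely, the two preamble shapes look like this (the size and the mark are illustrative values):

```text
$178\r\n                                            RDB file follows, 178 bytes
$EOF:6f1ad0cd1e42d2f70e01f5b7a3c5e09d6a8b4c3d\r\n   diskless stream, terminated by the 40-char mark
```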
So the first call to readSyncBulkPayload only detects which transfer mode is in use, then returns.
```c
void readSyncBulkPayload(connection *conn) {
    char buf[PROTO_IOBUF_LEN];
    ssize_t nread, readlen, nwritten;
    int use_diskless_load = useDisklessLoad();
    redisDb *diskless_load_backup = NULL;
    int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
                                                        EMPTYDB_NO_FLAGS;
    off_t left;

    static char eofmark[CONFIG_RUN_ID_SIZE];
    static char lastbytes[CONFIG_RUN_ID_SIZE];
    static int usemark = 0;

    if (server.repl_transfer_size == -1) {
        if (connSyncReadLine(conn, buf, 1024,
                             server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }

        if (buf[0] == '-') {
            serverLog(LL_WARNING,
                "MASTER aborted replication with an error: %s", buf+1);
            goto error;
        } else if (buf[0] == '\0') {
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            serverLog(LL_WARNING,
                "Bad protocol from MASTER, the first byte is not '$' (we received '%s'),"
                "are you sure the host and port are right?", buf);
            goto error;
        }

        if (strncmp(buf+1, "EOF:", 4) == 0 &&
            strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
            usemark = 1;
            memcpy(eofmark, buf+5, CONFIG_RUN_ID_SIZE);
            memset(lastbytes, 0, CONFIG_RUN_ID_SIZE);
            server.repl_transfer_size = 0;
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
                use_diskless_load ? "to parser" : "to disk");
        } else {
            usemark = 0;
            server.repl_transfer_size = strtol(buf+1, NULL, 10);
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
                (long long)server.repl_transfer_size,
                use_diskless_load ? "to parser" : "to disk");
        }
        return;
    }
}
```
On subsequent calls to readSyncBulkPayload, server.repl_transfer_size == 0 for a diskless transfer, or equals the length of the file about to be transferred, so execution no longer enters the if (repl_transfer_size == -1) branch.
Just as the master has two ways to send the data, the slave has two ways to load it:
store it into an RDB file first, then load that into memory
read it straight off the socket and load it into memory
The choice is made by the useDisklessLoad function:
```c
static int useDisklessLoad() {
    int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
        (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY &&
         dbTotalServerKeyCount() == 0);

    if (enabled && !moduleAllDatatypesHandleErrors()) {
        serverLog(LL_WARNING,
            "Skipping diskless-load because there are modules "
            "that don't handle read errors.");
        enabled = 0;
    }
    return enabled;
}
```
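These behaviors map onto configuration directives; a plausible redis.conf fragment looks like the following (the values are just examples):

```text
# Master side: stream the RDB over the socket instead of writing it to disk,
# waiting a few seconds so that more replicas can join the same transfer.
repl-diskless-sync yes
repl-diskless-sync-delay 5

# Replica side: how the incoming stream is loaded.
# Accepted values: disabled | on-empty-db | swapdb
repl-diskless-load on-empty-db
```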
If the slave chooses to store the master's RDB data into the local file behind server.repl_transfer_fd first, the flow is:
compute readlen, the maximum number of bytes to read this time
read readlen bytes; in the usemark == 1 mode, check whether the end of the data stream has been reached, and set eof_reached = 1 if so
write what was read into server.repl_transfer_fd. If eof_reached == 1 && usemark == 1, the eofmark written at the end of the slave-side RDB file must be truncated away before loading
every REPL_MAX_WRITTEN_BEFORE_FSYNC accumulated bytes, perform an fsync
if the master sent an RDB file, check whether the end of the file has been reached
The function only continues past this point once the master has finished sending; otherwise each call simply returns.
```c
void readSyncBulkPayload(connection *conn) {
    if (!use_diskless_load) {
        if (usemark) {
            readlen = sizeof(buf);
        } else {
            left = server.repl_transfer_size - server.repl_transfer_read;
            readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
        }

        nread = connRead(conn, buf, readlen);
        if (nread <= 0) {
            if (connGetState(conn) == CONN_STATE_CONNECTED) {
                return;
            }
            serverLog(LL_WARNING, "I/O error trying to sync with MASTER: %s",
                      (nread == -1) ? strerror(errno) : "connection lost");
            cancelReplicationHandshake();
            return;
        }
        server.stat_net_input_bytes += nread;

        int eof_reached = 0;
        if (usemark) {
            if (nread >= CONFIG_RUN_ID_SIZE) {
                memcpy(lastbytes, buf+nread-CONFIG_RUN_ID_SIZE,
                       CONFIG_RUN_ID_SIZE);
            } else {
                int rem = CONFIG_RUN_ID_SIZE-nread;
                memmove(lastbytes, lastbytes+nread, rem);
                memcpy(lastbytes+rem, buf, nread);
            }
            if (memcmp(lastbytes, eofmark, CONFIG_RUN_ID_SIZE) == 0)
                eof_reached = 1;
        }

        server.repl_transfer_lastio = server.unixtime;
        if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) {
            serverLog(LL_WARNING,
                "Write error or short write writing to the DB dump file "
                "needed for MASTER <-> REPLICA synchronization: %s",
                (nwritten == -1) ? strerror(errno) : "short write");
            goto error;
        }
        server.repl_transfer_read += nread;

        if (usemark && eof_reached) {
            if (ftruncate(server.repl_transfer_fd,
                          server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) {
                serverLog(LL_WARNING,
                    "Error truncating the RDB file received from the master "
                    "for SYNC: %s", strerror(errno));
                goto error;
            }
        }

        if (server.repl_transfer_read >=
            server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
        {
            off_t sync_size = server.repl_transfer_read -
                              server.repl_transfer_last_fsync_off;
            rdb_fsync_range(server.repl_transfer_fd,
                            server.repl_transfer_last_fsync_off, sync_size);
            server.repl_transfer_last_fsync_off += sync_size;
        }

        if (!usemark) {
            if (server.repl_transfer_read == server.repl_transfer_size)
                eof_reached = 1;
        }

        if (!eof_reached) return;
    }
}
```
Then comes the loading, in one of two ways:
+ use_diskless_load == 1: the slave loads the data straight from the socket
+ use_diskless_load == 0: the slave loads from the server.repl_transfer_fd file
Before loading starts, the slave checks whether an AOF rewrite child is running. The synchronization below heavily modifies the slave's database; an in-progress AOF rewrite would defeat copy-on-write and blow up memory. AOF rewriting is only restarted after the slave has synced with the master successfully.
```c
if (server.aof_state != AOF_OFF) stopAppendOnly();
```
Diskless loading comes in two flavors, selected by server.repl_diskless_load:
Before loading, the existing server.db[] must be released. In REPL_DISKLESS_LOAD_SWAPDB mode a backup has already been taken; in REPL_DISKLESS_LOAD_WHEN_DB_EMPTY mode no backup is needed, so server.db[] can be emptied directly:
```c
emptyDb(-1, empty_db_flags, replicationEmptyDbCallback);
```
To simplify the transfer, Redis puts conn into blocking mode, with server.repl_timeout as the per-read timeout; on a load error, server.repl_diskless_load decides whether server.db[] is restored from the backup or simply flushed.
```c
emptyDb(-1, empty_db_flags, replicationEmptyDbCallback);

connSetReadHandler(conn, NULL);

serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
    rio rdb;
    rioInitWithConn(&rdb, conn, server.repl_transfer_size);

    connBlock(conn);
    connRecvTimeout(conn, server.repl_timeout*1000);
    startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);

    if (rdbLoadRio(&rdb, RDBFLAGS_REPLICATION, &rsi) != C_OK) {
        stopLoading(0);
        serverLog(LL_WARNING,
            "Failed trying to load the MASTER synchronization DB from socket");
        cancelReplicationHandshake();
        rioFreeConn(&rdb, NULL);

        if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
            disklessLoadRestoreBackups(diskless_load_backup, 1,
                                       empty_db_flags);
        } else {
            emptyDb(-1, empty_db_flags, replicationEmptyDbCallback);
        }
        return;
    }
    stopLoading(1);

    if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
        disklessLoadRestoreBackups(diskless_load_backup, 0, empty_db_flags);
    }

    if (usemark) {
        if (!rioRead(&rdb, buf, CONFIG_RUN_ID_SIZE) ||
            memcmp(buf, eofmark, CONFIG_RUN_ID_SIZE) != 0) {
            serverLog(LL_WARNING, "Replication stream EOF marker is broken");
            cancelReplicationHandshake();
            rioFreeConn(&rdb, NULL);
            return;
        }
    }

    rioFreeConn(&rdb, NULL);

    connNonBlock(conn);
    connRecvTimeout(conn, 0);
}
```
If the slave uses the RDB-file loading mode, the procedure mirrors ordinary persistence loading:
+ if the slave is running an rdb persistence child, kill it right away: the slave is about to sync from the master, so that save is pointless
+ open the slave's own persistence file server.rdb_filename
+ simply rename the previously written server.repl_transfer_tmpfile rdb file to server.rdb_filename
+ the rdbLoad function loads server.repl_transfer_tmpfile (now server.rdb_filename) into memory, completing the synchronization
+ since server.repl_transfer_tmpfile was produced for the transfer rather than persistence, the rdb file is removed when persistence is disabled
+ finally close server.repl_transfer_tmpfile and its fd, and reset the related fields
With that, loading is finished: slave and master have completed a full resynchronization.
```c
else {
    if (server.rdb_child_pid != -1) {
        serverLog(LL_NOTICE,
            "Replica is about to load the RDB file received from the master,"
            " but there is a pending RDB child running. "
            "Killing process %ld and removing its temp file to avoid any race",
            (long)server.rdb_child_pid);
        killRDBChild();
    }

    int old_rdb_fd = open(server.rdb_filename, O_RDONLY|O_NONBLOCK);

    if (rename(server.repl_transfer_tmpfile, server.rdb_filename) == -1) {
        serverLog(LL_WARNING,
            "Failed trying to rename the temp DB into %s "
            "in MASTER <-> REPLICA synchronization: %s",
            server.rdb_filename, strerror(errno));
        cancelReplicationHandshake();
        if (old_rdb_fd != -1) close(old_rdb_fd);
        return;
    }

    if (old_rdb_fd != -1)
        bioCreateBackgroundJob(BIO_CLOSE_FILE, (void*)(long)old_rdb_fd,
                               NULL, NULL);

    if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != C_OK) {
        serverLog(LL_WARNING,
            "Failed trying to load the MASTER synchronization DB from disk");
        cancelReplicationHandshake();
        if (server.rdb_del_sync_files && allPersistenceDisabled()) {
            serverLog(LL_NOTICE,
                "Removing the RDB file obtained from the master. "
                "This replica has persistence disabled");
            bg_unlink(server.rdb_filename);
        }
        return;
    }

    if (server.rdb_del_sync_files && allPersistenceDisabled()) {
        serverLog(LL_NOTICE, "Removing the RDB file obtained from "
                             "the master. This replica has persistence "
                             "disabled");
        bg_unlink(server.rdb_filename);
    }

    zfree(server.repl_transfer_tmpfile);
    close(server.repl_transfer_fd);
    server.repl_transfer_fd = -1;
    server.repl_transfer_tmpfile = NULL;
}
```
What remains after a successful load?
+ as in the partial resynchronization path, create the server.master instance for the connection object server.repl_transfer_s and register its read handler
+ set the state to REPL_STATE_CONNECTED
+ the full synchronization succeeded, so set server.replid and master_repl_offset, and clear the cached server.replid2 and friends: the master has definitively changed, so the previously crashed master's state no longer needs to be kept
+ restart the AOF rewrite child, taking one fresh persistence pass
At this point the slave's state has become REPL_STATE_CONNECTED, marking a successful connection to the master and a completed first synchronization.
```c
replicationCreateMasterClient(server.repl_transfer_s, rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;

memcpy(server.replid, server.master->replid, sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();

if (server.repl_backlog == NULL) createReplicationBacklog();

serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");

if (server.aof_enabled) restartAOFAfterSYNC();
return;
```
SLAVEOF NO ONE
The SLAVEOF command can turn a server into a slave, and it can also return a slave to standalone operation.
Running SLAVEOF NO ONE on a slave cancels its replication; running it on a master has no effect. Leaving replication is implemented by the replicationUnsetMaster function.
```c
void replicaofCommand(client *c) {
    if (server.cluster_enabled) {
        addReplyError(c, "REPLICAOF not allowed in cluster mode.");
        return;
    }

    if (!strcasecmp(c->argv[1]->ptr, "no") &&
        !strcasecmp(c->argv[2]->ptr, "one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(), c);
            serverLog(LL_NOTICE,
                "MASTER MODE enabled (user request from '%s')", client);
            sdsfree(client);
        }
    }
}
```
replicationUnsetMaster

```c
void replicationUnsetMaster(void) {
    if (server.masterhost == NULL) return;

    if (server.repl_state == REPL_STATE_CONNECTED)
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);

    sdsfree(server.masterhost);
    server.masterhost = NULL;
    if (server.master) freeClient(server.master);
    replicationDiscardCachedMaster();
    cancelReplicationHandshake();

    shiftReplicationId();

    disconnectSlaves();
    server.repl_state = REPL_STATE_NONE;

    server.slaveseldb = -1;

    server.repl_no_slaves_since = server.unixtime;

    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
                          NULL);

    if (server.aof_enabled && server.aof_state == AOF_OFF)
        restartAOFAfterSYNC();
}
```
shiftReplicationId
shiftReplicationId is called exactly when the server is about to go from slave back to master. server.replid must be replaced with the server's own replid, while the replication state of the master it used to follow is preserved in server.replid2 and server.second_replid_offset.
```c
void shiftReplicationId(void) {
    memcpy(server.replid2, server.replid, sizeof(server.replid));
    server.second_replid_offset = server.master_repl_offset+1;
    changeReplicationId();
    serverLog(LL_WARNING,
        "Setting secondary replication ID to %s, valid up to offset: %lld."
        " New replication ID is %s",
        server.replid2, server.second_replid_offset, server.replid);
}
```
Master-side analysis
So how does the master answer the slave? That is covered next.
replconfCommand
During the handshake, the slave sends REPLCONF option value commands to the master through server.repl_transfer_s, and the master parses them in the replconfCommand function.
Both masters and slaves execute this function.
```c
void replconfCommand(client *c) {
    int j;

    if ((c->argc % 2) == 0) {
        addReply(c, shared.syntaxerr);
        return;
    }

    for (j = 1; j < c->argc; j += 2) {
        if (!strcasecmp(c->argv[j]->ptr, "listening-port")) {
            long port;

            if ((getLongFromObjectOrReply(c, c->argv[j+1], &port, NULL) != C_OK))
                return;
            c->slave_listening_port = port;
        } else if (!strcasecmp(c->argv[j]->ptr, "ip-address")) {
            sds ip = c->argv[j+1]->ptr;
            if (sdslen(ip) < sizeof(c->slave_ip)) {
                memcpy(c->slave_ip, ip, sdslen(ip)+1);
            } else {
                addReplyErrorFormat(c,
                    "REPLCONF ip-address provided by replica instance is too long: "
                    "%zd bytes", sdslen(ip));
                return;
            }
        } else if (!strcasecmp(c->argv[j]->ptr, "capa")) {
            if (!strcasecmp(c->argv[j+1]->ptr, "eof"))
                c->slave_capa |= SLAVE_CAPA_EOF;
            else if (!strcasecmp(c->argv[j+1]->ptr, "psync2"))
                c->slave_capa |= SLAVE_CAPA_PSYNC2;
        } else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
            long long offset;

            if (!(c->flags & CLIENT_SLAVE)) return;
            if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
                return;
            if (offset > c->repl_ack_off)
                c->repl_ack_off = offset;
            c->repl_ack_time = server.unixtime;

            if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
                putSlaveOnline(c);
            return;
        } else if (!strcasecmp(c->argv[j]->ptr, "getack")) {
            if (server.masterhost && server.master) replicationSendAck();
            return;
        } else {
            addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s",
                                (char*)c->argv[j]->ptr);
            return;
        }
    }
    addReply(c, shared.ok);
}
```
syncCommand – Partial
When the slave sends a PSYNC replid offset request, the master enters the syncCommand function. Before actually handling the PSYNC request it performs some checks:
if client c is already one of this server's slaves, the request is simply ignored
if this server is itself a slave and is not yet fully connected to its own master, a PSYNC from a user client (as opposed to a sub-slave's internal handshake) is refused as well.
```c
void syncCommand(client *c) {
    if (c->flags & CLIENT_SLAVE) return;

    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplySds(c, sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
        return;
    }

    if (clientHasPendingReplies(c)) {
        addReplyError(c, "SYNC and PSYNC are invalid with pending output");
        return;
    }
}
```
The PSYNC request is handled first: masterTryPartialResynchronization attempts a partial resynchronization, and only if the master concludes that is impossible does it fall back to a full resynchronization.
```c
void syncCommand(client *c) {
    serverLog(LL_NOTICE, "Replica %s asks for synchronization",
              replicationGetSlaveName(c));

    if (!strcasecmp(c->argv[0]->ptr, "psync")) {
        if (masterTryPartialResynchronization(c) == C_OK) {
            server.stat_sync_partial_ok++;
            return;
        } else {
            char *master_replid = c->argv[1]->ptr;

            if (master_replid[0] != '?') server.stat_sync_partial_err++;
        }
    }
}
```
masterTryPartialResynchronization
Whatever PSYNC the slave sent, masterTryPartialResynchronization first tries to satisfy it partially. If it can, it streams the data straight out of server.repl_backlog; if it cannot, it returns C_ERR, leaving the else branch in syncCommand above to run:
```c
if (masterTryPartialResynchronization(c) == C_OK) {
    /* partial resynchronization accepted */
} else {
    /* fall back to full resynchronization */
}
```
For a PSYNC master_replid psync_offset request, a partial resynchronization requires (matching the checks in the source below):

```c
server.replid == master_replid &&
psync_offset >= server.repl_backlog_off &&
psync_offset <= server.repl_backlog_off + server.repl_backlog_histlen
```
If the master_replid in the PSYNC request does not match the current server's server.replid, this server may have been newly elected master, so master_replid is compared against the crashed previous master's server.replid2, and the following must hold as well:

```c
master_replid == server.replid2 && psync_offset <= server.second_replid_offset
```
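Putting the two checks together, the acceptance test can be condensed into a predicate like the following. This is a restatement for readability under the names used above, not the actual Redis source (the real logic lives inline in masterTryPartialResynchronization, shown further down):

```c
/* Condensed restatement of the partial-resync acceptance checks. */
static int canPartialResync(const char *master_replid, long long psync_offset) {
    /* The requested id must be our current replication id, or our
     * previous one up to the offset where the id changed. */
    int id_ok =
        strcasecmp(master_replid, server.replid) == 0 ||
        (strcasecmp(master_replid, server.replid2) == 0 &&
         psync_offset <= server.second_replid_offset);

    /* The requested offset must still be covered by the backlog. */
    int offset_ok =
        server.repl_backlog != NULL &&
        psync_offset >= server.repl_backlog_off &&
        psync_offset <= server.repl_backlog_off + server.repl_backlog_histlen;

    return id_ok && offset_ok;
}
```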
If the checks allow a partial resynchronization:
add the CLIENT_SLAVE flag to client c, distinguishing it from ordinary user clients
set c's state to SLAVE_STATE_ONLINE: the link is established and data can be transferred normally
depending on whether the slave supports the PSYNC2 protocol, reply +CONTINUE\r\n or +CONTINUE replid\r\n
append the data accumulated in server.repl_backlog from psync_offset onward to c's output, to be sent when the connection becomes writable
On success the function returns C_OK, and back in syncCommand execution returns immediately as well.
```c
int masterTryPartialResynchronization(client *c) {
    long long psync_offset, psync_len;
    char *master_replid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK)
        goto need_full_resync;

    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        if (master_replid[0] != '?') {
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2))
            {
                serverLog(LL_NOTICE,
                    "Partial resynchronization not accepted:"
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,
                    "Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else {
            serverLog(LL_NOTICE, "Full resync requested by replica %s",
                      replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s "
            "for lack of backlog (Replica request was: %lld).",
            replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC "
                "with an offset that is greater than the master replication offset.",
                replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_put_online_on_ack = 0;
    listAddNodeTail(server.slaves, c);

    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n");
    }

    if (connWrite(c->conn, buf, buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }

    psync_len = addReplyReplicationBacklog(c, psync_offset);
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. "
        "Sending %lld bytes of backlog starting from offset %lld.",
        replicationGetSlaveName(c), psync_len, psync_offset);

    refreshGoodSlavesCount();

    return C_OK;

need_full_resync:
    return C_ERR;
}
```
syncCommand – Full
If masterTryPartialResynchronization returns C_ERR, a full resynchronization will be performed. First, a couple of quick checks:
whether the partial PSYNC failed because the slave explicitly requested a full resynchronization (a replid of '?'), which affects the failure statistics
if the slave sent a plain SYNC request instead, some later replies must not be sent to it, because such an old slave cannot parse them.
```c
if (!strcasecmp(c->argv[0]->ptr, "psync")) {
    if (masterTryPartialResynchronization(c) == C_OK) {
        server.stat_sync_partial_ok++;
        return;
    } else {
        char *master_replid = c->argv[1]->ptr;

        if (master_replid[0] != '?') server.stat_sync_partial_err++;
    }
} else {
    c->flags |= CLIENT_PRE_PSYNC;
}
```
Once a full resynchronization is definitely happening, i.e. the slave starts from scratch and syncs up to the current server's state, some setup is needed:
append the client to this server's slave list server.slaves
add the CLIENT_SLAVE flag to the slave, distinguishing it from ordinary clients
set the slave's state to SLAVE_STATE_WAIT_BGSAVE_START, meaning a BGSAVE is about to be started
If this slave is the server's first slave, there is more to do:
create server.repl_backlog, which accumulates data in preparation for later partial resynchronizations
create a replid for this server
clear server.replid2: since a full resynchronization is happening anyway, the previously crashed master has nothing to do with this slave, so there is no need to keep server.replid2 cached
```c
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    connDisableTcpNoDelay(c->conn);
c->repldbfd = -1;

c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves, c);

if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
    changeReplicationId();
    clearReplicationId2();
    createReplicationBacklog();
    serverLog(LL_NOTICE,
        "Replication backlog created, my new replication IDs are '%s' and '%s'",
        server.replid, server.replid2);
}
```
The master then branches on how the RDB data is to be delivered, server.rdb_child_type:
RDB_CHILD_TYPE_DISK: the master first saves server.db[] in RDB protocol format to disk, then sends the file to the slave
RDB_CHILD_TYPE_SOCKET: the master streams the generated RDB data to the slave directly over the socket
syncCommand handles the cases as follows:
1) server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK
All of the master's slaves eventually converge to the master's state, so if a child process is already persisting to disk and client c's transfer type is RDB_CHILD_TYPE_DISK, the rdb file being produced by that save can be reused directly.
Specifically, if some other slave is already in the SLAVE_STATE_WAIT_BGSAVE_END state, that slave's output buffer is copied to c and replicationSetupSlaveForFullResync is run for c; once the server.rdb_child_pid child finishes the rdb file, c is synchronized along with it.
```c
if (server.rdb_child_pid != -1 &&
    server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
    client *slave;
    listNode *ln;
    listIter li;

    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
    }

    if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
        copyClientOutputBuffer(c, slave);
        replicationSetupSlaveForFullResync(c, slave->psync_initial_offset);
        serverLog(LL_NOTICE, "Waiting for end of BGSAVE for SYNC");
    } else {
        serverLog(LL_NOTICE,
            "Can't attach the replica to the current BGSAVE."
            " Waiting for next BGSAVE for SYNC");
    }
}
```
2) server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET
If a child process is persisting but the master's configured transfer type is RDB_CHILD_TYPE_SOCKET, the full resynchronization has to wait for the next BGSAVE: the save in progress contributes nothing to this synchronization and cannot be reused the way the disk case can.
```c
else if (server.rdb_child_pid != -1 &&
         server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
    serverLog(LL_NOTICE,
        "Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
}
```
So when does the deferred RDB_CHILD_TYPE_SOCKET full resynchronization actually run?
It waits until replicationCron calls startBgsaveForReplication to kick off the synchronization. To let as many slaves as possible be ready for the same transfer, replicationCron waits server.repl_diskless_sync_delay seconds before starting a diskless replication. The conditions:
in replicationCron, confirm no child process is persisting
there must be slaves in the SLAVE_STATE_WAIT_BGSAVE_START state, i.e. slaves_waiting != 0
the longest slave idle time must satisfy max_idle > server.repl_diskless_sync_delay before diskless replication may start. Every data exchange between a slave and the master refreshes slave->lastinteraction, so starting a diskless transfer stalls for a short while
compute mincapa, the intersection of those slaves' capabilities
Only when all of the above holds is startBgsaveForReplication(mincapa) called to start a full resynchronization using the RDB_CHILD_TYPE_SOCKET transfer:
```c
if (!hasActiveChildProcess()) {
    time_t idle, max_idle = 0;
    int slaves_waiting = 0;
    int mincapa = -1;
    listNode *ln;
    listIter li;

    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            idle = server.unixtime - slave->lastinteraction;
            if (idle > max_idle) max_idle = idle;
            slaves_waiting++;
            mincapa = (mincapa == -1) ? slave->slave_capa :
                                        (mincapa & slave->slave_capa);
        }
    }

    if (slaves_waiting &&
        (!server.repl_diskless_sync ||
         max_idle > server.repl_diskless_sync_delay))
    {
        startBgsaveForReplication(mincapa);
    }
}
```
3) server.rdb_child_pid == -1
No rdb persistence child is running, so the choice follows server.repl_diskless_sync:
diskless (socket) transfer configured: still deferred to replicationCron as above
disk transfer configured: startBgsaveForReplication is called right here
The branch reads as follows:
```c
else {
    if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
        if (server.repl_diskless_sync_delay)
            serverLog(LL_NOTICE, "Delay next BGSAVE for diskless SYNC");
    } else {
        if (!hasActiveChildProcess()) {
            startBgsaveForReplication(c->slave_capa);
        } else {
            serverLog(LL_NOTICE,
                "No BGSAVE in progress, but another BG operation is active. "
                "BGSAVE for replication delayed");
        }
    }
}
```
startBgsaveForReplication
startBgsaveForReplication starts the full resynchronization. Only when the master's server.repl_diskless_sync is enabled and the slave supports EOF can the rdb data be streamed over the socket; otherwise an RDB file is generated first and then shipped to the slave.
socket_target == 1: rdbSaveToSlavesSockets transfers over the socket, and the slaves receive the data directly.
socket_target == 0: rdbSaveBackground first saves the rdb data to the local file server.rdb_filename, then replicationSetupSlaveForFullResync is called to arrange for the contents of server.rdb_filename to be delivered to the slave.
```c
int startBgsaveForReplication(int mincapa) {
    int retval;
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;

    serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s",
        socket_target ? "replicas sockets" : "disk");

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    if (rsiptr) {
        if (socket_target)
            retval = rdbSaveToSlavesSockets(rsiptr);
        else
            retval = rdbSaveBackground(server.rdb_filename, rsiptr);
    } else {
        serverLog(LL_WARNING,
            "BGSAVE for replication: replication information not available,"
            " can't generate the RDB file right now. Try later.");
        retval = C_ERR;
    }

    if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
        RDBGeneratedByReplication = 1;

    if (retval == C_ERR) {
        serverLog(LL_WARNING, "BGSAVE for replication failed");
        listRewind(server.slaves, &li);
        while ((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                listDelNode(server.slaves, ln);
                addReplyError(slave, "BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    if (!socket_target) {
        listRewind(server.slaves, &li);
        while ((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
            }
        }
    }

    if (retval == C_OK) replicationScriptCacheFlush();
    return retval;
}
```
rdbSaveToSlavesSockets

`rdbSaveToSlavesSockets` works on the same principle as `rdbSaveBackground`: the latter writes the data to the RDB file `server.rdb_filename`, while the former writes RDB data into sockets. However, Redis does not let the child process that produces the RDB data write to the slave sockets directly; instead the child hands all the generated RDB data to the parent, and the parent writes it to the sockets. This keeps the connection state in the parent, so that when the child terminates and the parent takes over, the continuity of e.g. TLS connection state is not broken.

- Before the fork, create the pipe `server.rdb_pipe_read` / `server.rdb_pipe_write`.
- Register every slave in the `SLAVE_STATE_WAIT_BGSAVE_START` state into `server.rdb_pipe_conns`, so that `server.rdb_pipe_conns[i]` references a slave waiting for BGSAVE to start, and move them to `SLAVE_STATE_WAIT_BGSAVE_END`.
- In the child, build a rio object on top of `server.rdb_pipe_write`, so the child writes the generated RDB-format data straight into the pipe; the parent reads it from the `server.rdb_pipe_read` end.
- In the parent, set `server.rdb_pipe_read` to non-blocking mode and register a readable event on it with the callback `rdbPipeReadHandler`, waiting for the child's data to arrive.

When `rdbSaveToSlavesSockets` returns, the child may not yet have sent all of its data to the parent. Only after the child exits has all the RDB data been written, and only then can the transfer to the sockets complete. The real completion handling therefore happens in `backgroundSaveDoneHandler`, which runs after the child process ends.

The save-to-disk-then-sync path likewise finishes its sending from `backgroundSaveDoneHandler`.
```c
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
    listNode *ln;
    listIter li;
    pid_t childpid;
    int pipefds[2];

    if (hasActiveChildProcess()) return C_ERR;
    if (server.rdb_pipe_conns) return C_ERR;

    if (pipe(pipefds) == -1) return C_ERR;
    server.rdb_pipe_read = pipefds[0];
    server.rdb_pipe_write = pipefds[1];
    anetNonBlock(NULL, server.rdb_pipe_read);

    server.rdb_pipe_conns = zmalloc(sizeof(connection *) * listLength(server.slaves));
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;

    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
            replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
        }
    }

    openChildInfoPipe();
    if ((childpid = redisFork()) == 0) {
        /* Child: write the RDB stream into the pipe. */
        int retval;
        rio rdb;

        rioInitWithFd(&rdb, server.rdb_pipe_write);
        redisSetProcTitle("redis-rdb-to-slaves");
        redisSetCpuAffinity(server.bgsave_cpulist);

        retval = rdbSaveRioWithEOFMark(&rdb, NULL, rsi);
        if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
        if (retval == C_OK) {
            sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
        }

        rioFreeFd(&rdb);
        close(server.rdb_pipe_write);
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent. */
        if (childpid == -1) {
            serverLog(LL_WARNING, "Can't save in background: fork: %s",
                strerror(errno));
            /* Undo the state changes and release the pipe. */
            listRewind(server.slaves, &li);
            while ((ln = listNext(&li))) {
                client *slave = ln->value;
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
                }
            }
            close(server.rdb_pipe_write);
            close(server.rdb_pipe_read);
            zfree(server.rdb_pipe_conns);
            server.rdb_pipe_conns = NULL;
            server.rdb_pipe_numconns = 0;
            server.rdb_pipe_numconns_writing = 0;
            closeChildInfoPipe();
        } else {
            serverLog(LL_NOTICE, "Background RDB transfer started by pid %d",
                childpid);
            server.rdb_save_time_start = time(NULL);
            server.rdb_child_pid = childpid;
            server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
            close(server.rdb_pipe_write);
            if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE,
                                  rdbPipeReadHandler, NULL) == AE_ERR) {
                serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
            }
        }
        return (childpid == -1) ? C_ERR : C_OK;
    }
    return C_OK;
}
```
replicationSetupSlaveForFullResync

`replicationSetupSlaveForFullResync` sends `+FULLRESYNC server.replid server.master_repl_offset\r\n` to the slave, announcing that the master is about to accumulate and then send its data.

- The slave's state is set to `SLAVE_STATE_WAIT_BGSAVE_END`, meaning BGSAVE has started and RDB data is being accumulated for it.
- `server.slaveseldb = -1` forces a `SELECT` command to be sent first on the next propagation.

The whole flow is as follows (a wire-format illustration follows the code):
```c
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char buf[128];
    int buflen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
    server.slaveseldb = -1;

    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf, sizeof(buf), "+FULLRESYNC %s %lld\r\n",
                          server.replid, offset);
        if (connWrite(slave->conn, buf, buflen) != buflen) {
            freeClientAsync(slave);
            return C_ERR;
        }
    }
    return C_OK;
}
```
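To make the handshake concrete: the single reply line the slave reads back at this point would look like the following (the replid and offset are made-up values for illustration):

```
+FULLRESYNC 6f37a7c3d0a9b2b8e54cf0e4b3a1d2c5f6e7a8b9 3766\r\n
```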
rdbPipeReadHandler

`rdbPipeReadHandler` is the callback that reads the RDB data stream from the child. When the child writes into `server.rdb_pipe_write`, a readable event fires on `server.rdb_pipe_read` and `rdbPipeReadHandler` is invoked. It performs three tasks:

- read data from `server.rdb_pipe_read` into `server.rdb_pipe_buff`
- send the data in `server.rdb_pipe_buff` to the slaves waiting in `server.rdb_pipe_conns`
- when the child has written all its data, it calls `close(server.rdb_pipe_write)` on its end of the pipe; `rdbPipeReadHandler` then reads EOF, i.e. `server.rdb_pipe_bufflen == 0`, and performs the shutdown and cleanup work

In short, `rdbPipeReadHandler` forwards the RDB data read from `server.rdb_pipe_read` to the still-connected slaves in `server.rdb_pipe_conns` and takes care of the final cleanup. Consequently, by the time the child exits, the parent has already delivered all of the RDB data to the waiting slaves.

Other details are covered in the code:
```c
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);
    int i;

    if (!server.rdb_pipe_buff)
        server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    serverAssert(server.rdb_pipe_numconns_writing == 0);

    while (1) {
        server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
        if (server.rdb_pipe_bufflen < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) return;
            serverLog(LL_WARNING,
                "Diskless rdb transfer, read error sending DB to replicas: %s",
                strerror(errno));
            for (i = 0; i < server.rdb_pipe_numconns; i++) {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn) continue;
                client *slave = connGetPrivateData(conn);
                freeClient(slave);
                server.rdb_pipe_conns[i] = NULL;
            }
            killRDBChild();
            return;
        }

        if (server.rdb_pipe_bufflen == 0) {
            /* EOF: the child closed its end of the pipe. */
            int stillUp = 0;
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            for (i = 0; i < server.rdb_pipe_numconns; i++) {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn) continue;
                stillUp++;
            }
            serverLog(LL_WARNING,
                "Diskless rdb transfer, done reading from pipe, %d replicas still up.",
                stillUp);
            RdbPipeCleanup();
            return;
        }

        int stillAlive = 0;
        for (i = 0; i < server.rdb_pipe_numconns; i++) {
            int nwritten;
            connection *conn = server.rdb_pipe_conns[i];
            if (!conn) continue;

            client *slave = connGetPrivateData(conn);
            if ((nwritten = connWrite(conn, server.rdb_pipe_buff,
                                      server.rdb_pipe_bufflen)) == -1) {
                if (connGetState(conn) != CONN_STATE_CONNECTED) {
                    serverLog(LL_WARNING,
                        "Diskless rdb transfer, write error sending DB to replica: %s",
                        connGetLastError(conn));
                    freeClient(slave);
                    server.rdb_pipe_conns[i] = NULL;
                    continue;
                }
                slave->repldboff = 0;
            } else {
                slave->repldboff = nwritten;
                server.stat_net_output_bytes += nwritten;
            }

            /* Short write: let rdbPipeWriteHandler deliver the rest. */
            if (nwritten != server.rdb_pipe_bufflen) {
                server.rdb_pipe_numconns_writing++;
                connSetWriteHandler(conn, rdbPipeWriteHandler);
            }
            stillAlive++;
        }

        if (stillAlive == 0) {
            serverLog(LL_WARNING,
                "Diskless rdb transfer, last replica dropped, killing fork child.");
            killRDBChild();
            RdbPipeCleanup();
        }

        if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            break;
        }
    }
}
```
rdbPipeWriteHandler

`rdbPipeWriteHandler` sends the remainder of `server.rdb_pipe_buff` (filled by `rdbPipeReadHandler`) to the slaves that did not receive the full `server.rdb_pipe_bufflen` bytes. Each time a writable event fires on `conn`, the leftover data is sent:

- if the leftover data in `server.rdb_pipe_buff` still cannot be sent completely, simply return and wait for the next writable event
- if the leftover data has now been fully delivered to the slave, remove the writable handler on `conn` and decrement `server.rdb_pipe_numconns_writing`
- once `server.rdb_pipe_numconns_writing == 0`, the readable event on `server.rdb_pipe_read` can be re-registered to continue reading RDB data from the child

The last step is carried out internally by calling `rdbPipeWriteHandlerConnRemoved`.
```c
void rdbPipeWriteHandler(struct connection *conn) {
    serverAssert(server.rdb_pipe_bufflen > 0);
    client *slave = connGetPrivateData(conn);
    int nwritten;

    if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
                              server.rdb_pipe_bufflen - slave->repldboff)) == -1) {
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING, "Write error sending DB to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
        }
        return;
    }

    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;
    if (slave->repldboff < server.rdb_pipe_bufflen) return;

    rdbPipeWriteHandlerConnRemoved(conn);
}
```
rdbPipeWriteHandlerConnRemoved

Once the master has sent everything in `server.rdb_pipe_buff` to every slave in `server.rdb_pipe_conns` that is still connected, the readable event on `server.rdb_pipe_read` is registered again, waiting for the child's next batch of RDB data and re-entering `rdbPipeReadHandler`.
```c
void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
    if (!connHasWriteHandler(conn)) return;
    connSetWriteHandler(conn, NULL);
    server.rdb_pipe_numconns_writing--;

    if (server.rdb_pipe_numconns_writing == 0) {
        if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE,
                              rdbPipeReadHandler, NULL) == AE_ERR) {
            serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
        }
    }
}
```
backgroundSaveDoneHandler

Reaching `backgroundSaveDoneHandler` means the child process has already exited. By this point the master has completed three of the steps of the full resynchronization task; the remaining two tasks are completed by `backgroundSaveDoneHandler` through its call to `updateSlavesWaitingBgsave`.
```c
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    switch (server.rdb_child_type) {
    case RDB_CHILD_TYPE_DISK:
        backgroundSaveDoneHandlerDisk(exitcode, bysignal);
        break;
    case RDB_CHILD_TYPE_SOCKET:
        backgroundSaveDoneHandlerSocket(exitcode, bysignal);
        break;
    default:
        serverPanic("Unknown RDB child type.");
        break;
    }
}
```
updateSlavesWaitingBgsave

The slaves handled in `updateSlavesWaitingBgsave` are not only those referenced by `server.rdb_pipe_conns`, but all of the current `server.slaves`.

Some slaves in `server.slaves` may be in the `SLAVE_STATE_WAIT_BGSAVE_START` state, waiting for a BGSAVE to begin. They are handled as before: 1) take the intersection of their capabilities; 2) call `startBgsaveForReplication(mincapa)` to start a full resynchronization for them.

Slaves whose state is `SLAVE_STATE_WAIT_BGSAVE_END` arrived here through the flow above. For these, `updateSlavesWaitingBgsave` completes the two remaining tasks:

In the `RDB_CHILD_TYPE_DISK` case, the slave's state is changed to `SLAVE_STATE_SEND_BULK`; the callback `sendBulkToSlave` then sends the file contents and finally moves the slave to `SLAVE_STATE_ONLINE`.
```c
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
    listNode *ln;
    int startbgsave = 0;
    int mincapa = -1;
    listIter li;

    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            startbgsave = 1;
            mincapa = (mincapa == -1) ? slave->slave_capa
                                      : (mincapa & slave->slave_capa);
        } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (type == RDB_CHILD_TYPE_SOCKET) {
                serverLog(LL_NOTICE,
                    "Streamed RDB transfer with replica %s succeeded (socket)."
                    "Waiting for REPLCONF ACK from slave to enable streaming",
                    replicationGetSlaveName(slave));
                slave->replstate = SLAVE_STATE_ONLINE;
                slave->repl_put_online_on_ack = 1;
                slave->repl_ack_time = server.unixtime;
            } else {
                if (bgsaveerr != C_OK) {
                    freeClient(slave);
                    serverLog(LL_WARNING,
                        "SYNC failed. BGSAVE child returned an error");
                    continue;
                }
                if ((slave->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||
                    redis_fstat(slave->repldbfd, &buf) == -1) {
                    freeClient(slave);
                    serverLog(LL_WARNING,
                        "SYNC failed. Can't open/stat DB after BGSAVE: %s",
                        strerror(errno));
                    continue;
                }
                slave->repldboff = 0;
                slave->repldbsize = buf.st_size;
                slave->replstate = SLAVE_STATE_SEND_BULK;
                slave->replpreamble = sdscatprintf(sdsempty(), "$%lld\r\n",
                    (unsigned long long) slave->repldbsize);

                connSetWriteHandler(slave->conn, NULL);
                if (connSetWriteHandler(slave->conn, sendBulkToSlave) == C_ERR) {
                    freeClient(slave);
                    continue;
                }
            }
        }
    }
    if (startbgsave) startBgsaveForReplication(mincapa);
}
```
sendBulkToSlave

Sends the contents of `server.rdb_filename` to the slave, in four parts (a wire-format sketch follows the code):

- if `slave->replpreamble` is still set, first tell the slave the size `slave->repldbsize` of the RDB file about to be sent
- read data from the RDB file `slave->repldbfd` into `buf`
- send the data in `buf` to the slave
- once all the data in the RDB file has been sent, remove the writable handler, close `slave->repldbfd`, and move the slave to `SLAVE_STATE_ONLINE`

Details can be found in the source:
```c
void sendBulkToSlave(connection *conn) {
    client *slave = connGetPrivateData(conn);
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;

    if (slave->replpreamble) {
        nwritten = connWrite(conn, slave->replpreamble, sdslen(slave->replpreamble));
        if (nwritten == -1) {
            serverLog(LL_VERBOSE,
                "Write error sending RDB preamble to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += nwritten;
        sdsrange(slave->replpreamble, nwritten, -1);
        if (sdslen(slave->replpreamble) != 0) {
            return;
        }
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
    }

    lseek(slave->repldbfd, slave->repldboff, SEEK_SET);
    buflen = read(slave->repldbfd, buf, PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        serverLog(LL_WARNING, "Read error sending DB to replica: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = connWrite(conn, buf, buflen)) == -1) {
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING, "Write error sending DB to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;

    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        connSetWriteHandler(slave->conn, NULL);
        putSlaveOnline(slave);
    }
}
```
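Putting it together, in the disk-based transfer the slave first receives the `slave->replpreamble` length header, then the raw RDB bytes. With a hypothetical RDB file of 88,871,342 bytes the stream would start like this (the payload opens with the RDB magic, e.g. `REDIS0009`, the version digits depending on the Redis release):

```
$88871342\r\n
REDIS0009... (raw binary RDB payload)
```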
putSlaveOnline

Sets the slave's state to `SLAVE_STATE_ONLINE`, meaning the full resynchronization with this slave has finished.

It also registers a writable handler for the slave. From here on, the master's interaction with the slave is command propagation: when a client runs a write command `writeCmd` against the master, causing `server.dirty++`, `writeCmd` and its arguments must be propagated to the slave so that the slave stays consistent with the master. The write callback installed here is the `sendReplyToClient` discussed in the earlier post dissecting Redis's output buffer, which lets the slave parse and execute the propagated `writeCmd` exactly as if it came from an ordinary client.
```c
void putSlaveOnline(client *slave) {
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 0;
    slave->repl_ack_time = server.unixtime;

    if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
        serverLog(LL_WARNING,
            "Unable to register writable event for replica bulk transfer: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    refreshGoodSlavesCount();
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                          NULL);
    serverLog(LL_NOTICE, "Synchronization with replica %s succeeded",
        replicationGetSlaveName(slave));
}
```
propagate

The above covered in detail how a slave issues a PSYNC request to the master, establishing the initial master-slave consistency; maintaining that consistency afterwards relies on `propagate`.

Every invocation of `call()` checks whether the current command `c->cmd` modified the keyspace, i.e. whether `server.dirty` changed. If so, `propagate` is called to forward `c->cmd` and its arguments to the slaves.
```c
void call(client *c, int flags) {
    /* ... */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* 'dirty' is the number of keyspace changes the command made,
         * computed earlier in call(). */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
            propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
            propagate_flags &= ~PROPAGATE_AOF;

        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
    }
    /* ... */
}
```
In `propagate`, `replicationFeedSlaves` is called to pass the current command and its arguments `argv` to all of `server.slaves`:
```c
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) {
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd, dbid, argv, argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves, dbid, argv, argc);
}
```
replicationFeedSlaves

`replicationFeedSlaves` may only be called by the top-level master. If a slave has sub-slaves of its own, the same job is done by `replicationFeedSlavesFromMasterStream`, for the reasons analyzed above. The process is similar throughout (an encoding illustration follows the code):

- if `dictid` differs from `server.slaveseldb`, the database the last propagated write ran against, a `SELECT dictid` command is serialized in Redis protocol (RESP) format and written to `server.repl_backlog` and to all the slaves
- the command that caused `server.dirty++`, together with its arguments, is then written to `server.repl_backlog` and to all the slaves
```c
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];

    if (server.masterhost != NULL) return;
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* If the selected DB changed, propagate a SELECT first. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        if (0 <= dictid && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr, sizeof(llstr), dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
        }

        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        listRewind(slaves, &li);
        while ((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            addReply(slave, selectcmd);
        }

        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Write the command to the replication backlog. */
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        aux[0] = '*';
        len = ll2string(aux+1, sizeof(aux)-1, argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux, len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            aux[0] = '$';
            len = ll2string(aux+1, sizeof(aux)-1, objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux, len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1, 2);
        }
    }

    /* Finally write the command to every online slave. */
    listRewind(slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        addReplyArrayLen(slave, argc);
        for (j = 0; j < argc; j++) addReplyBulk(slave, argv[j]);
    }
}
```
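As a concrete illustration of the encoding above: an illustrative `SET mykey hello` executed against db 0 (with `server.slaveseldb` previously pointing elsewhere) appends exactly these RESP bytes to `server.repl_backlog` and to every online slave:

```
*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nhello\r\n
```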
replicationCron

`replicationCron` is a periodically triggered function; besides the duties already described above, its main job is timeout detection.

1) Detect whether the slave-master connection setup has timed out

Redis's replication timeout `server.repl_timeout` defaults to 60s. On the slave side, `server.repl_transfer_lastio` records the last time data was read from the master. While the slave is establishing the connection with the master, each state of `server.repl_state` is checked against this timeout:
```c
if (server.masterhost &&
    (time(NULL) - server.repl_transfer_lastio) > server.repl_timeout)
{
    /* ... timeout handling for the current server.repl_state ... */
}
```
2) Connection timeout

Once the master-slave link is established, the slave's state is `REPL_STATE_CONNECTED`. From then on the slave checks in `replicationCron` whether the connection to the master has timed out:
```c
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
    (time(NULL) - server.master->lastinteraction) > server.repl_timeout)
{
    serverLog(LL_WARNING, "MASTER timeout: no data nor PING received...");
    freeClient(server.master);
}
```
Whether the connection has timed out is judged from the `server.master->lastinteraction` field, which is refreshed whenever the slave reads data from the master. Two sources keep it fresh: normal command propagation, and the periodic PING below.

To keep the master-slave link alive, the master periodically sends `PING` to all slaves from `replicationCron`, so the slaves keep refreshing `server.master->lastinteraction` even when no writes occur. `server.repl_ping_slave_period` defaults to 10, i.e. roughly one PING every 10 seconds (see the config sketch after the snippet):
```c
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
    listLength(server.slaves))
{
    int manual_failover_in_progress =
        server.cluster_enabled &&
        server.cluster->mf_end &&
        clientsArePaused();

    if (!manual_failover_in_progress) {
        ping_argv[0] = createStringObject("PING", 4);
        replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }
}
```
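The constants involved here correspond to redis.conf directives; a sketch with their usual defaults (older releases spell the first one `repl-ping-slave-period`):

```
# How often (in seconds) the master PINGs its replicas from replicationCron.
repl-ping-replica-period 10

# Replication timeout, used for both bulk-transfer I/O and master/replica inactivity.
repl-timeout 60
```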
3) Heartbeat

The slave, in turn, periodically sends `REPLCONF ACK offset` to the master (the wire bytes are shown after the function below), mainly in order to:

- tell the master its current replication offset;
- act as a heartbeat, telling the master it is still alive
```c
if (server.masterhost && server.master &&
    !(server.master->flags & CLIENT_PRE_PSYNC))
{
    replicationSendAck();
}
```
```c
void replicationSendAck(void) {
    client *c = server.master;

    if (c != NULL) {
        c->flags |= CLIENT_MASTER_FORCE_REPLY;
        addReplyArrayLen(c, 3);
        addReplyBulkCString(c, "REPLCONF");
        addReplyBulkCString(c, "ACK");
        addReplyBulkLongLong(c, c->reploff);
        c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
    }
}
```
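On the wire, the ACK built above is a plain three-element RESP array; with an illustrative offset of 3766 the master would receive:

```
*3\r\n$8\r\nREPLCONF\r\n$3\r\nACK\r\n$4\r\n3766\r\n
```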
Via the `slave->repl_ack_time` field, the master can then decide whether any slave connection has timed out, and if so calls `freeClient(slave)` to close it.
```c
if (listLength(server.slaves)) {
    listIter li;
    listNode *ln;

    listRewind(server.slaves, &li);
    while ((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate != SLAVE_STATE_ONLINE) continue;
        if (slave->flags & CLIENT_PRE_PSYNC) continue;
        if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
            serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
                replicationGetSlaveName(slave));
            freeClient(slave);
        }
    }
}
```