replication中解决了主从同步的问题,sentinel解决两个问题
当前各个主从服务器运行状态
如果主服务器出现故障,怎么转移故障
监视状态 sentinelRedisInstance 每个被sentinel监视的实例都被封装成 sentinelRedisInstance 对象。sentinelRedisInstance 创建后,sentinel要向监视的主服务器master发起连接请求,成为master的客户端。这里使用前面所述的 hiredis.c/redisAsyncContext
结构体来创建客户端。
每个sentinel都会向监视的主服务器master发起两个异步连接,即创建两个redisAsyncContext
对象来和master进行通信:
命令连接:即专门用于sentinel向master发送普通命令,并接受master对这些命令的reply
订阅连接:即专门用于订阅主服务的 __sentinel__:hello
通道
sentinelRedisInstance 内部结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 typedef struct sentinelRedisInstance { int flags; char *name; char *runid; uint64_t config_epoch; sentinelAddr *addr; instanceLink *link; mstime_t last_pub_time; mstime_t last_hello_time; mstime_t last_master_down_reply_time; mstime_t s_down_since_time; mstime_t o_down_since_time; mstime_t down_after_period; mstime_t info_refresh; dict *renamed_commands; int role_reported; mstime_t role_reported_time; mstime_t slave_conf_change_time; dict *sentinels; dict *slaves; unsigned int quorum; int parallel_syncs; char *auth_pass; char *auth_user; mstime_t master_link_down_time; int slave_priority; mstime_t slave_reconf_sent_time; struct sentinelRedisInstance *master ; char *slave_master_host; int slave_master_port; int slave_master_link_status; unsigned long long slave_repl_offset; char *leader; uint64_t leader_epoch; uint64_t failover_epoch; int failover_state; mstime_t failover_state_change_time; mstime_t failover_start_time; mstime_t failover_timeout; mstime_t failover_delay_logged; struct sentinelRedisInstance *promoted_slave ; char *notification_script; char *client_reconfig_script; sds info; } sentinelRedisInstance;
instanceLink
sentinel要向master发起连接,是通过instanceLink
对象来完成。在instanceLink 内部,封装了两个连接cc
和pc
,用于和监视的主服务进行通信,同时设置了一些关于PING
和PONG
的字段来记录连接状态。而sentinelRedisInstance
中时间字段监视的是主服务的状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 typedef struct instanceLink { int refcount; int disconnected; int pending_commands; redisAsyncContext *cc; redisAsyncContext *pc; mstime_t cc_conn_time; mstime_t pc_conn_time; mstime_t pc_last_activity; mstime_t last_avail_time; mstime_t act_ping_time; mstime_t last_ping_time; mstime_t last_pong_time; mstime_t last_reconn_time; } instanceLink;
createInstanceLink
createInstanceLink
函数用于创建一个instanceLink
对象,其中初始化值如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 instanceLink* createInstanceLink (void ) { instanceLink *link = zmalloc(sizeof (*link)); link->refcount = 1 ; link->disconnected = 1 ; link->pending_commands = 0 ; link->cc = NULL ; link->pc = NULL ; link->cc_conn_time = 0 ; link->pc_conn_time = 0 ; link->last_reconn_time = 0 ; link->pc_last_activity = 0 ; link->act_ping_time = mstime(); link->last_ping_time = 0 ; link->last_avail_time = mstime(); link->last_pong_time = mstime(); return link; }
sentinelReconnectInstance sentinelReconnectInstance
函数,用于建立连接、断开后重新连接。sentinelRedisInstance 类似于client
,负责和对端通信。此时,己方(当前sentinel进程)是server
,对端被封装成sentinelRedisInstance 对象ri
,对端的类型ri->flags
表征:
/* Instance type bits stored in sentinelRedisInstance.flags. */
#define SRI_MASTER   (1 << 0)
#define SRI_SLAVE    (1 << 1)
#define SRI_SENTINEL (1 << 2)
sentinel服务器要和 (ri->addr->ip, ri->addr->port)
建立指令连接link->cc
和订阅连接link->pc
。过程如下:
确认ri->link->disconnected ==1
。只要link->cc
和 link->pc
中有一个没有连接成功,就有ri->link->disconnected ==1
调用redisAsyncConnectBind
函数和 (ri->addr->ip, ri->addr->port)
建立连接,获得redisAsyncContext
对象 注意: sentinel与另一个sentinel之间没有订阅连接link->pc
:
1 2 3 4 if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL ) { }
成功建立连接后,调用redisAeAttach
函数将link->cc
加入到server.el
事件循环中,让server
的事件循环也能处理 redisAsyncContext
的可读可写事件
设置连接和断开连接的回调函数
对于link->pc
,此次还需要订阅__sentinel__:hello
通道
link->cc
和link->pc
成功连接后,才将link->disconnected
设置为0。但是如果对端ri
也是个sentinel,那么就不需要建立link->pc
,也能link->disconnected =0
。
1 2 if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) link->disconnected = 0 ;
整个过程详细如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 void sentinelReconnectInstance (sentinelRedisInstance *ri) { if (ri->link->disconnected == 0 ) return ; if (ri->addr->port == 0 ) return ; instanceLink *link = ri->link; mstime_t now = mstime(); if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return ; ri->link->last_reconn_time = now; if (link->cc == NULL ) { link->cc = redisAsyncConnectBind(ri->addr->ip, ri->addr->port, NET_FIRST_BIND_ADDR); if (!link->cc->err && server.tls_replication && (instanceLinkNegotiateTLS(link->cc) == C_ERR)) { sentinelEvent(LL_DEBUG, "-cmd-link-reconnection" , ri, "%@ #Failed to initialize TLS" ); instanceLinkCloseConnection(link,link->cc); } else if (link->cc->err) { sentinelEvent(LL_DEBUG, "-cmd-link-reconnection" , ri, "%@ #%s" , link->cc->errstr); instanceLinkCloseConnection(link,link->cc); } else { link->pending_commands = 0 ; link->cc_conn_time = mstime(); link->cc->data = link; redisAeAttach(server.el,link->cc); redisAsyncSetConnectCallback(link->cc,sentinelLinkEstablishedCallback); redisAsyncSetDisconnectCallback(link->cc,sentinelDisconnectCallback); sentinelSendAuthIfNeeded(ri,link->cc); sentinelSetClientName(ri,link->cc,"cmd" ); sentinelSendPing(ri); } } if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL ) { link->pc = redisAsyncConnectBind(ri->addr->ip, ri->addr->port, NET_FIRST_BIND_ADDR); if (!link->pc->err && server.tls_replication && (instanceLinkNegotiateTLS(link->pc) == C_ERR)) { sentinelEvent(LL_DEBUG, "-pubsub-link-reconnection" , ri, "%@ #Failed to initialize TLS" ); } else if (link->pc->err) { sentinelEvent(LL_DEBUG, "-pubsub-link-reconnection" , ri, "%@ #%s" , link->pc->errstr); 
instanceLinkCloseConnection(link,link->pc); } else { int retval; link->pc_conn_time = mstime(); link->pc->data = link; redisAeAttach(server.el, link->pc); redisAsyncSetConnectCallback(link->pc,sentinelLinkEstablishedCallback); redisAsyncSetDisconnectCallback(link->pc,sentinelDisconnectCallback); sentinelSendAuthIfNeeded(ri,link->pc); sentinelSetClientName(ri,link->pc,"pubsub" ); retval = redisAsyncCommand(link->pc, sentinelReceiveHelloMessages, ri, "%s %s" , sentinelInstanceMapCommand(ri,"SUBSCRIBE" ), SENTINEL_HELLO_CHANNEL); if (retval != C_OK) { instanceLinkCloseConnection(link,link->pc); return ; } } } if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) link->disconnected = 0 ; }
sentinelSetClientName 向对端发送 CLIENT SETNAME name
指令,即告诉对端自己的名字:
如果建立的连接是link->cc
,则名字是sentinel-runid-cmd
,其中runid只取sentinel.myid的前8个字符。
如果建立的连接是link->pc
,则名字是sentinel-runid-pubsub
这个可以通过CLIENT LIST
指令查看自己的客户端列表:
1 2 3 127.0.0.1:6379> CLIENT LIST id=272 addr=127.0.0.1:11453 name=sentinel-e5d1f1a9-cmd // link-cc 其余参数省略... id=450 addr=127.0.0.1:11687 name=szz // 这个是自己的客户端 其余参数省略...
对端对CLIENT SETNAME name
指令的回复sentinel不在乎,因此设置回调函数是 sentinelDiscardReplyCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void sentinelSetClientName (sentinelRedisInstance *ri, redisAsyncContext *c, char *type) { char name[64 ]; snprintf (name,sizeof (name),"sentinel-%.8s-%s" , sentinel.myid, type); if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s SETNAME %s" , sentinelInstanceMapCommand(ri,"CLIENT" ), name) == C_OK) { ri->link->pending_commands++; } }
sentinelSendPing 在成功建立link->cc
时,会向对端发送PING
指令。redisAsyncCommand
函数返回C_OK
,则更新相关时间字段:
ri->link->last_ping_time
:表示最近一次发送PING时间
ri->link->act_ping_time
:记录最早一个尚未收到有效回复的PING的发送时间。如果之前的PING已收到有效回复,则此时ri->link->act_ping_time
的值是0,本次发送PING时会将 ri->link->act_ping_time
设置为本次发送PING的时间;否则保持原值(即那个仍未得到回复的PING的发送时间)。
对于PING回复回调函数是 sentinelPingReplyCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 int sentinelSendPing (sentinelRedisInstance *ri) { int retval = redisAsyncCommand(ri->link->cc, sentinelPingReplyCallback, ri, "%s" , sentinelInstanceMapCommand(ri,"PING" )); if (retval == C_OK) { ri->link->pending_commands++; ri->link->last_ping_time = mstime(); if (ri->link->act_ping_time == 0 ) ri->link->act_ping_time = ri->link->last_ping_time; return 1 ; } return 0 ; }
sentinelPingReplyCallback 对端对PING
指令的有效回复只有三种:
1 2 3 +PONG // r->type == REDIS_REPLY_STATUS -LOADING // r->type == REDIS_REPLY_ERROR -MASTERDOWN // r->type == REDIS_REPLY_ERROR
因此在这三种情况下,是可以将link->act_ping_time
设置为0,表示接收到对端的有效回复。如果对端回复的是BUSY
,表示对端正忙于执行脚本;若此时对端已被标记为主观下线(SRI_S_DOWN)且尚未发送过该命令,sentinel就向对端发送SCRIPT KILL
命令,杀死脚本,以解除对端的忙碌状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void sentinelPingReplyCallback (redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; instanceLink *link = c->data; redisReply *r; if (!reply || !link) return ; link->pending_commands--; r = reply; if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) { if (strncmp (r->str,"PONG" ,4 ) == 0 || strncmp (r->str,"LOADING" ,7 ) == 0 || strncmp (r->str,"MASTERDOWN" ,10 ) == 0 ) { link->last_avail_time = mstime(); link->act_ping_time = 0 ; } else { if (strncmp (r->str,"BUSY" ,4 ) == 0 && (ri->flags & SRI_S_DOWN) && !(ri->flags & SRI_SCRIPT_KILL_SENT)) { if (redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s KILL" , sentinelInstanceMapCommand(ri,"SCRIPT" )) == C_OK) { ri->link->pending_commands++; } ri->flags |= SRI_SCRIPT_KILL_SENT; } } } link->last_pong_time = mstime(); }
sentinelSendPeriodicCommands 既然sentinel要监视master,那是怎么监视的呢?
sentinel定时的发送PING
sentinel定时向监视的主服务器master及master的slaves发送INFO指令,查看他们的状态
sentinel定时向__sentinel__:hello
通道发送hello
信息
通过这种方式来监视整个网络。
PING 在发送PING之前,会先计算下发送PING的频率。ri->down_after_period
的默认值是由sentinel.conf
中的down-after-milliseconds
参数设置的,但是如果ri->down_after_period > SENTINEL_PING_PERIOD
,则以SENTINEL_PING_PERIOD
为频率发送,即每秒发送一次。
因此,PING指令默认是1s一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #define SENTINEL_PING_PERIOD 1000 void sentinelSendPeriodicCommands (redisAsyncContext* c, void * reply, void * privdata) { ping_period = ri->down_after_period; if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD; if ((now - ri->link->last_pong_time) > ping_period && (now - ri->link->last_ping_time) > ping_period/2 ) { sentinelSendPing(ri); } }
INFO sentinel默认是10s发送一次INFO
命令。但如果ri
是从服务器 ,那么有两种情况会加快发送INFO
命令的频率:
ri->master
处于 O_DOWN 或正在故障转移(SRI_FAILOVER_IN_PROGRESS)
状态:即ri->master
处于客观下线状态或故障转移进行中,那么需要加快发送 INFO 的频率(1s一次),来密切监视ri
的状态,因为它有可能被另一个sentinel从slave变成master
ri
报告其与master之间的连接已断开(ri->master_link_down_time != 0),那么也需要加快INFO的频率,来获得一个更加准确的重新连接时间
INFO指令的回调函数sentinelInfoReplyCallback
,在回调函数中对其监视的服务器进行分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void sentinelSendPeriodicCommands (sentinelRedisInstance* ri) { if ((ri->flags & SRI_SLAVE) && ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) || (ri->master_link_down_time != 0 ))) { info_period = 1000 ; } else { info_period = SENTINEL_INFO_PERIOD; } if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s" , sentinelInstanceMapCommand(ri,"INFO" )); if (retval == C_OK) ri->link->pending_commands++; } }
sentinelInfoReplyCallback
在INFO 指令的回复回调函数中,会调用sentinelRefreshInstanceInfo
函数,对当前主从服务器的状态进行判断:
在INFO指令的回应中,发现了监视的master有新的从服务器后,sentinel也会和slaves之间建立命令和订阅两个连接。下次sentinel的INFO命令将发送给master及其slaves
master是否发生故障;如果发生故障,就需要进行故障转移
sentinelRefreshInstanceInfo
函数留到后面章节讲述。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void sentinelInfoReplyCallback (redisAsyncContext* c, void * reply, void * privdata) { sentinelRedisInstance *ri = privdata; instanceLink *link = c->data; redisReply *r; if (!reply || !link) return ; link->pending_commands--; r = reply; if (r->type == REDIS_REPLY_STRING) sentinelRefreshInstanceInfo(ri,r->str); }
PUB/SUB 默认情况下,sentinel每2s一次向监视的主服务器及其从服务器的link->cc
发送PUBLISH
指令,格式如下:
1 PUBLISH __sentinel__:hello "s_ip,s_port,s_runid,s_epoch,m_name,m_ip,m_port,m_epoch"
即,向各个服务器的__sentinel__:hello
通道发送了一条信息,那么订阅这个通道的所有服务器都会接受到这条信息。
1 2 3 4 5 6 7 8 #define SENTINEL_PUBLISH_PERIOD 2000 void sentinelInfoReplyCallback (redisAsyncContext* c, void * reply, void * privdata) { if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { sentinelSendHello(ri); } }
sentinelSendHello
sentinelSendHello
函数,主要就是格式化待发送数据,然后通过link->cc
发送PUBLISH指令。设置回调函数sentinelPublishReplyCallback
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 int sentinelSendHello (sentinelRedisInstance *ri) { char ip[NET_IP_STR_LEN]; char payload[NET_IP_STR_LEN+1024 ]; int retval; char *announce_ip; int announce_port; sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); if (ri->link->disconnected) return C_ERR; if (sentinel.announce_ip) { announce_ip = sentinel.announce_ip; } else { if (anetSockName(ri->link->cc->c.fd, ip, sizeof (ip),NULL ) == -1 ) return C_ERR; announce_ip = ip; } if (sentinel.announce_port) announce_port = sentinel.announce_port; else if (server.tls_replication && server.tls_port) announce_port = server.tls_port; else announce_port = server.port; snprintf (payload, sizeof (payload), "%s,%d,%s,%llu," "%s,%s,%d,%llu" , announce_ip, announce_port, sentinel.myid, (unsigned long long ) sentinel.current_epoch, master->name, master_addr->ip, master_addr->port, (unsigned long long ) master->config_epoch); retval = redisAsyncCommand(ri->link->cc, sentinelPublishReplyCallback, ri, "%s %s %s" , sentinelInstanceMapCommand(ri,"PUBLISH" ), SENTINEL_HELLO_CHANNEL, payload); if (retval != C_OK) return C_ERR; ri->link->pending_commands++; return C_OK; }
sentinelPublishReplyCallback
sentinel在接收到对端对于hello
的回应后,主要更新下 ri->last_pub_time
时间。
1 2 3 4 5 6 7 8 9 10 11 12 void sentinelPublishReplyCallback (redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; instanceLink *link = c->data; redisReply *r; if (!reply || !link) return ; link->pending_commands--; r = reply; if (r->type != REDIS_REPLY_ERROR) ri->last_pub_time = mstime(); }
sentinelReceiveHelloMessages
那么其他sentinel接收到hello
通道的信息怎么处理???
在sentinelReconnectInstance
函数中,创建sentinelRedisInstance
实例时,就已订阅了hello
通道,并设置回调函数为sentinelReceiveHelloMessages
函数。当其他sentinelRedisInstance 实例向__sentinel__:hello
通道发送消息时,就能接受到如下格式的数据:
1) "message"
2) "__sentinel__:hello"
3) "s_ip,s_port,s_runid,s_epoch,m_name,m_ip,m_port,m_epoch"
sentinelReceiveHelloMessages
函数,主要完成以下几个任务:
更新link->pc_last_activity
时间后
判断接受到数据格式正确
hello
通道的数据不是自己发送
如果以上都满足,才调用 sentinelProcessHelloMessage
函数来处理hello通道传递来的数据。
整个流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void sentinelReceiveHelloMessages (redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; redisReply *r; UNUSED(c); if (!reply || !ri) return ; r = reply; ri->link->pc_last_activity = mstime(); if (r->type != REDIS_REPLY_ARRAY || r->elements != 3 || r->element[0 ]->type != REDIS_REPLY_STRING || r->element[1 ]->type != REDIS_REPLY_STRING || r->element[2 ]->type != REDIS_REPLY_STRING || strcmp (r->element[0 ]->str,"message" ) != 0 ) return ; if (strstr (r->element[2 ]->str, sentinel.myid) != NULL ) return ; sentinelProcessHelloMessage(r->element[2 ]->str, r->element[2 ]->len); }
故障转移 sentinelCheckSubjectivelyDown 当前sentinel会调用sentinelCheckSubjectivelyDown
函数, 以1ms
的频率来检测监视的服务器是否下线。ri
可能是slave也可能是个master。
如果当前sentinel之前发送的PING还没有收到有效回应(act_ping_time != 0),则计算距离该PING发送时刻的时间elapsed;否则,如果连接已断开,则计算距离最近一次收到有效回复(last_avail_time)的时间elapsed
1 2 3 4 5 mstime_t last_avail_time; mstime_t act_ping_time;
是否有必要关闭link->cc
:如果sentinel和对端建立link->cc
的时间超过了SENTINEL_MIN_LINK_RECONNECT_PERIOD
,说明没有接收到PONG不是因为连接刚刚建立、网络不稳导致的。如果发送的PING在down_after_period/2
时段内没有回应,那么对端可能发生故障,直接关闭与对端的连接。
是否有必要关闭link->pc
:处理__sentinel__:hello
超时,也关闭link->pc
连接
判断监视的服务器ri是否主观下线:
如果没有回复PING的时间超过ri->down_after_period
,就视为主观下线,加上ri->flags |= SRI_S_DOWN
标志
如果ri
是主服务器,但在INFO的回应中ri
被报告为slave,发生了主从切换,也视为主观下线
如果不满足上面两个中的任何一条件,则不视为主观下线,如果之前设置为主观下线,现在取消SRI_S_DOWN
标志位。
下一步就是判断是否真的下线了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 void sentinelCheckSubjectivelyDown (sentinelRedisInstance *ri) { mstime_t elapsed = 0 ; if (ri->link->act_ping_time) elapsed = mstime() - ri->link->act_ping_time; else if (ri->link->disconnected) elapsed = mstime() - ri->link->last_avail_time; if (ri->link->cc && (mstime() - ri->link->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && ri->link->act_ping_time != 0 && (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2 ) && (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2 )) { instanceLinkCloseConnection(ri->link,ri->link->cc); } if (ri->link->pc && (mstime() - ri->link->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3 )) { instanceLinkCloseConnection(ri->link,ri->link->pc); } if (elapsed > ri->down_after_period || (ri->flags & SRI_MASTER && ri->role_reported == SRI_SLAVE && mstime() - ri->role_reported_time > (ri->down_after_period+SENTINEL_INFO_PERIOD*2 ))) { if ((ri->flags & SRI_S_DOWN) == 0 ) { sentinelEvent(LL_WARNING,"+sdown" ,ri,"%@" ); ri->s_down_since_time = mstime(); ri->flags |= SRI_S_DOWN; } } else { if (ri->flags & SRI_S_DOWN) { sentinelEvent(LL_WARNING,"-sdown" ,ri,"%@" ); ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); } } }
sentinelCheckObjectivelyDown sentinelCheckObjectivelyDown
函数,仅对主服务器适用,进一步判断是不是真的下线了。
在当前sentinel已将master视为主观下线的前提下,统计master->sentinels
中同样认为master下线的sentinel数量,连同自己在内达到master->quorum
个,则当前sentinel将master视为真的下线了,即客观下线了
客观下线,加上SRI_O_DOWN
标志,否则取消该标志
详细过程如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 void sentinelCheckObjectivelyDown (sentinelRedisInstance *master) { dictIterator *di; dictEntry *de; unsigned int quorum = 0 , odown = 0 ; if (master->flags & SRI_S_DOWN) { quorum = 1 ; di = dictGetIterator(master->sentinels); while ((de = dictNext(di)) != NULL ) { sentinelRedisInstance *ri = dictGetVal(de); if (ri->flags & SRI_MASTER_DOWN) quorum++; } dictReleaseIterator(di); if (quorum >= master->quorum) odown = 1 ; } if (odown) { if ((master->flags & SRI_O_DOWN) == 0 ) { sentinelEvent(LL_WARNING, "+odown" , master, "%@ #quorum %d/%d" , quorum, master->quorum); master->flags |= SRI_O_DOWN; master->o_down_since_time = mstime(); } } else { if (master->flags & SRI_O_DOWN) { sentinelEvent(LL_WARNING,"-odown" ,master,"%@" ); master->flags &= ~SRI_O_DOWN; } } }
sentinelStartFailoverIfNeeded sentinelStartFailoverIfNeeded
函数,判断主服务master是否需要进行故障转移:
master必须处于SRI_O_DOWN
状态,即已客观下线
master不能已经处于故障转移中,
master距离上次故障转移的时间不能太短,太短则在上次的启动时间后延迟 master->failover_timeout*2
如果上述条件都满足,则开启故障转移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 int sentinelStartFailoverIfNeeded (sentinelRedisInstance *master) { if (!(master->flags & SRI_O_DOWN)) return 0 ; if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0 ; if (mstime() - master->failover_start_time < master->failover_timeout*2 ) { if (master->failover_delay_logged != master->failover_start_time) { time_t clock = (master->failover_start_time + master->failover_timeout*2 ) / 1000 ; char ctimebuf[26 ]; ctime_r(&clock, ctimebuf); ctimebuf[24 ] = '\0' ; master->failover_delay_logged = master->failover_start_time; serverLog(LL_WARNING, "Next failover delay: I will not start a failover before %s" , ctimebuf); } return 0 ; } sentinelStartFailover(master); return 1 ; }
sentinelAskMasterStateToOtherSentinels 当监视的主服务发生故障,那么sentinel要从slaves中选出一个提升为主服务器,被选中的slave叫做promoted_slave 。但是在此之前,要先投票决定由哪个sentinel来执行这个过程。
sentinelAskMasterStateToOtherSentinels
函数, 用于当前sentinel向master->sentinels
中其他sentinels发送如下信息:
1 SENTINEL is-master-down-by-addr master_ip master_port sentinel.current_epoch sentinel.runid
上述信息:告诉接受到此信息的dst_sentinel
,地址是 (master_ip, master_port)
的主服务宕机了,请把(s_epoch, s_runid)
的sentinel设置为leader,由这个leader来完成故障转移的过程。
在发送该指令之前,先判断是否能发送:
master必须处于主观下线状态(SRI_S_DOWN)
当前sentinel与master->sentinels
之间的连接link->cc
没有断开
当前sentinel与master->sentinels
之间,本次调用设置了SENTINEL_ASK_FORCED
标志 或者 距离上次此指令的回复时间不小于SENTINEL_ASK_PERIOD
同时满足上面的条件,才能发送此次的is-master-down-by-addr
指令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #define SENTINEL_ASK_FORCED (1<<0) void sentinelAskMasterStateToOtherSentinels (sentinelRedisInstance* master, int flags) { dictIterator *di; dictEntry *de; di = dictGetIterator(master->sentinels); while ((de = dictNext(di)) != NULL ) { sentinelRedisInstance* ri = dictGetVal(de); mstime_t elapsed = mstime() - ri->last_master_down_reply_time; char port[32 ]; int retval; if (elapsed > SENTINEL_ASK_PERIOD*5 ) { ri->flags &= ~SRI_MASTER_DOWN; sdsfree(ri->leader); ri->leader = NULL ; } if ((master->flags & SRI_S_DOWN) == 0 ) continue ; if (ri->link->disconnected) continue ; if (!(flags & SENTINEL_ASK_FORCED) && mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD) continue ; ll2string(port,sizeof (port),master->addr->port); retval = redisAsyncCommand(ri->link->cc, sentinelReceiveIsMasterDownReply, ri, "%s is-master-down-by-addr %s %s %llu %s" , sentinelInstanceMapCommand(ri,"SENTINEL" ), master->addr->ip, port, sentinel.current_epoch, (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ? sentinel.myid : "*" ); if (retval == C_OK) ri->link->pending_commands++; } dictReleaseIterator(di); }
master->sentinels
中的sentinel在收到指令后:
1 SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
先在自己的sentinel.masters
中查找是否有地址为 (ip, port)
的主服务器ri
,并且该 ri
也确实处于SRI_S_DOWN
状态,则回复消息。不过回复有三行数据
1 2 3 sdown // 接受到指令的sentinel中master是否也主观下线 leader // 选出的leader epoch // 该leader的epoch
其他sentinel在接受到 IS-MASTER-DOWN-BY-ADDR
后,处理过程如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 if (!strcasecmp(c->argv[1 ]->ptr,"is-master-down-by-addr" )) { sentinelRedisInstance *ri; long long req_epoch; uint64_t leader_epoch = 0 ; char *leader = NULL ; long port; int isdown = 0 ; if (c->argc != 6 ) goto numargserr; if (getLongFromObjectOrReply(c,c->argv[3 ],&port,NULL ) != C_OK || getLongLongFromObjectOrReply(c,c->argv[4 ],&req_epoch,NULL ) != C_OK) return ; ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2 ]->ptr, port, NULL ); if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER)) isdown = 1 ; if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5 ]->ptr,"*" )) { leader = sentinelVoteLeader(ri, (uint64_t )req_epoch, c->argv[5 ]->ptr, &leader_epoch); } addReplyArrayLen(c,3 ); addReply(c, isdown ? shared.cone : shared.czero); addReplyBulkCString(c, leader ? leader : "*" ); addReplyLongLong(c, (long long )leader_epoch); if (leader) sdsfree(leader); }
sentinelReceiveIsMasterDownReply sentinelReceiveIsMasterDownReply
函数,是src_sentinel
发送 IS-MASTER-DOWN-BY-ADDR
指令的回复回调函数。当接收到对端对IS-MASTER-DOWN-BY-ADDR
指令回复后调用。
在 IS-MASTER-DOWN-BY-ADDR
指令的回复中,如果runid != *
,则runid
是dst_sentinel
选出来的leader
,那么就将dst_sentinel
的leader更改为runid
。
注意: sentinelReceiveIsMasterDownReply
函数中的 privdata
就是dst_sentinel
,因此在接收到dst_sentinel
的回复后,对dst_sentinel
的参数做一些修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void sentinelReceiveIsMasterDownReply (redisAsyncContext* c, void * reply, void * privdata) { sentinelRedisInstance *ri = privdata; instanceLink *link = c->data; redisReply *r; if (!reply || !link) return ; link->pending_commands--; r = reply; if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 && r->element[0 ]->type == REDIS_REPLY_INTEGER && r->element[1 ]->type == REDIS_REPLY_STRING && r->element[2 ]->type == REDIS_REPLY_INTEGER) { ri->last_master_down_reply_time = mstime(); if (r->element[0 ]->integer == 1 ) ri->flags |= SRI_MASTER_DOWN; else ri->flags &= ~SRI_MASTER_DOWN; if (strcmp (r->element[1 ]->str,"*" )) { sdsfree(ri->leader); if ((long long )ri->leader_epoch != r->element[2 ]->integer) serverLog(LL_WARNING, "%s voted for %s %llu" , ri->name, r->element[1 ]->str, (unsigned long long ) r->element[2 ]->integer); ri->leader = sdsnew(r->element[1 ]->str); ri->leader_epoch = r->element[2 ]->integer; } } }
sentinelVoteLeader 下面来讲述下,选择leader的过程。
要能成为leader,必须要使得req_epoch >= sentinel.current_epoch(如果req_epoch更大,当前sentinel会先把自己的current_epoch更新为req_epoch)
,因为当前sentinel也有可能负责master的故障转移。这个条件不满足,则不可能成为leader。
此外,还需要大于master->leader_epoch
。如果都满足则将req_runid
设置为master
的leader
,后期负责master
的故障转移。src_sentinel
在接收到
回复后,在sentinelReceiveIsMasterDownReply
函数中,还会将req_runid
设置为dst_Sentinel
的leader。
如果不满足,则在sentinelReceiveIsMasterDownReply
函数中,dst_sentinel
的leader
是master->leader
其中,req_runid
和req_epoch
即src_sentinel
的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 char * sentinelVoteLeader (sentinelRedisInstance* master, uint64_t req_epoch, char * req_runid, uint64_t * leader_epoch) { if (req_epoch > sentinel.current_epoch) { sentinel.current_epoch = req_epoch; sentinelFlushConfig(); sentinelEvent(LL_WARNING, "+new-epoch" , master, "%llu" , (unsigned long long ) sentinel.current_epoch); } if (req_epoch > master->leader_epoch && req_epoch >= sentinel.current_epoch) { sdsfree(master->leader); master->leader = sdsnew(req_runid); master->leader_epoch = sentinel.current_epoch; sentinelFlushConfig(); sentinelEvent(LL_WARNING, "+vote-for-leader" , master, "%s %llu" , master->leader, (unsigned long long ) master->leader_epoch); if (strcasecmp(master->leader, sentinel.myid)) master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; } *leader_epoch = master->leader_epoch; return master->leader ? sdsnew(master->leader) : NULL ; }
sentinelFailoverStateMachine sentinelFailoverStateMachine
函数, 是用于控制故障转移流程。
故障转移的状态如下, SENTINEL_FAILOVER_STATE_WAIT_START
到 SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
:
/* Failover states, in the order in which a failover goes through them. */
#define SENTINEL_FAILOVER_STATE_NONE               0
#define SENTINEL_FAILOVER_STATE_WAIT_START         1
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE       2
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION     4
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES      5
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG      6
下面按照ri->failover_state
状态的流程来介绍故障转移的流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void sentinelFailoverStateMachine (sentinelRedisInstance *ri) { serverAssert(ri->flags & SRI_MASTER); if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return ; switch (ri->failover_state) { case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break ; case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: sentinelFailoverSelectSlave(ri); break ; case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: sentinelFailoverSendSlaveOfNoOne(ri); break ; case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break ; case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: sentinelFailoverReconfNextSlave(ri); break ; } }
SENTINEL_FAILOVER_STATE_WAIT_START 状态是SENTINEL_FAILOVER_STATE_WAIT_START
时,由 sentinelFailoverWaitStart
函数转移到下一阶段。流程如下:
在正式的故障转移之前,要先从当前sentinel及master->sentinels
中选出一个leader,负责master的故障转移
判断是否满足开启故障转移的条件
满足,则进入故障转移的下一阶段SENTINEL_FAILOVER_STATE_SELECT_SLAVE
。
这个阶段主要任务就是选出leader。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void sentinelFailoverWaitStart (sentinelRedisInstance *ri) { char *leader; int isleader; leader = sentinelGetLeader(ri, ri->failover_epoch); isleader = leader && strcasecmp(leader,sentinel.myid) == 0 ; sdsfree(leader); if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) { int election_timeout = SENTINEL_ELECTION_TIMEOUT; if (election_timeout > ri->failover_timeout) election_timeout = ri->failover_timeout; if (mstime() - ri->failover_start_time > election_timeout) { sentinelEvent(LL_WARNING,"-failover-abort-not-elected" ,ri,"%@" ); sentinelAbortFailover(ri); } return ; } sentinelEvent(LL_WARNING,"+elected-leader" ,ri,"%@" ); if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION) sentinelSimFailureCrash(); ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; ri->failover_state_change_time = mstime(); sentinelEvent(LL_WARNING,"+failover-state-select-slave" ,ri,"%@" ); }
SENTINEL_FAILOVER_STATE_SELECT_SLAVE 调用sentinelSelectSlave
函数,从master->slaves
中选出promoted_slave
,为其加上SRI_PROMOTED
标志位。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void sentinelFailoverSelectSlave (sentinelRedisInstance *ri) { sentinelRedisInstance *slave = sentinelSelectSlave(ri); if (slave == NULL ) { sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave" ,ri,"%@" ); sentinelAbortFailover(ri); } else { sentinelEvent(LL_WARNING,"+selected-slave" ,slave,"%@" ); slave->flags |= SRI_PROMOTED; ri->promoted_slave = slave; ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE; ri->failover_state_change_time = mstime(); sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone" , slave, "%@" ); } }
SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 选出promoted_slave
之后,需要将promoted_slave
变成master,因此要对其执行 SLAVEOF NO ONE
命令。不过此处发送SLAVEOF
命令的函数是 sentinelSendSlaveOf
,不需要设置回调函数。至于具体promoted_slave
是否成功变成master,通过INFO
信息的回调函数查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void sentinelFailoverSendSlaveOfNoOne (sentinelRedisInstance *ri) { int retval; if (ri->promoted_slave->link->disconnected) { if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout" ,ri,"%@" ); sentinelAbortFailover(ri); } return ; } retval = sentinelSendSlaveOf(ri->promoted_slave, NULL , 0 ); if (retval != C_OK) return ; sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion" , ri->promoted_slave, "%@" ); ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION; ri->failover_state_change_time = mstime(); }
sentinelFailoverWaitPromotion
函数用于leader在ri->failover_timeout
时间内是否将promoted_slave
提升为master。如果,超时则中断故障转移。
1 2 3 4 5 6 7 8 void sentinelFailoverWaitPromotion (sentinelRedisInstance *ri) { if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout" ,ri,"%@" ); sentinelAbortFailover(ri); } }
SENTINEL_FAILOVER_STATE_RECONF_SLAVES 从SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
状态转移至SENTINEL_FAILOVER_STATE_RECONF_SLAVES
是在INFO的回复回调函数中完成的。
当promoted_slave
变成master,那么剩下的就是将master->slaves
中的其他slaves变成promoted_slave
的slave。即更改master->slaves
的主服务器。因此要再次执行SLAVEOF
命令。
/* Flags that track the reconfiguration of the slaves during a failover. */
#define SRI_PROMOTED      (1 << 7)  /* Slave selected for promotion. */
#define SRI_RECONF_SENT   (1 << 8)  /* SLAVEOF <promoted_slave> was sent to this slave. */
#define SRI_RECONF_INPROG (1 << 9)
#define SRI_RECONF_DONE   (1 << 10)
对于超时的slave强制设置为 SRI_RECONF_DONE
状态
对于未完成的slave的发送SLAVEOF 命令,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 void sentinelFailoverDetectEnd (sentinelRedisInstance *master) { int not_reconfigured = 0 , timeout = 0 ; dictIterator *di; dictEntry *de; mstime_t elapsed = mstime() - master->failover_state_change_time; if (master->promoted_slave == NULL || master->promoted_slave->flags & SRI_S_DOWN) return ; di = dictGetIterator(master->slaves); while ((de = dictNext(di)) != NULL ) { sentinelRedisInstance *slave = dictGetVal(de); if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue ; if (slave->flags & SRI_S_DOWN) continue ; not_reconfigured++; } dictReleaseIterator(di); if (elapsed > master->failover_timeout) { not_reconfigured = 0 ; timeout = 1 ; sentinelEvent(LL_WARNING,"+failover-end-for-timeout" ,master,"%@" ); } if (not_reconfigured == 0 ) { sentinelEvent(LL_WARNING,"+failover-end" ,master,"%@" ); master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG; master->failover_state_change_time = mstime(); } if (timeout) { dictIterator *di; dictEntry *de; di = dictGetIterator(master->slaves); while ((de = dictNext(di)) != NULL ) { sentinelRedisInstance *slave = dictGetVal(de); int retval; if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue ; if (slave->link->disconnected) continue ; retval = sentinelSendSlaveOf(slave, master->promoted_slave->addr->ip, master->promoted_slave->addr->port); if (retval == C_OK) { sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be" ,slave,"%@" ); slave->flags |= SRI_RECONF_SENT; } } dictReleaseIterator(di); } }
sentinelHandleRedisInstance sentinelHandleRedisInstance
函数,主要有两个功能:
监视着ri
的状态
如果ri
是master,则还为其负责故障转移。
sentinelHandleRedisInstance
函数的传入参数,可能是master、slave及sentinel,但是只有对master实例执行故障转移。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void sentinelHandleRedisInstance (sentinelRedisInstance *ri) { sentinelReconnectInstance(ri); sentinelSendPeriodicCommands(ri); if (sentinel.tilt) { if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return ; sentinel.tilt = 0 ; sentinelEvent(LL_WARNING,"-tilt" ,NULL ,"#tilt mode exited" ); } sentinelCheckSubjectivelyDown(ri); if (ri->flags & SRI_MASTER) { sentinelCheckObjectivelyDown(ri); if (sentinelStartFailoverIfNeeded(ri)) sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); sentinelFailoverStateMachine(ri); sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); } }