剖析REDIS的sentinel

replication中解决了主从同步的问题,sentinel解决两个问题

  • 当前各个主从服务器运行状态
  • 如果主服务器出现故障,怎么转移故障

监视状态

sentinelRedisInstance

每个sentinel实例都被封装成 sentinelRedisInstance d对象,每个sentinelRedisInstance创建后,sentinelRedisInstance要向监视的主服务器master发起连接请求,成为master的客户端。这里使用前面所述的 hiredis.c/redisAsyncContext 结构体来创建客户端。

每个sentinel都会向监视的主服务器master发起两个异步连接,即创建两个redisAsyncContext对象来和master进行通信:

  • 命令连接:即专门用于sentinel向master发送普通命令,并接受master对这些命令的reply
  • 订阅连接:即专门用于订阅主服务的 __sentinel__:hello 通道

sentinelRedisInstance内部结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
typedef struct sentinelRedisInstance {
int flags; /* 标志着 sentinelRedisInstance 的类型*/
char *name; /* 监视的master名字,格式是 IP:PORT */
char *runid; /* 这个sentinel的runid */
uint64_t config_epoch;
sentinelAddr *addr; /* master的ip地址 */
instanceLink *link; /* 与主服务的通信接口 */

// 下面是一些时间,用于判断所监视的master状态
mstime_t last_pub_time; /* 上一次向 __sentinel__:hello 发送数据的时间*/
mstime_t last_hello_time; /* flags==SRI_SENTINEL, 上次收到 hello 数据的时间*/
mstime_t last_master_down_reply_time; /* 上次回应 SENTINEL is-master-down 指令的时间 */
mstime_t s_down_since_time; /* master 的主观下线时间 */
mstime_t o_down_since_time; /* master 的客观下线时间 */
mstime_t down_after_period; /* 下线时间阈值:超过这个时间就认为master下线 */
mstime_t info_refresh; /* 接收到INFO指令的时间 */
dict *renamed_commands; /* sentinel 支持的指令 */

int role_reported;
mstime_t role_reported_time;
mstime_t slave_conf_change_time;

/* Master specific. */
dict *sentinels; /* 监视同一个master的所有sentinels */
dict *slaves; /* 所监视的master所属的slaves */

unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & replica. */
char *auth_user; /* Username for ACLs AUTH against master & replica. */

/* Slave specific. */
mstime_t master_link_down_time; /* Slave replication link down time. */
int slave_priority; /* Slave priority according to its INFO output. */
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
struct sentinelRedisInstance *master; // slave对应的master
char *slave_master_host;
int slave_master_port;
int slave_master_link_status;
unsigned long long slave_repl_offset;

/* Failover */
char *leader; // 执行故障转移的 sentinel 的 runid
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
uint64_t failover_epoch; /* Epoch of the currently started failover. */
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
mstime_t failover_state_change_time;
mstime_t failover_start_time; /* Last failover attempt start time. */
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged;
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */

/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
} sentinelRedisInstance;

instanceLink

sentinel要向master发起连接,是通过instanceLink对象来完成。在instanceLink内部,封装了两个连接ccpc,用于和监视的主服务进行通信,同时设置了一些关于PINGPONG的字段来记录连接状态。而sentinelRedisInstance中时间字段监视的是主服务的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef struct instanceLink {
int refcount;
int disconnected; /* 0表示没有断开,1表示断开 */
int pending_commands; /* 已发送且正等待答复的命令数。 */
// 两个连接
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */

// 以下都是时间
mstime_t cc_conn_time; /* 普通指令建立连接时间. */
mstime_t pc_conn_time; /* 订阅指令建立连接的时间. */
mstime_t pc_last_activity; /* 最近一次接收到__sentinel__:hello通道数据的时间 */
mstime_t last_avail_time; /* 最近一次有效回应 PING 的时间 */
mstime_t act_ping_time; /* 上次发送PING但是无PONG回应的时间. */
mstime_t last_ping_time; /* 上次发送PING的时间*/
mstime_t last_pong_time; /* 上次回应PING的时间 */
mstime_t last_reconn_time; /* 上一次重新连接的时间 */
} instanceLink;

createInstanceLink

createInstanceLink函数用于创建一个instanceLink对象,其中初始化值如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
instanceLink* createInstanceLink(void) {
instanceLink *link = zmalloc(sizeof(*link));

link->refcount = 1;
link->disconnected = 1; // 默认是断开连接的
link->pending_commands = 0;
link->cc = NULL; // 未建立连接
link->pc = NULL;
link->cc_conn_time = 0;
link->pc_conn_time = 0;
link->last_reconn_time = 0; // 初始化为 0
link->pc_last_activity = 0;

link->act_ping_time = mstime(); // 这对于检测超时非常有用,以防我们根本无法连接到节点
link->last_ping_time = 0;
link->last_avail_time = mstime();
link->last_pong_time = mstime();
return link;
}

sentinelReconnectInstance

sentinelReconnectInstance函数,用于建立连接、断开后重新连接。sentinelRedisInstance类似于client,负责和对端通信。此时,乙方是server,对端被封装成sentinelRedisInstance对象ri,对端的类型ri->flags表征:

1
2
3
#define SRI_MASTER  (1<<0)		// 对端是 master
#define SRI_SLAVE (1<<1) // 对端是 slave
#define SRI_SENTINEL (1<<2) // 对端是 sentinel

sentinel服务器要和 (ri->adder->ip, ri->addr->port)建立指令连接link->cc和订阅连接link->pc。过程如下:

  1. 确认ri->link->disconnected ==1。只有link->cclink->pc有一个没有连接成功ri->link->disconnected ==1

  2. 调用redisAsyncConnectBind函数和 (ri->adder->ip, ri->addr->port)建立连接,获得redisAsyncContext对象
    注意: sentinel与另一个sentinel之间没有订阅连接link->pc

    1
    2
    3
    4
    // 只能与主从服务器之间连接
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
    //...
    }
  3. 成功建立连接后,调用redisAeAttach函数将link->cc加入到server.el事件循环中,让当server的事件循环也能处理 redisAsyncContext的可读可写事件

  4. 设置连接和断开连接的回调函数

  5. 对于link->pc,此次还需要订阅__sentinel__:hello通道

  6. link->cclink->pc成功连接后,才将link->disconnected 设置为0。但是如果对端ri也是个sentinel,那么就不需要建立link->pc,也能link->disconnected =0

    1
    2
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
    link->disconnected = 0;

整个过程详细如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
instanceLink *link = ri->link;
mstime_t now = mstime();

// 1) 两次重连接时间必须不小于 SENTINEL_PING_PERIOD 毫秒
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now; // 更改重新连接字段字段

// 2) 建立普通指令连接
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,
ri->addr->port,
NET_FIRST_BIND_ADDR);
if (!link->cc->err &&
server.tls_replication &&
(instanceLinkNegotiateTLS(link->cc) == C_ERR))
{
// TLS连接错误
sentinelEvent(LL_DEBUG,
"-cmd-link-reconnection",
ri,
"%@ #Failed to initialize TLS");
instanceLinkCloseConnection(link,link->cc);
}
else if (link->cc->err) {
// 重连接错误
sentinelEvent(LL_DEBUG,
"-cmd-link-reconnection",
ri,
"%@ #%s",
link->cc->errstr);
// 断开连接
instanceLinkCloseConnection(link,link->cc);
}
else {
// 正常连接

link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link; // privdata
// 将 link->cc 加入 server.el 事件循环中
redisAeAttach(server.el,link->cc);
// 设置连接回调函数
redisAsyncSetConnectCallback(link->cc,sentinelLinkEstablishedCallback);
// 设置断开连接的回调函数
redisAsyncSetDisconnectCallback(link->cc,sentinelDisconnectCallback);
// 看是否有必要设置AUTH
sentinelSendAuthIfNeeded(ri,link->cc);
// 设置客户端名
sentinelSetClientName(ri,link->cc,"cmd");

// 连接上时,立即发送 PING
sentinelSendPing(ri);
}
}

// 3) 建立订阅连接 (sentinel和sentinel之间没有订阅)
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
// 如果这个实例 主、从服务器
link->pc = redisAsyncConnectBind(ri->addr->ip,
ri->addr->port,
NET_FIRST_BIND_ADDR);
if (!link->pc->err &&
server.tls_replication &&
(instanceLinkNegotiateTLS(link->pc) == C_ERR))
{
sentinelEvent(LL_DEBUG,
"-pubsub-link-reconnection",
ri,
"%@ #Failed to initialize TLS");
}
else if (link->pc->err) {
sentinelEvent(LL_DEBUG,
"-pubsub-link-reconnection",
ri,
"%@ #%s",
link->pc->errstr);
// 断开连接
instanceLinkCloseConnection(link,link->pc);
}
else {
// 成功建立订阅连接
int retval;

link->pc_conn_time = mstime();
link->pc->data = link;

// 将 link->pc 也加入 server.el 事件循环中
redisAeAttach(server.el, link->pc);
// 设置建立连接回调函数
redisAsyncSetConnectCallback(link->pc,sentinelLinkEstablishedCallback);
// 设置断开连接回调函数
redisAsyncSetDisconnectCallback(link->pc,sentinelDisconnectCallback);
// 是否需要密码认证
sentinelSendAuthIfNeeded(ri,link->pc);
// 设置客户端名字
sentinelSetClientName(ri,link->pc,"pubsub");
// 订阅 __sentinel__:hello 通道
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages,
ri,
"%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
// 如果没有订阅成功,那么这个 link->pc 是没有用的
// 需要下次定时事件中再重新建立link->pc
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}

// 当前 ri 标志的是sentinel,那么就
// 清除断开连接的状态
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}

sentinelSetClientName

向对端发送 CLIENT SETNAME name指令,即告诉对端自己的名字:

  • 如果建立的连接是link-cc,则名字是sentinel-runid-cmd,其中runid是前8个字符。
  • 如果建立的连接是link-pc,则名字是sentinel-runid-pubsub

这个可以通过CLIENT LIST指令查看自己的客户端列表:

1
2
3
127.0.0.1:6379> CLIENT LIST
id=272 addr=127.0.0.1:11453 name=sentinel-e5d1f1a9-cmd // link-cc 其余参数省略...
id=450 addr=127.0.0.1:11687 name=szz // 这个是自己的客户端 其余参数省略...

对端对CLIENT SETNAME name指令的回复sentinel不在乎,因此设置回调函数是 sentinelDiscardReplyCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
char name[64];

snprintf(name,sizeof(name),"sentinel-%.8s-%s", sentinel.myid, type);
if (redisAsyncCommand(c,
sentinelDiscardReplyCallback,
ri,
"%s SETNAME %s",
sentinelInstanceMapCommand(ri,"CLIENT"),
name) == C_OK)
{
ri->link->pending_commands++;
}
}

sentinelSendPing

在成功建立link->cc时,会向对端发送PING指令。redisAsyncCommand函数返回C_OK,则更新相关时间字段:

  • ri->link->last_ping_time:表示最近一次发送PING时间
  • ri->link->act_ping_time:表示当前上次发送PING后是否接受到对端回复PONG。如果回复了则此时ri->link->act_ping_time的值是0,并将 ri->link->act_ping_time设置为此时发送PING的时间,否则就保持上次发送PING的时间。

对于PING回复回调函数是 sentinelPingReplyCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int sentinelSendPing(sentinelRedisInstance *ri) {
// 发送 PING 请求
int retval = redisAsyncCommand(ri->link->cc,
sentinelPingReplyCallback,
ri,
"%s",
sentinelInstanceMapCommand(ri,"PING"));
if (retval == C_OK) {
ri->link->pending_commands++;
ri->link->last_ping_time = mstime(); // 更新最后一次发送 PING 时间

/* 仅当接受到 PING 的回复(即 PONG)时才再次更新 ri->link->act_ping_time,
* 否则 ri->link->act_ping_time 将一直以上次发送PING的时间
* 来阻塞等待PING的时间值
*/
if (ri->link->act_ping_time == 0)
ri->link->act_ping_time = ri->link->last_ping_time; // 更新发送PING时间
return 1;
}

return 0;
}

sentinelPingReplyCallback

对端对PING指令的有效恢复只有三种:

1
2
3
+PONG 		 // r->type == REDIS_REPLY_STATUS
-LOADING // r->type == REDIS_REPLY_ERROR
-MASTERDOWN // r->type == REDIS_REPLY_ERROR

因此在这三种情况下,是可以将link->act_ping_time设置为0,表示接受到对端的有效回复。如果对端回复的BUSY,表示对端忙于脚本任务,那么sentinel就向对端发送KILL SCRIPT 命令,杀死脚本,解析对端忙碌状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
if (strncmp(r->str,"PONG",4) == 0 ||
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0)
{
// 不是错误
link->last_avail_time = mstime(); // 有效回复的时间
link->act_ping_time = 0; /* Flag the pong as received. */
}
else {
// 如果是因为一个耗时脚步导致实例主观下线,则 KILL SCRIPT
if (strncmp(r->str,"BUSY",4) == 0 &&
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT))
{
if (redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback,
ri,
"%s KILL",
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
{
ri->link->pending_commands++;
}
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}

link->last_pong_time = mstime(); // 最近一次的PONG时间
}

sentinelSendPeriodicCommands

既然sentinel要监视master,那是怎么监视的呢?

  1. sentinel定时的发送PING
  2. sentinel定时向监视的主服务器master及master的slaves发送INFO指令,查看他们的状态
  3. sentinel定时向__sentinel__:hello通道发送hello信息

通过这种方式来监视整个网络。

PING

在发送PING之前,会先计算下发送PING的频率。ri->down_after_period的默认值是由sentinel.conf中的down-after-milliseconds参数设置的,但是如果ri->down_after_period > SENTINEL_PING_PERIOD,则以SENTINEL_PING_PERIOD为频率发送,即每秒发送一次。

因此,PING指针默认是1s一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define SENTINEL_PING_PERIOD 1000

void sentinelSendPeriodicCommands(redisAsyncContext* c, void* reply, void* privdata) {
//...
// 确定发送PING的频率
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD)
ping_period = SENTINEL_PING_PERIOD;

// 发送 PING 给监视的master及其slave、所有其他 sentinels
if ((now - ri->link->last_pong_time) > ping_period && // 距离上次接受到pong的时间 > ping_period
(now - ri->link->last_ping_time) > ping_period/2) // 距离上次发送ping的时间要 > ping_period/2
{
sentinelSendPing(ri);
}
//...
}

INFO

sentinel默认是10s发送一次INFO命令。但如果ri是从服务器,那么有两种情况会加快发送INFO命令的频率:

  • ri->master 处于 O_DOWN 状态:即r->master处于主观下线状态,那么需要加快发送 INFO 的频率,来密切监视ri的状态,因为它有可能被另一个sentinel从slave变成master
  • ri下线了,那么也需要加快INFO的频率,来获得一个更加准确的重新连接时间

INFO指令的回调函数sentinelInfoReplyCallback,在回调函数中对其监视的服务器进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void sentinelSendPeriodicCommands(sentinelRedisInstance* ri) { 
//...
// 发送频率
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000; // 1 s
}
else
{
info_period = SENTINEL_INFO_PERIOD; // 10 s
}

// sentinel 发送 INFO 给 master及master的slave
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period)) // 两次时间差:默认10s,主服务器故障时为1s
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback,
ri,
"%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK)
ri->link->pending_commands++;
}
//...
}

sentinelInfoReplyCallback

INFO指令的回复回调函数中,会调用sentinelRefreshInstanceInfo函数,对当前主从服务器的状态进行判断:

  • 在INFO指令的回应中,发现了监视的master有新的从服务器后,sentient也会和slaves之间建立命令和订阅两个连接。下次sentinel的INFO命令将发送给master及其slaves
  • master是否发送故障,那么就需要故障转移

sentinelRefreshInstanceInfo函数留到后面章节讲述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void sentinelInfoReplyCallback(redisAsyncContext* c, void* reply, void* privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;
// reply已解析好, link也没有断开
if (!reply || !link) return;

link->pending_commands--;
r = reply;

// 对于INFO指令的回复类型应该是字符串
if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
}

PUB/SUB

默认情况下,sentinel每2s一次向监视的主服务器及其从服务器的link->cc发送PUBLISH指令,格式如下:

1
PUBLISH __sentinel__:hello  "s_ip,s_port,s_runid,s_epoch,m_name,m_ip,m_port,m_epoch" 

即,向各个服务器的__sentinel__:hellow通道发送了一条信息,那么订阅这个通道的所有服务器都会接受到这条信息。

1
2
3
4
5
6
7
8
#define SENTINEL_PUBLISH_PERIOD 2000
void sentinelInfoReplyCallback(redisAsyncContext* c, void* reply, void* privdata) {
//...
// 向master、master的slave,发送Hello
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}

sentinelSendHello

sentinelSendHello函数,主要就是格式化待发送数据,然后通过link->cc发送PUBLISH指令。设置回调函数sentinelPublishReplyCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[NET_IP_STR_LEN];
char payload[NET_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
// 当前实例的主服务器
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
// 主服务器的地址
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

// 已经断开连接,则return
if (ri->link->disconnected) return C_ERR;

// 获取当前sentinel的信息
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
}
else {
if (anetSockName(ri->link->cc->c.fd, ip, sizeof(ip),NULL) == -1)
return C_ERR;
announce_ip = ip;
}

if (sentinel.announce_port) announce_port = sentinel.announce_port;
else if (server.tls_replication && server.tls_port) announce_port = server.tls_port;
else announce_port = server.port; // sentinel : 26379

// 格式化待发送的信息
snprintf(payload,
sizeof(payload),
"%s,%d,%s,%llu," /* 当前 sentinel 的信息 */
"%s,%s,%d,%llu", /* 当前 sentinel 监视的 master信息 */
announce_ip,
announce_port,
sentinel.myid,
(unsigned long long) sentinel.current_epoch,
master->name,
master_addr->ip,
master_addr->port,
(unsigned long long) master->config_epoch);
// 通过 link->cc 通道发布 PUBLISH __sentinel__:hello XXX
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback,
ri,
"%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL, // __sentinel__:hello
payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
}

sentinelPublishReplyCallback

sentinel在接收到对端对于hello的回应后,主要更新下 ri->last_pub_time时间。

1
2
3
4
5
6
7
8
9
10
11
12
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type != REDIS_REPLY_ERROR)
ri->last_pub_time = mstime();
}

sentinelReceiveHelloMessages

那么其他sentinel接收到hello通道的信息怎么处理???

sentinelReconnectInstance函数中,创建sentinelRedisInstance实例时,就已订阅了hello通道,并设置回调函数为sentinelReceiveHelloMessages函数。当其他sentinelRedisInstance实例向__sentinel__:hello通道发送消息时,就能接受到如下格式的数据:

1
2
3
1) "message"
2) __sentinel__:hello
3) "s_ip,s_port,s_runid,s_epoch,m_name,m_ip,m_port,m_epoch"

sentinelReceiveHelloMessages函数,主要完成以下几个任务:

  • 更新link->pc_last_activity 时间后
  • 判断接受到数据格式正确
  • hello通道的数据不是自己发送
  • 如果以上都满足,才调用 sentinelProcessHelloMessage 函数来处理hello通道传递来的数据。

整个流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
redisReply *r;
UNUSED(c);

if (!reply || !ri) return;
r = reply;

ri->link->pc_last_activity = mstime(); // 更新时间
// 类型检查
if (r->type != REDIS_REPLY_ARRAY ||
r->elements != 3 ||
r->element[0]->type != REDIS_REPLY_STRING ||
r->element[1]->type != REDIS_REPLY_STRING ||
r->element[2]->type != REDIS_REPLY_STRING ||
strcmp(r->element[0]->str,"message") != 0) return;

// 如果能找到自己id,则消息就是自己发送的,则忽略
if (strstr(r->element[2]->str, sentinel.myid) != NULL) return;

// 处理来自其他sentinel的消息
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}

故障转移

sentinelCheckSubjectivelyDown

当前sentinel会调用sentinelCheckSubjectivelyDown函数, 以1ms的频率来检测监视的服务器是否下线。ri可能是slave也可能是个master。

  1. 如果当前sentinel之前发送的PING有没有PONG回应,计算距离上次该PING的时间。否则计算距离最近一次收到PONG的时间elapsed

    1
    2
    3
    4
    5
    mstime_t last_avail_time; /*最近一次接收到PING的回复(PONG)的时间 */
    // act_ping_time 记录上次发出的 PING 是否有回应
    // 有,则 act_ping_time ==0,接收PONG的时间由 last_avail_time 记录
    // 没有,则 act_ping_time 的值就是上次发送PING的时间
    mstime_t act_ping_time;
  2. 是否有必要关闭link->cc:如果sentinel和对端建立link->cc的时间超过了SENTINEL_MIN_LINK_RECONNECT_PERIOD,说明没有接受到PONG不是因为刚刚建立、网络不稳导致的。那么发送的PING在down_after_period时段内没有回应,那么对端可能发生故障,直接关闭与对端的连接。

  3. 是否有必要关闭link->pc:处理__sentinel__:hello超时,也关闭link->pc连接

  4. 判断监视的服务器ri是否主观下线:

    • 如果没有回复PING的时间超过ri->down_after_period,就视为主观下线,加上ri->flags |= SRI_S_DOWN标志
    • 如果ri是主服务器,但在INFO的回应中ri被报告为slave,发生了主从切换,也视为主观下线

    如果不满足上面两个中的任何一条件,则不视为主观下线,如果之前设置为主观下线,现在取消SRI_S_DOWN标志位。

下一步就是判断是否真的下线了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;

// 1)
if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time; //
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;

// 2) 是否有必要关闭不活跃的命令连接 link->cc
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 && // 对端一致没有回应PING
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && // 发出去的PING
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) // 没有收到回应
{
instanceLinkCloseConnection(ri->link,ri->link->cc); // 关闭连接
}

// 3) 是否有必要关闭不活跃的订阅连接 link->pc
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) // 处理hello的时间超时
{
instanceLinkCloseConnection(ri->link,ri->link->pc); // 关闭连接
}

/* 导致sentinel认为master主观下线的条件,有两种可能:
* 1) elapsed > ri->down_after_period ,即长时间没有回复
* 2)ri之前是master,现在变成slave,并且时间较长
*
* 不满足两者之一,则不是主观下线 */
if (elapsed > ri->down_after_period || // 不是主服务器类型
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time > (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
// 加上标志位 SRI_S_DOWN,设置为主观下线
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
}
else {
// 取消主观下线标志位 SRI_S_DOWN
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}

sentinelCheckObjectivelyDown

sentinelCheckObjectivelyDown 函数,仅对主服务器适用,进一步判断是不是真的下线了。

  1. 监视主服务master的其他sentinel中master->sentinel,超过master->quorum个也认为master下线了,则当前sentinel将master视为真的下线了,即客观下线了
  2. 客观下线,加上SRI_O_DOWN标志,否则取消该标志

详细过程如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;

if (master->flags & SRI_S_DOWN) {
quorum = 1; // 自己sentinel认为master已经下线
// 下面要计算 maseter->sentinel 中也认为master下线sentinel个数
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 下线
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
// 超过阈值 master->quorum,则判断master处于客观下线
if (quorum >= master->quorum) odown = 1;
}

// 如果真的下线了,则加上 SRI_O_DOWN 标志位
if (odown) {
// 加上 SRI_O_DOWN 标志
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,
"+odown",
master,
"%@ #quorum %d/%d",
quorum,
master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
// 否则取消 SRI_O_DOWN 标志
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}

sentinelStartFailoverIfNeeded

sentinelStartFailoverIfNeeded函数,判断主服务master是否需要转故障转移:

  1. master必须处于SRI_O_DOWN状态,即已客观下线
  2. maste不能已经处于故障转移中,
  3. master距离上次故障转移的时间不能太短,太短则在上次的启动时间后延迟 master->failover_timeout*2

如果上述条件都满足,则开启故障转移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
// 客观下线
if (!(master->flags & SRI_O_DOWN)) return 0;

// 没有正在处理故障转移
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

/* Last attempt started too little time ago? */
// 距离上次故障转移时间要大于 master->failover_timeout*2 才可以
// 否则,将此作为下次故障转移的时间
if (mstime() - master->failover_start_time < master->failover_timeout*2) {
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time + master->failover_timeout*2) / 1000;
char ctimebuf[26];

ctime_r(&clock, ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
return 0;
}

// 那么可以进行故障转移
sentinelStartFailover(master);
return 1;
}

sentinelAskMasterStateToOtherSentinels

当监视的主服务发生故障,那么sentinel要从slaves中选出一个提升为主服务器,被选中的slave叫做promoted_slave。但是在此之前,要先投票决定由哪个sentinel来执行这个过程。

sentinelAskMasterStateToOtherSentinels函数, 当sentinel向master->sentinels中其他sentinels发送如下信息:

1
SENTINEL is-master-down-by-addr  master_ip master_port sentinel.current_epoch sentinel.runid

上述信息:告诉接受到此信息的dst_sentinel,地址是 (master_ip, master_port) 的主服务宕机了,请把(s_epoch, s_runid)的sentinel设置为leader,由这个leader来完成故障转移的过程。

在发送该指令之前,先判断是否能发送:

  1. master必须处于客观下线状态
  2. 当前sentinel与master->sentinels之间的连接link->cc没有断开
  3. 当前sentinel与master->sentinels之间,没有设置SENTINEL_ASK_FORCED标志 或者 距离上次此指令的回复时间大于SENTINEL_ASK_PERIOD

同时满足上面的条件,才能发送此次的is-master-down-by-addr 指令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance* master, int flags) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance* ri = dictGetVal(de);
// 其他sentinel实例 ri 距离上次回应 SENTINEL is-master-down 指令的时间
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;

// 如果时间太久,则清除之前关于master的状态,以及当时选择的leader
if (elapsed > SENTINEL_ASK_PERIOD*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}
// 判断能不能发送
if ((master->flags & SRI_S_DOWN) == 0)
continue;
if (ri->link->disconnected)
continue;
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;

/* Ask */
ll2string(port,sizeof(port),master->addr->port);
// 向监视master的其他sentinel,发送指令 :
// SENTINEL is-master-down-by-addr m_ip m_port s_epoch s_runid
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply,
ri,
"%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip,
port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid :
"*");
if (retval == C_OK)
ri->link->pending_commands++;
}
dictReleaseIterator(di);
}

master->sentinel中的sentinel在收到指令后:

1
SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>

先在自己的sentinel.masters中查找是否有地址为 (ip, port)的主服务器ri,并且该 ri 也确实处于SRI_S_DOWN状态,则回复消息。不过回复有三行数据

1
2
3
sdown   	// 接受到指令的sentinel中master是否也主观下线
leader // 选出的leader
epoch // 该leader的epoch
  • 如果IS-MASTER-DOWN-BY-ADDR指令中的runid ==*,则发送指令的src_sentinel不要求接受到此消息的dst_sentinel投自己票,只想知道在dst_sentinelmaster的是否也SRI_S_DOWN状态。此时回复如下:

    1
    2
    3
    sdown	// 是否下线
    *
    0
  • rund != *:则说明src_dentinel希望自己成为leader来完成master的故障转移过程

    1
    2
    3
    sdown
    runid // vote_runid
    epoch // vote_epoch

其他sentinel在接受到 IS-MASTER-DOWN-BY-ADDR后,处理过程如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
/* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> */
sentinelRedisInstance *ri;
long long req_epoch;
uint64_t leader_epoch = 0;
char *leader = NULL;
long port;
int isdown = 0;

if (c->argc != 6) goto numargserr;
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL) != C_OK)
return;

// 在 sentinel.master 中查找地址是 (m_ip, m_port) 的主服务器
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr, // m_ip
port, // m_port
NULL);
// 如果存在,
// 并且也是处于 SRI_S_DOWN 状态,即主观下线状态,
// 设置 isdown = 1;
if (!sentinel.tilt &&
ri &&
(ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;

// 如果指令参数中 s_runid 不是 '*',尝试将 s_runid 的sentinel 设置为leader
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,
(uint64_t)req_epoch, // req_epoch
c->argv[5]->ptr, // runid
&leader_epoch); //
}

// 回复客户端
addReplyArrayLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);
}

sentinelReceiveIsMasterDownReply

sentinelReceiveIsMasterDownReply函数,是src_sentinel发送 IS-MASTER-DOWN-BY-ADDR 指令的回复回调函数。当接收到对端对IS-MASTER-DOWN-BY-ADDR指令回复后调用。

IS-MASTER-DOWN-BY-ADDR指令的回复中,如果runid != *,则runiddst_sentinel选出来的leader,那么就将dst_sentinel的leader更改为runid

注意: sentinelReceiveIsMasterDownReply函数中的 privdata 就是dst_sentinel,因此在接收到dst_sentinel的回复后,对dst_sentinel的参数做一些修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void sentinelReceiveIsMasterDownReply(redisAsyncContext* c, void* reply, void* privdata) {
sentinelRedisInstance *ri = privdata; // master->sentinels 中的实例对象
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

// 回复格式
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
r->element[0]->type == REDIS_REPLY_INTEGER &&
r->element[1]->type == REDIS_REPLY_STRING &&
r->element[2]->type == REDIS_REPLY_INTEGER)
{
// 更新is-master-down-by-addr 指令的回复时间
ri->last_master_down_reply_time = mstime();

if (r->element[0]->integer == 1)
ri->flags |= SRI_MASTER_DOWN; // 加上 SRI_MASTER_DOWN 标志,同意主服务下线
else
ri->flags &= ~SRI_MASTER_DOWN;

// 如果 runid 回复不是 '*',表示选出的局部 leader
// 将当前主服务的 ri 设置为返回的 leader
if (strcmp(r->element[1]->str,"*")) {
sdsfree(ri->leader); // 释放自己的原 leader
if ((long long)ri->leader_epoch != r->element[2]->integer)
serverLog(LL_WARNING,
"%s voted for %s %llu",
ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
// 更替dst_sentinel的leader
ri->leader = sdsnew(r->element[1]->str);
ri->leader_epoch = r->element[2]->integer;
}
}
}

sentinelVoteLeader

下面来讲述下,选择leader的过程。

要能成为leader,必须要使得req_epoch > sentinel.current_epoch,因为当前sentinel也有可能负责master的故障转移。这个条件都不满足,则不可能成为leader。

此外,还需要大于master->leader_epoch。如果都满足则将req_runid设置为masterleader,后期负责master的故障转移。src_sentinel在接受到 add回复后,在sentinelReceiveIsMasterDownReply函数中,还会将req_runid设置为dst_Sentinel的leader。

如果不满足,则在sentinelReceiveIsMasterDownReply函数中,dst_sentinelleadermaster->leader

其中,req_runidreq_epochsrc_sentinel的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
char* sentinelVoteLeader(sentinelRedisInstance* master, 
uint64_t req_epoch,
char* req_runid,
uint64_t* leader_epoch)
{
// src_sentinel.epoch > dst_sentinel.epoch
// 则更新当前 dst_sentinel.epoch
// !!! 每次接收到这个指令,满足这个条件都会更改一次
if (req_epoch > sentinel.current_epoch) {
sentinel.current_epoch = req_epoch;
sentinelFlushConfig(); // 重写当前sentinel的配置文件
sentinelEvent(LL_WARNING,
"+new-epoch",
master,
"%llu",
(unsigned long long) sentinel.current_epoch);
}

// 是否要更新负责对 master 进行故障转移的 leader
// 条件是:req_epoch 要大于 master->leader_epoch
if (req_epoch > master->leader_epoch && req_epoch >= sentinel.current_epoch) {
sdsfree(master->leader);
// 那么就将master->leader更改为 src_sentinel
// 后期由 src_sentinel 来完成故障转移
master->leader = sdsnew(req_runid);
master->leader_epoch = sentinel.current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,
"+vote-for-leader",
master,
"%s %llu",
master->leader,
(unsigned long long) master->leader_epoch);

/* If we did not voted for ourselves, set the master failover start
* time to now, in order to force a delay before we can start a
* failover for the same master. */
if (strcasecmp(master->leader, sentinel.myid))
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
}

*leader_epoch = master->leader_epoch;
return master->leader ? sdsnew(master->leader) : NULL;
}

sentinelFailoverStateMachine

sentinelFailoverStateMachine函数, 是用于控制故障转移流程。

故障转移的状态如下, SENTINEL_FAILOVER_STATE_WAIT_STARTSENTINEL_FAILOVER_STATE_UPDATE_CONFIG

1
2
3
4
5
6
7
8
/* Failover machine different states. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */

下面按照ri->failover_state状态的流程来介绍故障转移的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);

// 此时应该处于故障转移中
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

// 故障转移的状态
switch(ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}

SENTINEL_FAILOVER_STATE_WAIT_START

状态是SENTINEL_FAILOVER_STATE_WAIT_START时,由 sentinelFailoverWaitStart函数转移到下一阶段。流程如下:

  1. 在正式的故障转移之前,要先从当sentinel及master->sentinels中选出一个leader,负责master的故障转移
  2. 判断是否满足开启故障转移的条件
  3. 满足,则进入故障转移的下一阶段SENTINEL_FAILOVER_STATE_SELECT_SLAVE

这个阶段主要任务就是选出leader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
char *leader;
int isleader;
// 1) 选出leader
leader = sentinelGetLeader(ri, ri->failover_epoch);
// 当前sentinel是leader?
isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
sdsfree(leader);

/* 如果当前 sentinel 既不是leader, 也没有 SRI_FORCE_FAILOVER 字段,
那么自己不需要执行故障转移,
将故障转移任务留给leader执行 */
if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
// 在 SENTINEL_ELECTION_TIMEOUT 和 ri->failover_timeout 之间取最小值
int election_timeout = SENTINEL_ELECTION_TIMEOUT;

if (election_timeout > ri->failover_timeout)
election_timeout = ri->failover_timeout;

// 既然自己不是leader,超过选举时间,则应该中断自己的故障转移行为
if (mstime() - ri->failover_start_time > election_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}

/** 成为leader的sentinel, 需要负责故障转移 ***/

sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
sentinelSimFailureCrash();
// 故障转移进入下一个状态
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
// 更新故障转移状态改变时间
ri->failover_state_change_time = mstime();
sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

SENTINEL_FAILOVER_STATE_SELECT_SLAVE

调用sentinelSelectSlave函数,从master->slaves中选出promoted_slave,为其加上SRI_PROMOTED标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
sentinelRedisInstance *slave = sentinelSelectSlave(ri);

if (slave == NULL) {
sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else {
sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
slave->flags |= SRI_PROMOTED;
ri->promoted_slave = slave; // 被提升为主服务的slave
// 更新状态及时间
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE,
"+failover-state-send-slaveof-noone",
slave,
"%@");
}
}

SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE

选出promoted_slave之后,需要将promoted_slave变成master,因此要对其执行 SLAVEOF NO ONE命令。不过此处发送SLAVEOF命令的函数是 sentinelSendSlaveOf,不需要设置回调函数。至于具体promoted_slave是否成功变成master,通过INFO信息的回调函数查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;

/* 如果 sentinel 与 promoted_slave 断开了连接,则一直阻塞到故障转移超时时间。
* 超时还没重新连接,则中止故障转移
*/
if (ri->promoted_slave->link->disconnected) {
// 超时则中断
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
// 发送SLAVEOF NO ONE
retval = sentinelSendSlaveOf(ri->promoted_slave, NULL, 0);
if (retval != C_OK) return;
sentinelEvent(LL_NOTICE,
"+failover-state-wait-promotion",
ri->promoted_slave,
"%@");
// 更改状态,更新时间
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
}

SENTINEL_FAILOVER_STATE_WAIT_PROMOTION

sentinelFailoverWaitPromotion 函数用于leader在ri->failover_timeout时间内是否将promoted_slave提升为master。如果,超时则中断故障转移。

1
2
3
4
5
6
7
8
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
/* Just handle the timeout. Switching to the next state is handled
* by the function parsing the INFO command of the promoted slave. */
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}

SENTINEL_FAILOVER_STATE_RECONF_SLAVES

SENTINEL_FAILOVER_STATE_WAIT_PROMOTION状态转移至SENTINEL_FAILOVER_STATE_RECONF_SLAVES 是在INFO的回复回调函数中完成的。

promoted_slave变成master,那么剩下的就是将master->slaves中的其他slaves变成promoted_slave的slave。即更改master->slaves的主服务器。因此要再次执行SLAVEOF命令。

1
2
3
4
#define SRI_PROMOTED (1<<7)         /* 表示 promoted_slave*/
#define SRI_RECONF_SENT (1<<8) /* 已向该slave发送了 SLAVEOF <promoted_slave> . */
#define SRI_RECONF_INPROG (1<<9) /* slave正在与promoted_slave进行同步 */
#define SRI_RECONF_DONE (1<<10) /* slave与promoted_slave的同步行为已经完成. */
  • 对于超时的slave强制设置为 SRI_RECONF_DONE 状态
  • 对于未完成的slave的发送SLAVEOF 命令,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
int not_reconfigured = 0, timeout = 0;
dictIterator *di;
dictEntry *de;
mstime_t elapsed = mstime() - master->failover_state_change_time; // 距离上次故障转移状态改变的时间

/* 如果当前 promoted_slave 的状态还没完成,或者 promoted_slave 已经下线,直接返回*/
if (master->promoted_slave == NULL ||
master->promoted_slave->flags & SRI_S_DOWN) return;

// 计算切换未完成slave的个数
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
// 忽略 promoted_slave 、切换完成的slave
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE))
continue;
if (slave->flags & SRI_S_DOWN)
continue;
// 未配置完成
not_reconfigured++;
}
dictReleaseIterator(di);

// 故障转移已超时,强制结束这个过程
if (elapsed > master->failover_timeout) {
not_reconfigured = 0;
timeout = 1;
sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}

if (not_reconfigured == 0) {
sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
// 更新状态、时间
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
master->failover_state_change_time = mstime();
}

/* 作为leader的 sentinel 向没有成功将主服务器切换到 promoted_slave 的slave
* 发送 SLAVOF 命令 */
if (timeout) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;

// 忽略promoted_slave、成功配置的以及断开连接的slave
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT))
continue;
// 忽略断开连接的
if (slave->link->disconnected)
continue;
// 向切换失败的slave发送 SLAVEOF 指令
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}

sentinelHandleRedisInstance

sentinelHandleRedisInstance 函数,主要有两个功能:

  • 监视着ri的状态
  • 如果ri是master,则还为其负责故障转移。

sentinelHandleRedisInstance函数的传入参数,可能是master、slave及sentinel,但是只有对master实例执行故障转移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== 监视行为 ============ */
// 对于每种实例(master, slave, sentinels)
sentinelReconnectInstance(ri); // sentinel和ri之间建立连接
sentinelSendPeriodicCommands(ri); // 发送周期指令

if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
// 对于每种实例(master, slave, sentinels),检测是否主观下线
sentinelCheckSubjectivelyDown(ri);

/* ========== 故障转移行为 ============ */
// 仅针对master实例
if (ri->flags & SRI_MASTER) {
// 是否客观下线
sentinelCheckObjectivelyDown(ri);
// 是否需要开始故障转移。如果需要,则返回1
if (sentinelStartFailoverIfNeeded(ri))
// 向ri->sentinels 发送 'SENTINEL IS-MASTER-DOWN-BY-ADDR' 指令
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
// 控制故障转移流程
sentinelFailoverStateMachine(ri);
// 再发送一次
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}