redis源码阅读-之哨兵流程

2022/01/28 1409点热度 0人点赞 0条评论

哨兵

redis sentinel用于管理多个redis实例,是redis高可用的解决方案之一,其本身也是分布式架构。

哨兵本身是监听者身份,没有存储功能,哨兵的服务角色及交互

  • 哨兵与主服务

  • 哨兵与从服务

  • 哨兵与哨兵

哨兵的功能

  • 集群监控 检查对应的主从集群是否正常运行(心跳机制)

  • 消息通知 同步sentinel和其他redis的相关信息(特别是某个服务出现问题时)

  • 故障转移 当主从结构中主节点故障了,如果判断为客观下线,哨兵会发起故障转移,保证服务的高可用

  • 配置中心 给客户端提供最新的master地址

名称解析

  • 主观下线sdown(Subjectively Down):哨兵中的master实例,检测到自己的链接断了,就主观认为下线了

  • 客观下线odown(Objectively Down):其他监测该节点的哨兵也认为该节点断了,就是客观下线;

  • quorum: 如果sentinel集群中有quorum个哨兵认为master节点宕机了,就客观的认为master宕机了

  • majority:如果有majority个哨兵同意进行故障转移,才会选择出来一个新的master节点来转移

启动哨兵

# redis-sentinel程序启动
redis-sentinel sentinel.conf
# redis-server程序启动
redis-server sentinel.conf --sentinel

配置哨兵

# monitor一个名为mymaster的主服务器,这个服务故障至少需要2个哨兵同意
sentinel monitor mymaster 127.0.0.1 6379 2
#Sentinel 认为服务器已经断线所需的毫秒数
sentinel down-after-milliseconds mymaster 60000
# 故障转移超时时间
sentinel failover-timeout mymaster 180000
# 在故障转移期间,最多可以有多少个从服务器同时对新的主服务器进行同步
sentinel parallel-syncs mymaster 1

sentinel monitor resque 192.168.1.3 6380 4
sentinel down-after-milliseconds resque 10000
sentinel failover-timeout resque 180000
sentinel parallel-syncs resque 5

核心代码

int main(int argc, char **argv) {
    //哨兵模式
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    initServerConfig();
    //哨兵模式的情况
    if (server.sentinel_mode) {
        //初始化哨兵配置
        initSentinelConfig();
        //初始化哨兵命令和哨兵配置
        initSentinel();
    }
    if (argc >= 2) {
        //将配置文件的内容填充到server中,覆盖初始化变量
        loadServerConfig(configfile,options);
    }
    initServer();
    if (!server.sentinel_mode) {
        //非哨兵模式
    }else{
        InitServerLast();
        sentinelIsRunning();
    }     
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
}

void initServer(void) {
    /**
     * tcp socket监听
     */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);   
    /**
     * @brief 创建时间处理器,并将serverCron放入处理器里(重要)
     * 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead
     */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }       
    /**
     * @brief 重点 ##########
     * 监听多少个tcp就创建多少个
     */
    for (j = 0; j < server.ipfd_count; j++) {
        //将acceptTcpHandler 放入文件监听器里,
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
}

/**
 * 在initServer中添加的时间事件 serverCron中
 */
void sentinelTimer(void) {
    //检查TILT条件
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /* We continuously change the frequency of the Redis "timer interrupt"
     * in order to desynchronize every Sentinel from every other.
     * This non-determinism avoids that Sentinels started at the same time
     * exactly continue to stay synchronized asking to be voted at the
     * same time again and again (resulting in nobody likely winning the
     * election because of split brain voting). */
    /**
     * 通过随机数,动态调整哨兵的刷新频率
     * 这样可以确保所有的哨兵不在同一个时间点触发,在投票是就会有一定的时间差,尽可能避免同一时间不能投出主节点
     */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

ae.c中

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
     //只要没有停止,就循环执行,这个是主线程
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            //每次循环前执行beforesleep
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
 if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);    
}
//时间处理器
static int processTimeEvents(aeEventLoop *eventLoop) {
    //遍历所有的时间处理器,在initServer里注册了serverCron
    e = eventLoop->timeEventHead;  
    while(te) {
         retval = te->timeProc(eventLoop, id, te->clientData);
         te = te->next;
    }    
}

/**
 * @brief 时间事件执行
 * @param eventLoop fd
 * @param id  fd
 * @param clientData 
 * @return int 
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //哨兵模式执行
    if (server.sentinel_mode) sentinelTimer();    
}

在config.c中

//文件解析
void loadServerConfig(char *filename, char *options) {
    loadServerConfigFromString(config);
}
void loadServerConfigFromString(char *config) {
    //一行行的解析
    for (i = 0; i < totlines; i++) {
        else if (!strcasecmp(argv[0],"sentinel")) {
             err = sentinelHandleConfiguration(argv+1,argc-1);
        }
    }    
}

sentinel.c中

/**
 * 哨兵配置解析(一行行的被循环调用)
 * @param argv
 * @param argc
 * @return
 */
char *sentinelHandleConfiguration(char **argv, int argc) {
    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        //获取参数
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        //根据监听的master节点创建redis实例
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)    
}

哨兵核心的数据结构

/**
 * 主要状态数据结构
 */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    //当前选举,用于故障转移  当前代
    uint64_t current_epoch;         /* Current epoch. */
    /**
     * sentinel 监听的master节点 hash表
     */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    //tilt模式标识
    int tilt;           /* Are we in TILT mode? */
    //当前执行的脚本数量
    int running_scripts;    /* Number of scripts in execution right now. */
    //tilt开始时间
    mstime_t tilt_start_time;       /* When TITL started. */
    //上次tilt 时间
    mstime_t previous_time;         /* Last time we ran the time handler. */
    //执行脚本队列
    list *scripts_queue;            /* Queue of user scripts to execute. */
    //gossip协议时的ip(如果不是null,代表通过gossip协议向此节点扩散)
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    //gossip协议时的端口
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    //
    unsigned long simfailure_flags; /* Failures simulation. */
    int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
                                  paths at runtime? */
} sentinel;

typedef struct sentinelRedisInstance {
    /**
     * 当前实例的类型,看SRI开头定义的常量宏
     */
    int flags;      /* See SRI_... defines */
    //实例名称
    char *name;     /* Master name from the point of view of this sentinel. */
    //实例运行id
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    //配置的选举轮次(纪元)
    uint64_t config_epoch;  /* Configuration epoch. */
    //主机地址
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    //收到SENTINEL is-master-down 的回复后设置的时间
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    /**
     * 主观下线时间
     */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    /**
     * 客观下线时间
     */
    mstime_t o_down_since_time; /* Objectively down since time. */
    /**
     * 下线时间,如果超过了这个时间,认为主机下线
     */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
    dict *renamed_commands;     /* Commands renamed in this instance:
                                   Sentinel will use the alternative commands
                                   mapped on this table to send things like
                                   SLAVEOF, CONFING, INFO, ... */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    /** 主节点独有 */
    //监听该master的其他哨兵
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    //此master的slaves节点
    dict *slaves;       /* Slaves for this master instance. */
    //quorum 当quorum个数sentinel哨兵认为master主节点失联,那么这时客观上认为主节点失联了
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    /**从节点特有属性*/
    //
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    //从节点的,主节点信息
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    /**
     * info命令里master的状态
     */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    /**故障转移相关的变量*/
    /**
     * leader 的runid
     * 如果是主实例,这个标识就是执行故障转移的哨兵runid
     * 如果是哨兵实例,这个标识就是哨兵投票选举出来的runid
     */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    //leader 的当前轮次(可以理解为一轮投票的批次号)
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    //故障转移对应的轮次(纪元)
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    //故障转移状态
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    //故障转移状态变更时间
    mstime_t failover_state_change_time;
    //上次发起故障转移到时间
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    //故障转移超时时间,默认180秒
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    //选到的晋升的从节点
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;

/**
 * 实例链接信息
 */
typedef struct instanceLink {
    //引用次数(有几个主机持有这个对象)
    int refcount;          /* Number of sentinelRedisInstance owners. */
    /**
     *  实例的链路状态,
     *  初始状态为1
     *  命令链接和消费订阅链接都成功以后为0
     *  只要有一个链接异常就为1
     */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    //等待回复的命令数
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    //redis命令执行上下文
    redisAsyncContext *cc; /* Hiredis context for commands. */
    //redis 订阅发布上下文
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    //cc的链接时间
    mstime_t cc_conn_time; /* cc connection time. */
    //pc的链接时间
    mstime_t pc_conn_time; /* pc connection time. */
    //最后收到消息的时间
    mstime_t pc_last_activity; /* Last time we received any message. */
    //最后收到有效ping回复的时间
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    //当前发送ping消息的时间,接收到pong后,会置为0,如果为0,重新发送ping消息,并记录时间
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    //最后一次发送ping的时间(正常act_ping_time可以表示),主要防止在故障期间发送过多的ping
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    //最后一次收到pong的时间
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    //最后重链接时间
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
} instanceLink;

定时任务中的哨兵,在sentinel.c中

/**
 * 在initServer中添加的时间事件 serverCron中
 */
void sentinelTimer(void) {
    //检查TILT条件
    sentinelCheckTiltCondition();
    //核心
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();

    /**
     * 通过随机数,动态调整哨兵的刷新频率
     * 这样可以确保所有的哨兵不在同一个时间点触发,在投票是就会有一定的时间差,尽可能避免同一时间不能投出主节点
     */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

哨兵里的redis实例处理sentinelHandleDictOfRedisInstances

/**
 * 处理字典(sentinel.masters)里的hash表中redis实例
 * 有master节点的实例
 * 有从节点的实例
 * 有哨兵实例
 * @param instances
 */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    //将实例字典放入到迭代器
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        //获取一个实例
        sentinelRedisInstance *ri = dictGetVal(de);
        //处理实例
        sentinelHandleRedisInstance(ri);
        //主实例的情况,递归处理从实例和哨兵
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            //如果故障转移了,最终会是这个状态
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        //这个时候需要切换监控
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    /**
     *  建立两个链接,一个用来执行命令,一个用来订阅接收消息
     */
    sentinelReconnectInstance(ri);
    //执行周期性命令  ping  info 和hello广播
    /**
     *  周期性执行命令,
     *  正常情况下:10秒sentinel发送一个info命令,1秒发送一个ping命令,每两秒广播 hello msg
     * 主节点挂了,1秒发送一个info命令
     */
    sentinelSendPeriodicCommands(ri);

    /* Every kind of instance */
    //检查实例是否主观下线
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    //针对master节点
    if (ri->flags & SRI_MASTER) {
        //检查是否客观下线
        sentinelCheckObjectivelyDown(ri);
        //是否需要开启故障转移
        if (sentinelStartFailoverIfNeeded(ri))
            //请求其他sentinel对master的看法(选举征求其他哨兵的意见),第一次发起必须强制问询
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        //故障转移状态机实现
        sentinelFailoverStateMachine(ri);
        //在选举过程中进来,只需要看哨兵的结果即可,可以不用再发,因为之前发过了
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
  • 哨兵每秒一次向所有的主、从、sentinel 节点发送一次ping命令

  • 如果一个实例最后一次ping命令的时间超过了down-after-milliseconds,那么就标记为主观下线

  • 如果一个主实例标记了主观下线,其他哨兵会确认是否客观下线

  • 哨兵每10秒一次向所有的主从服务器发送info命令(如果有客观下线,会改为每秒一次)

数据结构以及交互如下:

图片

  • 哨兵节点持有所有的master节点实例

  • master节点实例里又对此master监听的所有哨兵master->sentinels 以及master下的所有从节点

  • 哨兵遍历对应的实例信息

  • 这些实例和自己的节点进行链接、执行命令等

主观下线判断

/**
 * 检查是否客观下线(别人也认为都下线了)
 * @param master
 */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
    /**
     * 主观下线状态,遍历所有的哨兵,
     * 如果有quorum个哨兵认为已下线,并且quorum 大于master->quorum 则判定该节点客观下线
     */
    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            //获取对应的哨兵
            sentinelRedisInstance *ri = dictGetVal(de);
            //如果这个哨兵判断也认为该节点下线了,quorum +1
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        //达到下线的阈值,标记odown(客观下线为1)
        if (quorum >= master->quorum) odown = 1;
    }
    //判定主观下线后,更改该节点的状态
    /* Set the flag accordingly to the outcome. */
    if (odown) {
        // 不是客观下线状态,修改为客观下线
        if ((master->flags & SRI_O_DOWN) == 0) {
            //发出主观下线事件
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            //修改master的掩码为客观下线
            master->flags |= SRI_O_DOWN;
            //设置客观下线时间
            master->o_down_since_time = mstime();
        }
    } else {
        /**
         * 没有判断为客观下线,但是已经标记了客观下线,会把状态修改回来
         */
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

故障转移状态机处理

/**
 * 故障转移状态机实现,针对不同的状态,处理逻辑不同
 * @param ri
 */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    //只有master节点才执行
    serverAssert(ri->flags & SRI_MASTER);
    //不在故障转移状态中,就不处理
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        //待故障转移状态
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        //选择slave状态(选哪个slave节点提升为master节点)
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        //发送slaveo no one 命令(发起投票)
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        //等待晋升(将选出来的节点晋升为主节点)
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        //发送salveof命令,重新配置从节点,并同步新的主节点信息
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

整体流程如下

图片

在这个过程中发布订阅的信息

+reset-master <instance details> :主服务器已被重置。
+slave <instance details> :一个新的从服务器已经被 Sentinel 识别并关联。
+failover-state-reconf-slaves <instance details> :故障转移状态切换到了 reconf-slaves 状态。
+failover-detected <instance details> :另一个 Sentinel 开始了一次故障转移操作,或者一个从服务器转换成了主服务器。
+slave-reconf-sent <instance details> :领头(leader)的 Sentinel 向实例发送了 SLAVEOF host port 命令,为实例设置新的主服务器。
+slave-reconf-inprog <instance details> :实例正在将自己设置为指定主服务器的从服务器,但相应的同步过程仍未完成。
+slave-reconf-done <instance details> :从服务器已经成功完成对新主服务器的同步。
-dup-sentinel <instance details> :对给定主服务器进行监视的一个或多个 Sentinel 已经因为重复出现而被移除 —— 当 Sentinel 实例重启的时候,就会出现这种情况。
+sentinel <instance details> :一个监视给定主服务器的新 Sentinel 已经被识别并添加。
+sdown <instance details> :给定的实例现在处于主观下线状态。
-sdown <instance details> :给定的实例已经不再处于主观下线状态。
+odown <instance details> :给定的实例现在处于客观下线状态。
-odown <instance details> :给定的实例已经不再处于客观下线状态。
+new-epoch <instance details> :当前的纪元(epoch)已经被更新。
+try-failover <instance details> :一个新的故障迁移操作正在执行中,等待被大多数 Sentinel 选中(waiting to be elected by the majority)。
+elected-leader <instance details> :赢得指定纪元的选举,可以进行故障迁移操作了。
+failover-state-select-slave <instance details> :故障转移操作现在处于 select-slave 状态 —— Sentinel 正在寻找可以升级为主服务器的从服务器。
no-good-slave <instance details> :Sentinel 操作未能找到适合进行升级的从服务器。Sentinel 会在一段时间之后再次尝试寻找合适的从服务器来进行升级,又或者直接放弃执行故障转移操作。
selected-slave <instance details> :Sentinel 顺利找到适合进行升级的从服务器。
failover-state-send-slaveof-noone <instance details> :Sentinel 正在将指定的从服务器升级为主服务器,等待升级功能完成。
failover-end-for-timeout <instance details> :故障转移因为超时而中止,不过最终所有从服务器都会开始复制新的主服务器(slaves will eventually be configured to replicate with the new master anyway)。
failover-end <instance details> :故障转移操作顺利完成。所有从服务器都开始复制新的主服务器了。
+switch-master <master name> <oldip> <oldport> <newip> <newport> :配置变更,主服务器的 IP 和地址已经改变。这是绝大多数外部用户都关心的信息。
+tilt :进入 tilt 模式。
-tilt :退出 tilt 模式。

当有instance details字样时,表示频道所返回的信息中包含了以下用于识别目标实例的内容:

@
@ 字符之后的内容用于指定主服务器, 这些内容是可选的, 它们仅在 @ 字符之前的内容指定的实例不是主服务器时使用。

### 回调处理

#### ping回调

ping可以理解为心跳检测

```c
/**
* ping 消息回复处理器
* @param c
* @param reply
* @param privdata
*/
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STATUS ||
r->type == REDIS_REPLY_ERROR) {
/* Update the "instance available" field only if this is an
* acceptable reply. */
if (strncmp(r->str,"PONG",4) == 0 ||
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0)
{
link->last_avail_time = mstime();
//收到回复后,会把act_ping_time设置为0,然后重新发起ping
link->act_ping_time = 0; /* Flag the pong as received. */
} else {
/* Send a SCRIPT KILL command if the instance appears to be
* down because of a busy script. */
if (strncmp(r->str,"BUSY",4) == 0 &&
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT))
{
if (redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri,
"%s KILL",
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
{
ri->link->pending_commands++;
}
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}
link->last_pong_time = mstime();
}

```

* 主要是处理接收到的回复pong

* 更新link->last\_avail\_time 为当前时间

* link->act\_ping\_time = 0 (为0后,可以再次发ping)

#### hello回调处理sentinelReceiveHelloMessages

```c
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
......
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}

void sentinelProcessHelloMessage(char *hello, int hello_len) {
//添加新哨兵
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,token[0],port,master->quorum,master);
}

```

#### info回调处理 sentinelInfoReplyCallback

```
# Replication
role:master
# 链接几个从库
connected_slaves:2
# 从库信息
slave0:ip=127.0.0.1,port=6380,state=online,offset=5698,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=5698,lag=0
#每个Redis节点启动后都会动态分配一个40位的十六进制字符串作为运行ID。主的运行ID
master_replid:e071f49c8d9d6719d88c56fa632435fba83e145d
#在执行slaveof no one时,会将master_replid,master_repl_offset+1复制为master_replid,second_repl_offset
master_replid2:0000000000000000000000000000000000000000
#Master的复制偏移量
master_repl_offset:5698
second_repl_offset:-1
# 是否开启了背压
repl_backlog_active:1
#背压大小
repl_backlog_size:1048576
# 背压中保存的master的最早的偏移量
repl_backlog_first_byte_offset:1
# 背压中的数据的大小
repl_backlog_histlen:5698

# Replication
role:slave
master_host:127.0.0.1
master_port:6379
# 主从直接的链接状态
master_link_status:up
#主节点每隔10s对从从节点发送PING命令,以判断从节点的存活性和连接状态。该变量代表多久之前,主从进行了心跳交互。
master_last_io_seconds_ago:1
#主节点是否在向从节点同步数据
master_sync_in_progress:0
#Slave的复制偏移量
slave_repl_offset:126
#slave的权重
slave_priority:100
#Slave是否处于可读模式
slave_read_only:1
connected_slaves:0
master_replid:15715bc0bd37a71cae3d08b9566f001ccbc739de
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:126
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:126

```
```c
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRefreshInstanceInfo(ri,r->str);
}

/**
* 处理info输出的信息,解析出来后刷新当前哨兵的实例信息
* @param ri
* @param info
*/
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
/**
* 从主节点里解析出从节点
*/
if ((ri->flags & SRI_MASTER) && sdslen(l) >= 7 && !memcmp(l,"slave",5) && isdigit(l[5])){
//查询是否存在
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
/**
* 不存在,创建SentinelRedisInstance 并放入
*/
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,atoi(port), ri->quorum, ri)) != NULL) {
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}
/**
* 解析角色role:<role>
* 如果是从库
* 解析对应的master的ip和端口aster_host:<host> master_port:<port>
* 解析对应master的链接状态 master_link_status:<status>
* 解析这个从库的策略 slave_priority:<priority>
* 从库复制的偏移量 slave_repl_offset:<offset>
*
*/
/* Handle slave -> master role switch. */
/**
* 主从切换处理
*/

}

```

#### 哨兵实例发出sentinel is-master-down-by-addr的回调

```c
**
* 处理询问返回(SENTINEL is-master-down-by-addr 命令的返回处理)
* @param c
* @param reply 返回结果
* @param privdata
*/
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

//回复为空,或者链接不存在,直接返回
if (!reply || !link) return;
// 说明有结果,有链接能处理,等待回复的命令数减1
link->pending_commands--;
r = reply;

/* Ignore every error or unexpected reply.
* Note that if the command returns an error for any reason we&#039;ll
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */

/**
* 对回复结果进行处理
*/
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
r->element[0]->type == REDIS_REPLY_INTEGER &&
r->element[1]->type == REDIS_REPLY_STRING &&
r->element[2]->type == REDIS_REPLY_INTEGER)
{
//设置下线回复时间
ri->last_master_down_reply_time = mstime();
//认同主机下线
if (r->element[0]->integer == 1) {
//设置该实例主机下线表示
ri->flags |= SRI_MASTER_DOWN;
} else {
//不认可主机下线
ri->flags &= ~SRI_MASTER_DOWN;
}
/**
* 如果回复的字段是*(不是*,只是一次投票)
* 是* 表示重新发起一轮投票
*/
if (strcmp(r->element[1]->str,"*")) {
/* If the runid in the reply is not "*" the Sentinel actually
* replied with a vote. */
sdsfree(ri->leader);
//如果两个轮次(纪元)不一样,就开始新新的投票
if ((long long)ri->leader_epoch != r->element[2]->integer)
serverLog(LL_WARNING,
"%s voted for %s %llu", ri->name,
r->element[1]->str,
(unsigned long long) r->element[2]->integer);
//新leader
ri->leader = sdsnew(r->element[1]->str);
//设置新leader的当前轮次(纪元)
ri->leader_epoch = r->element[2]->integer;
}
}
}
```
参考:
http://redisdoc.com/topic/persistence.html

https://www.cnblogs.com/ivictor/p/9749491.html

yxkong

这个人很懒,什么都没留下

文章评论