全同步

无论如何,redis 首先会尝试部分同步,如果失败才尝试全同步。而刚刚建立连接的 master-slave 需要全同步。

从机连接主机后,会主动发起 PSYNC 命令,从机会提供 master_runid 和 offset,主机验证 master_runid 和 offset 是否有效?master_runid 相当于主机身份验证码,用来验证从机上一次连接的主机,offset 是全局积压空间数据的偏移量。

验证未通过则,则进行全同步:主机返回 +FULLRESYNC master_runid offset(从机接收并记录 master_runid 和 offset,并准备接收 RDB 文件)接着启动 BGSAVE 生成 RDB 文件,BGSAVE 结束后,向从机传输,从而完成全同步。

主机和从机之间的交互图如下:

// 连接主机 connectWithMaster() 的时候,会被注册为回调函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);

    ......

    // 这里尝试向主机请求部分同步,主机会回复以拒绝或接受请求。如果拒绝部分同步,
    // 会返回 +FULLRESYNC master_runid offset
    // 从机接收后准备进行全同步    
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
            "Partial Resynchronization.");
        return;
    }

    // 执行全同步
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */

    // 未知结果,进行出错处理
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // 为什么要尝试 5次???
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> "
            "SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // 注册读事件,回调函数 readSyncBulkPayload(), 准备读 RDB 文件
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    // 设置传输 RDB 文件数据的选项
    // 状态
    server.repl_state = REDIS_REPL_TRANSFER;
    // RDB 文件大小
    server.repl_transfer_size = -1;
    // 已经传输的大小
    server.repl_transfer_read = 0;
    // 上一次同步的偏移,为的是定时写入磁盘
    server.repl_transfer_last_fsync_off = 0;
    // 本地 RDB 文件套接字
    server.repl_transfer_fd = dfd;
    // 上一次同步 IO 时间
    server.repl_transfer_lastio = server.unixtime;
    // 临时文件名
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}

全同步请求的数据是 RDB 数据文件和积压空间中的数据。关于 RDB 数据文件,请参见《RDB 持久化策略》。如果没有后台持久化 BGSAVE 进程,那么 BGSVAE 会被触发,否则所有请求全同步的 slave 都会被标记为等待 BGSAVE 结束。BGSAVE 结束后,master 会马上向所有的从机发送 RDB 文件。

下面 syncCommand() 摘取全同步的部分:

// 主机 SYNC 和 PSYNC 命令处理函数,会尝试进行部分同步和全同步
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // 主机尝试部分同步,失败的话向从机发送 +FULLRESYNC master_runid offset,
    // 接着启动 BGSAVE

    // 执行全同步:
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
    /*  存在 BGSAVE 后台进程。
        1.如果 master 现有所连接的所有从机 slaves 当中有存在 
        REDIS_REPL_WAIT_BGSAVE_END 的从机,那么将从机 c 设置为 
        REDIS_REPL_WAIT_BGSAVE_END;
        2.否则,设置为 REDIS_REPL_WAIT_BGSAVE_START*/

        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        // 检测是否已经有从机申请全同步
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }

        if (ln) {
        // 存在状态为 REDIS_REPL_WAIT_BGSAVE_END 的从机 slave,
        // 就将此从机 c 状态设置为 REDIS_REPL_WAIT_BGSAVE_END,
        // 从而在 BGSAVE 进程结束后,可以发送 RDB 文件,
        // 同时将从机 slave 中的更新复制到此从机 c。

            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */

            // 将其他从机上的待回复的缓存复制到从机 c
            copyClientOutputBuffer(c,slave);

            // 修改从机 c 状态为「等待 BGSAVE 进程结束」
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
        // 不存在状态为 REDIS_REPL_WAIT_BGSAVE_END 的从机,就将此从机 c 状态设置为 
        // REDIS_REPL_WAIT_BGSAVE_START,即等待新的 BGSAVE 进程的开启。

            // 修改状态为「等待 BGSAVE 进程开始」
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
    // 不存在 BGSAVE 后台进程,启动一个新的 BGSAVE 进程

        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }

        // 将此从机 c 状态设置为 REDIS_REPL_WAIT_BGSAVE_END,从而在 BGSAVE 
        // 进程结束后,可以发送 RDB 文件,同时将从机 slave 中的更新复制到此从机 c。
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;

        // 清理脚本缓存???
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}

主机执行完 BGSAVE 后,会将 RDB 文件发送给从机。

// BGSAVE 结束后,会调用
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // 其他操作
    ......
    // 可能从机正在等待 BGSAVE 进程的终止
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// 当 RDB 持久化(backgroundSaveDoneHandler())结束后,会调用此函数
// RDB 文件就绪,给所有的从机发送 RDB 文件
/* This function is called at the end of every background saving.
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // 等待 BGSAVE 开始。调整状态为等待下一次 BGSAVE 进程的结束
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;

            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;

        // 等待 BGSAVE 结束。准备向 slave 发送 RDB 文件
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            // 如果 RDB 持久化失败, bgsaveerr 会被设置为 REDIS_ERR
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned "
                    "an error");
                continue;
            }

            // 打开 RDB 文件
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after"
                    " BGSAVE: %s", strerror(errno));
                continue;
            }

            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;

            // 如果之前有注册写事件,取消
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);

            // 注册新的写事件,sendBulkToSlave() 传输 RDB 文件
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, 
                sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }

    // startbgsave == REDIS_ERR 表示 BGSAVE 失败,再一次进行 BGSAVE 尝试
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();

        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        /*BGSAVE 可能 fork 失败,所有等待 BGSAVE 的从机都将结束连接。这是 
        redis 自我保护的措施,fork 失败很可能是内存紧张*/

            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

results matching ""

    No results matching ""