积压空间

binlog 是在 mysql 中的一种日志类型,它记录了所有数据库自备份一来的所有更新操作或潜在的更新操作,描述了数据的更改。因为 binlog 只记录了数据的更新,所以适合用来做实时备份和主从复制。同样,redis 在主从复制上用的就是一种类似 binlog 的日志。

在《AOF 持久化策略》中,介绍了更新缓存的概念,举一个例子:客户端发来命令:set name Jhon,这一数据更新被记录为:*3/r/n\$3/r/nSET/r/n\$4/r/nname/r/n\$3/r/nJhon/r/n,并存储在更新缓存中。

同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它成为积压空间。

同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它成为积压空间。

更新缓存存储在 server.repl_backlog,redis 将其作为一个环形空间来处理,这样做节省了空间,避免内存再分配的情况。

struct redisServer 中保存了主从复制的一些信息:

struct redisServer {
    ......
    /* Replication (master) */
    // 最近一次使用(访问)的数据集
    int slaveseldb;                 /* Last SELECTed DB in replication output */

    // 全局的数据同步偏移量
    long long master_repl_offset;   /* Global replication offset */

    // 主从连接心跳频率
    int repl_ping_slave_period;     /* Master pings the slave every N seconds */

    // 积压空间指针
    char *repl_backlog;             /* Replication backlog for partial syncs */

    // 积压空间大小
    long long repl_backlog_size;    /* Backlog circular buffer size */

    // 积压空间中写入的新数据的大小
    long long repl_backlog_histlen; /* Backlog actual data length */

    // 下一次向积压空间写入数据的起始位置
    long long repl_backlog_idx;     /* Backlog circular buffer current offset */

    // 积压数据的起始位置的所对应的全局主从复制偏移量
    long long repl_backlog_off;     /* Replication offset of first byte in the
                                       backlog buffer. */

    // 积压空间有效时间
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                       gets released. */
    ......
}

积压空间中的数据变更记录是什么时候被写入的?在执行一个 redis 命令的时候,如果存在数据的修改(写),那么就会把变更记录传播。redis 源码中是这么实现的:call()->propagate()->replicationFeedSlaves().

需注意,命令真正执行的地方在 call() 中,call() 如果发现数据被修改(dirty),则传播 propagrate(),replicationFeedSlaves() 将修改记录写入积压空间和所有已连接的从机。

同样,在《AOF 持久化策略》提到的,propagrate() 也会将数据的修改记录写入到更新缓存中。

这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?为什么不仅仅将数据分发给所有从机呢?

因为有一些从机会因特殊情况,与主机断开连接。从机断开前有暂存主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。从 replicationFeedSlaves() 实现来看,在线的 slave 能马上收到数据更新记录;因某些原因暂时断开连接的 slave,需要从积压空间中找回断开期间的数据更新记录。如果断开的时间足够长,master 会拒绝 slave 的部分同步请求,从而 slave 只能进行全同步。

下面是更细积压空间的核心代码注释:首先,在命令执行函数中,如果发现是涉及写的命令,会将修改传播,即调用 propagrate().

// call() 函数是执行命令的核心函数,真正执行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);

    // 脏数据标记,数据是否被修改
    dirty = server.dirty;

    // 执行命令对应的函数
    c->cmd->proc(c);

    dirty = server.dirty-dirty;
    duration = ustime()-start;

    ......

    // 将客户端请求的数据修改记录传播给 AOF 和从机
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        // 强制主从复制
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;

        // 强制 AOF 持久化
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;

        // 数据被修改
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        // 传播数据修改记录
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

主要向两个方向传播修改记录,一个是 AOF 持久化,另一个则是主从复制。

// 向 AOF 和从机发布数据更新
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // AOF 策略需要打开,且设置 AOF 传播标记,将更新发布给本地文件
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

    // 设置了从机传播标记,将更新发布给从机
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

向从机传播更新记录的时候,redis 主机会向所有的从机发送变更记录,同时也会写入到积压空间,方便已经断开的从机,再下一次重新连接的时候,拷贝数据。

// 向积压空间和从机发送数据
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    // 没有积压数据且没有从机,直接退出
    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // 小于等于 10 的可以用共享对象
        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
        // 不能使用共享对象,生成 SELECT 命令对应的 redis 对象
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        // 这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?
        // 为什么不仅仅将数据分发给所有从机呢?
        // 因为有一些从机会因特殊情况,与主机断开连接。从机断开前有暂存
        // 主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让
        // 断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。

        // 将 SELECT 命令对应的 redis 对象数据添加到积压空间
        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        // 将数据分发所有的从机
        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        // 销毁对象
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }

    // 更新最近一次使用(访问)的数据集
    server.slaveseldb = dictid;

    // 将命令写入积压空间
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        // 命令个数
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        // 逐个命令写入
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * ad add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';

            /* 每个命令格式如下:
            $3
            *3
            SET
            *4
            NAME
            *4
            Jhon*/

            // 命令长度
            feedReplicationBacklog(aux,len+3);
            // 命令
            feedReplicationBacklogWithObject(argv[j]);
            // 换行
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    // 立即给每一个从机发送命令
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // 如果从机要求全同步,则不对此从机发送数据
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        // 向从机命令的长度
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        // 向从机发送命令
        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

results matching ""

    No results matching ""