积压空间
binlog 是在 mysql 中的一种日志类型,它记录了所有数据库自备份一来的所有更新操作或潜在的更新操作,描述了数据的更改。因为 binlog 只记录了数据的更新,所以适合用来做实时备份和主从复制。同样,redis 在主从复制上用的就是一种类似 binlog 的日志。
在《AOF 持久化策略》中,介绍了更新缓存的概念,举一个例子:客户端发来命令:set name Jhon,这一数据更新被记录为:*3/r/n\$3/r/nSET/r/n\$4/r/nname/r/n\$3/r/nJhon/r/n
,并存储在更新缓存中。
同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它成为积压空间。
同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它成为积压空间。
更新缓存存储在 server.repl_backlog,redis 将其作为一个环形空间来处理,这样做节省了空间,避免内存再分配的情况。
struct redisServer 中保存了主从复制的一些信息:
struct redisServer {
......
/* Replication (master) */
// 最近一次使用(访问)的数据集
int slaveseldb; /* Last SELECTed DB in replication output */
// 全局的数据同步偏移量
long long master_repl_offset; /* Global replication offset */
// 主从连接心跳频率
int repl_ping_slave_period; /* Master pings the slave every N seconds */
// 积压空间指针
char *repl_backlog; /* Replication backlog for partial syncs */
// 积压空间大小
long long repl_backlog_size; /* Backlog circular buffer size */
// 积压空间中写入的新数据的大小
long long repl_backlog_histlen; /* Backlog actual data length */
// 下一次向积压空间写入数据的起始位置
long long repl_backlog_idx; /* Backlog circular buffer current offset */
// 积压数据的起始位置的所对应的全局主从复制偏移量
long long repl_backlog_off; /* Replication offset of first byte in the
backlog buffer. */
// 积压空间有效时间
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
......
}
积压空间中的数据变更记录是什么时候被写入的?在执行一个 redis 命令的时候,如果存在数据的修改(写),那么就会把变更记录传播。redis 源码中是这么实现的:call()->propagate()->replicationFeedSlaves().
需注意,命令真正执行的地方在 call() 中,call() 如果发现数据被修改(dirty),则传播 propagrate(),replicationFeedSlaves() 将修改记录写入积压空间和所有已连接的从机。
同样,在《AOF 持久化策略》提到的,propagrate() 也会将数据的修改记录写入到更新缓存中。
这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?为什么不仅仅将数据分发给所有从机呢?
因为有一些从机会因特殊情况,与主机断开连接。从机断开前有暂存主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。从 replicationFeedSlaves() 实现来看,在线的 slave 能马上收到数据更新记录;因某些原因暂时断开连接的 slave,需要从积压空间中找回断开期间的数据更新记录。如果断开的时间足够长,master 会拒绝 slave 的部分同步请求,从而 slave 只能进行全同步。
下面是更细积压空间的核心代码注释:首先,在命令执行函数中,如果发现是涉及写的命令,会将修改传播,即调用 propagrate().
// call() 函数是执行命令的核心函数,真正执行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
......
/* Call the command. */
c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
redisOpArrayInit(&server.also_propagate);
// 脏数据标记,数据是否被修改
dirty = server.dirty;
// 执行命令对应的函数
c->cmd->proc(c);
dirty = server.dirty-dirty;
duration = ustime()-start;
......
// 将客户端请求的数据修改记录传播给 AOF 和从机
/* Propagate the command into the AOF and replication link */
if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE;
// 强制主从复制
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
// 强制 AOF 持久化
if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
// 数据被修改
if (dirty)
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
// 传播数据修改记录
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
......
}
主要向两个方向传播修改记录,一个是 AOF 持久化,另一个则是主从复制。
// 向 AOF 和从机发布数据更新
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// AOF 策略需要打开,且设置 AOF 传播标记,将更新发布给本地文件
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
// 设置了从机传播标记,将更新发布给从机
if (flags & REDIS_PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
向从机传播更新记录的时候,redis 主机会向所有的从机发送变更记录,同时也会写入到积压空间,方便已经断开的从机,再下一次重新连接的时候,拷贝数据。
// 向积压空间和从机发送数据
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
// 没有积压数据且没有从机,直接退出
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
// 小于等于 10 的可以用共享对象
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
// 不能使用共享对象,生成 SELECT 命令对应的 redis 对象
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// 这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?
// 为什么不仅仅将数据分发给所有从机呢?
// 因为有一些从机会因特殊情况,与主机断开连接。从机断开前有暂存
// 主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让
// 断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。
// 将 SELECT 命令对应的 redis 对象数据添加到积压空间
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
// 将数据分发所有的从机
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
// 销毁对象
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
// 更新最近一次使用(访问)的数据集
server.slaveseldb = dictid;
// 将命令写入积压空间
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[REDIS_LONGSTR_SIZE+3];
// 命令个数
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
// 逐个命令写入
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* ad add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
/* 每个命令格式如下:
$3
*3
SET
*4
NAME
*4
Jhon*/
// 命令长度
feedReplicationBacklog(aux,len+3);
// 命令
feedReplicationBacklogWithObject(argv[j]);
// 换行
feedReplicationBacklog(aux+len+1,2);
}
}
// 立即给每一个从机发送命令
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
// 如果从机要求全同步,则不对此从机发送数据
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
// 向从机命令的长度
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
// 向从机发送命令
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}