部分同步
如上所述,redis 总会首先尝试部分同步。所谓部分同步,就是主机把积压空间(replication backlog)中缓存的数据,即从机尚未收到的更新记录,发送给从机。
从机连接主机后,会主动发送 PSYNC 命令,并提供 master_runid 和 offset;主机验证 master_runid 和 offset 是否有效。验证通过,则进行部分同步:主机先返回 +CONTINUE(从机收到后会注册接收积压数据的事件),接着发送积压空间中的数据。
主机和从机之间的交互图如下:
syncWithMaster() 会被注册为回调函数:当与主机建立连接后,syncWithMaster() 会被回调,这一点可以在 connectWithMaster() 函数中查阅。如果该从机从未与主机连接过(没有缓存的主机信息),那么会进行全同步,从主机拷贝所有的数据;否则,会尝试进行部分同步。
/* Slave-side replication callback (excerpt; elided parts are marked with
 * "......").
 *
 * Registered as a callback when connecting to the master in
 * connectWithMaster(), and invoked once the connection is established.
 * It first asks slaveTryPartialResynchronization() for a partial resync;
 * any result other than PSYNC_CONTINUE falls through to the full
 * resynchronization code (elided below). */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
......
// Try a partial resync. If the master allows it, it replies +CONTINUE and
// the slave prepares the connection to receive the backlog data.
/* Try a partial resynchronization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
// slaveTryPartialResynchronization() returns one of three values:
// PSYNC_CONTINUE: the master accepted a partial resync; the cached master
//   has already been resurrected on fd via replicationResurrectCachedMaster()
//   (NOTE(review): upstream registers readQueryFromClient() as the read
//   handler there - confirm in that function).
// PSYNC_FULLRESYNC: full resync; the RDB file will be downloaded.
// PSYNC_NOT_SUPPORTED: the master replied to PSYNC with an error (e.g. it
//   does not understand PSYNC) or with an unexpected reply.
psync_result = slaveTryPartialResynchronization(fd);
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
"Partial Resynchronization.");
return;
}
// Perform a full resync (elided).
......
}
slaveTryPartialResynchronization() 的主要工作是向主机发送 PSYNC 命令,并根据主机的回复判断是进行全同步还是部分同步。
/* Return values of slaveTryPartialResynchronization():
 *   PSYNC_CONTINUE      - the master accepted a partial resync (+CONTINUE);
 *                         the cached master has been resurrected on the
 *                         connection.
 *   PSYNC_FULLRESYNC    - the master requested a full resync (+FULLRESYNC);
 *                         the slave will download an RDB file.
 *   PSYNC_NOT_SUPPORTED - the master replied with an error (e.g. it does
 *                         not understand PSYNC) or an unexpected reply. */
#define PSYNC_CONTINUE      (0)
#define PSYNC_FULLRESYNC    (1)
#define PSYNC_NOT_SUPPORTED (2)
/* Slave side of PSYNC: ask the master for a partial resynchronization.
 *
 * With a cached master, "PSYNC <cached runid> <cached reploff+1>" is sent.
 * Without one, "PSYNC ? -1" is sent, which forces a full resync but still
 * lets us learn the master run id and offset for the next reconnection.
 *
 * Returns:
 *   PSYNC_FULLRESYNC    - reply was +FULLRESYNC. server.repl_master_runid
 *                         and server.repl_master_initial_offset are filled
 *                         from the reply (run id blanked if malformed), and
 *                         the cached master is discarded.
 *   PSYNC_CONTINUE      - reply was +CONTINUE; the cached master is
 *                         resurrected on fd.
 *   PSYNC_NOT_SUPPORTED - any other reply; the cached master is discarded. */
int slaveTryPartialResynchronization(int fd) {
    char *runid_arg;
    char offset_arg[32];
    sds reply;

    /* Mark the master run id / offset as invalid until a +FULLRESYNC reply
     * gives us the real values (propagated later into server.master). */
    server.repl_master_initial_offset = -1;

    if (!server.cached_master) {
        /* Nothing cached: "PSYNC ? -1" asks for a full resync. */
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible "
            "(no cached master)");
        runid_arg = "?";
        strcpy(offset_arg,"-1");
    } else {
        /* Resume right after the last byte processed from the cached
         * master, to reduce the amount of data transferred. */
        runid_arg = server.cached_master->replrunid;
        snprintf(offset_arg,sizeof(offset_arg),"%lld",
            server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization "
            "(request %s:%s).", runid_arg, offset_arg);
    }

    reply = sendSynchronousCommand(fd,"PSYNC",runid_arg,offset_arg,NULL);

    if (strncmp(reply,"+FULLRESYNC",11) == 0) {
        /* Expected shape: "+FULLRESYNC <runid> <offset>". */
        char *id_start = NULL;
        char *off_start = NULL;
        int well_formed;

        id_start = strchr(reply,' ');
        if (id_start != NULL) {
            id_start++;
            off_start = strchr(id_start,' ');
            if (off_start != NULL) off_start++;
        }
        well_formed = id_start != NULL && off_start != NULL &&
                      (off_start-id_start-1) == REDIS_RUN_ID_SIZE;

        if (well_formed) {
            memcpy(server.repl_master_runid,id_start,REDIS_RUN_ID_SIZE);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(off_start,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        } else {
            /* The master speaks PSYNC but the reply is malformed: blank the
             * run id so that the next PSYNC cannot match by accident. */
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        }
        /* A full resync is coming: the cached master is useless now. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (strncmp(reply,"+CONTINUE",9) == 0) {
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        /* Reuse the cached master structure on the new connection. */
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* Neither +FULLRESYNC nor +CONTINUE: either an -ERR (the master does
     * not understand PSYNC) or something unexpected. */
    if (strncmp(reply,"-ERR",4) == 0) {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    } else {
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
下面摘取 syncCommand() 中与部分同步相关的部分:
// Master-side handler for the SYNC and PSYNC commands (excerpt; elided
// parts are marked with "......"). For PSYNC it first tries a partial
// resync and otherwise falls through to the full resync.
/* SYNC and PSYNC command implementation. */
void syncCommand(redisClient *c) {
......
// For PSYNC, ask masterTryPartialResynchronization() whether a partial resync
// is possible; if it is, that function has already replied +CONTINUE and
// sent the backlog data.
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// REDIS_OK: partial resync accepted (or the client was dropped because the
// reply could not be written); nothing more to do here.
if (masterTryPartialResynchronization(c) == REDIS_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// Partial resync refused: fall through to the full resync. argv[1] is
// the master run id the slave sent with PSYNC.
char *master_runid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not able to partially
* resync. */
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= REDIS_PRE_PSYNC_SLAVE;
}
// Perform a full resync (elided).
......
}
主机虽然收到了来自从机的部分同步的请求,但主机并不一定会允许进行部分同步。在主机侧,如果收到部分同步的请求,还需要验证从机是否适合进行部分同步。
/* Master side of PSYNC: decide whether a partial resynchronization is
 * possible and act on the decision.
 *
 * c->argv[1] is the master run id the slave cached ("?" when the slave
 * wants to force a full resync); c->argv[2] is the replication offset the
 * slave wants to continue from.
 *
 * A partial resync is accepted only when the requested run id equals this
 * server's run id AND the requested offset lies inside the backlog:
 *   repl_backlog_off <= offset <= repl_backlog_off + repl_backlog_histlen
 *
 * On success the client becomes an online slave, "+CONTINUE\r\n" is written
 * followed by the backlog from the requested offset, and REDIS_OK is
 * returned. Otherwise "+FULLRESYNC <runid> <offset>\r\n" is written and
 * REDIS_ERR is returned so the caller proceeds with the usual full resync.
 * If either reply cannot be written, the client is freed asynchronously and
 * REDIS_OK is returned so the caller stops handling it. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* master_runid is the run id of the master the slave cached when it
         * lost its previous connection; server.runid is our own run id.
         * A mismatch means we are not that master, so only a full resync
         * is possible.
         *
         * Run id "?" is used by slaves that want to force a full resync
         * (a slave with no cached master sends "PSYNC ? -1"). */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? argv[2] holds the
     * offset the slave wants to continue from. */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
        REDIS_OK) goto need_full_resync;

    /* A partial resync is impossible when:
     * 1) there is no backlog at all;
     * 2) psync_offset is before the start of the backlog: the backlog has a
     *    limited size, so a slave that missed too many updates cannot find
     *    them there any more;
     * 3) psync_offset is past the end of the backlog (out of range). */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog "
            "(Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is "
                "greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    /* REDIS_REPL_ONLINE: no RDB transfer needed, only updates are sent. */
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    /* Stream the buffered updates from the requested offset onwards. */
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of "
        "backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* Tell the slave a full resync is needed, giving it our run id and the
     * replication offset it should continue from afterwards. */
    psync_offset = server.master_repl_offset;
    /* NOTE(review): mirrors upstream Redis 2.8 - when the backlog does not
     * exist yet, creating it later bumps the offset by one, so announce the
     * incremented value. Confirm against createReplicationBacklog(). */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can't use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}