
Redis Source Code Analysis 13: Master-Slave Replication


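Principle

At its core, master-slave replication is quite simple: the master sends data and the slave receives it. But the master does not know whether the slave is still there, so a periodic heartbeat check is needed; it also does not know how far the slave has replicated, so the slave periodically reports its progress back to the master.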

934a5e1e9ed44da93f95dbf9b733c2db.png

Configuration file

1464d3b77438733f3c761e10c69b6e29.png
3a37ecddc2ea9cbee4186c22c9ec3fa8.png
9ea37b7ec99f7a601cff025e39489476.png

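Terminology

Ways to set up master-slave replication

Every Redis node is a master by default, so there are three ways to establish a replication relationship:

  1. Run the slaveof command
  2. Set slaveof in the configuration file
  3. Pass the option when starting redis-server: redis-server --port 9999 --slaveof 127.0.0.1 8888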

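Replication types

Full resynchronization

The master's entire dataset is copied to the slave.

  • Disk-backed replication
  1. The master first runs bgsave to generate an RDB file and, from that moment on, records every write command (mainly to prepare for the later command propagation phase);
  2. In the serverCron periodic task, the RDB file is sent to the slave;
  3. The slave first stores the received data in a local RDB file, then loads that file.
  4. The command propagation phase begins.
  • Diskless replication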

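Notice that the master first generates an RDB file and then reads it back in order to send it to the slave. Can't this be optimized? Can't the data be sent to the slave directly over the socket? The answer is of course: yes.

Diskless replication does not persist the data to disk first; it sends the RDB data directly over the network. To mark a transfer as diskless, the stream begins with "$EOF:<40-byte delimiter>\r\n" and ends with "<40-byte delimiter>". The delimiter at the start and at the end is the same: a 40-byte random string made of characters from "0123456789abcdef".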
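The framing described above can be sketched in a few lines of C. This is a minimal illustration, not the Redis implementation: the names parse_eof_preamble, payload_finished and DELIM_LEN are hypothetical, and the sketch assumes the preamble line has already been read with its trailing \r\n removed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DELIM_LEN 40 /* length of the random delimiter (hypothetical name) */

/* Returns 1 and copies the delimiter into out (NUL-terminated) when the
 * preamble line has the form "$EOF:<40 hex chars>". Returns 0 for a plain
 * "$<length>" preamble or anything else; out is left untouched then.
 * The line is expected without its trailing \r\n. */
int parse_eof_preamble(const char *line, char out[DELIM_LEN + 1]) {
    static const char hex[] = "0123456789abcdef";
    if (strncmp(line, "$EOF:", 5) != 0) return 0;
    const char *d = line + 5;
    if (strlen(d) != DELIM_LEN) return 0;
    for (size_t i = 0; i < DELIM_LEN; i++)
        if (strchr(hex, d[i]) == NULL) return 0;
    memcpy(out, d, DELIM_LEN);
    out[DELIM_LEN] = '\0';
    return 1;
}

/* Returns 1 when the last DELIM_LEN bytes received so far equal the
 * delimiter, i.e. the streamed payload is complete. */
int payload_finished(const char *buf, size_t len, const char *delim) {
    assert(strlen(delim) == DELIM_LEN);
    if (len < DELIM_LEN) return 0;
    return memcmp(buf + len - DELIM_LEN, delim, DELIM_LEN) == 0;
}
```

A receiver would call parse_eof_preamble once on the first line, then check payload_finished after each read; the trailing delimiter itself has to be stripped before the data is loaded.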

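  1. In the serverCron periodic task, the main process forks a child process, which serializes the dataset in RDB format in memory and sends it to the slave;
  2. The slave first stores the received data in a local RDB file, then loads that file.
  3. The command propagation phase begins.
  • When full resynchronization is used
    • First-time replication
    • The requested offset is outside the replication backlog
    • The master node has changed
    • The slave has AOF persistence enabled: the load order changes and Redis loads the AOF file first, but the AOF file carries no replication information, so after a restart the slave still performs a full resynchronization!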

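Partial resynchronization

Full resynchronization is too expensive, so Redis uses the replication backlog and a backed-up master ID to perform a partial resynchronization whenever possible.

  • When it is used
    • The missing data is still inside the replication backlog after a brief network interruption between master and slave, or after a slave restart.

Command propagation

Redis keeps serving clients during a partial or full resynchronization, which causes the master's and the slave's data to diverge. How is this handled? By command propagation: during replication, a copy of the data is written both to the replication backlog and to the slave's output buffer.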

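Replication backlog

If a network interruption, lost commands, or a similar fault occurs during replication, the slave asks the master to resend the missing command data. If that data is still in the master's replication backlog, it is sent straight to the slave, which keeps master and slave consistent. The resent data is usually far smaller than the full dataset, so the cost is low.

The backlog is a fixed-length, first-in-first-out queue maintained by the master, with a default size of 1 MB. Once more than 1 MB has been written, the oldest data is overwritten. A master has only one replication backlog, and all of its slaves share it.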
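A fixed-size circular buffer with this overwrite behaviour can be sketched as follows. It is a simplified illustration under stated assumptions: the type backlog and the functions backlog_init, backlog_feed and backlog_first_offset are hypothetical names, not the Redis symbols, and offsets are counted from 1 for the first byte ever written.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    char *buf;        /* storage of `capacity` bytes */
    size_t capacity;  /* fixed size, e.g. 1 MB */
    size_t idx;       /* next write position inside buf */
    size_t histlen;   /* number of valid bytes currently stored */
    long long offset; /* global offset of the last byte written */
} backlog;

void backlog_init(backlog *b, char *storage, size_t capacity) {
    assert(capacity > 0);
    b->buf = storage;
    b->capacity = capacity;
    b->idx = 0;
    b->histlen = 0;
    b->offset = 0;
}

/* Appends len bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog *b, const char *data, size_t len) {
    for (size_t i = 0; i < len; i++) {
        b->buf[b->idx] = data[i];
        b->idx = (b->idx + 1) % b->capacity;
    }
    b->offset += (long long)len;
    b->histlen += len;
    if (b->histlen > b->capacity) b->histlen = b->capacity;
}

/* Global offset of the oldest byte that can still be resent. */
long long backlog_first_offset(const backlog *b) {
    return b->offset - (long long)b->histlen + 1;
}
```

With an 8-byte buffer, feeding 11 bytes leaves only offsets 4 through 11 available; a slave asking for anything older would need a full resynchronization.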

f258b8fe05268551491e4980ff4dcde7.png

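Server run ID

  1. Every server creates a random 40-character runid when it starts, and the value is different after every Redis restart (so debug reload can be used to reload the data file without a restart; note the optimization here). As a result, if the master runid recorded by the slave differs from the master's own runid, a full resynchronization is performed as well.
27c382790429969b186b70437b870713.png
298f0ca2bb9cd7b2f160b3cb461f68ff.png
  2. Since 4.0 the server has an additional master_replid2, commonly used for sibling instances or cascaded replication: after a master failover, psync can still perform a partial resynchronization.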

struct redisServer {
    /* Replication (master) */
    char replid[CONFIG_RUN_ID_SIZE+1];  /* My current replication ID. */
    char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
};

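For a slave: on shutdown it writes the master's master_replid and offset into the AUX fields of the RDB file. Note that if RDB persistence is not in use, the save argument must be given explicitly (shutdown save). See the prepareForShutdown function.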

af9578515e3bafaf242091096e72056c.png

The replication process in detail

States involved in replication

The handshake is really just the process of driving a state machine.

#define REPL_STATE_NONE 0 /* No active replication */
#define REPL_STATE_CONNECT 1 /* Must connect to master */
#define REPL_STATE_CONNECTING 2 /* Connecting to master */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 15 /* Connected to master */
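Because the handshake states are ordered, the success path through them is mostly "current state plus one", with two optional branches. The sketch below models only that success path; next_handshake_state and its two flag parameters are hypothetical names introduced for illustration, and error handling is left out entirely.

```c
#include <assert.h>

/* Same numeric values as the handshake states listed above. */
enum {
    REPL_STATE_CONNECTING = 2,
    REPL_STATE_RECEIVE_PONG = 3,
    REPL_STATE_SEND_AUTH = 4,
    REPL_STATE_RECEIVE_AUTH = 5,
    REPL_STATE_SEND_PORT = 6,
    REPL_STATE_RECEIVE_PORT = 7,
    REPL_STATE_SEND_IP = 8,
    REPL_STATE_RECEIVE_IP = 9,
    REPL_STATE_SEND_CAPA = 10,
    REPL_STATE_RECEIVE_CAPA = 11,
    REPL_STATE_SEND_PSYNC = 12,
    REPL_STATE_RECEIVE_PSYNC = 13
};

/* Returns the state that follows `state` when the current step succeeds.
 * has_masterauth: a password for the master is configured.
 * has_announce_ip: an announce IP is configured for this slave. */
int next_handshake_state(int state, int has_masterauth, int has_announce_ip) {
    assert(state >= REPL_STATE_CONNECTING && state <= REPL_STATE_RECEIVE_PSYNC);
    switch (state) {
    case REPL_STATE_SEND_AUTH:
        /* AUTH is skipped when no password is configured. */
        return has_masterauth ? REPL_STATE_RECEIVE_AUTH : REPL_STATE_SEND_PORT;
    case REPL_STATE_SEND_IP:
        /* REPLCONF ip-address is skipped when no announce IP is set. */
        return has_announce_ip ? REPL_STATE_RECEIVE_IP : REPL_STATE_SEND_CAPA;
    case REPL_STATE_RECEIVE_PSYNC:
        /* What comes next depends on the PSYNC reply, not modelled here. */
        return state;
    default:
        return state + 1; /* the handshake states are ordered */
    }
}
```

Starting from REPL_STATE_CONNECTING with neither option set, nine successful steps lead to REPL_STATE_RECEIVE_PSYNC.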

Establishing the master-slave relationship

297cf67975682963328610935f1c9a95.png

slaveof accepts two forms of arguments: no one, or IP plus Port.

  • slaveof no one

This stops replication.

void replicaofCommand(client *c) {
    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        ...
    }
    addReply(c,shared.ok);
}

/* Cancel replication: turn this instance back into a master,
 * restoring its initial state. */
void replicationUnsetMaster(void) {
    if (server.masterhost == NULL) return; /* Nothing to do. */
    sdsfree(server.masterhost);
    server.masterhost = NULL;
    shiftReplicationId();
    if (server.master) freeClient(server.master);
    replicationDiscardCachedMaster();
    cancelReplicationHandshake();
    disconnectSlaves();
    server.repl_state = REPL_STATE_NONE;
    server.slaveseldb = -1;
    server.repl_no_slaves_since = server.unixtime;
}

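  • slaveof IP Port

This starts the initial stage of replication, the handshake (see the figure above). The master's IP and Port are recorded in the redisServer data structure, and the master's information is also cached.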

struct redisServer {
    ...
    char *masterhost; /* Hostname of master */
    int masterport;   /* Port of master */
    ...
};

Inside replicaofCommand, replicationSetMaster caches the master's information:

/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    /* Record the new master. */
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

    /* Force our slaves to resync with us as well. They may hopefully be able
     * to partially resync with us, but we can notify the replid change. */
    disconnectSlaves();
    cancelReplicationHandshake();

    /* The master is cached here. */
    if (was_master) replicationCacheMasterUsingMyself();
    server.repl_state = REPL_STATE_CONNECT;
}

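Maintaining the state machine

4ee63bdf4c8eeadc0ba0036bb0e8458a.png
  1. The states of the state machine above serve the following purposes:
  • ping verifies that the link between master and slave works;
  • For nodes holding important data, the master can require password authentication through the requirepass parameter; every client must then authenticate with the auth command. See the figure from Redis设计与实现 (Redis Design and Implementation):
98f263d32c6bd7c9874472b8535b1da4.png
  • Sending the slave's IP and Port to the master
The master records the IP and Port of every slave (the source comments note that this lets the master's INFO output list them correctly). The Port is sent as REPLCONF listening-port PortNum, and the IP as REPLCONF ip-address IPNum.

On the master, this information about the slave is recorded in the client data structure: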

typedef struct client {
    ...
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    ...
} client;

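  • Which replication capabilities are supported

For example, whether diskless replication or partial resynchronization is supported. See the macros: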

#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */

The capabilities are stored in the client data structure:

struct client {
    ...
    int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    ...
};
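The capability flags are plain bits, so the checks the master performs on them reduce to bitwise AND. The sketch below isolates the two expressions that appear in the syncCommand and updateSlavesWaitingBgsave excerpts quoted in this article; the function names can_attach and min_capa are hypothetical wrappers added for illustration.

```c
#include <assert.h>
#include <stddef.h>

#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0)    /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */

/* A new slave may attach to a BGSAVE triggered for another slave only if it
 * has at least every capability of that slave (superset test). */
int can_attach(int new_capa, int trigger_capa) {
    return (new_capa & trigger_capa) == trigger_capa;
}

/* Capabilities common to all waiting slaves, or -1 when there are none. */
int min_capa(const int *capas, size_t n) {
    assert(capas != NULL || n == 0);
    int mincapa = -1;
    for (size_t i = 0; i < n; i++)
        mincapa = (mincapa == -1) ? capas[i] : (mincapa & capas[i]);
    return mincapa;
}
```

For instance, a slave that only understands SLAVE_CAPA_EOF cannot attach to a save started for a slave that announced both flags, while the reverse is fine.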

  2. The flow above is driven mainly by replicationCron, which is called from serverCron.

In serverCron:

run_with_period(1000) replicationCron();

More specifically, the work happens in connectWithMaster:

replicationCron ----> connectWithMaster ----> syncWithMaster

int connectWithMaster(void) {
    /* ... creation of fd, the non-blocking socket to the master, is elided ... */

    /* Register the file event (readable and writable). */
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        serverLog(LL_WARNING,"Can't create readable event for SYNC");
        return C_ERR;
    }

    /* Update the state. */
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}

/* This handler fires when the non blocking connect was able to
 * establish a connection with the master. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        sds port = sdsfromlonglong(server.slave_announce_port ?
            server.slave_announce_port : server.port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);
        sdsfree(port);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE .
     *
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in an transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */
    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any sub-slaves. The master may transfer us an
     * entirely different data set and we have no way to incrementally feed
     * our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.master_replid and master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    if (dfd != -1) close(dfd);
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}

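Starting data replication

Overall flow chart

b32dfcde7d13d430754f920860455e23.png

Deciding the replication type

7fe033c57434adca12623c0d1c805d86.png

Full resynchronization

Note that after replying to the psync sent by the slave, the master does nothing further at that point. The file actually starts being sent from the serverCron periodic function: the main process uses wait3 to reap the child process and then continues according to the replication type.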

eceaf28f5b104cf24cabf1fa10828e2e.png

/* When a background RDB saving/transfer terminates, call the right handler. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    switch(server.rdb_child_type) {
    case RDB_CHILD_TYPE_DISK:
        backgroundSaveDoneHandlerDisk(exitcode,bysignal);
        break;
    case RDB_CHILD_TYPE_SOCKET:
        backgroundSaveDoneHandlerSocket(exitcode,bysignal);
        break;
    default:
        serverPanic("Unknown RDB child type.");
        break;
    }
}

  1. Disk-backed replication
  • Disk-backed replication uses an RDB file. If an RDB save is already in progress, however, Redis reuses that RDB file whenever it can; see the syncCommand function:

/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
    ....
    /* Setup the slave as one waiting for BGSAVE to start. The following code
     * paths will change the state if we handle the slave differently. */
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= CLIENT_SLAVE;
    listAddNodeTail(server.slaves,c);

    /* Create the replication backlog if needed. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
        /* When we create the backlog from scratch, we always use a new
         * replication ID and clear the ID2, since there is no valid
         * past history. */
        changeReplicationId();
        clearReplicationId2();
        createReplicationBacklog();
    }

    /* CASE 1: BGSAVE is in progress, with disk target. */
    if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save. */
        client *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
        }
        /* To attach this slave, we check that it has at least all the
         * capabilities of the slave that triggered the current BGSAVE. */
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences. */
            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
        }
    ...
}

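In this function we see a call to copyClientOutputBuffer. The reason is that the RDB file is being reused: the slave that triggered the running save already has, in its output buffer, every write accumulated since the fork, so that buffer is copied to the client of the newly attached slave.

  • Now look at backgroundSaveDoneHandlerDisk

The function to focus on is updateSlavesWaitingBgsave: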

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
    listNode *ln;
    int startbgsave = 0;
    int mincapa = -1;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            startbgsave = 1;
            mincapa = (mincapa == -1) ? slave->slave_capa :
                                        (mincapa & slave->slave_capa);
        } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            /* If this was an RDB on disk save, we have to prepare to send
             * the RDB from disk to the slave socket. Otherwise if this was
             * already an RDB -> Slaves socket transfer, used in the case of
             * diskless replication, our work is trivial, we can just put
             * the slave online. */
            if (type == RDB_CHILD_TYPE_SOCKET) {
                ...
            } else {
                if (bgsaveerr != C_OK) {
                    freeClient(slave);
                    serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
                    continue;
                }
                if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                    redis_fstat(slave->repldbfd,&buf) == -1) {
                    freeClient(slave);
                    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                    continue;
                }
                slave->repldboff = 0;
                slave->repldbsize = buf.st_size;
                slave->replstate = SLAVE_STATE_SEND_BULK;
                slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                    (unsigned long long) slave->repldbsize);

                aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
                if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                    freeClient(slave);
                    continue;
                }
            }
        }
    }
    if (startbgsave) startBgsaveForReplication(mincapa);
}

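The most important parts are:

  • Open the RDB file and register a writable event, so that the data can be sent to the slave.
  • When a new RDB file has to be created, call startBgsaveForReplication.
  2. Diskless replication
  • Diskless replication spares the master from persisting the data to disk and then reading it back. It forks directly and keeps the data in memory. See syncCommand: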

/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
    ...
    /* CASE 2: BGSAVE is in progress, with socket target. */
    } else if (server.rdb_child_pid != -1 &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
    {
        /* There is an RDB child process but it is writing directly to
         * children sockets. We need to wait for the next BGSAVE
         * in order to synchronize. */
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

    /* CASE 3: There is no BGSAVE is progress. */
    } else {
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
            /* Diskless replication RDB child is created inside
             * replicationCron() since we want to delay its start a
             * few seconds to wait for more slaves to arrive. */
            if (server.repl_diskless_sync_delay)
                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
        } else {
            /* Here. */
            if (server.aof_child_pid == -1) {
                startBgsaveForReplication(c->slave_capa);
            } else {
            }
        }
    }
    return;
}

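One level further down, the work is actually done by rdbSaveToSlavesSockets.

de7450112bf0c6b0f69dc3ee558d71e9.png

The repl_diskless_sync_delay variable deserves a special mention here. It is the delay before a diskless transfer starts, 5 seconds by default, and exists so that more slaves have time to arrive. Avoid setting it to 0: in diskless mode, slaves that arrive after the transfer has started have to queue.

  • Now look at backgroundSaveDoneHandlerSocket

In practice this again ends up in updateSlavesWaitingBgsave: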

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
    listNode *ln;
    int startbgsave = 0;
    int mincapa = -1;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            startbgsave = 1;
            mincapa = (mincapa == -1) ? slave->slave_capa :
                                        (mincapa & slave->slave_capa);
        } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (type == RDB_CHILD_TYPE_SOCKET) {
                /* ... log call elided in this excerpt; its last argument is
                 * replicationGetSlaveName(slave) ... */

                /* The slave is put online right here. */
                slave->replstate = SLAVE_STATE_ONLINE;
                slave->repl_put_online_on_ack = 1;
                slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
            } else {
            }
        }
    }
    if (startbgsave) startBgsaveForReplication(mincapa);
}

No writable event is registered here, because rdbSaveRioWithEOFMark, called from rdbSaveToSlavesSockets, has already sent the data.

88be6e409f2aa7dc88cb2b98517b781b.png

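Partial resynchronization

There is not much to say about partial resynchronization. It comes down to whether the slave's replication offset lies within the master's replication backlog and whether the slave's master has changed. See the function masterTryPartialResynchronization.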
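Those two conditions can be written down as a small predicate. The sketch below is a simplified model rather than the body of masterTryPartialResynchronization: the struct master_view, its field names and can_partial_resync are hypothetical, and the second replication ID is assumed to be valid only up to a recorded offset.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    const char *replid;             /* current replication ID */
    const char *replid2;            /* previous ID inherited from the old master */
    long long second_replid_offset; /* replid2 is valid up to this offset */
    int has_backlog;                /* non-zero when a backlog exists */
    long long backlog_off;          /* offset of the first byte in the backlog */
    long long backlog_histlen;      /* number of bytes held in the backlog */
} master_view;

/* Returns 1 when the slave's request (replication ID + offset) can be served
 * from the backlog, 0 when a full resynchronization is needed. */
int can_partial_resync(const master_view *m, const char *asked_replid,
                       long long psync_offset) {
    assert(m != NULL && asked_replid != NULL);

    /* Has the master changed from the slave's point of view? */
    int id_ok = strcmp(asked_replid, m->replid) == 0 ||
                (strcmp(asked_replid, m->replid2) == 0 &&
                 psync_offset <= m->second_replid_offset);
    if (!id_ok) return 0;

    /* Is the requested offset still covered by the backlog? */
    if (!m->has_backlog) return 0;
    if (psync_offset < m->backlog_off) return 0;
    if (psync_offset > m->backlog_off + m->backlog_histlen) return 0;
    return 1;
}
```

Any request that fails either test falls back to the full resynchronization path described earlier.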


The slave receives data

As seen above, once the master sends the RDB file, a readable event fires on the slave and runs readSyncBulkPayload(), the handler registered in syncWithMaster. This function reads the data sent by the master into a buffer, writes the buffer to the temporary file opened earlier, then loads it into the slave's database, and finally syncs it to disk.

/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
        == AE_ERR)
{
    serverLog(LL_WARNING,
        "Can't create readable event for SYNC: %s (fd=%d)",
        strerror(errno),fd);
    goto error;
}

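  1. For diskless replication, as mentioned earlier, the stream begins with "$EOF:<40-byte delimiter>\r\n" and ends with "<40-byte delimiter>". The delimiter at the start and at the end is the same 40-byte random string made of characters from "0123456789abcdef", so it has to be stripped.
b7fab3971d6bf9902059771399a1bab0.png
7d976d1eb76233dea2832f3df81f7350.png
  2. The received data is first stored in an RDB file
217d30432da88613063e922fd1f0c09d.png
  3. Once everything has been received, the slave empties its database, pauses AOF persistence, removes the readable event, updates its record of the master's replication offset and runid, and then loads the RDB file.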
6353ef742c8d197cf3a7274a59ca8f2b.png

if(aof_is_enabled) stopAppendOnly(); /* Pause AOF persistence. */
signalFlushedDb(-1);
emptyDb(
    -1,
    server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
    replicationEmptyDbCallback); /* Empty the database. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); /* Remove the readable event. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { /* Load the RDB. */
    serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
    cancelReplicationHandshake(); /* Abort. */
    if (aof_is_enabled) restartAOF();
    return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Record the master's runid and offset. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
if (server.repl_backlog == NULL) createReplicationBacklog();
if (aof_is_enabled) restartAOF(); /* Restart AOF. */

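Command propagation

From then on, whenever the master executes a write command, master and slave diverge again, so a command propagation mechanism is needed. Propagation goes through propagate(), which calls replicationFeedSlaves(). The executed command is written, in the protocol's wire format, to the output buffer of the slave's client on the master, which is why the master sends that output buffer to the slave; the command is also appended to server.repl_backlog.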
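The wire format in question is the ordinary Redis protocol multi-bulk encoding: a command with N arguments is sent as *N followed by $length and the bytes of each argument, each part terminated by \r\n. A minimal sketch of such an encoder follows; resp_encode is a hypothetical helper, not the code in replicationFeedSlaves(), and because it relies on %s it only handles NUL-terminated arguments rather than arbitrary binary data.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Encodes argv as "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" per
 * argument. Returns the number of bytes written (excluding the NUL), or -1
 * when out is too small. */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    assert(argc > 0);
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding SET k v this way yields 27 bytes; those same bytes are what would be appended both to each slave's output buffer and to the backlog.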

91e433d06ae44517347441dc47115683.png
77734987f78551cb665d8188efa490fa.png

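Two periodic checks

a0237f3e8d52371814085c2132924967.png

Communicating across hosts raises the questions of timeouts and of whether the peer has died. The master and the slave therefore each maintain a periodic event.

replconf heartbeat

580d0a58f3cdb036b96c14b90bc77290.png

It mainly serves the following purposes:

  • Monitor the network state between master and slave in real time
  • Report the slave's own replication offset and check whether replicated data has been lost; if the slave has lost data, the missing part is fetched again from the master's replication backlog
  • Enforce the minimum number of slaves and the maximum lag, configured through the min-slaves-to-write and min-slaves-max-lag parameters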

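The master uses the replconf command to judge how long a slave has been silent. This shows up as the lag field in the info replication statistics: lag is the number of seconds since the last communication with the slave, and it should normally be 0 or 1. If it exceeds the value configured for repl-timeout (60 seconds by default), the master considers the slave offline and closes the replication client connection. Even after the master has judged a slave offline, heartbeat checks resume if the slave comes back.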
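The lag bookkeeping boils down to simple arithmetic on the time of the last acknowledgement. The sketch below is an illustration only: slave_info and the four functions are hypothetical names, times are plain seconds, and the write check assumes that a min-slaves-to-write value of 0 disables the feature.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

typedef struct {
    time_t repl_ack_time; /* when the last REPLCONF ACK arrived */
} slave_info;

/* Seconds since the slave last acknowledged. */
long slave_lag(time_t now, const slave_info *s) {
    return (long)(now - s->repl_ack_time);
}

/* A slave is considered offline once its lag exceeds repl-timeout. */
int slave_timed_out(time_t now, const slave_info *s, long repl_timeout) {
    return slave_lag(now, s) > repl_timeout;
}

/* Number of slaves whose lag is within min-slaves-max-lag. */
int count_good_slaves(time_t now, const slave_info *slaves, size_t n,
                      long max_lag) {
    assert(slaves != NULL || n == 0);
    int good = 0;
    for (size_t i = 0; i < n; i++)
        if (slave_lag(now, &slaves[i]) <= max_lag) good++;
    return good;
}

/* Writes are accepted when the feature is off (0) or enough slaves are good. */
int writes_allowed(int good_slaves, int min_slaves_to_write) {
    return min_slaves_to_write == 0 || good_slaves >= min_slaves_to_write;
}
```

With three slaves that last acknowledged 0, 5 and 70 seconds ago, a 60-second timeout drops the third one, and a maximum lag of 10 leaves two good slaves.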

4c2589b7bd9d3fdfc440a749e827b173.png

ping

The ping heartbeat mainly checks whether the slave is still there. It is computed as follows:

d6d75a3d323d29696012f69ceb289517.png

How replication handles expired keys

Master: loads only keys that have not expired;

Slave: loads everything, because master and slave will synchronize afterwards.
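The rule can be expressed as a one-line filter applied to each key while a data file is loaded. This is a sketch under assumptions: should_load_key is a hypothetical function, expiry times are milliseconds, and -1 is taken to mean that the key has no expiry.

```c
#include <assert.h>

/* Decides whether a key read from the data file is kept.
 * expire_ms is -1 for keys without an expiry. */
int should_load_key(int is_master, long long expire_ms, long long now_ms) {
    assert(now_ms >= 0);
    if (expire_ms == -1) return 1;                 /* never expires */
    if (is_master && expire_ms < now_ms) return 0; /* master skips expired keys */
    return 1; /* a slave keeps everything and relies on the master afterwards */
}
```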

info replication

The info replication command shows the current state of master-slave synchronization.

d4c4679b9cc8b15f26ef88434d944615.png
4c2589b7bd9d3fdfc440a749e827b173.png


