
Solr 4.8.0 Source Code Analysis (22): SolrCloud Recovery Strategy (Part 3)


    This is the third post in the series on SolrCloud's Recovery strategy. The first two covered the overall Recovery flow and the PeerSync strategy; this post and the ones that follow focus on the Replication strategy. Replication not only synchronizes data from leader to replica inside SolrCloud, it can also implement master-slave synchronization between multiple standalone Solr instances. This post covers leader-to-replica synchronization in SolrCloud; the next one will show how to configure solrconfig.xml for master-slave synchronization between standalone Solr nodes.

1. Overview of the Replication Strategy

        The role of Replication was introduced earlier in this series: when too much data needs to be synchronized, Solr abandons the document-level synchronization mode (PeerSync) in favor of a file-level synchronization mode. Replication is built around the SnapPuller class, which encapsulates taking a snapshot of the leader's index and synchronizing from that snapshot. The steps of the Replication flow are walked through below, following the source code.
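Before walking through the steps, here is a compilable outline of one whole pass, as I read it from SnapPuller.fetchLatestIndex(). The method names mirror the real ones, but the class, fields, and control flow below are my own stubbed simplification; treat it as a sketch, not the actual implementation.

public class ReplicationFlowSketch {
  long latestVersion, latestGeneration;   // reported by the leader
  long localVersion, localGeneration;     // from this shard's latest commit
  boolean forceReplication;

  boolean fetchLatestIndex() throws Exception {
    commitOnLeader();                     // 1. flush pending updates on the leader
    fetchLeaderVersionAndGeneration();    // 2. ask the leader for version/generation
    if (latestVersion == 0L) return true; // 3. leader has no index: nothing to copy
    if (!forceReplication && localVersion == latestVersion) return true; // 4. in sync
    fetchFileList();                      // 5. index + conf file lists of that commit
    boolean fullCopy = localVersion > latestVersion
        || localGeneration > latestGeneration || forceReplication;
    downloadIndexFiles(fullCopy);         // 6. into a temporary index.<timestamp> dir
    downloadChangedConfFiles();           // 7. only conf files whose checksum differs
    return fullCopy ? switchIndexDir() : moveIndexFiles(); // 8. install the new index
  }

  void commitOnLeader() {}
  void fetchLeaderVersionAndGeneration() {}
  void fetchFileList() {}
  void downloadIndexFiles(boolean fullCopy) {}
  void downloadChangedConfFiles() {}
  boolean switchIndexDir() { return true; }  // the modifyIndexProps path
  boolean moveIndexFiles() { return true; }  // the partial-copy path
}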

  • When Replication starts, it first performs a commitOnLeader operation, i.e. sends a commit command to the leader. This flushes the leader's pending updates into the index files so that the snapshot is complete.

private void commitOnLeader(String leaderUrl) throws SolrServerException,
    IOException {
  HttpSolrServer server = new HttpSolrServer(leaderUrl);
  try {
    server.setConnectionTimeout(30000);
    UpdateRequest ureq = new UpdateRequest();
    ureq.setParams(new ModifiableSolrParams());
    ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
    ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
        server);
  } finally {
    server.shutdown();
  }
}

  • Fetch the leader's latest version and generation and compare them with this shard's to decide whether synchronization is needed.

//get the current 'replicateable' index version in the master
NamedList response = null;
try {
  response = getLatestVersion();
} catch (Exception e) {
  LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
  return false;
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);

  • Check that a commit point is available on this shard. If the IndexWriter has not been opened, the deletion policy holds no commit points, so the code falls back to the newest searcher's commit point; if no open searcher exists either, the fetch is aborted and Recovery fails, because without the index's generation and version information replication cannot proceed.

// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
if (commit == null) {
  // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
  RefCounted<SolrIndexSearcher> searcherRefCounted = null;
  try {
    searcherRefCounted = core.getNewestSearcher(false);
    if (searcherRefCounted == null) {
      LOG.warn("No open searcher found - fetch aborted");
      return false;
    }
    commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
  } finally {
    if (searcherRefCounted != null)
      searcherRefCounted.decref();
  }
}

  • If the leader's latestVersion is 0, the leader has no index data and there is nothing to replicate at all, so the method returns true. (Under forceReplication the local index is cleared and committed first.)

if (latestVersion == 0L) {
  if (forceReplication && commit.getGeneration() != 0) {
    // since we won't get the files for an empty index,
    // we just clear ours and commit
    RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
    try {
      iw.get().deleteAll();
    } finally {
      iw.decref();
    }
    SolrQueryRequest req = new LocalSolrQueryRequest(core,
        new ModifiableSolrParams());
    core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
  }

  //there is nothing to be replicated
  successfulInstall = true;
  return true;
}

  • This shard's latestVersion must also be compared with the leader's latestVersion: if the two are equal there is likewise no need to replicate, unless forceReplication is set. (The "version" compared here is in fact the commit timestamp, as the code below shows.)

if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
  //master and slave are already in sync just return
  LOG.info("Slave in sync with master.");
  successfulInstall = true;
  return true;
}

  • Fetch the list of index files (and their metadata) belonging to the leader's latestGeneration, along with the list of configuration files and their metadata. If the file list is empty, replication is abandoned.

// get the list of files first
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if (filesToDownload.isEmpty()) return false;

private void fetchFileList(long gen) throws IOException {
  ModifiableSolrParams params = new ModifiableSolrParams();
  params.set(COMMAND, CMD_GET_FILE_LIST);
  params.set(GENERATION, String.valueOf(gen));
  params.set(CommonParams.WT, "javabin");
  params.set(CommonParams.QT, "/replication");
  QueryRequest req = new QueryRequest(params);
  HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
  try {
    server.setSoTimeout(60000);
    server.setConnectionTimeout(15000);
    NamedList response = server.request(req);

    List<Map<String, Object>> files = (List<Map<String, Object>>) response.get(CMD_GET_FILE_LIST);
    if (files != null)
      filesToDownload = Collections.synchronizedList(files);
    else {
      filesToDownload = Collections.emptyList();
      LOG.error("No files to download for index generation: " + gen);
    }

    files = (List<Map<String, Object>>) response.get(CONF_FILES);
    if (files != null)
      confFilesToDownload = Collections.synchronizedList(files);

  } catch (SolrServerException e) {
    throw new IOException(e);
  } finally {
    server.shutdown();
  }
}

  • Create a temporary index directory to hold the downloaded data. Its name has the form index.<timestamp>, and it lives under the data directory.

String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
tmpIndex = createTempindexDir(core, tmpIdxDirName);

tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);

// cindex dir...
indexDirPath = core.getIndexDir();
indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);

  • Determine isFullCopyNeeded, which in turn decides whether the IndexWriter must be closed. If this shard's (the slave's) version or generation is at least as new as the master's (the leader's), or forceReplication is set, the two indexes are incompatible and every file must be synchronized; when only a partial copy is needed, the IndexWriter is closed, with a rollback, before any file is downloaded.

// if the generation of master is older than that of the slave , it means they are not compatible to be copied
// then a new index directory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
    .getCommitTimestamp(commit) >= latestVersion
    || commit.getGeneration() >= latestGeneration || forceReplication;

if (isIndexStale(indexDir)) {
  isFullCopyNeeded = true;
}

if (!isFullCopyNeeded) {
  // rollback - and do it before we download any files
  // so we don't remove files we thought we didn't need
  // to download later
  solrCore.getUpdateHandler().getSolrCoreState()
      .closeIndexWriter(core, true);
}

  • Only now does the actual download of the differing index files begin; Replication judges whether a file has changed by its size. Files are downloaded packet by packet, and the packet size can be configured in solrconfig.xml, which the next post will cover in detail.

private void downloadIndexFiles(boolean downloadCompleteIndex,
    Directory indexDir, Directory tmpIndexDir, long latestGeneration)
    throws Exception {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
  }
  for (Map<String, Object> file : filesToDownload) {
    if (!slowFileExists(indexDir, (String) file.get(NAME))
        || downloadCompleteIndex) {
      dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
          (String) file.get(NAME), false, latestGeneration);
      currentFile = file;
      dirFileFetcher.fetchFile();
      filesDownloaded.add(new HashMap<>(file));
    } else {
      LOG.info("Skipping download for " + file.get(NAME)
          + " because it already exists");
    }
  }
}

/**
 * The main method which downloads file
 */
void fetchFile() throws Exception {
  try {
    while (true) {
      final FastInputStream is = getStream();
      int result;
      try {
        //fetch packets one by one in a single request
        result = fetchPackets(is);
        if (result == 0 || result == NO_CONTENT) {
          return;
        }
        //if there is an error continue. But continue from the point where it got broken
      } finally {
        IOUtils.closeQuietly(is);
      }
    }
  } finally {
    cleanup();
    //if cleanup succeeds, the file is downloaded fully; do an fsync
    fsyncService.submit(new Runnable() {
      @Override
      public void run() {
        try {
          copy2Dir.sync(Collections.singleton(saveAs));
        } catch (IOException e) {
          fsyncException = e;
        }
      }
    });
  }
}

  • At this point the index files are synchronized, but the whole process is only half done. Next, the configuration files that have changed must be fetched; if no configuration file changed, nothing needs downloading. Whether a configuration file has changed is decided by comparing checksums. The download itself, downloadConfFiles(), closely mirrors the index-file download, so it is not described step by step.

//get the details of the local conf files with the same alias/name
List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
//compare their size/checksum to see if
for (Map<String, Object> fileInfo : localFilesInfo) {
  String name = (String) fileInfo.get(NAME);
  Map<String, Object> m = nameVsFile.get(name);
  if (m == null) continue; // the file is not even present locally (so must be downloaded)
  if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
    nameVsFile.remove(name); //checksums are same so the file need not be downloaded
  }
}

private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
  LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
  confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
  File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
  try {
    boolean status = tmpconfDir.mkdirs();
    if (!status) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Failed to create temporary config folder: " + tmpconfDir.getName());
    }
    for (Map<String, Object> file : confFilesToDownload) {
      String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
      localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
      currentFile = file;
      localFileFetcher.fetchFile();
      confFilesDownloaded.add(new HashMap<>(file));
    }
    // this is called before copying the files to the original conf dir
    // so that if there is an exception avoid corrupting the original files.
    terminateAndWaitFsyncService();
    copyTmpConfFiles2Conf(tmpconfDir);
  } finally {
    delTree(tmpconfDir);
  }
}

  • Once the index data and configuration files are downloaded, the temporary index data has to be installed. Unlike the index files, the configuration files already replaced the originals during the download, inside copyTmpConfFiles2Conf. How the index data is installed depends on isFullCopyNeeded: if true, the temporary index replaces the old index wholesale; otherwise only some files are replaced. The two paths are implemented by modifyIndexProps and moveIndexFiles respectively:

if (isFullCopyNeeded) {
  successfulInstall = modifyIndexProps(tmpIdxDirName);
  deleteTmpIdxDir = false;
} else {
  successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}

      Let us look more closely at modifyIndexProps and moveIndexFiles. As mentioned above, the downloaded data lands under the data directory, in a directory named index. plus the timestamp of when synchronization started. When the download completes, Replication creates an index.properties file next to it containing a single entry, index=index.2014XXXXXXXXXX. Its purpose is to redirect the index directory to index.2014XXXXXXXXXX, which thereby becomes the live index directory; the old index directory can then be deleted.
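As a minimal, self-contained illustration of that redirect (this is not the actual modifyIndexProps code; the directory name and comment text are invented for the example):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

public class IndexPropsSketch {
  public static void main(String[] args) throws IOException {
    File dataDir = new File("data");                   // hypothetical data directory
    dataDir.mkdirs();
    String tmpIdxDirName = "index.20141201123000000";  // the index.<timestamp> dir just downloaded
    Properties p = new Properties();
    p.setProperty("index", tmpIdxDirName);             // redirect the live index to the temp dir
    try (OutputStream os = new FileOutputStream(new File(dataDir, "index.properties"))) {
      p.store(os, "written by replication sketch");
    }
    // From here on the core resolves its index directory through index.properties,
    // so index.<timestamp> is the live index and the old index/ dir can be removed.
  }
}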

     moveIndexFiles is simpler: it copies the index files from the temporary directory into the live index directory, copying the segments_N file last (a sketch follows below).
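A sketch of that idea, written against plain java.nio paths for illustration (the real code operates on Lucene Directory instances):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

public class MoveIndexFilesSketch {
  // Copy every downloaded file into the live index dir, saving segments_N for
  // last so a reader never sees a commit point that references missing files.
  static void moveIndexFiles(Path tmpIndexDir, Path indexDir) throws IOException {
    List<Path> files = new ArrayList<>();
    Path segmentsFile = null;
    try (DirectoryStream<Path> ds = Files.newDirectoryStream(tmpIndexDir)) {
      for (Path f : ds) {
        if (f.getFileName().toString().startsWith("segments_")) {
          segmentsFile = f;  // defer the commit point
        } else {
          files.add(f);
        }
      }
    }
    for (Path f : files) {
      Files.copy(f, indexDir.resolve(f.getFileName()), StandardCopyOption.REPLACE_EXISTING);
    }
    if (segmentsFile != null) {  // the commit point goes in last
      Files.copy(segmentsFile, indexDir.resolve(segmentsFile.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    }
  }
}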

  • Replication statistics are written to the replication.properties file. There are quite a few of them, so they are not listed here.
  • If any configuration file changed, a core reload is needed for the change to take effect.
  • Finally, openNewSearcherAndUpdateCommitPoint runs, reopening the searcher and updating the commit information. (These closing steps are sketched below.)
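Those closing steps can be pictured as follows. reloadCore and openNewSearcherAndUpdateCommitPoint are real SnapPuller method names, but the class, fields, and control flow here are my own stubbed simplification of the three bullets above:

public class ReplicationWrapUpSketch {
  boolean successfulInstall;  // set by modifyIndexProps / moveIndexFiles
  boolean confFilesChanged;   // true if any configuration file was downloaded

  void finishReplication() {
    if (!successfulInstall) return;
    writeReplicationProperties();           // persist replication statistics
    if (confFilesChanged) {
      reloadCore();                         // new config takes effect only after a reload
    }
    openNewSearcherAndUpdateCommitPoint();  // reopen the searcher on the new index
  }

  void writeReplicationProperties() {}      // stub: stats into replication.properties
  void reloadCore() {}                      // stub: core reload via the CoreContainer
  void openNewSearcherAndUpdateCommitPoint() {}  // stub
}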

 

      That completes one Replication synchronization pass, but one question still needs answering. While Replication runs, i.e. while the shard is in the recovering state, the shard can index but cannot serve queries. During synchronization, newly arriving data goes into the ulog, but does it also reach the index files? I have not yet found anything in the source code proving that new data only enters the ulog and never the index files.

     My current view is that once a shard turns recovering, new requests go only into the ulog and not into the index files, for three reasons:

1. If new data were written into the old index files, those files would change, and the downloaded data (index files) could no longer cleanly replace the old ones.

2. During synchronization, if isFullCopyNeeded is false, the IndexWriter is closed, and with the IndexWriter closed no new data can be written. If isFullCopyNeeded is true, the whole index directory is replaced anyway, so whether the IndexWriter is closed hardly matters.

3. In the recovery process, after the replication phase finishes, a replay phase runs, which re-applies the requests stored in the ulog.

      This is only my conjecture for now; I will revise this section once I have worked it out, or perhaps a reader can point the way.

      A supplement from some recent reading, mainly a proof of point 2: during Replication, if isFullCopyNeeded is false, closeIndexWriter rolls the IndexWriter back to the state just after the previous commit, wiping out the update operations made since that commit.

if (indexWriter != null) {
  if (!rollback) {
    try {
      log.info("Closing old IndexWriter... core=" + coreName);
      indexWriter.close();
    } catch (Exception e) {
      SolrException.log(log, "Error closing old IndexWriter. core="
          + coreName, e);
    }
  } else {
    try {
      log.info("Rollback old IndexWriter... core=" + coreName);
      indexWriter.rollback();
    } catch (Exception e) {
      SolrException.log(log, "Error rolling back old IndexWriter. core="
          + coreName, e);
    }
  }
}

        Furthermore, when Replication finishes, Solr reopens the IndexWriter, and reopening it performs yet another rollback that clears the update records. So updates issued during a Replication pass have no effect on the index.

try {
  if (indexWriter != null) {
    if (!rollback) {
      try {
        log.info("Closing old IndexWriter... core=" + coreName);
        indexWriter.close();
      } catch (Exception e) {
        SolrException.log(log, "Error closing old IndexWriter. core="
            + coreName, e);
      }
    } else {
      try {
        log.info("Rollback old IndexWriter... core=" + coreName);
        indexWriter.rollback();
      } catch (Exception e) {
        SolrException.log(log, "Error rolling back old IndexWriter. core="
            + coreName, e);
      }
    }
  }
  indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
  log.info("New IndexWriter is ready to be used.");
  // we need to null this so it picks up the new writer next get call
  refCntWriter = null;
} finally {

  pauseWriter = false;
  writerPauseLock.notifyAll();
}

 

2. The Replay Process

       In the overall recovery process, once replication finishes, replay is invoked to flush the ulog's requests back into the index files. In essence, replay runs the ulog's LogReplayer thread.

  • LogReplayer processes one transaction log at a time.

for (;;) {
  TransactionLog translog = translogs.pollFirst();
  if (translog == null) break;
  doReplay(translog);
}

  • doReplay re-acquires the update processor chain, reads the update commands out of the transaction log, and runs them through the usual three stages of the chain again; this was covered in an earlier post, so it is not repeated here. Note that doReplay flags each update command as REPLAY, and with that flag set no nodes are obtained for distribution, so the update is not forwarded to other shards, the leader included; doReplay therefore affects only the local shard and is not recorded into the ulog again.

tlogReader = translog.getReader(recoveryInfo.positionOfStart);

// NOTE: we don't currently handle a core reload during recovery. This would cause the core
// to change underneath us.

UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);

if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
  isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
  forwardToLeader = false;
  return nodes;
}

  • LogReplayer is used chiefly by applyBufferedUpdates (in the replication strategy) and by recoverFromLog (recovery in standalone mode, i.e. recovery from the ulog); see the sketch below.
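For orientation, a stubbed sketch of those two call paths; applyBufferedUpdates() and recoverFromLog() are real UpdateLog methods, while the wrapper class here is purely illustrative:

public class ReplaySketch {
  // SolrCloud recovery: after replication, re-apply the updates buffered in the ulog.
  void cloudRecovery() {
    applyBufferedUpdates();  // UpdateLog.applyBufferedUpdates() drives LogReplayer
  }

  // Standalone startup: replay the tail of the ulog written since the last commit.
  void standaloneStartup() {
    recoverFromLog();        // UpdateLog.recoverFromLog() also drives LogReplayer
  }

  void applyBufferedUpdates() {}  // stub for the real UpdateLog method
  void recoverFromLog() {}        // stub for the real UpdateLog method
}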

 

Summary:

        This post walked through the principle and flow of Replication in SolrCloud, and briefly described how LogReplayer's doReplay thread recovers the updates in the ulog. The next post will focus on configuring and using Replication in master-slave mode.

 

Reposted from: https://www.cnblogs.com/rcfeng/p/4148733.html


