热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

solrCloud索引更新逻辑学习笔记

转发请声明原文:http:blog.csdn.netduck_genuinearticledetails8473502ZkCoreNodeProps封装了一个node的相关信息,包括ba

转发请声明原文:http://blog.csdn.net/duck_genuine/article/details/8473502


ZkCoreNodeProps 封装了一个node的相关信息,包括base_url,core_name,state,node_name,core_url,isLeader

SolrCmdDistributor

solr分布式更新的一个重要实现工具类,因为它本身的只负责分布式的请求处理,并没有很多的业务逻辑。

staticAdjustableSemaphoresemaphore = new AdjustableSemaphore(8);

限制同时并发的请求最多数。从构造函数看可以跟结点数相关,但最大是16.

public SolrCmdDistributor(intnumHosts, ThreadPoolExecutorexecutor) {
int maxPermits = Math.max(16, numHosts * 16);
// limits how many tasks can actually execute at once
if (maxPermits != semaphore.getMaxPermits()) {
semaphore.setMaxPermits(maxPermits);
}

completiOnService= new ExecutorCompletionService(executor);
pending = new HashSet>();
}


privatefinalMap> adds = new HashMap>();
privatefinalMap> deletes = new HashMap>();


这两个字段主要是实现用于缓存更新请求

执行缓存的请求,调用下面方法

 publicvoid finish() {

// piggyback on any outstanding adds or deletes if possible.
flushAdds(1);
flushDeletes(1);

checkResponses(true);
}


提交请求

void submit(UpdateRequestExt ureq, Node node) {
Request sreq = new Request();
sreq.node = node;
sreq.ureq = ureq;
submit(sreq);
}

然后是检查响应结果,调用

void checkResponses(booleanblock)  作为检查上一次提交的请求响应。当请求需要重试的时候,会默认重试最大次数10次 


将最终结果返回到响应结果里,有异常也会记录下来。

分布式增加更新

publicvoid distribAdd(AddUpdateCommand cmd, List nodes, ModifiableSolrParams params) throws IOException {

//执行前都会去掉之前还会检查可能没响应完的请求,不等待,直接删除旧的请求。
checkResponses(false);

// 确保所有删除的请求被执行

flushDeletes(1);

//克隆更新请求重用
AddUpdateCommand clOne= new AddUpdateCommand(null);
clone.solrDoc = cmd.solrDoc;
clone.commitWithin = cmd.commitWithin;
clone.overwrite = cmd.overwrite;
clone.setVersion(cmd.getVersion());
AddRequest addRequest = new AddRequest();
addRequest.cmd = clone;
addRequest.params = params;

//增加对每个节点的请求到缓存adds里
for (Nodenode : nodes) {
List alist = adds.get(node);
if (alist == null) {
alist = new ArrayList(2);
adds.put(node, alist);
}
alist.add(addRequest);
}
//执行缓存adds的请求
flushAdds(maxBufferedAddsPerServer);
}

其它的doDelete,addCommit的请求逻辑的处理都相差不多


DistributedUpdateProcessor


这个是solrCloud主要的一个更新处理链,使用cloud模式的时候必要的一个处理链,负责分布式更新的逻辑处理

一个重要的hash算法,作为更新记录具体分配到哪个shard的算法

这算法应该会在后期重构并设计为插件方式 ,可被用户自定议的hash算法替换。

private int hash(AddUpdateCommandcmd) {
String hashableId = cmd.getHashableId();

return Hash.murmurhash3_x86_32(hashableId, 0, hashableId.length(), 0);
}

private int hash(DeleteUpdateCommandcmd) {
return Hash.murmurhash3_x86_32(cmd.getId(), 0, cmd.getId().length(), 0);
}

其中cmd.getHashableId()方法返回的主要是文档的主键的值

通过hash值定位更新到哪个shard

private String getShard(int hash, String collection, ClusterState clusterState) {
return clusterState.getShard(hash, collection);
}


 通过取到collection对应的RangeInfo,计算该hash值座落在哪个Range,就可以计算到相应的shard

 

 public String getShard(int hash, String collection) {
RangeInfo rangInfo = getRanges(collection);

int cnt = 0;
for (Range range : rangInfo.ranges) {
if (range.includes(hash)) {
return rangInfo.shardList.get(cnt);
}
cnt++;
}

throw new IllegalStateException("The HashPartitioner failed");
}


HashPartitioner


做为切分为多个范围的Range,主要实现方法如下:


public List partitionRange(int partitions, int min, int max) {
assert max >= min;
if (partitiOns== 0) return Collections.EMPTY_LIST;
long range = (long)max - (long)min;
long srange = Math.max(1, range / partitions);

List ranges = new ArrayList(partitions);

long start = min;
long end = start;

while (end end = start + srange;
// make last range always end exactly on MAX_VALUE
if (ranges.size() == partitions - 1) {
end = max;
}
ranges.add(new Range((int)start, (int)end));
start = end + 1L;
}

return ranges;
}

指定了某个范围[min,max]切分为多个partitions的Ranges;切分的范围是按平均的切分。

Range类封装了主键hash值范围【min,max】

RangeInfo封装一个collection下所有shard信息对应的Range,主要实现方法如下:

private RangeInfo addRangeInfo(String collection) {
List ranges;
RangeInfo rangeInfo= new RangeInfo();
Map slices = getSlices(collection);
if (slices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find collection "
+ collection + " in " + this);
}
Set shards = slices.keySet();
ArrayList shardList = new ArrayList(shards.size());
shardList.addAll(shards);
Collections.sort(shardList);
ranges = hp.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);
rangeInfo.ranges = ranges;
rangeInfo.shardList = shardList;
rangeInfos.put(collection, rangeInfo);
return rangeInfo;
}

从上面方法的实现可以看到,会先将所有shard的名称排序,然后根据shard的大小切分相应的多个的范围 ,每一个shard在排序完的位置 有对应的范围Range,两者的信息存放在RangeInfo.

不用担心,上面按整数最小值 ,最大值的平均切分的范围会导致分配不匀的情况,

可能你会担心如果我的主键值是整数,那主键的hash值会不会跟他的值所对应呢,这样的话,会让hash出来的数据先填满小的shard,其它shard不够匀称。其实设计者本身使用的hash算法是针对任何类型,取的主键值也是以字节数组去做hash。这个可以自己使用它的hash算法去校验。


再来看一下DistributedUpdateProcessor



先看add请求,请求的来源有多种:


privateList setupRequest(inthash) 

此方法就是为了判断上面请求来源而决定分发的结点

  1. 请求来自leader转发:FROMLEADER,那么就只需要写到本地ulog,不需要转发给leader,也不需要转发给其它replicas
  2. 请求不是来自leader,但自己就是leader,那么就需要将请求写到本地,顺便分发给其他的replicas.
  3. 请求不是来自leader,但自己又不是leader,也就是该更新请求是最原始的更新请求,那么需要将请求写到本地ulog,顺便转发给leader,再由leader分发


所以为了不让更新请求不会转发来转发去。提交索引的时候,只提交给所有leader是最佳选择。

也就是能预先知道该数据 是要到哪个leader,这个solrj好像有实现。solrcloudserver,分对更新的数据预先做分发请求。

先来讲一下增加的更新逻辑

@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
int hash = 0;
if (zkEnabled) {//cloud模式下
zkCheck();//检查zk连接状态
hash = hash(cmd);//取得更新请求hash值,再决定hash到哪一个shard
//判断更新请求来源,决定需要转发的nodes
nodes = setupRequest(hash);
} else {
isLeader = getNonZkLeaderAssumption(req);
}

boolean dropCmd = false;
if (!forwardToLeader) {//不需要转发,即是请求来源是由leader转发来的,或者本人就是leader
dropCmd = versionAdd(cmd);//决定该请求是否丢弃,丢弃原因:1)只需要更新到ulog,也就是请求是来源leader转发来的,不需写到索引。2)ulog已有该更新记录且版本比当前的版本更新,则可以丢弃。

}

// 可以丢弃该请求,不需要写到lucene索引
if (dropCmd) {
// TODO: do we need to add anything to the response?
return;

ModifiableSolrParams params = null;
if (nodes != null) {

params = new ModifiableSolrParams(filterParams(req.getParams()));
}
//需要转发该请求,转发leader,或者由leader分发
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
if (isLeader) {
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}

params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));

//转发请求到nodes
cmdDistrib.distribAdd(cmd, nodes, params);
}
//增加更新的响应内容
if (returnVersions && rsp != null && idField != null) {
if (addsRespOnse== null) {
addsRespOnse= new NamedList();
rsp.add("adds",addsResponse);
}
if (scratch == null) scratch = new CharsRef();
idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
addsResponse.add(scratch.toString(), cmd.getVersion());
}
}

处理删除逻辑也是差不多。

暂时到这里,有待补充。。



转发请声明原文:http://blog.csdn.net/duck_genuine/article/details/8473502


推荐阅读
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • Whatsthedifferencebetweento_aandto_ary?to_a和to_ary有什么区别? ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了GregorianCalendar类的基本信息,包括它是Calendar的子类,提供了世界上大多数国家使用的标准日历系统。默认情况下,它对应格里高利日历创立时的日期,但可以通过调用setGregorianChange()方法来更改起始日期。同时,文中还提到了GregorianCalendar类为每个日历字段使用的默认值。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
author-avatar
1195596249_711e8e
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有