热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HBase协处理器同步二级索引到Solr

一、背景二、什么是HBase的协处理器三、HBase协处理器同步数据到Solr四、添加协处理器五、测试六、协处理器动态加载一、

一、 背景
二、 什么是HBase的协处理器
三、 HBase协处理器同步数据到Solr
四、 添加协处理器
五、 测试
六、 协处理器动态加载


一、 背景

在实际生产中,HBase往往不能满足多维度分析,我们能想到的办法就是通过创建HBase数据的二级索引来快速获取rowkey,从而得到想要的数据。目前比较流行的二级索引解决方案有Lily HBase Indexer,Phoenix自带的二级索引,华为Indexer,以及360的二级索引方案。上面的目前使用比较广泛的应该是Lily HBase Indexer,但是我们有时候只想实现一些简单的功能或者比较特殊的功能的时候,需要自己写协处理器进行处理。学习HBase的协处理器对于了解HBase架构是有帮助的。

二、 什么是HBase的协处理器

协处理器分两种类型,系统协处理器可以全局导入region server上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。

Hbase的coprocessor分为两类,Observer和EndPoint。其中Observer相当于触发器,EndPoint相当于存储过程。其中Observer的代码部署在服务端,相当于对API调用的代理。

另一个是终端(endpoint),动态的终端有点像存储过程。
Observer

观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。以HBase0.92版本为例,它提供了三种观察者接口:

  • RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相关操作钩子。
  • MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。

这些接口可以同时使用在同一个地方,按照不同优先级顺序执行.用户可以任意基于协处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所改动,不同版本的接口改动比较大。

三、 HBase协处理器同步数据到Solr

实时更新数据需要获取到HBase的插入、更新和删除操作。由于HBase中的插入和更新都是对应RegionServer的Put操作,因此我们需要使用RegionObserver中的"postPut"和"postDelete函数"。至于Truncate操作则需要使用MasterObserver。
我们需要做的就是拦截put和delete操作,将里面的内容获取出来,写入Solr。 对应的协处理器代码如下:

 
 
  1. package com.bqjr.bigdata.HBaseObserver.server;
  2. import com.bqjr.bigdata.HBaseObserver.entity.SolrServerManager;
  3. import org.apache.hadoop.hbase.CellUtil;
  4. import org.apache.hadoop.hbase.client.Delete;
  5. import org.apache.hadoop.hbase.client.Durability;
  6. import org.apache.hadoop.hbase.client.Put;
  7. import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
  8. import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  9. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  10. import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
  11. import org.apache.hadoop.hbase.util.Bytes;
  12. import org.apache.solr.client.solrj.SolrServerException;
  13. import org.apache.solr.common.SolrInputDocument;
  14. import java.io.IOException;
  15. /**
  16. * Created by hp on 2017-02-15. */
  17. public class HBaseIndexerToSolrObserver extends BaseRegionObserver{
  18. String[] columns = {"test_age","test_name"};
  19. String collection = "bqjr";
  20. SolrServerManager solrManager = new SolrServerManager(collection);
  21. @Override
  22. public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  23. Put put, WALEdit edit, Durability durability) throws IOException {
  24. String rowkey= Bytes.toString(put.getRow());
  25. SolrInputDocument doc = new SolrInputDocument();
  26. for(String column : columns){
  27. if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){
  28. doc.addField(column,Bytes.toString(CellUtil.cloneValue(put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0))));
  29. }
  30. }
  31. try {
  32. solrManager.addDocToCache(doc);
  33. } catch (SolrServerException e1) {
  34. e1.printStackTrace();
  35. }
  36. }
  37. @Override
  38. public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
  39. Delete delete,
  40. WALEdit edit,
  41. Durability durability) throws IOException{
  42. String rowkey= Bytes.toString(delete.getRow());
  43. try {
  44. solrManager.delete(rowkey);
  45. } catch (SolrServerException e1) {
  46. e1.printStackTrace();
  47. }
  48. }
  49. }

大体的写入流程我们已经完成了,接下来就是Solr的写入实现了。由于Solr需要使用Zookeeper等信息,我们可以直接通过HBase的conf中获取Zookeeper相关信息来构造所需要的SolrCloudServer。
另一方面,我们不能来了一条数据就马上写入,这样非常消耗资源。因此我们需要做一个缓存,将这些Solr数据暂时保存在里面,定时 + 定量的发送。代码如下

 
 
  1. package com.bqjr.bigdata.HBaseObserver.entity;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.solr.client.solrj.SolrServerException;
  5. import org.apache.solr.client.solrj.impl.CloudSolrServer;
  6. import org.apache.solr.client.solrj.response.UpdateResponse;
  7. import org.apache.solr.common.SolrInputDocument;
  8. import java.io.IOException;
  9. import java.util.*;
  10. /**
  11. * Created by hp on 2017-02-15. */public class SolrServerManager {
  12. public static String ZKHost = "";
  13. public static String ZKPort = "";
  14. int zkClientTimeout = 1800000;// 心跳
  15. int zkConnectTimeout = 1800000;// 连接时间
  16. CloudSolrServer solrServer;
  17. private static String defaultCollection;
  18. int maxCache = 10000;
  19. public static List<SolrInputDocument> cache = new LinkedList<SolrInputDocument>();
  20. private static int maxCommitTime = 60; //最大提交时间,s
  21. public SolrServerManager(String collection) {
  22. defaultCollection = collection;
  23. Configuration conf = HBaseConfiguration.create();
  24. ZKHost = conf.get("hbase.zookeeper.quorum", "bqdpm1,bqdpm2,bqdps2");
  25. ZKPort = conf.get("hbase.zookeeper.property.clientPort", "2181");
  26. String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";
  27. solrServer = new CloudSolrServer(SolrUrl);
  28. solrServer.setDefaultCollection(defaultCollection);
  29. solrServer.setZkClientTimeout(zkClientTimeout);
  30. solrServer.setZkConnectTimeout(zkConnectTimeout);
  31. //启动定时任务,第一次延迟10执行,之后每隔指定时间执行一次
  32. Timer timer = new Timer();
  33. timer.schedule(new CommitTimer(), 10 * 1000L, maxCommitTime * 1000L);
  34. }
  35. public UpdateResponse put(SolrInputDocument doc) throws IOException, SolrServerException {
  36. solrServer.add(doc);
  37. return solrServer.commit(false, false);
  38. }
  39. public UpdateResponse put(List<SolrInputDocument> docs) throws IOException, SolrServerException {
  40. solrServer.add(docs);
  41. return solrServer.commit(false, false);
  42. }
  43. public void addDocToCache(SolrInputDocument doc) throws IOException, SolrServerException {
  44. synchronized (cache) {
  45. cache.add(doc);
  46. if (cache.size() >= maxCache) {
  47. this.put(cache);
  48. cache.clear();
  49. }
  50. }
  51. }
  52. public UpdateResponse delete(String rowkey) throws IOException, SolrServerException {
  53. solrServer.deleteById(rowkey);
  54. return solrServer.commit(false, false);
  55. }
  56. /**
  57. * 提交定时器 */ static class CommitTimer extends TimerTask {
  58. @Override
  59. public void run() {
  60. synchronized (cache) {
  61. try {
  62. new SolrServerManager(defaultCollection).put(cache);
  63. cache.clear();
  64. } catch (IOException e) {
  65. e.printStackTrace();
  66. } catch (SolrServerException e) {
  67. e.printStackTrace();
  68. }
  69. cache.clear();
  70. }
  71. }
  72. }
  73. }

四、 添加协处理器

 
 
  1. #先禁用这张表
  2. disable 'HBASE_OBSERVER_TEST'
  3. #为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
  4. alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://bqdpm1:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'
  5. #启用这张表
  6. enable 'HBASE_OBSERVER_TEST'
  7. #删除某个协处理器,"$"后面跟的ID号与desc里面的ID号相同
  8. alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'

五、 测试

尝试插入一条数据put 'HBASE_OBSERVER_TEST','001','cf1:test_age','18'
结果Solr中一条数据都没有

然后查看了regionserver的日志发现,没有找到SolrJ的类

然后我们将所有的依赖加到Jar包里面之后,再次运行。就可以看到数据了。

测试Delete功能

测试进行到这里就完了吗?当然不是
我们尝试再插入一条put 'HBASE_OBSERVER_TEST','001','cf1:test_name','Bob'

理论上我们需要在Solr中看到 test_age = 18,test_name = Bob。
但是在Solr中只有一条数据

于是我们需要使用到Solr的原子更新功能。将postPut改成下面这样的代码即可

 
 
  1. public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  2. Put put, WALEdit edit, Durability durability) throws IOException {
  3. String rowkey= Bytes.toString(put.getRow());
  4. Long version = 1L;
  5. SolrInputDocument doc = new SolrInputDocument();
  6. for(String column : columns){
  7. if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){
  8. Cell cell = put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0);
  9. Map<String, String > operation = new HashMap<String,String>();
  10. operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));
  11. doc.setField(column,operation);
  12. }
  13. }
  14. doc.addField("id",rowkey);
  15. // doc.addField("_version_",version);
  16. try {
  17. solrManager.addDocToCache(doc);
  18. } catch (SolrServerException e1) {
  19. e1.printStackTrace();
  20. }
  21. }

再次插入数据

查看Solr结果

六、 协处理器动态加载

hbase的官方文档指出动态级别的协处理器,可以做到不重启hbase,更新协处理,做法就是
禁用表,卸载协处理器,重新指定协处理器, 激活表,即可,但实际测试发现
动态加载无效,是hbase的一个bug,看这个链接:
https://issues.apache.org/jira/browse/HBASE-8445
因为协处理器,已经被JVM加载,即使删除jar也不能重新load的jar,因为cache里面的hdfs的jar路径,没有变化,所以动态更新无效
,除非重启JVM,那样就意味着,需要重启RegionServer,
里面的小伙伴们指出了两种办法,使协处理器加载生效:
(1)滚动重启regionserver,避免停掉所有的节点
(2)改变协处理器的jar的类名字或者hdfs加载路径,以方便有新的ClassLoad去加载它

但总体来看,第2种方法,比较安全,第一种风险太大,一般情况下没有人会随便滚动重启线上的服务器的,这只在hbase升级的时候使用


推荐阅读
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Zookeeper作为Apache Hadoop生态系统中的一个重要组件,主要致力于解决分布式应用中的常见数据管理难题。它提供了统一的命名服务、状态同步服务以及集群管理功能,有效提升了分布式系统的可靠性和可维护性。此外,Zookeeper还支持配置管理和临时节点管理,进一步增强了其在复杂分布式环境中的应用价值。 ... [详细]
  • Node.js 教程第五讲:深入解析 EventEmitter(事件监听与发射机制)
    本文将深入探讨 Node.js 中的 EventEmitter 模块,详细介绍其在事件监听与发射机制中的应用。内容涵盖事件驱动的基本概念、如何在 Node.js 中注册和触发自定义事件,以及 EventEmitter 的核心 API 和使用方法。通过本教程,读者将能够全面理解并熟练运用 EventEmitter 进行高效的事件处理。 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • Hyperledger Fabric 1.4 节点 SDK 快速入门指南
    本文将详细介绍如何利用 Hyperledger Fabric 1.4 的 Node.js SDK 开发应用程序。通过最新版本的 Fabric Node.js SDK,开发者可以更高效地构建和部署基于区块链的应用,实现数据的安全共享和交易处理。文章将涵盖环境配置、SDK 安装、示例代码以及常见问题的解决方法,帮助读者快速上手并掌握核心功能。 ... [详细]
  • OpenAI首席执行官Sam Altman展望:人工智能的未来发展方向与挑战
    OpenAI首席执行官Sam Altman展望:人工智能的未来发展方向与挑战 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
author-avatar
-路人甲___
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有