热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Sqoop导入HBase,并借助Coprocessor协处理器同步索引到ES

1.环境Mysql5.6Sqoop1.4.6Hadoop2.5.2HBase0.98Elasticsearch2.3.52.安装(略过)3.HB

1.环境

  • Mysql 5.6
  • Sqoop 1.4.6
  • Hadoop 2.5.2
  • HBase 0.98
  • Elasticsearch 2.3.5

2.安装(略过)

3.HBase Coprocessor实现

HBase Observer

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.elasticsearch.client.Client; //import org.elasticsearch.client.transport.TransportClient; //import org.elasticsearch.common.settings.ImmutableSettings; //import org.elasticsearch.common.settings.Settings; //import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; //import java.util.NavigableMap;

public class DataSyncObserver extends BaseRegionObserver { private static Client client = null; private static final Log LOG = LogFactory.getLog(DataSyncObserver.class); /** * 读取HBase Shell的指令参数 * * @param env */
   private void readConfiguration(CoprocessorEnvironment env) { Configuration conf = env.getConfiguration(); Config.clusterName = conf.get("es_cluster"); Config.nodeHost = conf.get("es_host"); Config.nodePort = conf.getInt("es_port", -1); Config.indexName = conf.get("es_index"); Config.typeName = conf.get("es_type"); LOG.info("observer -- started with config: " + Config.getInfo()); } @Override public void start(CoprocessorEnvironment env) throws IOException { readConfiguration(env); // Settings settings = ImmutableSettings.settingsBuilder() // .put("cluster.name", Config.clusterName).build(); // client = new TransportClient(settings) // .addTransportAddress(new InetSocketTransportAddress( // Config.nodeHost, Config.nodePort));
       client = MyTransportClient.client; } @Override public void postPut(ObserverContext e, Put put, WALEdit edit, Durability durability) throws IOException { try { String indexId = new String(put.getRow()); Map<byte[], List> familyMap = put.getFamilyCellMap(); // NavigableMap> familyMap = put.getFamilyCellMap();
           Map json = new HashMap(); for (Map.Entry<byte[], List> entry : familyMap.entrySet()) { for (Cell cell : entry.getValue()) { String key = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); json.put(key, value); } } System.out.println(); ElasticSearchOperator.addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, indexId).setDoc(json).setUpsert(json)); LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName); } catch (Exception ex) { LOG.error(ex); } } @Override public void postDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { try { String indexId = new String(delete.getRow()); ElasticSearchOperator.addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, indexId)); LOG.info("observer -- delete a doc: " + indexId); } catch (Exception ex) { LOG.error(ex); } } }

ES方法

import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; //import org.elasticsearch.client.transport.TransportClient; //import org.elasticsearch.common.settings.ImmutableSettings; //import org.elasticsearch.common.settings.Settings; //import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ElasticSearchOperator { // 缓冲池容量
   private static final int MAX_BULK_COUNT = 10; // 最大提交间隔(秒)
   private static final int MAX_COMMIT_INTERVAL = 60 * 5; private static Client client = null; private static BulkRequestBuilder bulkRequestBuilder = null; private static Lock commitLock = new ReentrantLock(); static { // elasticsearch1.5.0 // Settings settings = ImmutableSettings.settingsBuilder() // .put("cluster.name", Config.clusterName).build(); // client = new TransportClient(settings) // .addTransportAddress(new InetSocketTransportAddress( // Config.nodeHost, Config.nodePort)); // 2.3.5
       client = MyTransportClient.client; bulkRequestBuilder = client.prepareBulk(); bulkRequestBuilder.setRefresh(true); Timer timer = new Timer(); timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000); } /** * 判断缓存池是否已满,批量提交 * * @param threshold */
   private static void bulkRequest(int threshold) { if (bulkRequestBuilder.numberOfActions() > threshold) { BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (!bulkResponse.hasFailures()) { bulkRequestBuilder = client.prepareBulk(); } } } /** * 加入索引请求到缓冲池 * * @param builder */
   public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } /** * 加入删除请求到缓冲池 * * @param builder */
   public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) { commitLock.lock(); try { bulkRequestBuilder.add(builder); bulkRequest(MAX_BULK_COUNT); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } /** * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步 */
   static class CommitTimer extends TimerTask { @Override public void run() { commitLock.lock(); try { bulkRequest(0); } catch (Exception ex) { ex.printStackTrace(); } finally { commitLock.unlock(); } } } }

打包并上传到hdfs

mvn clean compile assembly:single mv observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/

4.创建HBase表,并启用Coprocessor

mysql

hbase shell create 'region','data' disable 'region' alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost' enable 'region'

oracle

create 'sp','data' disable 'sp' alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost' enable 'sp'

查看

hbase(main):007:0* describe 'ora_test'
Table ora_test is ENABLED ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase
/lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver |1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=93
00,es_host=localhost'} 
COLUMN FAMILIES DESCRIPTION {NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIOnS=> '1', COMPRESSION => 'NONE', MI N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'} 1 row(s) in 0.0260 seconds

删除Coprocessor

disable 'ora_test' alter 'ora_test',METHOD => 'table_att_unset',NAME =>'coprocessor$1' enable 'ora_test'

查看删除效果

hbase(main):011:0> describe 'ora_test'
Table ora_test is ENABLED ora_test COLUMN FAMILIES DESCRIPTION {NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIOnS=> '1', COMPRESSION => 'NONE', MI N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'} 1 row(s) in 0.0200 seconds

5.使用sqoop上传数据

mysql

bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql --password 111111 --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data

oracle

bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts --password password --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID --hbase-table sp --hbase-row-key PARAM_ID --column-family data

6.校验

HBase

scan 'region'

ES

7.参考

HBase Observer同步数据到ElasticSearch

8.注意

  • 同一个Coprocessor用一个index,不同表可以设置不同type,不然index会乱
  • 修改Java代码后,上传到HDFS的jar包文件必须和之前不一样,否则就算卸载掉原有的coprocessor再重新安装也不能生效
  • 如果你有多个表对多个索引/类型的映射,每个表所加载Observer对应的jar包路径不能相同,否则ElasticSearch会串数据

推荐阅读
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 小记hbase数据库java API 常用方法及案例
    HBaseAdmin类:管理hbase数据库的表信息,‘创建表、删除表、列出表选项、使表有效/无效、添加或删除列簇’;  ... [详细]
  • 本文详细介绍了如何使用Python中的smtplib库来发送带有附件的邮件,并提供了完整的代码示例。作者:多测师_王sir,时间:2020年5月20日 17:24,微信:15367499889,公司:上海多测师信息有限公司。 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 为了确保iOS应用能够安全地访问网站数据,本文介绍了如何在Nginx服务器上轻松配置CertBot以实现SSL证书的自动化管理。通过这一过程,可以确保应用始终使用HTTPS协议,从而提升数据传输的安全性和可靠性。文章详细阐述了配置步骤和常见问题的解决方法,帮助读者快速上手并成功部署SSL证书。 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 本文_大数据之非常详细Sqoop安装和基本操作
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了大数据之非常详细Sqoop安装和基本操作相关的知识,希望对你有一定的参考价值。大数据大数据之 ... [详细]
author-avatar
x深藏的爱x_402
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有