热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hbase操作工具类

依赖jarorg.apache.hbasehbase-client

 

依赖jar

org.apache.hbasehbase-client2.0.5

 

 

HbaseUtils.java

package javax.utils;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;/*** Hbase 操作工具类* * @author Logan* @version 1.0.0* @createDate 2019-05-03**/
public class HbaseUtils {// ===============Common=====================================/*** 根据表名获取Table对象* * @param name 表名,必要时可指定命名空间,比如:“default:user”* @return Hbase Table 对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static Table getTable(String name) throws IOException {TableName tableName = TableName.valueOf(name);Connection connection = ConnectionFactory.createConnection();return connection.getTable(tableName);}// =============== Put =====================================/*** 根据rowKey生成一个Put对象* * @param rowKey rowKey* @return Put对象*/public static Put createPut(String rowKey) {return new Put(Bytes.toBytes(rowKey));}/*** 在Put对象上增加Cell* * @param put Put对象* @param cell cell对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void addCellOnPut(Put put, Cell cell) throws IOException {put.add(cell);}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param value 字符串类型的值*/public static void addValueOnPut(Put put, String family, String qualifier, String value) {addValueOnPut(put, family, qualifier, Bytes.toBytes(value));}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param value 字节数组类型的值,可以是任意对象序列化而成*/public static void addValueOnPut(Put put, String family, String qualifier, byte[] value) {put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param ts Timestamp时间戳* @param value 字符串类型的值*/public static void addValueOnPut(Put put, String family, String qualifier, long ts, String value) {addValueOnPut(put, family, qualifier, ts, Bytes.toBytes(value));}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param ts Timestamp时间戳* @param value 字节数组类型的值,可以是任意对象序列化而成*/public static void addValueOnPut(Put put, String family, String qualifier, long ts, byte[] value) {put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, value);}/*** 按表名插入一个Put对象包含的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param put 要插入的数据对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void put(String tableName, Put put) throws IOException {try (Table table = getTable(tableName);) {table.put(put);}}/*** 按表名批量插入Put对象包含的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param puts 要插入的数据对象集合* @throws IOException 有异常抛出,由调用者捕获处理*/public static void put(String tableName, List puts) throws IOException {try (Table table = getTable(tableName);) {table.put(puts);}}// =============== Get =====================================/*** 根据rowKey生成一个查询的Get对象* * @param rowKey rowKey* @return Get 对象*/public static Get createGet(String rowKey) {return new Get(Bytes.toBytes(rowKey));}/*** 对查询的Get对象增加指定列簇* * @param get* @param family*/public static void addFamilyOnGet(Get get, String family) {get.addFamily(Bytes.toBytes(family));}/*** 对查询的Get对象增加指定列簇和列* * @param get* @param family* @param qualifier*/public static void addColumnOnGet(Get get, String family, String qualifier) {get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 根据表名和rowKey查询结果(包含全部列簇和列)* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param rowKey 查询rowKey* @return 查询结果Result* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result get(String tableName, String rowKey) throws IOException {Get get = createGet(rowKey);return get(tableName, get);}/*** 根据表名和rowKey数组批量查询结果(包含全部列簇和列)* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param rowKeys 查询rowKey数组* @return 查询结果Result数组* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result[] get(String tableName, String[] rowKeys) throws IOException {List gets = new ArrayList();for (String rowKey : rowKeys) {gets.add(createGet(rowKey));}return get(tableName, gets);}/*** 根据表名和Get对象查询结果* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param get Hbase查询对象* @return 查询结果Result* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result get(String tableName, Get get) throws IOException {try (Table table = getTable(tableName);) {return table.get(get);}}/*** 根据表名和Get对象数组查询结果* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param gets 多个Hbase查询对象组成的数组* @return 查询结果Result数组* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result[] get(String tableName, List gets) throws IOException {try (Table table = getTable(tableName);) {return table.get(gets);}}// =============== Scan =====================================/*** 根据startRow和stopRow创建扫描对象* * @param startRow 扫描开始行,结果包含该行* @param stopRow 扫描结束行,结果不包含该行* @return Scan对象*/public static Scan createScan(String startRow, String stopRow) {Scan scan = new Scan();scan.withStartRow(Bytes.toBytes(startRow));scan.withStopRow(Bytes.toBytes(stopRow));return scan;}/*** 对扫描对象设置列簇* * @param scan 扫描对象* @param family 列簇*/public static void addFamilyOnScan(Scan scan, String family) {scan.addFamily(Bytes.toBytes(family));}/*** 对扫描对象设置列* * @param scan 扫描对象* @param family 列簇* @param qualifier 列簇下对应的列*/public static void addColumnOnScan(Scan scan, String family, String qualifier) {scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 根据表名和扫描对象扫描数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param scan 扫描对象* @return 扫描结果集对象ResultScanner* @throws IOException 有异常抛出,由调用者捕获处理*/public static ResultScanner scan(String tableName, Scan scan) throws IOException {try (Table table = getTable(tableName);) {return table.getScanner(scan);}}/*** 根据表名、开始行和结束行扫描数据(结果包含开始行,不包含结束行,半开半闭区间[startRow, stopRow))* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param startRow 扫描开始行* @param stopRow 扫描结束行* @return 扫描结果集对象ResultScanner* @throws IOException 有异常抛出,由调用者捕获处理*/public static ResultScanner scan(String tableName, String startRow, String stopRow) throws IOException {return scan(tableName, createScan(startRow, stopRow));}// =============== Delete =====================================/*** 根据rowKey生成一个查询的Delete对象* * @param rowKey rowKey* @return Delete对象*/public static Delete createDelete(String rowKey) {return new Delete(Bytes.toBytes(rowKey));}/*** 在Delete对象上增加Cell* * @param delete Delete对象* @param cell cell对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void addCellOnDelete(Delete delete, Cell cell) throws IOException {delete.add(cell);}/*** 对删除对象增加指定列簇* * @param delete Delete对象* @param family 列簇*/public static void addFamilyOnDelete(Delete delete, String family) {delete.addFamily(Bytes.toBytes(family));}/*** 对删除对象增加指定列簇和列* * @param delete Delete对象* @param family 列簇* @param qualifier 列*/public static void addColumnOnDelete(Delete delete, String family, String qualifier) {delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 按表名删除一个Delete对象指定的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param delete Delete对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void delete(String tableName, Delete delete) throws IOException {try (Table table = getTable(tableName);) {table.delete(delete);}}/*** 按表名批量删除Delete对象集合包含的指定数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param deletes Delete对象集合* @throws IOException 有异常抛出,由调用者捕获处理*/public static void delete(String tableName, List deletes) throws IOException {try (Table table = getTable(tableName);) {table.delete(deletes);}}}

 

 

 

以下是测试类

HbaseClientDemo.java

package com.java.demo;import java.util.ArrayList;
import java.util.List;import javax.utils.HbaseUtils;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;/*** Hbase 客户端测试* * @author Logan* @version 1.0.0* @createDate 2019-05-03**/
public class HbaseClientDemo {/*** 向user表中插入数据*/@Testpublic void put() {String tableName = "default:user";try {List puts = new ArrayList();Put put = HbaseUtils.createPut("key1005");HbaseUtils.addValueOnPut(put, "info", "name", "孙悟空");HbaseUtils.addValueOnPut(put, "info", "age", "500");HbaseUtils.addValueOnPut(put, "info", "address", "花果山");// HbaseUtils.put(tableName, put);
puts.add(put);put = HbaseUtils.createPut("key1006");HbaseUtils.addValueOnPut(put, "info", "name", "沙悟净");HbaseUtils.addValueOnPut(put, "info", "age", "1000");HbaseUtils.addValueOnPut(put, "info", "address", "流沙河");puts.add(put);HbaseUtils.put(tableName, puts);} catch (Exception e) {e.printStackTrace();}}/*** 按rowKey批量查询user表中全部列簇全部列的值*/@Testpublic void getAllFamily() {try {String tableName = "default:user";String[] rowKeys = { "key1001", "key1002", "key1003", "key1005", "key1006" };// 按表名和rowKey查询所有列Result[] results = HbaseUtils.get(tableName, rowKeys);for (Result result : results) {// 打印查询结果
printResult(result);}} catch (Exception e) {e.printStackTrace();}}/*** 按rowKey查询user表中指定列簇指定列的值*/@Testpublic void get() {try {String tableName = "default:user";String rowKey = "key1002";Get get = HbaseUtils.createGet(rowKey);HbaseUtils.addColumnOnGet(get, "info", "name");HbaseUtils.addColumnOnGet(get, "info", "age");// 不存在的列,查询结果不显示HbaseUtils.addColumnOnGet(get, "info", "address");// 如果在增加列后增加已有的列簇,会返回该列簇的全部列数据,覆盖前边的增加列// HbaseUtils.addFamilyOnGet(get, "info");
Result result = HbaseUtils.get(tableName, get);printResult(result);} catch (Exception e) {e.printStackTrace();}}@Testpublic void scan() {try {String tableName = "default:user";String startRow = "key1001";String stopRow = "key1006";ResultScanner resultScanner = HbaseUtils.scan(tableName, startRow, stopRow);for (Result result : resultScanner) {printResult(result);}} catch (Exception e) {e.printStackTrace();}}/*** 打印查询结果* * @param result 查询结果对象*/private void printResult(Result result) {Cell[] cells = result.rawCells();// 从Result中读取 rowKey
System.out.println(Bytes.toString(result.getRow()));String print = "%s\t %s:%s \t %s";for (Cell cell : cells) {// 从Cell中取rowKeyString row = Bytes.toString(CellUtil.cloneRow(cell));String family = Bytes.toString(CellUtil.cloneFamily(cell));String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));String value = Bytes.toString(CellUtil.cloneValue(cell));System.out.println(String.format(print, row, family, qualifier, value));}}/*** 删除指定列*/@Testpublic void deleteColumn() {try {String tableName = "default:user";List deletes = new ArrayList();Delete delete = HbaseUtils.createDelete("key1005");HbaseUtils.addColumnOnDelete(delete, "info", "age");HbaseUtils.addColumnOnDelete(delete, "info", "address");// HbaseUtils.delete(tableName, delete);
deletes.add(delete);delete = HbaseUtils.createDelete("key1006");HbaseUtils.addColumnOnDelete(delete, "info", "address");deletes.add(delete);HbaseUtils.delete(tableName, deletes);} catch (Exception e) {e.printStackTrace();}}/*** 删除指定列簇*/@Testpublic void deleteFamily() {try {String tableName = "default:user";List deletes = new ArrayList();Delete delete = HbaseUtils.createDelete("key1005");HbaseUtils.addFamilyOnDelete(delete, "info");// HbaseUtils.delete(tableName, delete);
deletes.add(delete);delete = HbaseUtils.createDelete("key1006");HbaseUtils.addFamilyOnDelete(delete, "info");deletes.add(delete);HbaseUtils.delete(tableName, deletes);} catch (Exception e) {e.printStackTrace();}}}

 

  

 

Hbase 操作工具类

.


转载于:https://www.cnblogs.com/jonban/p/hbase-client.html


推荐阅读
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • Android中将独立SO库封装进JAR包并实现SO库的加载与调用
    在Android开发中,将独立的SO库封装进JAR包并实现其加载与调用是一个常见的需求。本文详细介绍了如何将SO库嵌入到JAR包中,并确保在外部应用调用该JAR包时能够正确加载和使用这些SO库。通过这种方式,开发者可以更方便地管理和分发包含原生代码的库文件,提高开发效率和代码复用性。文章还探讨了常见的问题及其解决方案,帮助开发者避免在实际应用中遇到的坑。 ... [详细]
  • 本文介绍了如何使用Java和PDFBox库根据坐标值对PDF文件进行局部切割的方法。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 本文介绍了 Java 中 io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltSock() 方法的使用及其代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • JVM钩子函数的应用场景详解
    本文详细介绍了JVM钩子函数的多种应用场景,包括正常关闭、异常关闭和强制关闭。通过具体示例和代码演示,帮助读者更好地理解和应用这一机制。适合对Java编程和JVM有一定基础的开发者阅读。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 本文主要探讨了Java中处理ActionEvent事件的接口,以及一些常见的编程问题和解决方案,包括方法重载、成员变量访问、镜片质量检测等。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 线程能否先以安全方式获取对象,再进行非安全发布? ... [详细]
  • C++ 开发实战:实用技巧与经验分享
    C++ 开发实战:实用技巧与经验分享 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
author-avatar
无声无无为爱
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有