热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hbase操作工具类

依赖jarorg.apache.hbasehbase-client

 

依赖jar

org.apache.hbasehbase-client2.0.5

 

 

HbaseUtils.java

package javax.utils;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;/*** Hbase 操作工具类* * @author Logan* @version 1.0.0* @createDate 2019-05-03**/
public class HbaseUtils {// ===============Common=====================================/*** 根据表名获取Table对象* * @param name 表名,必要时可指定命名空间,比如:“default:user”* @return Hbase Table 对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static Table getTable(String name) throws IOException {TableName tableName = TableName.valueOf(name);Connection connection = ConnectionFactory.createConnection();return connection.getTable(tableName);}// =============== Put =====================================/*** 根据rowKey生成一个Put对象* * @param rowKey rowKey* @return Put对象*/public static Put createPut(String rowKey) {return new Put(Bytes.toBytes(rowKey));}/*** 在Put对象上增加Cell* * @param put Put对象* @param cell cell对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void addCellOnPut(Put put, Cell cell) throws IOException {put.add(cell);}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param value 字符串类型的值*/public static void addValueOnPut(Put put, String family, String qualifier, String value) {addValueOnPut(put, family, qualifier, Bytes.toBytes(value));}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param value 字节数组类型的值,可以是任意对象序列化而成*/public static void addValueOnPut(Put put, String family, String qualifier, byte[] value) {put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param ts Timestamp时间戳* @param value 字符串类型的值*/public static void addValueOnPut(Put put, String family, String qualifier, long ts, String value) {addValueOnPut(put, family, qualifier, ts, Bytes.toBytes(value));}/*** 在Put对象上增加值* * @param put Put对象* @param family 列簇* @param qualifier 列* @param ts Timestamp时间戳* @param value 字节数组类型的值,可以是任意对象序列化而成*/public static void addValueOnPut(Put put, String family, String qualifier, long ts, byte[] value) {put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, value);}/*** 按表名插入一个Put对象包含的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param put 要插入的数据对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void put(String tableName, Put put) throws IOException {try (Table table = getTable(tableName);) {table.put(put);}}/*** 按表名批量插入Put对象包含的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param puts 要插入的数据对象集合* @throws IOException 有异常抛出,由调用者捕获处理*/public static void put(String tableName, List puts) throws IOException {try (Table table = getTable(tableName);) {table.put(puts);}}// =============== Get =====================================/*** 根据rowKey生成一个查询的Get对象* * @param rowKey rowKey* @return Get 对象*/public static Get createGet(String rowKey) {return new Get(Bytes.toBytes(rowKey));}/*** 对查询的Get对象增加指定列簇* * @param get* @param family*/public static void addFamilyOnGet(Get get, String family) {get.addFamily(Bytes.toBytes(family));}/*** 对查询的Get对象增加指定列簇和列* * @param get* @param family* @param qualifier*/public static void addColumnOnGet(Get get, String family, String qualifier) {get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 根据表名和rowKey查询结果(包含全部列簇和列)* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param rowKey 查询rowKey* @return 查询结果Result* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result get(String tableName, String rowKey) throws IOException {Get get = createGet(rowKey);return get(tableName, get);}/*** 根据表名和rowKey数组批量查询结果(包含全部列簇和列)* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param rowKeys 查询rowKey数组* @return 查询结果Result数组* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result[] get(String tableName, String[] rowKeys) throws IOException {List gets = new ArrayList();for (String rowKey : rowKeys) {gets.add(createGet(rowKey));}return get(tableName, gets);}/*** 根据表名和Get对象查询结果* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param get Hbase查询对象* @return 查询结果Result* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result get(String tableName, Get get) throws IOException {try (Table table = getTable(tableName);) {return table.get(get);}}/*** 根据表名和Get对象数组查询结果* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param gets 多个Hbase查询对象组成的数组* @return 查询结果Result数组* @throws IOException 有异常抛出,由调用者捕获处理*/public static Result[] get(String tableName, List gets) throws IOException {try (Table table = getTable(tableName);) {return table.get(gets);}}// =============== Scan =====================================/*** 根据startRow和stopRow创建扫描对象* * @param startRow 扫描开始行,结果包含该行* @param stopRow 扫描结束行,结果不包含该行* @return Scan对象*/public static Scan createScan(String startRow, String stopRow) {Scan scan = new Scan();scan.withStartRow(Bytes.toBytes(startRow));scan.withStopRow(Bytes.toBytes(stopRow));return scan;}/*** 对扫描对象设置列簇* * @param scan 扫描对象* @param family 列簇*/public static void addFamilyOnScan(Scan scan, String family) {scan.addFamily(Bytes.toBytes(family));}/*** 对扫描对象设置列* * @param scan 扫描对象* @param family 列簇* @param qualifier 列簇下对应的列*/public static void addColumnOnScan(Scan scan, String family, String qualifier) {scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 根据表名和扫描对象扫描数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param scan 扫描对象* @return 扫描结果集对象ResultScanner* @throws IOException 有异常抛出,由调用者捕获处理*/public static ResultScanner scan(String tableName, Scan scan) throws IOException {try (Table table = getTable(tableName);) {return table.getScanner(scan);}}/*** 根据表名、开始行和结束行扫描数据(结果包含开始行,不包含结束行,半开半闭区间[startRow, stopRow))* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param startRow 扫描开始行* @param stopRow 扫描结束行* @return 扫描结果集对象ResultScanner* @throws IOException 有异常抛出,由调用者捕获处理*/public static ResultScanner scan(String tableName, String startRow, String stopRow) throws IOException {return scan(tableName, createScan(startRow, stopRow));}// =============== Delete =====================================/*** 根据rowKey生成一个查询的Delete对象* * @param rowKey rowKey* @return Delete对象*/public static Delete createDelete(String rowKey) {return new Delete(Bytes.toBytes(rowKey));}/*** 在Delete对象上增加Cell* * @param delete Delete对象* @param cell cell对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void addCellOnDelete(Delete delete, Cell cell) throws IOException {delete.add(cell);}/*** 对删除对象增加指定列簇* * @param delete Delete对象* @param family 列簇*/public static void addFamilyOnDelete(Delete delete, String family) {delete.addFamily(Bytes.toBytes(family));}/*** 对删除对象增加指定列簇和列* * @param delete Delete对象* @param family 列簇* @param qualifier 列*/public static void addColumnOnDelete(Delete delete, String family, String qualifier) {delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));}/*** 按表名删除一个Delete对象指定的数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param delete Delete对象* @throws IOException 有异常抛出,由调用者捕获处理*/public static void delete(String tableName, Delete delete) throws IOException {try (Table table = getTable(tableName);) {table.delete(delete);}}/*** 按表名批量删除Delete对象集合包含的指定数据* * @param tableName 表名,必要时可指定命名空间,比如:“default:user”* @param deletes Delete对象集合* @throws IOException 有异常抛出,由调用者捕获处理*/public static void delete(String tableName, List deletes) throws IOException {try (Table table = getTable(tableName);) {table.delete(deletes);}}}

 

 

 

以下是测试类

HbaseClientDemo.java

package com.java.demo;import java.util.ArrayList;
import java.util.List;import javax.utils.HbaseUtils;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;/*** Hbase 客户端测试* * @author Logan* @version 1.0.0* @createDate 2019-05-03**/
public class HbaseClientDemo {/*** 向user表中插入数据*/@Testpublic void put() {String tableName = "default:user";try {List puts = new ArrayList();Put put = HbaseUtils.createPut("key1005");HbaseUtils.addValueOnPut(put, "info", "name", "孙悟空");HbaseUtils.addValueOnPut(put, "info", "age", "500");HbaseUtils.addValueOnPut(put, "info", "address", "花果山");// HbaseUtils.put(tableName, put);
puts.add(put);put = HbaseUtils.createPut("key1006");HbaseUtils.addValueOnPut(put, "info", "name", "沙悟净");HbaseUtils.addValueOnPut(put, "info", "age", "1000");HbaseUtils.addValueOnPut(put, "info", "address", "流沙河");puts.add(put);HbaseUtils.put(tableName, puts);} catch (Exception e) {e.printStackTrace();}}/*** 按rowKey批量查询user表中全部列簇全部列的值*/@Testpublic void getAllFamily() {try {String tableName = "default:user";String[] rowKeys = { "key1001", "key1002", "key1003", "key1005", "key1006" };// 按表名和rowKey查询所有列Result[] results = HbaseUtils.get(tableName, rowKeys);for (Result result : results) {// 打印查询结果
printResult(result);}} catch (Exception e) {e.printStackTrace();}}/*** 按rowKey查询user表中指定列簇指定列的值*/@Testpublic void get() {try {String tableName = "default:user";String rowKey = "key1002";Get get = HbaseUtils.createGet(rowKey);HbaseUtils.addColumnOnGet(get, "info", "name");HbaseUtils.addColumnOnGet(get, "info", "age");// 不存在的列,查询结果不显示HbaseUtils.addColumnOnGet(get, "info", "address");// 如果在增加列后增加已有的列簇,会返回该列簇的全部列数据,覆盖前边的增加列// HbaseUtils.addFamilyOnGet(get, "info");
Result result = HbaseUtils.get(tableName, get);printResult(result);} catch (Exception e) {e.printStackTrace();}}@Testpublic void scan() {try {String tableName = "default:user";String startRow = "key1001";String stopRow = "key1006";ResultScanner resultScanner = HbaseUtils.scan(tableName, startRow, stopRow);for (Result result : resultScanner) {printResult(result);}} catch (Exception e) {e.printStackTrace();}}/*** 打印查询结果* * @param result 查询结果对象*/private void printResult(Result result) {Cell[] cells = result.rawCells();// 从Result中读取 rowKey
System.out.println(Bytes.toString(result.getRow()));String print = "%s\t %s:%s \t %s";for (Cell cell : cells) {// 从Cell中取rowKeyString row = Bytes.toString(CellUtil.cloneRow(cell));String family = Bytes.toString(CellUtil.cloneFamily(cell));String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));String value = Bytes.toString(CellUtil.cloneValue(cell));System.out.println(String.format(print, row, family, qualifier, value));}}/*** 删除指定列*/@Testpublic void deleteColumn() {try {String tableName = "default:user";List deletes = new ArrayList();Delete delete = HbaseUtils.createDelete("key1005");HbaseUtils.addColumnOnDelete(delete, "info", "age");HbaseUtils.addColumnOnDelete(delete, "info", "address");// HbaseUtils.delete(tableName, delete);
deletes.add(delete);delete = HbaseUtils.createDelete("key1006");HbaseUtils.addColumnOnDelete(delete, "info", "address");deletes.add(delete);HbaseUtils.delete(tableName, deletes);} catch (Exception e) {e.printStackTrace();}}/*** 删除指定列簇*/@Testpublic void deleteFamily() {try {String tableName = "default:user";List deletes = new ArrayList();Delete delete = HbaseUtils.createDelete("key1005");HbaseUtils.addFamilyOnDelete(delete, "info");// HbaseUtils.delete(tableName, delete);
deletes.add(delete);delete = HbaseUtils.createDelete("key1006");HbaseUtils.addFamilyOnDelete(delete, "info");deletes.add(delete);HbaseUtils.delete(tableName, deletes);} catch (Exception e) {e.printStackTrace();}}}

 

  

 

Hbase 操作工具类

.


转载于:https://www.cnblogs.com/jonban/p/hbase-client.html


推荐阅读
author-avatar
无声无无为爱
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有