热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

中的rowkey获取hbase_自定义Rowkey规则读取Hbase数据

在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制TableInputFormat来实现我们的需求了࿰

在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制TableInputFormat来实现我们的需求了,我们还可以采用Flink的DataSet的方式读取,另外下面还有Spark读取的例子。

使用教程

Md5Util.javaimport org.apache.commons.codec.binary.Hex;import java.security.MessageDigest;import java.security.NoSuchAlgorithmException;public class Md5Util {    public static String md5(byte[] key) {        return md5(key, 0, key.length);

}    public static String md5(byte[] key, int offset, int length) {        try {

MessageDigest e = MessageDigest.getInstance("MD5");

e.update(key, offset, length);            byte[] digest = e.digest();            return new String(Hex.encodeHex(digest));

} catch (NoSuchAlgorithmException var5) {            throw new RuntimeException("Error computing MD5 hash", var5);

}

}    public static String md5(String str) {        return md5(str.getBytes());

}    public static String md5(String str,int offset, int length) {        return md5(str.getBytes(),offset,length);

}

}

数据Split方式private Connection connection;    private Admin admin;    @Before

public void init() throws Exception {

System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

System.setProperty("sun.security.krb5.debug", "false");        final String user = "hbase/abc.demo.com@DEMO.COM";        final String keyPath = "/home/dounine/kerberos/lake.keytab";

Configuration conf = new Configuration();

conf.addResource("hbase-site.xml");

UserGroupInformation.setConfiguration(conf);

UserGroupInformation.loginUserFromKeytab(user, keyPath);

connection = ConnectionFactory.createConnection(conf);

admin = connection.getAdmin();

}@Test

public void createTable() throws IOException {

TableName table = TableName.valueOf("logTable1");

TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);

tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());

tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");

ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();

ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();

ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();

tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));        try {            byte[][] splitKeys = new byte[4][];

splitKeys[0] = Bytes.toBytes("00");

splitKeys[1] = Bytes.toBytes("40");

splitKeys[2] = Bytes.toBytes("80");

splitKeys[3] = Bytes.toBytes("c0");

admin.createTable(tableDesc.build(),splitKeys);

} catch (IOException e) {

e.printStackTrace();

}

}

logTable1数据写入方式public class HbaseKerberos{

private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);

private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

private static final String TABLE_NAME = "logTable1";

public void insertDataToHbase1(String appKey,List hasDatas) throws IOException {

Table table = HbaseUtils.getTable(TABLE_NAME);

Long sumCount = 0L;        /**

* 常规值

*/

byte[] extCF = Bytes.toBytes("ext");//CF列族

Random random = new Random();

List rows &#61; new ArrayList<>();        for (Log logEntity : hasDatas) {

JSONObject dataJsonObject &#61; logEntity.getData();

JSONObject extJsonObject &#61; dataJsonObject.getJSONObject("ext");            String userId &#61; extJsonObject.getString("userId");            String timeStr &#61; logEntity.getTime().format(dtf);

String md5Str &#61; Md5Util.md5(userId);            String rowKey &#61; new StringBuilder()

.append(md5Str.substring(0,2))//md5出来的前两位最高为ff,00~ff为256位&#xff0c;后期Region可以增加那么多&#xff0c;足够使用了。

.append("|")

.append(timeStr)//时间

.append("|")

.append(CrcUtil.getCrcValue(appKey))

.append("|")

.append(md5Str.substring(2,8))

.append("|")

.append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))

.toString();

Put row &#61; new Put(Bytes.toBytes(rowKey));            for(String keyName : extJsonObject.keySet()){                String value &#61; extJsonObject.getString(keyName);                if(StringUtils.isNotBlank(value)){

row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));

}

}

row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));

/**

* 设备信息

*/

putFieldToRow(logEntity.getData(),"device",row);            /**

* 位置信息

*/

putFieldToRow(logEntity.getData(),"location",row);

rows.add(row);

}        for(Integer[] durtation : LimitUtil.getLimits(rows.size(),1000)){            Object[] results &#61; new Object[(durtation[1]-durtation[0])];            try {

table.batch(rows.subList(durtation[0], durtation[1]),results);

} catch (InterruptedException e) {

e.printStackTrace();

}

sumCount &#43;&#61; (durtation[1]-durtation[0]);

}

LOGGER.info("write data count:" &#43; sumCount);

}

}

logTable1数据00|20180518203401772|2352356512|4519 column&#61;ext:appKey, timestamp&#61;1533646292389, value&#61;898b7e90-5754-11e8-983c-6b4bcc3b7c2e

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:channelCode, timestamp&#61;1533646292389, value&#61;guanlan-resurrection-002-

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:createDateTime, timestamp&#61;1533646292389, value&#61;1526646836093

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:retain, timestamp&#61;1533646292389, value&#61;17670

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:scene, timestamp&#61;1533646292389, value&#61;1007

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:shareId, timestamp&#61;1533646292389, value&#61;ogJmG5ItE_nBCS3pg5XCvGotGI1c

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:time, timestamp&#61;1533646292389, value&#61;2018-05-18T20:34:01

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:type, timestamp&#61;1533646292389, value&#61;login_in

f3|f1

00|20180518203401772|2352356512|4519 column&#61;ext:userId, timestamp&#61;1533646292389, value&#61;ogJmG5KRcIxtyg7UmcRHFCn6YiAQ

f3|f1

00|20180518203406167|2352356512|4519 column&#61;ext:appKey, timestamp&#61;1533646347725, value&#61;898b7e90-5754-11e8-983c-6b4bcc3b7c2e

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:channelCode, timestamp&#61;1533646347725, value&#61;guanlan-regular-001-

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:createDateTime, timestamp&#61;1533646347725, value&#61;1526646839075

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:retain, timestamp&#61;1533646347725, value&#61;17670

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:shareId, timestamp&#61;1533646347725, value&#61;ogJmG5KRcIxtyg7UmcRHFCn6YiAQ

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:time, timestamp&#61;1533646347725, value&#61;2018-05-18T20:34:06

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:type, timestamp&#61;1533646347725, value&#61;sharesuccess

f3|54

00|20180518203406167|2352356512|4519 column&#61;ext:userId, timestamp&#61;1533646347725, value&#61;ogJmG5KRcIxtyg7UmcRHFCn6YiAQ

f3|54

00|20180518203407144|2352356512|5ca1 column&#61;ext:appKey, timestamp&#61;1533646294045, value&#61;898b7e90-5754-11e8-983c-6b4bcc3b7c2e

c4|bc

00|20180518203407144|2352356512|5ca1 column&#61;ext:createDateTime, timestamp&#61;1533646294045, value&#61;1526646849745

c4|bc

00|20180518203407144|2352356512|5ca1 column&#61;ext:retain, timestamp&#61;1533646294045, value&#61;17670

c4|bc

00|20180518203407144|2352356512|5ca1 column&#61;ext:scene, timestamp&#61;1533646294045, value&#61;1037

c4|bc

00|20180518203407144|2352356512|5ca1 column&#61;ext:time, timestamp&#61;1533646294045, value&#61;2018-05-18T20:34:07

c4|bc

00|20180518203407144|2352356512|5ca1 column&#61;ext:type, timestamp&#61;1533646294045, value&#61;login_in

CustomTableInputFormat.javaimport org.apache.commons.lang3.StringUtils;import org.apache.hadoop.hbase.HRegionLocation;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.hbase.util.Strings;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.net.DNS;import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.HashMap;import java.util.List;public class CustomTableInputFormat extends TableInputFormat {    private HashMap reverseDNSCacheMap &#61;            new HashMap<>();    private List keys &#61; new ArrayList<>();    public CustomTableInputFormat(){        super();        for(int i &#61;0;i<256;i&#43;&#43;){

keys.add(StringUtils.substring("00"&#43;Integer.toHexString(i),-2));

}

}    &#64;Override

public List getSplits(JobContext context) throws IOException {        super.initialize(context);

TableName tableName &#61; super.getTable().getName();

RegionSizeCalculator sizeCalculator &#61; new RegionSizeCalculator(getRegionLocator(), getAdmin());

List splits &#61; new ArrayList<>();        for (String key : keys) {

HRegionLocation location &#61; getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);

InetSocketAddress isa &#61; new InetSocketAddress(location.getHostname(), location.getPort());

InetAddress regionAddress &#61; isa.getAddress();

String regionLocation;

regionLocation &#61; reverseDNS(regionAddress);            byte[] regionName &#61; location.getRegion().getRegionName();

String encodedRegionName &#61; location.getRegion().getEncodedName();            long regionSize &#61; sizeCalculator.getRegionSize(regionName);            byte[] splitStart &#61; Bytes.add(Bytes.toBytes(key&#43;"|"),this.getScan().getStartRow());            byte[] splitStop &#61; Bytes.add(Bytes.toBytes(key&#43;"|"),this.getScan().getStopRow());

TableSplit split &#61; new TableSplit(tableName, this.getScan(),

splitStart, splitStop, regionLocation, encodedRegionName, regionSize);

splits.add(split);

}        return splits;

}    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {

String hostName &#61; this.reverseDNSCacheMap.get(ipAddress);        if (hostName &#61;&#61; null) {

String ipAddressString &#61; null;            try {

ipAddressString &#61; DNS.reverseDns(ipAddress, null);

} catch (Exception e) {

ipAddressString &#61; InetAddress.getByName(ipAddress.getHostAddress()).getHostName();

}            if (ipAddressString &#61;&#61; null) throw new UnknownHostException("No host found for " &#43; ipAddress);

hostName &#61; Strings.domainNamePointerToHostName(ipAddressString);            this.reverseDNSCacheMap.put(ipAddress, hostName);

}        return hostName;

}

}

Flink例子static Configuration conf;    static {

HadoopKrbLogin.login();

conf &#61; new Configuration();

String tableName &#61; "logTable1";

conf.addResource("hbase-site.xml");

Scan scan &#61; new Scan();

scan.setCaching(1000);

scan.withStartRow("201805182039".getBytes());

scan.withStopRow("201805182040".getBytes());

scan.setCacheBlocks(false);

conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);

ClientProtos.Scan proto &#61; null;        try {

proto &#61; ProtobufUtil.toScan(scan);

} catch (IOException e) {

e.printStackTrace();

}

String ScanToString &#61; Base64.encodeBytes(proto.toByteArray());

conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, ScanToString);

}    public static void main(String[] args) throws Exception {        final ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();

DataSource> hbase &#61; env.createInput(

HadoopInputs.createHadoopInput(                        new CustomTableInputFormat(),

ImmutableBytesWritable.class,

Result.class,

Job.getInstance(conf)

)

);

DataSet toTuple &#61; hbase.map(                new MapFunction, LogEntity>() {                    public LogEntity map(Tuple2 record) throws Exception {

Result result &#61; record.f1;                        return result2Entity(result);

}

});

}private static LogEntity result2Entity(Result result) {

JSONObject root &#61; new JSONObject();

JSONObject ext &#61; new JSONObject();

JSONObject device &#61; new JSONObject();

JSONObject location &#61; new JSONObject();        for (Cell cell : result.rawCells()) {            byte[] family &#61; CellUtil.cloneFamily(cell);            byte[] column &#61; CellUtil.cloneQualifier(cell);            byte[] value &#61; CellUtil.cloneValue(cell);

String columnName &#61; Bytes.toString(column);            if ("ext".equals(Bytes.toString(family))) {                if ("durationTime".equals(columnName)) {

ext.put(columnName, Bytes.toLong(value));

} else if ("time".equals(columnName)) {

root.put(columnName, Bytes.toString(value));

root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

} else {

ext.put(columnName, Bytes.toString(value));

}

} else if ("device".equals(Bytes.toString(family))) {

device.put(columnName, Bytes.toString(value));

} else if ("location".equals(Bytes.toString(family))) {

location.put(columnName, Bytes.toString(value));

}

}

JSONObject data &#61; new JSONObject();        if (device.keySet().size() > 0) {

data.put("device", device);

}        if (location.keySet().size() > 0) {

data.put("location", location);

}

data.put("ext", ext);

root.put("data", data);        return JSON.parseObject(root.toString(), LogEntity.class);

}

Spark 例子public class SimpleApp implements Serializable {

static Configuration cfg &#61; null;    static {

HadoopKrbLogin.login();

cfg &#61; new Configuration();

String tableName &#61; "logTable1";

cfg.addResource("hbase-site.xml");

Scan scan &#61; new Scan();

scan.setCaching(1000);

scan.withStartRow("201805182039".getBytes());

scan.withStopRow("201805182040".getBytes());

scan.setCacheBlocks(false);

cfg.set(TableInputFormat.INPUT_TABLE, tableName);

ClientProtos.Scan proto &#61; null;        try {

proto &#61; ProtobufUtil.toScan(scan);

} catch (IOException e) {

e.printStackTrace();

}

String ScanToString &#61; Base64.encodeBytes(proto.toByteArray());

cfg.set(TableInputFormat.SCAN, ScanToString);

}public static void main(String[] args) {

SparkConf sparkConf &#61; new SparkConf()

.setMaster("local")

.setAppName("HbaseDemo");

JavaSparkContext jsc &#61; new JavaSparkContext(sparkConf);

JavaPairRDD hBaseRDD &#61;

jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);        // do some transformation

JavaRDD rdd1 &#61; hBaseRDD.mapPartitions((FlatMapFunction>, LogEntity>)

tuple2Iterator -> {

List logEntities &#61; new ArrayList<>();                    while (tuple2Iterator.hasNext()) {

Tuple2 tuple &#61; tuple2Iterator.next();

Result result &#61; tuple._2;

String rowKey &#61; Bytes.toString(result.getRow());

logEntities.add(result2Entity(result));

}                    return logEntities.iterator();

});

}private static LogEntity result2Entity(Result result) {

JSONObject root &#61; new JSONObject();

JSONObject ext &#61; new JSONObject();

JSONObject device &#61; new JSONObject();

JSONObject location &#61; new JSONObject();        for (Cell cell : result.rawCells()) {            byte[] family &#61; CellUtil.cloneFamily(cell);            byte[] column &#61; CellUtil.cloneQualifier(cell);            byte[] value &#61; CellUtil.cloneValue(cell);

String columnName &#61; Bytes.toString(column);            if ("ext".equals(Bytes.toString(family))) {                if ("durationTime".equals(columnName)) {

ext.put(columnName, Bytes.toLong(value));

} else if ("time".equals(columnName)) {

root.put(columnName, Bytes.toString(value));

root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

} else {

ext.put(columnName, Bytes.toString(value));

}

} else if ("device".equals(Bytes.toString(family))) {

device.put(columnName, Bytes.toString(value));

} else if ("location".equals(Bytes.toString(family))) {

location.put(columnName, Bytes.toString(value));

}

}

JSONObject data &#61; new JSONObject();        if (device.keySet().size() > 0) {

data.put("device", device);

}        if (location.keySet().size() > 0) {

data.put("location", location);

}

data.put("ext", ext);

root.put("data", data);        return JSON.parseObject(root.toString(), LogEntity.class);

}

作者&#xff1a;dounine

链接&#xff1a;https://www.jianshu.com/p/ed86302eec2a



推荐阅读
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 开发笔记:对称加密详解,以及JAVA简单实现
     (原)常用的加密有3种1、正向加密,如MD5,加密后密文固定,目前还没办法破解,但是可以能过数据库撞库有一定概率找到,不过现 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了在Java中gt、gtgt、gtgtgt和lt之间的区别。通过解释符号的含义和使用例子,帮助读者理解这些符号在二进制表示和移位操作中的作用。同时,文章还提到了负数的补码表示和移位操作的限制。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
author-avatar
津pig
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有