热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hbase0.96与Java的交互访问

hbase-0.96.x相对hbase-0.94.x的改变hadoop:hadoop-2.2.0hbase:hbase-0.96.01.org.apache.hadoop.hbase.client.

hbase-0.96.x相对hbase-0.94.x的改变

hadoop:hadoop-2.2.0
hbase:hbase-0.96.0
1.org.apache.hadoop.hbase.client.Put
    <1>取消了无参的构造方法
    <2>Put类不再继承Writable类     
        0.94.6时public class Put extends Mutation implements HeapSize, Writable, Comparable
        0.96.0时public class Put extends Mutation implements HeapSize, Comparable
解决方法:
        由public class MonthUserLoginTimeIndexReducer extends Reducer {
改public class MonthUserLoginTimeIndexReducer extends Reducer {
2.org.apache.hadoop.hbase.client.Mutation.familyMap
     org.apache.hadoop.hbase.client.Mutation.familyMap类型改变:
     /**
     * 0.94.6
     * protected Map> familyMap
     * 
     * 0.96.*
     * protected NavigableMap> familyMap
     * org.apache.hadoop.hbase.Cell hbase-0.94.*中是没有的
     */    

     org.apache.hadoop.hbase.KeyValue的改变:
     /**
     * 0.94.*
     * public class KeyValue extends Object implements Writable, HeapSize
     * 
     * 0.96.0
     * public class KeyValue extends Object implements Cell, HeapSize, Cloneable
     */
     解决方法:将代码中的List改成List
3. org.apache.hadoop.hbase.KeyValue
     0.96.0中方法getFamily已被弃用(Deprecated),改成方法getFamilyArray() 
4.org.apache.hadoop.hbase.HTableDescriptor   
     类org.apache.hadoop.hbase.HTableDescriptor的构造方法public HTableDescriptor(String name)已被弃用(Deprecated)
     解决方法:使用public HTableDescriptor(TableName name)
     旧:HTableDescriptor tableDesc = new HTableDescriptor(tableName);
     新:HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
5.org.apache.hadoop.hbase.client.HTablePool
     类org.apache.hadoop.hbase.client.HTablePool整个被弃用(Deprecated)
     解决方法:使用HConnection.getTable(String)代替,HConnection是个接口,类CoprocessorHConnection是它唯一的实现类:
     HRegionServer hRegiOnServer= new HRegionServer(conf) ;
     HConnection cOnnection= HConnectionManager.createConnection(conf);
     hCOnnection= new CoprocessorHConnection(connection,hRegionServer);
6.org.apache.hadoop.hbase.client.Result
     方法public KeyValue[] raw()被弃用(Deprecated),建议使用public Cell[] rawCells()
     方法getRow被弃用(Deprecated)
     方法getFamily被弃用(Deprecated)
     方法getQualifier被弃用(Deprecated)
     方法getValue被弃用(Deprecated)
     方法public List getColumn(byte[] family,byte[] qualifier)被弃用(Deprecated)
     方法public KeyValue getColumnLatest(byte[] family,byte[] qualifier)被弃用(Deprecated)
     Cell中:改成以下方法
     getRowArray()
     getFamilyArray()
     getQualifierArray()
     getValueArray()
     Result中:增加如下方法
     public List getColumnCells(byte[] family,byte[] qualifier)
     public KeyValue getColumnLatestCell(byte[] family,byte[] qualifier)
     改动:所有ipeijian_data中凡是和【新增用户活跃用户流失用户】相关的都做如下变化:
     旧代码:if (value.raw().length == 1
     新代码:if (value.rawCells().length == 1
7.job中设置TableInputFormat.SCAN
     0.96.0中去掉了方法:public void write(DataOutput out)throws IOException
     之前版本使用conf.set(TableInputFormat.SCAN, StatUtils.convertScanToString(scan));进行设置
     StatUtils.convertScanToString的具体实现为:
     public static String convertScanToString(Scan scan) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(out);
            scan.write(dos);
            return Base64.encodeBytes(out.toByteArray());
     }
     该方法的实现与TableMapReduceUtil.convertScanToString(Scan scan)是一样的。
     但是当hbase升级到了0.96.*是对于类Scan弃用(不仅仅是Deprecated,而是Deleted)了方法write,所以上面
     的实现变为不正确
     hbase0.96.*中对该方法进行了重新的实现:
     public static String convertScanToString(Scan scan) throws IOException {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            return Base64.encodeBytes(proto.toByteArray());
     }
     所以做如下更改:
     StatUtils类中方法convertScanToString的实现做如上更改以适配hbase0.96.* 
8.cn.m15.ipj.db.hbase.MyPut
    自定义的Put类,比传统的Put类多一个length,原版和新版代码比较:
    原版:(红色字体为API变为新版时报错的地方)

public class MyPut extends Put {
     public MyPut(byte[] row, int length) {                                    
     //原因是put的无参构造方法已经在新本中消失
          if (row == null || length > HConstants.MAX_ROW_LENGTH) {
               throw new IllegalArgumentException(“Row key is invalid”);
          }
          this.row = Arrays.copyOf(row, length);
          this.ts = HConstants.LATEST_TIMESTAMP;
     }    
     public MyPut add(byte[] family, byte[] qualifier, long ts, byte[] value,int length) {
          List list = getKeyValueList(family);
          KeyValue kv = createPutKeyValue(family, qualifier, ts, value, length);
          list.add(kv);
          familyMap.put(kv.getFamily(), list);                                   
          //familyMap的类型已经改变
          return this;
      }
     private List getKeyValueList(byte[] family) {
          List list = familyMap.get(family);                     
          //familyMap的类型已经改变
          if (list == null) {
               list = new ArrayList(0);
          }
          return list;
     }
     private KeyValue createPutKeyValue(byte[] family, byte[] qualifier,long ts, byte[] value, int length) {
          return new KeyValue(this.row, 0, this.row.length, family, 0,
          family.length, qualifier, 0, qualifier.length, ts,
          KeyValue.Type.Put, value, 0, length);
     }
}

更改之后:

public MyPut(byte[] row, int length) {
     super(row,length);                                                                      
     //新增加
     if (row == null || length > HConstants.MAX_ROW_LENGTH) {
          throw new IllegalArgumentException(“Row key is invalid”);
     }
     this.row = Arrays.copyOf(row, length);
     this.ts = HConstants.LATEST_TIMESTAMP;
     }
     public MyPut add(byte[] family, byte[] qualifier, long ts, byte[] value,int length) {
          List<Cell> list = getCellsList(family);
          KeyValue kv = createPutKeyValue(family, qualifier, ts, value, length);
          list.add(kv);
          familyMap.put(CellUtil.cloneFamily(kv), list);
          return this;
     }    
     private List getCellsList(byte[] family) {
          List<Cell> list = familyMap.get(family);
          if (list == null) {
              list = new ArrayList<Cell>(0);
          }
          return list;
     }
     private KeyValue createPutKeyValue(byte[] family, byte[] qualifier,long ts, byte[] value, int length) {
          return new KeyValue(this.row, 0, this.row.length, family, 0,family.length, qualifier, 0, qualifier.length, ts,
                    KeyValue.Type.Put, value, 0, length);
     }
}

 

package com.test.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;

public class HbaseTest {

private static Configuration cOnf= null;
/**
* 初始化配置
*/
static {
conf
= HBaseConfiguration.create();
}

/**
* 创建表操作
*
*
@throws IOException
*
@throws ZooKeeperConnectionException
*
@throws MasterNotRunningException
*/
public void createTable(String tableName, String[] cfs)
throws MasterNotRunningException, ZooKeeperConnectionException,
IOException {
HBaseAdmin admin
= new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println(
"表已经存在!");
}
else {
HTableDescriptor tableDesc
= new HTableDescriptor(
TableName.valueOf(tableName));
for (int i = 0; i ) {
tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
}
admin.createTable(tableDesc);
admin.close();
System.out.println(
"表创建成功!");
}
}

/**
* 删除表操作
*/
public void deleteTable(String tableName) {
HBaseAdmin admin;
try {
admin
= new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
admin.close();
System.out.println(
"表删除成功!");
}
catch (MasterNotRunningException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (ZooKeeperConnectionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

/**
* 插入一行记录
*/
public void writeRow(String tableName, String[] cfs) {
try {
HTable table
= new HTable(conf, tableName);
Put put
= new Put(Bytes.toBytes("row1"));
for (int j = 0; j ) {
put.add(Bytes.toBytes(cfs[j]),
Bytes.toBytes(String.valueOf(1)),
Bytes.toBytes(
"value_1"));
}
table.close();
System.out.println(
"添加成功!");
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/**
* 删除一行记录
*/
public void deleteRow(String tableName, String rowKey) {
try {
HTable table
= new HTable(conf, tableName);
Delete dl
= new Delete(rowKey.getBytes());
table.delete(dl);
table.close();
System.out.println(
"删除行成功!");
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

/**
* 查找一条记录
*/
public static void selectRow(String tableName, String rowKey) {
try {
HTable table
= new HTable(conf, tableName);
Get g
= new Get(rowKey.getBytes());
Result rs
= table.get(g);
System.out.println(rs);
table.close();
System.out.println(
"查询完成!");
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

/**
* 查询表中所有的行
*/
public void scanner(String tableName) {
try {
HTable table
= new HTable(conf, tableName);
Scan s
= new Scan();
ResultScanner rs
= table.getScanner(s);
for (Result r : rs) {
// keyvalue
Cell[] cell = r.rawCells();
System.out.println(
"长度:" + cell.length);
for (int i = 0; i ) {
System.out.println("信息:"
+ new String(CellUtil.cloneFamily(cell[i])) + " "
+ new String(CellUtil.cloneQualifier(cell[i]))
+ " " + new String(CellUtil.cloneValue(cell[i]))
+ " " + new String(CellUtil.cloneRow(cell[i])));
}
System.out.println(
"\n-----------------------");
}
table.close();
System.out.println(
"执行结束!");
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

public static void main(String[] args) {
HbaseTest hbase
= new HbaseTest();
String tableName
= "test";
hbase.scanner(tableName);
}
}

 


推荐阅读
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 在数据仓库管理中,定时更新程序与查询SQL之间的冲突常常导致性能瓶颈和阻塞问题。为了解决这些问题,通常需要对SQL Server进行详细的性能诊断。常用的诊断工具包括系统动态管理视图(DMVs)和扩展事件(Extended Events),这些工具能够帮助识别和分析性能瓶颈的具体原因,从而采取有效的优化措施。 ... [详细]
  • 基于Node.js的高性能实时消息推送系统通过集成Socket.IO和Express框架,实现了高效的高并发消息转发功能。该系统能够支持大量用户同时在线,并确保消息的实时性和可靠性,适用于需要即时通信的应用场景。 ... [详细]
  • 本文详细介绍了在 Ubuntu 系统上搭建 Hadoop 集群时遇到的 SSH 密钥认证问题及其解决方案。通过本文,读者可以了解如何在多台虚拟机之间实现无密码 SSH 登录,从而顺利启动 Hadoop 集群。 ... [详细]
  • 第二十五天接口、多态
    1.java是面向对象的语言。设计模式:接口接口类是从java里衍生出来的,不是python原生支持的主要用于继承里多继承抽象类是python原生支持的主要用于继承里的单继承但是接 ... [详细]
  • 本文详细解析了客户端与服务器之间的交互过程,重点介绍了Socket通信机制。IP地址由32位的4个8位二进制数组成,分为网络地址和主机地址两部分。通过使用 `ipconfig /all` 命令,用户可以查看详细的IP配置信息。此外,文章还介绍了如何使用 `ping` 命令测试网络连通性,例如 `ping 127.0.0.1` 可以检测本机网络是否正常。这些技术细节对于理解网络通信的基本原理具有重要意义。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • Java环境中Selenium Chrome驱动在大规模Web应用扩展时的性能限制分析 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
author-avatar
swa乄ycat曼颜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有