引言
HBase作为Apache软件基金会的一个顶级项目,目前在业界有着广泛的使用。HBase是Google BigTable的开源实现,运行在HDFS文件系统之上,为Hadoop提供类似于BigTable的分布式数据存放服务。在Eric Brewer的CAP理论中,HBase属于CP类型的系统,即保证了系统的一致性和对分区容忍性,具体可以自行google CAP理论。
最近在做一个大数据项目,需要将原本oracle数据库中的数据导入到HBase中,所以做了一个简单的HBase数据导入总结。目前我已知的并且实验可行的方法有一下几种:
1. JDBC & HBase Client API
2. MapReduce Job/TableReducer
3. Importtsv(unbulk load)
4. bulk load(importtsv | MapReduce Job)
5. Sqoop
先简要介绍下每种方法:
方法1:最基本的数据导入方法。首先通过JDBC将原本关系型数据库中的数据读出到内存中,然后在使用HBase自带的客户端API将数据put到相应的表中。这种方法通用性强,只要写好接口就可以用,但是效率并不高。
方法2:使用这种方法之前其实是需要先将数据导出到本地,以文本的形式保存,然后使用TableReudcer类编写MapReduce job。这种方法需要频繁的I/O操作,所以效率不高,容易导致HBase节点的不稳定。
方法3:importtsv是HBase内置的数据导入工具,目的是将tsv格式的文件加载到HBase中,本质上它是通过调用MapReudce Job实现数据导入的。注意:使用该方法,需要提前将数据导出到本地,以tsv格式存储。unbulk load模式的importtsv效果一般,适用于小型的数据。
方法4:bulk load是一个快速大量数据高效导入工具,相比于importtsv效率更高。
方法5:Sqoop是apache软件基金会的一个项目,可以用来实现关系型数据库和hdfs,hbase,hive之间的数据高效传输。只需要做一些简单的配置,通过Sqoop命令行指令就可以方便的实现数据导入和导出。
下面具体介绍每种方法的做法:
JDBC & HBase Client API
此处以MySql为例。首先在MySql数据库中创建database ‘test’,然后创建一张表’Info’,这里可以使用可视化软件(例如workbench),也可以直接在命令行输入相应指令:
:~$mysql -u root -p #root用户登录mysql
#创建Info表
CREATE TABLE Info (`ID` INT NOT NULL,
`Name` VARCHAR(45) NOT NULL,
`Number` INT NOT NULL,
`Time` VARCHAR(45) NOT NULL,
PRIMARY KEY (`ID`));
然后使用load指令将准备好的数据导入到Info中。数据格式与Info各字段的一致即可。
load data local infile '/home/lvyang/Desktop/test.csv' into table Info fields terminated by ','
到此数据已经准备好了。下面就可以进行数据导出导入过程了。
由于需要使用MySql的数据读取接口,所以我们需要到官网下载相应的connector,并将其中包含的mysql-connector-java-版本号-bin.jar文件取出,添加到自己Project的依赖库中。如果对maven比较熟的,就可以忽视这些配置过程,直接配置pom.xml文件即可完成项目依赖设置,方便快捷。
JDBC数据读取:
public class JDBCUtils {
Connection cOnn=null;
ResultSet rs=null;
String databaseName=null;
String userName=null;
String password=null;
String url=null;
public JDBCUtils(String databaseName, String userName, String password, String url);
public void connect();
public ResultSet readData(String sql);
public void writeToConsole(ResultSet rs,String[] keys);
public boolean writeToLocal(ResultSet rs,String path);
public void close();
}
上面是JDBC工具类定义,可以根据自己的需求,自行添加或者删除方法。部分方法的实现如下,仅做参考:
public void connect(){
try {
Class.forName("com.mysql.jdbc.Driver"); //注册驱动
System.out.println("load mysql driver successfully!");
cOnn= (Connection) DriverManager.getConnection(url); //获得connection对象,完成数据库连接
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
public ResultSet readData(String sql){
try {
Statement stmt= (Statement) conn.createStatement(); //创建statement对象
rs=stmt.executeQuery(sql); //执行query命令,获取ResultSet
} catch (SQLException e) {
e.printStackTrace();
}
return rs;
}
public void close(){
if(rs!=null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(conn!=null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
到此JDBC读取数据部分已经完成,下面需要实现HBase数据导入功能:
public class HBaseUtils {
private static final Log LOG= LogFactory.getLog(HBaseUtils.class); //LOG用于输出部分关键信息
//Here I choose construct func to init configuration instance
//and then use connectionFactory to create init conn instance
//at last,I use conn to get Hadmin instance
//next I will use Hadmin to operate hbase tables
private Configuration cOnf=null;
private Admin Hadmin=null;
private Connection cOnn=null;
public HBaseUtils(Configuration conf);
public void connect();
public boolean isExist(String tableName);
public boolean createTable(String tableName,String columnFamily);
public boolean writeOne(HashMap data,String[] keys, String tableName,String columnFamily);
public boolean writeMore(List> list,String[] keys,String tableName,String columnFamily);
public boolean deleteTable(String tableName);
public void close();
}
部分方法的实现如下,仅做参考:
public void connect(){
try {
//create connection to hbase
cOnn= ConnectionFactory.createConnection(conf);
//get Hadmin which is the database manager
Hadmin=conn.getAdmin();
} catch (IOException e) {
e.printStackTrace();
if(conn!=null){
try {
conn.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
if(Hadmin!=null){
try {
Hadmin.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
public boolean createTable(String tableName,String columnFamily){
/**
* create table
* here I limit the number of column family to 1
* So here only can create one column family's table
* **/
TableName table_name= TableName.valueOf(tableName);
LOG.info("Create table:"+tableName+" now!");
HTableDescriptor tableDesc=new HTableDescriptor(table_name);
HColumnDescriptor columnDesc=new HColumnDescriptor(columnFamily);
tableDesc.addFamily(columnDesc);
try {
this.Hadmin.createTable(tableDesc);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean writeOne(HashMap data,String[] keys, String tableName,String columnFamily){
/**
* write one data to table at one time
* here I choose the first key as the rowKey,because I think the first key usually is the primary key
* **/
boolean flag=false;
TableName table_name=TableName.valueOf(tableName);
byte[] column_family=columnFamily.getBytes();
try {
Table table=conn.getTable(table_name);
byte[] rowKey=data.get(keys[0]).toString().getBytes(); //construct HBase table's rowKey
Put put=new Put(rowKey);
for(int i=0;i byte[] key=keys[i].getBytes();
byte[] value=data.get(keys[i]).toString().getBytes();
put.addColumn(column_family,key,value);
}
table.put(put);
table.close();
flag=true;
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
public void close(){
/**
* close connection
* **/
LOG.info("Close connection to HBase!");
if(Hadmin!=null){
try {
Hadmin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(conn!=null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
到此,工具类已经基本完成,下面需要写一个主类:
public class deMain {
public static void main(String[] args){
//JDBC Init
Connection cOnn=null;
String sql="select * from Info";
String databaseName="test";
String userName="root";
String password="****";
String url="jdbc:mysql://localhost:3306/"+databaseName+"?user="+userName+"&password="
+password+"&useUnicode=true&characterEncoding=utf-8";
//HBase Client Init
String tableName="test";
String columnFamily="info";
Configuration cOnf= HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","127.0.0.1");
conf.set("hbase.master","localhost:9000");
}
//JDBC connection and read data
JDBCUtils ju=new JDBCUtils(databaseName,userName,password,url);
ju.connect();
ResultSet rs=ju.readData(sql);
//HBase connect
HBaseUtils hbu=new HBaseUtils(conf);
hbu.connect();
//依次读取rs中每条记录,并将其写入HBase相应表中即可
.........
//close all connection
hbu.close();
ju.close();
}
到此,大功告成!由于是通过IDE端运行hadoop程序,所以我们需要将需要用到的依赖库导入,而这个过程如果不借助maven的话,就会特别的痛苦。下图是我配置的项目依赖包,仅供参考。
注:hadoop2.7.2,hbase1.2.1,zookeeper3.4.6
结语
由于内容比较多,所有剩下的方法将会在下一篇博文中介绍。