Section 5: Data Migration
I. Data migration approaches
In most migration scenarios, the question is how to get data from an external data source written into HBase.
1. Relational databases (RDBMS)
1) Sqoop (see the command sketch after this list)
2) Kettle or another ETL tool
3) Other approaches
**Write a custom program
**Export to a file, then load the file
2. Data files (logs)
1) Flume: real-time data collection that inserts the collected data into HBase
source -> channel -> sink
2) MapReduce
input file -> mapreduce -> hbase table
3) completebulkload
input file -> mapreduce -> hfile -> completebulkload -> hbase table
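For the RDBMS path, Sqoop can import rows from a relational table straight into an HBase table. A minimal sketch is shown here; the JDBC URL, credentials, and the source table and row-key column are placeholders, not values from this environment:
sqoop import \
--connect jdbc:mysql://bigdata-pro-m01.kfk.com:3306/testdb \
--username root \
--password 123456 \
--table stu \
--hbase-table stutsv \
--column-family info \
--hbase-row-key id \
-m 1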
II. Data migration in practice
1. Load a file directly into HBase with the importtsv tool
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
stutsv \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.tsv
*-Dimporttsv.columns maps the row key plus a column family:qualifier for each field; stutsv is the target HBase table; the last argument is the source file.
*The source file must be on HDFS; a local path will not work.
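Because the source has to sit on HDFS, the file must be staged first. A minimal sketch for preparing a tab-separated stu.tsv (rowkey, username, age, address); the rows here are made-up sample data, matching the format used later in the Mapper comments:
printf '00001\thenry\t20\tcity-10\n00002\tcherry\t25\tcity-20\n' > /tmp/stu.tsv
${HADOOP_HOME}/bin/hdfs dfs -mkdir -p /user/kfk/datas
${HADOOP_HOME}/bin/hdfs dfs -put -f /tmp/stu.tsv /user/kfk/datas/stu.tsv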
2. With the -Dimporttsv.bulk.output option, we can turn an external data file directly into HFiles, and then load them into the HBase table with completebulkload.
log file -> HFile -> HBase table
1) Step 1: generate the HFiles
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutput \
stutsv \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.tsv
*-Dimporttsv.bulk.output is the temporary HDFS directory where the generated HFiles are written, instead of writing rows into the table.
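After step 1 finishes, the temporary output directory contains one subdirectory per column family (info here) holding the generated HFiles. A quick way to inspect it before running step 2:
${HADOOP_HOME}/bin/hdfs dfs -ls -R /user/kfk/hfoutput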
2) Step 2: load the HFiles into the table
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
completebulkload \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutput \
stutsv
*completebulkload moves the HFiles from the temporary directory into the stutsv table.
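Once completebulkload finishes, the loaded rows can be checked from the HBase shell. A quick non-interactive check, assuming the table name used above:
echo "scan 'stutsv'" | ${HBASE_HOME}/bin/hbase shell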
3. Handling a different field separator in the source file
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutputcsv \
-Dimporttsv.separator=, \
stutsv \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv
*-Dimporttsv.separator=, tells importtsv that the source fields are comma-separated rather than tab-separated.
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
completebulkload \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutputcsv \
stutsv
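For reference, the comma-separated source used in this step can be staged the same way as the earlier TSV; the rows are again made-up sample data in the rowkey,username,age,address layout:
printf '00001,henry,20,city-10\n00002,cherry,25,city-20\n' > /tmp/stu.csv
${HADOOP_HOME}/bin/hdfs dfs -put -f /tmp/stu.csv /user/kfk/datas/stu.csv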
4. Custom MapReduce program that generates HFiles (the approach commonly used in production)
1) Step 1: write the MapReduce job
log file -> HFile
import com.kfk.hbase.HBaseConstant;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class MRHFHBase extends Configured implements Tool {
//
// // More verbose version, no loop (impractical when there are many fields)
// public static class HFMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
//
// // rowkey username age address
// // 00001, henry, 20, city-10
//
// ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
// @Override
// protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//
// String[] values = value.toString().split(",");
// rowkey.set(Bytes.toBytes(values[0]));
//
// Put put = new Put(rowkey.get());
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("username"),Bytes.toBytes(values[1]));
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("age"),Bytes.toBytes(values[2]));
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("address"),Bytes.toBytes(values[3]));
//
// context.write(rowkey,put);
// }
// }
// Simpler version using a loop (suited to rows with many fields)
public static class HFMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
// rowkey username age address
// 00001, henry, 20, city-10
ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
String[] COLUMN = new String[]{"username","age","address"};
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(",");
rowkey.set(Bytes.toBytes(values[0]));
Put put = new Put(rowkey.get());
for (int index = 1; index < values.length; index++) {
put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes(COLUMN[index]),Bytes.toBytes(values[index]));
}
context.write(rowkey,put);
}
}
// driver: assemble and configure the job
public int run(String[] args) throws Exception {
// get configuration
Configuration configuration = this.getConf();
//create job
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
//input
Path inputpath = new Path(args[0]);
FileInputFormat.addInputPath(job,inputpath);
//map
job.setMapperClass(HFMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// connection info for the target table, used to configure the HFile output
TableName tableName = TableName.valueOf(HBaseConstant.HBASE_STU);
Connection connection = ConnectionFactory.createConnection(configuration);
Table table = connection.getTable(tableName);
RegionLocator regionLocator = connection.getRegionLocator(tableName);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
// submit the job and return its status
boolean isSuc = job.waitForCompletion(true);
return (isSuc) ? 0 : 1 ;
}
public static void main(String[] args) {
//demo
// args = new String[]{
// "hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv", //hdfs上源文件路径
// "hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output" //hdfs上HF临时文件路径
// };
Configuration cOnfiguration= HBaseConfiguration.create();
try{
Path outputFilePath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outputFilePath)){
fileSystem.delete(outputFilePath,true);
}
int status = ToolRunner.run(configuration,new MRHFHBase(),args);
System.exit(status);
}catch (Exception e){
e.printStackTrace();
}
}
}
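Before step 2, the class above has to be packaged into /opt/jars/student.jar. Assuming the code lives in a Maven project (the project layout and artifact name are assumptions, not part of the original setup), something like:
mvn clean package -DskipTests
cp target/student.jar /opt/jars/student.jar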
2) Step 2: upload the jar and run it
export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar /opt/jars/student.jar hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output1
3) Step 3: load the HFiles into HBase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar completebulkload \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output1 \
stutsv
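As a final check, the number of rows now in the table can be compared against the number of lines in the source file:
echo "count 'stutsv'" | ${HBASE_HOME}/bin/hbase shell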