There are three ways to import data into HBase: (1) a MapReduce job that writes through TableOutputFormat; (2) the HBase client API; (3) Bulk Loading. For loading large volumes of data, the third approach is the most efficient.
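For comparison, method (2) is simply the normal client write path. A minimal sketch using the 0.94-era client API (the table and column names here are illustrative):
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "myTable");
Put put = new Put("rowkey1".getBytes());
put.add("f".getBytes(), "count".getBytes(), "42".getBytes());
table.put(put);   // each write goes through the WAL and MemStore
table.close();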
The Bulk Loading workflow is as follows: first load the source data (from MySQL, Oracle, text files, etc.) into HDFS, then run a MapReduce job that turns it into HFiles (HFileOutputFormat). Finally, use the completebulkload tool (LoadIncrementalHFiles) that ships with HBase to load the HFiles into HBase. This final step is fast, uses little memory, and does not disturb the normal operation of an online HBase cluster, because the data does not go through the WAL or the MemStore.
Notes:
(1) Configure a total order partitioner.
(2) The number of reducers must match the number of regions in the target table.
(3) The key/value types emitted by the MapReduce job must match what HFileOutputFormat expects.
(4) Use KeyValueSortReducer or PutSortReducer as the reducer.
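In practice these four requirements are rarely wired up by hand: HFileOutputFormat.configureIncrementalLoad installs the TotalOrderPartitioner, sets the number of reduce tasks to the table's current region count, and chooses KeyValueSortReducer or PutSortReducer based on the map output value class. A minimal driver fragment as a sketch (the table name "myTable" is illustrative; a complete driver appears later in this section):
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "bulk load preparation");
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
// configureIncrementalLoad sets the partitioner, the reducer class and
// the reducer count from the target table's region boundaries
HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, "myTable"));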
Typical use cases:
(1) Initial load of the original dataset when a cluster goes live.
(2) Incremental loads, e.g. periodically importing MySQL (Oracle) data into HBase.
(3) Regular large batch imports.
Loading a CSV file:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
  -Dimporttsv.separator=, \
  -Dimporttsv.bulk.output=output \
  -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
The data format of the file is rowkey,column:value. The target table is wordcount and the data file is word_count.csv.
Run this way, the command only writes HFiles to the output directory; it does not create or populate the wordcount table by itself.
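If the wordcount table does not exist yet, create it before running importtsv, e.g. from the HBase shell (the single column family f matches the -Dimporttsv.columns mapping above):
create 'wordcount', 'f'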
Then run
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
and the load is complete.
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
  -Dimporttsv.separator=, \
  -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
This variant writes the data straight into the table in a single step, going through the normal client write path instead of generating HFiles.
Alternatively, the generated HFiles can be loaded with
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-VERSION.jar completebulkload output wordcount
which is the packaged form of the LoadIncrementalHFiles invocation shown earlier and likewise completes the load in one step.
Below is an example MapReduce job that generates HFiles (a driver, a mapper, and a column enum):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* HBase bulk import example
* Data preparation MapReduce job driver
* args[0]: HDFS input path
* args[1]: HDFS output path
* args[2]: HBase table name
*
*/
public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    args = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*
     * NBA Final 2010 game 1 tip-off time (seconds from epoch)
     * Thu, 03 Jun 2010 18:00:00 PDT
     */
    conf.setInt("epoch.seconds.tipoff", 1275613200);
    conf.set("hbase.table.name", args[2]);
    // Load hbase-site.xml
    HBaseConfiguration.addHbaseResources(conf);
    Job job = new Job(conf, "HBase Bulk Import Example");
    job.setJarByClass(HBaseKVMapper.class);
    job.setMapperClass(HBaseKVMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(TextInputFormat.class);
    HTable hTable = new HTable(conf, args[2]);
    // Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
  }
}
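The driver above imports LoadIncrementalHFiles but leaves the final load to the command-line tools shown earlier. As a sketch, the last line of main could instead hand the HFiles to HBase programmatically (same 0.94-era API as the rest of the example):
    // Load the generated HFiles as soon as the job succeeds, instead of
    // running completebulkload / LoadIncrementalHFiles by hand afterwards.
    if (job.waitForCompletion(true)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(new Path(args[1]), hTable);
    }
The mapper that produces the <ImmutableBytesWritable, KeyValue> pairs follows.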
import java.io.IOException;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import au.com.bytecode.opencsv.CSVParser;
/**
* HBase bulk import example
* Parses Facebook and Twitter messages from CSV files and outputs
* <ImmutableBytesWritable, KeyValue> pairs.
* The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
* into the correct HBase table region.
* The KeyValue value holds the HBase mutation information (column family,
* column, and value)
*/
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 16;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
      .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();
    tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }
  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }

    String[] fields = null;
    try {
      fields = csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
    DateTime dt = null;
    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }
    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s:%s:%s", offsetForKey, fields[0], username)
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_SERVICE.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }
    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_TERM.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }
    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_USERNAME.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }
    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_NAME.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }
    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_UPDATE.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }
    if (!fields[9].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_TIME.getColumnName(), fields[9].getBytes());
      context.write(hKey, kv);
    }

    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
    if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }
  }
}
/**
 * HBase table columns for the 'srv' column family
 */
public enum HColumnEnum {
  SRV_COL_SERVICE ("service".getBytes()),
  SRV_COL_TERM ("term".getBytes()),
  SRV_COL_USERNAME ("username".getBytes()),
  SRV_COL_NAME ("name".getBytes()),
  SRV_COL_UPDATE ("update".getBytes()),
  SRV_COL_TIME ("pdt".getBytes());

  private final byte[] columnName;

  HColumnEnum (byte[] column) {
    this.columnName = column;
  }

  public byte[] getColumnName() {
    return this.columnName;
  }
}
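To run the example end to end (the jar name, input/output paths, and table name below are illustrative), package the three classes, submit the driver, and then load the result with the tool shown earlier:
hadoop jar hbase-bulkload-example.jar Driver /user/demo/messages.csv /user/demo/hfiles tweets
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/demo/hfiles tweets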