热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HBaseBulkLoading

   将数据导入到HBase有三种方式:(1) Mapreduce,输出为TableOutputFormat.(2) 用HBase API .(

   将数据导入到HBase有三种方式:(1) Mapreduce,输出为TableOutputFormat.(2) 用HBase API .(3)Bulk Loading。对于大量的数据入库,第三种数据是最为有效的。

  下图描述了Bulk Loading的过程:先将数据(MySQL ,Oracle ,文本文件等)加载到HDFS,通过MapReduce 将数据做成HFile (HFileOutPutForm)。然后使用HBase提供的CompleteBulkLoad(LoadIncrementalHFiles)工具加载到HBase中,这个过程很快,而且不很耗内存,不影响在线的Hbase 集群的正常操作。因为这个过程不需要结果WAL 和Memstore.

 

注意事项:

(1)配置一个total order partitioner。

(2)reduce 个数要和表的region 数目匹配。

(3)MR 输出的Key/Value 类型要和HFileOutPutFormat的匹配。

(4)reduce 采用KeyValueSortReducer 或者PutSortReducer。

应用场景:

(1)集群上线,原始数据集加载。

(2)数据增量。需要定期将MySql(Oracle) 的数据导入HBase。

(3)经常性的大批量入库。

对于CSV文件的加载:


hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.bulk.output=output
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

  该文件的数据格式为---> rowkey,列:值 。

  导入到的表名为wordcount ,数据文件为word_count.csv

  这样做,不会生成wordcount表。

 执行

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
 入库完成。


hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

 这样做一步到位,直接入库。

 或者用

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-VERSION.jar completebulkload
同样 一步到位,直接入库。


下面是一个MR生成HFile的例子:



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
* HBase bulk import example
* Data preparation MapReduce job driver
*
    *
  1. args[0]: HDFS input path
  2. *
  3. args[1]: HDFS output path
  4. *
  5. args[2]: HBase table name
  6. *
    */
    public class Driver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        /*
    * NBA Final 2010 game 1 tip-off time (seconds from epoch)
    * Thu, 03 Jun 2010 18:00:00 PDT
    */
        conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);
        
        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(args[2]);
        
        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
      }

    }

    import java.io.IOException;
    import java.util.Locale;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import au.com.bytecode.opencsv.CSVParser;
    /**
    * HBase bulk import example
    *

    * Parses Facebook and Twitter messages from CSV files and outputs
    * .
    *

    * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
    * into the correct HBase table region.
    *

    * The KeyValue value holds the HBase mutation information (column family,
    * column, and value)
    */
    public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    final static byte[] SRV_COL_FAM &#61; "srv".getBytes();
    final static int NUM_FIELDS &#61; 16;
    CSVParser csvParser &#61; new CSVParser();
    int tipOffSeconds &#61; 0;
    String tableName &#61; "";
    DateTimeFormatter p &#61; DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
    ImmutableBytesWritable hKey &#61; new ImmutableBytesWritable();
    KeyValue kv;
    /** {&#64;inheritDoc} */
    &#64;Override
    protected void setup(Context context) throws IOException,
    InterruptedException {
    Configuration c &#61; context.getConfiguration();
    tipOffSeconds &#61; c.getInt("epoch.seconds.tipoff", 0);
    tableName &#61; c.get("hbase.table.name");
    }
    /** {&#64;inheritDoc} */
    &#64;Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    if (value.find("Service,Term,") > -1) {
    // Skip header
    return;
    }
    String[] fields &#61; null;
    try {
    fields &#61; csvParser.parseLine(value.toString());
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
    return;
    }
    if (fields.length !&#61; NUM_FIELDS) {
    context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
    return;
    }
    // Get game offset in seconds from tip-off
    DateTime dt &#61; null;
    try {
    dt &#61; p.parseDateTime(fields[9]);
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
    return;
    }
    int gameOffset &#61; (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey &#61; String.format("%04d", gameOffset);
    String username &#61; fields[2];
    if (username.equals("")) {
    username &#61; fields[3];
    }
    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s:%s:%s", offsetForKey, fields[0], username)
    .getBytes());
    // Service columns
    if (!fields[0].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_SERVICE.getColumnName(), fields[0].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[1].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_TERM.getColumnName(), fields[1].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[2].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_USERNAME.getColumnName(), fields[2].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[3].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_NAME.getColumnName(), fields[3].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[4].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_UPDATE.getColumnName(), fields[4].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[9].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_TIME.getColumnName(), fields[9].getBytes());
    context.write(hKey, kv);
    }
    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
    /*
    * Output number of messages per quarter and before/after game. This should
    * correspond to the number of messages per region in HBase
    */
    if (gameOffset < 0) {
    context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
    context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
    context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
    context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
    context.getCounter("QStats", "Q4").increment(1);
    } else {
    context.getCounter("QStats", "AFTER_GAME").increment(1);
    }
    }
    }






    /**
    * HBase table columns for the &#39;srv&#39; column family
    */
    public enum HColumnEnum {
      SRV_COL_SERVICE ("service".getBytes()),
      SRV_COL_TERM ("term".getBytes()),
      SRV_COL_USERNAME ("username".getBytes()),
      SRV_COL_NAME ("name".getBytes()),
      SRV_COL_UPDATE ("update".getBytes()),
      SRV_COL_TIME ("pdt".getBytes());
     
      private final byte[] columnName;
      
      HColumnEnum (byte[] column) {
        this.columnName &#61; column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }













    推荐阅读
    • Hadoop2.6.0 + 云centos +伪分布式只谈部署
      3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
    • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
      本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
    • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
      本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
    • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
    • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
    • Java如何导入和导出Excel文件的方法和步骤详解
      本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
    • 使用Spring AOP实现切面编程的步骤和注意事项
      本文介绍了使用Spring AOP实现切面编程的步骤和注意事项。首先解释了@EnableAspectJAutoProxy、@Aspect、@Pointcut等注解的作用,并介绍了实现AOP功能的方法。然后详细介绍了创建切面、编写测试代码的过程,并展示了测试结果。接着讲解了关于环绕通知的使用方法,并修改了FirstTangent类以添加环绕通知方法。最后介绍了利用AOP拦截注解的方法,只需修改全局切入点即可实现。使用Spring AOP进行切面编程可以方便地实现对代码的增强和拦截。 ... [详细]
    • 使用freemaker生成Java代码的步骤及示例代码
      本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
    • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
    • ZooKeeper 学习
      前言相信大家对ZooKeeper应该不算陌生。但是你真的了解ZooKeeper是个什么东西吗?如果别人面试官让你给他讲讲ZooKeeper是个什么东西, ... [详细]
    • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
    • Maven构建Hadoop,
      Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
    • 动量|收益率_基于MT策略的实战分析
      篇首语:本文由编程笔记#小编为大家整理,主要介绍了基于MT策略的实战分析相关的知识,希望对你有一定的参考价值。基于MT策略的实战分析 ... [详细]
    • mapreduce源码分析总结
      这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
    • 基于词向量计算文本相似度1.测试数据:链接:https:pan.baidu.coms1fXJjcujAmAwTfsuTg2CbWA提取码:f4vx2.实验代码:imp ... [详细]
    author-avatar
    王小志2602928087
    这个家伙很懒,什么也没留下!
    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有