热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HBaseBulkLoading

   将数据导入到HBase有三种方式:(1) Mapreduce,输出为TableOutputFormat.(2) 用HBase API .(

   将数据导入到HBase有三种方式:(1) Mapreduce,输出为TableOutputFormat.(2) 用HBase API .(3)Bulk Loading。对于大量的数据入库,第三种数据是最为有效的。

  下图描述了Bulk Loading的过程:先将数据(MySQL ,Oracle ,文本文件等)加载到HDFS,通过MapReduce 将数据做成HFile (HFileOutPutForm)。然后使用HBase提供的CompleteBulkLoad(LoadIncrementalHFiles)工具加载到HBase中,这个过程很快,而且不很耗内存,不影响在线的Hbase 集群的正常操作。因为这个过程不需要结果WAL 和Memstore.

 

注意事项:

(1)配置一个total order partitioner。

(2)reduce 个数要和表的region 数目匹配。

(3)MR 输出的Key/Value 类型要和HFileOutPutFormat的匹配。

(4)reduce 采用KeyValueSortReducer 或者PutSortReducer。

应用场景:

(1)集群上线,原始数据集加载。

(2)数据增量。需要定期将MySql(Oracle) 的数据导入HBase。

(3)经常性的大批量入库。

对于CSV文件的加载:


hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.bulk.output=output
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

  该文件的数据格式为---> rowkey,列:值 。

  导入到的表名为wordcount ,数据文件为word_count.csv

  这样做,不会生成wordcount表。

 执行

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
 入库完成。


hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

 这样做一步到位,直接入库。

 或者用

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-VERSION.jar completebulkload
同样 一步到位,直接入库。


下面是一个MR生成HFile的例子:



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
* HBase bulk import example
* Data preparation MapReduce job driver
*
    *
  1. args[0]: HDFS input path
  2. *
  3. args[1]: HDFS output path
  4. *
  5. args[2]: HBase table name
  6. *
    */
    public class Driver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        /*
    * NBA Final 2010 game 1 tip-off time (seconds from epoch)
    * Thu, 03 Jun 2010 18:00:00 PDT
    */
        conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);
        
        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(args[2]);
        
        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
      }

    }

    import java.io.IOException;
    import java.util.Locale;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import au.com.bytecode.opencsv.CSVParser;
    /**
    * HBase bulk import example
    *

    * Parses Facebook and Twitter messages from CSV files and outputs
    * .
    *

    * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
    * into the correct HBase table region.
    *

    * The KeyValue value holds the HBase mutation information (column family,
    * column, and value)
    */
    public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    final static byte[] SRV_COL_FAM &#61; "srv".getBytes();
    final static int NUM_FIELDS &#61; 16;
    CSVParser csvParser &#61; new CSVParser();
    int tipOffSeconds &#61; 0;
    String tableName &#61; "";
    DateTimeFormatter p &#61; DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
    ImmutableBytesWritable hKey &#61; new ImmutableBytesWritable();
    KeyValue kv;
    /** {&#64;inheritDoc} */
    &#64;Override
    protected void setup(Context context) throws IOException,
    InterruptedException {
    Configuration c &#61; context.getConfiguration();
    tipOffSeconds &#61; c.getInt("epoch.seconds.tipoff", 0);
    tableName &#61; c.get("hbase.table.name");
    }
    /** {&#64;inheritDoc} */
    &#64;Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    if (value.find("Service,Term,") > -1) {
    // Skip header
    return;
    }
    String[] fields &#61; null;
    try {
    fields &#61; csvParser.parseLine(value.toString());
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
    return;
    }
    if (fields.length !&#61; NUM_FIELDS) {
    context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
    return;
    }
    // Get game offset in seconds from tip-off
    DateTime dt &#61; null;
    try {
    dt &#61; p.parseDateTime(fields[9]);
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
    return;
    }
    int gameOffset &#61; (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey &#61; String.format("%04d", gameOffset);
    String username &#61; fields[2];
    if (username.equals("")) {
    username &#61; fields[3];
    }
    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s:%s:%s", offsetForKey, fields[0], username)
    .getBytes());
    // Service columns
    if (!fields[0].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_SERVICE.getColumnName(), fields[0].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[1].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_TERM.getColumnName(), fields[1].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[2].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_USERNAME.getColumnName(), fields[2].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[3].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_NAME.getColumnName(), fields[3].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[4].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_UPDATE.getColumnName(), fields[4].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[9].equals("")) {
    kv &#61; new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_TIME.getColumnName(), fields[9].getBytes());
    context.write(hKey, kv);
    }
    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
    /*
    * Output number of messages per quarter and before/after game. This should
    * correspond to the number of messages per region in HBase
    */
    if (gameOffset < 0) {
    context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
    context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
    context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
    context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
    context.getCounter("QStats", "Q4").increment(1);
    } else {
    context.getCounter("QStats", "AFTER_GAME").increment(1);
    }
    }
    }






    /**
    * HBase table columns for the &#39;srv&#39; column family
    */
    public enum HColumnEnum {
      SRV_COL_SERVICE ("service".getBytes()),
      SRV_COL_TERM ("term".getBytes()),
      SRV_COL_USERNAME ("username".getBytes()),
      SRV_COL_NAME ("name".getBytes()),
      SRV_COL_UPDATE ("update".getBytes()),
      SRV_COL_TIME ("pdt".getBytes());
     
      private final byte[] columnName;
      
      HColumnEnum (byte[] column) {
        this.columnName &#61; column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }













    推荐阅读
    • 流处理中的计数挑战与解决方案
      本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
    • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
    • HBase在金融大数据迁移中的应用与挑战
      随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
    • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
      技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
    • Hadoop的分布式架构改进与应用
      nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
    • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
    • 从0到1搭建大数据平台
      从0到1搭建大数据平台 ... [详细]
    • 如何高效启动大数据应用之旅?
      在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
    • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
    • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
    • Python 数据分析领域不仅拥有高质量的开发环境,还提供了众多功能强大的第三方库。本文将介绍六个关键步骤,帮助读者掌握 Python 数据分析的核心技能,并深入探讨六款虽不广为人知但却极具潜力的数据处理库,如 Pandas 的替代品和新兴的可视化工具,助力数据科学家和分析师提升工作效率。 ... [详细]
    • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
    • 本文介绍了如何使用Hive分析用户最长连续登录天数的方法。首先对数据进行排序,然后计算相邻日期之间的差值,接着按用户ID分组并累加连续登录天数,最后求出每个用户的最大连续登录天数。此外,还探讨了该方法在其他领域的应用,如股票市场中最大连续涨停天数的分析。 ... [详细]
    • 数据读取hadoopFileParameters:path–pathtoHadoopfileinputFormatClass–fullyqualifiedclassnameo ... [详细]
    • Storm集成Kakfa
      一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
    author-avatar
    王小志2602928087
    这个家伙很懒,什么也没留下!
    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有