热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分拆TableSplit让多个mapper同时读取

分拆TableSplit让多个mapper同时读取默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个tables

分拆TableSplit 让多个mapper同时读取


默认情况下,一个region是一个tableSplit,对应一个mapper进行读取,但单mapper读取速度较慢,因此想着把默认一个table split分拆成多个split,这样hadoop就能通过多个mapper读取。 

由于HBase不能像hadoop一样通过以下参数调整split大小,而实现多个mapper读取Java代码  收藏代码
  1. mapred.min.split.size  
  2. mapred.max.split.size  


所以目前想到的方法有两种,一是修改TableInputFormatBase,把默认的一个TableSplit分拆成多个,另外一种方法是,通过Coprocessor处理。这里选择修改TableInputFormatBase类。 

HBase权威指南里面有介绍怎么把HBase与MR结合,通过需要用到一下的辅助类实现把HBase表作为数据来源,读取数据: 
Java代码  收藏代码
  1. TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,  
  2.                     UserViewHisMapper2.class, Text.class, Text.class,  
  3.                     genRecommendations);  

而这个方法,最终是调用以下方法进行初始化设置的: 
Java代码  收藏代码
  1. public static void initTableMapperJob(byte[] table, Scan scan,  
  2.      Classextends TableMapper> mapper,  
  3.      Classextends WritableComparable> outputKeyClass,  
  4.      Classextends Writable> outputValueClass, Job job,  
  5.      boolean addDependencyJars)  
  6.  throws IOException {  
  7.      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,  
  8.              outputValueClass, job, addDependencyJars, TableInputFormat.class);  
  9.  }  


所以,思路就应该修改TableInputFormat这个类。而这个类的核心方法是继承了TableInputFormatBase: 

Java代码  收藏代码
  1. public class TableInputFormat extends TableInputFormatBase  
  2. implements Configurable   


最终要修改的则是TableInputFormatBase这个类,修改其以下方法: 

Java代码  收藏代码
  1. public List getSplits(JobContext context) throws IOException {}  


这个方法的核心是,获得table对应所有region的起始row,把每个region作为一个tableSplit: 
Java代码  收藏代码
  1. public List getSplits(JobContext context) throws IOException {  
  2. f (table == null) {  
  3.    throw new IOException("No table was provided.");  
  4.   
  5.   Pair<byte[][], byte[][]> keys = table.getStartEndKeys();  
  6.   if (keys == null || keys.getFirst() == null ||  
  7.       keys.getFirst().length == 0) {  
  8.     throw new IOException("Expecting at least one region.");  
  9.   }  
  10.   int count = 0;  
  11.   List splits = new ArrayList(keys.getFirst().length);  
  12.   for (int i = 0; i < keys.getFirst().length; i++) {  
  13.     if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {  
  14.       continue;  
  15.     }  
  16.     String regionLocation = table.getRegionLocation(keys.getFirst()[i]).  
  17.       getHostname();  
  18.     byte[] startRow = scan.getStartRow();  
  19.     byte[] stopRow = scan.getStopRow();  
  20.     // determine if the given start an stop key fall into the region  
  21.     if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||  
  22.          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&  
  23.         (stopRow.length == 0 ||  
  24.          Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {  
  25.       byte[] splitStart = startRow.length == 0 ||  
  26.         Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?  
  27.           keys.getFirst()[i] : startRow;  
  28.       byte[] splitStop = (stopRow.length == 0 ||  
  29.         Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&  
  30.         keys.getSecond()[i].length > 0 ?  
  31.           keys.getSecond()[i] : stopRow;  
  32.       InputSplit split = new TableSplit(table.getTableName(),  
  33.         splitStart, splitStop, regionLocation);  
  34.       splits.add(split);  
  35.       if (LOG.isDebugEnabled())  
  36.         LOG.debug("getSplits: split -> " + (count++) + " -> " + split);  
  37.     }  
  38.   }  
  39.   return splits;  
  40. }  


这里要做的就是,把本来属于一个tableSplit的row在细分,分成自己希望的多个小split。但没有找到轻巧的实现,唯有不断迭代,把一个tableSplit的row全部取出,再拆分了,有点蛮力。 
以下是我的实现方法: 

Java代码  收藏代码
  1. public List getSplits(JobContext context) throws IOException {  
  2.     if (table == null) {  
  3.         throw new IOException("No table was provided.");  
  4.     }  
  5.     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();  
  6.     if (keys == null || keys.getFirst() == null  
  7.             || keys.getFirst().length == 0) {  
  8.         throw new IOException("Expecting at least one region.");  
  9.     }  
  10.     int count = 0;  
  11.     List splits = new ArrayList(  
  12.             keys.getFirst().length);  
  13.     for (int i = 0; i < keys.getFirst().length; i++) {  
  14.         if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {  
  15.             continue;  
  16.         }  
  17.         String regionLocation = table.getRegionLocation(keys.getFirst()[i],true)  
  18.                 .getHostname();  
  19.         byte[] startRow = scan.getStartRow();  
  20.         byte[] stopRow = scan.getStopRow();  
  21.         // determine if the given start an stop key fall into the region  
  22.         if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes  
  23.                 .compareTo(startRow, keys.getSecond()[i]) < 0)  
  24.                 && (stopRow.length == 0 || Bytes.compareTo(stopRow,  
  25.                         keys.getFirst()[i]) > 0)) {  
  26.             byte[] splitStart = startRow.length == 0  
  27.                     || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys  
  28.                     .getFirst()[i] : startRow;  
  29.             byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo(  
  30.                     keys.getSecond()[i], stopRow) <= 0)  
  31.                     && keys.getSecond()[i].length > 0 ? keys.getSecond()[i]  
  32.                     : stopRow;  
  33.   
  34.             Scan scan1 = new Scan();  
  35.             scan1.setStartRow(splitStart);  
  36.             scan1.setStopRow(splitStop);  
  37.             scan1.setFilter(new KeyOnlyFilter());  
  38.             scan1.setBatch(500);  
  39.               
  40.             ResultScanner resultscanner = table.getScanner(scan1);  
  41.               
  42.             //用来保存该region的所有key  
  43.             List rows = new ArrayList();  
  44.             //Iterator  it = resultscanner.iterator();  
  45.               
  46.             for(Result rs : resultscanner)  
  47.             {  
  48.                 if(rs.isEmpty())  
  49.                     continue;  
  50.                 rows.add(new String(rs.getRow()));  
  51.             }  
  52.               
  53.             int splitSize = rows.size() / mappersPerSplit;  
  54.               
  55.             for (int j = 0; j < mappersPerSplit; j++) {  
  56.                 TableSplit tablesplit = null;  
  57.                 if (j == mappersPerSplit - 1)  
  58.                     tablesplit = new TableSplit(table.getTableName(),  
  59.                             rows.get(j * splitSize).getBytes(),  
  60.                             rows.get(rows.size() - 1).getBytes(),  
  61.                             regionLocation);  
  62.                 else  
  63.                     tablesplit = new TableSplit(table.getTableName(),  
  64.                             rows.get(j * splitSize).getBytes(),  
  65.                             rows.get(j * splitSize + splitSize).getBytes(), regionLocation);  
  66.                 splits.add(tablesplit);  
  67.                 if (LOG.isDebugEnabled())  
  68.                     LOG.debug((new StringBuilder())  
  69.                             .append("getSplits: split -> ").append(i++)  
  70.                             .append(" -> ").append(tablesplit).toString());  
  71.             }  
  72.             resultscanner.close();                
  73.         }  
  74.     }  
  75.     return splits;  
  76. }  


通过配置设置需要拆分的split数。 

推荐阅读
  • 本文介绍了如何在 MapReduce 作业中使用 SequenceFileOutputFormat 生成 SequenceFile 文件,并详细解释了 SequenceFile 的结构和用途。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • 本文介绍了如何在Android应用中使用EventBus库在Fragment之间传递参数。通过具体的代码示例,详细说明了EventBus的使用方法和注意事项。 ... [详细]
  • mybatis 详解(七)一对一、一对多、多对多
    mybatis详解(七)------一 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • hadoop3.1.2 first programdefault wordcount (Mac)
    hadoop3.1.2安装完成后的第一个实操示例程 ... [详细]
author-avatar
特贰的大妞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有