热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入了解MapReduce计算引擎及Hive相关优化

MapRe

MapReduce整体处理过程

MapReduce运行所需要经过的环节

进一步分解为:

MapReduce作业输入

作业输入的核心是InputFormat类,用于MapReduce作业的输入规范,读取数据文件的规范。

输入格式类InputFormat

InputFormat涉及3个接口/类,即InputFormat、InputSplit和RecordReader。

  1. InputFormat接口

    InputFormat有两个方法

    • getSplits()方法:获取逻辑输入分片(InputSplit),逻辑分片用于指定输入到每个Mapper任务的文件大小。

    • getRecordReader()方法:获取记录读取器,记录读取器根据逻辑分配读取文件相应位置的数据,转化为k-v的形式给Mapper处理。

  2. InputSplit接口

    InputSplit有两个方法

    在获取逻辑分片的时候需要验证文件是否可以分割,文件存储时分块的大小和文件大小。对于一些压缩文件无法计算分片,会直接读取整个压缩文件。

    • getLength()方法:获取每个分片的大小。

    • getLocations()方法:获取每个分片所在的位置。

  3. RecordReader接口

    RecordReader有5个方法

    RecordReader通过这几个方法的配合运用,将数据转化为key-value形式的数据,输送给Mapper Task。

    InputFormat在Hive中的使用

    调节Map任务数需要一套算法,该算法也和InputFormat有密切的关系,具体如下:

    优化的核心就是

    MapReduce的Mapper

    Mapper类负责MapReduce计算引擎Map阶段业务逻辑的处理。

    Mapper类

    Mapper的核心Map方法是MapReduce提供给用户编写业务的主要接口之一,它的输入是一个键-值对的形式,输出也是一个键-值形式

    cleanup()、map()、run()和setup()方法之间的关系

    setup()方法主要用于初始化一些信息,以供map()中使用,如初始化数据库连接。

    cleanup方法主要是清理释放setup()、map()方法中用到的资源,例如,释放数据库连接、关闭打开的文件句柄等资源。

    Hive中与Mapper相关的配置

    Hive可以通过一些配置来影响Mapper的运行

    MapReduce的Reducer

    Reducer类是MapReduce处理Reduce阶段业务逻辑的地方

    Reducer类

    Reducer的核心Reduce方法是MapReduce提供给用户编写业务的另一个主要接口,它的输入是一个键-数组的形式,和Mapper的输入不太一样。

    Hive中与Reducer相关的配置

    MapReduce的Shuffle

    在Mapper的Map方法中,context.write()方法会将数据计算所在分区后写入到内存缓冲区,缓冲区的大小为mapreduce.task.io.sort.mb=100MB。当缓冲区缓存的数据达到一定的阀值mapreduce.map.sort.spill.percent=0.8, 即总缓冲区的80%时,将会启动新的线程将数据写入到HDFS临时目录中。这样设计的目的在于避免单行数据频繁写,以减轻磁盘的负载。这与关系型数据库提倡批量提交(commit)有相同的作用。在写入到HDFS的过程中,为了下游Reducer任务顺序快速拉取数据,会将数据进行排序后再写入临时文件中,当整个Map执行结束后会将临时文件归并成一个文件

    为什么不选用Mapper任务结束后直接推送到Reducer节点,这样可以节省写入到磁盘的操作,效率更高?因为采用缓存到HDFS,让Reducer主动来拉,当一个Reducer任务因为一些其他原因导致异常结束时,再启动一个新的Reducer依然能够读取到正确的数据。

    MapReduce的Map端聚合

    MapReduce的Map端聚合通常指代实现Combiner类。Combiner也是处理数据聚合,但不同于Reduce是聚合集群的全局数据。

    Combiner类

    Combiner是MapReduce计算引擎提供的另外一个可以供用户编程的接口

    使用Combiner的初衷是减少Shuffle过程的数据量,减轻系统的磁盘和网络的压力。

    Map端的聚合与Hive配置

    • mapred.reduce.tasks:设置Reducer的数量,默认值是-1,代表有系统根据需要自行决定Reducer的数量。

    • hive.exec.reducers.bytes.per.reducer:设置每个Reducer所能处理的数据量,在Hive 0.14版本以前默认是1000000000(1GB),Hive 0.14及之后的版本默认是256MB。输入到Reduce的数据量有1GB,那么将会拆分成4个Reducer任务。

    • hive.exec.reducers.max:设置一个作业运行的最大Reduce个数,默认值是999。

    • hive.multigroupby.singlereducer:表示如果一个SQL语句中有多个分组聚合操作,且分组是使用相同的字段,那么这些分组聚合操作可以用一个作业的Reduce完成,而不是分解成多个作业、多个Reduce完成。这可以减少作业重复读取和Shuffle的操作。

    • hive.mapred.reduce.tasks.speculative.execution:表示是否开启Reduce任务的推测执行。即系统在一个Reduce任务中执行进度远低于其他任务的执行进度,会尝试在另外的机器上启动一个相同的Reduce任务。

    • hive.optimize.reducededuplication:表示当数据需要按相同的键再次聚合时,则开启这个配置,可以减少重复的聚合操作。

    • hive.vectorized.execution.reduce.enabled:表示是否启用Reduce任务的向量化执行模式,默认是true。MapReduce计算引擎并不支持对Reduce阶段的向量化处理。

    • hive.vectorized.execution.reduce.groupby.enabled:表示是否移动Reduce 任务分组聚合查询的向量化模式,默认值为true。MapReduce计算引擎并不支持对Reduce阶段的向量化处理。

    • Mapper通过调用run()方法,在通过run()调用setup()方法,紧接着通过一个while循环调用map方法,将一行行数据循环发送给map()方法进行处理,跳出循环后再调用cleanup()方法,整个run()方法结束。

    • 增大Map个数,需要减少mapred.min.split.size的值,同时增大mapred.map.tasks的值。

    • 减少Map个数,需要增大mapred.min.split.size的值,减少mapred.map.tasks的值;

    • getCurrentKey()方法,用于获取当前的key。

    • getCurrentValue()方法,用于获取当前的value。

    • nextKeyValue()方法,读取下一个key-value对。

    • getProgress()方法,读取当前逻辑分片的进度。

    1. hive.map.aggr:默认值为true,表示开启Map端的聚合。开启和不开启Map端聚合的差别可以在执行计划中看到,详见下面两个案例执行计划的精简结构。

      Hive在默认开启hive.map.aggr的同时,引入了两个参数, hive.map.aggr.hash.min.reduction和hive.groupby.mapaggr.checkinterval,用于控制何时启用聚合

    2. hive.map.aggr.hash.min.reduction:是一个阈值,默认值是0.5。

    3. hive.groupby.mapaggr.checkinterval:默认值是100000。Hive在启用Combiner时会尝试取这个配置对应的数据量进行聚合,将聚合后的数据除以聚合前的数据,如果小于hive.map.aggr.hash.min.reduction会自动关闭。

    4. hive.map.aggr.hash.percentmemory:默认值是0.5。该值表示在进行Mapper端的聚合运行占用的最大内存。例如,分配给该节点的最大堆(xmx)为1024MB,那么聚合所能使用的最大Hash表内存是512MB,如果资源较为宽裕,可以适当调节这个参数。

    5. hive.map.aggr.hash.force.flush.memory.threshold:默认值是0.9。该值表示当在聚合时,所占用的Hash表内存超过0.9,将触发Hash表刷写磁盘的操作。例如Hash表内存是512MB,当Hash表的数据内存超过461MB时将触发Hash表写入到磁盘的操作。

    6. hive.vectorized.execution.enabled:表示是否开启向量模式,默认值为false。在run()方法中,我们看到map()方法是逐行处理数据,这样的操作容易产生更多的CPU指令和CPU上下文切换,导致系统的处理性能不高。

      目前MapReduce计算引擎只支持Map端的向量化执行模式, Tez和Spark计算引擎可以支持Map和Reduce端的向量化执行模式。

    7. hive.ignore.mapjoin.Hint:是否忽略SQL中MapJoin的Hint关键,在Hive 0.11版本之后默认值为true,即开启忽略Hint的关键字。如果要使用MapJoin的Hint关键字,一定要在使用前开启支持Hint语法,否则达不到预期的效果。

    8. hive.auto.convert.join:是否开启MapJoin自动优化,hive 0.11版本以前默认关闭,0.11及以后的版本默认开启。

    9. hive.smalltable.filesize or hive.mapjoin.smalltable.filesize:默认值2500000(25MB)如果大小表在进行表连接时的小表数据量小于这个默认值,则自动开启MapJoin优化。在Hive 0.8.1以前使用hive.smalltable.filesize, 之后的版本使用hive.mapjoin.smalltable.filesize参数。Hive 0.11版本及以后的版本,可以使用hive.auto.convert.join.noconditionaltask.size和hive.auto.convert.join.noconditionaltask两个配置参数。

      hive.auto.convert.join.noconditionaltask.size的默认值为10000000(10MB)。

      hive.auto.convert.join.noconditionaltask的默认值是true,表示Hive会把输入文件的大小小于

      hive.auto.convert.join.noconditionaltask.size指定值的普通表连接操作自动转化为MapJoin的形式。

      hive.auto.convert.join.use.nonstaged:是否省略小表加载的作业,默认值为false。对于一些MapJoin的连接操作,如果小表没有必要做数据过滤或者列投影操作,则会直接省略小表加载时额外需要新增的MapReduce作业(一般一个MapReduc作业对应一个Stage)。

    10. hive.map.aggr:是否开启Map任务的聚合,默认值是true。

    11. hive.map.aggr.hash.percentmemory:默认值是0.5,表示开启Map任务的聚合,聚合所用的哈希表,所能占用到整个Map被分配的内存50%。例如,Map任务被分配2GB内存,那么哈希表最多只能用1GB。

    12. hive.mapjoin.optimized.hashtable:默认值是true,Hive 0.14新增, 表示使用一种内存优化的哈希表去做MapJoin。由于该类型的哈希表无法被序列化到磁盘,因此该配置只能用于Tez或者Spark。

    13. hive.mapjoin.optimized.hashtable.wbsize:默认值是10485760(10MB),优化的哈希表使用的是一种链块的内存缓存,该值表示一个块的内存缓存大小。这种结构对于数据相对较大的表能够加快数据加载,但是对于数据量较小的表,将会分配多余的内存。

    14. hive.map.groupby.sorted:在Hive 2.0以前的默认值是False,2.0及2.0以后的版本默认值为true。对于分桶或者排序表,如果分组聚合的键(列)和分桶或者排序的列一致,将会使用BucketizedHiveInputFormat。

    15. hive.vectorized.execution.mapjoin.native.enabled:是否使用原生的向量化执行模式执行MapJoin,它会比普通MapJoin速度快。默认值为False。

    16. hive.vectorized.execution.mapjoin.minmax.enabled:默认值为False,是否使用vector map join哈希表,用于整型连接的最大值和最小值过滤。

    17. 在默认情况下Map的个数defaultNum=目标文件或数据的总大小totalSize/hdfs集群文件块的大小blockSize。

    18. 当用户指定mapred.map.tasks,即为用户期望的Map大小,用expNum表示,这个期望值计算引擎不会立即采纳,它会获取mapred.map.tasks与defaultNum的较大值,用expMaxNum表示,作为待定选项。

    19. 获取文件分片的大小和分片个数,分片大小为参数mapred.min.split.size和blockSize间的较大值,用splitMaxSize表示,将目标文件或数据的总大小除以splitMaxSize即为真实的分片个数,用realSplitNum表示。

    20. 获取realSplitNum与expMaxNum较小值则为实际的Map个数。

MapReduce作业输出

OutputFormat作业输出

作业输出(OutputFormat)是用于MapReduce作业的输出规范。通过继承并实现OutputFormat接口,可以将数据输出到任何想要存储的数据存储文件中。

OutputFormat主要设计两个接口/类:OutputFormat和OutputCommitter。

  1. OutputFormat类

    OutputFormat类提供了3个方法

    • checkOutputSpecs()方法:校验作业的输出规范。

    • getOutputCommitter()方法:获取OutputCommitter对象, OutputCommitter用于管理配置,并提交作业的输出任务。

    • getRecordWriter()方法:获取RecordWriter对象,通过RecordWriter 将数据写入HDFS中。

  2. OutputCommitter类

    OuputCommitter类在这个输出任务组中要承担的任务如下:

    • 在初始化期间,做些作业运行时的准备工作。例如,在作业初始化期间为作业创建临时输出目录。

    • 在作业完成后,清理作业遗留的文件目录。

    • 检查任务是否需要提交。这是为了在任务不需要提交时避免提交过程。

    • 提交输出任务。一旦任务完成,整个作业要提交一个输出任务。

    • 丢弃任务提交。如果任务失败/终止,输出将被清理。如果任务无法清除(在异常块中),将启动一个单独的任务,使用相同的attempt-id执行清除。

Hive配置与作业输出

开启文件作业的压缩只要将hive.exec.compress.intermediate参数设置为true。

Hive提供写入到最终Hive表或者HDFS文件的压缩参数--hive.exec.compress.output。

如果要是MapReduce中起作用的前提是需要配置mapred.output.compression.codec和mapred.output.compression两个属性。Hive提供多种方式去合并执行过程中产生的小文件,例如: ·

  • 启用hive.merge.mapfile参数,默认启用,合并只有Map任务作业的输出文件;

  • 启用hive.merge.mapredfiles参数,默认启用,合并MapReduce作业最终的输出文件;

  • 设置hive.merge.smallfiles.avgsize参数,默认16MB,当输出的文件小于该值时,启用一个MapReduce任务合并小文件;

  • 设置hive.merge.size.per.task参数,默认256MB,是每个任务合并后文件的大小。一般设置为和HDFS集群的文件块大小一致。

MapReduce作业与Hive配置

较为常见的通过Hive配置操作MapReduce作业运行的配置。

  • hive.optimize.countdistinct:默认值为true,Hive 3.0新增的配置项。当开启该配置项时,去重并计数的作业会分成两个作业来处理这类SQL,以达到减缓SQL的数据倾斜作用。

  • hive.exec.parallel:默认值是False,是否开启作业的并行。默认情况下,如果一个SQL被拆分成两个阶段,如stage1、stage2,假设这两个stage 没有直接的依赖关系,还是会采用窜行的方式依次执行两个阶段。如果开启该配置,则会同时执行两个阶段。在资源较为充足的情况下开启该配置可以有效节省作业的运行时间。

  • hive.exec.parallel.thread.num:默认值是8,表示一个作业最多允许8个作业同时并行执行。

  • hive.exec.mode.local.auto:默认值是false,表示是否开启本地的执行模式。开启该配置表示Hive会在单台机器上处理完所有的任务,对于处理数据量较少的任务可以有效地节省时间。开启本地模式还需要以下几个配置帮助。

  • hive.exec.mode.local.auto.inputbytes.max:默认值134217728(128MB),表示作业处理的数据量要小于该值,本地模式。

  • hive.exec.mode.local.auto.tasks.max:默认值是4,表示作业启动的任务数必须小于或者等于该值,本地模式才能生效。在Hive 0.9的版本以后该配置被hive.exec.mode.local.auto.input.files.max配置所取代,其含义和hive.exec.mode.local.auto.tasks.max相同。

  • hive.optimize.correlation:默认值为false,这个配置我们称之为相关性优化,打开该配置可以减少重复的Shuffle操作。




推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  •        在搭建Hadoop环境之前,请先阅读如下博文,把搭建Hadoop环境之前的准备工作做好,博文如下:       1、CentOS6.7下安装JDK,地址:http:b ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • oracle恢复失败,RMAN数据库恢复失败解决一例
    问题:这是一个从RAC环境的数据库的RAMN备份恢复到一个单机数据库的操作。当恢复数据文件和恢复正常,但在open数据库时出报下面的错误。--rman备 ... [详细]
  •     这里使用自己编译的hadoop-2.7.0版本部署在windows上,记得几年前,部署hadoop需要借助于cygwin,还需要开启ssh服务,最近发现,原来不需要借助cy ... [详细]
  • Oracle主从同步、双向同步的配置
    (本教程展示了Windows环境的oracle数据库主从同步,Linux环境一样也可以)(把主数据库obpm和从数据库orcl用实际的数据库名给替换掉)(配置主从同步后,再配置双向同步,可 ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • Kylin 单节点安装
    软件环境Hadoop:2.7,3.1(sincev2.5)Hive:0.13-1.2.1HBase:1.1,2.0(sincev2.5)Spark(optional)2.3.0K ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
author-avatar
blankworld
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有