热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Shuffle的5步

Shuffle的本意是洗牌、混乱的意思,类似于java中的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。MapReduce中的Sh

Shuffle的本意是洗牌、混乱的意思,类似于java中的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。MapReduce中的Shuffle过程。所谓Shuffle过程可以大致的理解成:怎样把map task的输出结果有效地传送到reduce输入端。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。 

        技术分享

  上图表示的是Shuffle的整个过程。在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去读取其它节点上的map task结果,并存储到本地。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。另外在节点内,相比于内存,磁盘IO对job完成时间的影响也是比较大的,spark 就是基于这点对hadoop做出了改进,将map和reduce的所有任务都在内存中进行,并且中间接过都保存在内存中,从而比hadoop的速度要快100倍以上。从最基本的要求来说,我们对Shuffle过程希望做到:     


  • 完整地从map task端读取数据到reduce 端。

  • 在跨节点读取数据时,尽可能地减少对带宽的不必要消耗。

  • 减少磁盘IO对task执行的影响。

Shuffle实际上包括map端和reduce端的两个过程,在map端中我们称之为前半段,在reduce端我们称之为后半段。

Shuffle前半段过程主要包括:

1、split过程;2、partition过程;3、sort排序;4、Combiner分组;5、Merge过程

每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来读取数据。 下面可以将Shuffle过程主要分为四个步骤:(结合WordCount的例子来进行说明)

1、split过程:在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。在WordCount例子里,假设map的输入数据都是像“aaa”这样的字符串。 
2、partiton过程:在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,这个主要有partition来决定。下面就说明如何决定由哪个reduce去做这个事情。 
        MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认是对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以自己重新实现partition的接口并设置到job上即可。 
        在我们的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 

其实3和4都是发生在溢写时的
3、sort排序过程:
       当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

 4、Combiner分组
       在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。 
        在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。 
        如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那么哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。 
5、merge过程:merge是将多个溢写文件合并到一个文件。每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。 

  至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。 

  简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

  Shuffle在reduce端的过程也能用三点来概括。当前reduce copy数据的前提是它要从JobTracker获得有哪些map task已执行结束。Reducer真正运行之前,所有的时间都是在拉取数据,做merge,且不断重复地在做。如前面的方式一样,下面我也分段地描述reduce 端的Shuffle细节: 
1. Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。 
2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。 
3. Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。 

 

Shuffle产生的意义是什么?
Shuffle过程的期望可以有: 
完整地从map task端拉取数据到reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
减少磁盘IO对task执行的影响。
每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据该如何处理?
每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。 
MapReduce提供Partitioner接口,它的作用是什么?
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。 
什么是溢写?
在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。
溢写是为什么不影响往缓冲区写map结果的线程?
溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对谁的排序?
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。 
溢写过程中如果有很多个key/value对需要发送到某个reduce端去,那么如何处理这些key/value值?
如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。
哪些场景才能使用Combiner呢?
Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。 
Merge的作用是什么?
最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge
每个reduce task不断的通过什么协议从JobTracker那里获取map task是否完成的信息?
每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息
reduce中Copy过程采用是什么协议?
Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。
reduce中merge过程有几种方式?
merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

本文参考:http://www.cnblogs.com/ljy2013/articles/4435657.html


推荐阅读
  • MySQL的查询执行流程涉及多个关键组件,包括连接器、查询缓存、分析器和优化器。在服务层,连接器负责建立与客户端的连接,查询缓存用于存储和检索常用查询结果,以提高性能。分析器则解析SQL语句,生成语法树,而优化器负责选择最优的查询执行计划。这一流程确保了MySQL能够高效地处理各种复杂的查询请求。 ... [详细]
  • 本项目通过Python编程实现了一个简单的汇率转换器v1.02。主要内容包括:1. Python的基本语法元素:(1)缩进:用于表示代码的层次结构,是Python中定义程序框架的唯一方式;(2)注释:提供开发者说明信息,不参与实际运行,通常每个代码块添加一个注释;(3)常量和变量:用于存储和操作数据,是程序执行过程中的重要组成部分。此外,项目还涉及了函数定义、用户输入处理和异常捕获等高级特性,以确保程序的健壮性和易用性。 ... [详细]
  • VS2019 在创建 Windows 恢复点时出现卡顿问题及解决方法
    在使用 Visual Studio 2019 时,有时会在创建 Windows 恢复点时遇到卡顿问题。这可能是由于频繁的自动更新导致的,每次更新文件大小可能达到 1-2GB。尽管现代网络速度较快,但这些更新仍可能对系统性能产生影响。本文将探讨该问题的原因,并提供有效的解决方法,帮助用户提升开发效率。 ... [详细]
  • 为了提升单位内部沟通效率,我们开发了一套飞秋软件与OA系统的消息接口服务系统。该系统能够将OA系统中的审批、通知等信息自动同步至飞秋平台,确保员工在使用飞秋进行日常沟通的同时,也能及时获取OA系统的各类重要信息,从而实现无缝对接,提高工作效率。 ... [详细]
  • 浏览器作为我们日常不可或缺的软件工具,其背后的运作机制却鲜为人知。本文将深入探讨浏览器内核及其版本的演变历程,帮助读者更好地理解这一关键技术组件,揭示其内部运作的奥秘。 ... [详细]
  • Vim 编辑器功能强大,但其默认的配色方案往往不尽如人意,尤其是注释颜色为蓝色时,对眼睛极为不友好。为了提升编程体验,自定义配色方案显得尤为重要。通过合理调整颜色,不仅可以减轻视觉疲劳,还能显著提高编码效率和兴趣。 ... [详细]
  • 在 Mac 上查看隐藏文件和文件夹的详细指南。通过终端命令,您可以轻松地显示或隐藏这些文件。具体步骤如下:输入 `defaults write com.apple.finder AppleShowAllFiles -bool true` 以显示所有隐藏文件,或使用 `defaults write com.apple.finder AppleShowAllFiles -bool false` 以重新隐藏它们。此方法适用于各种版本的 macOS,帮助用户更好地管理和访问系统文件。 ... [详细]
  • 本文详细解析了逻辑运算符“与”(&&)和“或”(||)在编程中的应用。通过具体示例,如 `[dehua@teacher~]$[$(id -u) -eq 0] && echo "You are root" || echo "You must be root"`,展示了如何利用这些运算符进行条件判断和命令执行。此外,文章还探讨了这些运算符在不同编程语言中的实现和最佳实践,帮助读者更好地理解和运用逻辑运算符。 ... [详细]
  • 二分查找算法详解与应用分析:本文深入探讨了二分查找算法的实现细节及其在实际问题中的应用。通过定义 `binary_search` 函数,详细介绍了算法的逻辑流程,包括初始化上下界、循环条件以及中间值的计算方法。此外,还讨论了该算法的时间复杂度和空间复杂度,并提供了多个应用场景示例,帮助读者更好地理解和掌握这一高效查找技术。 ... [详细]
  • 蚂蚁课堂:性能测试工具深度解析——JMeter应用与实践
    蚂蚁课堂:性能测试工具深度解析——JMeter应用与实践 ... [详细]
  • 在 iOS 开发中,经常会遇到 `@(YES)`、`@[firstViewController]` 以及 `@{@a:@b}` 这样的语法糖。这些简化的写法分别用于初始化布尔值、数组和字典对象,能够显著提高代码的可读性和编写效率。例如,`@(YES)` 可以快速创建一个布尔值对象,`@[firstViewController]` 则用于创建包含单个元素的数组,而 `@{@a:@b}` 则用于创建键值对字典。理解这些语法糖的使用方法,有助于开发者更加高效地进行编码。 ... [详细]
  • Unity3D 中 AsyncOperation 实现异步场景加载及进度显示优化技巧
    在Unity3D中,通过使用`AsyncOperation`可以实现高效的异步场景加载,并结合进度条显示来提升用户体验。本文详细介绍了如何利用`AsyncOperation`进行异步加载,并提供了优化技巧,包括进度条的动态更新和加载过程中的性能优化方法。此外,还探讨了如何处理加载过程中可能出现的异常情况,确保加载过程的稳定性和可靠性。 ... [详细]
  • 在跨线程调用UI控件方法时,通常使用同步调用机制,如 `控件.Invoke(Delegate, 参数)`。这里需要声明并实现一个委托,因为控件本身并不知道如何处理跨线程操作。通过将具体的实现逻辑封装在委托中,控件可以正确地执行这些操作,确保线程安全性和UI的一致性。此外,为了提高性能和可维护性,建议对频繁的跨线程调用进行优化,例如使用异步调用或批量处理请求。 ... [详细]
  • 系统数据实体验证异常:多个实体验证失败的错误处理与分析
    在使用MVC和EF框架进行数据保存时,遇到了 `System.Data.Entity.Validation.DbEntityValidationException` 错误,表明存在一个或多个实体验证失败的情况。本文详细分析了该错误的成因,并提出了有效的处理方法,包括检查实体属性的约束条件、调试日志的使用以及优化数据验证逻辑,以确保数据的一致性和完整性。 ... [详细]
  • 本文详细解析了Autofac在高级应用场景中的具体实现,特别是如何通过注册泛型接口的类来优化依赖注入。示例代码展示了如何使用 `builder.RegisterAssemblyTypes` 方法,结合 `typeof(IEventHandler).Assembly` 和 `Where` 过滤条件,动态注册所有符合条件的类,从而简化配置并提高代码的可维护性。此外,文章还探讨了这一方法在复杂系统中的实际应用及其优势。 ... [详细]
author-avatar
遗留下的痛cc-x_393
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有