热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大表与大表join数据倾斜_12中方法,彻底搞定数据倾斜!

这是我的第64篇原创数据倾斜是上帝对某个服务器的过于偏爱。造成数据倾斜的原因上帝太过于偏爱某个服务器,因此给他分配了太多的任务,导致数据都倾斜到这台服务

这是我的第64篇原创

f1c44ac850065cbdc7ecf0aefd561464.png

数据倾斜是上帝对某个服务器的过于偏爱。

6ce01734694998c77d6c35959142e1b7.png造成数据倾斜的原因

上帝太过于偏爱某个服务器,因此给他分配了太多的任务,导致数据都倾斜到这台服务器了。

在大数据场景中,无论是MapReduce还是Spark,都会因两阶段之间的shuffle导致各个服务器接受到的数据导致处理量失衡的问题。情况严重,就变成数据倾斜了。

我们之所以创造分布式环境,就是因为我们将一个巨大的任务拆解成若干个小任务,给不同的服务器执行,这样总执行时间就会减小至1/n。理想状态的任务处理情况应该如下图所示:

38055a8ea8e33f1ce85ed26f518ac2dc.png

原本单机环境需要执行100s的任务,由5台服务器共同执行,每台服务器执行20s,最后总时间会远远大于单机环境的执行时间。而且我们还可以通过不断增加服务器,来不断减少总运行时间。但是往往会出现这种情况:

7ead8e1049daee0dbc0cec5306742506.png

某4台服务器很快就执行完了任务,但是其中有一台服务器的迟迟不能完成,严重的时候甚至会OOM(Out Of Memory)。究其原因,其实如同上面说过的,在分布式处理的不同阶段之间会有一个混洗(shuffle)的过程:

c40f47f1bf0e3b7d79807d95b9d3ea8b.png

在Map或者Spark的Stage1阶段,由于每个数据块的大小都是一致的(默认128M),所以在这个阶段是不会出现数据倾斜的。但是一旦我们对数据进行Shuffle,比如按照商品品类进行分组之后,在Reduce或Stage2阶段,数据将会出现严重的倾斜:原本每台服务器都只需要处理3条数据,Shuffle之后,其中两台服务器各只需处理1条,而剩余的那台服务器则需要执行8条数据。三台服务器处理的数据量比为7:1:1。数据倾斜至第一台服务器。任务延迟,甚至OOM。

如何解决数据倾斜呢:

三个层面:

1、预判-原始数据预防,保证原始数据不倾斜;

2、躲闪-规避数据倾斜,尽量规避Shuffle;

3、硬刚-处理数据倾斜,无法规避Shuffle,用各种办法优化Shuffle过程。

87ed45d0cfcc01e91de3655391b81fec.png

6ce01734694998c77d6c35959142e1b7.png预判

虽说HDFS的数据都是128M,不会一开始就出现数据倾斜,但是仍然有以下几种情况:

1、数据压缩后,128M文件大小一样,但是数据量不一样;

2、存在不可切分的大文件;

3、流式数据。

这几种情况还是可能会导致程度不一的数据倾斜的。我们需要做一些简单的处理:

1、数据压缩后,128M文件大小一样,但是数据量不一样;

  • 解决办法:压缩前,保证每个文件中的数据量基本一致;

2、存在不可切分的大文件;

  • 解决办法:生成数据时,尽量减少不可切分的文件,尽量按照HDFS的逻辑,存成可切分的文件;或者保证这些大文件中的数据量基本一致,且单机可处理。

3、流式数据;

  • 解决办法:Kafka的partition实现建议使用随机、轮询等方法,尽量使各topic的各partition的数据尽量平衡。

6ce01734694998c77d6c35959142e1b7.png闪避

既然我们知道数据倾斜的主要原因的shuffle导致的,那么我们首要的优化方向就是shuffle,能不用尽量不要用。有以下几种方法我们可以规避:

4、ETL预处理

在面对无法避免的原始数据倾斜(Hive表中key分布不均匀、kafka中某topic的partititoner含有业务属性,天然不均匀等),我们可以通过前置ETL过程,进行预处理。

注意:这个方法只是将成本转嫁,并没有解决问题。适合削峰填谷类的操作,比如我们将数据预处理好,避免凌晨集中计算的时候处理时间过长,影响其他任务。

5、过滤不必要的key

很多数据分析师在单体数据库的时候,就有一个不好的习惯:总喜欢select *。在hive、spark等分布式环境中,就吃苦头了,经常遇到数据倾斜甚至OOM。有经验的数据分析师在写sql的时候,通常会先group by一下,看看数据的分布情况,然后再处理。

咱在分布式环境中也可以做类似的事情,就是采样。

离线环境可以用随机采样,实时环境可以用鱼塘采样。采样能够快速摸清楚各个key的大致分布。扫一眼数据量大的key,如果跟你的计算没啥关系,直接过滤就行。

比如上面举的例子,母婴品类占绝大多数,但是运营的要求是分析3C产品,那你过滤掉母婴产品,一则减少计算量,二则规避了数据倾斜的问题。

6、Reduce join 改为Map join

如果是大小表的join,比如订单表和订单类型、订单状态的join,如果使用reduce join的话,就非常容易在shuflle之后出现数据倾斜。建议的原则:只要一台服务器的内存能吃下这张小表(主要看服务器内存大小,建议2g以内,再大就影响服务器性能了),就建议用map join。这样join完之后,每份数据依然是基本均衡的,而且规避了shuffle导致数据倾斜的问题。

6ce01734694998c77d6c35959142e1b7.png硬刚

上述几步,能做的都做了,还是不行,那就只能硬刚了。这时就只能八仙过海各显神通了。基本的逻辑还是一样的,就是能拆的尽量拆,不能拆的用空间换时间,或者自定义。

7、通用优化:shuffle并行度

spark的shuffle并行度默认值是200,建议根据服务器的情况进行调整。一般是集群cpu总和的2-3倍。当发生数据倾斜的时候,适当增大并行度,可以让任务和数据更均匀的分布在整个集群中。但是这个调优方法有些玄学成分在,因为你不知道他是咋分过去的。

并行度调整有三个方法:

●操作函数内设置

testRDD.groupByKey(200)

●代码中设置“spark.default.parallelism”

conf.set("spark.default.parallelism", 200)

●配置文件中设置“$SPARK_HOME/conf/spark-defaults.conf” 文件

spark.default.parallelism 200

8、拆分超大key

前面说过采样后过滤。如果采样之后发现这个key还是你需要的,无法怎么办?那就把超大数据量的key拆分出来,单独做成一个任务,这样超大数据量的key一个任务,其他中小数据量的key一个任务,两个任务分别做join啊什么的处理,最后把结果合并一下就行了。

为了避免超大数据量的key单独join的时候还是一个key一个任务,可以在key上加上随机数取模的前缀,这样就把数据分成了N份,然后再join。

9、阶段拆分-两阶段聚合

对于聚合类的操作,这种方式可以说是数据倾斜的大杀器。简单来说就是在需要聚合的key前加一个随机数取模的前缀,这样就能得到非常均匀的key,然后按这个加工之后的key进行第一次聚合之后,再对聚合的结果,按照原始key进行二次聚合,这样基本就不可能出现数据倾斜了。示意图如下:

9d4ad88b3134183ccdfdb9ef483065d1.png

对比之前的例子中,处理母婴的服务器和处理3c、图书的服务器任务量是7:1:1,这个方案的数据就非常均匀了。

10、任务拆分

很多时候数据情况会非常复杂,有null值、有超大数据量的key、还有各种需要过滤的数据,还有各种聚合和join。那这个时候就需要把任务再拆分。一部分用上面的key值过滤,一部分用Map Join,一部分用超大key单独处理。

11、随机前缀

前面说过小表join的时候可以用Map join。但是遇到大表join大表咋办?三个方法:1、大表拆成小表,多次join;2、SortMergeJoin;3、位图法(详见《位图法搞定10亿用户量用户标签处理》)。

那大表+中表,该咋处理?可以考虑用随机前缀+RDD扩容的方法解决join的问题。

如果你将要join的表不大不小,又不适合用上面大大表的处理方法,那就可以用这个通用的join方法。简单来说,就是对A表中需要join的字段加上n以内的随机数前缀,然后再把B表中的数据复制N份,join的字段加上1-N的前缀,然后量表再join,就能解决数据倾斜的问题了。示意如下:

原始数据如下:

745d68bdc0498c3f0260dbbe23c33fe8.png

不经处理直接join是这样的,part1很明显比part2要多好几倍的数据:

6b9b7649d44929d0d12d671406980f94.png

我们对A表和B表进行随机前缀和RDD扩容处理之后:

226e8b67a9bcce8e6e59dc364dcd518b.png

然后再join,这样每个part的数据就非常均匀了:

76e4c1eab469889834061e1b09843e24.png

这个方法比较坑的是B表这个RDD需要扩容,要复制N份,对内存要求比较高。但是这个方法可以说是通杀Join的数据倾斜问题。

12、自定义partitioner

上面说改spark的并行数也可以改善数据倾斜,但是有点玄学的意思在里面。其根本原因就是不管你怎么调优,计算引擎的分区都是按照固定的方法进行的,根本不会,也没办法考虑数据真实情况。

无论是二阶段聚合解决聚合的问题,还是随机前缀+RDD扩容解决join的问题,都是通用解决办法,而且还麻烦。其实最好的解决办法就是根据现在处理的这份数据,单独写一个适合的partitioner。比如现在是按省份进行汇总数据,如果只是简单的按省份去分(这并没有错),那么数据肯定会倾斜,因为各省的数据天然不一样。我们可以通过历史数据、抽样数据或者一些常识,对数据进行人工分区,让数据按照我们自定义的分区规则比较均匀的分配到不同的task中。

常见的分区方式:

  • 随机分区:每个区域的数据基本均衡,简单易用,偶尔出现倾斜,但是特征同样也会随机打散。

  • 轮询分区:绝对不会倾斜,但是需要提前预知分成若干份,进行轮询。

  • hash散列:可以针对某个特征进行hash散列,保证相同特征的数据在一个区,但是极容易出现数据倾斜。

  • 范围分区:需要排序,临近的数据会被分在同一个区,可以控制分区数据均匀。

数据倾斜并不可怕,咱可以糙一些,也可以精致一些。但是建议还是糙一些,这样简单粗暴,多节省一些时间干(xue)点(dong)别(xi)的3b393a1fd5b3ebd5a1428117d7513c56.gif

往期精彩回顾

热文 | 当我们在刷抖音的时候,抖音在干什么?

干货 | 架构师带你细细的捋一遍MapReduce全流程

干货 | bitmap解决超大表join实战案例

如果你觉得有用,就请帮忙分享一下,谢谢你了56c02058973a6320855c6c79e17f5ba5.gif



推荐阅读
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文介绍如何在将数据库从服务器复制到本地时,处理因外键约束导致的数据插入失败问题。 ... [详细]
  • SSAS入门指南:基础知识与核心概念解析
    ### SSAS入门指南:基础知识与核心概念解析Analysis Services 是一种专为决策支持和商业智能(BI)解决方案设计的数据引擎。该引擎能够为报告和客户端应用提供高效的分析数据,并支持在多维数据模型中构建高性能的分析应用。通过其强大的数据处理能力和灵活的数据建模功能,Analysis Services 成为了现代 BI 系统的重要组成部分。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • Linux下MySQL 8.0.28安装指南
    本文详细介绍了在Linux系统上安装MySQL 8.0.28的步骤,包括下载数据库、解压数据包、安装必要组件和启动MySQL服务。 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 本文详细介绍了如何解决DNS服务器配置转发无法解析的问题,包括编辑主配置文件和重启域名服务的具体步骤。 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 触发器的稳态数量分析及其应用价值
    本文对数据库中的SQL触发器进行了稳态数量的详细分析,探讨了其在实际应用中的重要价值。通过研究触发器在不同场景下的表现,揭示了其在数据完整性和业务逻辑自动化方面的关键作用。此外,还介绍了如何在Ubuntu 22.04环境下配置和使用触发器,以及在Tomcat和SQLite等平台上的具体实现方法。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 2016-2017学年《网络安全实战》第三次作业
    2016-2017学年《网络安全实战》第三次作业总结了教材中关于网络信息收集技术的内容。本章主要探讨了网络踩点、网络扫描和网络查点三个关键步骤。其中,网络踩点旨在通过公开渠道收集目标信息,为后续的安全测试奠定基础,而不涉及实际的入侵行为。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
author-avatar
KristenW_ong_湘
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有