文章目录
- Spark优化总结(一)——数据倾斜
- 1. 前言
- 2. 数据源倾斜
- 3. 数据存储倾斜
- 4. 数据处理倾斜
- 4.1 过滤导致的数据不均
- 4.2 默认值、异常数据等导致的不均
- 4.3 join导致的数据不均
- 4.4 groupBy导致的数据不均
- 4.5 key倾斜了,但还必须要该key?
Spark优化总结(一)——数据倾斜
1. 前言
- 在Spark应用开发过程中,通常大多数性能问题是在数据倾斜点上。针对数据倾斜问题,我们可以按运行状态分出以下几点:
- 数据源倾斜
- 存储结果倾斜
- 数据处理倾斜(问题最多的部分)
2. 数据源倾斜
-
说明
- 使用Spark读取数据时,通常我们的数据源也是分布式存储的,这就需要Spark从数据源的各个节点分别读取数据。如果因为某些原因,导致某份数据在存储集群的各个节点分布不均匀,那么读取数据时需要的时间会等于最慢的节点
- 例如,数据分布在10个节点上,其中一个节点的数据量是其他的节点的5倍
-
举例:Hive分桶导致的数据倾斜
- Hive设计表的时候可为表设定分桶,选择一个良好的分布列能够使数据均匀的分布到集群中。
- 如果分布列选择错误,很可能导致某些节点数据极其多,而其他节点又很少。那么,使用Spark读取Hive表时,就需要等待数据最多的节点执行完毕。
-
举例:Kafka分区导致的数据倾斜
- 生产到Kafka某个Topic的数据,可以指定Partition、Key或者默认值,用于决定数据会发送到Topic的那一个分区。
- 如果生成的Partition、Key不对,那么会导致Topic某些分区数据量远多于其他分区。那么,使用SparkStream(或者StructuredStream)来接取Topic的数据时,会导致Spark某个节点单次接入的数据过多。
3. 数据存储倾斜
4. 数据处理倾斜
4.1 过滤导致的数据不均
-
开发时,在处理数据之前提前做好过滤,是一个良好的习惯。但是,过滤条件(例如where、filter)可能会导致部分分区的数据量较少(因为该分区符合过滤条件的数据少),而其他分区数据较多,那么本次Stage执行的时间将取决于最慢的节点(数据最多)。
-
例如:数据均匀的分布在10个分区,每个节点分区约1000万条。你需要根据某个条件进行过滤数据,然后再执行处理操作。当数据过滤后,可能其中3个分区约有900万条符合条件,其他7个分区约有200万条符合条件。接着继续执行后续逻辑,显然本次Stage需要等数据最多的3个分区执行完才能结束(如果过滤后需要执行的处理逻辑较为花费时间,例如每条数据还需要10秒,那么效果尤为明显)。
-
解决方案:
- 使用repartion将数据再次分布均匀。需要注意的是repartion是shuffle操作,网络传输需要消耗时间,如果你的数据倾斜程度不大可以不做。也就是说你需要对比repartion的时间开销与数据倾斜导致的时间开销后,再做决定。
- 如果直接repartion也倾斜了,那么你需要调用partitionBy()重写一个Patitioner,选择合适的字段来做Shuffle。(没有合适的字段,那就加随机前缀)
- 关于coalesce,在此几乎没有效果,因为coalesce是合并各自节点本地的分区,并没有解决数据倾斜的问题
-
简约示意图
// 数据原状态| node1 | node2 | node3
count | 10050 | 10002 | 10035↓
// where/filter 根据条件过滤| node1 | node2 | node3
count | 5000 | 510 | 490↓
// 有的节点符合条件的多,有的节点符合条件的少
// 如果继续进行map类型的处理,node1将远远慢于node2、node3,同时node2、node3还处于空闲状态(浪费)
// 如果每条数据的处理时间消耗比较大,node1与node2、node3的差距将非常大↓
// 使用repartition,重新分布| node1 | node2 | node3
count | 2014 | 1997 | 1989↓
// 继续进行map类型处理,每个节点的计算资源都利用起来了
// ……
4.2 默认值、异常数据等导致的不均
- 开发中遇到数据倾斜,可以优先排查该项。通常一个字段的处理中很可能会给一个默认值(例如0、null、空白字符等)或者一个异常值(例如@#¥%),这都是上游解析的问题导致的。
- 例如,一个表共1亿条数据,其中一列80%都是默认值字符串“01234”,如果使用该列作为shuffle分区的key,那么会导致数据倾斜。
- 建议提前统计好默认值或异常值的占比(例如抽样统计),如果不需要为该值的数据,那么提前过滤掉。如果需要,可以尝试加随机前缀(前提是能满足你的业务)。另外,你可以换一种业务的实现方式,想办法选择其他列作为shuffle的分区key。
4.3 join导致的数据不均
- 进行join时需要选择一个列,如果该的列值倾斜那么join操作也会倾斜。
- 例如:某批订单信息约1000万条,其中有city字段,其中A、B城市订单较多(各约300万),其他城市订单量较少(都少于50万)。此时需要将一份城市相关的数据与该数据关联,如果选择city字段进行join,会使各城市订单信息分别分布到各个分区节点上。显然A、B城市订单数据各自所在分区节点的数据量较大,需要等待该分区处理完,Stage才算结束。
- 解决方案:
- 使用广播进行 map-side join
- 示例:
- Spark代码可读性与性能优化——示例五(HashJoin)
4.4 groupBy导致的数据不均
- 进行groupBy时需要选择一个列,如果该的列值倾斜那么groupBy操作也会倾斜。
- 示例:同join,如果需要根据城市分组处理,执行groupBy操作后,因为A、B城市所在分区节点的数据量较大,同样需要等待。
- 解决方案:
- 想办法使用reduceByKey、aggregateByKey、combineByKey替代
- 示例:
- Spark代码可读性与性能优化——示例六(groupBy、reduceByKey、aggregateByKey)
- Spark代码可读性与性能优化——示例八(一个业务逻辑,多种解决方式)
4.5 key倾斜了,但还必须要该key?
- 如果你的业务每天经常根据该key进行不同的数据处理的话,那么可以尝试预先处理一次。
- 提前处理该key生成一份数据,虽然第一次仍然会有倾斜问题,速度较慢,但是后面很多次的业务可以跟新新生成的数据进行处理,不会再有该key倾斜的问题了。