热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark程序优化总结

转行写spark程序快一年时间了,我最深刻的体会是实现功能容易,但如何提高程序的执行效率却是个难题。我们用的spark主要是sparksql框架,使用sparksql实现数据的清洗

转行写spark程序快一年时间了,我最深刻的体会是实现功能容易,但如何提高程序的执行效率却是个难题。我们用的spark主要是spark sql框架,使用spark sql实现数据的清洗、抽取以及计算。期间,我们用了大部分的时间对程序做优化,现将对程序的优化方法总结如下:

1. 数据存储优化

在数据存储上,经过了从hdfs切换到cassandra,再从cassandra换到内存文件系统alluxio上。

1.1 hadfs存储

hdfs是hadoop的分布式文件系统,被设计成适合运行在通用硬件上的分布式文件系统。能够存储多种格式的数据,包括文本及parquet格式等等。它是一个主从结构,一个hdfs集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储,如下图1所示。hdfs对外开放文件命名空间并允许用户数据以文件形式存储。

《spark程序优化总结》 图1 hdfs 架构图

  hdfs内部是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。

1.2 cassandra存储

cassandra是一个面向带索引的Nosql数据库,数据是以松散结构的多维哈希表存储在数据库中。它最大的优点就是写速度非常快,并且由于有索引,可以避免重复数据的出现,有自动去重的功能,而读的速度却不那么尽如人意。

1.3 alluxio+parquet存储

由于使用hdfs和cassandra加载数据时,数据的加载时间就占用了很大部分,甚至占用了一半的时间。基于数据加载速度的考虑,后来采用alluxio存储需要加载的数据,使用hdfs作为持久层,这样数据可以直接从内存加载,加载的速度大大提高了。
  现在的spark程序主要使用spark sql框架,程序结构如下图2所示。spark sql程序运行在yarn集群上,直接访问alluxio内存文件系统加载数据,持久化的数据存储在hdfs防止丢失。alluxio作为一个内存的文件系统,也可以存放多种类型的格式的文件,为了节省存储开销,提高访问速度,此处我们在alluxio上使用parquet格式存储数据。

《spark程序优化总结》 图2 spark程序结构图

2. 逻辑结构优化

2.1 数据加载逻辑优化

由于程序计算需要加载的历史数据较多,而程序的每轮执行都要加载历史数据,历史数据的加载占用了数据加载的大部分时间。因此我们后来改用数据增量加载的形式,即将数据分天存放。每次程序重启时,先加载历史数据,即当天以前的数据,将历史数据加载后进行预处理后,保存在内存中。以后每轮就只需加载当天数据,再跟历史数据合并。这样以后每轮都避免了重新加载历史数据,并进行预处理的时间,显著地提高了程序的执行效率。

2.2 数据分区合并

我们使用alluxio,以parquet的格式存储源数据时,如果小文件过多,数据的加载速度会慢很多。因此在存储源数据时,建议使用coalesce将数据合并到几个分区中,以防止小文件过多。虽然reparation也能够实现分区聚合功能,但reparation会出现shuffle,严重影响spark程序的执行效率。

3. 配置参数优化

3.1 任务调度模式修改

spark的任务调度模式分为FIFO(先进先出)和FAIR(公平竞争)。FIFO的调度机制是将队列中的job按照先进先出的方式进行调度执行,而FAIR则按照是使需要资源较少的任务先执行,如所有任务都得按先进先出的方式,则小作业也被阻塞不能执行。设置调度模式的方式,只需一行代码如下:

sparkConf.set(“spark.scheduler.mode”,”FAIR”)

3.2 执行并行度参数设置

并行度决定了spark作业划分task的数量,一般情况下,task越多,程序执行的并行度就越高。但task数量也不能太多,因为task的创建也需要耗时间和内存资源。一般建议设置的并行度是num-executorsexecutor-cores的2-3倍。如果不设置,程序默认的并行度只是num-executorsexecutor-cores的不到2倍。

4.shuffle调优

4.1 定义广播变量

大多数spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。shuffle调优的一个途径就是尽可能地减少shuffle,在两个表join的过程中,如果一个表不会经常改变,同时数据量又不会太大时,将这个表广播出去,这样集群上的每个节点上都会保存这个表,这样需要join操作的另一个表就可以在自己的节点上完成关联操作,可以尽可能地减少shuffle。

4.2 减少数据倾斜

在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
  因此出现数据倾斜的时候,spark作业运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。数据倾斜的例子如下图所示,在三个节点上对应的以hello为key的总共有6条数据,这些数据都会被拉取到同一个task中进行处理,而以world为key的只有3条。实际情况中task可能个别key的数据量可能更大,比key少的可能多n多倍。因此key多的task的运行速度可能会比key少的task执行速度要慢n倍,而整个程序的执行速度是由最慢的task决定的。同时,如果某一个task处理的数据量过多的话,还会出现内存溢出的危险。

《spark程序优化总结》 图3 数据倾斜案例

  实际写spark程序时,要尽量避免出现数据倾斜的情况,如果出现上述现象时,原则是采用两阶段聚合的方式。首先针对某些key较多的数据,进行拆分,将一个key拆分成多个,比如上图3,对hello进行拆分,将hello前面加上1-10之间的任意随机数,变成1_hello,2_hello直到10_hello,对这些拆分后的key首先聚合,聚合后变成(1_hello,2),(2_hello,2)在再将这些随机数前缀去掉,再进行聚合,实现方式如下图4。

《spark程序优化总结》 图4 数据倾斜处理案列

5. 升级spark版本

还有一种较为有效的办法是升级spark版本,spark 2.1版本较spark 1.6版本性能有更多的提升。其中的优势在《spark sql执行流程》的最后有所介绍。


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • ZooKeeper 学习
    前言相信大家对ZooKeeper应该不算陌生。但是你真的了解ZooKeeper是个什么东西吗?如果别人面试官让你给他讲讲ZooKeeper是个什么东西, ... [详细]
  • LVS-DR直接路由实现负载均衡示例
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
  • 一面自我介绍对象相等的判断,equals方法实现。可以简单描述挫折,并说明自己如何克服,最终有哪些收获。职业规划表明自己决心,首先自己不准备继续求学了,必须招工作了。希望去哪 ... [详细]
  • Linux线程的同步和互斥
    目录1、线程的互斥2、可重入VS线程安全3、线程的同步1、线程的互斥 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 【转】腾讯分析系统架构解析
    TA(TencentAnalytics,腾讯分析)是一款面向第三方站长的免费网站分析系统,在数据稳定性、及时性方面广受站长好评,其秒级的实时数据更新频率也获得业界的认可。本文将从实 ... [详细]
  • CentOS 7配置SSH远程访问及控制
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • MR程序的几种提交运行模式本地模型运行1在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行-- ... [详细]
author-avatar
傲慢的小草7_170
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有