热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark程序优化总结

转行写spark程序快一年时间了,我最深刻的体会是实现功能容易,但如何提高程序的执行效率却是个难题。我们用的spark主要是sparksql框架,使用sparksql实现数据的清洗

转行写spark程序快一年时间了,我最深刻的体会是实现功能容易,但如何提高程序的执行效率却是个难题。我们用的spark主要是spark sql框架,使用spark sql实现数据的清洗、抽取以及计算。期间,我们用了大部分的时间对程序做优化,现将对程序的优化方法总结如下:

1. 数据存储优化

在数据存储上,经过了从hdfs切换到cassandra,再从cassandra换到内存文件系统alluxio上。

1.1 hadfs存储

hdfs是hadoop的分布式文件系统,被设计成适合运行在通用硬件上的分布式文件系统。能够存储多种格式的数据,包括文本及parquet格式等等。它是一个主从结构,一个hdfs集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储,如下图1所示。hdfs对外开放文件命名空间并允许用户数据以文件形式存储。

《spark程序优化总结》 图1 hdfs 架构图

  hdfs内部是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。

1.2 cassandra存储

cassandra是一个面向带索引的Nosql数据库,数据是以松散结构的多维哈希表存储在数据库中。它最大的优点就是写速度非常快,并且由于有索引,可以避免重复数据的出现,有自动去重的功能,而读的速度却不那么尽如人意。

1.3 alluxio+parquet存储

由于使用hdfs和cassandra加载数据时,数据的加载时间就占用了很大部分,甚至占用了一半的时间。基于数据加载速度的考虑,后来采用alluxio存储需要加载的数据,使用hdfs作为持久层,这样数据可以直接从内存加载,加载的速度大大提高了。
  现在的spark程序主要使用spark sql框架,程序结构如下图2所示。spark sql程序运行在yarn集群上,直接访问alluxio内存文件系统加载数据,持久化的数据存储在hdfs防止丢失。alluxio作为一个内存的文件系统,也可以存放多种类型的格式的文件,为了节省存储开销,提高访问速度,此处我们在alluxio上使用parquet格式存储数据。

《spark程序优化总结》 图2 spark程序结构图

2. 逻辑结构优化

2.1 数据加载逻辑优化

由于程序计算需要加载的历史数据较多,而程序的每轮执行都要加载历史数据,历史数据的加载占用了数据加载的大部分时间。因此我们后来改用数据增量加载的形式,即将数据分天存放。每次程序重启时,先加载历史数据,即当天以前的数据,将历史数据加载后进行预处理后,保存在内存中。以后每轮就只需加载当天数据,再跟历史数据合并。这样以后每轮都避免了重新加载历史数据,并进行预处理的时间,显著地提高了程序的执行效率。

2.2 数据分区合并

我们使用alluxio,以parquet的格式存储源数据时,如果小文件过多,数据的加载速度会慢很多。因此在存储源数据时,建议使用coalesce将数据合并到几个分区中,以防止小文件过多。虽然reparation也能够实现分区聚合功能,但reparation会出现shuffle,严重影响spark程序的执行效率。

3. 配置参数优化

3.1 任务调度模式修改

spark的任务调度模式分为FIFO(先进先出)和FAIR(公平竞争)。FIFO的调度机制是将队列中的job按照先进先出的方式进行调度执行,而FAIR则按照是使需要资源较少的任务先执行,如所有任务都得按先进先出的方式,则小作业也被阻塞不能执行。设置调度模式的方式,只需一行代码如下:

sparkConf.set(“spark.scheduler.mode”,”FAIR”)

3.2 执行并行度参数设置

并行度决定了spark作业划分task的数量,一般情况下,task越多,程序执行的并行度就越高。但task数量也不能太多,因为task的创建也需要耗时间和内存资源。一般建议设置的并行度是num-executorsexecutor-cores的2-3倍。如果不设置,程序默认的并行度只是num-executorsexecutor-cores的不到2倍。

4.shuffle调优

4.1 定义广播变量

大多数spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。shuffle调优的一个途径就是尽可能地减少shuffle,在两个表join的过程中,如果一个表不会经常改变,同时数据量又不会太大时,将这个表广播出去,这样集群上的每个节点上都会保存这个表,这样需要join操作的另一个表就可以在自己的节点上完成关联操作,可以尽可能地减少shuffle。

4.2 减少数据倾斜

在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
  因此出现数据倾斜的时候,spark作业运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。数据倾斜的例子如下图所示,在三个节点上对应的以hello为key的总共有6条数据,这些数据都会被拉取到同一个task中进行处理,而以world为key的只有3条。实际情况中task可能个别key的数据量可能更大,比key少的可能多n多倍。因此key多的task的运行速度可能会比key少的task执行速度要慢n倍,而整个程序的执行速度是由最慢的task决定的。同时,如果某一个task处理的数据量过多的话,还会出现内存溢出的危险。

《spark程序优化总结》 图3 数据倾斜案例

  实际写spark程序时,要尽量避免出现数据倾斜的情况,如果出现上述现象时,原则是采用两阶段聚合的方式。首先针对某些key较多的数据,进行拆分,将一个key拆分成多个,比如上图3,对hello进行拆分,将hello前面加上1-10之间的任意随机数,变成1_hello,2_hello直到10_hello,对这些拆分后的key首先聚合,聚合后变成(1_hello,2),(2_hello,2)在再将这些随机数前缀去掉,再进行聚合,实现方式如下图4。

《spark程序优化总结》 图4 数据倾斜处理案列

5. 升级spark版本

还有一种较为有效的办法是升级spark版本,spark 2.1版本较spark 1.6版本性能有更多的提升。其中的优势在《spark sql执行流程》的最后有所介绍。


推荐阅读
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • Spring框架的核心组件与架构解析 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文详细探讨了几种常用的Java后端开发框架组合及其具体应用场景。通过对比分析Spring Boot、MyBatis、Hibernate等框架的特点和优势,结合实际项目需求,为开发者提供了选择合适框架组合的参考依据。同时,文章还介绍了这些框架在微服务架构中的应用,帮助读者更好地理解和运用这些技术。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 本文详细介绍了如何使用OpenSSL自建CA证书的步骤,包括准备工作、生成CA证书、生成服务器待签证书以及证书签名等过程。 ... [详细]
  • 本文详细介绍了在 Ubuntu 系统上搭建 Hadoop 集群时遇到的 SSH 密钥认证问题及其解决方案。通过本文,读者可以了解如何在多台虚拟机之间实现无密码 SSH 登录,从而顺利启动 Hadoop 集群。 ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • 解决Parallels Desktop错误15265的方法
    本文详细介绍了在使用Parallels Desktop时遇到错误15265的多种解决方案,包括检查网络连接、关闭代理服务器和修改主机文件等步骤。 ... [详细]
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • ButterKnife 是一款用于 Android 开发的注解库,主要用于简化视图和事件绑定。本文详细介绍了 ButterKnife 的基础用法,包括如何通过注解实现字段和方法的绑定,以及在实际项目中的应用示例。此外,文章还提到了截至 2016 年 4 月 29 日,ButterKnife 的最新版本为 8.0.1,为开发者提供了最新的功能和性能优化。 ... [详细]
  • 本文深入解析了Java 8并发编程中的`AtomicInteger`类,详细探讨了其源码实现和应用场景。`AtomicInteger`通过硬件级别的原子操作,确保了整型变量在多线程环境下的安全性和高效性,避免了传统加锁方式带来的性能开销。文章不仅剖析了`AtomicInteger`的内部机制,还结合实际案例展示了其在并发编程中的优势和使用技巧。 ... [详细]
author-avatar
傲慢的小草7_170
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有