Spark集群与python的结合
从上一篇文章我们知道,spark有几种资源管理方式,具体可参考:spark几种集群管理器总结
如果Spark Application运行在yarn集群上,在这种运行模式下,资源的管理与协调会统一由yarn处理,而这种模式就能够实现基于yarn集群运算的Application的多样性,可以支持运行MapReduc程序、HBase集群、Storm集群,还可以运行使用Python开发的机器学习应用程序,等等。
我们知道,Spark on YARN又分为client模式和cluster模式。对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行来说是非常关键的。
PySpark是Spark为python提供的api,为使用Python程序编写Spark Application而实现的客户端库,通过PySpark可以编写Spark Application并在Spark集群上运行。Python具有非常丰富的科学计算、机器学习处理库,如numpy、pandas、scipy等等。为了能够充分利用这些高效的Python模块,很多机器学习程序都会使用Python实现,同时也希望能够在Spark集群上运行。
如果python环境不具备的话会报以下类似错误:
ImportError: No module named numpy
要实现Python程序运行在Spark集群之上,总结有有2种情况:
YARN集群配置Python环境
在开始安装YARN及Spark集群的时候,可以将对应Python软件包、依赖模块,在YARN集群中的每个节点上进行安装。这样,YARN集群的每个NodeManager上都具有Python环境,可以编写PySpark Application并在集群上运行。
优点
对于PySpark的使用,如果偏向机器学习,可以考虑在整个集群中都安装好Python环境,并根据不同的需要进行依赖模块的统一管理,能够极大地方便PySpark Application的运行。
缺点
如果后续使用Python编写Spark Application,需要增加新的依赖模块,那么就需要将该模块在YARN集群的每个节点上都进行安装。而且如果环境中有多个Python版本,还需要涉及不同版本Python环境的管理。因为提交PySpark Application运行是由YARN资源管理器决定的,所以必须保证每个NodeManager上都具有Python环境(基础环境+依赖模块)。
YARN集群未配置Python环境
如果已经安装了规模较大的YARN集群,且在开始开始的时候未曾考虑后续会使用基于Python来编写Spark Application,未曾在YARN集群的NodeManager上安装Python基础环境及其依赖模块。现在要想使用Python编写Spark Application。
实现思路如下所示:
- 在任意一台机器节点上,安装Anaconda软件
- 通过Anaconda创建虚拟Python环境
- 在创建好的Python环境中下载安装依赖的Python模块
- 将整个Python环境打成zip包
- 提交PySpark Application时,并通过–archives选项指定zip包路径
具体操作步骤见作者另一文章:linux下用Anaconda创建python虚拟环境并打包
可以在linux本地解压文件,也可以将压缩包上传至hdfs
hdfs dfs -put ***/***/python37.zip /***/***/***/env
使用spark-submit命令引用
client模式
假设上一步得到python37.zip文件,该文件包含numpy、pandas、scipy这三个依赖包。该zip文件大概有500MB以上。将该zip压缩包拷贝到指定目录中,方便后续提交PySpark Application。
1、在linux本地解压
--conf spark.yarn.dist.archives=/home/hadoop/python37 \
--conf spark.pyspark.driver.python=/home/hadoop/python37/bin/python \
--conf spark.pyspark.python=/home/hadoop/python37/bin/python \
2、将python37.zip文件上传到hdfs
--conf spark.yarn.dist.archives=hdfs://input/test/python37.zip
--conf spark.pyspark.driver.python=./python37/bin/python3 \
--conf spark.pyspark.python=./python37/bin/python3 \
注意:archives命令后的#是必须的,它指的是将这个zip包解压到的文件夹。
在提交PySpark Application时会将该环境zip包上传到运行Application的所在的每个节点上,并解压缩后为Python代码提供运行环境。如果不想每次都从客户端将该环境文件上传到集群中运行PySpark Application的节点上,可以采用将zip包上传到HDFS上的方式,并修改–archives参数的值为hdfs://***/***/python37.zip#python37。
另外,如果我们要运行的.py文件中,也依赖一些自己编写的函数,具有多个Python依赖的文件。想要通过上面的方式运行,则必须将这些依赖的Python文件拷贝到我们创建的环境中,对应的目录为python37/lib/python3.7/site-packages/
cluster模式
cluster模式下,driver端可以略去。
--conf spark.yarn.dist.archives=hdfs://input/test/python37.zip
--conf spark.pyspark.python=./python37/bin/python3 \
举例:
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 3 \
--queue prod \
--conf spark.yarn.dist.archives=hdfs://input/test/python37.zip
--conf spark.pyspark.driver.python=./python37/bin/python3 \
--conf spark.pyspark.python=./python37/bin/python3 \
--jars /home/hadoop/mysql-connector-java-8.0.21.jar,/home/hadoop/elasticsearch-spark-20_2.11-7.9.2.jar,/home/hadoop/ojdbc6.jar \
/home/hadoop/test.py
优点
不需要考虑YARN集群每个节点上是否都具有Python环境,任何版本Python编写的PySpark Application都可以使用集群资源运行。
缺点
- 不在YARN集群上安装Python环境的方案,会使提交的Python环境zip包在YARN集群中传输带来一定开销,而且每次提交一个PySpark Application都需要打包一个环境zip文件,如果有大量的Python实现的PySpark Application需要在Spark集群上运行,开销会越来越大。
- 如果PySpark应用程序修改,可能需要重新打包环境。增加额外的工作。
参考:
pyspark提交代码到yarn模式,报错ImportError: No module
PySpark任务在YARN集群上运行 关联python包numpy pandas scipy 等