热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark读取mongo数据(python)

使用mongo官方提供的sparkconnector可以很方便的让spark读写mongo中的数据。示例:frompyspark.sqlimportSparkSessionfrom

使用mongo官方提供的spark connector可以很方便的让spark读写mongo中的数据。

示例:

from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__=='__main__':
myconf = SparkConf()
myconf.setAppName("test").setMaster("yarn")
myconf.set('spark.executor.instances','4')
myconf.set('spark.executor.memory','4G')
myconf.set('spark.executor.cores','4')
myconf.set('spark.task.cpus','4')
# 指定连接器对应的spark-package
myconf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.11:2.2.0")
# 指定mongo地址,需要每个工作节点都能访问到
myconf.set("spark.mongodb.input.uri","mongodb://192.168.1.15:27017/")
# 设置要读取的dbs名和collection名
myconf.set("spark.mongodb.input.database","db_name")
myconf.set("spark.mongodb.input.collection","collection_name")
# 指定分区方式
myconf.set("spark.mongodb.input.partitioner","MongoSplitVectorPartitioner")
spark = SparkSession.builder.config(conf=myconf).getOrCreate()
# 使用指定格式读取
mg_data = spark.read.format("com.mongodb.spark.sql").load()
mg_data.createOrReplaceTempView("tmp_table")
mydf = spark.sql("select _id, trackName from tmp_table limit 4")
print(mydf.rdd.collect())
spark.stop()

有几个问题需要注意,有一些我自己也没搞清楚。

1. spark的py脚本提交到yarn上,有这样几种方式:

    • 使用spark-submit提交
    • 使用python提交
    • 之前还可以使用pyspark提交,但是spark2.3已经不支持了

使用第一种方式提交,原则上相关参数的传入有三种方式:一种是在脚本中设置,就像上面这样;一种是提交的时候传入参数;还有一种是将参数设置写在文件中,通过文件传入。在Spark文档中有详细介绍。

mongo-spark连接器通过‘spark.jars.packages’这个参数设置,如果是提交时传入对应的参数是‘–packages’。spark的这些“工具包”(参考spark-packages.org),感觉上类似python中import导入的工具包。

这里的第一个问题是:如果使用spark-submit提交脚本,package的参数只能在提交时传入;像实例这样在脚本中设置会出一些问题:java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

2. 从spark文档来看,spark有意在弱化rdd,而强调DataFrame。因此spark程序的主要入口也从SparkContext转移到SparkSession。Dataframe这种格式支持sql,可以在map、reduce等计算之前对数据做一些预处理。

from pyspark import SparkConf
from pyspark.sql import SparkSession
if __name__=='__main__':
mycOnf= SparkConf().setMaster('yarn')
spark = SparkSession.builder.config(cOnf=myconf).getOrCreate()
# 读取各种格式的数据,并返回dataframe
mydata = spark.read.json('...') # json格式文件
mydata = spark.read.csv('...')
mydata = spark.read.text('...')
mydata = spark.read.format('..').load() # 自定义格式 读取mongo数据就是用的这种方式

这里的第二个问题是:这种方式读mongo中的表,好像是把整个表都读出来,因为读大表的时候明显感觉到比较慢。虽然读出来之后,可以使用sql语句做一些过滤操作。但是能不能读的时候就根据过滤条件只读一部分呢?

第二个问题的答案:可以在读mongo时使用filter或pipline,相关语句会传给mongo执行。使用sql的方式是将所有数据读入集群,然后并行的执行sql语句。两种方式适合不同的场景,可以参考这个链接

df = spark.read.format("com.mongodb.spark.sql") \
.option("uri", "mongodb://127.0.0.1:27017/dbname") \
.option("collection", "collection_name") \
.option("pipeline", "[{'$limit':100},{'$project':{'myfield':1}}]") \
.load()

3.使用Dataframe做sql操作有两种方式。一种是直接使用Dataframe这种数据类型的方法,另一种是使用spark.sql方法

#使用Dataframe方法
newdata = mydata.filter("col_name > 3").limit(1000)
newdata = newdata.select(col_name1,col_name2).orderBy(...).limit(10)
#使用spark.sql方法
mydata.createOrReplaceTempView('tmp_name')
newdata = spark.sql('select * from tmp_name where ...')

第三个问题:不清楚上面两种方法各有什么优缺点,或者两者等价?


推荐阅读
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有