热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark读取mongo数据(python)

使用mongo官方提供的sparkconnector可以很方便的让spark读写mongo中的数据。示例:frompyspark.sqlimportSparkSessionfrom

使用mongo官方提供的spark connector可以很方便的让spark读写mongo中的数据。

示例:

from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__=='__main__':
myconf = SparkConf()
myconf.setAppName("test").setMaster("yarn")
myconf.set('spark.executor.instances','4')
myconf.set('spark.executor.memory','4G')
myconf.set('spark.executor.cores','4')
myconf.set('spark.task.cpus','4')
# 指定连接器对应的spark-package
myconf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.11:2.2.0")
# 指定mongo地址,需要每个工作节点都能访问到
myconf.set("spark.mongodb.input.uri","mongodb://192.168.1.15:27017/")
# 设置要读取的dbs名和collection名
myconf.set("spark.mongodb.input.database","db_name")
myconf.set("spark.mongodb.input.collection","collection_name")
# 指定分区方式
myconf.set("spark.mongodb.input.partitioner","MongoSplitVectorPartitioner")
spark = SparkSession.builder.config(conf=myconf).getOrCreate()
# 使用指定格式读取
mg_data = spark.read.format("com.mongodb.spark.sql").load()
mg_data.createOrReplaceTempView("tmp_table")
mydf = spark.sql("select _id, trackName from tmp_table limit 4")
print(mydf.rdd.collect())
spark.stop()

有几个问题需要注意,有一些我自己也没搞清楚。

1. spark的py脚本提交到yarn上,有这样几种方式:

    • 使用spark-submit提交
    • 使用python提交
    • 之前还可以使用pyspark提交,但是spark2.3已经不支持了

使用第一种方式提交,原则上相关参数的传入有三种方式:一种是在脚本中设置,就像上面这样;一种是提交的时候传入参数;还有一种是将参数设置写在文件中,通过文件传入。在Spark文档中有详细介绍。

mongo-spark连接器通过‘spark.jars.packages’这个参数设置,如果是提交时传入对应的参数是‘–packages’。spark的这些“工具包”(参考spark-packages.org),感觉上类似python中import导入的工具包。

这里的第一个问题是:如果使用spark-submit提交脚本,package的参数只能在提交时传入;像实例这样在脚本中设置会出一些问题:java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

2. 从spark文档来看,spark有意在弱化rdd,而强调DataFrame。因此spark程序的主要入口也从SparkContext转移到SparkSession。Dataframe这种格式支持sql,可以在map、reduce等计算之前对数据做一些预处理。

from pyspark import SparkConf
from pyspark.sql import SparkSession
if __name__=='__main__':
mycOnf= SparkConf().setMaster('yarn')
spark = SparkSession.builder.config(cOnf=myconf).getOrCreate()
# 读取各种格式的数据,并返回dataframe
mydata = spark.read.json('...') # json格式文件
mydata = spark.read.csv('...')
mydata = spark.read.text('...')
mydata = spark.read.format('..').load() # 自定义格式 读取mongo数据就是用的这种方式

这里的第二个问题是:这种方式读mongo中的表,好像是把整个表都读出来,因为读大表的时候明显感觉到比较慢。虽然读出来之后,可以使用sql语句做一些过滤操作。但是能不能读的时候就根据过滤条件只读一部分呢?

第二个问题的答案:可以在读mongo时使用filter或pipline,相关语句会传给mongo执行。使用sql的方式是将所有数据读入集群,然后并行的执行sql语句。两种方式适合不同的场景,可以参考这个链接

df = spark.read.format("com.mongodb.spark.sql") \
.option("uri", "mongodb://127.0.0.1:27017/dbname") \
.option("collection", "collection_name") \
.option("pipeline", "[{'$limit':100},{'$project':{'myfield':1}}]") \
.load()

3.使用Dataframe做sql操作有两种方式。一种是直接使用Dataframe这种数据类型的方法,另一种是使用spark.sql方法

#使用Dataframe方法
newdata = mydata.filter("col_name > 3").limit(1000)
newdata = newdata.select(col_name1,col_name2).orderBy(...).limit(10)
#使用spark.sql方法
mydata.createOrReplaceTempView('tmp_name')
newdata = spark.sql('select * from tmp_name where ...')

第三个问题:不清楚上面两种方法各有什么优缺点,或者两者等价?


推荐阅读
  • 十大经典排序算法动图演示+Python实现
    本文介绍了十大经典排序算法的原理、演示和Python实现。排序算法分为内部排序和外部排序,常见的内部排序算法有插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等。文章还解释了时间复杂度和稳定性的概念,并提供了相关的名词解释。 ... [详细]
  • 学习SLAM的女生,很酷
    本文介绍了学习SLAM的女生的故事,她们选择SLAM作为研究方向,面临各种学习挑战,但坚持不懈,最终获得成功。文章鼓励未来想走科研道路的女生勇敢追求自己的梦想,同时提到了一位正在英国攻读硕士学位的女生与SLAM结缘的经历。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • Allegro总结:1.防焊层(SolderMask):又称绿油层,PCB非布线层,用于制成丝网印板,将不需要焊接的地方涂上防焊剂.在防焊层上预留的焊盘大小要比实际的焊盘大一些,其差值一般 ... [详细]
  • step1.为mongodb添加admin管理员root@12.154.29.163:~#mongoMongoDBshellversionv3.4.2connectingto:mo ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 今天我们学习,数据库mongodb的使用,最下面有mongodb的下载链接。pipinstallpymongo首先安装pymongo,然后在需要用到的地方importpymongo ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • “你永远都不知道明天和‘公司的意外’哪个先来。”疫情期间,这是我们最战战兢兢的心情。但是显然,有些人体会不了。这份行业数据,让笔者“柠檬” ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • python中安装并使用redis相关的知识
    本文介绍了在python中安装并使用redis的相关知识,包括redis的数据缓存系统和支持的数据类型,以及在pycharm中安装redis模块和常用的字符串操作。 ... [详细]
  • 一面自我介绍对象相等的判断,equals方法实现。可以简单描述挫折,并说明自己如何克服,最终有哪些收获。职业规划表明自己决心,首先自己不准备继续求学了,必须招工作了。希望去哪 ... [详细]
  • 一、前言在数据库中,慢查询日志通常是用来进行优化数据库,MySQL中存在慢查询,Mongodb中也是如此。在Mongo中的慢查询属于Mon ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有