热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

碰壁Spark+Mongodb

在尝试Spark+MongoDB过程中,总是遇到Cursorxxxxxnotfound错误,尝试加入keep_alive_ms和pipeline也不能解决问题。目前总数据量在100

在尝试Spark + MongoDB过程中,总是遇到Cursor xxxxx not found错误, 尝试加入keep_alive_ms 和 pipeline 也不能解决问题。

目前总数据量在10000条左右,从Mongodb中加载后交给Spark的NaiveBayes training.

pipeline = {{ $limit: 5000 },{ $skip: 2000 }}
has_train = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
.option("spark.mongodb.keep_alive_ms", "3600000") \
.option("pipeline", pipeline) \
.load()

在1.6之前,我们需要手动部署并指明额外加载第三方jar文件路径,在实验2.1的时候,这些Package会自动下载

./spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
--py-files ./utility.py \
--files ./dicts/statistic_college.txt \
--files ./dicts/degrees.txt \
--files ./dicts/diming.txt \
--files ./dicts/subjects.txt \
--files ./dicts/training_org.txt \
naive_bayes.py

Output:

# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/home/pluto/spark/spark-2.1.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 in central
found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 221ms :: artifacts dl 4ms
:: modules in use:
org.mongodb#mongo-java-driver;3.2.2 from central in [default]
org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/7ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

当数据量增大时,总是遇到如下错误,目前还没有排查出错误根源 :(

17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:2, serverValue:42}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:4, serverValue:46}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:27:56 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 5, mongo_and_spark_server, executor 2): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34611963569 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/05/29 01:27:56 INFO TaskSetManager: Starting task 3.1 in stage 2.0 (TID 6, mongo_and_spark_server, executor 2, partition 3, ANY, 6787 bytes)
17/05/29 01:29:01 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 4, mongo_and_spark_server, executor 0): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34615739977 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/05/29 01:29:01 INFO TaskSetManager: Starting task 2.1 in stage 2.0 (TID 7, mongo_and_spark_server, executor 2, partition 2, ANY, 6799 bytes)

参考:

https://docs.mongodb.com/spark-connector/v2.0/configuration/#spark-input-conf
https://docs.mongodb.com/manual/core/aggregation-pipeline-optimization/
http://www.mongoing.com/tj/mongodb_shanghai_spark


推荐阅读
  • 深入解析:手把手教你构建决策树算法
    本文详细介绍了机器学习中广泛应用的决策树算法,通过天气数据集的实例演示了ID3和CART算法的手动推导过程。文章长度约2000字,建议阅读时间5分钟。 ... [详细]
  • Django Token 认证详解与 HTTP 401、403 状态码的区别
    本文详细介绍了如何在 Django 中配置和使用 Token 认证,并解释了 HTTP 401 和 HTTP 403 状态码的区别。通过具体的代码示例,帮助开发者理解认证机制及权限控制。 ... [详细]
  • 360SRC安全应急响应:从漏洞提交到修复的全过程
    本文详细介绍了360SRC平台处理一起关键安全事件的过程,涵盖从漏洞提交、验证、排查到最终修复的各个环节。通过这一案例,展示了360在安全应急响应方面的专业能力和严谨态度。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
  • 本文介绍如何在现有网络中部署基于Linux系统的透明防火墙(网桥模式),以实现灵活的时间段控制、流量限制等功能。通过详细的步骤和配置说明,确保内部网络的安全性和稳定性。 ... [详细]
  • 华为USG基于源地址的多出口策略路由配置
    网络拓扑如下:组网情况:企业用户主要有技术部(VLAN10)和行政部(VLAN20),通过汇聚交换机连接到USG。企业分别通过两个不同运营商(ISP1和ISP2)连接到 ... [详细]
  • Python 工具推荐 | PyHubWeekly 第二十一期:提升命令行体验的五大工具
    本期 PyHubWeekly 为大家精选了 GitHub 上五个优秀的 Python 工具,涵盖金融数据可视化、终端美化、国际化支持、图像增强和远程 Shell 环境配置。欢迎关注并参与项目。 ... [详细]
  • Mongoose 5.12.10 发布:MongoDB 异步对象模型工具的新特性与修复
    Mongoose 是一款专为异步环境设计的 MongoDB 对象模型工具,支持 Promise 和回调函数。最新版本 Mongoose 5.12.10 带来了多项修复和改进,包括查询选项中的默认值设置、嵌入式判别器填充、以及 TypeScript 定义文件的优化。 ... [详细]
  • MySQL锁机制详解
    本文深入探讨了MySQL中的锁机制,包括表级锁、行级锁以及元数据锁,通过实例详细解释了各种锁的工作原理及其应用场景。同时,文章还介绍了如何通过锁来优化数据库性能,避免常见的并发问题。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • Babylon.js 实例展示
    探索 Babylon.js 的强大功能,通过全屏演示体验其卓越性能。本文提供在线文档链接和默认渲染管线的源码调试地址,帮助您深入了解 Babylon.js 的工作原理。 ... [详细]
  • 本文详细介绍了Python编程语言的学习路径,涵盖基础语法、常用组件、开发工具、数据库管理、Web服务开发、大数据分析、人工智能、爬虫开发及办公自动化等多个方向。通过系统化的学习计划,帮助初学者快速掌握Python的核心技能。 ... [详细]
  • 本文探讨了在 SQL Server 中使用 JDBC 插入数据时遇到的问题。通过详细分析代码和数据库配置,提供了解决方案并解释了潜在的原因。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 本文详细介绍如何使用 Apache Spark 执行基本任务,包括启动 Spark Shell、运行示例程序以及编写简单的 WordCount 程序。同时提供了参数配置的注意事项和优化建议。 ... [详细]
author-avatar
刘妤劭明馨
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有