热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkmongojava_java操作spark读写mongodb

首先要引入mongodb-spark-connector的maven依赖,具体的可见这个api网址:https:docs.mongodb.comspar

首先要引入mongodb-spark-connector的maven依赖,具体的可见这个api网址:https://docs.mongodb.com/spark-connector/current/java-api/,然后基本上就可以按照api上面的内容来进行spark操作了。这里面已经有spark读入mongodb数据转化为rdd的操作了。

有一些补充的或许有用(?)的代码,放在这里。

import com.mongodb.MongoClient;

import com.mongodb.MongoClientURI;

import com.mongodb.client.MongoDatabase;

import com.mongodb.spark.MongoConnector;

import com.mongodb.spark.MongoSpark;

import com.mongodb.spark.config.ReadConfig;

import com.mongodb.spark.config.WriteConfig;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import com.mongodb.spark.sql.helpers.StructFields;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructType;

import org.bson.Document;

import org.bson.types.ObjectId;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import static java.lang.String.format;

import static java.util.Arrays.asList;

import static java.util.Collections.singletonList;

public final class JavaIntroduction {

/**

* Run this main method to see the output of this quick example.

*

* @param args takes an optional single argument for the connection string

* @throws InterruptedException if a latch is interrupted

*/

public static void main(final String[] args) throws InterruptedException {

JavaSparkContext jsc = createJavaSparkContext(args);

// Create a RDD

JavaRDD documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map

(new Function() {

@Override

public Document call(final Integer i) throws Exception {

return Document.parse("{test: " + i + "}");

}

});

// Saving data from an RDD to MongoDB

MongoSpark.save(documents);

// Saving data with a custom WriteConfig

Map writeOverrides = new HashMap();

writeOverrides.put("collection", "spark");

writeOverrides.put("writeConcern.w", "majority");

WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

JavaRDD sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map

(new Function() {

@Override

public Document call(final Integer i) throws Exception {

return Document.parse("{spark: " + i + "}");

}

});

// Saving data from an RDD to MongoDB

MongoSpark.save(sparkDocuments, writeConfig);

// Loading and analyzing data from MongoDB

JavaMongoRDD rdd = MongoSpark.load(jsc);

System.out.println(rdd.count());

System.out.println(rdd.first().toJson());

// Loading data with a custom ReadConfig

Map readOverrides = new HashMap();

readOverrides.put("collection", "spark");

readOverrides.put("readPreference.name", "secondaryPreferred");

ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

JavaMongoRDD customRdd = MongoSpark.load(jsc, readConfig);

System.out.println(customRdd.count());

System.out.println(customRdd.first().toJson());

// Filtering an rdd using an aggregation pipeline before passing data to Spark

JavaMongoRDD aggregatedRdd = rdd.withPipeline(singletonList(Document.parse("{ $match: { test : { $gt : 5 } } }")));

System.out.println(aggregatedRdd.count());

System.out.println(aggregatedRdd.first().toJson());

// Datasets

// Drop database

dropDatabase(getMongoClientURI(args));

// Add Sample Data

List characters = asList(

"{'name': 'Bilbo Baggins', 'age': 50}",

"{'name': 'Gandalf', 'age': 1000}",

"{'name': 'Thorin', 'age': 195}",

"{'name': 'Balin', 'age': 178}",

"{'name': 'K铆li', 'age': 77}",

"{'name': 'Dwalin', 'age': 169}",

"{'name': '脫in', 'age': 167}",

"{'name': 'Gl贸in', 'age': 158}",

"{'name': 'F铆li', 'age': 82}",

"{'name': 'Bombur'}"

);

MongoSpark.save(jsc.parallelize(characters).map(new Function() {

@Override

public Document call(final String json) throws Exception {

return Document.parse(json);

}

}));

// Load inferring schema

Dataset df = MongoSpark.load(jsc).toDF();

df.printSchema();

df.show();

// Declare the Schema via a Java Bean

SparkSession sparkSession = SparkSession.builder().getOrCreate();

Dataset explicitDF = MongoSpark.load(jsc).toDF(Character.class);

explicitDF.printSchema();

// SQL

explicitDF.registerTempTable("characters");

Dataset centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100");

// Saving DataFrame

MongoSpark.write(centenarians).option("collection", "hundredClub").save();

MongoSpark.load(sparkSession, ReadConfig.create(sparkSession).withOption("collection", "hundredClub"), Character.class).show();

// Drop database

MongoConnector.apply(jsc.sc()).withDatabaseDo(ReadConfig.create(sparkSession), new Function() {

@Override

public Void call(final MongoDatabase db) throws Exception {

db.drop();

return null;

}

});

String objectId = "123400000000000000000000";

List docs = asList(

new Document("_id", new ObjectId(objectId)).append("a", 1),

new Document("_id", new ObjectId()).append("a", 2));

MongoSpark.save(jsc.parallelize(docs));

// Set the schema using the ObjectId helper

StructType schema = DataTypes.createStructType(asList(

StructFields.objectId("_id", false),

DataTypes.createStructField("a", DataTypes.IntegerType, false)));

// Create a dataframe with the helper functions registered

df = MongoSpark.read(sparkSession).schema(schema).option("registerSQLHelperFunctions", "true").load();

// Query using the ObjectId string

df.filter(format("_id = ObjectId('%s')", objectId)).show();

}

private static JavaSparkContext createJavaSparkContext(final String[] args) {

String uri = getMongoClientURI(args);

dropDatabase(uri);

SparkConf conf = new SparkConf()

.setMaster("local")

.setAppName("MongoSparkConnectorTour")

.set("spark.app.id", "MongoSparkConnectorTour")

.set("spark.mongodb.input.uri", uri)

.set("spark.mongodb.output.uri", uri);

return new JavaSparkContext(conf);

}

private static String getMongoClientURI(final String[] args) {

String uri;

if (args.length == 0) {

uri = "mongodb://localhost/test.coll"; // default

} else {

uri = args[0];

}

return uri;

}

private static void dropDatabase(final String connectionString) {

MongoClientURI uri = new MongoClientURI(connectionString);

new MongoClient(uri).dropDatabase(uri.getDatabase());

}

}



推荐阅读
  • 深入解析 Spring Security 用户认证机制
    本文将详细介绍 Spring Security 中用户登录认证的核心流程,重点分析 AbstractAuthenticationProcessingFilter 和 AuthenticationManager 的工作原理。通过理解这些组件的实现,读者可以更好地掌握 Spring Security 的认证机制。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 本文介绍如何在现有网络中部署基于Linux系统的透明防火墙(网桥模式),以实现灵活的时间段控制、流量限制等功能。通过详细的步骤和配置说明,确保内部网络的安全性和稳定性。 ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • CentOS7源码编译安装MySQL5.6
    2019独角兽企业重金招聘Python工程师标准一、先在cmake官网下个最新的cmake源码包cmake官网:https:www.cmake.org如此时最新 ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • DNN Community 和 Professional 版本的主要差异
    本文详细解析了 DotNetNuke (DNN) 的两种主要版本:Community 和 Professional。通过对比两者的功能和附加组件,帮助用户选择最适合其需求的版本。 ... [详细]
  • 在 Swift 编程中,遇到错误提示“一元运算符 '!' 不能应用于 '()' 类型的操作数”,通常是因为尝试对没有返回值的方法或函数应用逻辑非运算符。本文将详细解释该错误的原因,并提供解决方案。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 基于Node.js、Express、MongoDB和Socket.io的实时聊天应用开发
    本文详细介绍了使用Node.js、Express、MongoDB和Socket.io构建的实时聊天应用程序。涵盖项目结构、技术栈选择及关键依赖项的配置。 ... [详细]
  • Mongoose 5.12.10 发布:MongoDB 异步对象模型工具的新特性与修复
    Mongoose 是一款专为异步环境设计的 MongoDB 对象模型工具,支持 Promise 和回调函数。最新版本 Mongoose 5.12.10 带来了多项修复和改进,包括查询选项中的默认值设置、嵌入式判别器填充、以及 TypeScript 定义文件的优化。 ... [详细]
  • 本文探讨了2019年前端技术的发展趋势,包括工具化、配置化和泛前端化等方面,并提供了详细的学习路线和职业规划建议。 ... [详细]
author-avatar
广东淡水未央
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有