热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot集成Spark

为什么80%的码农都做不了架构师?Spark单机运行时,都是跑Main方法,那如何集成到SpringBoot实现http调用呢&#x

为什么80%的码农都做不了架构师?>>>   hot3.png

Spark单机运行时,都是跑Main方法,那如何集成到Spring Boot实现http调用呢?

接下实现一个从一个文本里排序出频次最高的前10名

项目环境:

JDK:1.8;

Spark:2.2.0

项目搭建:

pom.xml 依赖:

org.springframework.bootspring-boot-starter-parent1.5.3.RELEASE UTF-8UTF-81.82.112.2.0org.springframework.bootspring-boot-starter-aoporg.springframework.bootspring-boot-starter-weborg.apache.sparkspark-core_${scala.version}${spark.version}org.slf4jslf4j-log4j12log4jlog4jprovidedorg.apache.sparkspark-streaming_${scala.version}${spark.version}providedorg.apache.sparkspark-sql_${scala.version}${spark.version}org.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-configuration-processortrue

配置类:

@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean {private String sparkHome = ".";private String appName = "sparkTest";private String master = "local";@Bean@ConditionalOnMissingBean(SparkConf.class)public SparkConf sparkConf() throws Exception {SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);return conf;}@Bean@ConditionalOnMissingBean(JavaSparkContext.class)public JavaSparkContext javaSparkContext() throws Exception {return new JavaSparkContext(sparkConf());}public String getSparkHome() {return sparkHome;}public void setSparkHome(String sparkHome) {this.sparkHome = sparkHome;}public String getAppName() {return appName;}public void setAppName(String appName) {this.appName = appName;}public String getMaster() {return master;}public void setMaster(String master) {this.master = master;}
}

实现类:

@Service
public class SparkTestService {private static final Logger logger &#61; LoggerFactory.getLogger(SparkTestService.class);private static final Pattern SPACE &#61; Pattern.compile(" ");&#64;Autowiredprivate JavaSparkContext sc;public Map calculateTopTen() {Map result &#61; new HashMap();JavaRDD lines &#61; sc.textFile("src/test/java/test.txt").cache();System.out.println();System.out.println("-------------------------------------------------------");System.out.println(lines.count());JavaRDD words &#61; lines.flatMap(str -> Arrays.asList(SPACE.split(str)).iterator());JavaPairRDD ones &#61; words.mapToPair(str -> new Tuple2(str, 1));JavaPairRDD counts &#61; ones.reduceByKey((Integer i1, Integer i2) -> (i1 &#43; i2));JavaPairRDD temp &#61; counts.mapToPair(tuple -> new Tuple2(tuple._2, tuple._1));JavaPairRDD sorted &#61; temp.sortByKey(false).mapToPair(tuple -> new Tuple2(tuple._2, tuple._1));System.out.println();System.out.println("-------------------------------------------------------");System.out.println(sorted.count());//List> output &#61; sorted.collect();//List> output &#61; sorted.take(10);List> output &#61; sorted.top(10);for (Tuple2 tuple : output) {result.put(tuple._1(), tuple._2());}return result;}/*** 练习demo&#xff0c;熟悉其中API*/public void sparkExerciseDemo() {List data &#61; Lists.newArrayList(1,2,3,4,5,6);JavaRDD rdd01 &#61; sc.parallelize(data);rdd01 &#61; rdd01.map(num ->{return num * num; });//data map :1,4,9,16,25,36logger.info("data map :{}",Joiner.on(",").skipNulls().join(rdd01.collect()).toString());rdd01 &#61; rdd01.filter(x -> x <6);//data filter :1,4logger.info("data filter :{}",Joiner.on(",").skipNulls().join(rdd01.collect()).toString());rdd01 &#61; rdd01.flatMap( x ->{Integer[] test &#61; {x,x&#43;1,x&#43;2};return Arrays.asList(test).iterator();});//flatMap :1,2,3,4,5,6logger.info("flatMap :{}",Joiner.on(",").skipNulls().join(rdd01.collect()).toString());JavaRDD unionRdd &#61; sc.parallelize(data);rdd01 &#61; rdd01.union(unionRdd);//union :1,2,3,4,5,6,1,2,3,4,5,6logger.info("union :{}",Joiner.on(",").skipNulls().join(rdd01.collect()).toString());List result &#61; Lists.newArrayList();result.add(rdd01.reduce((Integer v1,Integer v2) -> {return v1&#43;v2;}));//reduce :42logger.info("reduce :{}",Joiner.on(",").skipNulls().join(result).toString());result.forEach(System.out::print);JavaPairRDD> groupRdd &#61; rdd01.groupBy(x -> {logger.info("&#61;&#61;&#61;&#61;&#61;&#61;grouby&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#xff1a;{}",x);if (x > 10) return 0;else return 1;});List>> resul &#61; groupRdd.collect();//group by key:1 value:1,2,3,4,5,6,1,2,3,4,5,6resul.forEach(x -> {logger.info("group by key:{} value:{}",x._1,Joiner.on(",").skipNulls().join(x._2).toString());});}/*** spark streaming 练习*/public void sparkStreaming() throws InterruptedException {JavaStreamingContext jsc &#61; new JavaStreamingContext(sc,Durations.seconds(10));//批间隔时间JavaReceiverInputDStream lines &#61; jsc.receiverStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_2()));JavaDStream count &#61; lines.count();count &#61; count.map(x -> {logger.info("这次批一共多少条数据&#xff1a;{}",x);return x; });count.print();jsc.start();jsc.awaitTermination();jsc.stop();}}/*** 自定义接收streaming类*/
public class CustomReceiver extends Receiver{private static Logger logger &#61; LoggerFactory.getLogger(CustomReceiver.class);/*** * &#64;author hz15041240 * &#64;date 2018年1月18日 下午4:37:22* &#64;version */ private static final long serialVersionUID &#61; 5817531198342629801L;public CustomReceiver(StorageLevel storageLevel) {super(storageLevel);}&#64;Overridepublic void onStart() {new Thread(this::doStart).start();logger.info("开始启动Receiver...");//doStart();}public void doStart() {while(!isStopped()) {int value &#61; RandomUtils.nextInt(100);if(value <20) {try {Thread.sleep(1000);}catch (Exception e) {logger.error("sleep exception",e);restart("sleep exception", e);}}store(String.valueOf(value));}}&#64;Overridepublic void onStop() {logger.info("即将停止Receiver...");}}&#64;RestController
public class DemoController {&#64;Autowiredprivate SparkTestService sparkTestService;&#64;RequestMapping("/demo/top10")public Map calculateTopTen() {return sparkTestService.calculateTopTen();}&#64;RequestMapping("/demo/exercise")public void exercise() {sparkTestService.sparkExerciseDemo();}&#64;RequestMapping("/demo/stream")public void streamingDemo() throws InterruptedException {sparkTestService.sparkStreaming();}
}

application.yml&#xff1a;

server:port: 8054spark: spark-home: .app-name: sparkTestmaster: local[4]

在项目的 src/test/java 目录下新建一个test.txt文件&#xff0c;立面随便一堆随机的字符就可以了。

启动项目&#xff0c;访问&#xff1a;http://localhost:8054/demo/top10 就能得到前10频率词汇了。


转:https://my.oschina.net/woter/blog/1843755



推荐阅读
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文介绍了在Pygame中使用矩形对表面进行涂色的方法。通过查阅Pygame文档中的blit函数,可以了解到如何将一个表面的特定部分复制到另一个表面的指定位置上。具体的解决方法和参数说明在文中都有详细说明。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文介绍了一种划分和计数油田地块的方法。根据给定的条件,通过遍历和DFS算法,将符合条件的地块标记为不符合条件的地块,并进行计数。同时,还介绍了如何判断点是否在给定范围内的方法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了P1651题目的描述和要求,以及计算能搭建的塔的最大高度的方法。通过动态规划和状压技术,将问题转化为求解差值的问题,并定义了相应的状态。最终得出了计算最大高度的解法。 ... [详细]
author-avatar
美女牙医--妙菡众
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有