热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术日志:深入探讨SparkStreaming与SparkSQL的融合应用

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。 1.说明  虽然DStream可以转换成RDD,但是

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。


1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

 

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

 

3.程序


1 package com.sql.it
2 import org.apache.spark.sql.SQLContext
3 import org.apache.spark.storage.StorageLevel
4 import org.apache.spark.streaming.kafka.KafkaUtils
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6 import org.apache.spark.{SparkConf, SparkContext}
7 object StreamingSQL {
8 def main(args: Array[String]): Unit = {
9 val cOnf= new SparkConf()
10 .setAppName("StreamingWindowOfKafka22")
11 .setMaster("local[*]")
12 val sc = SparkContext.getOrCreate(conf)
13 val ssc = new StreamingContext(sc, Seconds(5))
14 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
15 // 路径对应的文件夹不能存在
16 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
17
18 val kafkaParams = Map(
19 "group.id" -> "streaming-kafka-78912151",
20 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
21 "auto.offset.reset" -> "smallest"
22 )
23 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
24 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
25 ssc, // 给定SparkStreaming上下文
26 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
27 topics, // 给定读取对应topic的名称以及读取数据的线程数量
28 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
29 ).map(_._2)
30
31 /**
32 * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
33 */
34 dstream.transform(rdd => {
35 // 使用sql统计wordcoount
36 val sqlCOntext= SQLContextSingelton.getSQLContext(rdd.sparkContext)
37 import sqlContext.implicits._
38 val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
39 procedRDD.toDF("word", "c").registerTempTable("tb_word")
40 val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
41 val word = row.getAs[String]("word")
42 val count = row.getAs[Long]("vc")
43 (word, count)
44 })
45
46 resultRDD
47 }).print()
48
49 // 启动开始处理
50 ssc.start()
51 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
52 }
53 }
54
55 object SQLContextSingelton {
56 @transient private var instance: SQLCOntext= _
57
58 def getSQLContext(sc: SparkContext): SQLCOntext= {
59 if (instance == null) {
60 synchronized[SQLContext] {
61 if (instance == null) {
62 instance = new SQLContext(sc)
63 }
64 instance
65 }
66 }
67 instance
68 }
69 }

 

4.效果

  

 



推荐阅读
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 在 Asp.net 应用中,动态加载 DropDownList 控件的数据源是一项常见需求。本文探讨了如何高效地从数据库中获取数据,并实时更新下拉列表,确保用户界面始终与后台数据保持同步。通过使用 ADO.NET 和 LINQ to SQL 技术,开发者可以轻松实现这一功能,同时提高应用的性能和用户体验。文中还提供了代码示例和最佳实践,帮助开发者解决常见的数据绑定问题。 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • MySQL:不仅仅是数据库那么简单
    MySQL不仅是一款高效、可靠的数据库管理系统,它还具备丰富的功能和扩展性,支持多种存储引擎,适用于各种应用场景。从简单的网站开发到复杂的企业级应用,MySQL都能提供强大的数据管理和优化能力,满足不同用户的需求。其开源特性也促进了社区的活跃发展,为技术进步提供了持续动力。 ... [详细]
  • MarsJava 最新发布了 2.1.7 版本,此次更新带来了多项性能优化和功能增强。新版本在系统稳定性、响应速度和资源管理方面进行了显著改进,进一步提升了用户体验。同时,新增了多个实用功能,满足开发者在复杂应用场景中的需求。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 深入解析Java中的轮询与加权轮询负载均衡算法实现
    网上找了不少负载均衡算法的资源,都不够全面,后来自己结合了网上的一些算法实现,下面这篇文章主要给大家介绍了关于Java负载均衡算法实现之轮询和加权轮询的相关资料,文中通过示例代码介 ... [详细]
  • 背景 本文基于spark3.1.2之前在做bug调试的时候遇到了expand的问题,在此记录一下 分析 运行该sql: createtabletest_ ... [详细]
  • 掌握 MongoDB 基础操作与实用技巧 ... [详细]
  • php写excel文件_原生php实现excel文件读写的方法分析
    本文实例分析了原生php实现excel文件读写的方法。分享给大家供大家参考,具体如下:最近在工作中遇到一个需求,需要将数据库中的数据导出到 ... [详细]
  • 【JavaScript 教程】DOM——属性的操作
    作者|阮一峰HTML元素包括标签名和若干个键值对,这个键值对就称为“属性”(attribute)。上面代码中,a元素包括两个 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • PHP中如何使用hidef代替define优化效率?本文主要介绍了PHP中使用hidef扩展代替define提高性能,本文着重测试hidef的性能,同时提供了实例。希望对大家有所帮 ... [详细]
  • 【二十五】cookie与session学习总结
    一:cookie1.创建cookie关键字:setcookie用于保存cookie原理:当浏览器访问cookie.php页面时,我们的服务 ... [详细]
author-avatar
Liushan2502897753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有