热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术日志:深入探讨SparkStreaming与SparkSQL的融合应用

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。 1.说明  虽然DStream可以转换成RDD,但是

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。


1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

 

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

 

3.程序


1 package com.sql.it
2 import org.apache.spark.sql.SQLContext
3 import org.apache.spark.storage.StorageLevel
4 import org.apache.spark.streaming.kafka.KafkaUtils
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6 import org.apache.spark.{SparkConf, SparkContext}
7 object StreamingSQL {
8 def main(args: Array[String]): Unit = {
9 val cOnf= new SparkConf()
10 .setAppName("StreamingWindowOfKafka22")
11 .setMaster("local[*]")
12 val sc = SparkContext.getOrCreate(conf)
13 val ssc = new StreamingContext(sc, Seconds(5))
14 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
15 // 路径对应的文件夹不能存在
16 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
17
18 val kafkaParams = Map(
19 "group.id" -> "streaming-kafka-78912151",
20 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
21 "auto.offset.reset" -> "smallest"
22 )
23 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
24 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
25 ssc, // 给定SparkStreaming上下文
26 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
27 topics, // 给定读取对应topic的名称以及读取数据的线程数量
28 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
29 ).map(_._2)
30
31 /**
32 * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
33 */
34 dstream.transform(rdd => {
35 // 使用sql统计wordcoount
36 val sqlCOntext= SQLContextSingelton.getSQLContext(rdd.sparkContext)
37 import sqlContext.implicits._
38 val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
39 procedRDD.toDF("word", "c").registerTempTable("tb_word")
40 val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
41 val word = row.getAs[String]("word")
42 val count = row.getAs[Long]("vc")
43 (word, count)
44 })
45
46 resultRDD
47 }).print()
48
49 // 启动开始处理
50 ssc.start()
51 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
52 }
53 }
54
55 object SQLContextSingelton {
56 @transient private var instance: SQLCOntext= _
57
58 def getSQLContext(sc: SparkContext): SQLCOntext= {
59 if (instance == null) {
60 synchronized[SQLContext] {
61 if (instance == null) {
62 instance = new SQLContext(sc)
63 }
64 instance
65 }
66 }
67 instance
68 }
69 }

 

4.效果

  

 



推荐阅读
  • 本文详细介绍如何使用 Apache Spark 执行基本任务,包括启动 Spark Shell、运行示例程序以及编写简单的 WordCount 程序。同时提供了参数配置的注意事项和优化建议。 ... [详细]
  • 本文探讨了如何使用pg-promise库在PostgreSQL中高效地批量插入多条记录,包括通过事务和单一查询两种方法。 ... [详细]
  • 本文详细探讨了如何在 SparkSQL 中创建 DataFrame,涵盖了从基本概念到具体实践的各种方法。作为持续学习的一部分,本文将持续更新以提供最新信息。 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 优化SQL Server批量数据插入存储过程的实现
    本文介绍了一种改进的SQL Server存储过程,用于生成批量插入语句。该方法不仅提高了性能,还支持单行和多行模式,适用于SQL Server 2005及以上版本。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • 配置PHPStudy环境并使用DVWA进行Web安全测试
    本文详细介绍了如何在PHPStudy环境下配置DVWA( Damn Vulnerable Web Application ),并利用该平台进行SQL注入和XSS攻击的练习。通过此过程,读者可以熟悉常见的Web漏洞及其利用方法。 ... [详细]
  • 软件工程课堂测试2
    要做一个简单的保存网页界面,首先用jsp写出保存界面,本次界面比较简单,首先是三个提示语,后面是三个输入框,然 ... [详细]
  • 深入解析Hadoop的核心组件与工作原理
    本文详细介绍了Hadoop的三大核心组件:分布式文件系统HDFS、资源管理器YARN和分布式计算框架MapReduce。通过分析这些组件的工作机制,帮助读者更好地理解Hadoop的架构及其在大数据处理中的应用。 ... [详细]
  • 本文探讨了2019年前端技术的发展趋势,包括工具化、配置化和泛前端化等方面,并提供了详细的学习路线和职业规划建议。 ... [详细]
  • Java实现文本到图片转换,支持自动换行、字体自定义及图像优化
    本文详细介绍了如何使用Java实现将文本转换为图片的功能,包括自动换行、自定义字体加载、抗锯齿优化以及图片压缩等技术细节。 ... [详细]
  • 本文详细介绍了如何使用Python中的xlwt库将数据库中的数据导出至Excel文件,适合初学者和中级开发者参考。 ... [详细]
  • 本文将详细探讨 Linux 系统中的 netstat 命令,该命令用于查看网络状态和连接情况。通过了解 IP 地址和端口的基本概念,我们将更好地理解如何利用 netstat 命令来监控和管理网络服务。 ... [详细]
  • Java Servlet中获取客户端IP与MAC地址的方法
    本文介绍了一种在Java Servlet应用中获取客户端IP地址及MAC地址的技术实现方法,通过示例代码详细解析了获取过程中的关键步骤和技术点。 ... [详细]
  • Android中解析XML文件的实践指南
    本文详细介绍了在Android应用开发中解析XML文件的方法,包括从本地文件和网络资源获取XML文件的不同途径,以及使用DOM、SAX和PULL三种解析方式的具体实现。 ... [详细]
author-avatar
Liushan2502897753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有