热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术日志:深入探讨SparkStreaming与SparkSQL的融合应用

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。 1.说明  虽然DStream可以转换成RDD,但是

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。


1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

 

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

 

3.程序


1 package com.sql.it
2 import org.apache.spark.sql.SQLContext
3 import org.apache.spark.storage.StorageLevel
4 import org.apache.spark.streaming.kafka.KafkaUtils
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6 import org.apache.spark.{SparkConf, SparkContext}
7 object StreamingSQL {
8 def main(args: Array[String]): Unit = {
9 val cOnf= new SparkConf()
10 .setAppName("StreamingWindowOfKafka22")
11 .setMaster("local[*]")
12 val sc = SparkContext.getOrCreate(conf)
13 val ssc = new StreamingContext(sc, Seconds(5))
14 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
15 // 路径对应的文件夹不能存在
16 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
17
18 val kafkaParams = Map(
19 "group.id" -> "streaming-kafka-78912151",
20 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
21 "auto.offset.reset" -> "smallest"
22 )
23 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
24 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
25 ssc, // 给定SparkStreaming上下文
26 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
27 topics, // 给定读取对应topic的名称以及读取数据的线程数量
28 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
29 ).map(_._2)
30
31 /**
32 * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
33 */
34 dstream.transform(rdd => {
35 // 使用sql统计wordcoount
36 val sqlCOntext= SQLContextSingelton.getSQLContext(rdd.sparkContext)
37 import sqlContext.implicits._
38 val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
39 procedRDD.toDF("word", "c").registerTempTable("tb_word")
40 val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
41 val word = row.getAs[String]("word")
42 val count = row.getAs[Long]("vc")
43 (word, count)
44 })
45
46 resultRDD
47 }).print()
48
49 // 启动开始处理
50 ssc.start()
51 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
52 }
53 }
54
55 object SQLContextSingelton {
56 @transient private var instance: SQLCOntext= _
57
58 def getSQLContext(sc: SparkContext): SQLCOntext= {
59 if (instance == null) {
60 synchronized[SQLContext] {
61 if (instance == null) {
62 instance = new SQLContext(sc)
63 }
64 instance
65 }
66 }
67 instance
68 }
69 }

 

4.效果

  

 



推荐阅读
  • PHP面试题精选及答案解析
    本文精选了新浪PHP笔试题及最新的PHP面试题,并提供了详细的答案解析,帮助求职者更好地准备PHP相关的面试。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • JUnit下的测试和suite
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 如何将955万数据表的17秒SQL查询优化至300毫秒
    本文详细介绍了通过优化SQL查询策略,成功将一张包含955万条记录的财务流水表的查询时间从17秒缩短至300毫秒的方法。文章不仅提供了具体的SQL优化技巧,还深入探讨了背后的数据库原理。 ... [详细]
  • Go从入门到精通系列视频之go编程语言密码学哈希算法(二) ... [详细]
  • 本文详细介绍了PostgreSQL与MySQL在SQL语法上的主要区别,包括如何使用COALESCE替代IFNULL、金额格式化的方法、别名处理以及日期处理等关键点。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文详细探讨了在Web开发中常见的UTF-8编码问题及其解决方案,包括HTML页面、PHP脚本、MySQL数据库以及JavaScript和Flash应用中的乱码问题。 ... [详细]
  • 原文地址:https:blog.csdn.netqq_35361471articledetails84715491原文地址:https:blog.cs ... [详细]
  • spring(22)JdbcTemplate
    2019独角兽企业重金招聘Python工程师标准###1.导入jar包,必须jar包:c3p0、mysql-connector、beans、con ... [详细]
  • Redis:缓存与内存数据库详解
    本文介绍了数据库的基本分类,重点探讨了关系型与非关系型数据库的区别,并详细解析了Redis作为非关系型数据库的特点、工作模式、优点及持久化机制。 ... [详细]
  • 在处理大数据量的SQL分页查询时,通常需要执行两次查询来分别获取数据和总记录数。本文介绍了一种优化方法,通过单次查询同时返回分页数据和总记录数,从而提高查询效率。 ... [详细]
  • flea,frame,db,使用,之 ... [详细]
  • 本文介绍了如何在两个Oracle数据库(假设为数据库A和数据库B)之间设置DBLink,以便能够从数据库A中直接访问和操作数据库B中的数据。文章详细描述了创建DBLink前的必要准备步骤以及具体的创建方法。 ... [详细]
author-avatar
Liushan2502897753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有