热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

57、SparkStreaming:window滑动窗口以及热点搜索词滑动统计案例

一、window滑动窗口1、概述SparkStreaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据

一、window滑动窗口

1、概述

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,
会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,
这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定
两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强大的)

image

 

2、window滑动窗口操作

 image

 

案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出排名最靠前的3个搜索词以及出现次数

 

2、java案例

package cn.spark.study.streaming;import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;/*** 基于滑动窗口的热点搜索词实时统计* @author Administrator**/
public class WindowHotWord {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowHotWord"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));// 说明一下,这里的搜索日志的格式// leo hello// tom worldJavaReceiverInputDStream searchLogsDStream = jssc.socketTextStream("spark1", 9999);// 将搜索日志给转换成,只有一个搜索词,即可JavaDStream searchWordsDStream = searchLogsDStream.map(new Function() {private static final long serialVersionUID = 1L;@Overridepublic String call(String searchLog) throws Exception {return searchLog.split(" ")[1];}});// 将搜索词映射为(searchWord, 1)的tuple格式JavaPairDStream searchWordPairDStream = searchWordsDStream.mapToPair(new PairFunction() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2 call(String searchWord)throws Exception {return new Tuple2(searchWord, 1);}});// 针对(searchWord, 1)的tuple格式的DStream,执行reduceByKeyAndWindow,滑动窗口操作// 第二个参数,是窗口长度,这里是60秒// 第三个参数,是滑动间隔,这里是10秒// 也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续// 计算// 所以说,这里的意思,就是,之前的searchWordPairDStream为止,其实,都是不会立即进行计算的// 而是只是放在那里// 然后,等待我们的滑动间隔到了以后,10秒钟到了,会将之前60秒的RDD,因为一个batch间隔是,5秒,所以之前// 60秒,就有12个RDD,给聚合起来,然后,统一执行redcueByKey操作// 所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对某个DStream中的RDDJavaPairDStream searchWordCountsDStream = //Function2:一个双参数函数,它接受类型为T1和T2的参数并返回一个RsearchWordPairDStream.reduceByKeyAndWindow(new Function2() {private static final long serialVersionUID = 1L; @Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}}, Durations.seconds(60), Durations.seconds(10));// 到这里为止,就已经可以做到,每隔10秒钟,出来,之前60秒的收集到的单词的统计次数// 执行transform操作,因为,一个窗口,就是一个60秒钟的数据,会变成一个RDD,然后,对这一个RDD// 根据每个搜索词出现的频率进行排序,然后获取排名前3的热点搜索词JavaPairDStream finalDStream = searchWordCountsDStream.transformToPair(new Function, JavaPairRDD>() {private static final long serialVersionUID = 1L;@Overridepublic JavaPairRDD call(JavaPairRDD searchWordCountsRDD) throws Exception {// 执行搜索词和出现频率的反转JavaPairRDD countSearchWordsRDD = searchWordCountsRDD.mapToPair(new PairFunction, Integer, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2 call(Tuple2 tuple)throws Exception {return new Tuple2(tuple._2, tuple._1);}});// 然后执行降序排序JavaPairRDD sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false);// 然后再次执行反转,变成(searchWord, count)的这种格式JavaPairRDD sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.mapToPair(new PairFunction, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2 call(Tuple2 tuple)throws Exception {return new Tuple2(tuple._2, tuple._1);}});// 然后用take(),获取排名前3的热点搜索词List> hogSearchWordCounts = sortedSearchWordCountsRDD.take(3);for(Tuple2 wordCount : hogSearchWordCounts) {System.out.println(wordCount._1 + ": " + wordCount._2); }return searchWordCountsRDD;}});// 这个无关紧要,只是为了触发job的执行,所以必须有output操作
finalDStream.print();jssc.start();jssc.awaitTermination();jssc.close();}}##在eclipse中启动程序##服务器上启动nc,并输入内容
[root@spark1
~]# nc -lk 9999
leo hello
tom word
leo hello
jack you
leo you##统计结果
(hello,
2)
(word,
1)
(you,
2)

 

3、scala案例

package cn.spark.study.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds/*** @author Administrator*/
object WindowHotWord {def main(args: Array[String]): Unit
&#61; {val conf &#61; new SparkConf().setMaster("local[2]") .setAppName("WindowHotWord")val ssc &#61; new StreamingContext(conf, Seconds(1))val searchLogsDStream &#61; ssc.socketTextStream("spark1", 9999) val searchWordsDStream &#61; searchLogsDStream.map { _.split(" ")(1) } val searchWordPairsDStream &#61; searchWordsDStream.map { searchWord &#61;> (searchWord, 1) } val searchWordCountsDSteram &#61; searchWordPairsDStream.reduceByKeyAndWindow((v1: Int, v2: Int) &#61;> v1 &#43; v2, Seconds(60), Seconds(10)) val finalDStream &#61; searchWordCountsDSteram.transform(searchWordCountsRDD &#61;> {val countSearchWordsRDD &#61; searchWordCountsRDD.map(tuple &#61;> (tuple._2, tuple._1)) val sortedCountSearchWordsRDD &#61; countSearchWordsRDD.sortByKey(false) val sortedSearchWordCountsRDD &#61; sortedCountSearchWordsRDD.map(tuple &#61;> (tuple._1, tuple._2))val top3SearchWordCounts &#61; sortedSearchWordCountsRDD.take(3)for(tuple <- top3SearchWordCounts) {println(tuple)}searchWordCountsRDD})finalDStream.print()ssc.start()ssc.awaitTermination()}}##在eclipse中启动程序##服务器上启动nc&#xff0c;并输入内容
[root&#64;spark1
~]# nc -lk 9999
leo hello
leo hello
leo hello
leo word
leo word
leo word
leo hello
leo you
leo you##统计结果
(hello,
4)
(word,
3)
(you,
2)


转:https://www.cnblogs.com/weiyiming007/p/11376577.html



推荐阅读
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • PHP中的单例模式与静态变量的区别及使用方法
    本文介绍了PHP中的单例模式与静态变量的区别及使用方法。在PHP中,静态变量的存活周期仅仅是每次PHP的会话周期,与Java、C++不同。静态变量在PHP中的作用域仅限于当前文件内,在函数或类中可以传递变量。本文还通过示例代码解释了静态变量在函数和类中的使用方法,并说明了静态变量的生命周期与结构体的生命周期相关联。同时,本文还介绍了静态变量在类中的使用方法,并通过示例代码展示了如何在类中使用静态变量。 ... [详细]
  • JavaScript和HTML之间的交互是经由过程事宜完成的。事宜:文档或浏览器窗口中发作的一些特定的交互霎时。能够运用侦听器(或处置惩罚递次来预订事宜),以便事宜发作时实行相应的 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
  • 用Vue实现的Demo商品管理效果图及实现代码
    本文介绍了一个使用Vue实现的Demo商品管理的效果图及实现代码。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 标题: ... [详细]
  • 面向对象之3:封装的总结及实现方法
    本文总结了面向对象中封装的概念和好处,以及在Java中如何实现封装。封装是将过程和数据用一个外壳隐藏起来,只能通过提供的接口进行访问。适当的封装可以提高程序的理解性和维护性,增强程序的安全性。在Java中,封装可以通过将属性私有化并使用权限修饰符来实现,同时可以通过方法来访问属性并加入限制条件。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • 如何在HTML中获取鼠标的当前位置
    本文介绍了在HTML中获取鼠标当前位置的三种方法,分别是相对于屏幕的位置、相对于窗口的位置以及考虑了页面滚动因素的位置。通过这些方法可以准确获取鼠标的坐标信息。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
author-avatar
梅花七
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有