一、window滑动窗口
1、概述
Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次落在窗口内的RDD的数据,
会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中(batch间隔为1秒,窗口长度为3秒,滑动间隔为2秒),就是对每三秒钟的数据执行一次滑动窗口计算,
这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定
两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强大的)
2、window滑动窗口操作
案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出排名最靠前的3个搜索词以及出现次数
2、java案例
package cn.spark.study.streaming;import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;/*** 基于滑动窗口的热点搜索词实时统计* @author Administrator**/
public class WindowHotWord {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowHotWord"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));// 说明一下,这里的搜索日志的格式// leo hello// tom worldJavaReceiverInputDStream
finalDStream.print();jssc.start();jssc.awaitTermination();jssc.close();}}##在eclipse中启动程序##服务器上启动nc,并输入内容
[root@spark1 ~]# nc -lk 9999
leo hello
tom word
leo hello
jack you
leo you
##统计结果
(hello,2)
(word,1)
(you,2)
3、scala案例
package cn.spark.study.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

/**
 * @author Administrator
 */
/**
 * Hot-search-word statistics over a sliding window (Scala version).
 *
 * Reads search-log lines from the socket `spark1:9999`. Each line is expected to
 * look like `<user> <searchWord>` (e.g. `leo hello`). Every 10 seconds, the job
 * counts how often each search word occurred during the last 60 seconds. It then
 * prints the 3 most frequent words as `(word,count)` pairs.
 *
 * The window length (60s) and the slide interval (10s) are both integer multiples
 * of the 1-second batch interval, as required for window operations.
 */
object WindowHotWord {

  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver permanently occupies one local thread, so at
    // least one more is needed to actually process the batches.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WindowHotWord")
    val ssc = new StreamingContext(conf, Seconds(1))

    val searchLogsDStream = ssc.socketTextStream("spark1", 9999)

    // Keep only the search word (the second space-separated field). Lines with no
    // second field (e.g. an empty line typed into nc) are dropped instead of
    // throwing ArrayIndexOutOfBoundsException and failing the streaming job.
    val searchWordsDStream = searchLogsDStream.flatMap { searchLog =>
      searchLog.split(" ").lift(1).toList
    }

    val searchWordPairsDStream = searchWordsDStream.map(searchWord => (searchWord, 1))

    // Sum the counts of each word over the last 60 seconds, recomputed every 10 seconds.
    val searchWordCountsDStream = searchWordPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))

    // Order each window's (word, count) pairs by count, descending. transform is kept
    // a pure RDD-to-RDD step; the output is produced by the print() output operation
    // below instead of running take/println inside transform. Ties have no defined order.
    val sortedSearchWordCountsDStream = searchWordCountsDStream.transform { searchWordCountsRDD =>
      searchWordCountsRDD.sortBy(_._2, ascending = false)
    }

    // Print only the top 3 (word,count) pairs of each window.
    sortedSearchWordCountsDStream.print(3)

    ssc.start()
    ssc.awaitTermination()
  }
}
##在eclipse中启动程序
##服务器上启动nc，并输入内容
[root@spark1 ~]# nc -lk 9999
leo hello
leo hello
leo hello
leo word
leo word
leo word
leo hello
leo you
leo you
##统计结果
(hello,4)
(word,3)
(you,2)