热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark(38)SparkStreamingDStream案例操作、应用监控及Streaming工作原理

1.wordCount案例1.1SparkStreaming接受socket数据实现WordCount基于IDEA集成开发环境,编程实现:从TCPS

1. wordCount案例


1.1 SparkStreaming接受socket数据实现WordCount

基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行词频统计WordCount。


1.2 架构图

在这里插入图片描述
通过socket发送一些数据出来, 右侧sparkstreaming使用receiver接受node01的socket发送的数据。


1.3 实现流程


1.3.1 第一步:创建maven工程并导入jar包

<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_${scala.binary.version}artifactId><version>${spark.version}version>
dependency>


1.3.2 第二步&#xff1a;node01服务器安装并启动生产者

首先在linux服务器上用YUM安装nc工具&#xff0c;nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
node01服务器执行以下命令安装socket客户端工具&#xff0c;模拟发送数据

yum -y install nc

1.3.3 第三步&#xff1a;通过netcat工具向指定的端口发送数据

node01服务器执行以下命令&#xff0c;向指定的端口9999发送数据

nc -lk 9999

1.3.4 第四步&#xff1a;开发sparkStreaming程序&#xff0c;统计单词出现的次数

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**- sparkStreming流式处理接受socket数据&#xff0c;实现单词统计*/object SparkStreamingTCP {def main(args: Array[String]): Unit &#61; {//配置sparkConf参数val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext &#61; new SparkContext(sparkConf)//设置日志输出级别sc.setLogLevel("WARN")//构建StreamingContext对象&#xff0c;每个批处理的时间间隔val scc: StreamingContext &#61; new StreamingContext(sc,Seconds(5))//注册一个监听的IP地址和端口 用来收集数据val lines: ReceiverInputDStream[String] &#61; scc.socketTextStream("192.168.200.160",9999)//切分每一行记录val words: DStream[String] &#61; lines.flatMap(_.split(" "))//每个单词记为1val wordAndOne: DStream[(String, Int)] &#61; words.map((_,1))//分组聚合val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_&#43;_)//打印数据result.print()scc.start()scc.awaitTermination()}
}

由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

注意&#xff1a; 要指定并行度&#xff0c;如在本地运行设置setMaster(“local[2]”)&#xff0c;相当于启动两个线程&#xff0c;一个给receiver&#xff0c;一个给computer。如果是在集群中运行&#xff0c;必须要求集群中可用core数大于1。


1.4 执行查看效果

&#xff08;1&#xff09;先执行nc -lk 9999
在这里插入图片描述
&#xff08;2&#xff09;然后在执行以上代码
在这里插入图片描述
&#xff08;3&#xff09;不断的在&#xff08;1&#xff09;中输入不同的单词&#xff0c;观察IDEA控制台输出
在这里插入图片描述
在这里插入图片描述
现象&#xff1a;sparkStreaming每隔5s计算一次当前5s内的数据&#xff0c;然后将每个批次的数据输出。

从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
在这里插入图片描述
运行结果监控截图&#xff1a;
从2020/05/10 11:24:18开始以5秒为单位的批次运行48秒&#xff08;已完成10个批次&#xff0c;865条记录&#xff09;
在这里插入图片描述


1.5 Streaming 应用监控

运行上述词频统计案例&#xff0c;登录到WEB UI监控页面&#xff1a;http://localhost:4040&#xff0c;查看相关监控信息。


  • 其一、Streaming流式应用概要信息
    在这里插入图片描述
    每批次Batch数据处理总时间TD &#61; 批次调度延迟时间SD &#43; 批次数据处理时间PT。
    在这里插入图片描述
  • 其二、性能衡量标准
    SparkStreaming实时处理数据性能如何&#xff08;是否可以实时处理数据&#xff09;&#xff1f;&#xff1f;如何衡量的呢&#xff1f;&#xff1f;

每批次数据处理时间TD <&#61; BatchInterval每批次时间间隔

在这里插入图片描述


2. Streaming 工作原理

SparkStreaming处理流式数据时&#xff0c;按照时间间隔划分数据为微批次&#xff08;Micro-Batch&#xff09;&#xff0c;每批次数据当做RDD&#xff0c;再进行处理分析。
在这里插入图片描述
以上述词频统计WordCount程序为例&#xff0c;讲解Streaming工作原理。


2.1 创建 StreamingContext

当SparkStreaming流式应用启动&#xff08;streamingContext.start&#xff09;时&#xff0c;首先创建StreamingContext流式上下文实例对象&#xff0c;整个流式应用环境构建&#xff0c;底层还是SparkContext。
在这里插入图片描述
当StreamingContext对象构建以后&#xff0c;启动接收器Receiver&#xff0c;专门从数据源端接收数据&#xff0c;此接收器作为Task任务运行在Executor中&#xff0c;一直运行&#xff08;Long Runing&#xff09;&#xff0c;一直接收数据。
在这里插入图片描述
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
在这里插入图片描述
可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;


2.2 双流Join

可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;

/**
* DESC:
*/
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/**
**
*- sparkStreming流式处理接受socket数据&#xff0c;实现单词统计
*/

object SparkStreamingTCP2 {
def main(args: Array[String]): Unit &#61; {
//配置sparkConf参数
val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[6]")
//构建sparkContext对象
val sc: SparkContext &#61; new SparkContext(sparkConf)
//设置日志输出级别
sc.setLogLevel("WARN")
//构建StreamingContext对象&#xff0c;每个批处理的时间间隔
val scc: StreamingContext &#61; new StreamingContext(sc, Seconds(5))
//注册2个监听的IP地址和端口 用来收集数据
val inputDStream01: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9999)
val inputDStream02: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9998)
// 合并两个DStream流
val inputDStream: DStream[String] &#61; inputDStream01.union(inputDStream02)
//切分每一行记录
val words: DStream[String] &#61; inputDStream.flatMap(_.split(" "))
//每个单词记为1
val wordAndOne: DStream[(String, Int)] &#61; words.map((_, 1))
//分组聚合
val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_ &#43; _)
//打印数据
result.print()scc.start()
scc.awaitTermination()
scc.stop(stopSparkContext &#61; true, stopGracefully &#61; true)}
}

2.3 接收器接收数据

启动每个接收器Receiver以后&#xff0c;实时从数据源端接收数据&#xff08;比如TCP Socket&#xff09;&#xff0c;也是按照时间间隔将接收的流式数据划分为很多Block&#xff08;块&#xff09;。
在这里插入图片描述
接收器Receiver划分流式数据的时间间隔BlockInterval&#xff0c;默认值为200ms&#xff0c;通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后&#xff0c;按照设置的存储级别对Block进行存储&#xff0c;从TCP Socket中接收数据默认的存储级别为&#xff1a;MEMORY_AND_DISK_SER_2&#xff0c;先存储内存&#xff0c;不足再存储磁盘&#xff0c;存储2副本。
从TCP Socket消费数据时可以设置Block存储级别&#xff0c;演示代码如下&#xff1a;

// TODO: 2. 从数据源端读取数据&#xff0c;此处是TCP Socket读取数据/*def socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel &#61; StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]*/val inputDStream: ReceiverInputDStream[String] &#61; ssc.socketTextStream("node01", //9999, //// TODO: 设置Block存储级别为先内存&#xff0c;不足磁盘&#xff0c;副本为1storageLevel &#61; StorageLevel.MEMORY_AND_DISK)

2.4 汇报接收Block报告

接收器Receiver将实时汇报接收的数据对应的Block信息&#xff0c;当BatchInterval时间达到以后&#xff0c;StreamingContext将对应时间范围内数据block当做RDD&#xff0c;加载SparkContextt处理数据。
在这里插入图片描述
以此循环处理流式的数据&#xff0c;如下图所示&#xff1a;
在这里插入图片描述


2.5 Streaming 工作原理总述

整个Streaming运行过程中&#xff0c;涉及到两个时间间隔&#xff1a;


  • 批次时间间隔&#xff1a;BatchInterval
  • 每批次数据的时间间隔&#xff0c;每隔多久加载一个Job&#xff1b;
  • Block时间间隔&#xff1a;BlockInterval
  • 接收器划分流式数据的时间间隔&#xff0c;可以调整大小哦&#xff0c;官方建议最小值不能小于50ms&#xff1b;
  • 默认值为200ms&#xff0c;属性&#xff1a;spark.streaming.blockInterval&#xff0c;调整设置
    在这里插入图片描述

//官方案例&#xff1a;BatchInterval&#xff1a; 1s &#61; 1000ms &#61; 5 * BlockInterval// 每批次RDD数据中&#xff0c;有5个Block&#xff0c;每个Block就是RDD一个分区数据

从代码层面结合实际数据处理层面来看&#xff0c;Streaming处理原理如下&#xff0c;左边为代码逻辑&#xff0c;右边为实际每批次数据处理过程。
在这里插入图片描述
具体运行数据时&#xff0c;每批次数据依据代码逻辑执行。

// TODO: 3. 对每批次的数据进行词频统计val resultDStream: DStream[(String, Int)] &#61; inputDStream// 过滤不合格的数据.filter(line &#61;> null !&#61; line && line.trim.length > 0)// 按照分隔符划分单词.flatMap(line &#61;> line.trim.split("\\s&#43;"))// 转换数据为二元组&#xff0c;表示每个单词出现一次.map(word &#61;> (word, 1))// 按照单词分组&#xff0c;聚合统计.reduceByKey((tmp, item) &#61;> tmp &#43; item)// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出resultDStream.print(10)

流式数据流图如下&#xff1a;
在这里插入图片描述


推荐阅读
author-avatar
童式恃
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有