热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark(38)SparkStreamingDStream案例操作、应用监控及Streaming工作原理

1.wordCount案例1.1SparkStreaming接受socket数据实现WordCount基于IDEA集成开发环境,编程实现:从TCPS

1. wordCount案例


1.1 SparkStreaming接受socket数据实现WordCount

基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行词频统计WordCount。


1.2 架构图

在这里插入图片描述
通过socket发送一些数据出来, 右侧sparkstreaming使用receiver接受node01的socket发送的数据。


1.3 实现流程


1.3.1 第一步:创建maven工程并导入jar包

<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_${scala.binary.version}artifactId><version>${spark.version}version>
dependency>


1.3.2 第二步&#xff1a;node01服务器安装并启动生产者

首先在linux服务器上用YUM安装nc工具&#xff0c;nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
node01服务器执行以下命令安装socket客户端工具&#xff0c;模拟发送数据

yum -y install nc

1.3.3 第三步&#xff1a;通过netcat工具向指定的端口发送数据

node01服务器执行以下命令&#xff0c;向指定的端口9999发送数据

nc -lk 9999

1.3.4 第四步&#xff1a;开发sparkStreaming程序&#xff0c;统计单词出现的次数

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**- sparkStreming流式处理接受socket数据&#xff0c;实现单词统计*/object SparkStreamingTCP {def main(args: Array[String]): Unit &#61; {//配置sparkConf参数val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext &#61; new SparkContext(sparkConf)//设置日志输出级别sc.setLogLevel("WARN")//构建StreamingContext对象&#xff0c;每个批处理的时间间隔val scc: StreamingContext &#61; new StreamingContext(sc,Seconds(5))//注册一个监听的IP地址和端口 用来收集数据val lines: ReceiverInputDStream[String] &#61; scc.socketTextStream("192.168.200.160",9999)//切分每一行记录val words: DStream[String] &#61; lines.flatMap(_.split(" "))//每个单词记为1val wordAndOne: DStream[(String, Int)] &#61; words.map((_,1))//分组聚合val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_&#43;_)//打印数据result.print()scc.start()scc.awaitTermination()}
}

由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

注意&#xff1a; 要指定并行度&#xff0c;如在本地运行设置setMaster(“local[2]”)&#xff0c;相当于启动两个线程&#xff0c;一个给receiver&#xff0c;一个给computer。如果是在集群中运行&#xff0c;必须要求集群中可用core数大于1。


1.4 执行查看效果

&#xff08;1&#xff09;先执行nc -lk 9999
在这里插入图片描述
&#xff08;2&#xff09;然后在执行以上代码
在这里插入图片描述
&#xff08;3&#xff09;不断的在&#xff08;1&#xff09;中输入不同的单词&#xff0c;观察IDEA控制台输出
在这里插入图片描述
在这里插入图片描述
现象&#xff1a;sparkStreaming每隔5s计算一次当前5s内的数据&#xff0c;然后将每个批次的数据输出。

从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
在这里插入图片描述
运行结果监控截图&#xff1a;
从2020/05/10 11:24:18开始以5秒为单位的批次运行48秒&#xff08;已完成10个批次&#xff0c;865条记录&#xff09;
在这里插入图片描述


1.5 Streaming 应用监控

运行上述词频统计案例&#xff0c;登录到WEB UI监控页面&#xff1a;http://localhost:4040&#xff0c;查看相关监控信息。


  • 其一、Streaming流式应用概要信息
    在这里插入图片描述
    每批次Batch数据处理总时间TD &#61; 批次调度延迟时间SD &#43; 批次数据处理时间PT。
    在这里插入图片描述
  • 其二、性能衡量标准
    SparkStreaming实时处理数据性能如何&#xff08;是否可以实时处理数据&#xff09;&#xff1f;&#xff1f;如何衡量的呢&#xff1f;&#xff1f;

每批次数据处理时间TD <&#61; BatchInterval每批次时间间隔

在这里插入图片描述


2. Streaming 工作原理

SparkStreaming处理流式数据时&#xff0c;按照时间间隔划分数据为微批次&#xff08;Micro-Batch&#xff09;&#xff0c;每批次数据当做RDD&#xff0c;再进行处理分析。
在这里插入图片描述
以上述词频统计WordCount程序为例&#xff0c;讲解Streaming工作原理。


2.1 创建 StreamingContext

当SparkStreaming流式应用启动&#xff08;streamingContext.start&#xff09;时&#xff0c;首先创建StreamingContext流式上下文实例对象&#xff0c;整个流式应用环境构建&#xff0c;底层还是SparkContext。
在这里插入图片描述
当StreamingContext对象构建以后&#xff0c;启动接收器Receiver&#xff0c;专门从数据源端接收数据&#xff0c;此接收器作为Task任务运行在Executor中&#xff0c;一直运行&#xff08;Long Runing&#xff09;&#xff0c;一直接收数据。
在这里插入图片描述
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
在这里插入图片描述
可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;


2.2 双流Join

可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;

/**
* DESC:
*/
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/**
**
*- sparkStreming流式处理接受socket数据&#xff0c;实现单词统计
*/

object SparkStreamingTCP2 {
def main(args: Array[String]): Unit &#61; {
//配置sparkConf参数
val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[6]")
//构建sparkContext对象
val sc: SparkContext &#61; new SparkContext(sparkConf)
//设置日志输出级别
sc.setLogLevel("WARN")
//构建StreamingContext对象&#xff0c;每个批处理的时间间隔
val scc: StreamingContext &#61; new StreamingContext(sc, Seconds(5))
//注册2个监听的IP地址和端口 用来收集数据
val inputDStream01: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9999)
val inputDStream02: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9998)
// 合并两个DStream流
val inputDStream: DStream[String] &#61; inputDStream01.union(inputDStream02)
//切分每一行记录
val words: DStream[String] &#61; inputDStream.flatMap(_.split(" "))
//每个单词记为1
val wordAndOne: DStream[(String, Int)] &#61; words.map((_, 1))
//分组聚合
val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_ &#43; _)
//打印数据
result.print()scc.start()
scc.awaitTermination()
scc.stop(stopSparkContext &#61; true, stopGracefully &#61; true)}
}

2.3 接收器接收数据

启动每个接收器Receiver以后&#xff0c;实时从数据源端接收数据&#xff08;比如TCP Socket&#xff09;&#xff0c;也是按照时间间隔将接收的流式数据划分为很多Block&#xff08;块&#xff09;。
在这里插入图片描述
接收器Receiver划分流式数据的时间间隔BlockInterval&#xff0c;默认值为200ms&#xff0c;通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后&#xff0c;按照设置的存储级别对Block进行存储&#xff0c;从TCP Socket中接收数据默认的存储级别为&#xff1a;MEMORY_AND_DISK_SER_2&#xff0c;先存储内存&#xff0c;不足再存储磁盘&#xff0c;存储2副本。
从TCP Socket消费数据时可以设置Block存储级别&#xff0c;演示代码如下&#xff1a;

// TODO: 2. 从数据源端读取数据&#xff0c;此处是TCP Socket读取数据/*def socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel &#61; StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]*/val inputDStream: ReceiverInputDStream[String] &#61; ssc.socketTextStream("node01", //9999, //// TODO: 设置Block存储级别为先内存&#xff0c;不足磁盘&#xff0c;副本为1storageLevel &#61; StorageLevel.MEMORY_AND_DISK)

2.4 汇报接收Block报告

接收器Receiver将实时汇报接收的数据对应的Block信息&#xff0c;当BatchInterval时间达到以后&#xff0c;StreamingContext将对应时间范围内数据block当做RDD&#xff0c;加载SparkContextt处理数据。
在这里插入图片描述
以此循环处理流式的数据&#xff0c;如下图所示&#xff1a;
在这里插入图片描述


2.5 Streaming 工作原理总述

整个Streaming运行过程中&#xff0c;涉及到两个时间间隔&#xff1a;


  • 批次时间间隔&#xff1a;BatchInterval
  • 每批次数据的时间间隔&#xff0c;每隔多久加载一个Job&#xff1b;
  • Block时间间隔&#xff1a;BlockInterval
  • 接收器划分流式数据的时间间隔&#xff0c;可以调整大小哦&#xff0c;官方建议最小值不能小于50ms&#xff1b;
  • 默认值为200ms&#xff0c;属性&#xff1a;spark.streaming.blockInterval&#xff0c;调整设置
    在这里插入图片描述

//官方案例&#xff1a;BatchInterval&#xff1a; 1s &#61; 1000ms &#61; 5 * BlockInterval// 每批次RDD数据中&#xff0c;有5个Block&#xff0c;每个Block就是RDD一个分区数据

从代码层面结合实际数据处理层面来看&#xff0c;Streaming处理原理如下&#xff0c;左边为代码逻辑&#xff0c;右边为实际每批次数据处理过程。
在这里插入图片描述
具体运行数据时&#xff0c;每批次数据依据代码逻辑执行。

// TODO: 3. 对每批次的数据进行词频统计val resultDStream: DStream[(String, Int)] &#61; inputDStream// 过滤不合格的数据.filter(line &#61;> null !&#61; line && line.trim.length > 0)// 按照分隔符划分单词.flatMap(line &#61;> line.trim.split("\\s&#43;"))// 转换数据为二元组&#xff0c;表示每个单词出现一次.map(word &#61;> (word, 1))// 按照单词分组&#xff0c;聚合统计.reduceByKey((tmp, item) &#61;> tmp &#43; item)// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出resultDStream.print(10)

流式数据流图如下&#xff1a;
在这里插入图片描述


推荐阅读
  • Spring框架的核心组件与架构解析 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限
    小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限 ... [详细]
  • 在使用 SQL Server 时,连接故障是用户最常见的问题之一。通常,连接 SQL Server 的方法有两种:一种是通过 SQL Server 自带的客户端工具,例如 SQL Server Management Studio;另一种是通过第三方应用程序或开发工具进行连接。本文将详细分析导致连接故障的常见原因,并提供相应的解决策略,帮助用户有效排除连接问题。 ... [详细]
  • Ping 命令的高级用法与技巧
    本文详细介绍了 Ping 命令的各种高级用法和技巧,帮助读者更好地理解和利用这一强大的网络诊断工具。 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • 将Jar包部署至Linux服务器的详细步骤与注意事项
    将Jar包部署至Linux服务器的详细步骤及注意事项包括:首先使用 `mvn install` 命令进行Jar包的打包构建。接着,需要停止当前正在运行的Jar进程,可以通过 `ps -ef | grep **.jar` 查找对应的进程ID(PID),然后使用 `kill -9 ` 终止该进程。最后,使用 `rm` 命令删除旧的Jar包文件,确保新版本能够顺利部署。在整个过程中,务必确保操作的准确性和安全性,避免对服务器造成不必要的影响。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
  • 本文详细介绍了如何在Java Web服务器上部署音视频服务,并提供了完整的验证流程。以AnyChat为例,这是一款跨平台的音视频解决方案,广泛应用于需要实时音视频交互的项目中。通过具体的部署步骤和测试方法,确保了音视频服务的稳定性和可靠性。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • CentOS 7环境下Jenkins的安装与前后端应用部署详解
    CentOS 7环境下Jenkins的安装与前后端应用部署详解 ... [详细]
author-avatar
童式恃
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有