作者:童式恃 | 来源:互联网 | 2023-09-13 11:23
1. wordCount案例
1.1 SparkStreaming接受socket数据实现WordCount
基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行词频统计WordCount。
1.2 架构图
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/0d80e8a685a9a87b.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
通过socket发送一些数据出来, 右侧sparkstreaming使用receiver接受node01的socket发送的数据。
1.3 实现流程
1.3.1 第一步:创建maven工程并导入jar包
<dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_${scala.binary.version}artifactId><version>${spark.version}version>
dependency>
1.3.2 第二步&#xff1a;node01服务器安装并启动生产者
首先在linux服务器上用YUM安装nc工具&#xff0c;nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
node01服务器执行以下命令安装socket客户端工具&#xff0c;模拟发送数据
yum -y install nc
1.3.3 第三步&#xff1a;通过netcat工具向指定的端口发送数据
node01服务器执行以下命令&#xff0c;向指定的端口9999发送数据
nc -lk 9999
1.3.4 第四步&#xff1a;开发sparkStreaming程序&#xff0c;统计单词出现的次数
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingTCP {def main(args: Array[String]): Unit &#61; {val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")val sc: SparkContext &#61; new SparkContext(sparkConf)sc.setLogLevel("WARN")val scc: StreamingContext &#61; new StreamingContext(sc,Seconds(5))val lines: ReceiverInputDStream[String] &#61; scc.socketTextStream("192.168.200.160",9999)val words: DStream[String] &#61; lines.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] &#61; words.map((_,1))val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_&#43;_)result.print()scc.start()scc.awaitTermination()}
}
由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序
注意&#xff1a; 要指定并行度&#xff0c;如在本地运行设置setMaster(“local[2]”)&#xff0c;相当于启动两个线程&#xff0c;一个给receiver&#xff0c;一个给computer。如果是在集群中运行&#xff0c;必须要求集群中可用core数大于1。
1.4 执行查看效果
&#xff08;1&#xff09;先执行nc -lk 9999
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/1113165c4904ecc5.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
&#xff08;2&#xff09;然后在执行以上代码
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/43a754c811e7ec5c.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
&#xff08;3&#xff09;不断的在&#xff08;1&#xff09;中输入不同的单词&#xff0c;观察IDEA控制台输出
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/b428d8f746fb8d47.webp#pic_center)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/1113165c4904ecc5.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
现象&#xff1a;sparkStreaming每隔5s计算一次当前5s内的数据&#xff0c;然后将每个批次的数据输出。
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/7d7ef3f69d479716.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
运行结果监控截图&#xff1a;
从2020/05/10 11:24:18开始以5秒为单位的批次运行48秒&#xff08;已完成10个批次&#xff0c;865条记录&#xff09;
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/70be2ca197098d98.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
1.5 Streaming 应用监控
运行上述词频统计案例&#xff0c;登录到WEB UI监控页面&#xff1a;http://localhost:4040&#xff0c;查看相关监控信息。
- 其一、Streaming流式应用概要信息
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/4fae50aeee651818.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
每批次Batch数据处理总时间TD &#61; 批次调度延迟时间SD &#43; 批次数据处理时间PT。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/e88efe5b0a13a7fa.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
- 其二、性能衡量标准
SparkStreaming实时处理数据性能如何&#xff08;是否可以实时处理数据&#xff09;&#xff1f;&#xff1f;如何衡量的呢&#xff1f;&#xff1f;
每批次数据处理时间TD <&#61; BatchInterval每批次时间间隔
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/bff2716168d1ed7b.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
2. Streaming 工作原理
SparkStreaming处理流式数据时&#xff0c;按照时间间隔划分数据为微批次&#xff08;Micro-Batch&#xff09;&#xff0c;每批次数据当做RDD&#xff0c;再进行处理分析。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/6c257b6ba227cc3e.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
以上述词频统计WordCount程序为例&#xff0c;讲解Streaming工作原理。
2.1 创建 StreamingContext
当SparkStreaming流式应用启动&#xff08;streamingContext.start&#xff09;时&#xff0c;首先创建StreamingContext流式上下文实例对象&#xff0c;整个流式应用环境构建&#xff0c;底层还是SparkContext。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/ea91d84a82557da5.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
当StreamingContext对象构建以后&#xff0c;启动接收器Receiver&#xff0c;专门从数据源端接收数据&#xff0c;此接收器作为Task任务运行在Executor中&#xff0c;一直运行&#xff08;Long Runing&#xff09;&#xff0c;一直接收数据。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1e618/cd5/af17da15769ccb2e.jpeg?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/21e585a7e21fc7dc.png?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;
2.2 双流Join
可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreamingTCP2 {
def main(args: Array[String]): Unit &#61; {
val sparkConf: SparkConf &#61; new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[6]")
val sc: SparkContext &#61; new SparkContext(sparkConf)
sc.setLogLevel("WARN")
val scc: StreamingContext &#61; new StreamingContext(sc, Seconds(5))
val inputDStream01: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9999)
val inputDStream02: ReceiverInputDStream[String] &#61; scc.socketTextStream("node01", 9998)
val inputDStream: DStream[String] &#61; inputDStream01.union(inputDStream02)
val words: DStream[String] &#61; inputDStream.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] &#61; words.map((_, 1))
val result: DStream[(String, Int)] &#61; wordAndOne.reduceByKey(_ &#43; _)
result.print()scc.start()
scc.awaitTermination()
scc.stop(stopSparkContext &#61; true, stopGracefully &#61; true)}
}
2.3 接收器接收数据
启动每个接收器Receiver以后&#xff0c;实时从数据源端接收数据&#xff08;比如TCP Socket&#xff09;&#xff0c;也是按照时间间隔将接收的流式数据划分为很多Block&#xff08;块&#xff09;。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/011ac27956d007f0.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
接收器Receiver划分流式数据的时间间隔BlockInterval&#xff0c;默认值为200ms&#xff0c;通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后&#xff0c;按照设置的存储级别对Block进行存储&#xff0c;从TCP Socket中接收数据默认的存储级别为&#xff1a;MEMORY_AND_DISK_SER_2&#xff0c;先存储内存&#xff0c;不足再存储磁盘&#xff0c;存储2副本。
从TCP Socket消费数据时可以设置Block存储级别&#xff0c;演示代码如下&#xff1a;
val inputDStream: ReceiverInputDStream[String] &#61; ssc.socketTextStream("node01", 9999, storageLevel &#61; StorageLevel.MEMORY_AND_DISK)
2.4 汇报接收Block报告
接收器Receiver将实时汇报接收的数据对应的Block信息&#xff0c;当BatchInterval时间达到以后&#xff0c;StreamingContext将对应时间范围内数据block当做RDD&#xff0c;加载SparkContextt处理数据。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/45a090220e38e09d.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
以此循环处理流式的数据&#xff0c;如下图所示&#xff1a;
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/4283cd4bbba41b87.png?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
2.5 Streaming 工作原理总述
整个Streaming运行过程中&#xff0c;涉及到两个时间间隔&#xff1a;
- 批次时间间隔&#xff1a;BatchInterval
- 每批次数据的时间间隔&#xff0c;每隔多久加载一个Job&#xff1b;
- Block时间间隔&#xff1a;BlockInterval
- 接收器划分流式数据的时间间隔&#xff0c;可以调整大小哦&#xff0c;官方建议最小值不能小于50ms&#xff1b;
- 默认值为200ms&#xff0c;属性&#xff1a;spark.streaming.blockInterval&#xff0c;调整设置
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/70be2ca197098d98.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
BatchInterval&#xff1a; 1s &#61; 1000ms &#61; 5 * BlockInterval
从代码层面结合实际数据处理层面来看&#xff0c;Streaming处理原理如下&#xff0c;左边为代码逻辑&#xff0c;右边为实际每批次数据处理过程。
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/8ad8f3bf8da691df.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)
具体运行数据时&#xff0c;每批次数据依据代码逻辑执行。
val resultDStream: DStream[(String, Int)] &#61; inputDStream.filter(line &#61;> null !&#61; line && line.trim.length > 0).flatMap(line &#61;> line.trim.split("\\s&#43;")).map(word &#61;> (word, 1)).reduceByKey((tmp, item) &#61;> tmp &#43; item)resultDStream.print(10)
流式数据流图如下&#xff1a;
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/0ef126b5295c089b.webp?x-oss-process&#61;image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2VyYWluaW5n,size_16,color_FFFFFF,t_70#pic_center)