作者:童式恃 | 来源:互联网 | 2023-09-13 11:23
1. wordCount案例 1.1 SparkStreaming接受socket数据实现WordCount 基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行词频统计WordCount。
1.2 架构图 通过socket发送一些数据出来, 右侧sparkstreaming使用receiver接受node01的socket发送的数据。
1.3 实现流程 1.3.1 第一步&#xff1a;创建maven工程并导入jar包 < dependency> < groupId> org.apache.spark groupId> < artifactId> spark-streaming_${scala.binary.version} artifactId> < version> ${spark.version} version> dependency>
1.3.2 第二步&#xff1a;node01服务器安装并启动生产者 首先在linux服务器上用YUM安装nc工具&#xff0c;nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。 node01服务器执行以下命令安装socket客户端工具&#xff0c;模拟发送数据
yum -y install nc
1.3.3 第三步&#xff1a;通过netcat工具向指定的端口发送数据 node01服务器执行以下命令&#xff0c;向指定的端口9999发送数据
nc -lk 9999
1.3.4 第四步&#xff1a;开发sparkStreaming程序&#xff0c;统计单词出现的次数 import org. apache. spark. streaming. dstream. { DStream, ReceiverInputDStream} import org. apache. spark. streaming. { Seconds, StreamingContext} import org. apache. spark. { SparkConf, SparkContext} object SparkStreamingTCP { def main( args: Array[ String ] ) : Unit &#61; { val sparkConf: SparkConf &#61; new SparkConf( ) . setAppName( "SparkStreamingTCP" ) . setMaster( "local[2]" ) val sc: SparkContext &#61; new SparkContext( sparkConf) sc. setLogLevel( "WARN" ) val scc: StreamingContext &#61; new StreamingContext( sc, Seconds( 5 ) ) val lines: ReceiverInputDStream[ String ] &#61; scc. socketTextStream( "192.168.200.160" , 9999 ) val words: DStream[ String ] &#61; lines. flatMap( _. split( " " ) ) val wordAndOne: DStream[ ( String , Int ) ] &#61; words. map( ( _, 1 ) ) val result: DStream[ ( String , Int ) ] &#61; wordAndOne. reduceByKey( _&#43; _) result. print( ) scc. start( ) scc. awaitTermination( ) } }
由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序
注意&#xff1a; 要指定并行度&#xff0c;如在本地运行设置setMaster(“local[2]”)&#xff0c;相当于启动两个线程&#xff0c;一个给receiver&#xff0c;一个给computer。如果是在集群中运行&#xff0c;必须要求集群中可用core数大于1。
1.4 执行查看效果 &#xff08;1&#xff09;先执行nc -lk 9999 &#xff08;2&#xff09;然后在执行以上代码 &#xff08;3&#xff09;不断的在&#xff08;1&#xff09;中输入不同的单词&#xff0c;观察IDEA控制台输出 现象&#xff1a;sparkStreaming每隔5s计算一次当前5s内的数据&#xff0c;然后将每个批次的数据输出。
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。 运行结果监控截图&#xff1a; 从2020/05/10 11:24:18开始以5秒为单位的批次运行48秒&#xff08;已完成10个批次&#xff0c;865条记录&#xff09;
1.5 Streaming 应用监控 运行上述词频统计案例&#xff0c;登录到WEB UI监控页面&#xff1a;http://localhost:4040&#xff0c;查看相关监控信息。
其一、Streaming流式应用概要信息 每批次Batch数据处理总时间TD &#61; 批次调度延迟时间SD &#43; 批次数据处理时间PT。 其二、性能衡量标准 SparkStreaming实时处理数据性能如何&#xff08;是否可以实时处理数据&#xff09;&#xff1f;&#xff1f;如何衡量的呢&#xff1f;&#xff1f; 每批次数据处理时间TD <&#61; BatchInterval每批次时间间隔
2. Streaming 工作原理 SparkStreaming处理流式数据时&#xff0c;按照时间间隔划分数据为微批次&#xff08;Micro-Batch&#xff09;&#xff0c;每批次数据当做RDD&#xff0c;再进行处理分析。 以上述词频统计WordCount程序为例&#xff0c;讲解Streaming工作原理。
2.1 创建 StreamingContext 当SparkStreaming流式应用启动&#xff08;streamingContext.start&#xff09;时&#xff0c;首先创建StreamingContext流式上下文实例对象&#xff0c;整个流式应用环境构建&#xff0c;底层还是SparkContext。 当StreamingContext对象构建以后&#xff0c;启动接收器Receiver&#xff0c;专门从数据源端接收数据&#xff0c;此接收器作为Task任务运行在Executor中&#xff0c;一直运行&#xff08;Long Runing&#xff09;&#xff0c;一直接收数据。 从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器&#xff0c;一直在运行&#xff0c;以Task方式运行&#xff0c;需要1Core CPU。 可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;
2.2 双流Join 可以从多个数据源端实时消费数据进行处理&#xff0c;例如从多个TCP Socket接收数据&#xff0c;对每批次数据进行词频统计&#xff0c;使用DStream#union函数合并接收数据流&#xff0c;演示代码如下&#xff1a;
import org. apache. spark. streaming. dstream. { DStream, ReceiverInputDStream} import org. apache. spark. streaming. { Seconds, StreamingContext} import org. apache. spark. { SparkConf, SparkContext} object SparkStreamingTCP2 { def main( args: Array[ String ] ) : Unit &#61; { val sparkConf: SparkConf &#61; new SparkConf( ) . setAppName( "SparkStreamingTCP" ) . setMaster( "local[6]" ) val sc: SparkContext &#61; new SparkContext( sparkConf) sc. setLogLevel( "WARN" ) val scc: StreamingContext &#61; new StreamingContext( sc, Seconds( 5 ) ) val inputDStream01: ReceiverInputDStream[ String ] &#61; scc. socketTextStream( "node01" , 9999 ) val inputDStream02: ReceiverInputDStream[ String ] &#61; scc. socketTextStream( "node01" , 9998 ) val inputDStream: DStream[ String ] &#61; inputDStream01. union( inputDStream02) val words: DStream[ String ] &#61; inputDStream. flatMap( _. split( " " ) ) val wordAndOne: DStream[ ( String , Int ) ] &#61; words. map( ( _, 1 ) ) val result: DStream[ ( String , Int ) ] &#61; wordAndOne. reduceByKey( _ &#43; _) result. print( ) scc. start( ) scc. awaitTermination( ) scc. stop( stopSparkContext &#61; true , stopGracefully &#61; true ) } }
2.3 接收器接收数据 启动每个接收器Receiver以后&#xff0c;实时从数据源端接收数据&#xff08;比如TCP Socket&#xff09;&#xff0c;也是按照时间间隔将接收的流式数据划分为很多Block&#xff08;块&#xff09;。 接收器Receiver划分流式数据的时间间隔BlockInterval&#xff0c;默认值为200ms&#xff0c;通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后&#xff0c;按照设置的存储级别对Block进行存储&#xff0c;从TCP Socket中接收数据默认的存储级别为&#xff1a;MEMORY_AND_DISK_SER_2&#xff0c;先存储内存&#xff0c;不足再存储磁盘&#xff0c;存储2副本。 从TCP Socket消费数据时可以设置Block存储级别&#xff0c;演示代码如下&#xff1a;
val inputDStream: ReceiverInputDStream[ String ] &#61; ssc. socketTextStream( "node01" , 9999 , storageLevel &#61; StorageLevel. MEMORY_AND_DISK)
2.4 汇报接收Block报告 接收器Receiver将实时汇报接收的数据对应的Block信息&#xff0c;当BatchInterval时间达到以后&#xff0c;StreamingContext将对应时间范围内数据block当做RDD&#xff0c;加载SparkContextt处理数据。 以此循环处理流式的数据&#xff0c;如下图所示&#xff1a;
2.5 Streaming 工作原理总述 整个Streaming运行过程中&#xff0c;涉及到两个时间间隔&#xff1a;
批次时间间隔&#xff1a;BatchInterval 每批次数据的时间间隔&#xff0c;每隔多久加载一个Job&#xff1b; Block时间间隔&#xff1a;BlockInterval 接收器划分流式数据的时间间隔&#xff0c;可以调整大小哦&#xff0c;官方建议最小值不能小于50ms&#xff1b; 默认值为200ms&#xff0c;属性&#xff1a;spark.streaming.blockInterval&#xff0c;调整设置 BatchInterval&#xff1a; 1 s &#61; 1000 ms &#61; 5 * BlockInterval
从代码层面结合实际数据处理层面来看&#xff0c;Streaming处理原理如下&#xff0c;左边为代码逻辑&#xff0c;右边为实际每批次数据处理过程。 具体运行数据时&#xff0c;每批次数据依据代码逻辑执行。
val resultDStream: DStream[ ( String , Int ) ] &#61; inputDStream. filter( line &#61;> null !&#61; line && line. trim. length > 0 ) . flatMap( line &#61;> line. trim. split( "\\s&#43;" ) ) . map( word &#61;> ( word, 1 ) ) . reduceByKey( ( tmp, item) &#61;> tmp &#43; item) resultDStream. print( 10 )
流式数据流图如下&#xff1a;