热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ApacheSparkStreaming实战

原理(Streamming)SparkStreaming是核心SparkAPI的扩展,支持可扩展,高吞吐量,

原理 (Streamming )

Spark Streaming 是核心 Spark API 的扩展,支持可扩展,高吞吐量,实时数据流的容错数据流处理。可以从 sources(如 Kafka、Flume、Kinesis、或者 TCP sockets)获取数据,并通过复杂的算法处理数据,这些算法使用高级函数(如 map,reduce,join 和 window)表示。

在这里插入图片描述
处理过的数据可以推送到文件系统、数据库和实时仪表板。你可以将 Spark 的机器学习和图形处理算法应用于数据流。
Spark Streaming 接受实时输入数据流,并把数据分成批,然后由 Spark 引擎处理,以批量生成最终结果流。
在这里插入图片描述
离散流 (Discretized Streams)

Discretized Streams 或者 DStream 是 Spark Streaming 提供的基本抽象。它表示一个可持续的数据流,或者是从 source 接收的输入数据流,或者是通过转换输入流生成的处理过的数据流。
在这里插入图片描述

在这里插入图片描述
Window 操作 (Window Operations)

Spark Streaming 也提供窗口化计算,这允许你在滑动的数据窗口上应用转换 transformations
在这里插入图片描述

应用:(案例)

1、依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>2.4.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>2.4.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>2.4.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>2.4.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.1.0</version></dependency>

2、案例

package com.citydo.faceadd;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import scala.Tuple2;import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Objects;
import java.util.regex.Pattern;public class SparkSteamingDemo {private static final Pattern SPACE;static {SPACE &#61; Pattern.compile(" ");}public static void main(String[] args) throws InterruptedException {getWords();getKafka(args);}/*** spark streaming 统计数据* &#64;throws InterruptedException*/public static void getWords() throws InterruptedException {//注意本地调试&#xff0c;master必须为local[n],n>1,表示一个线程接收数据&#xff0c;n-1个线程处理数据SparkConf conf &#61; new SparkConf().setMaster("local[2]").setAppName("streaming word count");JavaSparkContext sc &#61; new JavaSparkContext(conf);//设置日志运行级别sc.setLogLevel("INFO");JavaStreamingContext ssc &#61; new JavaStreamingContext(sc, Durations.seconds(1));//创建一个将要连接到hostname:port 的离散流JavaReceiverInputDStream<String> lines &#61; ssc.socketTextStream("localhost", 9999);JavaPairDStream<String, Integer> wordCounts &#61;lines.flatMap(x->Arrays.asList(x.split(" ")).iterator()).mapToPair(x -> new Tuple2<>(x, 1)).reduceByKey((x, y) -> x &#43; y);// 在控制台打印出在这个离散流&#xff08;DStream&#xff09;中生成的每个 RDD 的前十个元素wordCounts.print();// 启动计算ssc.start();ssc.awaitTermination();}/*** spark streaming 与Kafka 进行过滤数据* &#64;param args* &#64;throws InterruptedException*/public static void getKafka(String[] args) throws InterruptedException {String checkPointDir &#61; args[0];String batchTime &#61; args[1];String topics &#61; args[2];String brokers &#61; args[3];Duration batchDuration &#61; Durations.seconds(Integer.parseInt(batchTime));SparkConf conf &#61; new SparkConf().setAppName("streaming word count");JavaStreamingContext jase &#61; new JavaStreamingContext(conf, batchDuration);// 设置Spark Streaming的CheckPoint目录jase.checkpoint(checkPointDir);// 组装Kafka的主题列表HashSet<String> topicsSet &#61; new HashSet<>(Arrays.asList(topics.split(",")));HashMap<String, String> kafkaParams &#61; new HashMap<>();kafkaParams.put("metadata.broker.list", brokers);// 通过brokers和topics直接创建kafka stream// 1.接收Kafka中数据&#xff0c;生成相应DStreamInputDStream<ConsumerRecord<Object, Object>> lines;lines &#61; KafkaUtils.createDirectStream((StreamingContext) null,null,null,null);// 2.获取每一个行的字段属性DStream<Object> records &#61; lines.count();// 3.筛选女性网民上网时间数据信息DStream<Object> femaleRecords &#61; records.filter(null);// 4.筛选连续上网时间超过阈值的用户&#xff0c;并获取结果DStream<Object> upTimeUser &#61; femaleRecords.filter(Objects::isNull);upTimeUser.print();//5.Spark Streaming系统启动jase.start();jase.awaitTermination();}}

参考&#xff1a;https://www.cloudera.com/tutorials/introduction-to-spark-streaming/.html


推荐阅读
author-avatar
NHHermit
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有