热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

第8章SparkStreaming进阶与案例实战

第8章SparkStreaming进阶与案例实战本章节将讲解SparkStreaming如何处理带状态的数据,通过案例让大家知道SparkStreaming如何写数据到MySQL,
第8章 Spark Streaming进阶与案例实战

本章节将讲解Spark Streaming如何处理带状态的数据,通过案例让大家知道Spark Streaming如何写数据到MySQL,Spark Streaming如何整合Spark SQL进行操作

[TOC]

8-1 -课程目录

  • 带状态的算子:UpdateStateByKey
  • 实战:计算到目前为止累计出现的单词个数写入到Mysqlzhon
  • 基于window的统计
  • 实战:黑名单过滤
  • 实战:Spark Streaming整合Spark SQL实战

8-2 -实战之updateStateByKey算子的使用

带状态的算子:UpdateStateByKey

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

  1. Define the state – The state can be an arbitrary data type.
  2. Define the state update function – Specify with a function how to update the state using the previous state and the new values from an input stream.

updateStateByKey算子
需求:统计到目前为止累计出现的单词的个数(需要保持住以前的状态)

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成有状态统计
* 累计求和功能
*/
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 如果使用了stateful的算子,必须设置checkpoint
// 在生产环境中,建议大家把checkpoint设置到HDFS的某个文件夹中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("hadoop000", 6789)
val result = lines.flatMap(_.split(" ").map((_, 1)))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 把当前的数据去更新已有的或者是老的数据
*
* @param currentValues
* @param preValues
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}

开启6789端口

$ nc -lk 6789

错误

java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

8-3 -实战之将统计结果写入到MySQL数据库中

实战:计算到目前为止累计出现的单词个数写入到MySQL

  • 使用Spark Streaming进行统计分析
  • Spark Streaming统计结果写入到MySQL

需求:将统计结果写入到MySQL

create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

安装mysql8.0.11,修改密码,设置时区

java.sql.SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.

set global time_zOne='+8:00';

通过该sql将统计结果写入到MySQL
insert into wordcount(word, wordcount) values(‘” + record._1 + “‘,” + record._2 + “)”

存在的问题:

  1. 对于已有的数据做更新,而是所有的数据均为insert
    改进思路:
    a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
    b) 工作中:HBase/Redis

  2. 每个rdd的partition创建connection,建议大家改成连接池

8-4 -实战之窗口函数的使用

  • 基于window的统计
  • http://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

《第8章 Spark Streaming进阶与案例实战》 基于window的统计

window:定时的进行一个时间段内的数据处理

  • window length : 窗口的长度
  • sliding interval: 窗口的间隔

这2个参数和我们的batch size有关系:倍数

每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc

==> 每隔sliding interval统计前window length的值

《第8章 Spark Streaming进阶与案例实战》 Spark Streaming滑动窗口

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

8-5 -实战之黑名单过滤

实战:黑名单过滤

  • transform算子的使用
  • Spark Streaming 整合RDD进行操作

需求: 黑名单过滤

访问日志 ==> DStream
20180808,zs
20180808,ls
20180808,ww
==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

黑名单列表 ==> RDD
zs
ls
==>(zs: true)(ls: true)

==> 20180808,ww

leftjoin
(zs: [<20180808,zs>, ]) x
(ls: [<20180808,ls>, ]) x
(ww: [<20180808,ww>, ]) ==> tuple 1

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* transform:黑名单过滤
*
*/
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
* 构建黑名单
*/
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true)
.map(x => x._2._1)
//getOrElse(false) != true 留下等于false的,等于true的过滤
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}

8-6 -实战之Spark Streaming整合Spark SQL操作

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
* Spark Streaming 完成Spark SQL完成词频统计操作
*/
object SqlNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
}

Boy-201806015


推荐阅读
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • OBS Studio自动化实践:利用脚本批量生成录制场景
    本文探讨了如何利用OBS Studio进行高效录屏,并通过脚本实现场景的自动生成。适合对自动化办公感兴趣的读者。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • Windows操作系统提供了Encrypting File System (EFS)作为内置的数据加密工具,特别适用于对NTFS分区上的文件和文件夹进行加密处理。本文将详细介绍如何使用EFS加密文件夹,以及加密过程中的注意事项。 ... [详细]
  • importjava.io.*;importjava.util.*;publicclass五子棋游戏{staticintm1;staticintn1;staticfinalintS ... [详细]
  • linux网络子系统分析(二)—— 协议栈分层框架的建立
    目录一、综述二、INET的初始化2.1INET接口注册2.2抽象实体的建立2.3代码细节分析2.3.1socket参数三、其他协议3.1PF_PACKET3.2P ... [详细]
  • 本文介绍了如何在 Node.js 中使用流(Stream)进行数据读取与写入,包括创建可读流与可写流的基本方法,并提供了具体的代码示例。 ... [详细]
  • 在Effective Java第三版中,建议在方法返回类型中优先考虑使用Collection而非Stream,以提高代码的灵活性和兼容性。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • OpenCV中的霍夫圆检测技术解析
    本文详细介绍了如何使用OpenCV库中的HoughCircles函数实现霍夫圆检测,并提供了具体的代码示例及参数解释。 ... [详细]
  • 本文档详细介绍了软通动力Java开发工程师职位的笔试题目,涵盖了Java基础、集合框架、JDBC、JSP等内容,并提供了详细的答案解析。 ... [详细]
  • 本文档介绍了如何使用ESP32开发板在STA模式下实现与TCP服务器的通信,包括环境搭建、代码解析及实验步骤。 ... [详细]
  • C# 中创建和执行存储过程的方法
    本文详细介绍了如何使用 C# 创建和调用 SQL Server 存储过程,包括连接数据库、定义命令类型、设置参数等步骤。 ... [详细]
  • 本文介绍了如何在不同操作系统上安装Git,以及一些基本和高级的Git操作,包括项目初始化、文件状态检查、版本控制、分支管理、标签处理、版本回退等,并简要提及了开源许可协议的选择。 ... [详细]
  • Python 日志记录模块详解
    日志记录机制是软件开发中不可或缺的一部分,它帮助开发者追踪和调试程序运行时的各种异常。Python 提供了内置的 logging 模块,使我们在代码中记录和管理日志信息变得更加方便。本文将详细介绍如何使用 Python 的 logging 模块。 ... [详细]
author-avatar
鲁有军_644
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有