热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

第8章SparkStreaming进阶与案例实战

第8章SparkStreaming进阶与案例实战本章节将讲解SparkStreaming如何处理带状态的数据,通过案例让大家知道SparkStreaming如何写数据到MySQL,
第8章 Spark Streaming进阶与案例实战

本章节将讲解Spark Streaming如何处理带状态的数据,通过案例让大家知道Spark Streaming如何写数据到MySQL,Spark Streaming如何整合Spark SQL进行操作

[TOC]

8-1 -课程目录

  • 带状态的算子:UpdateStateByKey
  • 实战:计算到目前为止累计出现的单词个数写入到Mysqlzhon
  • 基于window的统计
  • 实战:黑名单过滤
  • 实战:Spark Streaming整合Spark SQL实战

8-2 -实战之updateStateByKey算子的使用

带状态的算子:UpdateStateByKey

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

  1. Define the state – The state can be an arbitrary data type.
  2. Define the state update function – Specify with a function how to update the state using the previous state and the new values from an input stream.

updateStateByKey算子
需求:统计到目前为止累计出现的单词的个数(需要保持住以前的状态)

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成有状态统计
* 累计求和功能
*/
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 如果使用了stateful的算子,必须设置checkpoint
// 在生产环境中,建议大家把checkpoint设置到HDFS的某个文件夹中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("hadoop000", 6789)
val result = lines.flatMap(_.split(" ").map((_, 1)))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 把当前的数据去更新已有的或者是老的数据
*
* @param currentValues
* @param preValues
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}

开启6789端口

$ nc -lk 6789

错误

java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

8-3 -实战之将统计结果写入到MySQL数据库中

实战:计算到目前为止累计出现的单词个数写入到MySQL

  • 使用Spark Streaming进行统计分析
  • Spark Streaming统计结果写入到MySQL

需求:将统计结果写入到MySQL

create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

安装mysql8.0.11,修改密码,设置时区

java.sql.SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.

set global time_zOne='+8:00';

通过该sql将统计结果写入到MySQL
insert into wordcount(word, wordcount) values(‘” + record._1 + “‘,” + record._2 + “)”

存在的问题:

  1. 对于已有的数据做更新,而是所有的数据均为insert
    改进思路:
    a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
    b) 工作中:HBase/Redis

  2. 每个rdd的partition创建connection,建议大家改成连接池

8-4 -实战之窗口函数的使用

  • 基于window的统计
  • http://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

《第8章 Spark Streaming进阶与案例实战》 基于window的统计

window:定时的进行一个时间段内的数据处理

  • window length : 窗口的长度
  • sliding interval: 窗口的间隔

这2个参数和我们的batch size有关系:倍数

每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc

==> 每隔sliding interval统计前window length的值

《第8章 Spark Streaming进阶与案例实战》 Spark Streaming滑动窗口

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

8-5 -实战之黑名单过滤

实战:黑名单过滤

  • transform算子的使用
  • Spark Streaming 整合RDD进行操作

需求: 黑名单过滤

访问日志 ==> DStream
20180808,zs
20180808,ls
20180808,ww
==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

黑名单列表 ==> RDD
zs
ls
==>(zs: true)(ls: true)

==> 20180808,ww

leftjoin
(zs: [<20180808,zs>, ]) x
(ls: [<20180808,ls>, ]) x
(ww: [<20180808,ww>, ]) ==> tuple 1

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* transform:黑名单过滤
*
*/
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
* 构建黑名单
*/
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true)
.map(x => x._2._1)
//getOrElse(false) != true 留下等于false的,等于true的过滤
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}

8-6 -实战之Spark Streaming整合Spark SQL操作

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
* Spark Streaming 完成Spark SQL完成词频统计操作
*/
object SqlNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
}

Boy-201806015


推荐阅读
  • 探索聚类分析中的K-Means与DBSCAN算法及其应用
    聚类分析是一种用于解决样本或特征分类问题的统计分析方法,也是数据挖掘领域的重要算法之一。本文主要探讨了K-Means和DBSCAN两种聚类算法的原理及其应用场景。K-Means算法通过迭代优化簇中心来实现数据点的划分,适用于球形分布的数据集;而DBSCAN算法则基于密度进行聚类,能够有效识别任意形状的簇,并且对噪声数据具有较好的鲁棒性。通过对这两种算法的对比分析,本文旨在为实际应用中选择合适的聚类方法提供参考。 ... [详细]
  • 本文介绍了如何利用Apache POI库高效读取Excel文件中的数据。通过实际测试,除了分数被转换为小数存储外,其他数据均能正确读取。若在使用过程中发现任何问题,请及时留言反馈,以便我们进行更新和改进。 ... [详细]
  • 每日精选Codeforces训练题:1119E(贪心算法)、821C(栈模拟)和645D(拓扑排序)
    题目涉及三种不同类型的算法问题:1119E(贪心算法)、821C(栈模拟)和645D(拓扑排序)。其中,1119E的问题背景是有n种不同长度的棍子,长度分别为2^0, 2^1, …, 2^(n-1),每种棍子的数量为a[i]。任务是计算可以组成的三角形数量。根据三角形的性质,任意两边之和必须大于第三边。该问题可以通过贪心算法高效解决,通过合理选择棍子组合来最大化三角形的数量。 ... [详细]
  • 在Java编程中,若需实现两个整数(例如2和3)相除并保留两位小数的结果,可以通过精确计算方法来达到预期效果。具体而言,可以利用BigDecimal类进行高精度运算,确保2除以3的结果准确显示为0.66。此外,还可以通过格式化输出来控制小数位数,确保最终结果符合要求。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • 本文介绍了UUID(通用唯一标识符)的概念及其在JavaScript中生成Java兼容UUID的代码实现与优化技巧。UUID是一个128位的唯一标识符,广泛应用于分布式系统中以确保唯一性。文章详细探讨了如何利用JavaScript生成符合Java标准的UUID,并提供了多种优化方法,以提高生成效率和兼容性。 ... [详细]
  • 探索偶数次幂二项式系数的求和方法及其数学意义 ... [详细]
  • 在MFC框架中,存在多个全局函数,用于在不同对象间获取信息或创建新对象。其中,`afxGetApp`函数尤为关键,它能够帮助开发者轻松获取当前应用程序的实例指针。本文将详细解析`afxGetApp`函数的内部机制及其在MFC应用程序中的具体应用场景,探讨其在提升代码可维护性和灵活性方面的优势。此外,还将介绍其他常用全局函数如`AfxWinInit()`和`AfxBeginThread()`的功能和使用方法,为开发者提供全面的参考。 ... [详细]
  • HDU1176:免费馅饼问题的动态规划解法分析
    题目“免费馅饼”通过动态规划方法进行了解析。该问题的时间限制为 Java 2000ms 和其他语言 1000ms,内存限制为 Java 65536K 和其他语言 32768K。本文详细探讨了如何利用动态规划算法高效求解此问题,并对算法的时间复杂度和空间复杂度进行了深入分析。此外,还提供了具体的实现步骤和代码示例,帮助读者更好地理解和应用这一方法。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在处理大图片时,PHP 常常会遇到内存溢出的问题。为了避免这种情况,建议避免使用 `setImageBitmap`、`setImageResource` 或 `BitmapFactory.decodeResource` 等方法直接加载大图。这些函数在处理大图片时会消耗大量内存,导致应用崩溃。推荐采用分块处理、图像压缩和缓存机制等策略,以优化内存使用并提高处理效率。此外,可以考虑使用第三方库如 ImageMagick 或 GD 库来处理大图片,这些库提供了更高效的内存管理和图像处理功能。 ... [详细]
  • 在HDU 1166敌军布阵问题中,通过运用线段树数据结构,可以高效地计算指定区间的敌军数量。该算法不仅能够在限定的时间和内存条件下快速求解,还能够灵活应对动态变化的战场局势,为实时决策提供支持。 ... [详细]
  • 从2019年AI顶级会议最佳论文,探索深度学习的理论根基与前沿进展 ... [详细]
  • 在尝试使用指定的数据库进行登录时,遇到了权限问题。具体表现为用户 'lzldb'@'%' 无权访问数据库 'lzldb',错误代码为 1044 (42000)。命令行参数 `-D` 指定了数据库名称,但未能成功连接。建议检查用户的权限配置,确保其具有对目标数据库的访问权限。此外,使用密码时应谨慎处理,避免在命令行中明文显示。 ... [详细]
  • 在多模块项目中,项目A作为一个独立的工具包,不依赖于任何第三方库。其目录结构如下:`--src--main--java--resources`。当将项目A打包成JAR文件后,发现无法正确访问`resources`目录下的文件资源。这一问题可能源于JAR文件的构建配置或类路径设置不当,需要仔细检查Maven或Gradle的构建脚本,确保资源文件被正确包含并加载。 ... [详细]
author-avatar
鲁有军_644
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有