热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkSparkCore如何序列化自定义RDD操作

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Exec

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

当我们自定义一个RDD操作类的时候,通常需要在两个地方考虑序列化,分别是:传递一个方法、传递一个属性。

如何传递一个方法?

1、我们定义一个RDD工具类:

class Search() {val query: String = "h"// 过滤出包含字符串的数据def isMatch(s: String): Boolean = {s.contains(query)}// 过滤出包含字符串的RDDdef getMatch1(rdd: RDD[String]): RDD[String] = {rdd.filter(isMatch)}// 这个方法是上面两个方法的结合体// 过滤出包含字符串的RDDdef getMatch2(rdd: RDD[String]): RDD[String] = {rdd.filter(x => x.contains(query))}}

2、创建Spark主程序:

object SeriDemo01 {def main(args: Array[String]): Unit = {// 初始化配置信息及SparkContextval sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("seri")val sc = new SparkContext(sparkConf)// 创建一个rddval rdd1: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "hbase"))// 创建Search对象val search = new Search()// 运用第一个过滤函数并打印结果val rdd2: RDD[String] = search.getMatch2(rdd1)rdd2.collect.foreach(println)}
}

3、运行程序

运行报错,错误信息如下:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.filter(RDD.scala:386)at com.spark.seri.Search.getMatch2(Search.scala:21)at com.spark.seri.SeriDemo01$.main(SeriDemo01.scala:17)at com.spark.seri.SeriDemo01.main(SeriDemo01.scala)
Caused by: java.io.NotSerializableException: com.spark.seri.Search

4、问题说明

//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {rdd.filter(isMatch)
}

在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

5、解决方案

使类继承scala.Serializable即可。

如何传递一个属性

1、如果我们将Spark主程序中的Search对象改为调用getMatche2方法

object SeriDemo01 {def main(args: Array[String]): Unit = {// 初始化配置信息及SparkContextval sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("seri")val sc = new SparkContext(sparkConf)// 创建一个rddval rdd1: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "hbase"))// 创建Search对象val search = new Search()// 运用第一个过滤函数并打印结果val rdd2: RDD[String] = search.getMatch2(rdd1)rdd2.collect.foreach(println)}
}

2、运行结果

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.filter(RDD.scala:386)at com.spark.seri.Search.getMatch2(Search.scala:21)at com.spark.seri.SeriDemo01$.main(SeriDemo01.scala:17)at com.spark.seri.SeriDemo01.main(SeriDemo01.scala)
Caused by: java.io.NotSerializableException: com.spark.seri.Search

3、问题说明

//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {rdd.filter(x => x.contains(query))
}

在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

4、解决方案:

1)使类继承scala.Serializable即可。

class Search() extends scala.Serializable {...}

2)将类变量query赋值给局部变量

修改getMatche2为

//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {val query_ : String = this.query //将类变量赋值给局部变量rdd.filter(x => x.contains(query_))
}

 


推荐阅读
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 我有一个从C项目编译的.o文件,该文件引用了名为init_static_pool ... [详细]
  • PBO(PixelBufferObject),将像素数据存储在显存中。优点:1、快速的像素数据传递,它采用了一种叫DMA(DirectM ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 如何使用 `org.apache.tomcat.websocket.server.WsServerContainer.findMapping()` 方法及其代码示例解析 ... [详细]
  • 本文将继续探讨 JavaScript 函数式编程的高级技巧及其实际应用。通过一个具体的寻路算法示例,我们将深入分析如何利用函数式编程的思想解决复杂问题。示例中,节点之间的连线代表路径,连线上的数字表示两点间的距离。我们将详细讲解如何通过递归和高阶函数等技术实现高效的寻路算法。 ... [详细]
  • 使用HTML和JavaScript实现视频截图功能
    本文介绍了如何利用HTML和JavaScript实现从远程MP4、本地摄像头及本地上传的MP4文件中截取视频帧,并展示了具体的实现步骤和示例代码。 ... [详细]
  • python模块之正则
    re模块可以读懂你写的正则表达式根据你写的表达式去执行任务用re去操作正则正则表达式使用一些规则来检测一些字符串是否符合个人要求,从一段字符串中找到符合要求的内容。在 ... [详细]
  • 使用方法:将要控制的角色拖到TargetBody,将相机的焦点拖到CamerPivot,,建议CameraPivot是一个放在TargetBody下的子物体,并且位置应该是在Tar ... [详细]
  • 目录预备知识导包构建数据集神经网络结构训练测试精度可视化计算模型精度损失可视化输出网络结构信息训练神经网络定义参数载入数据载入神经网络结构、损失及优化训练及测试损失、精度可视化qu ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • Android 自定义 RecycleView 左滑上下分层示例代码
    为了满足项目需求,需要在多个场景中实现左滑删除功能,并且后续可能在列表项中增加其他功能。虽然网络上有很多左滑删除的示例,但大多数封装不够完善。因此,我们尝试自己封装一个更加灵活和通用的解决方案。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Android 构建基础流程详解
    Android 构建基础流程详解 ... [详细]
author-avatar
我的双宝宝_486
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有