In real-world development we often need to define our own operations on RDDs. The key point to keep in mind is that initialization happens on the Driver, while the actual computation runs on the Executors. This involves cross-process communication, so the objects involved must be serializable.
When we write our own class that operates on RDDs, serialization typically needs to be considered in two situations: passing a method, and passing a field.
How to pass a method?
1. Define an RDD utility class:
import org.apache.spark.rdd.RDD

class Search() {
  val query: String = "h"

  // Returns true if the string contains the query
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  // Filters the RDD by passing the method above
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  // Combines the two methods above: filters the RDD with an inline lambda
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}
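Note that query is an instance field: isMatch reads it, and the lambda in getMatch2 reads it as well, so all of these methods implicitly reference the enclosing Search instance. That implicit reference is exactly what triggers the serialization failures demonstrated below.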
2. Create the Spark driver program:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SeriDemo01 {
  def main(args: Array[String]): Unit = {
    // Initialize the configuration and the SparkContext
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("seri")
    val sc = new SparkContext(sparkConf)
    // Create an RDD
    val rdd1: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "hbase"))
    // Create a Search object
    val search = new Search()
    // Apply the first filter method and print the result
    val rdd2: RDD[String] = search.getMatch1(rdd1)
    rdd2.collect.foreach(println)
  }
}
3. Run the program
It fails at runtime with the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
	at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
	at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
	at com.spark.seri.Search.getMatch1(Search.scala:13)
	at com.spark.seri.SeriDemo01$.main(SeriDemo01.scala:17)
	at com.spark.seri.SeriDemo01.main(SeriDemo01.scala)
Caused by: java.io.NotSerializableException: com.spark.seri.Search
4. Explanation
// Filters the RDD by passing the method above
def getMatch1(rdd: RDD[String]): RDD[String] = {
  rdd.filter(isMatch)
}
The method isMatch() called here is defined on the Search class, so the call is really this.isMatch(), where this is the Search instance. To run the filter, Spark must serialize the Search object and ship it to the Executors, and since Search is not serializable, the job fails with the exception above.
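To make the capture explicit, rdd.filter(isMatch) behaves like the following hand-expanded version (an illustrative rewrite, not the original code):

// Equivalent form of getMatch1: the method reference is expanded into a
// closure that calls this.isMatch, so `this` (the whole Search instance)
// becomes part of the closure and must be serialized with the task
def getMatch1(rdd: RDD[String]): RDD[String] = {
  rdd.filter(s => this.isMatch(s))
}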
5. Solution
Make the class extend scala.Serializable.
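For reference, a minimal sketch of the fixed class (scala.Serializable is Scala's counterpart of java.io.Serializable, which Spark's default Java serialization requires):

import org.apache.spark.rdd.RDD

class Search() extends scala.Serializable {
  val query: String = "h"

  def isMatch(s: String): Boolean = s.contains(query)

  // The captured Search instance can now be serialized and shipped
  def getMatch1(rdd: RDD[String]): RDD[String] = rdd.filter(isMatch)
}

With this change the program prints the elements that contain "h": hadoop, hive, and hbase.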
How to pass a field?
1. Suppose we change the driver program to call the getMatch2 method instead:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SeriDemo01 {
  def main(args: Array[String]): Unit = {
    // Initialize the configuration and the SparkContext
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("seri")
    val sc = new SparkContext(sparkConf)
    // Create an RDD
    val rdd1: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "hbase"))
    // Create a Search object
    val search = new Search()
    // Apply the second filter method and print the result
    val rdd2: RDD[String] = search.getMatch2(rdd1)
    rdd2.collect.foreach(println)
  }
}
2. Result
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
	at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
	at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
	at com.spark.seri.Search.getMatch2(Search.scala:21)
	at com.spark.seri.SeriDemo01$.main(SeriDemo01.scala:17)
	at com.spark.seri.SeriDemo01.main(SeriDemo01.scala)
Caused by: java.io.NotSerializableException: com.spark.seri.Search
3. Explanation
// Filters the RDD with a lambda that references the query field
def getMatch2(rdd: RDD[String]): RDD[String] = {
  rdd.filter(x => x.contains(query))
}
This time query is not a method but a field defined on the Search class, and reading it inside the lambda is really reading this.query, where this is again the Search instance. So just as before, Spark must serialize the Search object to ship the task to the Executors, and the job fails because Search is not serializable.
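As before, writing the implicit this explicitly makes the capture visible (an illustrative rewrite, not the original code):

// Equivalent form of getMatch2: the lambda reads the field through `this`,
// so the whole Search instance is captured by the closure
def getMatch2(rdd: RDD[String]): RDD[String] = {
  rdd.filter(x => x.contains(this.query))
}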
4. Solutions:
1) Make the class extend scala.Serializable:
class Search() extends scala.Serializable {...}
2) Copy the class field query into a local variable.
Change getMatch2 to:
// Filters the RDD with a lambda that captures only a local copy of query
def getMatch2(rdd: RDD[String]): RDD[String] = {
  val query_ : String = this.query // copy the class field into a local variable
  rdd.filter(x => x.contains(query_))
}
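This second fix works because query_ is a plain local String: the closure now captures only that string, which is serializable, instead of the enclosing Search instance, so Search no longer needs to extend scala.Serializable. A quick check with the same driver as above:

val search = new Search() // Search itself need not be serializable now
search.getMatch2(rdd1).collect.foreach(println) // prints hadoop, hive, hbase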