val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// parallelize and makeRDD both build an RDD from an in-memory collection
// (makeRDD simply delegates to parallelize)
val rdd1 = sparkContext.parallelize(List(1, 2, 3, 4))
val rdd2 = sparkContext.makeRDD(List(1, 2, 3, 4))
rdd1.collect().foreach(println)
sparkContext.stop()
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
// read each line of the files under the "input" directory as a String element
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
3. Introduction to the Main RDD Operators
RDD operators fall into two categories: transformations, which lazily describe a new RDD, and actions, which trigger the actual computation.
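Transformations only record the computation; nothing executes until an action runs. As an illustration outside Spark (so the sketch is self-contained), a Scala `Iterator` shows the same lazy behavior: `map` is deferred until a terminal operation consumes the iterator, just as an RDD transformation is deferred until an action such as `collect`.

```scala
// A side-effecting counter lets us observe exactly when mapping happens.
var evaluated = 0
val it = Iterator(1, 2, 3, 4).map { n =>
  evaluated += 1 // runs only when an element is actually pulled
  n * 2
}
// Nothing has executed yet -- analogous to a Spark transformation.
assert(evaluated == 0)
// Consuming the iterator forces evaluation -- analogous to a Spark action.
val result = it.toList
assert(evaluated == 4)
assert(result == List(2, 4, 6, 8))
```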
3.1 Transformations
3.1.1 map
map transforms the input data element by element; the transformation may change the element's type or its value.
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// multiply every element by 2
val mapRdd = rdd.map(num => num * 2)
mapRdd.collect().foreach(println)
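The example above changes the value; map can equally change the type. The semantics are the same as map on a plain Scala collection, which the sketch below uses so it runs without a Spark cluster:

```scala
// map from Int to String: the element type changes, not just the value
val nums = List(1, 2, 3, 4)
val strs: List[String] = nums.map(n => "#" + n)
assert(strs == List("#1", "#2", "#3", "#4"))
// the identical lambda works on an RDD: sc.makeRDD(nums).map(n => "#" + n)
```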
3.1.2 flatMap
flatMap first flattens the input data and then maps it, which is why this operator is also called a flat map.
val rdd = sc.makeRDD(List(List(1, 2), List(3, 4)))
// flatten the nested lists into a single sequence of elements: 1, 2, 3, 4
val flatMapRdd = rdd.flatMap(list => list)
flatMapRdd.collect().foreach(println)
The following example triggers a closure-serialization check: the foreach closure captures the User instance, which therefore has to be shipped to the executors.

object SerializerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val user = new User()
    // the closure references user, so User must be serializable
    rdd.foreach(num => {
      println("age=" + user.age + num)
    })
    sc.stop()
  }

  class User {
    var age: Int = 30
  }
}
object SerializerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List[Int]())
    val user = new User()
    // the list is empty, so foreach never runs on any element,
    // yet the job still fails with the same serialization error
    rdd.foreach(num => {
      println("age=" + user.age + num)
    })
    sc.stop()
  }

  class User {
    var age: Int = 30
  }
}
Error: Task not serializable. Stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

The exception is thrown before any task executes: the driver checks the closure's serializability in ClosureCleaner$.ensureSerializable, which is why even the empty-list version fails.
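A common fix (a sketch, not the only option) is to make the captured class serializable, e.g. declare it as `class User extends Serializable`, or use a case class, which is serializable by default. Spark serializes closures with Java serialization by default, so the requirement can be checked outside Spark with a plain round-trip:

```scala
import java.io._

// making User extend Serializable lets Spark ship it inside the closure
class User extends Serializable {
  var age: Int = 30
}

// round-trip through Java serialization, the same mechanism
// Spark's ClosureCleaner relies on by default
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(new User()) // would throw NotSerializableException without the mix-in
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val copy = in.readObject().asInstanceOf[User]
assert(copy.age == 30)
```

With `User` declared this way, both SerializerDemo variants above run without the Task not serializable error.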