Spark提供了三种主要的与数据相关的API:
三者图示
下面详细介绍下各自的特点:
主要描述:RDD是Spark提供的最主要的一个抽象概念(Resilient Distributed Dataset),它是一个element的collection,分区化的位于集群的节点中,支持并行处理。
Spark从1.3版本开始引入Dataframe,它克服了RDD的最主要的挑战。
主要描述:Dataframe是一个分布式的数据collection,而且将数据按照列名进行组织。在概念上它与关系型的数据库的表或者R/Python语言中的DataFrame类似。与之一起提供的还有,Spark引入了catalyst优化器,它可以优化查询。
case class Person(name : String , age : Int)
val dataframe = sqlContect.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
case class Person(name : String , age : Int)
val persOnRDD= sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val persOnDF= sqlContect.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
主要描述:Dataset API是对DataFrame的一个扩展,使得可以支持类型安全的检查,并且对类结构的对象支持程序接口。它是强类型的,不可变collection,并映射成一个相关的schema。
Dataset API的核心是一个被称为Encoder的概念。它是负责对JVM的对象以及表格化的表达(tabular representation)之间的相互转化。
表格化的表达在存储时使用了Spark内置的Tungsten二进制形式,允许对序列化数据操作并改进了内存使用。在Spark 1.6版本之后,支持自动化生成Encoder,可以对广泛的primitive类型(比如String,Integer,Long等)、Scala的case class以及Java Bean自动生成对应的Encoder。
case class Person(name : String , age : Int)
val persOnRDD= sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val persOnDF= sqlContect.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
ds.select(col("name").as[String], $"age".as[Int]).collect()
首先构造一个数据集,是由Person类的结构组成的,然后在此之上看这三个API实例的构造以及相互转换
Person类的定义
数据创建
直接构建出 JavaRDD
JavaRDD
System.out.println("1. 直接构建出 JavaRDD
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
直接构建出 JavaRDD
Person: name = Andy, age = 32
Person: name = Michael, age = 23
Person: name = Justin, age = 19
直接构建出 Dataset
Encoder
Dataset
System.out.println("2. 直接构建出 Dataset
personDS.show();
personDS.printSchema();
Print结果:
- 直接构建出 Dataset
+—+——-+
|age| name|
+—+——-+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+—+——-+
root
|– age: integer (nullable = false)
|– name: string (nullable = true)
直接构建出 Dataset
Dataset
System.out.println("3. 直接构建出 Dataset
personDF.show();
personDF.printSchema();
Print结果:
- 直接构建出 Dataset
|
+—+——-+
|age| name|
+—+——-+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+—+——-+
root
|– age: integer (nullable = false)
|– name: string (nullable = true)
JavaRDD
persOnDS= spark.createDataset(personJavaRDD.rdd(), personEncoder);
System.out.println("1->2 JavaRDD
personDS.show();
personDS.printSchema();
Print结果:
1->2 JavaRDD
-> Dataset
+—+——-+
|age| name|
+—+——-+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+—+——-+
root
|– age: integer (nullable = true)
|– name: string (nullable = true)
JavaRDD
persOnDF= spark.createDataFrame(personJavaRDD, Person.class);
System.out.println("1->3 JavaRDD
personDF.show();
personDF.printSchema();
Print结果:
1->3 JavaRDD
-> Dataset |
+—+——-+
|age| name|
+—+——-+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+—+——-+
root
|– age: integer (nullable = false)
|– name: string (nullable = true)
补充从JavaRDD
JavaRDD
List
fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);
persOnDF= spark.createDataFrame(personRowRdd, rowAgeNameSchema);
System.out.println("\n\n\n补充,由JavaRDD
personDF.show();
personDF.printSchema();
主要就是使用RowFactory把Row中的每一项写好后,通过spark的createDataFrame来创建。其中对于Row的解读包含在了自建的StructType中。
Dataset
persOnJavaRDD= personDS.toJavaRDD();
System.out.println("2->1 Dataset
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
2->1 Dataset
-> JavaRDD
Person: name = Justin, age = 19
Person: name = Andy, age = 32
Person: name = Michael, age = 23
Dataset
persOnJavaRDD= personDF.toJavaRDD().map(row -> {
String name = row.getAs("name");
int age = row.getAs("age");
return new Person(name, age);
});
System.out.println("3->1 Dataset
personJavaRDD.foreach(element -> System.out.println(element.toString()));
Print结果:
3->1 Dataset
-> JavaRDD
Person: name = Justin, age = 19
Person: name = Michael, age = 23
Person: name = Andy, age = 32
Dataset
List
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType rowSchema = DataTypes.createStructType(fieldList);
ExpressionEncoder
Dataset
(MapFunction
List objectList = new ArrayList<>();
objectList.add(person.name);
objectList.add(person.age);
return RowFactory.create(objectList.toArray());
},
rowEncoder
);
System.out.println("2->3 Dataset
personDF_fromDS.show();
personDF_fromDS.printSchema();
Print结果:
2->3 Dataset
-> Dataset |
+&#8212;+&#8212;&#8212;-+
|age| name|
+&#8212;+&#8212;&#8212;-+
| 32| Andy|
| 23|Michael
| 19| Justin|
+&#8212;+&#8212;&#8212;-+
root
|&#8211; age: integer (nullable = false)
|&#8211; name: string (nullable = true)
Dataset
persOnDS= personDF.map(new MapFunction
@Override
public Person call(Row value) throws Exception {
return new Person(value.getAs("name"), value.getAs("age"));
}
}, personEncoder);
System.out.println("3->2 Dataset
personDS.show();
personDS.printSchema();
Print结果:
3->2 Dataset
-> Dataset
+&#8212;+&#8212;&#8212;-+
|age| name|
+&#8212;+&#8212;&#8212;-+
| 32| Andy|
| 23|Michael|
| 19| Justin|
+&#8212;+&#8212;&#8212;-+
root
|&#8211; age: integer (nullable = true)
|&#8211; name: string (nullable = true)
总结:
其实RDD的Map和Dataset的Map只有一点不同,就是Dataset的Map要指定一个Encoder的参数。
需要用Encoder类给出