热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

再谈RDD、DataFrame、DataSet关系以及相互转换(JAVAAPI)

Spark提供了三种主要的与数据相关的API:RDDDataFrameDataSet三者图示下面详细介绍下各自的特点:RDD主要描述:RDD是Spark提供的最主要的一个抽象概念(

Spark提供了三种主要的与数据相关的API:

  • RDD
  • DataFrame
  • DataSet

《再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)》 三者图示

下面详细介绍下各自的特点:

RDD

主要描述:RDD是Spark提供的最主要的一个抽象概念(Resilient Distributed Dataset),它是一个element的collection,分区化的位于集群的节点中,支持并行处理。

  • RDD的特性

    • 分布式:
      RDD使用MapReduce算子来广泛的适应在集群中并行分布式的大数据集的处理和产生。并且方便用户使用高级别的算子在并行计算中。
    • 不可变:
      RDD是由一个records的collection组成,而且是分区的。分区是RDD并行化的基础单元,而且每个分区就是对数据的逻辑分割,它是不可变的,它是通过已经存在的分区的某些transformations创建得到。这种不可变性方便在计算中做到数据一致性。
    • 错误容忍:
      在实际中如果我们丢失了RDD的部分分区,可以通过对丢失分区关联性的transformation重新计算得到。而不是在众多节点中做数据的复制等操作。这个特性是RDD的最大优点,它节省了大量的数据管理、复制等操作,使得计算速度更快。
    • 惰性执行:
      所有的transformation都是惰性的,他们并不是立刻计算出结果,而是只是记住了各个transformation对数据集的依赖关系。当driver程序需要一个action结果时才开始执行。
    • 功能支持:
      RDD支持两种类型的算子:transformation是指从已经存在的数据集中计算得到新的数据集;action是指通过对通过对数据集的计算得到一个结果返回给driver。
    • 数据格式:
      轻松且有效支持各种数据,包括结构化的和非结构化的。
    • 编程语言:
      RDD的API支持Scala、Java、Python和R
  • RDD的限制

    • 没有内置的优化引擎
      当对结构化的数据进行处理时,RDD没有使用Spark的高级优化器,比如catalyst优化器和Tungsten执行引擎。
    • 处理结构化的数据
      不像Dataframe或者Dataset,RDD不会主动推测出数据的schema,而是需要用户在代码里指示。

DataFrame

Spark从1.3版本开始引入Dataframe,它克服了RDD的最主要的挑战。

主要描述:Dataframe是一个分布式的数据collection,而且将数据按照列名进行组织。在概念上它与关系型的数据库的表或者R/Python语言中的DataFrame类似。与之一起提供的还有,Spark引入了catalyst优化器,它可以优化查询。

  • DataFrame的特性

    • 分布式的Row对象的Collection:
      分布式、列名组织的数据、后台优化。
      具体到代码里面,Dataframe就是Dataset
    • 数据处理:
      处理支持结构或者非结构化的格式(比如Avro, CSV, elastic search, 以及Cassandra)以及不同的文件系统(HDFS, HIVE tables, MySQL, etc)。它支持非常多的数据源
    • 使用catalyst优化器优化:
      它对SQL查询以及DataFrame API都提供优化支持。Dataframe使用catalyst树transformation框架有四个步骤:
      1、Analyzing a logical plan to resolve references
      2、Logical plan optimization
      3、Physical planning
      4、Code generation to compile parts of the query to Java bytecode.
    • Hive兼容性:
      使用Spark的SQL可以无修改的支持Hive查询在已经存在的Hive warehouses。它重用了Hive的前端、MetaStore并且对已经存在的Hive数据、查询和UDF提供完整的兼容性。
    • Tungsten:
      Tungsten提供了一个物理执行后端,管理内存动态产生expression evaluation的字节码
    • 编程语言:
      Dataframe API支持Scala、Java、Python和R
  • DataFrame的限制

    • 没有编译阶段的类型检查:
      不能在编译时刻对安全性做出检查,而且限制了用户对于未知结构的数据进行操作。比如下面代码在编译时没有错误,但是在执行时会出现异常:

    case class Person(name : String , age : Int)
    val dataframe = sqlContect.read.json("people.json")
    dataframe.filter("salary > 10000").show
    => throws Exception : cannot resolve 'salary' given input age , name

    • 不能保留类对象的结构:
      一旦把一个类结构的对象转成了Dataframe,就不能转回去了。下面这个栗子就是指出了:

    case class Person(name : String , age : Int)
    val persOnRDD= sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val persOnDF= sqlContect.createDataframe(personRDD)
    personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

DataSet

主要描述:Dataset API是对DataFrame的一个扩展,使得可以支持类型安全的检查,并且对类结构的对象支持程序接口。它是强类型的,不可变collection,并映射成一个相关的schema。
Dataset API的核心是一个被称为Encoder的概念。它是负责对JVM的对象以及表格化的表达(tabular representation)之间的相互转化。
表格化的表达在存储时使用了Spark内置的Tungsten二进制形式,允许对序列化数据操作并改进了内存使用。在Spark 1.6版本之后,支持自动化生成Encoder,可以对广泛的primitive类型(比如String,Integer,Long等)、Scala的case class以及Java Bean自动生成对应的Encoder。

  • DataSet的特性

    • 支持RDD和Dataframe的优点:
      包括RDD的类型安全检查,Dataframe的关系型模型,查询优化,Tungsten执行,排序和shuffling。
    • Encoder:
      通过使用Encoder,用户可以轻松转换JVM对象到一个Dataset,允许用户在结构化和非结构化的数据操作。
    • 编程语言:
      Scala和Java
    • 类型安全检查:
      提供编译阶段的安全类型检查。比如下面这个栗子:

    case class Person(name : String , age : Int)
    val persOnRDD= sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val persOnDF= sqlContect.createDataframe(personRDD)
    val ds:Dataset[Person] = personDF.as[Person]
    ds.filter(p => p.age > 25)
    ds.filter(p => p.salary > 25)
    // error : value salary is not a member of person
    ds.rdd // returns RDD[Person]

    • 相互转换:
      Dataset可以让用户轻松从RDD和Dataframe转换到Dataset不需要额外太多代码。
  • DataSet的限制

    • 需要把类型转成String:
      Querying the data from datasets currently requires us to specify the fields in the class as a string. Once we have queried the data, we are forced to cast column to the required data type. On the other hand, if we use map operation on Datasets, it will not use Catalyst optimizer.
      比如:

    ds.select(col("name").as[String], $"age".as[Int]).collect()

Java API中三种数据格式的相互转换

首先构造一个数据集,是由Person类的结构组成的,然后在此之上看这三个API实例的构造以及相互转换

  • 数据创建

《再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)》 Person类的定义
《再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)》 数据创建

  • 直接构建出 JavaRDD

    JavaRDD persOnJavaRDD= jsc.parallelize(personList);
    System.out.println("1. 直接构建出 JavaRDD");
    personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print结果:

    直接构建出 JavaRDD
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23
    Person: name = Justin, age = 19

  • 直接构建出 Dataset

    Encoder persOnEncoder= Encoders.bean(Person.class);
    Dataset persOnDS= spark.createDataset(personList, personEncoder);
    System.out.println("2. 直接构建出 Dataset");
    personDS.show();
    personDS.printSchema();

    Print结果:

    1. 直接构建出 Dataset
      +—+——-+
      |age| name|
      +—+——-+
      | 32| Andy|
      | 23|Michael|
      | 19| Justin|
      +—+——-+
      root
      |– age: integer (nullable = false)
      |– name: string (nullable = true)
  • 直接构建出 Dataset

    Dataset persOnDF= spark.createDataFrame(personList, Person.class);
    System.out.println("3. 直接构建出 Dataset");
    personDF.show();
    personDF.printSchema();

    Print结果:

    1. 直接构建出 Dataset
      +—+——-+
      |age| name|
      +—+——-+
      | 32| Andy|
      | 23|Michael|
      | 19| Justin|
      +—+——-+
      root
      |– age: integer (nullable = false)
      |– name: string (nullable = true)
  • JavaRDD -> Dataset

    persOnDS= spark.createDataset(personJavaRDD.rdd(), personEncoder);
    System.out.println("1->2 JavaRDD -> Dataset");
    personDS.show();
    personDS.printSchema();

    Print结果:

    1->2 JavaRDD -> Dataset
    +—+——-+
    |age| name|
    +—+——-+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +—+——-+
    root
    |– age: integer (nullable = true)
    |– name: string (nullable = true)

  • JavaRDD -> Dataset

    persOnDF= spark.createDataFrame(personJavaRDD, Person.class);
    System.out.println("1->3 JavaRDD -> Dataset");
    personDF.show();
    personDF.printSchema();

    Print结果:

    1->3 JavaRDD -> Dataset
    +—+——-+
    |age| name|
    +—+——-+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +—+——-+
    root
    |– age: integer (nullable = false)
    |– name: string (nullable = true)

  • 补充从JavaRDD到Dataset

    JavaRDD persOnRowRdd= personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
    List fieldList = new ArrayList<>();
    fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
    fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
    StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);
    persOnDF= spark.createDataFrame(personRowRdd, rowAgeNameSchema);
    System.out.println("\n\n\n补充,由JavaRDD -> Dataset");
    personDF.show();
    personDF.printSchema();

    主要就是使用RowFactory把Row中的每一项写好后,通过spark的createDataFrame来创建。其中对于Row的解读包含在了自建的StructType中。

  • Dataset -> JavaRDD

    persOnJavaRDD= personDS.toJavaRDD();
    System.out.println("2->1 Dataset -> JavaRDD");
    personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print结果:

    2->1 Dataset -> JavaRDD
    Person: name = Justin, age = 19
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23

  • Dataset -> JavaRDD

    persOnJavaRDD= personDF.toJavaRDD().map(row -> {
    String name = row.getAs("name");
    int age = row.getAs("age");
    return new Person(name, age);
    });
    System.out.println("3->1 Dataset -> JavaRDD");
    personJavaRDD.foreach(element -> System.out.println(element.toString()));

    Print结果:

    3->1 Dataset -> JavaRDD
    Person: name = Justin, age = 19
    Person: name = Michael, age = 23
    Person: name = Andy, age = 32

  • Dataset -> Dataset

    List fieldList = new ArrayList<>();
    fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
    fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
    StructType rowSchema = DataTypes.createStructType(fieldList);
    ExpressionEncoder rowEncoder = RowEncoder.apply(rowSchema);
    Dataset personDF_fromDS = personDS.map(
    (MapFunction) person -> {
    List objectList = new ArrayList<>();
    objectList.add(person.name);
    objectList.add(person.age);
    return RowFactory.create(objectList.toArray());
    },
    rowEncoder
    );
    System.out.println("2->3 Dataset -> Dataset");
    personDF_fromDS.show();
    personDF_fromDS.printSchema();

    Print结果:

    2->3 Dataset -> Dataset
    +&#8212;+&#8212;&#8212;-+
    |age| name|
    +&#8212;+&#8212;&#8212;-+
    | 32| Andy|
    | 23|Michael
    | 19| Justin|
    +&#8212;+&#8212;&#8212;-+
    root
    |&#8211; age: integer (nullable = false)
    |&#8211; name: string (nullable = true)

  • Dataset -> Dataset

    persOnDS= personDF.map(new MapFunction() {
    @Override
    public Person call(Row value) throws Exception {
    return new Person(value.getAs("name"), value.getAs("age"));
    }
    }, personEncoder);
    System.out.println("3->2 Dataset -> Dataset");
    personDS.show();
    personDS.printSchema();

    Print结果:

    3->2 Dataset -> Dataset
    +&#8212;+&#8212;&#8212;-+
    |age| name|
    +&#8212;+&#8212;&#8212;-+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +&#8212;+&#8212;&#8212;-+
    root
    |&#8211; age: integer (nullable = true)
    |&#8211; name: string (nullable = true)

总结:
其实RDD的Map和Dataset的Map只有一点不同,就是Dataset的Map要指定一个Encoder的参数。

《再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)》 需要用Encoder类给出


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  •        在搭建Hadoop环境之前,请先阅读如下博文,把搭建Hadoop环境之前的准备工作做好,博文如下:       1、CentOS6.7下安装JDK,地址:http:b ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • Java开发实战讲解!字节跳动三场技术面+HR面
    二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有 ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 在IDEA中运行CAS服务器的配置方法
    本文介绍了在IDEA中运行CAS服务器的配置方法,包括下载CAS模板Overlay Template、解压并添加项目、配置tomcat、运行CAS服务器等步骤。通过本文的指导,读者可以轻松在IDEA中进行CAS服务器的运行和配置。 ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • 我知道那里有很多类似的问题,但我还没有找到任何与我的场景完全匹配的问题,所以请不要对重复标志太满意。我正在使用Spark3.0.1在AzureDatabrick ... [详细]
author-avatar
邪冫主_70139
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有