作者:dongquchunlaizv_123 | 来源:互联网 | 2023-09-25 17:01
说明 由于某些原因,上周五未发布博客未,本文补上。 DataFrame是一种spark 1.3版本提供Spark SQL接口下的分布式数据集,继承自DataSet数据集,该概念最早由R语言和Pandas库(Python)提出。 DataFrame更像传统数据库里的表,除了数据外还包含更多的辅助信息,如列名、列值和列的属性,同时支持一些复杂的数据格式。从API应用的角度,DataFrame提供的API层次更高,比RDD编程方便,学习的门槛更低。
分享 记录 特性 DataFrame会将数据分割成列,并为每列取名,概念上等同于关系型数据库中表或R、Python中的数据框。
可以处理结构化和非结构化数据格式。例如Avro、CSV、弹性搜索和Cassandra。它还处理存储系统HDFS、HIVE表、MySQL等。
Catalyst的通用树转换框架分为四个阶段,如下所示:
分析解决引用的逻辑计划 逻辑计划优化, 物理计划 代码生成用于编译部分查询生成Java字节码。 在物理规划阶段,Catalyst可能会生成多个计划并根据成本进行比较。 所有其他阶段完全是基于规则的。 每个阶段使用不同类型的树节点; Catalyst包括用于表达式、数据类型以及逻辑和物理运算符的节点库。 这些阶段如下所示:
接口 DataFrame API支持Scala、java、python、R,它作为行数据集,在Scala API中DataFrame是DataSet[Row]的类型别名,Java中使用数据集表示。 SparkSession是所有Spark功能入口,应用程序通过它能从现存RDD或Hive Table、Spark数据源中创建DataFram。Spark SQL能对多种数据源使用DataFrame接口,使用SparkSQL DataFrame创建临时视图,然后执行Sql查询。 一般处理流程 先创建Spark基础变量,spark,sc 加载数据,rdd.textFile,spark.read.csv/json等 数据处理,mapPartition, map,filter,reduce等一系列transformation操作 数据保存,saveAstextFile,或者其他DataFrame方法 优点 DataFrame优于RDD,因为它提供了内存管理和优化执行计划:
自定义内存管理,当数据以二进制格式存储在堆外内存时,将节省大量内存,除此之外,没有垃圾回收(GC)开销。同时避免了昂贵的java序列化,因为数据以二进制格式存储,且已知内存schema。 优化执行计划:也称为查询优化器,查询执行时,创建一个优化的执行计划,并将计划运行在RDD上。 缺点 Spark SQL DataFrame API不支持编译时类型安全,如果结构未知,不能操作数据。 一旦将域对象转化为DataFrame,域对象不能重构。 代码实例 java代码创建简单DataFrame,JSON格式RDD创建DataFrame:
SparkConf conf &#61; new SparkConf ( ) ; conf. setMaster ( "local" ) . setAppName ( "jsonRDD" ) ; JavaSparkContext sc &#61; new JavaSparkContext ( conf) ; SQLContext sqlContext &#61; new SQLContext ( sc) ; JavaRDD < String > nameRDD &#61; sc. parallelize ( Arrays . asList ( "{\"name\":\"zhangsan\",\"age\":\"18\"}" , "{\"name\":\"lisi\",\"age\":\"19\"}" , "{\"name\":\"wangwu\",\"age\":\"20\"}" ) ) ; JavaRDD < String > scoreRDD &#61; sc. parallelize ( Arrays . asList ( "{\"name\":\"zhangsan\",\"score\":\"100\"}" , "{\"name\":\"lisi\",\"score\":\"200\"}" , "{\"name\":\"wangwu\",\"score\":\"300\"}" ) ) ; DataFrame namedf &#61; sqlContext. read ( ) . json ( nameRDD) ; DataFrame scoredf &#61; sqlContext. read ( ) . json ( scoreRDD) ; namedf. registerTempTable ( "name" ) ; scoredf. registerTempTable ( "score" ) ; DataFrame result &#61; sqlContext. sql ( "select name.name,name.age,score.score from name,score where name.name &#61; score.name" ) ; result. show ( ) ; sc. stop ( ) ;
spark与RDD对比 DataFrame与RDD类似&#xff0c;但DataFrame更像传统数据库的二维表格&#xff0c;除了数据以外&#xff0c;还掌握数据的结构信息&#xff0c;即schema。同时&#xff0c;与Hive类似&#xff0c;DataFrame也支持嵌套数据类型&#xff08;struct、array和map&#xff09;。
总结 spark DataFrame本质是将数据映射为表&#xff0c;在此基础上完成sql方式业务功能开发。