Spark 2.0系列第一篇见【Spark 2.0系列】: Spark Session API和Dataset API,本文将讲解Spark 2.0 的Catalog 和Custom Optimizer。
首先,先了解下RDD 和Dataset 在开发中使用对比。
RDD 和Dataset 使用对比
Dataset API 是RDD 和DataFrame API 的统一,但大部分Dataset API 与RDD API使用方法看起来是相似的(其实实现方法是不同的)。所以RDD代码很容易转换成Dataset API。下面直接上代码:
WordCount
val rdd = sparkContext.textFile("src/main/resources/data.txt")
val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPair = wordsRDD.map(word => (word,1))
val wordCount = wordsPair.reduceByKey(_+_)
val ds = sparkSession.read.text("src/main/resources/data.txt")
import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCountDs = wordsPairDs.count()
其它
| RDD | Dataset |
---|
Caching | rdd.cache() | ds.cache() |
Filter | val filteredRDD = wordsRDD.filter(value => value ==”hello”) | val filteredDS = wordsDs.filter(value => value ==”hello”) |
Map Partition | val mapPartitiOnsRDD= rdd.mapPartitions(iterator => List(iterator.count(value => true)).iterator) | val mapPartitiOnsDs= ds.mapPartitions(iterator => List(iterator.count(value => true)).iterator) |
reduceByKey | val reduceCountByRDD = wordsPair.reduceByKey(+) | val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))
|
备注:此处表格横屏观看效果更佳。Dataset 和RDD 相互转换
val dsToRDD = ds.rdd
RDD 转换成Dataframe稍麻烦,需要指定schema。
val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
Catalog API
DataSet 和Dataframe API 支持结构化数据分析,而结构化数据重要的是管理metadata。这里的metadata包括temporary metadata(临时表);registered udfs;permanent metadata(Hive metadata或HCatalog)。
早期Spark版本并未提供标准的API访问metadata,开发者需要使用类似show tables的查询来查询metadata;而Spark 2.0 在Spark SQL中提供标准API 调用catalog来访问metadata。
访问Catalog
建立SparkSession,然后调用Catalog:
val catalog = sparkSession.catalog
查询数据库
catalog.listDatabases().select("name").show()
listDatabases可查询所有数据库。在Hive中,Catalog可以访问Hive metadata中的数据库。listDatabases返回一个dataset,所以你可以使用适用于dataset的所有操作去处理metadata。
用createTempView 注册Dataframe
早期版本Spark用registerTempTable注册dataframe,而Spark 2.0 用createTempView替代。
df.createTempView("sales")
一旦注册视图,即可使用listTables访问所有表。
查询表
catalog.listTables().select("name").show()
检查表缓存
通过Catalog可检查表是否缓存。访问频繁的表缓存起来是非常有用的。
catalog.isCached("sales")
默认表是不缓存的,所以你会得到false。
df.cache()
catalog.isCached("sales")
现在将会打印true。
删除视图
catalog.dropTempView("sales")
查询注册函数
catalog.listFunctions().
select("name","description","className","isTemporary").show(100)
Catalog不仅能查询表,也可以访问UDF。上面代码会显示Spark Session中所有的注册函数(包括内建函数)。
自定义 Optimizer
Catalyst optimizer
Spark SQL使用Catalyst优化所有的查询,优化之后的查询比直接操作RDD速度要快。Catalyst是基于rule的,每个rule都有一个特定optimization,比如,ConstantFolding rule用来移除常数表达式,具体可直接看Spark SQL源代码。
在早期版本Spark中,如果想自定义optimization,需要开发者修改Spark源代码。操作起来麻烦,而且要求开发者能读懂源码。在Spark 2.0中,已提供API自定义optimization。
访问Optimized plan
在开始编写自定义optimization之前,先来看看如何访问optimized plan:
val df = sparkSession.read.option("header","true").csv("src/main/resources/data.csv")
val multipliedDF = df.selectExpr("amountPaid * 1")
println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)
上面的代码是加载一个csv文件,并对某一行所有值乘以1。queryExecution 可访问查询相关的所有执行信息。 queryExecution 的optimizedPlan对象可以访问dataframe的optimized plan。
Spark中的执行计划以tree表示,所以用numberedTreeString打印optimized plan。打印结果如下:
00 Project [(cast(amountPaid#3 as double) * 1.0) AS (amountPaid * 1)#5]01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv
所有执行计划是由底向上读取:
编写自定义optimizer rule
从上面的执行计划可以清晰的看到:对一列的每个值乘以1 这里并没有优化。我们知道,乘以1 这个操作应该返回的是值本身,所以可以利用这个特点来增加只能点的optimizer。代码如下:
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left,right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
println("optimization of one applied")
left
}
}
这里MultiplyOptimizationRule扩展自Rule类,采用Scala的模式匹配编写。检测右操作数是否是 1,如果是1 则直接返回左节点。
把MultiplyOptimizationRule加入进optimizer:
sparkSession.experimental.extraOptimizatiOns= Seq(MultiplyOptimizationRule)
你可以使用extraOptimizations将定义好的Rule加入 catalyst。
下面实际使用看看效果:
val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")
println("after optimization")
println(multipliedDFWithOptimization.queryExecution.
optimizedPlan.numberedTreeString)
我们看到打印结果:
00 Project [cast(amountPaid#3 as double) AS (amountPaid * 1)#7]01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv
说明自定义Optimizer已生效。
侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。
若发现以上文章有任何不妥,请联系我。