sparkling-water是将spark和h2o集成与一体的工具,主要思想是利用h2o进行数据挖掘,而利用进行数据处理和一部分计算,具体架构如下:
我们可以从图中看到,spark对源数据做了处理,然后交给h2o进行建模,在预测阶段也作为了计算引擎, sparkling-water的牛逼之处在于使用了和spark的一样的数据结构,这样在数据处理的时候可以十分灵活。
我们在加载数据的时候,既可以使用spark,也可以使用h2o,spark和h2o直接可以共享同样的数据结构(RDD),但是我们在进行进行数据挖掘(h2o只能使用后缀为.hex的文件),因此需要转换才能够进行计算。
共享rdd数据结构有非常多好处,比如就可以利用spark进行数据的清洗,好了,我们直接来看一下怎么使用。
(1)官方提供的下载地址
但是官方提供的地址下载十分慢,当然有VPN的另谈了,这里我提供了我的网盘地址
(2)下载后上传到linux上进行解压
unzip sparkling-water-3.26.2-2.4.zip
(3)启动sparkling-water
找到解压路径下的sparkling-water的bin下的shell,进行启动即可
./sparkling-shell
启动结果如下:
[root@node11 bin]# ./sparkling-shell Using Spark defined in the SPARK_HOME=/opt/software/spark-2.4.3-bin-hadoop2.7 environmental property-----Spark master (MASTER) : local[*]Spark home (SPARK_HOME) : /opt/software/spark-2.4.3-bin-hadoop2.7H2O build version : 3.26.0.2 (yau)Sparkling Water version : 3.26.2-2.4Spark build version : 2.4.3Scala version : 2.11
----19/08/20 15:59:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... us applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node11:4040
Spark context available as 'sc' (master = local[*], app id = local-1566316763461).
Spark session available as 'spark'.
Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//___/ .__/\_,_/_/ /_/\_\ version 2.4.3/_/Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.scala> import org.apache.spark.h2o._
import org.apache.spark.h2o._
scala>
至此单机版的sparkling-water就可以使用了
这里的案例是我根据官方提供的资料(https://github.com/h2oai/sparkling-water/tree/master/examples)进行操练的。
(1)导入spark.h2o的包
import org.apache.spark.h2o._
(2)初始化,其实就是启动h2o
val hc = H2OContext.getOrCreate(spark)
命令运行后结果如下:
19/08/20 16:04:44 WARN internal.InternalH2OBackend: To avoid non-deterministic behavior of Spark broadcast-based joins,
we recommend to set `spark.sql.autoBroadcastJoinThreshold` property of SparkSession to -1.
E.g. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
We also recommend to avoid using broadcast hints in your Spark SQL code.
19/08/20 16:04:44 WARN internal.InternalH2OBackend: Increasing 'spark.locality.wait' to value 0 (Infinitive) as we need to ensure we run on the nodes with H2O
hc: org.apache.spark.h2o.H2OContext =Sparkling Water Context:* Sparkling Water Version: 3.26.2-2.4* H2O name: sparkling-water-root_local-1566316763461* cluster size: 1* list of used nodes:(executorId, host, port)------------------------(driver,node11,54321)------------------------Open H2O Flow in browser: http://192.168.12.137:54321 (CMD + click in Mac OSX)
(3)导入hc的包
import hc.implicits._
(4)导入spark的包
import spark.implicits._
(5)定义天气数据的路径
val weatherDataFile = "/opt/software/sparkling-water-3.26.2-2.4/examples/smalldata/chicago/Chicago_Ohare_International_Airport.csv"
注意的是,这里最好是输入绝对路径,官方提供的是相对路径,自己要进行处理,我是直接使用的绝对路径,不然下一步加载数据会报找不到路径
org.apache.spark.sql.AnalysisException: Path does not exist: file:/opt/software/sparkling-water-3.26.2-2.4/bin/examples/smalldata/chicago/Chicago_Ohare_International_Airport.csv;at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)at scala.collection.immutable.List.foreach(List.scala:392)at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)at scala.collection.immutable.List.flatMap(List.scala:355)at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:615)at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:467)... 55 elided
(6)利用上面定义的路径进行加载数据
val weatherTable = spark.read.option("header", "true").option("inferSchema", "true").csv(weatherDataFile).withColumn("Date", to_date('Date, "MM/dd/yyyy")).withColumn("Year", year('Date)).withColumn("Month", month('Date)) .withColumn("DayofMonth", dayofmonth('Date))
(7)导入java的包
import java.io.File
(8)定义航空的路径
val dataFile = "/opt/software/sparkling-water-3.26.2-2.4/examples/smalldata/airlines/allyears2k_headers.zip"
(9)加载航空数据
val airlinesH2OFrame = new H2OFrame(new File(dataFile))
会给我们这样一个返回:
airlinesH2OFrame: water.fvec.H2OFrame =
Frame key: allyears2k_headers.hexcols: 31rows: 43978chunks: 1size: 2154864
(10)将.hex文件转换成rdd
val airlinesTable = hc.asDataFrame(airlinesH2OFrame)
(11)利用spark进行数据的过滤
val flightsToORD = airlinesTable.filter('Dest === "ORD")
(12)计算一下看看过滤后还有多少数据
flightsToORD.count
结果:
9/08/20 16:24:08 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
res0: Long = 2103
(13)利用spark进行rdd的join操作(合并表)
val joinedDf = flightsToORD.join(weatherTable, Seq("Year", "Month", "DayofMonth"))
(14)导包
import water.support.H2OFrameSupport._
(15)转换成.hex
val joinedHf = columnsToCategorical(hc.asH2OFrame(joinedDf), Array("Year", "Month", "DayofMonth"))
(16)导入深度学习的包
import _root_.hex.deeplearning.DeepLearning
import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters
import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters.Activation
(17)设置深度学习的参数
val dlParams = new DeepLearningParameters()
dlParams._train = joinedHf
dlParams._response_column = "ArrDelay"
dlParams._epochs = 5
dlParams._activation = Activation.RectifierWithDropout
dlParams._hidden = Array[Int](100, 100)
(18)训练模型
val dl = new DeepLearning(dlParams)
val dlModel = dl.trainModel.get
运行结果:
dlModel: hex.deeplearning.DeepLearningModel =
Model Metrics Type: RegressionDescription: Metrics reported on full training framemodel id: DeepLearning_model_1566317084383_1frame id: frame_rdd_21_b16087b00dcb5349ed00b2f0a1964249MSE: 246.94397RMSE: 15.714451mean residual deviance: 246.94397mean absolute error: 9.7153425root mean squared log error: NaN
Variable Importances:Variable Relative Importance Scaled Importance PercentageDepDelay 1.000000 1.000000 0.020609NASDelay 0.953474 0.953474 0.019650Diverted 0.952912 0.952912 0.019639Cancelled 0.940236 0.940236 0.019378DayofMonth.12 0.929144 ...
(19)模型预测
val predictionsHf = dlModel.score(joinedHf)
val predictionsDf = hc.asDataFrame(predictionsHf)
(20)查看预测结果
predictionsDf.show
+-------------------+
| predict|
+-------------------+
| -14.28115203904661|
|-17.384369532025993|
|-15.648360659746515|
|-21.735323004320165|
|-0.4630290696992674|
| -9.351177667940217|
| 112.65659409295617|
| 30.161421574369385|
| 15.403270012684139|
| 170.8349751399989|
| 12.498370529294341|
| 147.3795710418184|
|-6.1483336982319585|
| 44.329600499888926|
| 17.50615431570487|
| 102.51282569095915|
| 7.4154391246514955|
| 9.09458182717221|
|-12.357870505795454|
|-14.798434263256837|
+-------------------+
only showing top 20 rows
官方提供的是利用Rstudio进行查看的,我这样其实不科学,因为只能查看最多20条数据