核心内容:
1、Spark当中常用的3种创建RDD的方式
2、自定义分片个数(并行度)
今天又学习了一讲Spark(Spark本身就是一个计算框架,就是一个JVM计算框架而已),2016年12月份注定不平凡了,希望在2016年的最后一个月份多做一些有意义的事情,毕业在即……
好了,进入文章的正题,从学习Spark到现在,一直离不开一个概念RDD(弹性分布式数据集),今天主要学习关于RDD的三种创建方式,今天打算写一篇好的博客呢!
Spark中的一切算法的操作都是基于RDD的,而且所有操作的算子都至少会产生一个RDD,即Spark是基于RDD的。
从上面的例子可以看出,textFile这个算子虽然是读取文件,但是是产生了2个RDD。
在此思考一个问题,为什么在创建Spark中RDD的时候会有很多种不同的方式呢?
答案:因为Spark会基于不同的介质进行计算,Spark可以运行在不同的文件系统(比如本地文件系统、Hadoop的分布式文件系统HDFS等)和存储介质之上,所以创建RDD就会有很多种不同的方式。并且通常情况下我们创建的第一个RDD代表和包含了Spark整个应用程序输入数据的来源,因此创建的第一个RDD非常重要。
SparkCore基本上提供了三种方式来创建初始的RDD:
1、使用Scala的集合—使用程序中的集合去创建RDD
2、使用本地文件系统 (Local FileSystem) 创建RDD
3、使用HDFS去创建RDD
当然,这三种创建RDD的方式也代表了应用程序3种不同的数据来源。注意:这是3种最基本创建RDD的方式, 基于数据库创建、NoSQL(Hbase),基于S3、数据流等等都可以创建RDD,本次博客主要讲述这3种最基本创建RDD的方式。
(一)、使用Scala的集合—使用程序中的集合去创建RDD
使用Scala的集合去创建RDD的这种方式适合程序员在本地做调试时进行使用,查看执行的结果是否符合预期。
直接上代码:
package com.appache.spark.app
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by hp on 2016/12/3.
* 本程序的目的是使用Scala中的集合去创建RDD
*/
object RddBasedOnCollections
{
def main(args:Array[String]):Unit=
{
val cOnf= new SparkConf()
conf.setAppName("RddBasedOnCollections")
conf.setMaster("local")
val sc = new SparkContext(conf)
val numbers = 1 to 10
val rdd:RDD[Int] = sc.parallelize(numbers)
val sum = rdd.reduce((x,y)=>x+y)
println("sum is:"+sum)
}
}
查看运行结果:
sum is:55
其实到这里我们也可以想到,所谓创建RDD的不同方式无非就是应用程序的数据来源不同而已。通过上面的例子可以深刻的说明:Spark可以作为一个单机处理软件去计算、加工数据,此时Spark就相当于一个本地的JVM软件。(这种方式我在Hadoop中也遇到过,在本地运行模式下Hadoop和Spark其实就是一个JVM程序)
接下来我们对上面的这个程序进行详解,进入会话模式:
从运行结果我们可以看出,sc.parallelize(numbers)产生了一个RDD:ParallelCollectionRDD
好的,我们再次核对一下源码:
注意:numSlices:指的是并行计算的分片数(其实就是split数据分片、并行度),如果不写的话就是默认的内容。
呵呵,Spark的程序编程无非就是创建RDD,转化已有的RDD。
scala> rdd.collect
查看结果:
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
这个结果再次说明一个事情:RDD本身就是一个数据的集合,可以简单将RDD理解为一个List或Array。
接下来我们求一下结果sum:
查一下日志:
从运行的结果日志中我们可以看到,对于这个Job来说,并行的任务数量是16个(即开了16个线程),但是为什么是16个呢,我们并没有指定任务的并发数啊?好的,针对这个问题,我们看一下Spark的集群配置:
这下答案就有了:因为我们Spark集群当中有16个core,在Spark集群当中有多少个core就启动多少个core,Spark会最大化的利用已有的core的个数,所有并行度就会提高,进而作业运行效率就会得到提高。
但是从运行日志中我们可以看出只有一个Stage,但是为什么只有一个Stage呢,这和我们的MapReduce不一样啊?
答案:Hadoop中的MapReduce包括两个阶段:Mapper阶段和Reducer阶段,Hadoop即使做简单的操作都需要经过MapReduce阶段,但是Spark在做简单操作的时候根本就不用经过MapReduce操作即不需要Shuffle,这恰恰说明了我们Spark的灵活性和强大性。
但是到现在我产生一个问题:这16个并行任务最后是怎么合并出最后的结果呢?(不解)
接下来我用一张图生动的描绘一下Spark这个任务的运行场景:
这个我们下面的日志描述的是同一个意思:
Spark会最大化的利用已有的core的个数提高并行度,但是我们自己在写代码的时候可以指定并行任务的数量:
如下示例:
scala> val rdd = sc.parallelize(numbers,8)
查看运行结果:
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
深度思考:在MapReduce当中,默认情况下一个block块对应一个split数据片,而一个split数据片对应一个Mapper任务,即Mapper任务的并发数量是由切分的数量决定的,有多少个切片,就有多少个Mapper任务,一个split数据片对应一个Mapper进程。
但是在Spark当中,数据分片具有高度弹性(可以自由设置分片函数),即自由的设置并行度,体现了Spark强大的灵活性。
通过上面我们可以发现,我们可以通过参数自由设置分片的个数(即并行度),但是在具体工作的时候我们Spark的并行度到底应该设置为多少呢?
结论:自由分片的个数(并行度)= 所有的cores*(2-4)
正如我们Spark集群配置所示:共有16个cores,所以自由分片的个数(并行度)= 所有的cores*(2-4)=16*(2-4)=(32-64)个
scala> val rdd = sc.parallelize(numbers,32-64)
注意:Spark当中并行度的设置和数据的规模是没有关系的,只和每个Task在计算每个partition的时候消耗的内存和cpu的数量有关。
(二)使用本地文件系统 (Local FileSystem) 创建RDD
问题:使用本地文件系统创建RDD的作用是什么呢?主要的目的和集合创建的目的一样,也是为了做测试。
此时指定的文件来源应该用textFile算子
场景:计算所有行的长度的总和。
package com.appache.spark.app
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by hp on 2016/12/2.
* 本程序的目的是使用本地文件系统去创建RDD
*/
object RddBasedOnLocalFile
{
def main(args:Array[String]):Unit=
{
val cOnf= new SparkConf()
conf.setAppName("RddBasedOnLocalFile")
conf.setMaster("local")
val sc = new SparkContext(conf)
val rdd:RDD[String] = sc.textFile("C:\\word.txt")
val linesLength:RDD[Int] = rdd.map(line=>line.length)
val sum = linesLength.reduce(_+_)
println(sum)
}
}
运行结果:
69
上面的数据来源word.txt文件本身比较小,所以应该只有一个split数据片,即只有一个并行度。
接下来我们自己定义并行度:
scala> val rdd = sc.textFile("/word.txt",12)
运行日志:
呵呵,并行度13满足我们的条件设置
(三)使用HDFS去创建RDD
使用HDFS去创建RDD是生产环境下最常用的方式,主要就是用Spark读取HDFS中的数据进行处理。
呵呵,黑窗口交互式就可了,此时我们的数据来源由Windows的文件系统转化为了HDFS分布式文件系统。
OK,继续努力!!!!