热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark系列修炼---入门笔记18

核心内容:1、Spark当中常用的3种创建RDD的方式2、自定义分片个数(并行度)今天又学习了一讲Spark(Spark本身就是一个计算框架,就是一个JVM计算框架而已),2016

核心内容:
1、Spark当中常用的3种创建RDD的方式
2、自定义分片个数(并行度)


今天又学习了一讲Spark(Spark本身就是一个计算框架,就是一个JVM计算框架而已),2016年12月份注定不平凡了,希望在2016年的最后一个月份多做一些有意义的事情,毕业在即……
好了,进入文章的正题,从学习Spark到现在,一直离不开一个概念RDD(弹性分布式数据集),今天主要学习关于RDD的三种创建方式,今天打算写一篇好的博客呢!
Spark中的一切算法的操作都是基于RDD的,而且所有操作的算子都至少会产生一个RDD,即Spark是基于RDD的。
这里写图片描述
从上面的例子可以看出,textFile这个算子虽然是读取文件,但是是产生了2个RDD。
在此思考一个问题,为什么在创建Spark中RDD的时候会有很多种不同的方式呢?
答案:因为Spark会基于不同的介质进行计算,Spark可以运行在不同的文件系统(比如本地文件系统、Hadoop的分布式文件系统HDFS等)和存储介质之上,所以创建RDD就会有很多种不同的方式。并且通常情况下我们创建的第一个RDD代表和包含了Spark整个应用程序输入数据的来源,因此创建的第一个RDD非常重要
SparkCore基本上提供了三种方式来创建初始的RDD:
1、使用Scala的集合—使用程序中的集合去创建RDD
2、使用本地文件系统 (Local FileSystem) 创建RDD
3、使用HDFS去创建RDD
当然,这三种创建RDD的方式也代表了应用程序3种不同的数据来源。注意:这是3种最基本创建RDD的方式, 基于数据库创建、NoSQL(Hbase),基于S3、数据流等等都可以创建RDD,本次博客主要讲述这3种最基本创建RDD的方式。
(一)、使用Scala的集合—使用程序中的集合去创建RDD
使用Scala的集合去创建RDD的这种方式适合程序员在本地做调试时进行使用,查看执行的结果是否符合预期。
直接上代码:

package com.appache.spark.app

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by hp on 2016/12/3.
* 本程序的目的是使用Scala中的集合去创建RDD
*/

object RddBasedOnCollections
{
def main(args:Array[String]):Unit=
{
val cOnf= new SparkConf()
conf.setAppName("RddBasedOnCollections")
conf.setMaster("local")

val sc = new SparkContext(conf)
val numbers = 1 to 10
//使用Scala中的集合来创建RDD
val rdd:RDD[Int] = sc.parallelize(numbers)
val sum = rdd.reduce((x,y)=>x+y)
println("sum is:"+sum)
}
}

查看运行结果:

sum is:55

其实到这里我们也可以想到,所谓创建RDD的不同方式无非就是应用程序的数据来源不同而已。通过上面的例子可以深刻的说明:Spark可以作为一个单机处理软件去计算、加工数据,此时Spark就相当于一个本地的JVM软件。(这种方式我在Hadoop中也遇到过,在本地运行模式下Hadoop和Spark其实就是一个JVM程序)
接下来我们对上面的这个程序进行详解,进入会话模式:
这里写图片描述
从运行结果我们可以看出,sc.parallelize(numbers)产生了一个RDD:ParallelCollectionRDD
好的,我们再次核对一下源码:
这里写图片描述
注意:numSlices:指的是并行计算的分片数(其实就是split数据分片、并行度),如果不写的话就是默认的内容。
这里写图片描述
呵呵,Spark的程序编程无非就是创建RDD,转化已有的RDD。

scala> rdd.collect

查看结果:

res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

这个结果再次说明一个事情:RDD本身就是一个数据的集合,可以简单将RDD理解为一个List或Array。
接下来我们求一下结果sum:
这里写图片描述
查一下日志:
这里写图片描述
从运行的结果日志中我们可以看到,对于这个Job来说,并行的任务数量是16个(即开了16个线程),但是为什么是16个呢,我们并没有指定任务的并发数啊?好的,针对这个问题,我们看一下Spark的集群配置:
这里写图片描述
这下答案就有了:因为我们Spark集群当中有16个core,在Spark集群当中有多少个core就启动多少个core,Spark会最大化的利用已有的core的个数,所有并行度就会提高,进而作业运行效率就会得到提高
但是从运行日志中我们可以看出只有一个Stage,但是为什么只有一个Stage呢,这和我们的MapReduce不一样啊?
答案:Hadoop中的MapReduce包括两个阶段:Mapper阶段和Reducer阶段,Hadoop即使做简单的操作都需要经过MapReduce阶段,但是Spark在做简单操作的时候根本就不用经过MapReduce操作即不需要Shuffle,这恰恰说明了我们Spark的灵活性和强大性。
但是到现在我产生一个问题:这16个并行任务最后是怎么合并出最后的结果呢?(不解)
接下来我用一张图生动的描绘一下Spark这个任务的运行场景:
这里写图片描述
这个我们下面的日志描述的是同一个意思:
这里写图片描述
Spark会最大化的利用已有的core的个数提高并行度,但是我们自己在写代码的时候可以指定并行任务的数量
如下示例:

scala> val rdd = sc.parallelize(numbers,8)

查看运行结果:
这里写图片描述
这里写图片描述

  override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

深度思考:在MapReduce当中,默认情况下一个block块对应一个split数据片,而一个split数据片对应一个Mapper任务,即Mapper任务的并发数量是由切分的数量决定的,有多少个切片,就有多少个Mapper任务,一个split数据片对应一个Mapper进程。
这里写图片描述
但是在Spark当中,数据分片具有高度弹性(可以自由设置分片函数),即自由的设置并行度,体现了Spark强大的灵活性。
通过上面我们可以发现,我们可以通过参数自由设置分片的个数(即并行度),但是在具体工作的时候我们Spark的并行度到底应该设置为多少呢?
结论:自由分片的个数(并行度)= 所有的cores*(2-4)
正如我们Spark集群配置所示:共有16个cores,所以自由分片的个数(并行度)= 所有的cores*(2-4)=16*(2-4)=(32-64)个

scala> val rdd = sc.parallelize(numbers,32-64)

注意:Spark当中并行度的设置和数据的规模是没有关系的,只和每个Task在计算每个partition的时候消耗的内存和cpu的数量有关。
(二)使用本地文件系统 (Local FileSystem) 创建RDD
问题:使用本地文件系统创建RDD的作用是什么呢?主要的目的和集合创建的目的一样,也是为了做测试。
此时指定的文件来源应该用textFile算子
场景:计算所有行的长度的总和。

package com.appache.spark.app

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by hp on 2016/12/2.
* 本程序的目的是使用本地文件系统去创建RDD
*/

object RddBasedOnLocalFile
{
def main(args:Array[String]):Unit=
{
val cOnf= new SparkConf()
conf.setAppName("RddBasedOnLocalFile")
conf.setMaster("local")

val sc = new SparkContext(conf)
//不指定的话:一个block块对应一个split数据片
val rdd:RDD[String] = sc.textFile("C:\\word.txt")
val linesLength:RDD[Int] = rdd.map(line=>line.length)
val sum = linesLength.reduce(_+_)
println(sum)
}
}

运行结果:

69

上面的数据来源word.txt文件本身比较小,所以应该只有一个split数据片,即只有一个并行度。
接下来我们自己定义并行度:

scala> val rdd = sc.textFile("/word.txt",12)

运行日志:
这里写图片描述
呵呵,并行度13满足我们的条件设置
(三)使用HDFS去创建RDD
使用HDFS去创建RDD是生产环境下最常用的方式,主要就是用Spark读取HDFS中的数据进行处理。
呵呵,黑窗口交互式就可了,此时我们的数据来源由Windows的文件系统转化为了HDFS分布式文件系统。
OK,继续努力!!!!


推荐阅读
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • Android异步处理一:使用Thread+Handler实现非UI线程更新UI界面Android异步处理二:使用AsyncTask异步更新UI界面Android异步处理三:Handler+Loope ... [详细]
  • java解析json转Map前段时间在做json报文处理的时候,写了一个针对不同格式json转map的处理工具方法,总结记录如下:1、单节点单层级、单节点多层级json转mapim ... [详细]
  • 可参照github代码:https:github.comrabbitmqrabbitmq-tutorialsblobmasterjavaEmitLogTopic.ja ... [详细]
  • 本文介绍了如何在Spring框架中使用AspectJ实现AOP编程,重点讲解了通过注解配置切面的方法,包括方法执行前和方法执行后的增强处理。阅读本文前,请确保已安装并配置好AspectJ。 ... [详细]
  • 短视频app源码,Android开发底部滑出菜单首先依赖三方库implementationandroidx.appcompat:appcompat:1.2.0im ... [详细]
  • iOS snow animation
    CTSnowAnimationView.hCTMyCtripCreatedbyalexon1614.Copyright©2016年ctrip.Allrightsreserved.# ... [详细]
  • packagecom.panchan.tsmese.utils;importjava.lang.reflect.ParameterizedType;importjava.lang. ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文详细介绍了Java反射机制的基本概念、获取Class对象的方法、反射的主要功能及其在实际开发中的应用。通过具体示例,帮助读者更好地理解和使用Java反射。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • WPF项目学习.一
    WPF项目搭建版权声明:本文为博主初学经验,未经博主允许不得转载。一、前言记录在学习与制作WPF过程中遇到的解决方案。使用MVVM的优点是数据和视图分离,双向绑定,低耦合,可重用行 ... [详细]
author-avatar
GuangLi1472_716
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有