热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark之sparkshell,MapPartition和Map的区别

前言:要学习spark程序开发,建议先学习spark-shell交互式学习,加深对spark程序开发的理解。spark-shell提供了一种学习API的简单方式,以及一个能够进行交

前言:要学习spark程序开发,建议先学习spark-shell交互式学习,加深对spark程序开发的理解。spark-shell提供了一种学习API的简单方式,以及一个能够进行交互式分析数据的强大工具,可以使用scala编写(scala运行与Java虚拟机可以使用现有的Java库)或使用Python编写。

1.启动spark-shell

    spark-shell的本质是在后台调用了spark-submit脚本来启动应用程序的,在spark-shell中已经创建了一个名为sc的SparkContext对象,在4个CPU核运行spark-shell命令如下:

spark-shell --master local[4]

    如果指定Jar包路径,则命令如下:

spark-shell --master local[4] --jars xxx.jar,yyy,jar

    –master用来设置context将要连接并使用的资源主节点,master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一个local地址

    –jars可以添加需要用到的jar包,通过逗号分隔来添加多个包。

2.加载text文件

    spark创建sc后,可以加载本地文件创建RDD,这里测试是加载spark自带的本地文件README.md,返回一个MapPartitionsRDD文件。

scala> val textFile = sc.textFile(“file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md”);
textFile: org.apache.spark.rdd.RDD[String] = file:///opt/cloud/spark-2.1.1-bin-hadoop2.7/README.md MapPartitionsRDD[9] at textFile at :24

    加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识,从本地读取文件直接返回MapPartitionsRDD,而从HDFS读取的文件是先转成HadoopRDD,然后隐试转换成MapPartitionsRDD。想了解MapPartitions可以看这篇MapPartition和Map的区别。

3.简单RDD操作

    对于RDD可以执行Transformation返回新的RDD,也可以执行Action得到返回结果。first命令返回文件第一行,count命令返回文件所有行数。

scala> textFile.first();
res6: String
= # Apache Spark
scala
> textFile.count();
res7: Long
= 104

 接下来进行transformation操作,使用filter命令从README.md文件中抽取出一个子集,返回一个新的FilteredRDD。

scala> val textFilter = textFile.filter(line=>line.contains("Spark"));
textFilter: org.apache.spark.rdd.RDD[String]
= MapPartitionsRDD[16] at filter at :26

 链接多个Transformation和Action,计算包括”Spark”字符串的行数。

scala> textFile.filter(line=>line.contains("Spark")).count();
res10: Long
= 20

4.RDD应用的简单操作

 (1)计算文本中单词最多的一行的单词数

scala> textFile.map(line =>line.split(" ").size).reduce((a,b) => if (a > b) a else b);
res11: Int
= 22

 先将每一行的单词使用空格进行拆分,并统计每一行的单词数,创建一个基于单词数的新RDD,然后对该RDD进行Reduce操作返回最大值。

 (2)统计单词

 词频统计WordCount是大数据处理最流行的入门程序之一,Spark可以很容易实现WordCount操作。

//这个过程返回的是一个(string,int)类型的键值对ShuffledRDD(y执行reduceByKey的时候需要进行Shuffle操作,返回的是一个Shuffle形式的RDD),最后用Collect聚合统计结果
scala> val wordCount = textFile.flatMap(line =>line.split(" ")).map(x => (x,1)).reduceByKey((a,b) => a+b);
wordCount: org.apache.spark.rdd.RDD[(String, Int)]
= ShuffledRDD[23] at reduceByKey at :26
scala
> wordCount.collect
[Stage
7:> (0 + 0)
[Stage 7:> (0 + 2)
res12: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (//spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar...

//这里使用了占位符_,使表达式更为简洁,是Scala语音的特色,每个_代表一个参数。
scala> val wordCount2 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_);
wordCount2: org.apache.spark.rdd.RDD[(String, Int)]
= ShuffledRDD[26] at reduceByKey at :26
scala
> wordCount2.collect
res14: Array[(String, Int)]
= Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (//spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (ar...

//Spark默认不进行排序,如有需要排序输出,排序的时候将key和value互换,使用sortByKey方法指定升序(true)和降序(false)
scala> val wordCount3 = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1));
wordCount3: org.apache.spark.rdd.RDD[(String, Int)]
= MapPartitionsRDD[34] at map at :26
scala
> wordCount3.collect
res15: Array[(String, Int)]
= Array(("",71), (the,24), (to,17), (Spark,16), (for,12), (##,9), (and,9), (a,8), (can,7), (run,7), (on,7), (is,6), (in,6), (using,5), (of,5), (build,4), (Please,4), (with,4), (also,4), (if,4), (including,4), (an,4), (You,4), (you,4), (general,3), (documentation,3), (example,3), (how,3), (one,3), (For,3), (use,3), (or,3), (see,3), (Hadoop,3), (Python,2), (locally,2), (This,2), (Hive,2), (SparkPi,2), (refer,2), (Interactive,2), (Scala,2), (detailed,2), (return,2), (Shell,2), (class,2), (Python,,2), (set,2), (building,2), (SQL,2), (guidance,2), (cluster,2), (shell:,2), (supports,2), (particular,2), (following,2), (which,2), (should,2), (To,2), (be,2), (do,2), (./bin/run-example,2), (It,2), (1000:,2), (tests,2), (examples,2), (at,2), (`examples`,2), (that,2), (H...

5.RDD缓存使用RDD的cache()方法

 


推荐阅读
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • Python脚本编写创建输出数据库并添加模型和场数据的方法
    本文介绍了使用Python脚本编写创建输出数据库并添加模型数据和场数据的方法。首先导入相应模块,然后创建输出数据库并添加材料属性、截面、部件实例、分析步和帧、节点和单元等对象。接着向输出数据库中添加场数据和历程数据,本例中只添加了节点位移。最后保存数据库文件并关闭文件。文章还提供了部分代码和Abaqus操作步骤。另外,作者还建立了关于Abaqus的学习交流群,欢迎加入并提问。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • 本文介绍了使用readlink命令获取文件的完整路径的简单方法,并提供了一个示例命令来打印文件的完整路径。共有28种解决方案可供选择。 ... [详细]
  • 1.脚本功能1)自动替换jar包中的配置文件。2)自动备份老版本的Jar包3)自动判断是初次启动还是更新服务2.脚本准备进入ho ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 有没有一种方法可以在不继承UIAlertController的子类或不涉及UIAlertActions的情况下 ... [详细]
  • 服务网关与流量网关
    一、为什么需要服务网关1、什么是服务网关传统的单体架构中只需要开放一个服务给客户端调用,但是微服务架构中是将一个系统拆分成多个微服务,如果没有网关& ... [详细]
  • 浅解XXE与Portswigger Web Sec
    XXE与PortswiggerWebSec​相关链接:​博客园​安全脉搏​FreeBuf​XML的全称为XML外部实体注入,在学习的过程中发现有回显的XXE并不多,而 ... [详细]
author-avatar
liangpengtao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有