热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

==Spark快速入门

Spark快速入门|JiZHANG’sBloghttp:shzhangji.comblog20141216spark-quick-startsc.textFile()用于生成一个R

Spark快速入门 | Ji ZHANG’s Blog
http://shzhangji.com/blog/2014/12/16/spark-quick-start/

sc.textFile()用于生成一个RDD,并声明该RDD指向的是/tmp/logs.txt文件。RDD可以暂时认为是一个列表,列表中的元素是一行行日志(因此是String类型)。

lines.map(f)表示对RDD中的每一个元素使用f函数来处理,并返回一个新的RDD。
line => line.split(“\t”)是一个匿名函数,又称为Lambda表达式、闭包等。它的作用和普通的函数是一样的,如这个匿名函数的参数是line(String类型),返回值是Array数组类型,因为String.split()函数返回的是数组。

需要注意的是,cache函数并不会立刻执行缓存操作,事实上map、filter等函数都不会立刻执行,而是在用户执行了一些特定操作后才会触发,比如first、count、reduce等。这两类操作分别称为Transformations和Actions。

之后对firstTenErrors的处理使用的是Scala集合类库中的方法,如map、foreach,和RDD提供的接口基本一致。所以说用Scala编写Spark程序是最自然的。

Apache Spark是新兴的一种快速通用的大规模数据处理引擎。它的优势有三个方面:
通用计算引擎 能够运行MapReduce、数据挖掘、图运算、流式计算、SQL等多种框架;
基于内存 数据可缓存在内存中,特别适用于需要迭代多次运算的场景;
与Hadoop集成 能够直接读写HDFS中的数据,并能运行在YARN之上。

Spark是用Scala语言编写的,所提供的API也很好地利用了这门语言的特性。它也可以使用Java和Python编写应用。本文将用Scala进行讲解。
安装Spark和SBT
从官网上下载编译好的压缩包,解压到一个文件夹中。下载时需注意对应的Hadoop版本,如要读写CDH4 HDFS中的数据,则应下载Pre-built for CDH4这个版本。
为了方便起见,可以将spark/bin添加到$PATH环境变量中:

1
2

export SPARK_HOME=/path/to/spark
export PATH=$PATH:$SPARK_HOME/bin

在练习例子时,我们还会用到SBT这个工具,它是用来编译打包Scala项目的。Linux下的安装过程比较简单:下载sbt-launch.jar到$HOME/bin目录;
新建$HOME/bin/sbt文件,权限设置为755,内容如下:

1
2

SBT_OPTS=”-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M”
java $SBT_OPTS -jar dirname $0/sbt-launch.jar “$@”

日志分析示例
假设我们有如下格式的日志文件,保存在/tmp/logs.txt文件中:
1
2
3
4
5

2014-12-11 18:33:52 INFO Java some message
2014-12-11 18:34:33 INFO MySQL some message
2014-12-11 18:34:54 WARN Java some message
2014-12-11 18:35:25 WARN Nginx some message
2014-12-11 18:36:09 INFO Java some message

每条记录有四个字段,即时间、级别、应用、信息,使用制表符分隔。
Spark提供了一个交互式的命令行工具,可以直接执行Spark查询:
1
2
3
4
5
6
7
8
9

$ spark-shell
Welcome to

/ / ___ _____/ /__
\ / _ / _ `/ / ‘/
/
/ ./_,// //_\ version 1.1.0
/_/
Spark context available as sc.
scala>

加载并预览数据
1
2
3
4
5

scala> val lines = sc.textFile(“/tmp/logs.txt”)
lines: org.apache.spark.rdd.RDD[String] = /tmp/logs.txt MappedRDD[1] at textFile at :12

scala> lines.first()
res0: String = 2014-12-11 18:33:52 INFO Java some message

sc是一个SparkContext类型的变量,可以认为是Spark的入口,这个对象在spark-shell中已经自动创建了。
sc.textFile()用于生成一个RDD,并声明该RDD指向的是/tmp/logs.txt文件。RDD可以暂时认为是一个列表,列表中的元素是一行行日志(因此是String类型)。这里的路径也可以是HDFS上的文件,如hdfs://127.0.0.1:8020/user/hadoop/logs.txt。
lines.first()表示调用RDD提供的一个方法:first(),返回第一行数据。

解析日志
为了能对日志进行筛选,如只处理级别为ERROR的日志,我们需要将每行日志按制表符进行分割:
1
2
3
4
5

scala> val logs = lines.map(line => line.split(“\t”))
logs: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at :14

scala> logs.first()
res1: Array[String] = Array(2014-12-11 18:33:52, INFO, Java, some message)

lines.map(f)表示对RDD中的每一个元素使用f函数来处理,并返回一个新的RDD。
line => line.split(“\t”)是一个匿名函数,又称为Lambda表达式、闭包等。它的作用和普通的函数是一样的,如这个匿名函数的参数是line(String类型),返回值是Array数组类型,因为String.split()函数返回的是数组。
同样使用first()方法来看这个RDD的首条记录,可以发现日志已经被拆分成四个元素了。

过滤并计数
我们想要统计错误日志的数量:
1
2
3
4
5
6
7
8

scala> val errors = logs.filter(log => log(1) == “ERROR”)
errors: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3] at filter at :16

scala> errors.first()
res2: Array[String] = Array(2014-12-11 18:39:42, ERROR, Java, some message)

scala> errors.count()
res3: LOng= 158

logs.filter(f)表示筛选出满足函数f的记录,其中函数f需要返回一个布尔值。
log(1) == “ERROR”表示获取每行日志的第二个元素(即日志级别),并判断是否等于ERROR。
errors.count()用于返回该RDD中的记录。

缓存
由于我们还会对错误日志做一些处理,为了加快速度,可以将错误日志缓存到内存中,从而省去解析和过滤的过程:
1

scala> errors.cache()

errors.cache()函数会告知Spark计算完成后将结果保存在内存中。所以说Spark是否缓存结果是需要用户手动触发的。在实际应用中,我们需要迭代处理的往往只是一部分数据,因此很适合放到内存里。
需要注意的是,cache函数并不会立刻执行缓存操作,事实上map、filter等函数都不会立刻执行,而是在用户执行了一些特定操作后才会触发,比如first、count、reduce等。这两类操作分别称为Transformations和Actions。
显示前10条记录
1
2
3
4
5
6
7

scala> val firstTenErrors = errors.take(10)
firstTenErrors: Array[Array[String]] = Array(Array(2014-12-11 18:39:42, ERROR, Java, some message), Array(2014-12-11 18:40:23, ERROR, Nginx, some message), …)

scala> firstTenErrors.map(log => log.mkString(“\t”)).foreach(line => println(line))
2014-12-11 18:39:42 ERROR Java some message
2014-12-11 18:40:23 ERROR Nginx some message
…

errors.take(n)方法可用于返回RDD前N条记录,它的返回值是一个数组。之后对firstTenErrors的处理使用的是Scala集合类库中的方法,如map、foreach,和RDD提供的接口基本一致。所以说用Scala编写Spark程序是最自然的。
按应用进行统计
我们想要知道错误日志中有几条Java、几条Nginx,这和常见的Wordcount思路是一样的。
1
2
3
4
5
6
7
8
9
10
11
12
13

scala> val apps = errors.map(log => (log(2), 1))
apps: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[15] at map at :18

scala> apps.first()
res20: (String, Int) = (Java,1)

scala> val counts = apps.reduceByKey((a, b) => a + b)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at :20

scala> counts.foreach(t => println(t))
(Java,58)
(Nginx,53)
(MySQL,47)

errors.map(log => (log(2), 1))用于将每条日志转换为键值对,键是应用(Java、Nginx等),值是1,如(“Java”, 1)
,这种数据结构在Scala中称为元组(Tuple),这里它有两个元素,因此称为二元组。
对于数据类型是二元组的RDD,Spark提供了额外的方法,reduceByKey(f)就是其中之一。它的作用是按键进行分组,然后对同一个键下的所有值使用f函数进行归约(reduce)。归约的过程是:使用列表中第一、第二个元素进行计算,然后用结果和第三元素进行计算,直至列表耗尽。如:
1
2

scala> Array(1, 2, 3, 4).reduce((a, b) => a + b)
res23: Int = 10

上述代码的计算过程即((1 + 2) + 3) + 4

counts.foreach(f)表示遍历RDD中的每条记录,并应用f函数。这里的f函数是一条打印语句(println)。
打包应用程序
为了让我们的日志分析程序能够在集群上运行,我们需要创建一个Scala项目。项目的大致结构是:
1
2
3
4
5
6
7
8
9

spark-sandbox
├── build.sbt
├── project
│ ├── build.properties
│ └── plugins.sbt
└── src
└── main
└── scala
└── LogMining.scala

你可以直接使用这个项目作为模板。下面说明一些关键部分:
配置依赖
build.sbt

1

libraryDependencies += “org.apache.spark” %% “spark-core” % “1.1.1”

程序内容
src/main/scala/LogMining.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object LogMining extends App {
val cOnf= new SparkConf().setAppName(“LogMining”)
val sc = new SparkContext(conf)
val inputFile = args(0)
val lines = sc.textFile(inputFile)
// 解析日志
val logs = lines.map(.split(“\t”))
val errors = logs.filter(
(1) == “ERROR”)
// 缓存错误日志
errors.cache()
// 统计错误日志记录数
println(errors.count())
// 获取前10条MySQL的错误日志
val mysqlErrors = errors.filter((2) == “MySQL”)
mysqlErrors.take(10).map(
mkString “\t”).foreach(println)
// 统计每个应用的错误日志数
val errorApps = errors.map(_(2) -> 1)
errorApps.countByKey().foreach(println)
}

打包运行
1
2
3

$ cd spark-sandbox
$ sbt package
$ spark-submit –class LogMining –master local target/scala-2.10/spark-sandbox_2.10-0.1.0.jar data/logs.txt

参考资料
Spark Programming Guide
Introduction to Spark Developer Training
Spark Runtime Internals


推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 在使用sbt构建项目时,遇到了“对象apache不是org软件包的成员”的错误。本文详细分析了该问题的原因,并提供了有效的解决方案,包括检查依赖配置、清理缓存和更新sbt插件等步骤,帮助开发者快速解决问题。 ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
  • 在嵌入式Linux系统中,性能低下通常由CPU、内存和I/O三个关键因素引起。为了有效提升系统性能,首先需要识别并定位性能瓶颈。通过综合分析这些瓶颈,可以采取针对性的优化措施,如调整内核参数、优化算法和改进数据结构等,从而显著提高系统的整体性能。 ... [详细]
  • 本文深入探讨了HTTP头部中的Expires与Cache-Control字段及其缓存机制。Cache-Control字段主要用于控制HTTP缓存行为,其在HTTP/1.1中得到了广泛应用,而HTTP/1.0中主要使用Pragma:no-cache来实现类似功能。Expires字段则定义了资源的过期时间,帮助浏览器决定是否从缓存中读取资源。文章详细解析了这两个字段的具体用法、相互关系以及在不同场景下的应用效果,为开发者提供了全面的缓存管理指南。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
author-avatar
国邮国旅刘峰
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有