热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark大数据分布式处理实战笔记(三):SparkSQL

​前言Spark是一种大规模、快速计算的集群平台,本公众号试图通过学习Spark官网的实战演练笔记提升笔者实操能力以及展现Spark的精彩之处。有关框架介绍和环境配置

​前言

    Spark是一种大规模、快速计算的集群平台,本公众号试图通过学习Spark官网的实战演练笔记提升笔者实操能力以及展现Spark的精彩之处。有关框架介绍和环境配置可以参考以下内容:

    1.大数据处理框架Hadoop、Spark介绍

    2.linux下Hadoop安装与环境配置

    3.linux下Spark安装与环境配置

    本文的参考配置为:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12

    本文的目录为:

        一、Spark SQL入门

            1.Spark Session

            2.创建DataFrames

            3.SQL语句运行

            4.创建DataSets

            5.RDD互操作性

            6.UDF自定义函数

        二、数据源

            1.通用功能

            2.Hive表

            3.JDBC数据库

        三、性能调优

 

一、Spark SQL入门

    Spark SQL 是 Spark 处理结构化数据的一个模块。与基础的 Spark RDD API 不同,Spark SQL 提供了查询结构化数据计算结果等信息的接口。在内部,Spark SQL 使用这个额外的信息去执行额外的优化。有几种方式可以跟 Spark SQL 进行交互,包括 SQL  Dataset API。当使用相同执行引擎进行计算时,无论使用哪种 API / 语言都可以快速的计算。这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的。

    1.Spark Session

    Spark SQL中所有功能的入口点是SparkSession 类。要创建一个 SparkSession,仅使用 SparkSession.builder()就可以了。如果提示已创建的Warning,则代表之前有创建SparkSession,有些设置不会生效,可以通过.stop方法先停止当前SparkSession:

scala> val spark = SparkSession.builder().appName("Spark SQL").config("spark.some.config.option","some-value").getOrCreate()spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@28a821f9

    2.创建DataFrames

    在一个 SparkSession中,应用程序可以从一个已经存在的 RDD,从hive表,或者从 Spark数据源中创建一个DataFrames。

scala> import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Datasetscala> import org.apache.spark.sql.Row;import org.apache.spark.sql.Rowscala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.show()+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+

    一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点。一个 Dataset 可以从 JVM 对象来 构造 并且使用转换功能(map,flatMap,filter,等等)。一个 DataFrame 是一个 Dataset 组成的指定列。这里包括一些使用 Dataset 进行结构化数据处理的示例 :

scala> import spark.implicits._import spark.implicits._scala> df.printSchema()root |-- age: long (nullable = true) |-- name: string (nullable = true)scala> df.select("name").show()+-------+| name|+-------+|Michael|| Andy|| Justin|+-------+scala> df.select($"name", $"age" + 1).show()+-------+---------+| name|(age + 1)|+-------+---------+|Michael| null|| Andy| 31|| Justin| 20|+-------+---------+scala> df.filter($"age" > 21).show()+---+----+|age|name|+---+----+| 30|Andy|+---+----+scala> df.groupBy("age").count().show()+----+-----+| age|count|+----+-----+| 19| 1||null| 1|| 30| 1|+----+-----+

    3.SQL语句运行

    SparkSession 的 sql 函数可以让应用程序以编程的方式运行 SQL 查询,并将结果作为一个 DataFrame 返回。

// 创建临时视图scala> df.createOrReplaceTempView("people")scala> val sqlDF = spark.sql("SELECT * FROM people")sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> sqlDF.show()+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+

    Spark SQL中的临时视图是session级别的,也就是会随着session的消失而消失。如果你想让一个临时视图在所有session中相互传递并且可用,直到Spark 应用退出,你可以建立一个全局的临时视图。全局的临时视图存在于系统数据库 global_temp中,我们必须加上库名去引用它,比如。SELECT * FROM global_temp.view1。

// 创建全局视图scala> df.createGlobalTempView("people")scala> spark.sql("SELECT * FROM global_temp.people").show()+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+

    4.创建DataSets

    Dataset 与 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo 编码器来序列化用于处理或者通过网络进行传输的对象。虽然编码器和标准的序列化都负责将一个对象序列化成字节,编码器是动态生成的代码,并且使用了一种允许 Spark 去执行许多像 filtering,sorting 以及 hashing 这样的操作,不需要将字节反序列化成对象的格式。

//注意:Scala 2.10中case class最多只能支持22个字段。可以通过自定义类突破限制。scala> case class Person(name: String, age: Long)defined class Person// 为case class创建编码器scala> val caseClassDS = Seq(Person("Andy",32)).toDS()caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]// 通过导入spark.implicits._自动提供最常见类型的编码器。scala> caseClassDS.show()+----+---+|name|age|+----+---+|Andy| 32|+----+---+scala> val primitiveDS = Seq(1, 2, 3).toDS()primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]scala> val path = "file:///usr/local/spark/examples/src/main/resources/people.json"path: String = file:///usr/local/spark/examples/src/main/resources/people.json// 通过提供一个类,可以将DataFrame转换为Dataset。 映射将按名称进行scala> val peopleDS = spark.read.json(path).as[Person]peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]scala> peopleDS.show()+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+

    5.RDD互操作性

    Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset,分别是使用反射推断Schema和以编程的方式指定Schema。

    Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame。Case class 定义了表的 Schema。Case class 的参数名使用反射读取并且成为了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型。这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。表可以用于后续的 SQL 语句。

scala> import spark.implicits._import spark.implicits._scala> val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]scala> peopleDF.createOrReplaceTempView("people")scala> val teenagersDF = spark.sql("SELECT name,age FROM people WHERE age BETWEEN 13 AND 19")teenagersDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]// 可以通过字段索引访问结果中一行的列scala> teenagersDF.map(teenager => "Name:" + teenager(0)).show()+-----------+| value|+-----------+|Name:Justin|+-----------+// 或者通过字段名scala> teenagersDF.map(teenager => "Name:"+teenager.getAs[String]("name")).show()+-----------+| value|+-----------+|Name:Justin|+-----------+// 没有用于Dataset [Map [K,V]]的预定义编码器,定义为隐式scala> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]mapEncoder: org.apache.spark.sql.Encoder[Map[String,Any]] = class[value[0// row.getValuesMap [T]一次将多个列检索到Map [String,T]中scala> teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()res5: Array[Map[String,Any]] = Array(Map(name -> Justin, age -> 19))

    当 case class 不能够在执行之前被定义,例如记录的结构在一个字符串中被编码了,或者一个文本 dataset 将被解析并且不同的用户投影的字段是不一样的。一个 DataFrame 可以使用下面的三步以编程的方式来创建。

  • 从原始的 RDD 创建 RDD 的 Row(行)。

  • Step 1 被创建后,创建 Schema 表示一个 StructType 匹配 RDD 中的 Row(行)的结构。

  • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行)。

scala> import org.apache.spark.sql.types._import org.apache.spark.sql.types._scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at :34scala> val schemaString = "name age"schemaString: String = name agescala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true))scala> val schema = StructType(fields)schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))// 将RDD转换为Rowscala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[22] at map at :44scala> val peopleDF = spark.createDataFrame(rowRDD,schema)peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: string]scala> peopleDF.createOrReplaceTempView("people")scala> val results = spark.sql("SELECT name FROM people")results: org.apache.spark.sql.DataFrame = [name: string]// spark SQL返回内容可进行正常操作scala> results.map(attributes => "Name:"+attributes(0)).show()+------------+| value|+------------+|Name:Michael|| Name:Andy|| Name:Justin|+------------+

    6.UDF自定义函数

内置的DataFrames函数提供常见的聚合,例如count(),countDistinct(),avg(),max(),min()等。尽管这些函数是为DataFrames设计的,但用户不限于预定义的聚合功能,还可以创建自己的功能。

import org.apache.spark.sql.expressions.MutableAggregationBufferimport org.apache.spark.sql.expressions.UserDefinedAggregateFunctionimport org.apache.spark.sql.types._import org.apache.spark.sql.Rowimport org.apache.spark.sql.SparkSessionobject MyAverage extends UserDefinedAggregateFunction { def inputSchema: StructType = StructType(StructField("inputColumn",LongType) :: Nil) def bufferSchema: StructType = {StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)} def dataType:DataType = DoubleType def deterministic:Boolean = true def initialize(buffer:MutableAggregationBuffer):Unit = { buffer(0) = 0L buffer(1) = 0L } def update(buffer:MutableAggregationBuffer,input:Row):Unit = { if(!input.isNullAt(0)){ buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } def merge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } def evaluate(buffer:Row):Double = buffer.getLong(0).toDouble / buffer.getLong(1)}scala> spark.udf.register("myAverage", MyAverage)res22: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = MyAverage$@65d6f337scala> val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result: org.apache.spark.sql.DataFrame = [average_salary: double]scala> spark.udf.register("myAverage", MyAverage)res23: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = MyAverage$@65d6f337scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/employees.json")df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]scala> df.createOrReplaceTempView("employees")scala> df.show()+-------+------+| name|salary|+-------+------+|Michael| 3000|| Andy| 4500|| Justin| 3500|| Berta| 4000|+-------+------+scala> val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result: org.apache.spark.sql.DataFrame = [average_salary: double]scala> result.show()+--------------+|average_salary|+--------------+|  3750.0     |+--------------+

二、数据源

    Spark SQL 支持通过 DataFrame 接口对各种 data sources(数据源)进行操作。DataFrame 可以使用 relational transformations(关系转换)操作,也可用于创建 temporary view(临时视图)。将 DataFrame 注册为 temporary view(临时视图)允许您对其数据运行 SQL 查询。本节 描述了使用 Spark Data Sources 加载和保存数据的一般方法,然后涉及可用于 built-in data sources(内置数据源)的 specific options(特定选项)。

    1.通用功能

    在最简单的形式中,默认数据源(parquet,除非另有配置 spark.sql.sources.default)将用于所有操作。

scala> val usersDF = spark.read.load("file:///usr/local/spark/examples/src/main/resources/users.parquet")usersDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]scala> usersDF.show()+------+--------------+----------------+ | name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa| null| [3, 9, 15, 20]|| Ben| red| []|+------+--------------+----------------+// 保存之后再用hdfs dfs -ls data 命令即可查看parquet文件scala> usersDF.select("name","favorite_color").write.save("data/user.parquet")

    还可以手动指定数据源格式。对于内置的源,你也可以使用它们的 短名称(json,parquet,jdbc,orc,libsvm,csv,text)。从任何 data source type(数据源类型)加载 DataFrames 可以使用此 syntax(语法)转换为其他类型。

scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> peopleDF.select("name", "age").write.format("parquet").save("data/namesAndAges.parquet")

    保存操作可以选择使用 SaveMode,它指定如何处理现有数据如果存在的话。重要的是这些保存模式不使用任何锁定。另外,当执行 Overwrite 时,数据将在新数据写出之前被删除。DataFrames 也可以使用 saveAsTable 命令作为 persistent tables(持久表)保存到 Hive metastore 中。对于基于文件的数据源,也可以对 output(输出)进行 bucket  sort 或者 partitionBucketing 和 sorting 仅适用于 persistent tables 。

scala> peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

    2.Hive表

    Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。但是,由于 Hive 具有大量依赖关系,因此这些依赖关系不包含在默认 Spark 分发中。如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖关系也必须存在于所有工作节点上,因为它们将需要访问 Hive 序列化和反序列化库(SerDes),以访问存储在 Hive 中的数据。

    创建 Hive 表时,需要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。以下选项可用于指定存储格式(“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认情况下,我们将以纯文本形式读取表格文件。请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它。

    3.JDBC数据库    

    Spark SQL 还包括可以使用 JDBC 从其他数据库读取数据的数据源。此功能应优于使用 JdbcRDD。这是因为结果作为 DataFrame 返回,并且可以轻松地在 Spark SQL 中处理或与其他数据源连接。JDBC 数据源也更容易从 Java 或 Python 使用,因为它不需要用户提供 ClassTag。(请注意,这不同于 Spark SQL JDBC 服务器,允许其他应用程序使用 Spark SQL 运行查询)。

    有关Spark SQL的内容至此结束,下文将进一步对Spark Streaming即Spark流处理的内容做详细介绍。

    前文笔记请参考下面的链接:

    Spark大数据分布式处理实战笔记(一):快速开始

    Spark大数据分布式处理实战笔记(二):RDD、共享变量

 

 

 

    你可能错过了这些~

    “高频面经”之数据分析篇

    “高频面经”之数据结构与算法篇

    “高频面经”之大数据研发篇

    “高频面经”之机器学习篇

    “高频面经”之深度学习篇

 

我就知道你“在看”

 


推荐阅读
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 本文讨论了如何使用Web.Config进行自定义配置节的配置转换。作者提到,他将msbuild设置为详细模式,但转换却忽略了带有替换转换的自定义部分的存在。 ... [详细]
  • 本文介绍了在CentOS 6.4系统中更新源地址的方法,包括备份现有源文件、下载163源、修改文件名、更新列表和系统,并提供了相应的命令。 ... [详细]
  • GPT-3发布,动动手指就能自动生成代码的神器来了!
    近日,OpenAI发布了最新的NLP模型GPT-3,该模型在GitHub趋势榜上名列前茅。GPT-3使用的数据集容量达到45TB,参数个数高达1750亿,训练好的模型需要700G的硬盘空间来存储。一位开发者根据GPT-3模型上线了一个名为debuid的网站,用户只需用英语描述需求,前端代码就能自动生成。这个神奇的功能让许多程序员感到惊讶。去年,OpenAI在与世界冠军OG战队的表演赛中展示了他们的强化学习模型,在限定条件下以2:0完胜人类冠军。 ... [详细]
  • 本文介绍了使用readlink命令获取文件的完整路径的简单方法,并提供了一个示例命令来打印文件的完整路径。共有28种解决方案可供选择。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Python语法上的区别及注意事项
    本文介绍了Python2x和Python3x在语法上的区别,包括print语句的变化、除法运算结果的不同、raw_input函数的替代、class写法的变化等。同时还介绍了Python脚本的解释程序的指定方法,以及在不同版本的Python中如何执行脚本。对于想要学习Python的人来说,本文提供了一些注意事项和技巧。 ... [详细]
  • 像跟踪分布式服务调用那样跟踪Go函数调用链 | Gopher Daily (2020.12.07) ʕ◔ϖ◔ʔ
    每日一谚:“Acacheisjustamemoryleakyouhaven’tmetyet.”—Mr.RogersGo技术专栏“改善Go语⾔编程质量的50个有效实践” ... [详细]
author-avatar
大小姐_T_841
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有