作者:继续不插电的名单 | 来源:互联网 | 2024-12-10 18:55
本文深入探讨了在 SparkSQL 中创建 DataFrame 的多种方法,旨在为读者提供全面的指导。随着技术的发展和社区的贡献,本文将持续更新以反映最新的实践和技术进展。
在 SparkSQL 中,DataFrame 类似于关系型数据库中的表格,能够支持对单表或跨表的复杂查询操作。这些操作主要通过调用 Spark 提供的 API 接口来实现。本文基于 Spark 2.1 版本的 API 进行讨论。
1. 创建 DataFrame 对象的方法
创建 DataFrame(下文简称 DF)的方式多样,但在开始之前,需要先创建一个 SparkSession 实例,示例如下:
val spark = SparkSession.builder()
.appName("Spark SQL Example")
.enableHiveSupport()
.getOrCreate()
1.1 使用 toDF 方法
通过引入 Spark SQL 的隐式转换,可以轻松地将 Scala 序列、列表或 RDD 转换为 DataFrame。只要数据源具有明确的数据类型,就可以进行转换。例如,将一个 Scala 序列转换为 DataFrame 的代码如下:
import spark.implicits._
val data = Seq((1, "First", java.sql.Date.valueOf("2010-01-01")),
(2, "Second", java.sql.Date.valueOf("2010-02-01"))).toDF("id", "value", "date")
data.show()
输出结果为:
+---+------+----------+
| id| value| date|
+---+------+----------+
| 1| First|2010-01-01|
| 2|Second|2010-02-01|
+---+------+----------+
同样地,也可以将 RDD 或 Scala 列表转换为 DataFrame:
val rddData = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5))
val rddDf = rddData.map(x => (x, x * 2)).toDF("original", "double")
rddDf.show()
输出为:
+--------+------+
|original| double|
+--------+------+
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
| 5|10|
+--------+------+
对于 Scala 列表的转换,示例如下:
val listData = List((1, 3), (2, 4), (3, 5))
val listDf = listData.toDF("first", "second")
listDf.show()
输出结果为:
+-----+------+
|first|second|
+-----+------+
| 1| 3|
| 2| 4|
| 3| 5|
+-----+------+
1.2 使用 createDataFrame 方法
除了使用 toDF 方法外,还可以通过 SqlContext 中的 createDataFrame 方法来创建 DataFrame。此方法同样支持从本地数组或 RDD 中创建 DataFrame。例如,通过 Row 和 Schema 创建 DataFrame 的代码如下:
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("id", IntegerType, nullable = false),
StructField("value", StringType, nullable = true),
StructField("date", DateType, nullable = true)))
val rdd = spark.sparkContext.parallelize(Seq(
Row(1, "First", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second", java.sql.Date.valueOf("2010-02-01"))))
val df = spark.createDataFrame(rdd, schema)
df.show()
输出为:
+---+------+----------+
| id| value| date|
+---+------+----------+
| 1| First|2010-01-01|
| 2|Second|2010-02-01|
+---+------+----------+
1.3 通过文件创建 DataFrame
除了从内存数据创建 DataFrame 外,还可以直接从文件中读取数据来创建 DataFrame。以下是几种常见的文件类型及其创建方法。
1.3.1 JSON 文件
假设有一个 JSON 文件,内容如下:
可以通过以下代码将其读取为 DataFrame:
val df = spark.read.json("path/to/file.json")
df.show()
输出为:
+-----+---------+----------+
|email|firstName|lastName|
+-----+---------+----------+
| #a|Brett|McLaughlin|
|bbbb|Jason|Hunter|
|cccc|Elliotte|Harold|
+-----+---------+----------+
1.3.2 CSV 文件
CSV 文件也是常见的数据来源之一。可以通过以下代码读取 CSV 文件并创建 DataFrame:
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter", ",")
.load("path/to/file.csv")
df.show()
1.3.3 MySQL 数据库
从 MySQL 数据库读取数据并创建 DataFrame 也非常常见。假设数据库结构如下:
可以通过以下代码连接 MySQL 并读取数据:
val url = "jdbc:mysql://localhost:3306/test"
val df = spark.read.format("jdbc")
.option("url", url)
.option("dbtable", "pivot")
.option("user", "root")
.option("password", "admin")
.load()
df.show()
输出为:
+---+----+----+
| id| user|type|
+---+----+----+
| 1| 1| 助手1|
| 2| 1|APP1|
| 3| 2| 助手1|
| 4| 2| 助手1|
| 5| 3|APP1|
| 6| 3|APP1|
| 7| 3| 助手2|
| 8| 3|APP2|
| 9| 2|APP2|
|10| 2| 助手1|
|11| 1|APP1|
|12| 1| 助手2|
+---+----+----+
1.3.4 Hive 表
最后,从 Hive 表中读取数据并创建 DataFrame 也非常简单。示例如下:
val df = spark.sql("SELECT * FROM people")
df.show()
输出为:
+----+-------+
| age|name|
+----+-------+
|null|Michael|
| 30|Andy|
| 19|Justin|
+----+-------+