I've been using PySpark at work recently, and these are the notes I put together while studying at home.
If you find them useful, a like would be much appreciated, thanks~
SparkContext & SparkSession
SparkContext is mainly used for creating and operating on RDDs.
SparkSession is essentially a combination of SQLContext, HiveContext, and SparkContext. Before Spark 2.x, each kind of functionality required importing its own Context:
- Creating and operating on RDDs - SparkContext
- Streaming - StreamingContext
- SQL - SQLContext
- Hive - HiveContext
Creating a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
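The builder also accepts optional settings, and the resulting session still exposes the underlying SparkContext for RDD work. A minimal sketch; the app name and config value below are just illustrative:

from pyspark.sql import SparkSession

# optional builder settings; app name and config key chosen for illustration
spark = (SparkSession.builder
         .appName("pyspark-notes")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())

# the plain SparkContext is still reachable for RDD operations
sc = spark.sparkContext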
Reading and Writing Tables
(1) Listing tables
spark.catalog.listTables()
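listTables() returns a list of Table objects, so you can pull out just the names; a quick sketch assuming some tables or temp views have already been registered:

# print only the table names from the catalog listing
print([t.name for t in spark.catalog.listTables()])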
(2) Querying data from a table
# retrieve a Spark DataFrame
query = "select * from demo"
data = spark.sql(query)
data.show()

# Spark DataFrame to pandas DataFrame
query = "select * from demo"
data = spark.sql(query).toPandas()
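Note that toPandas() collects the entire result onto the driver, so it is only suitable for results small enough to fit in driver memory.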
(3) Loading data
# CSV to Spark DataFrame
data = spark.read.csv(file_path, header=True)
data.show()

# pandas DataFrame to Spark DataFrame,
# then add spark_data to the catalog
spark_data = spark.createDataFrame(df)
spark_data.createOrReplaceTempView('temp')

# read from a catalog table
spark_data = spark.table("table")
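Putting these pieces together, here is a minimal self-contained sketch (the data and the view name temp are illustrative):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# pandas DataFrame -> Spark DataFrame
df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
spark_data = spark.createDataFrame(df)

# register a temp view so the data can be queried with SQL
spark_data.createOrReplaceTempView("temp")
spark.sql("select * from temp where value > 15").show()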
DataFrame Operations
Spark DataFrames are immutable, so every operation returns a new DataFrame.
(1) Column operations
# add a new column
data = data.withColumn("newCol", data.oldCol + 1)

# replace an existing column
data = data.withColumn("oldCol", data.newCol)

# rename a column
data = data.withColumnRenamed("oldName", "newName")

# change a column's data type
data = data.withColumn("oldCol", data.oldCol.cast("integer"))
(2) Filtering rows
# filter by passing a SQL expression string
temp1 = data.filter("col > 1000")

# filter by passing a boolean Column
temp2 = data.filter(data.col > 1000)
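Boolean Column conditions can also be combined with & and |; each comparison needs its own parentheses. A sketch continuing with the hypothetical columns above:

# combine conditions; parentheses are required around each comparison
temp3 = data.filter((data.col > 1000) & (data.col2 == "US"))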
(3) Selecting columns
# select based on column name
temp1 = data.select("col1", "col2")
temp1 = data.selectExpr("col1 * 100 as newCol1")

# select based on Column object
temp2 = data.select(data.col1, data.col2)
temp2 = data.select((data.col1 + 1).alias("newCol1"))
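Column objects can also be built with col() from pyspark.sql.functions, which avoids repeating the DataFrame name (same hypothetical columns):

from pyspark.sql.functions import col

# equivalent selections built from col() expressions
temp2 = data.select(col("col1"), (col("col1") + 1).alias("newCol1"))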
(4) Aggregation functions
# get the minimum value of a column
data.groupBy().min("col1")

# group by a column and aggregate another
data.groupBy("col1").max("col2")

# use agg() with functions from pyspark.sql.functions
import pyspark.sql.functions as F
data.groupBy("a", "b").agg(F.stddev("c"))
(5) Joining tables
newData = data1.join(data2, on="col", how="leftouter")
newData = data1.join(data2, data1['col1'] == data2['col2'])
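A tiny runnable sketch of a left outer join (the data is made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "x"])
right = spark.createDataFrame([(1, "c")], ["id", "y"])

# left outer join: keeps every row from left,
# fills null for unmatched rows on the right
left.join(right, on="id", how="leftouter").show()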