1.理解spark分区【Partitioning】1.1.分区概念及分区方法Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换
1. 理解spark 分区【Partitioning】
1.1. 分区概念及分区方法
Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换,从而更快地完成作业。还可以将已分区的数据写入文件系统(多个子目录),以便下游系统更快地读取数据。
Spark有几种分区方法来实现并行性,可以根据需要选择使用哪种分区方法。
PARTITIONING METHODS (SCALA) | 说明 | 提供类 |
---|
repartition(numPartitions: Int) | Uses RoundRobinPartitioning | DataFrame |
repartition(partitionExprs: Column*) | Uses HashPartitioner | DataFrame |
repartition(numPartitions: Int, partitionExprs: Column*) | partition = hash(partitionExprs) % numPartitions | DataFrame |
coalesce(numPartitions: Int) | Use only to reduce the number of partitions. | DataFrame |
repartitionByRange(partitionExprs : Column*) | Uses rangepartitioning. Ideal to use on numeric columns. | DataFrame |
repartitionByRange(numPartitions: Int, partitionExprs: Column*) | | DataFrame |
partitionBy(colNames: String*) | Use to write the data into sub-folder | DataFrameWriter |
-
默认情况下,Spark创建的分区与机器的CPU核数相等。
-
每个分区的数据都驻留在一台机器上。
-
Spark为每个分区创建一个任务。
-
Spark Shuffle将数据从一个分区转移到其他分区。
-
分区是一个昂贵的操作,因为它会造成数据转移(数据可能在节点之间移动)
-
默认情况下,DataFrame shuffle操作创建200个分区。
Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。
内存分区: 可以通过调用repartition()或coalesce()转换对DataFrame进行分区或重新分区。
磁盘分区:在将Spark DataFrame写回磁盘时,可以通过使用DataFrameWriter的partitionBy()来选择如何基于列对数据进行分区, 这类似于hive分区。
1.2. 分区的优点
正如您所知道的那样,Spark的设计是为了以比传统处理快100倍的速度处理大型数据集,如果没有分区,这是不可能的。下面是在内存或磁盘上使用Spark分区的一些优点。
-
快速访问数据。
-
提供在较小的数据集上执行操作的能力。
静态分区【Partitioning at rest (disk)】是许多数据库和数据处理框架的一个特性,是提高读取速度的关键。
2. Default Spark Partitions & Configurations
Spark在默认情况下基于多个因素对数据进行分区,这些因素不同于您在哪些模式上运行作业。
2.1. local mode
下面的示例将本地[20]作为参数提供给master()方法,这意味着使用20个分区在本地运行作业。虽然如果系统上只有8个核心,它仍然会创建20个分区任务。
def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[20]").getOrCreate()val df = spark.range(0,20)print(df.rdd.getNumPartitions)}
2.2. HDFS cluster mode
在Hadoop集群上运行Spark任务时,默认分区数如下所示。
-
在HDFS集群中,Spark默认为每个文件块创建一个Partition。
-
在Version 1 Hadoop中,HDFS块大小是64mb,在Version 2 Hadoop中,HDFS块大小是128mb
-
集群中所有执行器节点上的核总数和2个,取较大的值
例如,如果您有一个640mb的文件,并在Hadoop版本2上运行它,则创建5个分区,每个分区由128mb块组成(5块* 128mb = 640mb)。如果您将分区重新划分到10,那么它将为每个块创建2个分区。
2.3. spark 配置相关参数
-
spark.default.parallelism配置默认值:集群模式下,是集群的所有节点上的所有核数,在本地模式下,是机器的核数。
-
spark.sql.shuffle.partitions配置默认值是200,在调用shuffle操作时使用,如union()、groupBy()、join()等。此属性仅在DataFrame API中可用,在RDD中不可用。
可以在程序中或spark-submit提交任务时,根据业务需要调整这2个参数的值。
spark.conf.set("spark.sql.shuffle.partitions", "500")
./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500
3. 动态修改Spark分区
从一个文件/表创建一个RDD/DataFrame时,Spark会根据特定的参数创建一定数量分区的RDD/DataFrame
还提供在运行时修改内存中的分区;在写入磁盘时,提供选项基于一个或多个列进行分区
3.1 repartition() & coalesce()
在处理分区数据时,我们经常需要根据数据分布增加或减少分区。方法repartition()和coalesce()帮助我们重新分区。 下面的代码输出为1,10,2
文件从https://github.com/spark-examples/pyspark-examples/blob/master/resources/simple-zipcodes.csv 下载
基于采用HashPartitioner进行分区
HashPartitioner采用哈希的方式对键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。
def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")println(df.rdd.getNumPartitions)//Change DataFrame partitions to 10val newDF=df.repartition(10)println(newDF.rdd.getNumPartitions)println(newDF.coalesce(2).rdd.getNumPartitions)}
3.2 repartitionBy()
使用partitionBy()方法将Spark DataFrame写入磁盘时,根据分区列对记录进行分割,并将每个分区的数据存储到子目录中。
def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")df.write.option("header",true).partitionBy("state").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")}
在我们的DataFrame上,我们总共有6个不同的状态,因此它创建了6个目录,如下所示。子目录的名称将是分区列及其值(分区列=value)。
3.3 partitionBy() Multiple Columns
可以使用Spark partitionBy()在多个列上创建分区。只需将您想要划分的列作为参数传递给此方法
它根据指定的列创建多级分区
df.write.option("header",true).partitionBy("state","city").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")
3.4 repartitionByRange() – 范围分区
下面是一个使用repartitionByRange()转换的范围分区示例。RangePartitioner创建的分区的实际数量可能与指定的分区数量不同,在这种情况下,采样记录的数量小于“partitions”的值。
Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。
def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism", 4).getOrCreate()val data = Seq((1, 10), (2, 20), (3, 10), (4, 20), (5, 10),(6, 30), (7, 50), (8, 50), (9, 50), (10, 30),(11, 10), (12, 10), (13, 40), (14, 40), (15, 40),(16, 40), (17, 50), (18, 10), (19, 40), (20, 40))import org.apache.spark.sql.functions._import spark.implicits._val dfRange = data.toDF("id", "count").repartitionByRange(5, col("count"))dfRange.write.option("header", true).csv("file:///F:/tmp/range-partition")}
4. 如何选择Spark分区列
当使用partitionBy(),你必须非常谨慎它创建的分区,如果分区创建了太多太多的子目录,目录带来不必要的开销,NameNode(如果您正在使用Hadoop)因为它必须保持所有文件系统的元数据在内存中。
假设您有一个包含邮政编码、城市、州和其他列的美国人口普查表。在该状态上创建一个分区,将该表分成大约50个分区,在一个状态(state= ' CA '和zipcode = ' 92704 ')内搜索邮政编码会更快,因为它只需要在state=CA分区目录中扫描。
另一个分区的好例子是Date列。理想情况下,应该按年/月划分,而不是按日期划分
太多分区好嘛?
太少分区好嘛?
-
分区太少也不好,因为您可能无法充分利用集群资源。
-
更少的并行性
-
应用程序可能会运行更长的时间,因为每个分区需要更多的时间来完成