热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark分区和分区【PartitioningPartition】的理解

1.理解spark分区【Partitioning】1.1.分区概念及分区方法Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换

1. 理解spark 分区【Partitioning】


1.1. 分区概念及分区方法

Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换,从而更快地完成作业。还可以将已分区的数据写入文件系统(多个子目录),以便下游系统更快地读取数据。

Spark有几种分区方法来实现并行性,可以根据需要选择使用哪种分区方法。


PARTITIONING METHODS (SCALA)说明提供类
repartition(numPartitions: Int)Uses RoundRobinPartitioningDataFrame
repartition(partitionExprs: Column*)Uses HashPartitionerDataFrame
repartition(numPartitions: Int, partitionExprs: Column*)partition = hash(partitionExprs) % numPartitionsDataFrame
coalesce(numPartitions: Int)Use only to reduce the number of partitions.DataFrame
repartitionByRange(partitionExprs : Column*)Uses rangepartitioning. Ideal to use on numeric columns.DataFrame
repartitionByRange(numPartitions: Int, partitionExprs: Column*) DataFrame
partitionBy(colNames: String*)Use to write the data into sub-folderDataFrameWriter

  • 默认情况下,Spark创建的分区与机器的CPU核数相等。

  • 每个分区的数据都驻留在一台机器上。

  • Spark为每个分区创建一个任务。

  • Spark Shuffle将数据从一个分区转移到其他分区。

  • 分区是一个昂贵的操作,因为它会造成数据转移(数据可能在节点之间移动)

  • 默认情况下,DataFrame shuffle操作创建200个分区。

Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。

内存分区: 可以通过调用repartition()或coalesce()转换对DataFrame进行分区或重新分区。

磁盘分区:在将Spark DataFrame写回磁盘时,可以通过使用DataFrameWriter的partitionBy()来选择如何基于列对数据进行分区, 这类似于hive分区。


1.2. 分区的优点

正如您所知道的那样,Spark的设计是为了以比传统处理快100倍的速度处理大型数据集,如果没有分区,这是不可能的。下面是在内存或磁盘上使用Spark分区的一些优点。


  • 快速访问数据。

  • 提供在较小的数据集上执行操作的能力。

静态分区【Partitioning at rest (disk)】是许多数据库和数据处理框架的一个特性,是提高读取速度的关键。


2. Default Spark Partitions & Configurations

Spark在默认情况下基于多个因素对数据进行分区,这些因素不同于您在哪些模式上运行作业。


2.1. local mode

下面的示例将本地[20]作为参数提供给master()方法,这意味着使用20个分区在本地运行作业。虽然如果系统上只有8个核心,它仍然会创建20个分区任务。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[20]").getOrCreate()val df = spark.range(0,20)print(df.rdd.getNumPartitions)}

2.2. HDFS cluster mode

在Hadoop集群上运行Spark任务时,默认分区数如下所示。


  • 在HDFS集群中,Spark默认为每个文件块创建一个Partition。

  • 在Version 1 Hadoop中,HDFS块大小是64mb,在Version 2 Hadoop中,HDFS块大小是128mb

  • 集群中所有执行器节点上的核总数和2个,取较大的值

例如,如果您有一个640mb的文件,并在Hadoop版本2上运行它,则创建5个分区,每个分区由128mb块组成(5块* 128mb = 640mb)。如果您将分区重新划分到10,那么它将为每个块创建2个分区。


2.3. spark 配置相关参数


  • spark.default.parallelism配置默认值:集群模式下,是集群的所有节点上的所有核数,在本地模式下,是机器的核数。

  • spark.sql.shuffle.partitions配置默认值是200,在调用shuffle操作时使用,如union()、groupBy()、join()等。此属性仅在DataFrame API中可用,在RDD中不可用。

可以在程序中或spark-submit提交任务时,根据业务需要调整这2个参数的值。

spark.conf.set("spark.sql.shuffle.partitions", "500")

./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500


3. 动态修改Spark分区

从一个文件/表创建一个RDD/DataFrame时,Spark会根据特定的参数创建一定数量分区的RDD/DataFrame

还提供在运行时修改内存中的分区;在写入磁盘时,提供选项基于一个或多个列进行分区


3.1 repartition() & coalesce()

在处理分区数据时,我们经常需要根据数据分布增加或减少分区。方法repartition()和coalesce()帮助我们重新分区。 下面的代码输出为1,10,2

文件从https://github.com/spark-examples/pyspark-examples/blob/master/resources/simple-zipcodes.csv 下载

基于采用HashPartitioner进行分区

HashPartitioner采用哈希的方式对键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")println(df.rdd.getNumPartitions)//Change DataFrame partitions to 10val newDF=df.repartition(10)println(newDF.rdd.getNumPartitions)println(newDF.coalesce(2).rdd.getNumPartitions)}

3.2 repartitionBy()

使用partitionBy()方法将Spark DataFrame写入磁盘时,根据分区列对记录进行分割,并将每个分区的数据存储到子目录中。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")df.write.option("header",true).partitionBy("state").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")}

在我们的DataFrame上,我们总共有6个不同的状态,因此它创建了6个目录,如下所示。子目录的名称将是分区列及其值(分区列=value)。


3.3 partitionBy() Multiple Columns

可以使用Spark partitionBy()在多个列上创建分区。只需将您想要划分的列作为参数传递给此方法

它根据指定的列创建多级分区

df.write.option("header",true).partitionBy("state","city").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")

             


3.4 repartitionByRange() – 范围分区

下面是一个使用repartitionByRange()转换的范围分区示例。RangePartitioner创建的分区的实际数量可能与指定的分区数量不同,在这种情况下,采样记录的数量小于“partitions”的值。

 Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism", 4).getOrCreate()val data = Seq((1, 10), (2, 20), (3, 10), (4, 20), (5, 10),(6, 30), (7, 50), (8, 50), (9, 50), (10, 30),(11, 10), (12, 10), (13, 40), (14, 40), (15, 40),(16, 40), (17, 50), (18, 10), (19, 40), (20, 40))import org.apache.spark.sql.functions._import spark.implicits._val dfRange = data.toDF("id", "count").repartitionByRange(5, col("count"))dfRange.write.option("header", true).csv("file:///F:/tmp/range-partition")}

4. 如何选择Spark分区列

当使用partitionBy(),你必须非常谨慎它创建的分区,如果分区创建了太多太多的子目录,目录带来不必要的开销,NameNode(如果您正在使用Hadoop)因为它必须保持所有文件系统的元数据在内存中。

假设您有一个包含邮政编码、城市、州和其他列的美国人口普查表。在该状态上创建一个分区,将该表分成大约50个分区,在一个状态(state= ' CA '和zipcode = ' 92704 ')内搜索邮政编码会更快,因为它只需要在state=CA分区目录中扫描。

另一个分区的好例子是Date列。理想情况下,应该按年/月划分,而不是按日期划分

太多分区好嘛?


  • 如果您是初学者,您可能会认为过多的分区会提高Spark Job性能,实际上它不会,而且它是过度的。

  • Spark必须为每个分区创建一个任务,大多数时间都是创建、调度和管理任务,然后执行

太少分区好嘛?


  1. 分区太少也不好,因为您可能无法充分利用集群资源。

  2. 更少的并行性

  3.  应用程序可能会运行更长的时间,因为每个分区需要更多的时间来完成

 


推荐阅读
author-avatar
小白菜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有