热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark分区和分区【PartitioningPartition】的理解

1.理解spark分区【Partitioning】1.1.分区概念及分区方法Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换

1. 理解spark 分区【Partitioning】


1.1. 分区概念及分区方法

Spark分区是一种将数据分割到多个分区的方法,这样您就可以并行地在多个分区上执行转换,从而更快地完成作业。还可以将已分区的数据写入文件系统(多个子目录),以便下游系统更快地读取数据。

Spark有几种分区方法来实现并行性,可以根据需要选择使用哪种分区方法。


PARTITIONING METHODS (SCALA)说明提供类
repartition(numPartitions: Int)Uses RoundRobinPartitioningDataFrame
repartition(partitionExprs: Column*)Uses HashPartitionerDataFrame
repartition(numPartitions: Int, partitionExprs: Column*)partition = hash(partitionExprs) % numPartitionsDataFrame
coalesce(numPartitions: Int)Use only to reduce the number of partitions.DataFrame
repartitionByRange(partitionExprs : Column*)Uses rangepartitioning. Ideal to use on numeric columns.DataFrame
repartitionByRange(numPartitions: Int, partitionExprs: Column*) DataFrame
partitionBy(colNames: String*)Use to write the data into sub-folderDataFrameWriter

  • 默认情况下,Spark创建的分区与机器的CPU核数相等。

  • 每个分区的数据都驻留在一台机器上。

  • Spark为每个分区创建一个任务。

  • Spark Shuffle将数据从一个分区转移到其他分区。

  • 分区是一个昂贵的操作,因为它会造成数据转移(数据可能在节点之间移动)

  • 默认情况下,DataFrame shuffle操作创建200个分区。

Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。

内存分区: 可以通过调用repartition()或coalesce()转换对DataFrame进行分区或重新分区。

磁盘分区:在将Spark DataFrame写回磁盘时,可以通过使用DataFrameWriter的partitionBy()来选择如何基于列对数据进行分区, 这类似于hive分区。


1.2. 分区的优点

正如您所知道的那样,Spark的设计是为了以比传统处理快100倍的速度处理大型数据集,如果没有分区,这是不可能的。下面是在内存或磁盘上使用Spark分区的一些优点。


  • 快速访问数据。

  • 提供在较小的数据集上执行操作的能力。

静态分区【Partitioning at rest (disk)】是许多数据库和数据处理框架的一个特性,是提高读取速度的关键。


2. Default Spark Partitions & Configurations

Spark在默认情况下基于多个因素对数据进行分区,这些因素不同于您在哪些模式上运行作业。


2.1. local mode

下面的示例将本地[20]作为参数提供给master()方法,这意味着使用20个分区在本地运行作业。虽然如果系统上只有8个核心,它仍然会创建20个分区任务。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[20]").getOrCreate()val df = spark.range(0,20)print(df.rdd.getNumPartitions)}

2.2. HDFS cluster mode

在Hadoop集群上运行Spark任务时,默认分区数如下所示。


  • 在HDFS集群中,Spark默认为每个文件块创建一个Partition。

  • 在Version 1 Hadoop中,HDFS块大小是64mb,在Version 2 Hadoop中,HDFS块大小是128mb

  • 集群中所有执行器节点上的核总数和2个,取较大的值

例如,如果您有一个640mb的文件,并在Hadoop版本2上运行它,则创建5个分区,每个分区由128mb块组成(5块* 128mb = 640mb)。如果您将分区重新划分到10,那么它将为每个块创建2个分区。


2.3. spark 配置相关参数


  • spark.default.parallelism配置默认值:集群模式下,是集群的所有节点上的所有核数,在本地模式下,是机器的核数。

  • spark.sql.shuffle.partitions配置默认值是200,在调用shuffle操作时使用,如union()、groupBy()、join()等。此属性仅在DataFrame API中可用,在RDD中不可用。

可以在程序中或spark-submit提交任务时,根据业务需要调整这2个参数的值。

spark.conf.set("spark.sql.shuffle.partitions", "500")

./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500


3. 动态修改Spark分区

从一个文件/表创建一个RDD/DataFrame时,Spark会根据特定的参数创建一定数量分区的RDD/DataFrame

还提供在运行时修改内存中的分区;在写入磁盘时,提供选项基于一个或多个列进行分区


3.1 repartition() & coalesce()

在处理分区数据时,我们经常需要根据数据分布增加或减少分区。方法repartition()和coalesce()帮助我们重新分区。 下面的代码输出为1,10,2

文件从https://github.com/spark-examples/pyspark-examples/blob/master/resources/simple-zipcodes.csv 下载

基于采用HashPartitioner进行分区

HashPartitioner采用哈希的方式对键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")println(df.rdd.getNumPartitions)//Change DataFrame partitions to 10val newDF=df.repartition(10)println(newDF.rdd.getNumPartitions)println(newDF.coalesce(2).rdd.getNumPartitions)}

3.2 repartitionBy()

使用partitionBy()方法将Spark DataFrame写入磁盘时,根据分区列对记录进行分割,并将每个分区的数据存储到子目录中。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism",4).getOrCreate()val df=spark.read.option("header",true).csv("file:///F:/tmp/simple-zipcodes.csv")df.write.option("header",true).partitionBy("state").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")}

在我们的DataFrame上,我们总共有6个不同的状态,因此它创建了6个目录,如下所示。子目录的名称将是分区列及其值(分区列=value)。


3.3 partitionBy() Multiple Columns

可以使用Spark partitionBy()在多个列上创建分区。只需将您想要划分的列作为参数传递给此方法

它根据指定的列创建多级分区

df.write.option("header",true).partitionBy("state","city").mode("overwrite").csv("file:///F:/tmp/zipcodes-state")

             


3.4 repartitionByRange() – 范围分区

下面是一个使用repartitionByRange()转换的范围分区示例。RangePartitioner创建的分区的实际数量可能与指定的分区数量不同,在这种情况下,采样记录的数量小于“partitions”的值。

 Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。

def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("Spark Partitioning Test").master("local[*]").config("spark.default.parallelism", 4).getOrCreate()val data = Seq((1, 10), (2, 20), (3, 10), (4, 20), (5, 10),(6, 30), (7, 50), (8, 50), (9, 50), (10, 30),(11, 10), (12, 10), (13, 40), (14, 40), (15, 40),(16, 40), (17, 50), (18, 10), (19, 40), (20, 40))import org.apache.spark.sql.functions._import spark.implicits._val dfRange = data.toDF("id", "count").repartitionByRange(5, col("count"))dfRange.write.option("header", true).csv("file:///F:/tmp/range-partition")}

4. 如何选择Spark分区列

当使用partitionBy(),你必须非常谨慎它创建的分区,如果分区创建了太多太多的子目录,目录带来不必要的开销,NameNode(如果您正在使用Hadoop)因为它必须保持所有文件系统的元数据在内存中。

假设您有一个包含邮政编码、城市、州和其他列的美国人口普查表。在该状态上创建一个分区,将该表分成大约50个分区,在一个状态(state= ' CA '和zipcode = ' 92704 ')内搜索邮政编码会更快,因为它只需要在state=CA分区目录中扫描。

另一个分区的好例子是Date列。理想情况下,应该按年/月划分,而不是按日期划分

太多分区好嘛?


  • 如果您是初学者,您可能会认为过多的分区会提高Spark Job性能,实际上它不会,而且它是过度的。

  • Spark必须为每个分区创建一个任务,大多数时间都是创建、调度和管理任务,然后执行

太少分区好嘛?


  1. 分区太少也不好,因为您可能无法充分利用集群资源。

  2. 更少的并行性

  3.  应用程序可能会运行更长的时间,因为每个分区需要更多的时间来完成

 


推荐阅读
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
author-avatar
小白菜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有