作者:女孩明天_会更好 | 来源:互联网 | 2023-08-26 14:07
我在这里浏览了文档:https : //spark.apache.org/docs/latest/api/python/pyspark.sql.html
它说:
- 用于重新分区:生成的 DataFrame 是散列分区的。
- 对于 repartitionByRange:生成的 DataFrame 是范围分区的。
和前面的问题也提到了它。但是,我仍然不明白它们究竟有何不同,以及在选择其中一个时会产生什么影响?
更重要的是,如果 repartition 进行哈希分区,提供列作为其参数有什么影响?
回答
我认为最好通过一些实验来研究差异。
测试数据帧
对于这个实验,我使用了以下两个数据帧(我在 Scala 中展示了代码,但概念与 Python API 相同):
// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")
// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")
理论
repartition
应用HashPartitioner
何时提供一列或多列,以及RoundRobinPartitioner
在提供的分区数量上均匀分布数据。如果提供了一列(或更多),这些值将被散列并用于通过计算类似的东西来确定分区号partition = hash(columns) % numberOfPartitions
。
repartitionByRange
将根据列值的范围对数据进行分区。这通常用于连续(非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以由 config 控制spark.sql.execution.rangeExchange.sampleSizePerPartition
。
还值得一提的是,对于这两种方法,如果没有numPartitions
给出,默认情况下它会将 Dataframe 数据分区到spark.sql.shuffle.partitions
您的 Spark 会话中配置,并且可以通过自适应查询执行(自 Spark 3.x 起可用)合并。
测试设置
基于给定的 Testdata 我总是应用相同的代码:
val testDf = df
// here I will insert the partition logic
.withColumn("partition", spark_partition_id()) // applying SQL built-in function to determin actual partition
.groupBy(col("partition"))
.agg(
count(col("value")).as("count"),
min(col("value")).as("min_value"),
max(col("value")).as("max_value"))
.orderBy(col("partition"))
testDf.show(false)
检测结果
df.repartition(4, col("value"))
正如预期的那样,我们得到了 4 个分区,因为 的值df
范围从 0 到 1000000,我们看到它们的散列值将产生一个分布良好的 Dataframe。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |249911|12 |1000000 |
|1 |250076|6 |999994 |
|2 |250334|2 |999999 |
|3 |249680|0 |999998 |
+---------+------+---------+---------+
df.repartitionByRange(4, col("value"))
同样在这种情况下,我们得到 4 个分区,但这次最小值和最大值清楚地显示了分区内的值范围。它几乎均匀分布,每个分区有 250000 个值。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |244803|0 |244802 |
|1 |255376|244803 |500178 |
|2 |249777|500179 |749955 |
|3 |250045|749956 |1000000 |
+---------+------+---------+---------+
df2.repartition(4, col("value"))
现在,我们正在使用另一个 Dataframe df2
。这里,散列算法对只有 0、5000、10000 或 100000 的值进行散列。当然,值 0 的散列将始终相同,因此所有零最终都在同一个分区中(在这种情况下,分区 3 )。其他两个分区只包含一个值。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1 |100000 |100000 |
|1 |1 |10000 |10000 |
|2 |1 |5000 |5000 |
|3 |1000001|0 |0 |
+---------+-------+---------+---------+
df2.repartition(4)
如果不使用“value”列的内容,该repartition
方法将在 RoundRobin 的基础上分发消息。所有分区的数据量几乎相同。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |250002|0 |5000 |
|1 |250002|0 |10000 |
|2 |249998|0 |100000 |
|3 |250002|0 |0 |
+---------+------+---------+---------+
df2.repartitionByRange(4, col("value"))
这种情况表明数据帧df2
没有很好地定义用于按范围重新分区,因为几乎所有值都是 0。因此,我们甚至最终只有两个分区,而分区 0 包含所有零。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1000001|0 |0 |
|1 |3 |5000 |100000 |
+---------+-------+---------+---------+