作者:大街上 | 来源:互联网 | 2023-08-25 21:21
有时用Spark运行Job的时候,输出可能会出现一些空或者小内容。这时重新将输出的Partition进行重新调整,可以减少RDD中Patition的数目。 两种方式: 1. coa
有时用Spark 运行Job 的时候,输出可能会出现一些空或者小内容。这时重新将输出的Partition 进行重新调整,可以减少RDD中Patition的数目。
两种方式:
1. coalesce(numPartitions:Int, shuffle:Boolean = false)
2. repartition(numPartitions:Int)
,repartition只是coalesce 简化版
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
如果你想减少patitions 用coalesce
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitiOns= 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitiOns= 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
shuffle为收缩 默认采用 partitionCoalescer进行分区
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
//index 所在分区, items
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
//如果扩展 newShuffledRDD 这个核心操作distributePartition看 对现有分区进行重新划分看上面函数distributePartition 。
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
//shuffle为false
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}