热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(七)SparkStreaming算子梳理—repartition算子

目录天小天:(一)SparkStreaming算子梳理—简单介绍streaming运行逻辑天小天:(二)SparkStreaming算子梳理—flatMap和mapPartitio

目录

天小天:(一)Spark Streaming 算子梳理 — 简单介绍streaming运行逻辑

天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions

天小天:(三)Spark Streaming 算子梳理 — transform算子

天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream

天小天:(五)Spark Streaming 算子梳理 — foreachRDD

天小天:(六)Spark Streaming 算子梳理 — glom算子

天小天:(七)Spark Streaming 算子梳理 — repartition算子

天小天:(八)Spark Streaming 算子梳理 — window算子

前言

本文主要讲解repartiion的作用及原理。

作用

repartition用来调整父RDD的分区数,入参为调整之后的分区数。由于使用方法比较简单,这里就不写例子了。

源码分析

接下来从源码的角度去分析是如何实现重新分区的。

DStream

/** * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the * returned DStream has exactly numPartitions partitions. */
def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
this.transform(_.repartition(numPartitions))
}

从方法中可以看到,实现repartition的方式是通过Dstreamtransform算子之间调用RDD的repartition算子实现的。

接下来就是看看RDD的repartition算子是如何实现的。

RDD

/** * Return a new RDD that has exactly numPartitions partitions. * * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. * * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207. */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}

首先可以看到RDDrepartition的实现是调用时coalesce方法。其中入参有两个第一个是numPartitions为重新分区后的分区数量,第二个参数为是否shuffle,这里的入参为true代表会进行shuffle。

接下来看下coalesce是如何实现的。

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {// 是否经过shuffle,repartition是走这个逻辑 /** Distributes elements evenly across output partitions, starting from a random partition. */
// distributePartition是shuffle的逻辑, // 对迭代器中的每个元素分派不同的key,shuffle时根据这些key平均的把元素分发到下一个stage的各个partition中。 val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), // 为每个元素分配key,分配的逻辑为distributePartition new HashPartitioner(numPartitions)), // ShuffledRDD 根据key进行混洗 numPartitions,
partitionCoalescer).values
} else {
// 如果不经过shuffle之间返回CoalescedRDD new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

从源码中可以看到无论是否经过shuffle最终返回的都是CoalescedRDD。其中区别是经过shuffle需要为每个元素分配key,并根据key将所有的元素平均分配到task中。

CoalescedRDD

private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T], // 父RDD maxPartitions: Int, // 最大partition数量,这里就是重新分区后的partition数量 partitionCoalescer: Option[PartitionCoalescer] = None // 重新分区算法,入参默认为None) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
s"Number of partitions ($maxPartitions) must be positive.")
if (partitionCoalescer.isDefined) {
require(partitionCoalescer.get.isInstanceOf[Serializable],
"The partition coalescer passed in must be serializable.")
}
override def getPartitions: Array[Partition] = {
// 获取重新算法,默认为DefaultPartitionCoalescer val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())
// coalesce方法是根据传入的rdd和最大分区数计算出每个新的分区处理哪些旧的分区 pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) => // pg为partitionGroup即旧的partition组成的集合,集合里的partition对应一个新的partition val ids = pg.partitions.map(_.index).toArray
new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) //组成一个新的parititon }
}
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
// 当执行到这里时分区已经重新分配好了,这部分代码也是执行在新的分区的task中的。 // 新的partition取出就的partition对应的所有partition并以此调用福rdd的迭代器执行next计算。 partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
firstParent[T].iterator(parentPartition, context)
}
}
override def getDependencies: Seq[Dependency[_]] = {
Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
})
}
override def clearDependencies() {
super.clearDependencies()
prev = null
}
/** * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition, * then the preferred machine will be one which most parent splits prefer too. * @param partition * @return the machine most preferred by split */
override def getPreferredLocations(partition: Partition): Seq[String] = {
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}

对于CoalescedRDD来讲getPartitions方法是最核心的方法。旧的parition对应哪些新的partition就是在这个方法里计算出来的。具体的算法是在DefaultPartitionCoalescercoalesce方法体现出来的。

compute方法是在新的task中执行的,即分区已经重新分配好,并且拉取父RDD指定parition对应的元素提供给下游迭代器计算。

图示

写下来用两张图解释下是如何repartition

无shuffle

《(七)Spark Streaming 算子梳理 — repartition算子》
《(七)Spark Streaming 算子梳理 — repartition算子》

有shuffle

《(七)Spark Streaming 算子梳理 — repartition算子》
《(七)Spark Streaming 算子梳理 — repartition算子》

总结

以上repartition的逻辑基本就已经介绍完了。其中DefaultPartitionCoalescer中重新分区的算法逻辑并没有展开说。这里以后如果有时间会再写一篇详细介绍。


推荐阅读
  • 本打算教一步步实现koa-router,因为要解释的太多了,所以先简化成mini版本,从实现部分功能到阅读源码,希望能让你好理解一些。希望你之前有读过koa源码,没有的话,给你链接 ... [详细]
  • 首部|接口类型_OSI 7层模型 & TCP/IP协议首部封装格式解析
    首部|接口类型_OSI 7层模型 & TCP/IP协议首部封装格式解析 ... [详细]
  • 本文详细介绍了如何对一个整数的二进制表示进行逆序操作。通过多种方法,包括直接法、查表法和分治法,帮助读者全面理解和掌握这一技术。 ... [详细]
  • 图像处理学习笔记:噪声分析与去除策略
    本文详细探讨了不同类型的图像噪声及其对应的降噪技术,旨在帮助读者理解各种噪声的本质,并掌握有效的降噪方法。文章不仅介绍了高斯噪声、瑞利噪声、伽马噪声、指数噪声、均匀噪声和椒盐噪声等常见噪声类型,还特别讨论了周期噪声的特性及处理技巧。 ... [详细]
  • 龙蜥社区开发者访谈:技术生涯的三次蜕变 | 第3期
    龙蜥社区的开发者们通过自己的实践和经验,推动着开源技术的发展。本期「龙蜥开发者说」聚焦于一位资深开发者的三次技术转型,分享他在龙蜥社区的成长故事。 ... [详细]
  • Go从入门到精通系列视频之go编程语言密码学哈希算法(二) ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • PHP面试题精选及答案解析
    本文精选了新浪PHP笔试题及最新的PHP面试题,并提供了详细的答案解析,帮助求职者更好地准备PHP相关的面试。 ... [详细]
  • C# 中创建和执行存储过程的方法
    本文详细介绍了如何使用 C# 创建和调用 SQL Server 存储过程,包括连接数据库、定义命令类型、设置参数等步骤。 ... [详细]
  • 原文地址:https:blog.csdn.netqq_35361471articledetails84715491原文地址:https:blog.cs ... [详细]
  • 高效的JavaScript异步资源加载解决方案
    本文探讨了如何通过异步加载技术处理网页中大型第三方插件的加载问题,避免将大文件打包进主JS文件中导致的加载时间过长,介绍了实现异步加载的具体方法及其优化。 ... [详细]
  • Flutter 核心技术与混合开发模式深入解析
    本文深入探讨了 Flutter 的核心技术,特别是其混合开发模式,包括统一管理模式和三端分离模式,以及混合栈原理。通过对比不同模式的优缺点,帮助开发者选择最适合项目的混合开发策略。 ... [详细]
  • 本文详细介绍了 Spark 中的弹性分布式数据集(RDD)及其常见的操作方法,包括 union、intersection、cartesian、subtract、join、cogroup 等转换操作,以及 count、collect、reduce、take、foreach、first、saveAsTextFile 等行动操作。 ... [详细]
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
author-avatar
傲慢的小草7_170
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有