热门标签 | HotTags
当前位置:  开发笔记 > 数据库 > 正文

SparkcreateDataFrame(df.rdd,df.schema)vscheckPoint打破世袭

如何解决《SparkcreateDataFrame(df.rdd,df.schema)vscheckPoint打破世袭》经验,为你挑选了1个好方法。

我目前正在使用

val df=longLineageCalculation(....)
val newDf=sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

为了节省计算计划时的时间,但是文档称检查点是“削减”沿袭的建议方法。但是我不想付出将RDD保存到磁盘的代价。

我的过程是一个不那么长的批处理过程,可以无问题地重新启动,因此检查点对我没有好处(我认为)。

使用“我的”方法会出现什么问题?(文档建议使用检查点,这种检查会比较昂贵,而不是用这个来破坏血统,我想知道原因)

只想到我能猜到,如果某个节点在我的“血统破坏”之后失败了,也许我的过程将失败,而检查点将正常运行吗?(如果DF被缓存而不是检查点该怎么办?)

谢谢!

编辑:

根据SMaZ的回答,我自己的知识以及他提供的文章。使用createDataframe(这是一个Dev-API,因此使用“我” /您自己承担风险)将沿袭保留在内存中(对我来说这不是问题,因为我没有内存问题,而且沿袭也不大)。

有了它,Spark看起来(如果没有经过100%测试)应该能够重建任何失败的内容。

由于在以下执行中不使用数据,因此我将使用cache + createDataframe与检查点(如果我没有记错的话,实际上是cache + saveToHDFS +“ createDataFrame”)。

我的过程不是很关键(如果崩溃),因为用户将始终期望结果并手动启动它,因此,如果出现问题,他们可以重新启动(+ Spark将重新启动)或给我打电话,以便我接听无论如何都存在一些风险,但是我99%的确没有任何风险:)

小智.. 5

让我从下面的行开始创建数据框:

val newDf = sparkSession.createDataFrame(df.rdd,df.schema)

如果我们仔细查看SparkSession类,则会使用注释此方法@DeveloperApi。要了解此注释的含义,请查看DeveloperApi类的以下几行

面向开发人员的较低级别的不稳定API。

在次要版本的Spark中,开发人员API可能会更改或删除。

因此,不建议将此方法用于生产解决方案,在开源世界中,这种方法称为“ 自担风险”

但是,让我们更深入地探讨createDataframe从RDD 调用时发生的情况。它正在调用internalCreateDataFrameprivate方法并创建LogicalRDD

在以下情况下创建LogicalRDD:

数据集被请求到检查点

请求SparkSession从内部二进制行的RDD创建DataFrame

因此,它与checkpoint不物理保存数据集的操作相同。它只是从内部二进制行和架构的RDD创建DataFrame。这可能会截断内存中的沿袭,而不是在物理级别。

所以我相信这只是创建另一RDDS的开销,不能被用作替换checkpoint

现在,Checkpoint是截断谱系图并将其保存到可靠的分布式/本地文件系统的过程。

为什么要检查站?

如果计算需要很长时间沿袭时间太长取决于太多的RDD

保持繁重的血统信息会带来内存成本。

即使在Spark应用程序终止后,检查点文件也不会自动删除,因此我们可以将其用于其他过程

使用“我的”方法会出现什么问题?(文档建议使用检查点,这种检查会比较昂贵,而不是用这个来破坏血统,我想知道原因)

此文章会给高速缓存和检查点的详细信息。IIUC,您的问题更多是在哪里使用检查点。让我们讨论一些有用的检查点实用方案

    让我们假设一个场景,我们有一个要执行100次迭代操作的数据集,而每个迭代都将最后一个迭代结果作为输入(Spark MLlib用例)。现在,在此迭代过程中,沿袭将在此期间增长。这里以规则的时间间隔(每10个迭代)对数据集进行检查将确保在发生任何故障的情况下,我们可以从上一个故障点开始处理。

    让我们来看一些批处理示例。想象一下,我们有一批正在创建一个具有大量谱系或复杂计算的主数据集。现在,经过一定的定期间隔后,我们得到了一些数据,这些数据应使用较早计算出的主数据集。在这里,如果我们检查主数据集,则可以将其用于来自的所有后续过程sparkSession

我的过程是一个不那么长的批处理过程,可以无问题地重新启动,因此检查点对我没有好处(我认为)。

没错,如果您的过程不是繁重的工作/大笔的血统,那么就没有检查点。经验法则是,如果不多次使用您的数据集,并且重建时间比花费的时间和用于检查点/缓存的资源要快,那么我们应该避免使用它。它将为您的过程提供更多资源。



1> 小智..:

让我从下面的行开始创建数据框:

val newDf = sparkSession.createDataFrame(df.rdd,df.schema)

如果我们仔细查看SparkSession类,则会使用注释此方法@DeveloperApi。要了解此注释的含义,请查看DeveloperApi类的以下几行

面向开发人员的较低级别的不稳定API。

在次要版本的Spark中,开发人员API可能会更改或删除。

因此,不建议将此方法用于生产解决方案,在开源世界中,这种方法称为“ 自担风险”

但是,让我们更深入地探讨createDataframe从RDD 调用时发生的情况。它正在调用internalCreateDataFrameprivate方法并创建LogicalRDD

在以下情况下创建LogicalRDD:

数据集被请求到检查点

请求SparkSession从内部二进制行的RDD创建DataFrame

因此,它与checkpoint不物理保存数据集的操作相同。它只是从内部二进制行和架构的RDD创建DataFrame。这可能会截断内存中的沿袭,而不是在物理级别。

所以我相信这只是创建另一RDDS的开销,不能被用作替换checkpoint

现在,Checkpoint是截断谱系图并将其保存到可靠的分布式/本地文件系统的过程。

为什么要检查站?

如果计算需要很长时间沿袭时间太长取决于太多的RDD

保持繁重的血统信息会带来内存成本。

即使在Spark应用程序终止后,检查点文件也不会自动删除,因此我们可以将其用于其他过程

使用“我的”方法会出现什么问题?(文档建议使用检查点,这种检查会比较昂贵,而不是用这个来破坏血统,我想知道原因)

此文章会给高速缓存和检查点的详细信息。IIUC,您的问题更多是在哪里使用检查点。让我们讨论一些有用的检查点实用方案

    让我们假设一个场景,我们有一个要执行100次迭代操作的数据集,而每个迭代都将最后一个迭代结果作为输入(Spark MLlib用例)。现在,在此迭代过程中,沿袭将在此期间增长。这里以规则的时间间隔(每10个迭代)对数据集进行检查将确保在发生任何故障的情况下,我们可以从上一个故障点开始处理。

    让我们来看一些批处理示例。想象一下,我们有一批正在创建一个具有大量谱系或复杂计算的主数据集。现在,经过一定的定期间隔后,我们得到了一些数据,这些数据应使用较早计算出的主数据集。在这里,如果我们检查主数据集,则可以将其用于来自的所有后续过程sparkSession

我的过程是一个不那么长的批处理过程,可以无问题地重新启动,因此检查点对我没有好处(我认为)。

没错,如果您的过程不是繁重的工作/大笔的血统,那么就没有检查点。经验法则是,如果不多次使用您的数据集,并且重建时间比花费的时间和用于检查点/缓存的资源要快,那么我们应该避免使用它。它将为您的过程提供更多资源。


推荐阅读
author-avatar
520那孩HAPPY
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有