热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark终止运行_如何优雅地终止正在运行的SparkStreaming程序

一直运行的Streaming程序如何关闭呢?是直接使用kill命令强制关闭吗?这种手段是可以达到关闭的目的,但是带来的后果就是可能会导致数

一直运行的Streaming程序如何关闭呢?是直接使用kill命令强制关闭吗?这种手段是可以达到关闭的目的,但是带来的后果就是可能会导致数据的丢失,因为这时候如果程序正在处理接收到的数据,但是由于接收到kill命令,那它只能停止整个程序,而那些正在处理或者还没有处理的数据可能就会被丢失。那我们咋办?这里有两种方法。

文章目录

等作业运行完再关闭

我们都知道,Streaming每隔batchDuration的时间会把源源不断的流数据分割成一批有限数据集,然后计算这些数据,我们可以从提供的监控页面看到当前batch是否执行完成,当作业执行完,我们就可以手动执行kill命令来强制关闭这个Streaming作业。这种方式的缺点就是得盯着监控页面,然后决定关不关闭,很不灵活。

如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

通过Spark内置机制关闭

其实Spark内置就为我们提供了一种优雅的方法来关闭长期运行的Streaming作业,我们来看看 StreamingContext类中定义的一个 stop 方法:

def stop(stopSparkContext : Boolean, stopGracefully : Boolean)

官方文档对其解释是:Stop the execution of the streams, with option of ensuring all received data has been processed.控制所有接收的数据是否被处理的参数就是 stopGracefully,如果我们将它设置为true,Spark则会等待所有接收的数据被处理完成,然后再关闭计算引擎,这样就可以避免数据的丢失。现在的问题是我们在哪里调用这个stop方法?

Spark 1.4版本之前

在Spark 1.4版本之前,我们需要手动调用这个 stop 方法,一种比较合适的方式是通过 Runtime.getRuntime().addShutdownHook 来添加一个钩子,其会在JVM关闭的之前执行传递给他的函数,如下:

Runtime.getRuntime().addShutdownHook( new Thread() {

override def run() {

log( "Gracefully stop Spark Streaming" )

streamingContext.stop( true , true )

}

})

如果你使用的是Scala,我们还可以通过以下的方法实现类似的功能:

scala.sys.addShutdownHook({

streamingContext.stop( true , true )

)})

通过上面的办法,我们客户确保程序退出之前会执行上面的函数,从而保证Streaming程序关闭的时候不丢失数据。

Spark 1.4版本之后

上面方式可以达到我们的需求,但是在每个程序里面都添加这样的重复代码也未免太过麻烦了!值得高兴的是,从Apache Spark 1.4版本开始,Spark内置提供了spark.streaming.stopGracefullyOnShutdown参数来决定是否需要以Gracefully方式来关闭Streaming程序(详情请参见)。Spark会在启动 StreamingContext 的时候注册这个钩子,如下:

shutdownHookRef = ShutdownHookManager.addShutdownHook(

StreamingContext.SHUTDOWN _ HOOK _ PRIORITY)(stopOnShutdown)

private def stopOnShutdown() : Unit = {

val stopGracefully = conf.getBoolean( "spark.streaming.stopGracefullyOnShutdown" , false )

logInfo(s "Invoking stop(stopGracefully=$stopGracefully) from shutdown hook" )

// Do not stop SparkContext, let its own shutdown hook stop it

stop(stopSparkContext = false , stopGracefully = stopGracefully)

}

从上面的代码可以看出,我们可以根据自己的需求来设置 spark.streaming.stopGracefullyOnShutdown 的值,而不需要在每个Streaming程序里面手动调用StreamingContext的stop方法,确实方便多了。不过虽然这个参数在Spark 1.4开始引入,但是却是在Spark 1.6才开始才有文档正式介绍(可以参见https://github.com/apache/spark/pull/8898和http://spark.apache.org/docs/1.6.0/configuration.html)

猜你喜欢

欢迎关注本公众号:iteblog_hadoop:

0、回复spark_2017_all获取Spark Summit East 2017高清视频和PPT

1、回复hive_es获取《基于 Hive/ES 金融大数据指标系统》PPT

2、回复bigdata_e获取 《大规模数据处理演变》PPT

3、回复 大数据分析获取 《Big Data Analytics》电子书

4、回复 spark2电子书获取 《Apache Spark 2 for Beginners》电子书

5、回复 spark2_data获取 《Spark for Data Science》电子书

6、回复 架构师大会ppt获取 《2016年中国架构师[大数据场]》PPT

7、回复 intro_flink获取 《Introduction to Apache Flink》电子书

8、回复learning_flink获取《Learning Apache Flink》电子书

8、回复Hadoop权威指南获取 《Hadoop权威指南中文第三版》电子书

9、回复 flink未来获取 《The Future of Apache Flink》

10、更多大数据文章欢迎访问https://www.iteblog.com及本公众号(iteblog_hadoop)返回搜狐,查看更多



推荐阅读
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • Java中不同类型的常量池(字符串常量池、Class常量池和运行时常量池)的对比与关联分析
    在研究Java虚拟机的过程中,笔者发现存在多种类型的常量池,包括字符串常量池、Class常量池和运行时常量池。通过查阅CSDN、博客园等相关资料,对这些常量池的特性、用途及其相互关系进行了详细探讨。本文将深入分析这三种常量池的差异与联系,帮助读者更好地理解Java虚拟机的内部机制。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Zookeeper作为Apache Hadoop生态系统中的一个重要组件,主要致力于解决分布式应用中的常见数据管理难题。它提供了统一的命名服务、状态同步服务以及集群管理功能,有效提升了分布式系统的可靠性和可维护性。此外,Zookeeper还支持配置管理和临时节点管理,进一步增强了其在复杂分布式环境中的应用价值。 ... [详细]
  • 在处理大图片时,PHP 常常会遇到内存溢出的问题。为了避免这种情况,建议避免使用 `setImageBitmap`、`setImageResource` 或 `BitmapFactory.decodeResource` 等方法直接加载大图。这些函数在处理大图片时会消耗大量内存,导致应用崩溃。推荐采用分块处理、图像压缩和缓存机制等策略,以优化内存使用并提高处理效率。此外,可以考虑使用第三方库如 ImageMagick 或 GD 库来处理大图片,这些库提供了更高效的内存管理和图像处理功能。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • HBase客户端Table类中getRpcTimeout方法的应用与编程实例解析 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
author-avatar
月曳柳覀梢
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有