热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

必读:再讲Spark与kafka0.8.2.1+整合

Kafka在0.8和0.10版本引入了新的消费者API,所以sparkStreaming与kafka的整合提供了两个包。请根据你的集群选用正确的包。注意,0.8和后期的

Kafka在0.8和0.10版本引入了新的消费者API,所以spark Streaming与kafka的整合提供了两个包。  请根据你的集群选用正确的包。注意, 0.8和后期的版本0.9及0.10是兼容的,但是0.10整合是不兼容之前的版本的。

包与版本特性之间的对应关系如下:

本文主要讲述spark Streaming与kafka 0.8.2.1+版本整合,要求kafka集群的版本是0.8.2.1或者更高版本。

基于Receiver的方式

这种方式使用一个Receiver来接受数据。Receiver是使用kafka的高级消费者API来实现的。所有的Receiver从kafka里面接受数据,然后存储于Executors,spark Streaming再生成任务来处理数据。

然而,默认配置的情况,这种方式在失败的情况下有可能丢失数据,为了确保零数据丢失,可以配置预写日志(WAL,从spark1.2引入)。这会将Receiver接收到的数据写入分布式文件系统,如hdfs,所以所有的数据可以在从失败恢复运行的时候加载到。

导包(MVN或者sbt):

groupId = org.apache.spark

artifactId = spark-streaming-kafka-0-8_2.11

version = 2.2.1

测试代码如下:

val sparkCOnf= new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topics = "topic1,topic2 1"
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
 .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()

ssc.start()
ssc.awaitTermination()

注意事项:

1,打包的时候    spark-streaming-kafka-0-8对应的jar包一定要带上

2消费的kafka分区和生成的RDD分区并不是一一对应的。所以,增加KafkaUtils.createStream()命令中topic指定的分区,也即map里面topic名字对应的value,只会增加消费该命令创建的Receiver的内部消费者线程数目,不会增加spark处理数据的并行度恰当线程数会增加Receiver的接收数据的速度

3,KafkaUtils.createStream()命令执行只会创建一个Receiver,我们可以结合消费的topic分区和group名称来多创建几个Receiver,来增加接收数据的并行度。

4如果你启动了预写日志,日志存储系统时hdfs,日志已经会被存储副本。所以,可以设置存储等级为StorageLevel.MEMORY_AND_DISK_SER.

5要配置该机制,首先要调用 StreamingContext 的 checkpoint ( ) 方法设置一个 checkpoint 目录,然后需要将 spark.streaming.receiver.writeAheadLog.enable 参数设置为 true

Direct Approach

在spark 1.3以后引入了一种新的spark Streaming api,新的api回自己在driver内部维护一个偏移,然后自动计算指定的topic+partition该批次需要拉去数据的范围,然后从kafka拉去数据来计算。不同于基于Receiver的方式,direct模式不会将偏移记录到Zookeeper,以保证故障恢复从上次偏移处消费消息。Direct模式你可以通过Checkpoint或者自己编写工具来实现偏移的维护,保证数据消费不丢失。

这种方式相比于基于Receiver的方式有以下优势:

1, 简化并行度:不需要创建多个kafka stream,然后union他们。使用directStream,spark streaming 生成的RDD分区和kafka的分区是一一对应的,这种方式理解起来更简单而且便于调优。

2, 高效:基于Receiver的方式要保证数据不丢失,必须启用预写日志。这个行为实际上是非常抵消的,数据会被复制两次,一次是kafka集群,一次是预写日志。Direct方式解决了这个问题,由于没有Receiver,故而也不需要预写日志。只要你kafka里面存有数据,那么消息就可以从kafka里面恢复。

3, 仅一次消费语义:基于Receiver的会把偏移提交到Zookeeper。这种方式结合预写日志能保证数据不丢失,也即是最少一次消费语义,但是有几率导致消费者在存在失败的情况下消费消息两次。比如,消息处理并经过存储之后,但是偏移并没有提交到Zookeeper,这个时候发生故障了,那么恢复之后,就会按照Zookeeper上的偏移再一次消费数据并处理,导致消息重复处理。但是direct 方式偏移不会提交到Zookeeper,是spark streaming在driver使用内存变量加Checkpoint进行追踪的,所以尽管会存在任务失败,但是仍然能保证消费的一次处理。

注意,由于direct方式不会提交偏移到Zookeeper,所以,基于Zookeeper的kafka监控工具就不能监控到spark streaming的消费情况。然而,你可以自己讲偏移提交道Zookeeper,来满足你的需求。

导包(MVN或者sbt):

groupId = org.apache.spark

artifactId = spark-streaming-kafka-0-8_2.11

version = 2.2.1

测试代码如下:

val Array(brokers, topics) = args

// Create context with 2 second batch interval
val sparkCOnf= new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
 ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

关于自己编代码提交到Zookeeper,限于篇幅的原因,不在这里啰嗦。

调优限速

现实系统中会有流量尖峰,比如淘宝的双十一,那一秒钟的流量,大的吓人,假如有spark streaming处理的话,会有可能导致消息不能及时处理,甚至出现故障,应对这种流量尖峰,spark streaming内部实现了一个控制器,基于PID,具体PID的概念是啥,请自行百度。

这里只是想介绍两个主要的参数:

基于Receiver的要配置的参数是spark.streaming.receiver.maxRate

基于direct的要配置的参数是spark.streaming.kafka.maxRatePerPartition

通过我们压测我们的spark streaming任务每秒钟最大消费处理的消息数,然后使用这两个参数限消费消息的速率,来避免高峰期一批次消费过量消息导致应用不正常执行。

推荐阅读:

1,论Spark Streaming的数据可靠性和一致性

2,Spark Structured Streaming高级特性

3,构建Flink工程及demo演示

4,Flink系列之时间


推荐阅读
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • Spring框架的核心组件与架构解析 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • 【并发编程】全面解析 Java 内存模型,一篇文章带你彻底掌握
    本文深入解析了 Java 内存模型(JMM),从基础概念到高级特性进行全面讲解,帮助读者彻底掌握 JMM 的核心原理和应用技巧。通过详细分析内存可见性、原子性和有序性等问题,结合实际代码示例,使开发者能够更好地理解和优化多线程并发程序。 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
author-avatar
mobiledu2502920087
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有