热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:SparkStreaming整合Kafka

0)摘要主要介绍了SparkStreaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafkabrokervers
0)摘要

  主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html)。

1)Kafka准备

  • 启动zookeeper
    ./zkServer.sh start


  • 启动kafka
    ./kafka-server-start.sh -daemon ../config/server.properties //后台启动


  • 创建topic
    ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test


  • 通过控制台测试topic能否正常的生产和消费

       启动生产者脚本:
         ./kafka-console-producer.sh --broker-list hadoop:9092 --topic test

   启动消费者脚本:

    ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic test --from-beginning

  准备工作已经就绪。

2)Receiver-based方式整合

注意:这种方式为了保证数据不会丢失,需要开启Write Ahead Logs机制,开启后,接收数据的正确性只有被预写到日志以后Receive才会确认,可以从日志中恢复数据,会增加额外的开销。如何开启?设置SparkConf的“Spark Streaming writeAheadLog.enable”属性为“true”,这种模式基本被淘汰

1 添加kafka依赖



org.apache.spark
spark-streaming-kafka-0-8_2.11
2.2.0

2 本地代码编写

1 package flume_streaming
2
3 import org.apache.spark.SparkConf
4 import org.apache.spark.streaming.kafka._
5 import org.apache.spark.streaming.{Durations, StreamingContext}
6
7 /**
8 * @Author: SmallWild
9 * @Date: 2019/10/30 10:00
10 * @Desc:
11 */
12
13 object kafkaReceiveWordCount {
14 def main(args: Array[String]): Unit = {
15 if (args.length != 4) {
16 System.err.println("错误参数")
17 System.exit(1)
18 }
19 //接收参数
20 //numPartitions 线程数
21 val Array(zkQuorum, groupId, topics, numPartitions) = args
22 //一定不能使用local[1]
23 val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("kafkaReceiveWordCount")
24 val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
25 //设置日志级别
26 ssc.sparkContext.setLogLevel("WARN")
27 //多个topic用,分开
28 val topicMap = topics.split(",").map((_, numPartitions.toInt)).toMap
29 //TODO 业务逻辑,简单进行wordcount,输出到控制台
30 /**
31 * * @param ssc StreamingContext object
32 * * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
33 * * @param groupId The group id for this consumer topic所在的组,可以设置为自己想要的名称
34 * * @param topics Map of (topic_name to numPartitions) to consume. Each partition is consumed
35 * * in its own thread
36 * * @param storageLevel Storage level to use for storing the received objects
37 * * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
38 */
39 val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
40 lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
41 ssc.start()
42 ssc.awaitTermination()
43 }
44 }

3 提交到服器上运行

  如果生产中没有联网,需要使用  --jars 传入kafka的jar包


  • 把项目达成jar包

  • 使用local模式提交,提交的脚本:


提交到服务器上运行
.
/spark-submit --master local[2] /
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /
--class flume_streaming.kafkaReceiveWordCount /
/smallwild/app/SparkStreaming-1.0.jar /
hadoop:
2181 1 sparkStreaming 1

4 运行结果

  首先在控制台,启动kafka生产者,输入一些单词,然后,启动SparkStreaming程序。

技术图片

 

  

3)Direct方式整合

使用的是:Simple Consumer API,自己管理offset,把kfka看成存储数据的地方,根据offset去读。没有使用zk管理消费者的offset,spark自己管理,默认的offset在内存中,如果设置了checkpoint,那么也也有一份,一般要设置。Direct模式生成的Dstream中的RDD的并行度与读取的topic中的partition一致(增加topic的partition个数)

注意点:


  • 没有使用receive,直接查询的kafka偏移量

1 添加kafka依赖



org.apache.spark
spark-streaming-kafka-0-8_2.11
2.2.0

2 代码编写

技术图片技术图片

1 package kafka_streaming
2
3 import kafka.serializer.StringDecoder
4 import org.apache.spark.SparkConf
5 import org.apache.spark.streaming.{Durations, StreamingContext}
6 import org.apache.spark.streaming.kafka.KafkaUtils
7
8 /**
9 * @Author: SmallWild
10 * @Date: 2019/10/31 21:21
11 * @Desc:
12 */
13 object kafkaDirectWordCount {
14
15 def main(args: Array[String]): Unit = {
16 if (args.length != 2) {
17 System.err.println("错误参数")
18 System.exit(1)
19 }
20 //接收参数
21 //numPartitions 线程数
22 val Array(brokers, topics) = args
23 //一定不能使用local[1]
24 val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("kafkaDirectWordCount")
25 val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
26 //设置日志级别
27 ssc.sparkContext.setLogLevel("WARN")
28 //多个topic用,分开
29 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers
30 )
31 val topicsa = topics.split(",").toSet
32 /**
33 *
34 */
35 val lineMap = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsa)
36 //TODO 业务逻辑,简单进行wordcount,输出到控制台
37 lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
38 ssc.start()
39 ssc.awaitTermination()
40 }
41
42 }


View Code

3 提交到服务器上运行和第一种方式是上面一样

4 自己管理offset

  使用spark自己管理offset方便,但是当业务逻辑改变的时候,恢复就难了,需要自己手动编写代码管理offset

4)总结

  注意两种模式差别,receive模式几乎被淘汰,可以扩展的地方,1)使程序具备高可用的能力,挂掉之后,能否从上次的状态恢复过来,2)手动管理offset,改变了业务逻辑也能从上次的状态恢复过来

  


推荐阅读
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 如何在Linux服务器上配置MySQL和Tomcat的开机自动启动
    在Linux服务器上部署Web项目时,通常需要确保MySQL和Tomcat服务能够随系统启动而自动运行。本文将详细介绍如何在Linux环境中配置MySQL和Tomcat的开机自启动,以确保服务的稳定性和可靠性。通过合理的配置,可以有效避免因服务未启动而导致的项目故障。 ... [详细]
  • 在PHP中如何正确调用JavaScript变量及定义PHP变量的方法详解 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 微信小程序实现类似微博的无限回复功能,内置云开发数据库支持
    本文详细介绍了如何利用微信小程序实现类似于微博的无限回复功能,并充分利用了微信云开发的数据库支持。文中不仅提供了关键代码片段,还包含了完整的页面代码,方便开发者按需使用。此外,HTML页面中包含了一些示例图片,开发者可以根据个人喜好进行替换。文章还将展示详细的数据库结构设计,帮助读者更好地理解和实现这一功能。 ... [详细]
  • 本文探讨了 Java 中 Pair 类的历史与现状。虽然 Java 标准库中没有内置的 Pair 类,但社区和第三方库提供了多种实现方式,如 Apache Commons 的 Pair 类和 JavaFX 的 javafx.util.Pair 类。这些实现为需要处理成对数据的开发者提供了便利。此外,文章还讨论了为何标准库未包含 Pair 类的原因,以及在现代 Java 开发中使用 Pair 类的最佳实践。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • 分布式开源任务调度框架 TBSchedule 深度解析与应用实践
    本文深入解析了分布式开源任务调度框架 TBSchedule 的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了 TBSchedule 的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了 TBSchedule 在实际项目中的应用效果,为开发者提供了宝贵的实践经验。 ... [详细]
  • 如何使用 `org.opencb.opencga.core.results.VariantQueryResult.getSource()` 方法及其代码示例详解 ... [详细]
  • 深入解析:React与Webpack配置进阶指南(第二部分)
    在本篇进阶指南的第二部分中,我们将继续探讨 React 与 Webpack 的高级配置技巧。通过实际案例,我们将展示如何使用 React 和 Webpack 构建一个简单的 Todo 应用程序,具体包括 `TodoApp.js` 文件中的代码实现,如导入 React 和自定义组件 `TodoList`。此外,我们还将深入讲解 Webpack 配置文件的优化方法,以提升开发效率和应用性能。 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
author-avatar
缅甸新葡京国际
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有