作者:缅甸新葡京国际 | 来源:互联网 | 2023-08-24 11:34
0)摘要主要介绍了SparkStreaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafkabrokervers
0)摘要
主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html)。
1)Kafka准备
启动生产者脚本:
./kafka-console-producer.sh --broker-list hadoop:9092 --topic test
启动消费者脚本:
./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic test --from-beginning
准备工作已经就绪。
2)Receiver-based方式整合
注意:这种方式为了保证数据不会丢失,需要开启Write Ahead Logs机制,开启后,接收数据的正确性只有被预写到日志以后Receive才会确认,可以从日志中恢复数据,会增加额外的开销。如何开启?设置SparkConf的“Spark Streaming writeAheadLog.enable”属性为“true”,这种模式基本被淘汰
1 添加kafka依赖
org.apache.spark
spark-streaming-kafka-0-8_2.11
2.2.0
2 本地代码编写
1 package flume_streaming
2
3 import org.apache.spark.SparkConf
4 import org.apache.spark.streaming.kafka._
5 import org.apache.spark.streaming.{Durations, StreamingContext}
6
7 /**
8 * @Author: SmallWild
9 * @Date: 2019/10/30 10:00
10 * @Desc:
11 */
12
13 object kafkaReceiveWordCount {
14 def main(args: Array[String]): Unit = {
15 if (args.length != 4) {
16 System.err.println("错误参数")
17 System.exit(1)
18 }
19 //接收参数
20 //numPartitions 线程数
21 val Array(zkQuorum, groupId, topics, numPartitions) = args
22 //一定不能使用local[1]
23 val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("kafkaReceiveWordCount")
24 val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
25 //设置日志级别
26 ssc.sparkContext.setLogLevel("WARN")
27 //多个topic用,分开
28 val topicMap = topics.split(",").map((_, numPartitions.toInt)).toMap
29 //TODO 业务逻辑,简单进行wordcount,输出到控制台
30 /**
31 * * @param ssc StreamingContext object
32 * * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
33 * * @param groupId The group id for this consumer topic所在的组,可以设置为自己想要的名称
34 * * @param topics Map of (topic_name to numPartitions) to consume. Each partition is consumed
35 * * in its own thread
36 * * @param storageLevel Storage level to use for storing the received objects
37 * * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
38 */
39 val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
40 lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
41 ssc.start()
42 ssc.awaitTermination()
43 }
44 }
3 提交到服器上运行
如果生产中没有联网,需要使用 --jars 传入kafka的jar包
- 把项目达成jar包
- 使用local模式提交,提交的脚本:
提交到服务器上运行
./spark-submit --master local[2] /
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /
--class flume_streaming.kafkaReceiveWordCount /
/smallwild/app/SparkStreaming-1.0.jar /
hadoop:2181 1 sparkStreaming 1
4 运行结果
首先在控制台,启动kafka生产者,输入一些单词,然后,启动SparkStreaming程序。
3)Direct方式整合
使用的是:Simple Consumer API,自己管理offset,把kfka看成存储数据的地方,根据offset去读。没有使用zk管理消费者的offset,spark自己管理,默认的offset在内存中,如果设置了checkpoint,那么也也有一份,一般要设置。Direct模式生成的Dstream中的RDD的并行度与读取的topic中的partition一致(增加topic的partition个数)
注意点:
- 没有使用receive,直接查询的kafka偏移量
1 添加kafka依赖
org.apache.spark
spark-streaming-kafka-0-8_2.11
2.2.0
2 代码编写
1 package kafka_streaming
2
3 import kafka.serializer.StringDecoder
4 import org.apache.spark.SparkConf
5 import org.apache.spark.streaming.{Durations, StreamingContext}
6 import org.apache.spark.streaming.kafka.KafkaUtils
7
8 /**
9 * @Author: SmallWild
10 * @Date: 2019/10/31 21:21
11 * @Desc:
12 */
13 object kafkaDirectWordCount {
14
15 def main(args: Array[String]): Unit = {
16 if (args.length != 2) {
17 System.err.println("错误参数")
18 System.exit(1)
19 }
20 //接收参数
21 //numPartitions 线程数
22 val Array(brokers, topics) = args
23 //一定不能使用local[1]
24 val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("kafkaDirectWordCount")
25 val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
26 //设置日志级别
27 ssc.sparkContext.setLogLevel("WARN")
28 //多个topic用,分开
29 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers
30 )
31 val topicsa = topics.split(",").toSet
32 /**
33 *
34 */
35 val lineMap = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsa)
36 //TODO 业务逻辑,简单进行wordcount,输出到控制台
37 lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
38 ssc.start()
39 ssc.awaitTermination()
40 }
41
42 }
View Code3 提交到服务器上运行和第一种方式是上面一样
4 自己管理offset
使用spark自己管理offset方便,但是当业务逻辑改变的时候,恢复就难了,需要自己手动编写代码管理offset
4)总结
注意两种模式差别,receive模式几乎被淘汰,可以扩展的地方,1)使程序具备高可用的能力,挂掉之后,能否从上次的状态恢复过来,2)手动管理offset,改变了业务逻辑也能从上次的状态恢复过来