Spark Streaming 支持多种输入源数据的读取,其中基本数据源有:File System、Socket connections;而高级数据源有:Kafka、Flume、Kinesis等。但是高级数据源需要额外依赖,而且不能在 Spark Shell 中测试这些高级数据源,如果想要在Spark Shell 中测试需要下载依赖到Spark 依赖库中。
关于读取Kafka 的方式,Spark Streaming 官方提供了两种方式:Receiver 和 Direct,此两种读取方式存在很大的不同,当然也各有优劣,接下来就让我们具体刨解这两种数据读取方式。此外,因为Kafka 在0.8 和0.10 之间引入了一个新的消费者API,因此有两个独立的Spark Streaming 包可用,根据功能来选择合适的包,0.8 版本兼容后来的0.9 和0.10,但是0.10 不向前兼容。官网地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-integration.html
Spark 官方最先提供了基于 Receiver 的Kafka 数据消费模式。但会存在程序失败丢失数据的风险,在Spark 1.2 时引入了一个配置参数 spark.streaming.receiver.writeAheadLog.enable (WAL)以规避此风险。
Receiver-based 的Kafka 读取方式是基于Kafka 高阶API 来实现对Kafka 数据的消费。在提交Spark Streaming 任务后,Spark 集群会划出指定的Receivers 来专门、持续不断、异步读取Kafka 数据,读取时间间隔以及每次去读offsets 范围可以由参数来配置。读取的数据保存在Receiver 中,具体StorageLevel 方式由用户指定,入MEMORY_ONLY等。当driver 触发Batch 任务的时候,Receivers 中的数据会转移到剩余的Executors 中去执行。在执行完后,Receivers 会相应更新Zookeeper 的offsets。如要确保at least once 的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。集体Receiver 执行流程如下图:
Kafka 的high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer 的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming 计算引擎时,我们优先考虑采用此种方式来读取数据,具体代码如下:
# 添加依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.1.2# 调用方法
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) # 注意
1. Kafka中的主题分区与Spark Streaming中生成的RDD分区无关。因此,增加KafkaUtils.createStream()中特定于主题的分区的数量只会增加使用单个接收器中使用的主题的线程数。它不会增加Spark在处理数据时的并行性。
2. 可以使用不同的组和主题创建多个Kafka输入DStream,以使用多个接收器并行接收数据。
3. 如果开启了WAL,可以使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)# 部署
对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11及其依赖项打包到应用程序JAR中。确保spark-core_2.11和spark-streaming_2.11被标记为provider的依赖项,因为它们已存在于Spark安装中。然后使用spark-submit启动您的应用程序.
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2 ...
或者
you can also download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars. # 参考资料
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-8-integration.html
为了防数据丢失,我们做了checkpoint 操作以及配置了spark.streaming.receiver.writeAheadLog.enable 参数,提高了receiver 的吞吐量。采用MEMORY_AND_DISK_SER 方式读取数据、提高单Receiver 的内存或事调大并行度,将数据分散到多个Receiver 中去,但是也是会出现各种情况的问题:
区别与Receiver-based 的数据消费方法,Spark 官方在 1.3 时引入了Direct 方式的Kafka 消费方式。相对于Receiver-based 的方法,Direct 方式具有一下的优势:
Direct 方式采用Kafka 简单的 consumer API 方式来读取数据,无需经由Zookeeper,此种方式不再需要专门的Receiver 来持续不断的读取数据。当Batch 任务触发时,由Executor 读取数据,并参与到其他的Executor 的数据计算过程中去。driver 来决定读取多少offsets,并将offsets 交由checkpoint 来维护。当触发下次Batch 任务时,再由executor 读取Kafka 数据并计算。从此过程我们可以发现Direct 方式无需Receiver 读取数据,而是需要计算时在读取数据,所以Direct 方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外Batch 任务堆积时,也不会影响数据堆积。其具体读取方式如图:
Direct 去读实现
# 引入依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.2# 调用方法val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092,anotherhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams)
)stream.map(record => (record.key, record.value))# 参考地址:
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-10-integration.html
Direct 方式与Receiver-based 方式相比,具有以下优势:
Spark Streaming 集成了 Kafka 允许用户从 Kafka 中读取一个或多个 topic 的数据。一个 Kafka topic 包含多个存储消息的分区(partition)。每个分区中的消息都是顺序存储,并且用 offset 来标记消息。开发者可以在 Spark Streaming 应用中通过offset 来控制数据的读取位置,此时就需要好的 offset 的管理机制。
Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性式非常有益的。比如,在应用停止或者报错推出之前没有将 offset 保存在持久化数据库中,那么 offset rangges 就会丢失。进一步说,如果没有保存每个分区已经读取的 offset,那么 Spark Streaming 就没有办法从上次断开的位置继续读取消息。
上图描述通常的 Spark Streaming 应用管理 offset 流程。Offsets 可以通过多种方式来管理,但是一般都遵循下面的步骤:
使用 Spark Streaming 的 checkpoint 是最简单的存储方式,并且在 Spark 框架中很容易实现。Spark Streaming checkpoint 就是为了保存应用状态而设计的,我们将路径设置在 HDFS 上,所以能够从失败中恢复数据。
对 Kafka Stream 执行 checkpoint 操作使得 offset 保存在 checkpoint 中,如果是应用挂掉的话,那么 Spark Streaming 应用功能可以从保存的 offset 中开始读取消息。但是,如果对 Spark Streaming 应用进行升级的话,不能 checkpoint 的数据没有使用,所以这种机制并不可靠,因此我们不推荐这种方式。
HBase 可以作为一个可靠的外部数据库来持久化 offsets。通过将 offsets 存储在外部系统中,Spark Streaming 应用功能能够重读或者回放任何仍然存储在 Kafka 中的数据。
根据 HBase 的设计模式,允许应用能够以 rowkey 和 column 的结构将过个 Spark Streaming 应用和多个 Kafka topic 存放在一张表格中。在这个例子中,表格以 topic 名称、消费者 group ID 和 Spark Streaming 的 batchTime.milliSeconds 作为 rowkey 以做唯一标示。尽管 batchTime.milliSeconds 不是必须的,但是它能够更好的展示历史的每批次的 offsets。表格将存储30天的积累数据,如果超出30天则会被移除。下面是创建表格的 DDL 和结构
DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}RowKey Layout:
row:
column family: offsets
qualifier:
value:
具体代码实现如下:
import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Scan, Put, ConnectionFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by gmedasani on 6/10/17.*/
object KafkaOffsetsBlogStreamingDriver {def main(args: Array[String]) {if (args.length <6) {System.err.println("Usage: KafkaDirectStreamTest
}
在 SparkStreaming 连接 Kafka 应用中使用 Zookeeper 来存储 offsets 也是一种比较可靠的方式。
在这个方案中&#xff0c;Spark Streaming 任务在启动时回去 Zookeeper 中读取每个分区的 offsets。如果有新的分区出现&#xff0c;那么他的 offset 将会设置在最开始的位置。在每批数据处理完之后&#xff0c;用户需要存储已处理数据的一个 offset 。此外&#xff0c;新消费者将使用跟旧的 Kafka 消费者 API 一样的格式将 offset 保存在 Zookeeper 中。因此&#xff0c;任何追踪或监控 Zookeeper 中 Kafka offset 的工具仍然生效。
package com.baidu.hec.zkimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try/*** Kafka的连接和Offset管理工具类** &#64;param zkHosts Zookeeper地址* &#64;param kafkaParams Kafka启动参数* &#64;author Leibniz*/
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {//Logback日志对象&#xff0c;使用slf4j框架&#64;transient private lazy val log &#61; LoggerFactory.getLogger(getClass)//建立ZkUtils对象所需的参数val (zkClient, zkConnection) &#61; ZkUtils.createZkClientAndConnection(zkHosts, 3000, 3000)//ZkUtils对象&#xff0c;用于访问Zookeeperval zkUtils &#61; new ZkUtils(zkClient, zkConnection, false)/*** 包装createDirectStream方法&#xff0c;支持Kafka Offset&#xff0c;用于创建Kafka Streaming流** &#64;param ssc Spark Streaming Context* &#64;param topics Kafka话题* &#64;tparam K Kafka消息Key类型* &#64;tparam V Kafka消息Value类型* &#64;return Kafka Streaming流* &#64;author Leibniz*/def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] &#61; {val groupId &#61; kafkaParams("group.id").toStringval storedOffsets &#61; readOffsets(topics, groupId)log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off &#61;> (off._1.topic, off._1.partition(), off._2)))val kafkaStream &#61; KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))kafkaStream}/*** 从Zookeeper读取Kafka消息队列的Offset** &#64;param topics Kafka话题* &#64;param groupId Kafka Group ID* &#64;return 返回一个Map[TopicPartition, Long]&#xff0c;记录每个话题每个Partition上的offset&#xff0c;如果还没消费&#xff0c;则offset为0* &#64;author Leibniz*/def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] &#61; {val topicPartOffsetMap &#61; collection.mutable.HashMap.empty[TopicPartition, Long]val partitionMap &#61; zkUtils.getPartitionsForTopics(topics)// /consumers/
}
package com.baidu.hec.zkimport com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test {def main(args: Array[String]): Unit &#61; {val zkHosts &#61; "0.0.0.0:1181"val kafkaParams &#61; Map[String, Object]("auto.offset.reset" -> "latest","bootstrap.servers" -> "0.0.0.0:9092","group.id" -> "test_topic_group","enable.auto.commit" -> (false: java.lang.Boolean), //禁用自动提交Offset&#xff0c;否则可能没正常消费完就提交了&#xff0c;造成数据错误"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])val km &#61; new KafkaManager(zkHosts, kafkaParams)val sparkConf &#61; new SparkConf().setAppName("SearchContentAnalysis").setMaster("local[2]")val ssc &#61; new StreamingContext(sparkConf, Seconds(60))val topics &#61; Array("test-topic")val stream: InputDStream[ConsumerRecord[String, String]] &#61; km.createDirectStream(ssc, topics)stream.foreachRDD(rdd &#61;> {val message &#61; rdd.map(recode &#61;> recode.value())if(!message.isEmpty()) {val spark &#61; SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)import spark.implicits._val dataFrame &#61; message.map(data &#61;>{val jsonData &#61; JSON.parseObject(data)val content &#61; jsonData.getString("content")val userId &#61; jsonData.getLong("userId")val createTime &#61; jsonData.getLong("createTime")Record(content, userId, createTime)}).toDF()// val dataFrame &#61; message.toDF()dataFrame.createOrReplaceTempView("temp_search_table")spark.sql("select * from temp_search_table").show(100)km.persistOffsets(rdd)printf("******************************************")}})ssc.start()ssc.awaitTermination()}case class Record(content: String, userId: Long, createTime: Long)object SparkSessionSingleton {&#64;transient private var instance: SparkSession &#61; _def getInstance(sparkConf: SparkConf): SparkSession &#61; {if (instance &#61;&#61; null) {instance &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()}instance}}}
Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API&#xff08;异步提交 API&#xff09;来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets。
将 offsets 提交到 Kafka 中&#xff1a;
stream.foreachRDD { rdd &#61;>val offsetRanges &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRanges// some time later, after outputs have completedstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
但是该中方式在测试中&#xff0c;无法根据 offset 来消费消息。
参考&#xff1a;https://juejin.im/entry/5acd7224f265da237c693f7d