热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkStreaming集成Kafka详解

概述SparkStreaming支持多种输入源数据的读取,其中基本数据源有:FileSystem、Socketconnections;而

概述

Spark Streaming 支持多种输入源数据的读取,其中基本数据源有:File System、Socket connections;而高级数据源有:Kafka、Flume、Kinesis等。但是高级数据源需要额外依赖,而且不能在 Spark Shell 中测试这些高级数据源,如果想要在Spark Shell 中测试需要下载依赖到Spark 依赖库中。

关于读取Kafka 的方式,Spark Streaming 官方提供了两种方式:Receiver 和 Direct,此两种读取方式存在很大的不同,当然也各有优劣,接下来就让我们具体刨解这两种数据读取方式。此外,因为Kafka 在0.8 和0.10 之间引入了一个新的消费者API,因此有两个独立的Spark Streaming 包可用,根据功能来选择合适的包,0.8 版本兼容后来的0.9 和0.10,但是0.10 不向前兼容。官网地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-integration.html

 


Receiver-based Approach

Spark 官方最先提供了基于 Receiver 的Kafka 数据消费模式。但会存在程序失败丢失数据的风险,在Spark 1.2 时引入了一个配置参数 spark.streaming.receiver.writeAheadLog.enable (WAL)以规避此风险。


 


Receiver-based 读取方式

Receiver-based 的Kafka 读取方式是基于Kafka 高阶API 来实现对Kafka 数据的消费。在提交Spark Streaming 任务后,Spark 集群会划出指定的Receivers 来专门、持续不断、异步读取Kafka 数据,读取时间间隔以及每次去读offsets 范围可以由参数来配置。读取的数据保存在Receiver 中,具体StorageLevel 方式由用户指定,入MEMORY_ONLY等。当driver 触发Batch 任务的时候,Receivers 中的数据会转移到剩余的Executors 中去执行。在执行完后,Receivers 会相应更新Zookeeper 的offsets。如要确保at least once 的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。集体Receiver 执行流程如下图:

 


Receiver-based 读取实现

Kafka 的high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer 的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming 计算引擎时,我们优先考虑采用此种方式来读取数据,具体代码如下:

# 添加依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.1.2# 调用方法
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) # 注意
1. Kafka中的主题分区与Spark Streaming中生成的RDD分区无关。因此,增加KafkaUtils.createStream()中特定于主题的分区的数量只会增加使用单个接收器中使用的主题的线程数。它不会增加Spark在处理数据时的并行性。
2. 可以使用不同的组和主题创建多个Kafka输入DStream,以使用多个接收器并行接收数据。
3. 如果开启了WAL,可以使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)# 部署
对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11及其依赖项打包到应用程序JAR中。确保spark-core_2.11和spark-streaming_2.11被标记为provider的依赖项,因为它们已存在于Spark安装中。然后使用spark-submit启动您的应用程序.
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2 ...
或者
you can also download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars. # 参考资料
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-8-integration.html

 


Receiver-based 读取问题

为了防数据丢失,我们做了checkpoint 操作以及配置了spark.streaming.receiver.writeAheadLog.enable 参数,提高了receiver 的吞吐量。采用MEMORY_AND_DISK_SER 方式读取数据、提高单Receiver 的内存或事调大并行度,将数据分散到多个Receiver 中去,但是也是会出现各种情况的问题:


  • 配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
  • 采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度。
  • 单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
  • 提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。
  • Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
  • 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费。

 


Direct Approach(No Receivers)

区别与Receiver-based 的数据消费方法,Spark 官方在 1.3 时引入了Direct 方式的Kafka 消费方式。相对于Receiver-based 的方法,Direct 方式具有一下的优势:


  • 简化并行性
    • 无需创建多个输入Kafka流并将它们联合起来。使用directStream,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,这些分区将并行地从Kafka读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更容易理解和调整。
  • 高效
    • 在第一种方法中实现零数据丢失需要将数据存储在Write Ahead Log中,这进一步复制了数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由Write Ahead Log复制。第二种方法消除了问题,因为没有接收器,因此不需要Write Ahead Logs。只要您有足够的Kafka保留,就可以从Kafka恢复消息。
  • 强一致性
    • 第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。传统上,这是从Kafka使用数据的方式。虽然这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但某些记录在某些故障下可能会被消耗两次的可能性很小。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。 Spark Streaming在其检查点内跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此尽管出现故障,Spark Streaming也会有效地接收每条记录一次。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移的原子事务。

 


Direct 读取方式

Direct 方式采用Kafka 简单的 consumer API 方式来读取数据,无需经由Zookeeper,此种方式不再需要专门的Receiver 来持续不断的读取数据。当Batch 任务触发时,由Executor 读取数据,并参与到其他的Executor 的数据计算过程中去。driver 来决定读取多少offsets,并将offsets 交由checkpoint 来维护。当触发下次Batch 任务时,再由executor 读取Kafka 数据并计算。从此过程我们可以发现Direct 方式无需Receiver 读取数据,而是需要计算时在读取数据,所以Direct 方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外Batch 任务堆积时,也不会影响数据堆积。其具体读取方式如图:

 

Direct 去读实现

# 引入依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.2# 调用方法val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092,anotherhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams)
)stream.map(record => (record.key, record.value))# 参考地址:
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-10-integration.html

 


Direct 读取问题

Direct 方式与Receiver-based 方式相比,具有以下优势:


  • 降低资源
    • Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。 降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 鲁棒性更好
    • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
  • 加大了开发成本
    • Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
  • 不方便可视化监控
    • Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发

 


kafka offsets 管理方式

Spark Streaming 集成了 Kafka 允许用户从 Kafka 中读取一个或多个 topic 的数据。一个 Kafka topic 包含多个存储消息的分区(partition)。每个分区中的消息都是顺序存储,并且用 offset 来标记消息。开发者可以在 Spark Streaming 应用中通过offset 来控制数据的读取位置,此时就需要好的 offset 的管理机制。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性式非常有益的。比如,在应用停止或者报错推出之前没有将 offset 保存在持久化数据库中,那么 offset rangges 就会丢失。进一步说,如果没有保存每个分区已经读取的 offset,那么 Spark Streaming 就没有办法从上次断开的位置继续读取消息。

上图描述通常的 Spark Streaming 应用管理 offset 流程。Offsets 可以通过多种方式来管理,但是一般都遵循下面的步骤:


  • 在 Direct DStream 初始化的说话,需要指定一个包含每个 topic 的每个分区的 offset 用于让 Direct DStream 从指定位置读取数据
  • 读取并处理数据
  • 处理完之后存储结果数据
  • 最后,将 offsets 保存在外部持久化数据库如 HBase,Kafka,HDFS,Zookeeper等

 


Spark Streming checkpoints

使用 Spark Streaming 的 checkpoint 是最简单的存储方式,并且在 Spark 框架中很容易实现。Spark Streaming checkpoint 就是为了保存应用状态而设计的,我们将路径设置在 HDFS 上,所以能够从失败中恢复数据。

对 Kafka Stream 执行 checkpoint 操作使得 offset 保存在 checkpoint 中,如果是应用挂掉的话,那么 Spark Streaming 应用功能可以从保存的 offset 中开始读取消息。但是,如果对 Spark Streaming 应用进行升级的话,不能 checkpoint 的数据没有使用,所以这种机制并不可靠,因此我们不推荐这种方式。

 


将 offsets 存储在 HBase 中

HBase 可以作为一个可靠的外部数据库来持久化 offsets。通过将 offsets 存储在外部系统中,Spark Streaming 应用功能能够重读或者回放任何仍然存储在 Kafka 中的数据。

根据 HBase 的设计模式,允许应用能够以 rowkey 和 column 的结构将过个 Spark Streaming 应用和多个 Kafka topic 存放在一张表格中。在这个例子中,表格以 topic 名称、消费者 group ID 和 Spark Streaming 的 batchTime.milliSeconds 作为 rowkey 以做唯一标示。尽管 batchTime.milliSeconds 不是必须的,但是它能够更好的展示历史的每批次的 offsets。表格将存储30天的积累数据,如果超出30天则会被移除。下面是创建表格的 DDL 和结构

DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}RowKey Layout:
row: ::
column family: offsets
qualifier:
value:

 

具体代码实现如下:

import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Scan, Put, ConnectionFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by gmedasani on 6/10/17.*/
object KafkaOffsetsBlogStreamingDriver {def main(args: Array[String]) {if (args.length <6) {System.err.println("Usage: KafkaDirectStreamTest " &#43;" ")System.exit(1)}val batchDuration &#61; args(0)val bootstrapServers &#61; args(1).toStringval topicsSet &#61; args(2).toString.split(",").toSetval consumerGroupID &#61; args(3)val hbaseTableName &#61; args(4)val zkQuorum &#61; args(5)val zkKafkaRootDir &#61; "kafka"val zkSessionTimeOut &#61; 10000val zkConnectionTimeOut &#61; 10000val sparkConf &#61; new SparkConf().setAppName("Kafka-Offset-Management-Blog").setMaster("local[4]")//Uncomment this line to test while developing on a workstationval sc &#61; new SparkContext(sparkConf)val ssc &#61; new StreamingContext(sc, Seconds(batchDuration.toLong))val topics &#61; topicsSet.toArrayval topic &#61; topics(0)val kafkaParams &#61; Map[String, Object]("bootstrap.servers" -> bootstrapServers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> consumerGroupID,"auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))/*Create a dummy process that simply returns the message as is.*/def processMessage(message:ConsumerRecord[String,String]):ConsumerRecord[String,String]&#61;{message}/*Save Offsets into HBase*/def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) &#61;{val hbaseConf &#61; HBaseConfiguration.create()hbaseConf.addResource("src/main/resources/hbase-site.xml")val conn &#61; ConnectionFactory.createConnection(hbaseConf)val table &#61; conn.getTable(TableName.valueOf(hbaseTableName))val rowKey &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; String.valueOf(batchTime.milliseconds)val put &#61; new Put(rowKey.getBytes)for(offset <- offsetRanges){put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),Bytes.toBytes(offset.untilOffset.toString))}table.put(put)conn.close()}/*Returns last committed offsets for all the partitions of a given topic from HBase in following cases.- CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions fromZookeeper and for each partition returns the last committed offset as 0- CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Lastcommitted offsets for each topic-partition is returned as is from HBase.- CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, lastcommitted offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,function returns last committed offsets as 0*/def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,zkQuorum:String,zkRootDir:String, sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] &#61;{val hbaseConf &#61; HBaseConfiguration.create()hbaseConf.addResource("src/main/resources/hbase-site.xml")val zkUrl &#61; zkQuorum&#43;"/"&#43;zkRootDirval zkClientAndConnection &#61; ZkUtils.createZkClientAndConnection(zkUrl,sessionTimeout,connectionTimeOut)val zkUtils &#61; new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)val zKNumberOfPartitionsForTopic &#61; zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size//Connect to HBase to retrieve last committed offsetsval conn &#61; ConnectionFactory.createConnection(hbaseConf)val table &#61; conn.getTable(TableName.valueOf(hbaseTableName))val startRow &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; String.valueOf(System.currentTimeMillis())val stopRow &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; 0val scan &#61; new Scan()val scanner &#61; table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))val result &#61; scanner.next()var hbaseNumberOfPartitionsForTopic &#61; 0 //Set the number of partitions discovered for a topic in HBase to 0if (result !&#61; null){//If the result from hbase scanner is not null, set number of partitions from hbase to the number of cellshbaseNumberOfPartitionsForTopic &#61; result.listCells().size()}val fromOffsets &#61; collection.mutable.Map[TopicPartition,Long]()if(hbaseNumberOfPartitionsForTopic &#61;&#61; 0){// initialize fromOffsets to beginningfor (partition <- 0 to zKNumberOfPartitionsForTopic-1){fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> 0)}} else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){// handle scenario where new partitions have been added to existing kafka topicfor (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){val fromOffset &#61; Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> 0)}} else {//initialize fromOffsets from last runfor (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){val fromOffset &#61; Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}}scanner.close()conn.close()fromOffsets.toMap}val fromOffsets&#61; getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)val inputDStream &#61; KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))/*For each RDD in a DStream apply a map transformation that processes the message.*/inputDStream.foreachRDD((rdd,batchTime) &#61;> {val offsetRanges &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetRanges.foreach(offset &#61;> println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset))val newRDD &#61; rdd.map(message &#61;> processMessage(message))newRDD.count()saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime) //save the offsets to HBase})println("Number of messages processed " &#43; inputDStream.count())ssc.start()ssc.awaitTermination()}
}

 


将 offsets 存储到 Zookeeper 中

在 SparkStreaming 连接 Kafka 应用中使用 Zookeeper 来存储 offsets 也是一种比较可靠的方式。

在这个方案中&#xff0c;Spark Streaming 任务在启动时回去 Zookeeper 中读取每个分区的 offsets。如果有新的分区出现&#xff0c;那么他的 offset 将会设置在最开始的位置。在每批数据处理完之后&#xff0c;用户需要存储已处理数据的一个 offset 。此外&#xff0c;新消费者将使用跟旧的 Kafka 消费者 API 一样的格式将 offset 保存在 Zookeeper 中。因此&#xff0c;任何追踪或监控 Zookeeper 中 Kafka offset 的工具仍然生效。

package com.baidu.hec.zkimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try/*** Kafka的连接和Offset管理工具类** &#64;param zkHosts Zookeeper地址* &#64;param kafkaParams Kafka启动参数* &#64;author Leibniz*/
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {//Logback日志对象&#xff0c;使用slf4j框架&#64;transient private lazy val log &#61; LoggerFactory.getLogger(getClass)//建立ZkUtils对象所需的参数val (zkClient, zkConnection) &#61; ZkUtils.createZkClientAndConnection(zkHosts, 3000, 3000)//ZkUtils对象&#xff0c;用于访问Zookeeperval zkUtils &#61; new ZkUtils(zkClient, zkConnection, false)/*** 包装createDirectStream方法&#xff0c;支持Kafka Offset&#xff0c;用于创建Kafka Streaming流** &#64;param ssc Spark Streaming Context* &#64;param topics Kafka话题* &#64;tparam K Kafka消息Key类型* &#64;tparam V Kafka消息Value类型* &#64;return Kafka Streaming流* &#64;author Leibniz*/def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] &#61; {val groupId &#61; kafkaParams("group.id").toStringval storedOffsets &#61; readOffsets(topics, groupId)log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off &#61;> (off._1.topic, off._1.partition(), off._2)))val kafkaStream &#61; KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))kafkaStream}/*** 从Zookeeper读取Kafka消息队列的Offset** &#64;param topics Kafka话题* &#64;param groupId Kafka Group ID* &#64;return 返回一个Map[TopicPartition, Long]&#xff0c;记录每个话题每个Partition上的offset&#xff0c;如果还没消费&#xff0c;则offset为0* &#64;author Leibniz*/def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] &#61; {val topicPartOffsetMap &#61; collection.mutable.HashMap.empty[TopicPartition, Long]val partitionMap &#61; zkUtils.getPartitionsForTopics(topics)// /consumers//offsets//partitionMap.foreach(topicPartitions &#61;> {val zkGroupTopicDirs &#61; new ZKGroupTopicDirs(groupId, topicPartitions._1)topicPartitions._2.foreach(partition &#61;> {val offsetPath &#61; zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; partitionval tryGetKafkaOffset &#61; Try {val offsetStatTuple &#61; zkUtils.readData(offsetPath)if (offsetStatTuple !&#61; null) {log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)}}if(tryGetKafkaOffset.isFailure){//http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.htmlval consumer &#61; new KafkaConsumer[String, Object](kafkaParams)val partitionList &#61; List(new TopicPartition(topicPartitions._1, partition))consumer.assign(partitionList)val minAvailableOffset &#61; consumer.beginningOffsets(partitionList).values.headconsumer.close()log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)}})})topicPartOffsetMap.toMap}/*** 保存Kafka消息队列消费的Offset** &#64;param rdd SparkStreaming的Kafka RDD&#xff0c;RDD[ConsumerRecord[K, V]]* &#64;param storeEndOffset true&#61;保存结束offset&#xff0c; false&#61;保存起始offset* &#64;author Leibniz*/def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean &#61; true): Unit &#61; {val groupId &#61; kafkaParams("group.id").toStringval offsetsList &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetsList.foreach(or &#61;> {val zkGroupTopicDirs &#61; new ZKGroupTopicDirs(groupId, or.topic)val offsetPath &#61; zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; or.partitionval offsetVal &#61; if (storeEndOffset) or.untilOffset else or.fromOffsetzkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; or.partition, offsetVal &#43; "" /*, JavaConversions.bufferAsJavaList(acls)*/)log.info("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)})}
}

 

package com.baidu.hec.zkimport com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test {def main(args: Array[String]): Unit &#61; {val zkHosts &#61; "0.0.0.0:1181"val kafkaParams &#61; Map[String, Object]("auto.offset.reset" -> "latest","bootstrap.servers" -> "0.0.0.0:9092","group.id" -> "test_topic_group","enable.auto.commit" -> (false: java.lang.Boolean), //禁用自动提交Offset&#xff0c;否则可能没正常消费完就提交了&#xff0c;造成数据错误"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])val km &#61; new KafkaManager(zkHosts, kafkaParams)val sparkConf &#61; new SparkConf().setAppName("SearchContentAnalysis").setMaster("local[2]")val ssc &#61; new StreamingContext(sparkConf, Seconds(60))val topics &#61; Array("test-topic")val stream: InputDStream[ConsumerRecord[String, String]] &#61; km.createDirectStream(ssc, topics)stream.foreachRDD(rdd &#61;> {val message &#61; rdd.map(recode &#61;> recode.value())if(!message.isEmpty()) {val spark &#61; SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)import spark.implicits._val dataFrame &#61; message.map(data &#61;>{val jsonData &#61; JSON.parseObject(data)val content &#61; jsonData.getString("content")val userId &#61; jsonData.getLong("userId")val createTime &#61; jsonData.getLong("createTime")Record(content, userId, createTime)}).toDF()// val dataFrame &#61; message.toDF()dataFrame.createOrReplaceTempView("temp_search_table")spark.sql("select * from temp_search_table").show(100)km.persistOffsets(rdd)printf("******************************************")}})ssc.start()ssc.awaitTermination()}case class Record(content: String, userId: Long, createTime: Long)object SparkSessionSingleton {&#64;transient private var instance: SparkSession &#61; _def getInstance(sparkConf: SparkConf): SparkSession &#61; {if (instance &#61;&#61; null) {instance &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()}instance}}}

 


Kafka 本身

Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API&#xff08;异步提交 API&#xff09;来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets。

将 offsets 提交到 Kafka 中&#xff1a;

stream.foreachRDD { rdd &#61;>val offsetRanges &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRanges// some time later, after outputs have completedstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

 

但是该中方式在测试中&#xff0c;无法根据 offset 来消费消息。

参考&#xff1a;https://juejin.im/entry/5acd7224f265da237c693f7d


推荐阅读
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 开发笔记:读《分布式一致性原理》JAVA客户端API操作2
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了读《分布式一致性原理》JAVA客户端API操作2相关的知识,希望对你有一定的参考价值。创 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文介绍了Linux系统中正则表达式的基础知识,包括正则表达式的简介、字符分类、普通字符和元字符的区别,以及在学习过程中需要注意的事项。同时提醒读者要注意正则表达式与通配符的区别,并给出了使用正则表达式时的一些建议。本文适合初学者了解Linux系统中的正则表达式,并提供了学习的参考资料。 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • python zookeeeper 学习和操作
    1.zookeeeper介绍ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的 ... [详细]
  • Zookeeper为分布式环境提供灵活的协调基础架构。ZooKeeper框架支持许多当今最好的工业应用程序。我们将在本章中讨论ZooKeeper的一些最显着的应用。雅虎ZooKee ... [详细]
author-avatar
lily的思念
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有