热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkStreaming集成Kafka详解

概述SparkStreaming支持多种输入源数据的读取,其中基本数据源有:FileSystem、Socketconnections;而

概述

Spark Streaming 支持多种输入源数据的读取,其中基本数据源有:File System、Socket connections;而高级数据源有:Kafka、Flume、Kinesis等。但是高级数据源需要额外依赖,而且不能在 Spark Shell 中测试这些高级数据源,如果想要在Spark Shell 中测试需要下载依赖到Spark 依赖库中。

关于读取Kafka 的方式,Spark Streaming 官方提供了两种方式:Receiver 和 Direct,此两种读取方式存在很大的不同,当然也各有优劣,接下来就让我们具体刨解这两种数据读取方式。此外,因为Kafka 在0.8 和0.10 之间引入了一个新的消费者API,因此有两个独立的Spark Streaming 包可用,根据功能来选择合适的包,0.8 版本兼容后来的0.9 和0.10,但是0.10 不向前兼容。官网地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-integration.html

 


Receiver-based Approach

Spark 官方最先提供了基于 Receiver 的Kafka 数据消费模式。但会存在程序失败丢失数据的风险,在Spark 1.2 时引入了一个配置参数 spark.streaming.receiver.writeAheadLog.enable (WAL)以规避此风险。


 


Receiver-based 读取方式

Receiver-based 的Kafka 读取方式是基于Kafka 高阶API 来实现对Kafka 数据的消费。在提交Spark Streaming 任务后,Spark 集群会划出指定的Receivers 来专门、持续不断、异步读取Kafka 数据,读取时间间隔以及每次去读offsets 范围可以由参数来配置。读取的数据保存在Receiver 中,具体StorageLevel 方式由用户指定,入MEMORY_ONLY等。当driver 触发Batch 任务的时候,Receivers 中的数据会转移到剩余的Executors 中去执行。在执行完后,Receivers 会相应更新Zookeeper 的offsets。如要确保at least once 的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。集体Receiver 执行流程如下图:

 


Receiver-based 读取实现

Kafka 的high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer 的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming 计算引擎时,我们优先考虑采用此种方式来读取数据,具体代码如下:

# 添加依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.1.2# 调用方法
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) # 注意
1. Kafka中的主题分区与Spark Streaming中生成的RDD分区无关。因此,增加KafkaUtils.createStream()中特定于主题的分区的数量只会增加使用单个接收器中使用的主题的线程数。它不会增加Spark在处理数据时的并行性。
2. 可以使用不同的组和主题创建多个Kafka输入DStream,以使用多个接收器并行接收数据。
3. 如果开启了WAL,可以使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)# 部署
对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11及其依赖项打包到应用程序JAR中。确保spark-core_2.11和spark-streaming_2.11被标记为provider的依赖项,因为它们已存在于Spark安装中。然后使用spark-submit启动您的应用程序.
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2 ...
或者
you can also download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars. # 参考资料
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-8-integration.html

 


Receiver-based 读取问题

为了防数据丢失,我们做了checkpoint 操作以及配置了spark.streaming.receiver.writeAheadLog.enable 参数,提高了receiver 的吞吐量。采用MEMORY_AND_DISK_SER 方式读取数据、提高单Receiver 的内存或事调大并行度,将数据分散到多个Receiver 中去,但是也是会出现各种情况的问题:


  • 配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
  • 采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度。
  • 单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
  • 提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。
  • Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
  • 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费。

 


Direct Approach(No Receivers)

区别与Receiver-based 的数据消费方法,Spark 官方在 1.3 时引入了Direct 方式的Kafka 消费方式。相对于Receiver-based 的方法,Direct 方式具有一下的优势:


  • 简化并行性
    • 无需创建多个输入Kafka流并将它们联合起来。使用directStream,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,这些分区将并行地从Kafka读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更容易理解和调整。
  • 高效
    • 在第一种方法中实现零数据丢失需要将数据存储在Write Ahead Log中,这进一步复制了数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由Write Ahead Log复制。第二种方法消除了问题,因为没有接收器,因此不需要Write Ahead Logs。只要您有足够的Kafka保留,就可以从Kafka恢复消息。
  • 强一致性
    • 第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。传统上,这是从Kafka使用数据的方式。虽然这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但某些记录在某些故障下可能会被消耗两次的可能性很小。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。 Spark Streaming在其检查点内跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此尽管出现故障,Spark Streaming也会有效地接收每条记录一次。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移的原子事务。

 


Direct 读取方式

Direct 方式采用Kafka 简单的 consumer API 方式来读取数据,无需经由Zookeeper,此种方式不再需要专门的Receiver 来持续不断的读取数据。当Batch 任务触发时,由Executor 读取数据,并参与到其他的Executor 的数据计算过程中去。driver 来决定读取多少offsets,并将offsets 交由checkpoint 来维护。当触发下次Batch 任务时,再由executor 读取Kafka 数据并计算。从此过程我们可以发现Direct 方式无需Receiver 读取数据,而是需要计算时在读取数据,所以Direct 方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外Batch 任务堆积时,也不会影响数据堆积。其具体读取方式如图:

 

Direct 去读实现

# 引入依赖groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.2# 调用方法val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092,anotherhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams)
)stream.map(record => (record.key, record.value))# 参考地址:
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-10-integration.html

 


Direct 读取问题

Direct 方式与Receiver-based 方式相比,具有以下优势:


  • 降低资源
    • Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。 降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 鲁棒性更好
    • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
  • 加大了开发成本
    • Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
  • 不方便可视化监控
    • Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发

 


kafka offsets 管理方式

Spark Streaming 集成了 Kafka 允许用户从 Kafka 中读取一个或多个 topic 的数据。一个 Kafka topic 包含多个存储消息的分区(partition)。每个分区中的消息都是顺序存储,并且用 offset 来标记消息。开发者可以在 Spark Streaming 应用中通过offset 来控制数据的读取位置,此时就需要好的 offset 的管理机制。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性式非常有益的。比如,在应用停止或者报错推出之前没有将 offset 保存在持久化数据库中,那么 offset rangges 就会丢失。进一步说,如果没有保存每个分区已经读取的 offset,那么 Spark Streaming 就没有办法从上次断开的位置继续读取消息。

上图描述通常的 Spark Streaming 应用管理 offset 流程。Offsets 可以通过多种方式来管理,但是一般都遵循下面的步骤:


  • 在 Direct DStream 初始化的说话,需要指定一个包含每个 topic 的每个分区的 offset 用于让 Direct DStream 从指定位置读取数据
  • 读取并处理数据
  • 处理完之后存储结果数据
  • 最后,将 offsets 保存在外部持久化数据库如 HBase,Kafka,HDFS,Zookeeper等

 


Spark Streming checkpoints

使用 Spark Streaming 的 checkpoint 是最简单的存储方式,并且在 Spark 框架中很容易实现。Spark Streaming checkpoint 就是为了保存应用状态而设计的,我们将路径设置在 HDFS 上,所以能够从失败中恢复数据。

对 Kafka Stream 执行 checkpoint 操作使得 offset 保存在 checkpoint 中,如果是应用挂掉的话,那么 Spark Streaming 应用功能可以从保存的 offset 中开始读取消息。但是,如果对 Spark Streaming 应用进行升级的话,不能 checkpoint 的数据没有使用,所以这种机制并不可靠,因此我们不推荐这种方式。

 


将 offsets 存储在 HBase 中

HBase 可以作为一个可靠的外部数据库来持久化 offsets。通过将 offsets 存储在外部系统中,Spark Streaming 应用功能能够重读或者回放任何仍然存储在 Kafka 中的数据。

根据 HBase 的设计模式,允许应用能够以 rowkey 和 column 的结构将过个 Spark Streaming 应用和多个 Kafka topic 存放在一张表格中。在这个例子中,表格以 topic 名称、消费者 group ID 和 Spark Streaming 的 batchTime.milliSeconds 作为 rowkey 以做唯一标示。尽管 batchTime.milliSeconds 不是必须的,但是它能够更好的展示历史的每批次的 offsets。表格将存储30天的积累数据,如果超出30天则会被移除。下面是创建表格的 DDL 和结构

DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}RowKey Layout:
row: ::
column family: offsets
qualifier:
value:

 

具体代码实现如下:

import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Scan, Put, ConnectionFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by gmedasani on 6/10/17.*/
object KafkaOffsetsBlogStreamingDriver {def main(args: Array[String]) {if (args.length <6) {System.err.println("Usage: KafkaDirectStreamTest " &#43;" ")System.exit(1)}val batchDuration &#61; args(0)val bootstrapServers &#61; args(1).toStringval topicsSet &#61; args(2).toString.split(",").toSetval consumerGroupID &#61; args(3)val hbaseTableName &#61; args(4)val zkQuorum &#61; args(5)val zkKafkaRootDir &#61; "kafka"val zkSessionTimeOut &#61; 10000val zkConnectionTimeOut &#61; 10000val sparkConf &#61; new SparkConf().setAppName("Kafka-Offset-Management-Blog").setMaster("local[4]")//Uncomment this line to test while developing on a workstationval sc &#61; new SparkContext(sparkConf)val ssc &#61; new StreamingContext(sc, Seconds(batchDuration.toLong))val topics &#61; topicsSet.toArrayval topic &#61; topics(0)val kafkaParams &#61; Map[String, Object]("bootstrap.servers" -> bootstrapServers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> consumerGroupID,"auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))/*Create a dummy process that simply returns the message as is.*/def processMessage(message:ConsumerRecord[String,String]):ConsumerRecord[String,String]&#61;{message}/*Save Offsets into HBase*/def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) &#61;{val hbaseConf &#61; HBaseConfiguration.create()hbaseConf.addResource("src/main/resources/hbase-site.xml")val conn &#61; ConnectionFactory.createConnection(hbaseConf)val table &#61; conn.getTable(TableName.valueOf(hbaseTableName))val rowKey &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; String.valueOf(batchTime.milliseconds)val put &#61; new Put(rowKey.getBytes)for(offset <- offsetRanges){put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),Bytes.toBytes(offset.untilOffset.toString))}table.put(put)conn.close()}/*Returns last committed offsets for all the partitions of a given topic from HBase in following cases.- CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions fromZookeeper and for each partition returns the last committed offset as 0- CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Lastcommitted offsets for each topic-partition is returned as is from HBase.- CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, lastcommitted offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,function returns last committed offsets as 0*/def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,zkQuorum:String,zkRootDir:String, sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] &#61;{val hbaseConf &#61; HBaseConfiguration.create()hbaseConf.addResource("src/main/resources/hbase-site.xml")val zkUrl &#61; zkQuorum&#43;"/"&#43;zkRootDirval zkClientAndConnection &#61; ZkUtils.createZkClientAndConnection(zkUrl,sessionTimeout,connectionTimeOut)val zkUtils &#61; new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)val zKNumberOfPartitionsForTopic &#61; zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size//Connect to HBase to retrieve last committed offsetsval conn &#61; ConnectionFactory.createConnection(hbaseConf)val table &#61; conn.getTable(TableName.valueOf(hbaseTableName))val startRow &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; String.valueOf(System.currentTimeMillis())val stopRow &#61; TOPIC_NAME &#43; ":" &#43; GROUP_ID &#43; ":" &#43; 0val scan &#61; new Scan()val scanner &#61; table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))val result &#61; scanner.next()var hbaseNumberOfPartitionsForTopic &#61; 0 //Set the number of partitions discovered for a topic in HBase to 0if (result !&#61; null){//If the result from hbase scanner is not null, set number of partitions from hbase to the number of cellshbaseNumberOfPartitionsForTopic &#61; result.listCells().size()}val fromOffsets &#61; collection.mutable.Map[TopicPartition,Long]()if(hbaseNumberOfPartitionsForTopic &#61;&#61; 0){// initialize fromOffsets to beginningfor (partition <- 0 to zKNumberOfPartitionsForTopic-1){fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> 0)}} else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){// handle scenario where new partitions have been added to existing kafka topicfor (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){val fromOffset &#61; Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> 0)}} else {//initialize fromOffsets from last runfor (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){val fromOffset &#61; Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))fromOffsets &#43;&#61; (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}}scanner.close()conn.close()fromOffsets.toMap}val fromOffsets&#61; getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)val inputDStream &#61; KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))/*For each RDD in a DStream apply a map transformation that processes the message.*/inputDStream.foreachRDD((rdd,batchTime) &#61;> {val offsetRanges &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetRanges.foreach(offset &#61;> println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset))val newRDD &#61; rdd.map(message &#61;> processMessage(message))newRDD.count()saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime) //save the offsets to HBase})println("Number of messages processed " &#43; inputDStream.count())ssc.start()ssc.awaitTermination()}
}

 


将 offsets 存储到 Zookeeper 中

在 SparkStreaming 连接 Kafka 应用中使用 Zookeeper 来存储 offsets 也是一种比较可靠的方式。

在这个方案中&#xff0c;Spark Streaming 任务在启动时回去 Zookeeper 中读取每个分区的 offsets。如果有新的分区出现&#xff0c;那么他的 offset 将会设置在最开始的位置。在每批数据处理完之后&#xff0c;用户需要存储已处理数据的一个 offset 。此外&#xff0c;新消费者将使用跟旧的 Kafka 消费者 API 一样的格式将 offset 保存在 Zookeeper 中。因此&#xff0c;任何追踪或监控 Zookeeper 中 Kafka offset 的工具仍然生效。

package com.baidu.hec.zkimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try/*** Kafka的连接和Offset管理工具类** &#64;param zkHosts Zookeeper地址* &#64;param kafkaParams Kafka启动参数* &#64;author Leibniz*/
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {//Logback日志对象&#xff0c;使用slf4j框架&#64;transient private lazy val log &#61; LoggerFactory.getLogger(getClass)//建立ZkUtils对象所需的参数val (zkClient, zkConnection) &#61; ZkUtils.createZkClientAndConnection(zkHosts, 3000, 3000)//ZkUtils对象&#xff0c;用于访问Zookeeperval zkUtils &#61; new ZkUtils(zkClient, zkConnection, false)/*** 包装createDirectStream方法&#xff0c;支持Kafka Offset&#xff0c;用于创建Kafka Streaming流** &#64;param ssc Spark Streaming Context* &#64;param topics Kafka话题* &#64;tparam K Kafka消息Key类型* &#64;tparam V Kafka消息Value类型* &#64;return Kafka Streaming流* &#64;author Leibniz*/def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] &#61; {val groupId &#61; kafkaParams("group.id").toStringval storedOffsets &#61; readOffsets(topics, groupId)log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off &#61;> (off._1.topic, off._1.partition(), off._2)))val kafkaStream &#61; KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))kafkaStream}/*** 从Zookeeper读取Kafka消息队列的Offset** &#64;param topics Kafka话题* &#64;param groupId Kafka Group ID* &#64;return 返回一个Map[TopicPartition, Long]&#xff0c;记录每个话题每个Partition上的offset&#xff0c;如果还没消费&#xff0c;则offset为0* &#64;author Leibniz*/def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] &#61; {val topicPartOffsetMap &#61; collection.mutable.HashMap.empty[TopicPartition, Long]val partitionMap &#61; zkUtils.getPartitionsForTopics(topics)// /consumers//offsets//partitionMap.foreach(topicPartitions &#61;> {val zkGroupTopicDirs &#61; new ZKGroupTopicDirs(groupId, topicPartitions._1)topicPartitions._2.foreach(partition &#61;> {val offsetPath &#61; zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; partitionval tryGetKafkaOffset &#61; Try {val offsetStatTuple &#61; zkUtils.readData(offsetPath)if (offsetStatTuple !&#61; null) {log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)}}if(tryGetKafkaOffset.isFailure){//http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.htmlval consumer &#61; new KafkaConsumer[String, Object](kafkaParams)val partitionList &#61; List(new TopicPartition(topicPartitions._1, partition))consumer.assign(partitionList)val minAvailableOffset &#61; consumer.beginningOffsets(partitionList).values.headconsumer.close()log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)}})})topicPartOffsetMap.toMap}/*** 保存Kafka消息队列消费的Offset** &#64;param rdd SparkStreaming的Kafka RDD&#xff0c;RDD[ConsumerRecord[K, V]]* &#64;param storeEndOffset true&#61;保存结束offset&#xff0c; false&#61;保存起始offset* &#64;author Leibniz*/def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean &#61; true): Unit &#61; {val groupId &#61; kafkaParams("group.id").toStringval offsetsList &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetsList.foreach(or &#61;> {val zkGroupTopicDirs &#61; new ZKGroupTopicDirs(groupId, or.topic)val offsetPath &#61; zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; or.partitionval offsetVal &#61; if (storeEndOffset) or.untilOffset else or.fromOffsetzkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir &#43; "/" &#43; or.partition, offsetVal &#43; "" /*, JavaConversions.bufferAsJavaList(acls)*/)log.info("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)})}
}

 

package com.baidu.hec.zkimport com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object Test {def main(args: Array[String]): Unit &#61; {val zkHosts &#61; "0.0.0.0:1181"val kafkaParams &#61; Map[String, Object]("auto.offset.reset" -> "latest","bootstrap.servers" -> "0.0.0.0:9092","group.id" -> "test_topic_group","enable.auto.commit" -> (false: java.lang.Boolean), //禁用自动提交Offset&#xff0c;否则可能没正常消费完就提交了&#xff0c;造成数据错误"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])val km &#61; new KafkaManager(zkHosts, kafkaParams)val sparkConf &#61; new SparkConf().setAppName("SearchContentAnalysis").setMaster("local[2]")val ssc &#61; new StreamingContext(sparkConf, Seconds(60))val topics &#61; Array("test-topic")val stream: InputDStream[ConsumerRecord[String, String]] &#61; km.createDirectStream(ssc, topics)stream.foreachRDD(rdd &#61;> {val message &#61; rdd.map(recode &#61;> recode.value())if(!message.isEmpty()) {val spark &#61; SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)import spark.implicits._val dataFrame &#61; message.map(data &#61;>{val jsonData &#61; JSON.parseObject(data)val content &#61; jsonData.getString("content")val userId &#61; jsonData.getLong("userId")val createTime &#61; jsonData.getLong("createTime")Record(content, userId, createTime)}).toDF()// val dataFrame &#61; message.toDF()dataFrame.createOrReplaceTempView("temp_search_table")spark.sql("select * from temp_search_table").show(100)km.persistOffsets(rdd)printf("******************************************")}})ssc.start()ssc.awaitTermination()}case class Record(content: String, userId: Long, createTime: Long)object SparkSessionSingleton {&#64;transient private var instance: SparkSession &#61; _def getInstance(sparkConf: SparkConf): SparkSession &#61; {if (instance &#61;&#61; null) {instance &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()}instance}}}

 


Kafka 本身

Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API&#xff08;异步提交 API&#xff09;来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets。

将 offsets 提交到 Kafka 中&#xff1a;

stream.foreachRDD { rdd &#61;>val offsetRanges &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRanges// some time later, after outputs have completedstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

 

但是该中方式在测试中&#xff0c;无法根据 offset 来消费消息。

参考&#xff1a;https://juejin.im/entry/5acd7224f265da237c693f7d


推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • This feature automatically validates new regions using the AWS SDK, ensuring compatibility and accuracy. ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
author-avatar
lily的思念
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有