热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka只消费新消息-kafkatoconsumeonlynewmessages

MysparkstreamingjobisconsumingdatafromKafka我的火花流工作正在消耗卡夫卡的数据KafkaUtils.createStream(jssc

My spark streaming job is consuming data from Kafka

我的火花流工作正在消耗卡夫卡的数据

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);

whenever i restart my job it start consuming from last offset store (i am assuming this because it takes a lot of time to send processed data and if i change the consumer group it works instantly with new message)

每当我重新开始工作时,它就开始从最后一个偏移存储中消耗(我假设这是因为它需要花费大量时间来发送已处理的数据,如果我更改了消费者组,它会立即使用新消息)

I am kafka 8.1.1 where auto.offset.reset is default to largest which means whenever i'll restart kafka will send data from where i left.

我是kafka 8.1.1其中auto.offset.reset默认为最大,这意味着每当我重新启动kafka将从我离开的地方发送数据。

My use case ask me to ignore this data and process only arriving data. How can i achieve this? any suggestion

我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能实现这一目标?任何建议

1 个解决方案

#1


There is two ways you can achieve this:

有两种方法可以实现这一目标:

  1. Create a unique consumer group each time on restart and it will consume from the latest offset.

    每次重新启动时创建一个唯一的使用者组,它将使用最新的偏移量。

  2. Use the direct approach instead of receiver based; here you have more control over how you consume but would have to update zookeeper manually to store your offsets. In the example below it will always start at latest offset.

    使用直接方法而不是基于接收器;在这里,您可以更好地控制使用方式,但必须手动更新zookeeper以存储偏移量。在下面的示例中,它始终以最新的偏移量开始。

    import org.apache.spark.streaming.kafka._
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    

Documentation on direct approach here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html

有关直接方法的文档:https://spark.apache.org/docs/latest/streaming-kafka-integration.html


推荐阅读
  • #usernobody;worker_processes1;#error_loglogserror.log;#error_loglogserror.lognotice;#error ... [详细]
  • Spark 贝叶斯分类算法
    一、贝叶斯定理数学基础我们都知道条件概率的数学公式形式为即B发生的条件下A发生的概率等于A和B同时发生的概率除以B发生的概率。根据此公式变换,得到贝叶斯公式:即贝叶斯定律是关于随机 ... [详细]
  • Vue项目结构分析-项目结构重点在src文件夹:assets——静态资源,如css,jscomponents——公共组件router——路由文件(vuecli3.x没有自 ... [详细]
  • ApacheTrafficServer6.2.2发布了,TrafficServer是一套快速、模块化 ... [详细]
  • iOS之富文本
    之前做项目时遇到一个问题:使用UITextView显示一段电影的简介,由于字数比较多,所以字体设置的很小,行间距和段间距也很小,一大段文字挤在一起看起来很别扭,想要把行间距调大,结 ... [详细]
  • 如何用js 实现依赖注入的思想,后端框架思想搬到前端来
    如何用js实现依赖注入的思想,后端框架思想搬到前端来-大家在做些页面的时候,很多都是用ajax实现的,在显示的时候有很多表单提交的add或者update操作,显然这样很烦,突然想到 ... [详细]
  • kepserver中文手册,kepserver使用教程,kepserver设置
    下面介绍一下KepServer模拟器的使用,以下示例使用服务器随附的Simulator驱动程序来演示创建、配置和运行项目的过程。Simulator驱动程序是基于内存的驱动程序,能为 ... [详细]
  • FluxCD、ArgoCD或Jenkins X,哪个才是适合你的GitOps工具?
    GitOps是一种使用基于Git的工作流程来全面管理应用和基础设施的想法,其在最近获得了极大关注。新一代的部署工具更能说明这一点,它们将GitOps作为 ... [详细]
  • rbac 4表 常规设计
    rbac4表常规设计设计模型:1、管理员表(users)Schema::create('users',function(Blueprint$table){$tabl ... [详细]
  • Eclipse中SpringBoot响应jsp的简单demo
    首先在Eclipse里新建一个maven工程,这里的打包类型和父包如果后续再去pom中添加也可以此时的工程路径是这样的接下来去到pom中添加相关的依赖,如果有报错mavenupda ... [详细]
  • #includestdafx.h#includeiostream#includesstream#includemap#includestring ... [详细]
  • 使用ffmpeg进行视频格式转换的简单例子2006-12-1623:12主要参考FFMPEG里面的apiexample.c以及output_example.c编写intmain(in ... [详细]
  • 稀松数组
    稀松数组1.稀松数组什么?在一个数组中,若数值为0的元素数目远远多于非0元素的数目,并且非0元素分布没有规律时,则称该数组为稀疏数组;如图,一个5*5的数组arr上只有3个有效数值 ... [详细]
  • 1、为什么要对nginx平滑升级随着nginx越来越流行,并且nginx的优势也越来越明显,nginx的版本迭代也来时加速模式,1.9. ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
author-avatar
望尽天涯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有