热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka0.9+消费者配置参数说明

ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap.

Consumer Configuration在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:


  • bootstrap.servers
    在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。
    它配置的格式是:host1:port1;host2:port2…

  • key.descrializer

      Message record 的key的反序列化类。

      一般都是StringDeserializer, org.apache.kafka.common.serialization.StringDeserializer


  • value.descrializer
    Message record 的value的反序列化类。

      一般都是StringDeserializer。org.apache.kafka.common.serialization.StringDeserializer


  • group.id
    用于表示该consumer想要加入到哪个group中。默认值是 “”。

  • heartbeat.interval.ms
    心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。这个值必须设置的小于session.timeout.ms,因为:
    当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
    通常设置的值要低于session.timeout.ms的1/3。
    默认值是:3000 (3s)

  • session.timeout.ms
    Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
    其默认值是:10000 (10 s)

  • enable.auto.commit
    Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
    默认值是true。

  • auto.commit.interval.ms
    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

  • auto.offset.reset
    这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
    1) earliest:自动重置到最早的offset。
    2) latest:看上去重置到最晚的offset。
    3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
    4) 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

  • connections.max.idle.ms
    连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。
    默认值是:540000 (9 min)

  • fetch.max.wait.ms
    Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

  • fetch.min.bytes
    当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
    取值范围是:[0, Integer.Max],默认值是1。
    默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

  • fetch.max.bytes
    一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
    取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)

  • max.partition.fetch.bytes
    一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

  • max.poll.interval.ms
    前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

  • max.poll.records
    Consumer每次调用poll()时取到的records的最大数。

  • receive.buffer.byte
    Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。
    取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)
    如果值设置为-1,则会使用操作系统默认的值。

  • request.timeout.ms
    请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)

  • client.id
    Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。

  • interceptor.classes
    用户自定义interceptor。

  • metadata.max.age.ms
    Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。
    范围是:[0, Integer.MAX],默认值是:300000 (5 min)


Consumer配置说明

bootstrap.servers

参考生产者中的bootstrap.servers配置说明

key.descrializer、value.descrializer
Message record 的key, value的反序列化类。

group.id


A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.


用于表示该consumer想要加入到哪个group中。默认值是 “”。
heartbeat.interval.ms


The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.


心跳间隔。心跳是在consumer与coordinator之间进行的。心跳用来保持consumer的会话,并且在有consumer加入或者离开group时帮助进行rebalance。
这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。默认值是:3000 (3s)

session.timeout.ms


The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.


Consumer session 过期时间。consumer会发送周期性的心跳表明该consumer是活着的。如果超过session.timeout.ms设定的值仍然没有收到心跳,zebroker会把这个consumer从group中移除,并且重新rebalance。
这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)

enable.auto.commit


If true the consumer’s offset will be periodically committed in the background.


设置Consumer 在 commit 方式是否是自动调焦。默认值是true。

auto.commit.interval.ms


The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.


自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

auto.offset.reset


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):



  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:


  • earliest:自动重置到最早的offset。
  • latest:看上去重置到最晚的offset。
  • none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
  • 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

fetch.max.wait.ms


The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.


Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

fetch.min.bytes


The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.


当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

fetch.max.bytes


The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes和topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)


max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker 的message.max.bytes和 topic 的max.message.bytes的配置。

max.poll.interval.ms


The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.


前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

max.poll.records


The maximum number of records returned in a single call to poll().


Consumer每次调用poll()时取到的records的最大数。

 

转载处:https://blog.csdn.net/lixiang987654321/article/details/99437168


推荐阅读
  • Kafka入门指南
    本文将详细介绍如何在CentOS 7上安装和配置Kafka,包括必要的环境准备、JDK和Zookeeper的配置步骤。 ... [详细]
  • 深入解析Dubbo:使用与源码分析
    本文详细介绍了Dubbo的使用方法和源码分析,涵盖其架构设计、核心特性和调用流程。 ... [详细]
  • RocketMQ 运维监控实践指南
    本文详细介绍了如何实现 RocketMQ 的运维监控,包括监控平台的搭建、常用运维命令及其具体用法。适合对 RocketMQ 监控感兴趣的读者参考。 ... [详细]
  • 在运行于MS SQL Server 2005的.NET 2.0 Web应用中,我偶尔会遇到令人头疼的SQL死锁问题。过去,我们主要通过调整查询来解决这些问题,但这既耗时又不可靠。我希望能找到一种确定性的查询模式,确保从设计上彻底避免SQL死锁。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 本文介绍了实时流协议(RTSP)的基本概念、组成部分及其与RTCP的交互过程,详细解析了客户端请求格式、服务器响应格式、常用方法分类及协议流程,并提供了SDP格式的深入解析。 ... [详细]
  • 在尝试启动Java应用服务器Tomcat时,遇到了org.apache.catalina.LifecycleException异常。本文详细记录了异常的具体表现形式,并提供了有效的解决方案。 ... [详细]
  • 在Ubuntu 18.04上使用Nginx搭建RTMP流媒体服务器
    本文详细介绍了如何在Ubuntu 18.04上使用Nginx和nginx-rtmp-module模块搭建RTMP流媒体服务器,包括环境搭建、配置文件修改和推流拉流操作。适用于需要搭建流媒体服务器的技术人员。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • 本文介绍如何通过参数化查询来防止SQL注入攻击,确保数据库的安全性。示例代码展示了在C#中使用参数化查询添加学生信息的方法。 ... [详细]
  • Linux环境下MySQL的安装与配置
    本文详细介绍了在Linux系统上安装和配置MySQL的步骤,包括安装前的准备工作、下载和解压安装包、初始化数据库、配置文件编辑、启动服务以及设置开机自启动等。 ... [详细]
  • Nacos 0.3 数据持久化详解与实践
    本文详细介绍了如何将 Nacos 0.3 的数据持久化到 MySQL 数据库,并提供了具体的步骤和注意事项。 ... [详细]
  • Ext JS MVC系列一:环境搭建与框架概览
    本文主要介绍了如何在项目中使用Ext JS 4作为前端框架,并详细讲解了Ext JS 4的MVC开发模式。文章将从项目目录结构、相关CSS和JS文件的引用以及MVC框架的整体认识三个方面进行总结。 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
author-avatar
手机用户2702934324
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有