热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka0.9+消费者配置参数说明

ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap.

Consumer Configuration在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:


  • bootstrap.servers
    在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。
    它配置的格式是:host1:port1;host2:port2…

  • key.descrializer

      Message record 的key的反序列化类。

      一般都是StringDeserializer, org.apache.kafka.common.serialization.StringDeserializer


  • value.descrializer
    Message record 的value的反序列化类。

      一般都是StringDeserializer。org.apache.kafka.common.serialization.StringDeserializer


  • group.id
    用于表示该consumer想要加入到哪个group中。默认值是 “”。

  • heartbeat.interval.ms
    心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。这个值必须设置的小于session.timeout.ms,因为:
    当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
    通常设置的值要低于session.timeout.ms的1/3。
    默认值是:3000 (3s)

  • session.timeout.ms
    Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
    其默认值是:10000 (10 s)

  • enable.auto.commit
    Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
    默认值是true。

  • auto.commit.interval.ms
    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

  • auto.offset.reset
    这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
    1) earliest:自动重置到最早的offset。
    2) latest:看上去重置到最晚的offset。
    3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
    4) 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

  • connections.max.idle.ms
    连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。
    默认值是:540000 (9 min)

  • fetch.max.wait.ms
    Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

  • fetch.min.bytes
    当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
    取值范围是:[0, Integer.Max],默认值是1。
    默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

  • fetch.max.bytes
    一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
    取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)

  • max.partition.fetch.bytes
    一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

  • max.poll.interval.ms
    前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

  • max.poll.records
    Consumer每次调用poll()时取到的records的最大数。

  • receive.buffer.byte
    Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。
    取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)
    如果值设置为-1,则会使用操作系统默认的值。

  • request.timeout.ms
    请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)

  • client.id
    Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。

  • interceptor.classes
    用户自定义interceptor。

  • metadata.max.age.ms
    Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。
    范围是:[0, Integer.MAX],默认值是:300000 (5 min)


Consumer配置说明

bootstrap.servers

参考生产者中的bootstrap.servers配置说明

key.descrializer、value.descrializer
Message record 的key, value的反序列化类。

group.id


A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.


用于表示该consumer想要加入到哪个group中。默认值是 “”。
heartbeat.interval.ms


The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.


心跳间隔。心跳是在consumer与coordinator之间进行的。心跳用来保持consumer的会话,并且在有consumer加入或者离开group时帮助进行rebalance。
这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。默认值是:3000 (3s)

session.timeout.ms


The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.


Consumer session 过期时间。consumer会发送周期性的心跳表明该consumer是活着的。如果超过session.timeout.ms设定的值仍然没有收到心跳,zebroker会把这个consumer从group中移除,并且重新rebalance。
这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)

enable.auto.commit


If true the consumer’s offset will be periodically committed in the background.


设置Consumer 在 commit 方式是否是自动调焦。默认值是true。

auto.commit.interval.ms


The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.


自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

auto.offset.reset


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):



  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:


  • earliest:自动重置到最早的offset。
  • latest:看上去重置到最晚的offset。
  • none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
  • 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

fetch.max.wait.ms


The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.


Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

fetch.min.bytes


The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.


当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

fetch.max.bytes


The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes和topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)


max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker 的message.max.bytes和 topic 的max.message.bytes的配置。

max.poll.interval.ms


The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.


前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

max.poll.records


The maximum number of records returned in a single call to poll().


Consumer每次调用poll()时取到的records的最大数。

 

转载处:https://blog.csdn.net/lixiang987654321/article/details/99437168


推荐阅读
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 本文详细介绍了 GWT 中 PopupPanel 类的 onKeyDownPreview 方法,提供了多个代码示例及应用场景,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文详细介绍了如何构建一个高效的UI管理系统,集中处理UI页面的打开、关闭、层级管理和页面跳转等问题。通过UIManager统一管理外部切换逻辑,实现功能逻辑分散化和代码复用,支持多人协作开发。 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 本题探讨如何通过最大流算法解决农场排水系统的设计问题。题目要求计算从水源点到汇合点的最大水流速率,使用经典的EK(Edmonds-Karp)和Dinic算法进行求解。 ... [详细]
  • 毕业设计:基于机器学习与深度学习的垃圾邮件(短信)分类算法实现
    本文详细介绍了如何使用机器学习和深度学习技术对垃圾邮件和短信进行分类。内容涵盖从数据集介绍、预处理、特征提取到模型训练与评估的完整流程,并提供了具体的代码示例和实验结果。 ... [详细]
  • 全面解析运维监控:白盒与黑盒监控及四大黄金指标
    本文深入探讨了白盒和黑盒监控的概念,以及它们在系统监控中的应用。通过详细分析基础监控和业务监控的不同采集方法,结合四个黄金指标的解读,帮助读者更好地理解和实施有效的监控策略。 ... [详细]
author-avatar
手机用户2702934324
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有