Consumer Configuration在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:
bootstrap.servers
在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。
它配置的格式是:host1:port1;host2:port2…
key.descrializer
Message record 的key的反序列化类。
一般都是StringDeserializer, org.apache.kafka.common.serialization.StringDeserializer
value.descrializer
Message record 的value的反序列化类。
一般都是StringDeserializer。org.apache.kafka.common.serialization.StringDeserializer
group.id
用于表示该consumer想要加入到哪个group中。默认值是 “”。
heartbeat.interval.ms
心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。这个值必须设置的小于session.timeout.ms,因为:
当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。
默认值是:3000 (3s)
session.timeout.ms
Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)
enable.auto.commit
Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
默认值是true。
auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
auto.offset.reset
这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
1) earliest:自动重置到最早的offset。
2) latest:看上去重置到最晚的offset。
3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
4) 如果不是上述3种,只抛出异常给consumer。
默认值是latest。
connections.max.idle.ms
连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。
默认值是:540000 (9 min)
fetch.max.wait.ms
Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
fetch.min.bytes
当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。
默认值设置为1的目的是:使得consumer的请求能够尽快的返回。
fetch.max.bytes
一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)
max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
max.poll.interval.ms
前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。
max.poll.records
Consumer每次调用poll()时取到的records的最大数。
receive.buffer.byte
Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。
取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)
如果值设置为-1,则会使用操作系统默认的值。
request.timeout.ms
请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)
client.id
Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。
interceptor.classes
用户自定义interceptor。
metadata.max.age.ms
Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。
范围是:[0, Integer.MAX],默认值是:300000 (5 min)
bootstrap.servers
参考生产者中的bootstrap.servers配置说明
key.descrializer、value.descrializer
Message record 的key, value的反序列化类。
group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
用于表示该consumer想要加入到哪个group中。默认值是 “”。
heartbeat.interval.ms
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
心跳间隔。心跳是在consumer与coordinator之间进行的。心跳用来保持consumer的会话,并且在有consumer加入或者离开group时帮助进行rebalance。
这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。默认值是:3000 (3s)
session.timeout.ms
The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
Consumer session 过期时间。consumer会发送周期性的心跳表明该consumer是活着的。如果超过session.timeout.ms设定的值仍然没有收到心跳,zebroker会把这个consumer从group中移除,并且重新rebalance。
这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)
enable.auto.commit
If true the consumer’s offset will be periodically committed in the background.
设置Consumer 在 commit 方式是否是自动调焦。默认值是true。
auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
fetch.max.wait.ms
The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
fetch.min.bytes
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.
当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。默认值设置为1的目的是:使得consumer的请求能够尽快的返回。
fetch.max.bytes
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes和topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)
max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker 的message.max.bytes和 topic 的max.message.bytes的配置。
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。
max.poll.records
The maximum number of records returned in a single call to poll().
Consumer每次调用poll()时取到的records的最大数。
转载处:https://blog.csdn.net/lixiang987654321/article/details/99437168