zookpeeper生产环境可用版本 下载地址:
http://archive.apache.org/dist/zookeeper/zookeeper-3.4.8
kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。
kafka常见术语:
Group : 消费者组
Topic : topic的名字
Pid : partition的ID
Offset : kafka消费者在对应分区上已经消费的消息数【位置】
logSize : 已经写到该分区的消息数【位置】
Lag : 还有多少消息未读取(Lag = logSize - Offset)
Owner : 分区创建在哪个broker
offset更新的方式,不区分是用的哪种api,大致分为两类:
(1)自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功(2)手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。
Kafka监控需要关注的几个指标:
1、lag:多少消息没有消费
2、logsize:Kafka存的消息总数
3、offset:已经消费的消息
Kafka中的消息是否会丢失和重复消费
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费
1、消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0---表示不进行消息接收是否成功的确认;1---表示当Leader接收成功时确认;-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
2、消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
解决办法:
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。