Kafka是一个基于zookeeper协调的开源分布式消息系统,它最大的特性是可以实时的处理大量数据来满足各种需求场景:如基于hadoop的批处理系统、storm/spark流式处理等。它是由Linkedin开发,Scala语言编写的。
1 概述
1.1 Kafka的特性:
- 高吞吐量、低延迟:即使在廉价的机器上,Kafka也能支持单机每秒处理十万条消息的传输,延迟最低只有几毫秒
- 可扩展性:Kafka是分布式系统,支持在线水平扩展,新增机器,集群无需停机自动感知
- 持久性、可靠性:使用时间复杂度O(1)的磁盘存储结构,将消息持久化到本地磁盘,即使TB级以上数据也能保证常数时间的访问速度,且支持数据备份防止数据丢失
- 容错性:支持多分区(partition)和多副本(replication),允许集群中节点失败(若节点数为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
- 支持离线/实时数据处理
1.2 应用场景
- 解耦:消息系统在应用处理过程中插入一个隐含的、基于数据的接口层,两边的处理过程都实现这一接口,这样只要确保遵守同样的接口约束就可以独立的扩展或修改两边的处理过程了。
- 峰值处理:遇到访问量剧增情况,应用仍能继续发挥作用。使用消息队列能够使关键组件顶住突发的访问压力,不会因超负荷请求而崩溃。
- 异步通信:消息队列Kafka提供了异步处理机制,允许用户把一个消息放入队列,但不立即处理。
2 Kafka原理
Kafka分布式消息系统由Broker、Producer、Consumer、Zookeeper几大构件组成,下面对各构件以及Kafka工作机制、数据存储进行总结。
2.1 基本概念
1)Broker:服务器,一台服务器可以看作是一个Broker,多个Broker就构成了Kafka集群
2)Topic:Kafka的消息通过主题(Topic)进行分类,Topic相当于数据库的表或者文件系统里的文件夹
- Producer:消息生产者,负责向Kafka Broker发布消息的客户端,并将同一类别消息数据写入同一个Topic
- Counsumer:消息消费者,从Kafka Broker读取消息的客户端,从同一个Topic中读取同类别消息数据
- 物理上不同Topic的消息分开存储,Topic是逻辑概念,逻辑上一个Topic的消息可以保存于一个或多个Broker上,但用户只需指定消息的Topic就可以生产或者消费数据,而不必关心数据存于何处
3)Partition(分区)
- 分区是一个有序的、不可修改的消息队列,分区内消息有序存储
- 一个Topic可以分为多个分区,相当于把一个数据集分成多份,分别存储于不同的分区中
- Partition是物理上的概念,每个分区对应一个文件夹,其中存储分区的数据和索引文件
4)Segment:Partition物理上由多个Segment组成,每个Segment内保存message信息
5)Replication(副本):一个Partition可以设置多个副本,副本存储在不同的Broker中
注:kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可。
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader。
6)Consumer Group(CG,消费者组)
- 每个Consumer属于一个特定的CG
- 一条消息可以发送给多个不同的CG,但一个CG中只有一个Consumer消费该条消息
7)Zookeeper
- Kafka将元数据保存在Zookeeper中,但发送给Topic本身的数据不会发送到Zookeeper上
- Zookeeper负责Kafka集群管理以及协调代理,包括配置管理、动态集群扩展、Broker负载均衡、Leader选举和CG变化时的Rebalance等
2.2 工作机制
典型的Kafka集群中包含若干Producer、若干Broker(Kafka支持水平扩展,一般Broker数量越多,集群吞吐率越高)、若干Consumer Group和一个Zookeeper集群。Kafka发送端采用push模式将消息发送到Broker,Kafka消费端采用pull模式订阅并消费消息。
每个Partition可以在其他的Kafka Broker节点上存副本,以便某个Kafka Broker节点宕机不会影响Kafka集群,存副本的方式是按照Kafka Broker的顺序存,如上图所示,有4个Kafka Broker节点,1个Topic有4个Partition,每个Partiton有2个副本,那么Partition-0存在Broker1和Broker2,Partiton-1存在Broker2和Broker3,Partition-2存在Broker3和Broker4,Partition-3存在Broker4和Broker1。
当集群中新增2节点,Partition增加到6个时分布情况如下
注:副本数目不能大于Kafka Broker节点的数目,否则会报错。这里的副本数其实时Partition的副本总数,其中包括一个leader,其他都是follwer。因此某个Broker宕机,其实整个Kafka内数据依然是完整的,但副本数越多,系统月稳定,同时也会带来资源和性能上的下降;副本少也会造成系统数据丢失的风险。
Partition是一个先进先出队列,写入数据时采用在队列尾部追加的方式,消费消息时采用在队列头部顺序读取的方式。一个Topic可分为多个Partition,仅保证同一分区内消息有序存储,不保证整个Topic(多个分区)有序。
注:每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐量的一个重要保证)。
为加快消费速度,多个Consumer可以划分为一个消费组(CG),并行消费同一个Topic。
一个Topic可以被多个CG订阅,CG之间是平等的,即一个消息可以同时被多个CG消费
一个CG中可以有多个Consumer,CG中的Consumer之间是竞争关系,即一个消息在一个CG中只能被一个Consumer消费。
2.3 Partition中数据存储方式
前面提到Partition物理上由多个Segment组成,每个Segment内保存message信息,这里对Segment做下补充。把每个Partition看作一个目录,目录中包含了多个Segment文件。也就是说Partition本来是一个大文件被平均分配到多个大小相等的Segment数据文件中。但每个Segment file消息数据不一定相等,这种特性便于old segment file快速删除。每个Partition只需要支持顺序读写,Segment文件生命周期由服务端配置参数决定。这样做好处是可以快速删除无用文件,有效提高磁盘利用率。
Segment
- Segment是Kafka的最小数据存储单元,一个Partition包含多个Sement文件
- Segment文件由以Message在Partition中的起始偏移量命令的数据文件(.log)和索引文件(.index、*.timeindex)组成
解析: Producer发Message到一个Topic,Message会被均匀的分布到多个Partition(随机或根据用户指定的回调函数进行分布),Kafka Broker收到Message往对应Partition的最后一个Segment上添加该消息,当某个Segment上的消息条数达到配置值或消息发布时间超过阈值时,Segment上的消息会被flush到磁盘,只有flush到磁盘上的消息Consumer才能消费,Segment达到一定的大小后将不会再往该Segment写数据,Broker会创建新的Segment。每个Partition在内存中对应一个index,记录每个segment中的第一条消息偏移。
Segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为Segment索引文件、数据文件
Segment文件命名规则:Partion全局的第一个Segment从0开始,后续每个Segment文件名为上一个全局Partion的最大Offset(偏移Message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
Offset
- Offset用于定位分区中消息的顺序编号,在分区中唯一标识消息
- Zookeeper维护Offset
Kafka索引
为了提高消息写入和查询速度,为每个Partition创建索引,索引文件存储在Partition文件夹下
偏移量索引
- 文件以offset偏移量为名称,以index为后缀
- 索引内容格式:offset,position
- 采用稀疏存储方式
时间戳索引 - 文件以timeindex为后缀
- 索引内容格式:timestamp,offset
- 采用稀疏存储方式
偏移量索引图:
时间戳索引图:
2.4 Kafka高可用
多分区多副本
- Kafka早期版本没有Replication概念,一旦某个Brocker宕机,其上的分区数据就可能丢失
- 一个Topic可以有多个分区,每个分区可以有多个副本,副本存储在不同的Broker中
- 从一个分区的多个副本中选举一个Partition Leader,由Leader负责读写,其他副本作为Follower从Leader同步消息
Kafka Controller Leader选举
- 每个Broker启动时都会创建一个Kafka Controller进程
- 通过Zookeeper,从Kafka集群中选举出一个Broker作为Kafka Controller Leader
- Kafka Controller Leader负责管理Kafka集群的分区和副本状态,避免分区副本直接在Zookeeper上注册Watcher和竞争创建临时Znode,导致Zookeeper集群负载过重
Kafka Partition Leader选举
- Kafka Controller Leader负责Partition Leader的选举
- ISR列表(In Sync Replica)
- ISR是Zookeeper中的候选副本同步列表,负责保存候选副本(Partition Follower)的状态信息
- Partition Leader负责跟踪和维护ISR
- Partition Follower定期从Leader同步数据 ,若Follower心跳超时或消息落后太多,将被移除出ISR
- Partition Leader挂掉后,Kafka Controller Leader从ISR中选择一个Follower作为新的Leader
参考:Kafka史上最详细总结 Kafka背景及原理介绍