解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在;
冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险;
灵活性和消除峰值 在访问量剧增的情况下,应用仍然需要继续发挥作用,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃;(节省资源)
可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据;
顺序保障 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性;
异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理。
RabbitMQ Erlang编写,支持多协议 AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式;
Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言, Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差;
ZeroMQ 轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是 一个库,需要开发人员自己组合多种技术,使用复杂度高;
ActiveMQ JMS实现,Peer-to-Peer,支持持久化、XA事务;
MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务;
Kafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持实时在线处理和离线数据处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写;
消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放;
完全分布式 Producer,Broker,Consumer均支持水平扩展,同时适应在线流处理和离线批处理。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker;
Message:消息是Kafka中最基本的数据单元,主要有key和value构成;真正有效的是消息是value数据,key只是作为消息路由分区使用;
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数 据而不必关心数据存于何处),强调的是kafka不保证topic消息有序;
Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition;kafka只保证一个partiton是有序的;通过配置来设置partition中的文件大小和文件保留策略;
Producer:负责发布消息到Kafka broker;
Consumer:消息消费者,向Kafka broker读取消息的客户端;
Consumer Group:官方称为逻辑上的订阅者,每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group),消息的单播和多播都是基于消费组来实现的,消费组中的消费者不是越多越好,消费者数量超过分区数量时,回导致消费者分配不到资源,造成资源浪费;
Offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。
topic的配置可参考:
http://kafka.apache.org/documentation.html#topic-config
如上图所示,一个典型的Kafka集群中包含若干Producer,若干broker(broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,在消费组发生变化时进行rebalance(新版本不依赖)。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
在Kafka老版本中,同步和异步都是分开成不同的方法来实现的,最新的都是由KafkaProducer来实现,通过掉future的get阻塞线程来实现同步。实际上两者底层实现相同,都是通过一步实现的。
主要是两个线程的操作:
主线程封装消息成ProducerRecord对象,并调用append方法将消息追加RecordAccumulator中暂时存储;
Sender线程负责将消息构造成请求,并从RecordAccumulator取出消息消息并批量发送。
1 ProducerIntercptor对消息进行拦截;
2 Serialzer对key和value进行序列化;
3 Partitioner对消息选择合适的分区;
4 RecordAccumulator收集消息,实现批量发送;
5 Sender从RecordAccumulator获取消息;
6 构造ClientRequest;
7 将ClientRequest交给Network,准备发送;
8 Network将请求放入KafkaChannel的缓存;
9 发送请求;
10 收到响应,调用ClientRequest;
11 调用RecordBatch的回调函数,最终调用到每一个消息上注册的回调函数。
主线程的send方法:
1、首先调用waitOnMetadata()方法确保该主题topic对应的元数据metadata是可用的;
2、计算剩余等待时间remainingWaitMs;
3、根据record中topic、key,利用valueSerializer得到序列化key:serializedKey;
4、根据record中topic、value,利用valueSerializer得到序列化value:serializedValue;
5、调用partition()方法获得分区号partition;
6、计算序列化后的key、value及其offset、size所占大小serializedSize;
7、调用ensureValidRecordSize()方法确保记录大小serializedSize是有效的;
8、根据record中的topic和partition构造TopicPartition实例tp;
9、调用accumulator的append()方法添加记录,获得记录添加结果RecordAppendResult类型的result;
10、根据结果result的batchIsFull或newBatchCreated确定是否执行sender的wakeup();
11、返回result中的future。
当某个Topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replication);
Replica的个数小于等于Broker数,即对每个Partition而言每个Broker上只会有一个Replica,因此 可用Broker ID表示Replication;
所有Partition的所有Replication默认情况会均匀分布到所有Broker上。
要解决的问题:
1:如何Propagate消息?
Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。
2:何时Commit?
一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
3:如何处理Replica恢复?
Kafka producer的ack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。
0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。
三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。
4:如何处理Replica全部宕机
机器恢复,lead选举。(目前都是动态配置);
1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader 2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader。
消费者都是线程不安全的,如果发现多线程调用,直接抛异常。
consumer 采用 pull 模式从 broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
consumer group
CG是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
传递保证语义有三个级别:At most once: 最多一次,消息可能会丢失,但不会重复传递,At least once: 至少一次,消息绝不会丢,但是可能会重复传递,Exactly once: 每一条消息只会被传递一次。
Kafka服务器端并不会记录消费者的消费位置,而是由消费者自己决定如何保存其消费的offset. 0.8.2版本之前消费者会将其消费位置记录zookeeper中,在后面的新版本中,消费者为了缓解zookeeper集群的压力,在Kafka服务器端添加了一个名字是__consusmer_offsets的内部topic,简称为offset topic,他可以用来保存消费者提交的offset,当出现消费者上线或者下线时会触发消费者组的rebalance操作,对partitions重新进行分配,等待rebalance完成之后,消费者就可以读取offset topic中的记录的offset,并从此offset开始继续消费。你也可以根据业务需求将offset存储在别的存储介质中,比如数据库等
触发rebalance的时机
# 有新的消费者加入;
# 有消费者宕机或者下线;
# 消费者主动退出消费者组;
# 消费者组订阅的topic出现分区数量变化;
# 消费者调用unsubscrible取消对某topic的订阅。
1. 将目标 topic 下的所有 partirtion 排序,存于PT;
2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci;
3. N=size(PT)/size(CG),向上取整;
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始);
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci。
顺序写磁盘,顺序写磁盘性能高于随机写内存;
追加写:数据不更新,不做数据级别的删除,文件级别的删除;
支持多目录(多磁盘)。
这里的零拷贝值得是cpu级别的拷贝,使用nio的调用操作系统的sendfile实现零拷贝,同时减少两次上下文切换和1次系统调用。
传统意义上的拷贝:
kafka的生产者和消费者均支持批量处理数据,指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去,如100条消息就发送,或者每5秒发送一次
通过partition实现了并行处理和水平扩展,partition也是kafka并行处理的最小单位;
partitiom可以处在不同的机器上,充分利用多机资源;
同一节点上的partitiom可以位于多个目录下,如果节点下有多个磁盘,可以充分利用多磁盘优势。
ISR实现了可用性和一致性的动态平衡;
ISR容忍了更多节点的失败;
可配置化replica crash处理策略。