热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka介绍和使用

1.消息系统简介1.1为什么要用消息系统?解耦各位系统之间通过消息系统这个统一的接口

1.消息系统简介

1.1为什么要用消息系统 ?

解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在;
冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险;

灵活性和消除峰值 在访问量剧增的情况下,应用仍然需要继续发挥作用,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃;(节省资源)
可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据;

顺序保障 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性;
异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理。

 

1.2.有哪些消息系统 ?

RabbitMQ Erlang编写,支持多协议 AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式;
Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言, Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差;
ZeroMQ 轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是 一个库,需要开发人员自己组合多种技术,使用复杂度高;
ActiveMQ JMS实现,Peer-to-Peer,支持持久化、XA事务;

MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务;
Kafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持实时在线处理和离线数据处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

1.3.Kafka设计目标是什么?

高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写;
消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放;
完全分布式 Producer,Broker,Consumer均支持水平扩展,同时适应在线流处理和离线批处理。

2.kafka简介和架构

2.1.核心概念

       Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker;

       Message:消息是Kafka中最基本的数据单元,主要有key和value构成;真正有效的是消息是value数据,key只是作为消息路由分区使用;

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数 据而不必关心数据存于何处),强调的是kafka不保证topic消息有序;

Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition;kafka只保证一个partiton是有序的;通过配置来设置partition中的文件大小和文件保留策略;
Producer:负责发布消息到Kafka broker;
Consumer:消息消费者,向Kafka broker读取消息的客户端;

Consumer Group:官方称为逻辑上的订阅者,每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group),消息的单播和多播都是基于消费组来实现的,消费组中的消费者不是越多越好,消费者数量超过分区数量时,回导致消费者分配不到资源,造成资源浪费;
Offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

topic的配置可参考:
http://kafka.apache.org/documentation.html#topic-config

2.2.kafka架构

       

         如上图所示,一个典型的Kafka集群中包含若干Producer,若干broker(broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,在消费组发生变化时进行rebalance(新版本不依赖)。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

3.kafka的客户端设计

3.1.生产者设计

3.1.1.producer的使用

在Kafka老版本中,同步和异步都是分开成不同的方法来实现的,最新的都是由KafkaProducer来实现,通过掉future的get阻塞线程来实现同步。实际上两者底层实现相同,都是通过一步实现的。

3.1.2.producer发送消息的过程(0.10.2.1)

        主要是两个线程的操作:

        主线程封装消息成ProducerRecord对象,并调用append方法将消息追加RecordAccumulator中暂时存储;
        Sender线程负责将消息构造成请求,并从RecordAccumulator取出消息消息并批量发送。

1 ProducerIntercptor对消息进行拦截;
2 Serialzer对key和value进行序列化;
3 Partitioner对消息选择合适的分区;
4 RecordAccumulator收集消息,实现批量发送;
5 Sender从RecordAccumulator获取消息;
6 构造ClientRequest;
7 将ClientRequest交给Network,准备发送;
8 Network将请求放入KafkaChannel的缓存;
9 发送请求;
10 收到响应,调用ClientRequest;
11 调用RecordBatch的回调函数,最终调用到每一个消息上注册的回调函数。

3.1.3.Product方法详解

  主线程的send方法:

1、首先调用waitOnMetadata()方法确保该主题topic对应的元数据metadata是可用的;
2、计算剩余等待时间remainingWaitMs;
3、根据record中topic、key,利用valueSerializer得到序列化key:serializedKey;
4、根据record中topic、value,利用valueSerializer得到序列化value:serializedValue;
5、调用partition()方法获得分区号partition;
6、计算序列化后的key、value及其offset、size所占大小serializedSize;
7、调用ensureValidRecordSize()方法确保记录大小serializedSize是有效的;
8、根据record中的topic和partition构造TopicPartition实例tp;
9、调用accumulator的append()方法添加记录,获得记录添加结果RecordAppendResult类型的result;
10、根据结果result的batchIsFull或newBatchCreated确定是否执行sender的wakeup();
11、返回result中的future。

3.1.4.Replication设计

当某个Topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replication);
Replica的个数小于等于Broker数,即对每个Partition而言每个Broker上只会有一个Replica,因此 可用Broker ID表示Replication;
所有Partition的所有Replication默认情况会均匀分布到所有Broker上。

要解决的问题:

1:如何Propagate消息?

Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。

2:何时Commit?

        一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。 

3:如何处理Replica恢复?

Kafka producer的ack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。
0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。

       三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

4:如何处理Replica全部宕机

机器恢复,lead选举。(目前都是动态配置);

1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader 2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader。

3.2.Consumer设计

3.2.1.创建一个消费者

消费者都是线程不安全的,如果发现多线程调用,直接抛异常。

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

consumer group

        CG是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

3.2.2.消费获取

        传递保证语义有三个级别:At most once: 最多一次,消息可能会丢失,但不会重复传递,At least once: 至少一次,消息绝不会丢,但是可能会重复传递,Exactly once: 每一条消息只会被传递一次。

        Kafka服务器端并不会记录消费者的消费位置,而是由消费者自己决定如何保存其消费的offset. 0.8.2版本之前消费者会将其消费位置记录zookeeper中,在后面的新版本中,消费者为了缓解zookeeper集群的压力,在Kafka服务器端添加了一个名字是__consusmer_offsets的内部topic,简称为offset topic,他可以用来保存消费者提交的offset,当出现消费者上线或者下线时会触发消费者组的rebalance操作,对partitions重新进行分配,等待rebalance完成之后,消费者就可以读取offset topic中的记录的offset,并从此offset开始继续消费。你也可以根据业务需求将offset存储在别的存储介质中,比如数据库等

3.2.3.rebalance

触发rebalance的时机

# 有新的消费者加入;
# 有消费者宕机或者下线;
# 消费者主动退出消费者组;
# 消费者组订阅的topic出现分区数量变化;
# 消费者调用unsubscrible取消对某topic的订阅。

1. 将目标 topic 下的所有 partirtion 排序,存于PT;
2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci;
3. N=size(PT)/size(CG),向上取整;
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始);
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci。

4.kafka高性能之道

4.1.高效使用磁盘 

顺序写磁盘,顺序写磁盘性能高于随机写内存;

追加写:数据不更新,不做数据级别的删除,文件级别的删除;

支持多目录(多磁盘)。

4.2.零拷贝

这里的零拷贝值得是cpu级别的拷贝,使用nio的调用操作系统的sendfile实现零拷贝,同时减少两次上下文切换和1次系统调用。

传统意义上的拷贝:

NIO拷贝:

4.3.批处理和压缩

        kafka的生产者和消费者均支持批量处理数据,指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去,如100条消息就发送,或者每5秒发送一次

这种策略将大大减少服务端的I/O次数;
       生产者支持将数据压缩后发送给broker,从而减少网络传输代价,目前支持:GZIP或Snappy格式。

4.4.Partition

通过partition实现了并行处理和水平扩展,partition也是kafka并行处理的最小单位;

partitiom可以处在不同的机器上,充分利用多机资源;

同一节点上的partitiom可以位于多个目录下,如果节点下有多个磁盘,可以充分利用多磁盘优势。

4.5.ISR 

ISR实现了可用性和一致性的动态平衡;

ISR容忍了更多节点的失败;

可配置化replica crash处理策略。


推荐阅读
  • 本文详细介绍了如何安全地手动卸载Exchange Server 2003,以确保系统的稳定性和数据的完整性。根据微软官方支持文档(https://support.microsoft.com/kb833396/zh-cn),在进行卸载操作前,需要特别注意备份重要数据,并遵循一系列严格的步骤,以避免对现有网络环境造成不利影响。此外,文章还提供了详细的故障排除指南,帮助管理员在遇到问题时能够迅速解决,确保整个卸载过程顺利进行。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • 性能测试中的关键监控指标与深入分析
    在软件性能测试中,关键监控指标的选取至关重要。主要目的包括:1. 评估系统的当前性能,确保其符合预期的性能标准;2. 发现软件性能瓶颈,定位潜在问题;3. 优化系统性能,提高用户体验。通过综合分析这些指标,可以全面了解系统的运行状态,为后续的性能改进提供科学依据。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 分布式开源任务调度框架 TBSchedule 深度解析与应用实践
    本文深入解析了分布式开源任务调度框架 TBSchedule 的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了 TBSchedule 的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了 TBSchedule 在实际项目中的应用效果,为开发者提供了宝贵的实践经验。 ... [详细]
  • 本文将深入探讨MySQL与MongoDB在游戏账户服务中的应用特点及优劣。通过对比这两种数据库的性能、扩展性和数据一致性,结合实际案例,帮助开发者更好地选择适合游戏账户服务的数据库方案。同时,文章还将介绍如何利用Erlang语言进行高效的游戏服务器开发,提升系统的稳定性和并发处理能力。 ... [详细]
  • Panabit应用层流量管理解决方案
    Panabit是一款国内领先的应用层流量管理解决方案,提供高度开放且免费的专业服务,尤其擅长P2P应用的精准识别与高效控制。截至2009年3月25日,该系统已实现对多种网络应用的全面支持,有效提升了网络资源的利用效率和安全性。 ... [详细]
  • nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • Alibaba珍藏版mybatis手写文档,值得一读!
    一面问题:MySQLRedisKafka线程算法mysql知道哪些存储引擎,它们的区别mysql索引在什么情况下会失效mysql在项目中的优化场景&# ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • Java自学知乎!阿里高级算法专家公开10份资料,涨姿势!
    接口概述:接口是Java语言中的一种引用类型,是方法的集合,所以接口的内部主要就是定义方法,包含常量,抽象方法(JDK ... [详细]
  • UDP协议开发
    UDP是用户数据报协议(UserDatagramProtocol,UDP)的简称,其主要作用是将网络数据流量压缩成数据报形式,提供面向事务的简单信息传送服务。与TCP协议不同,UD ... [详细]
author-avatar
埼埼popo_514
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有