作者:白日做梦家_ | 来源:互联网 | 2023-06-28 12:51
kafka概述消息队列点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客
kafka概述
消息队列
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此
发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
什么是kafka
Kafka特点
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consumer操作;
(2)可扩展性:kafka集群支持热扩展;
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
(5)高并发:支持数千个客户端同时读写;
(6)支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理;
Kafka的使用场景
1)日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
(2)消息系统:解耦和生产者和消费者、缓存消息等;
(3)用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
(4)运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5)流式处理:比如spark streaming和storm;
(6)事件源;
Kafka架构
架构基本介绍
Kafka执行流程
Kafka 哪些信我们看上面的图,我们把 broker 的数量减少,叧有一台。现在假设我们按照上图进行部署:
(1)Server-1 broker 其实就是 kafka 的 server,因为 producer 和 consumer 都要去还它。 Broker 主要还是做存储用。
(2)Server-2 是 zookeeper 的 server 端,它维持了一张表,记录了各个节点的 IP、端口等信息。
(3)Server-3、 4、 5 他们的共同之处就是都配置了 zkClient,更明确的说,就是运行前必须配置 zookeeper的地址,道理也很简单,这之间的连接都是需要 zookeeper 来进行分发的。
(4)Server-1 和 Server-2 的关系,他们可以放在一台机器上,也可以分开放,zookeeper 也可以配集群。目的是防止某一台挂了。
简单说下整个系统运行的顺序:
(1)启动zookeeper 的 server
(2)启动kafka 的 server
(3)Producer 如果生产了数据,会先通过 zookeeper 找到 broker,然后将数据存放到 broker
(4)Consumer 如果要消费数据,会先通过 zookeeper 找对应的 broker,然后消费
哪些信息存储在zookeeper中
Zookeeper 作用及背景
Kafka 术语
Consumer Group( 消费组 ):在kafka中,所有的消费者都属于一个消费组
Topic(消息类型):Topic就是对消息进行逻辑上的分类
Partition(分区):消息的分区,kafka中对消息进行分区管理
Replica(副本):Partition的副本,主要是从服务的容灾上进行备份
Replica Leader(副本-主):Partition分区的leader
Replica Follower(副本-从):Partition分区的Follower
Segment(段文件):Topic消息的文件组成
Offset(偏移量):消息存储的偏移量,在kafka中生产者和消费者主要就是依靠Offset进行控制消息的发送和消费
Zookeeper 集群
Zookeeper集群 在kafka 中的作用
leader 选举 和 follower 信息同步
如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。
Producer 直接连接 Broker。
Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0’、 1’。
黄色的分区为 leader,白色的为 follower。
leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为 follower,leader复制自己到其他 Broker 中:问题 数据的同步
如果leader发生故障或挂掉,一个新leader被选举并接收客户端的消息。Kafka确保从同步副本列表中选举一个副本为 leader。
关于follower 的同步机制可参考:https://blog.csdn.net/lizhitao/article/details/51718185
Topic 分区被放在不同的 Broker 中,保证 Producer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力,使得负载均衡
Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
/brokers/ids
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除
Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:
/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
3、生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。
(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
4、消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
5、Consumer Group与 消费者 的关系
消费组 (Consumer Group):
consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。
6、消息 消费进度Offset 记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
7、消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下
注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
对 消费者分组 中的 消费者 的变化注册监听。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。
对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。
进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。
以下是kafka在zookeep中的详细存储结构图
Kafka工作流程分析
Kafka生产过程分析
写入方式
分区
副本(Replication)
写入流程
Broker保存信息
存储方式
存储策略定期清除数据
消息的存储格式
每个Partition由多个Segment段文件组成,每个Segment文件通过log.segment.bytes配置其大小限制。每个段文件由2部分组成,分别是index索引文件和log数据文件。如图是段文件在系统中的存储形式:
Kafka定位具体message是通过index索引文件和log数据文件共同决定的。例如:当查找绝对offset为7的Message时
-
通过二分查找确定它是在哪个LogSegment中。此时为第一个Segment中。
-
打开这个Segment的index文件,再通过用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset,此时基于offset为7,定位到offset为6的索引,通过索引文件得到offset为6的Message在log数据文件中的位置为9807。
-
打开数据文件,从位置为9807开始顺序扫描直到找到offset为7的那条Message,即定位成功。
这套机制是建立在offset是有序的前提下,索引文件被映射到内存中,故极大地提升了查询速度。
先从整体流程了解一下kafka消息的结构,从大到小:
Topic>>partition>>segment>>index/log
一个topic可以被拆分为多个partition进行存储,每个partition还有自己的副本,作为容灾准备;每个partition是由segment文件进行组成;每个segment文件又两类文件组成,一类是index索引文件,主要是存储数据和位置的关系;一类是log文件,主要存储的是数据文件。
Zookeeper 存储结构
Kafka消费过程分析
高级api
低级api
消费者组
消费方式
kafka部署
环境 :linux docker centos 镜像
注意 分开部署 kafka 连不上 zookeeper 可能是因为 hostname 问题
注意 host name 和 ip 的映射
在/etc/hosts 里设置
查看 容器 host
Docker inspect 容器 | grep host
部署文档
https://www.cnblogs.com/5iTech/articles/6043224.html
注意
https://blog.csdn.net/xinaml/article/details/72874564
server.proprties 要配置
Zookeeper
Listener 要监听0.0.0.0
下载
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka_2.12-2.1.1.tgz
Kafka基本命令
创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest-topic
查寻topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest-topic
消费者
./bin/kafka-console-consumer.sh --bootstrap-server 106.13.63.203:9092 --topic mytest-topic --from-beginning
基本命令
http://orchome.com/454
Kafka管理工具
Kafka manager
Kafka eagle 部署
https://blog.csdn.net/lzwglory/article/details/83078208
https://www.cnblogs.com/smartloli/p/9371904.html
下载地址
http://download.smartloli.org/
Kafka php API 实战
注意 生产者 生产 消息时 一定要先创建 topic
composer require kmelia/monolog-stdout-handler
composer require monolog/monolog
注意配置host
https://packagist.org/packages/nmred/kafka-php
https://segmentfault.com/a/1190000015765348
注意 /etc/hosts 设置 host ip
生产者
消费者
Telnet ip 9092
Telnet ip 2181