点击上方“罗晓胜”,马上关注,您的支持对我帮助很大
/ 前言 /
Kafka是最初由Linkedin公司开发,用scala语言编写的,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。
Kafka以集群的方式运行,天生就是分布式的,特色在于其负载均衡能力和处理性能、容错能力。
/ 正文 /
解耦
最终一致性
广播(只关注消息是否送到队列,至于谁订阅,那是下游的事情)
错峰和流控
特性 | activeMQ | rabbitMQ | rocketMQ | kafka |
---|---|---|---|---|
单机吞吐量 | 万/秒 | 万/秒 | 10万/秒 | 10万/秒 |
topic对吞吐量的影响 | 无 | 无 | topic达到几百/几千个级别,吞吐量会有小幅下降;这是rocket的最大优势所以非常适用于支撑大批量topic场景 topic可以达到几十/几百个级别,吞吐量会有大幅下降 | kafka不适用大批量topic场景,除非加机器 |
时效性 | 毫秒 | 微秒 这是rabbit 最大优势,延迟低 | 毫秒 | 毫秒 |
可用性 | 高。主从架构 | 高。主从架构 | 非常高。分布式。 | 非常高。分布式。数据多副本,不会丢数据,不会不可用。 |
可靠性 | 有较低概率丢失数据 | ---- | 经配置优化可达到0丢失 | 经配置优化可达到0丢失 |
功能特性 | 功能齐全,但已不怎么维护 | erlang开发,并发强,性能极好,延迟低 | MQ功能较为齐全,扩展好 | 功能简单,主要用于大数据实时计算和日志采集,事实标准 |
Active:官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。
RabbitMQ:结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护,適合数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护 RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 需要学习比较复杂的接口和协议,学习和维护成本较高。
RocketMQ:分开源和商业版,商业版为阿里云提供,已在国内企业级应用有一定市场。天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的 支持的客户端语言不多,目前是java及c++,其中c++不成熟; 社区活跃度一般; 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码 。
Kafka:开源消息系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性:ms级 ;可用性:非常高,kafka是分布式的,通过控制能够保证所有消息被消费且仅被消费一次; 在日志领域比较成熟,被多家公司和多个开源项目使用; 功能支持:在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长; 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 社区更新较慢
直接使用brew安装,安装过程会自动安装zookeeper(windows使用二进制包安装即可)
相关命令:brew install kafka
安装位置: /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka
配置文件位置:
/usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
启动zookeeper服务:
nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
说明:如果是windows则使用./zookeeper-server-start.sh
启动kafka服务(修改kafka配置文件,取消注释listeners=PLAINTEXT://:9092):
nohup kafka-server-start /usr/local/etc/kafka/server.properties &
停止kafka和zookeeper服务(先关闭Kafka,等关闭完之后再关闭Zookeeper,否则,Kafka brokers无法关闭)
kafka-server-stop zookeeper-server-stop
创建topic(示例名称topic_a)
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_a
查看创建的topic
kafka-topics --list --zookeeper localhost:2181
修改分区数
kafka-topics --zookeeper localhost:2181 --alter --topic topic_a --partitions 2 //修改为2个
终端1发送消息
kafka-console-producer --broker-list localhost:9092 --topic topic_a
终端2接受消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning
此时,在终端1中输入信息回车,终端2会立即显示在屏幕上面,如有结果输出,即安装成功
命令补充拓展:
指定分区消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --partition 1 --from-beginning
启动kafka,指定配置文件,后台启动并打印日志到指定位置
nohup kafka-server-start /usr/local/etc/kafka/server.properties > /usr/local/etc/kafka/kafka.log 2>&1 &
查看已有的topic
kafka-topics --list --bootstrap-server localhost:9092
查看指定topic 详细信息
kafka-topics --bootstrap-server localhost:9092 --describe --topic topic_a
查询指定topic内容:
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning
查询指定topic内容并使用grep匹配字符串:
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning | grep '1234'
控制台指定topic 的内部生产消息
kafka-console-producer --broker-list localhost:9092 --topic topic_a
控制台指定topic 的内部消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a
查看kafka的group列表
kafka-consumer-groups --bootstrap-server localhost:9092 --list
选择一个group(例如test),查看其中的topic列表
kafka-consumer-groups --bootstrap-server localhost:9092 --group test --describe
选择一个topic,查看是否包含某个字段(用来查看kafka是否收到某条消息)
kafka-consumer-groups --bootstrap-server localhost:9092 --topic topic_a --from-beginning | grep '1234'
查看topic对应的log日志、index日志(需要传入日志文件地址,多个用逗号分隔,由于kafka是二进制保存日志文件的,因此需要下方命令才能查看)
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log
查看topic对应的日志,并使用grep匹配字符串
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log | grep '1234'
把kafka日志从二进制文件转存到txt中
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log > ~/Downloads/kafka-topic_a.txt
kafka配置文件示例:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitiOns=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.cOnnect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
连接相关配置说明:
listeners 配置的是kafka的tcp侦听ip地址;
advertised.listeners 配置的是kafka的broker ip。
listeners不配置则会调用java.net.InetAddress.getCanonicalHostName()获取IP
在没有配置advertised.listeners的情况下,默认取值为kafka所在机器的主机名,端口与listeners中配置的端口一致。也就是kafka的broker ip是kafka所在机器的主机名。很多情况下,与kafka连接成功但无法正确生产消费的原因就是kafka的主机名无法被正确解析,最常见的就是kafka的主机名为localhost。
另外,kafka成功注册zookeeper后,会将broker ip写入到kafka中。这样kafka集群中的每个节点都能知道其他所有节点的broker ip。因此,kafka的客户端无论连接到集群的哪个节点上,都能正确获取到整个集群的元数据信息。
log日志配置说明:
log.dirs=/usr/local/var/lib/kafka-logs #日志存储地址
log.retention.hours=168 #日志文件最少保存时长
log.segment.bytes=1073741824 #Segment存储达到1G,就构建一个新的Segment
zookeeper配置说明:
zookeeper.cOnnect=localhost:2181
zookeeper.connection.timeout.ms=18000
topic是逻辑上的概念,而partition是物理上的概念,即一个topic划分为多个partition,每个partition对应一个log文件
为了防止log文件过大,将每个partition分为多个segment, 每个segment包括:.index文件、 .log文件、 .timeindex文件等,这些文件位于一个文件夹下,文件夹命名规则:topic名称+分区序号。
例:segment文件示例:topic为topic_a,选取segment topic_a-0文件
topic_a-0为一个segment文件夹,就是以topic和第一条消息的offset命名,其中包含00000000000000000000.index、00000000000000000000.log、00000000000000000000.timeindex
-rw-r--r-- 1 luoxiaosheng admin 10M 10 9 14:38 00000000000000000000.index
-rw-r--r-- 1 luoxiaosheng admin 213B 10 9 14:41 00000000000000000000.log
-rw-r--r-- 1 luoxiaosheng admin 10M 10 9 14:38 00000000000000000000.timeindex
Segment设计思想:将一个分区中的数据划分到不同的文件当中去,每一个文件按顺序存储一部分数据,所有分区的第一个Segment就是用20位0来编号的Segment文件。该文件达到一定条件之后不再存,开始往下一个Segment文件进行存储。
这么做的意义:
1. 加快查询效率
通过将分区的数据根据 Offset 划分到多个比较小的Segment文件,在检索数据时,可以根据Offset 快速定位数据所在的Segment
加载单个Segment文件查询数据,可以提高查询效率
2. 删除数据时减少IO
删除数据时,Kafka 以 Segment 为单位删除某个Segment的数据,避免一条一条删除,增加 IO 负载
Kafka将一个分区的文件是按照片段来存储的,一个片段的默认大小为1GB,可以在server.properties配置文件中修改片段大小,并且同时维护了index索引文件。
Segment 的划分规则:满足任何一个条件都会划分segment
1. 按照时间周期生成
#默认7天。如果达到7天,重新生成一个新的Segment
log.roll.hours = 168
2. 按照文件大小生成
#默认大小是1个G。如果一个Segment存储达到1G,就构建一个新的Segment
log.segment.bytes = 1073741824
kafka将一个partiton分割成很多个segment文件,segment下分为几部分
.index/.timeindex 文件:索引文件,与log文件有一定的关联关系,.index为.timeindex为时间戳文件
.log文件:真正存储数据的文件
其文件名的规则如下:
同一个segment下,index和log名字相同
下一个segment是上一个segment的lastOffset
.log文件示例:
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1665297701410 size: 70 magic: 2 compresscodec: NONE crc: 1160496349 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1665297701410 baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 70 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: 12
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 70 CreateTime: 1665297704669 size: 72 magic: 2 compresscodec: NONE crc: 4055451736 isvalid: true
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1665297704669 baseOffset: 1 lastOffset: 1 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 72 magic: 2 compressType: NONE position: 70 sequence: -1 headerKeys: [] payload: 3333
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 142 CreateTime: 1665297716279 size: 71 magic: 2 compresscodec: NONE crc: 155080469 isvalid: true
| offset: 2 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1665297716279 baseOffset: 2 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 71 magic: 2 compressType: NONE position: 142 sequence: -1 headerKeys: [] payload: 444
.log文件解读:
baseOffset:初始偏移量
lastOffset:消息文本结束偏移量
offset:表示的是相对于该分区的记录偏移量,指的是第几条记录,比如0代表第一条记录。
position:表示该记录相对于当前片段文件的偏移量。
CreateTime:记录创建的时间。
isvalid:记录是否有效。
keysize:表示key的长度。
valuesize:表示value的长度
magic:表示本次发布kafka服务程序协议版本号。
compresscodec:压缩工具。
producerId:生产者ID(用于幂等机制)。
sequence:消息的序列号(用于幂等机制)。
payload:表示具体的消息
...
.index文件示例
offset: 0 position: 0
.index文件解读
offset:消息在log文件中的相对offset,相对于当前index文件命名offset的偏移量,这样能确保offset的值占用的控件不会过大,因此能将offset的值控制在固定大小
position:当前消息物理偏移量,对应log文件中的position
index文件有什么用呢?其实就是一个索引,记录了一条消息在log文件中的位置,查找消息的时候先从index获取位置,然后就可以定位到消息在log文件具体哪个地方
index采用了稀疏索引的方式去存储,不是每来一条消息就记录一个索引,而是当消息大于某个值的时候,就会记录一次索引,默认是4KB
稀疏存储也就是选取一些消息的offset以及position进行存储,因为如果把对应片段的所有消息的索引都存储,那么必然会占用大量的内存。
索引分为两类:一种是全量索引,一种是稀疏索引。(1)全量索引指的是:每一条数据对应一条索引;(2)稀疏索引指的是:部分数据有索引,有一部分数据是没有索引。优点:减少了索引存储的数据量,加快索引的检索效率。
index和log写文件流程:
(举个例子,假设segment配置了10KB(log.segment.bytes),每1KB记录一次索引(log.index.interval.bytes参数),topic下只有1个partition,一个消息大小为43)
此时发送多个消息,直到发送了24个消息的时候,此时log文件大小为1032,由于每1KB创建一个索引,大于log.index.interval.bytes指定的值,需要增加索引
offset 0 position 0
offset 24 position 1032
当发送了238个消息后,这时log文件的大小为10234,由于配置的segment大小为10KB(10240),那么就是说,当下一个消息过来,必须要新建segment文件
新文件名以238结尾,这个238即为上一个segment文件最后一个消息的offset(这时候索引建立过程和第一个segment一样)
注意:index文件计算message在log中的位置的公式为offset-baseOffset
offset代表全局的message数,baseOffset为相对数量(即上面第二个segmemnt就为238),那么相减后,index文件内容就和第一个segment一样了
总结一下segment的命名格式:index和log文件以当前segment第一条消息的offset命名(示例:设置segment大小设置为10K,包含2个segment:000.和238)
0000.log + 0000.index:存储第1条message到第238条message
0238.log + 0238.index:存储第239条message到第476条message,以上一个segment最后一条message的offset作为文件名
以此类推
index和log读文件流程
举个例子:文件如下,在log文件的定位到offset=600的record
Segment-0 [offset:0-521] 00000000000000000000.index 00000000000000000000.log
Segment-1 [offset:522-1044] 00000000000000000522.index 00000000000000000522.log
根据目标offset定位segment文件(如要找到offset=600则定位到Segment-1 [offset:522-1044]中)
找到小于等于目标offset的最大offset对应的索引项(index存的都是相对位移,且为稀疏索引,找到小于等于600的最大offset索引项)
定位log文件
向下遍历找到record
/ 总结 /
本文主要讲了kafka相关介绍,相关主流mq比较,kafka安装部署,相关命令,配置文件说明,底层存储结构,log和index文件等
关注我的公众号,学习技术或投稿
长按上图,识别图中二维码即可关注