热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka系列——安装部署,相关命令,配置文件,底层存储结构,log和index文件

点击上方“罗晓胜”,马上关注,您的支持对我帮助很大前言Kafka是最初由Linkedin公司开发,用scala语言编写的,是

点击上方“罗晓胜”,马上关注,您的支持对我帮助很大

/   前言   /

        Kafka是最初由Linkedin公司开发,用scala语言编写的,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。 

        Kafka以集群的方式运行,天生就是分布式的,特色在于其负载均衡能力和处理性能、容错能力。

/   正文   /


为什么要使用消息队列


  1. 解耦

  2. 最终一致性

  3. 广播(只关注消息是否送到队列,至于谁订阅,那是下游的事情)

  4. 错峰和流控


主流MQ比较


特性activeMQrabbitMQrocketMQkafka
单机吞吐量万/秒万/秒10万/秒10万/秒
topic对吞吐量的影响topic达到几百/几千个级别,吞吐量会有小幅下降;这是rocket的最大优势所以非常适用于支撑大批量topic场景 topic可以达到几十/几百个级别,吞吐量会有大幅下降kafka不适用大批量topic场景,除非加机器
时效性毫秒微秒 这是rabbit 最大优势,延迟低毫秒毫秒
可用性高。主从架构高。主从架构非常高。分布式。非常高。分布式。数据多副本,不会丢数据,不会不可用。
可靠性有较低概率丢失数据----经配置优化可达到0丢失经配置优化可达到0丢失
功能特性功能齐全,但已不怎么维护erlang开发,并发强,性能极好,延迟低MQ功能较为齐全,扩展好功能简单,主要用于大数据实时计算和日志采集,事实标准

Active:官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。

RabbitMQ:结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护,適合数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护 RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 需要学习比较复杂的接口和协议,学习和维护成本较高。

RocketMQ:分开源和商业版,商业版为阿里云提供,已在国内企业级应用有一定市场。天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的 支持的客户端语言不多,目前是java及c++,其中c++不成熟; 社区活跃度一般; 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码 。

Kafka:开源消息系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性:ms级 ;可用性:非常高,kafka是分布式的,通过控制能够保证所有消息被消费且仅被消费一次; 在日志领域比较成熟,被多家公司和多个开源项目使用; 功能支持:在大数据领域的实时计算以及日志采集被大规模使用。

缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长; 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 社区更新较慢


brew安装kafka


  • 直接使用brew安装,安装过程会自动安装zookeeper(windows使用二进制包安装即可)

    相关命令:brew install kafka

  • 安装位置: /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka

  • 配置文件位置: 

    /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties

  • 启动zookeeper服务:

    nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

    说明:如果是windows则使用./zookeeper-server-start.sh

  • 启动kafka服务(修改kafka配置文件,取消注释listeners=PLAINTEXT://:9092):

    nohup kafka-server-start /usr/local/etc/kafka/server.properties &

  • 停止kafka和zookeeper服务(先关闭Kafka,等关闭完之后再关闭Zookeeper,否则,Kafka brokers无法关闭)

    kafka-server-stop zookeeper-server-stop

  • 创建topic(示例名称topic_a)

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_a

  • 查看创建的topic

    kafka-topics --list --zookeeper localhost:2181

  • 修改分区数

    kafka-topics --zookeeper localhost:2181 --alter --topic topic_a --partitions 2 //修改为2个

  • 终端1发送消息

    kafka-console-producer --broker-list localhost:9092 --topic topic_a

  • 终端2接受消息

    kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning

    此时,在终端1中输入信息回车,终端2会立即显示在屏幕上面,如有结果输出,即安装成功

命令补充拓展:


  • 指定分区消费消息

    kafka-console-consumer --bootstrap-server localhost:9092 --topic test --partition 1 --from-beginning

  • 启动kafka,指定配置文件,后台启动并打印日志到指定位置

    nohup kafka-server-start /usr/local/etc/kafka/server.properties > /usr/local/etc/kafka/kafka.log 2>&1 &

  • 查看已有的topic

    kafka-topics --list --bootstrap-server localhost:9092

  • 查看指定topic 详细信息

    kafka-topics --bootstrap-server localhost:9092 --describe --topic topic_a

  • 查询指定topic内容:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning

  • 查询指定topic内容并使用grep匹配字符串:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a --from-beginning | grep '1234'

  • 控制台指定topic 的内部生产消息

    kafka-console-producer --broker-list localhost:9092 --topic topic_a

  • 控制台指定topic 的内部消费消息

    kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_a

  • 查看kafka的group列表

    kafka-consumer-groups --bootstrap-server localhost:9092 --list

  • 选择一个group(例如test),查看其中的topic列表

    kafka-consumer-groups --bootstrap-server localhost:9092 --group test --describe

  • 选择一个topic,查看是否包含某个字段(用来查看kafka是否收到某条消息)

    kafka-consumer-groups --bootstrap-server localhost:9092 --topic topic_a --from-beginning | grep '1234'

  • 查看topic对应的log日志、index日志(需要传入日志文件地址,多个用逗号分隔,由于kafka是二进制保存日志文件的,因此需要下方命令才能查看)

    kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log

  • 查看topic对应的日志,并使用grep匹配字符串

    kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log | grep '1234'

  • 把kafka日志从二进制文件转存到txt中

    kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/topic_a-0/00000000000000000000.log --print-data-log > ~/Downloads/kafka-topic_a.txt


kafka配置文件

kafka配置文件示例:

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured.# FORMAT:# listeners = listener_name://host_name:port# EXAMPLE:# listeners = PLAINTEXT://your.host.name:9092listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value# returned from java.net.InetAddress.getCanonicalHostName().#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the networknum.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/Onum.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics #############################
# A comma separated list of directories under which to store log fileslog.dirs=/usr/local/var/lib/kafka-logs
# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitiOns=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings ############################## The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync# the OS cache lazily. The following configurations control the flush of data to disk.# There are a few important trade-offs here:# 1. Durability: Unflushed data may be lost if you are not using replication.# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.# The settings below allow one to configure the flush policy to flush data after a period of time or# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can# be set to delete segments after a period of time, or after a given size has accumulated.# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining# segments drop below log.retention.bytes. Functions independently of log.retention.hours.#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according# to the retention policieslog.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).# This is a comma separated host:port pairs, each corresponding to a zk# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".# You can also append an optional chroot string to the urls to specify the# root directory for all kafka znodes.zookeeper.cOnnect=localhost:2181
# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.# The default value for this is 3 seconds.# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.group.initial.rebalance.delay.ms=0


连接相关配置说明:


  • listeners    配置的是kafka的tcp侦听ip地址;

  • advertised.listeners    配置的是kafka的broker ip。

  • listeners不配置则会调用java.net.InetAddress.getCanonicalHostName()获取IP

  • 在没有配置advertised.listeners的情况下,默认取值为kafka所在机器的主机名,端口与listeners中配置的端口一致。也就是kafka的broker ip是kafka所在机器的主机名。很多情况下,与kafka连接成功但无法正确生产消费的原因就是kafka的主机名无法被正确解析,最常见的就是kafka的主机名为localhost。

  • 另外,kafka成功注册zookeeper后,会将broker ip写入到kafka中。这样kafka集群中的每个节点都能知道其他所有节点的broker ip。因此,kafka的客户端无论连接到集群的哪个节点上,都能正确获取到整个集群的元数据信息。

log日志配置说明:


  • log.dirs=/usr/local/var/lib/kafka-logs     #日志存储地址

  • log.retention.hours=168     #日志文件最少保存时长

  • log.segment.bytes=1073741824     #Segment存储达到1G,就构建一个新的Segment

zookeeper配置说明:


  • zookeeper.cOnnect=localhost:2181

  • zookeeper.connection.timeout.ms=18000


kafka底层存储——Segment

        topic是逻辑上的概念,而partition是物理上的概念,即一个topic划分为多个partition,每个partition对应一个log文件

        为了防止log文件过大,将每个partition分为多个segment, 每个segment包括:.index文件、 .log文件、 .timeindex文件等,这些文件位于一个文件夹下,文件夹命名规则:topic名称+分区序号。

例:segment文件示例:topic为topic_a,选取segment topic_a-0文件

topic_a-0为一个segment文件夹,就是以topic和第一条消息的offset命名,其中包含00000000000000000000.index、00000000000000000000.log、00000000000000000000.timeindex​​​​​​​

-rw-r--r-- 1 luoxiaosheng admin 10M 10 9 14:38 00000000000000000000.index-rw-r--r-- 1 luoxiaosheng admin 213B 10 9 14:41 00000000000000000000.log-rw-r--r--  1 luoxiaosheng  admin    10M 10  9 14:38 00000000000000000000.timeindex

Segment设计思想:将一个分区中的数据划分到不同的文件当中去,每一个文件按顺序存储一部分数据,所有分区的第一个Segment就是用20位0来编号的Segment文件。该文件达到一定条件之后不再存,开始往下一个Segment文件进行存储。

这么做的意义:

1. 加快查询效率


  • 通过将分区的数据根据 Offset 划分到多个比较小的Segment文件,在检索数据时,可以根据Offset 快速定位数据所在的Segment

  • 加载单个Segment文件查询数据,可以提高查询效率

2. 删除数据时减少IO


  • 删除数据时,Kafka 以 Segment 为单位删除某个Segment的数据,避免一条一条删除,增加 IO 负载

Kafka将一个分区的文件是按照片段来存储的,一个片段的默认大小为1GB,可以在server.properties配置文件中修改片段大小,并且同时维护了index索引文件。 

Segment 的划分规则:满足任何一个条件都会划分segment

1. 按照时间周期生成

#默认7天。如果达到7天,重新生成一个新的Segment

log.roll.hours = 168

2. 按照文件大小生成

#默认大小是1个G。如果一个Segment存储达到1G,就构建一个新的Segment
log.segment.bytes = 1073741824

kafka将一个partiton分割成很多个segment文件,segment下分为几部分


  • .index/.timeindex 文件:索引文件,与log文件有一定的关联关系,.index为.timeindex为时间戳文件

  • .log文件:真正存储数据的文件

其文件名的规则如下:


  • 同一个segment下,index和log名字相同

  • 下一个segment是上一个segment的lastOffset


kafka底层存储——Segment log和index文件

.log文件示例:

Starting offset: 0baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1665297701410 size: 70 magic: 2 compresscodec: NONE crc: 1160496349 isvalid: true| offset: 0 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1665297701410 baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 70 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: 12baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 70 CreateTime: 1665297704669 size: 72 magic: 2 compresscodec: NONE crc: 4055451736 isvalid: true| offset: 1 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1665297704669 baseOffset: 1 lastOffset: 1 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 72 magic: 2 compressType: NONE position: 70 sequence: -1 headerKeys: [] payload: 3333baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 142 CreateTime: 1665297716279 size: 71 magic: 2 compresscodec: NONE crc: 155080469 isvalid: true| offset: 2 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1665297716279 baseOffset: 2 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 71 magic: 2 compressType: NONE position: 142 sequence: -1 headerKeys: [] payload: 444

.log文件解读:

baseOffset:初始偏移量
lastOffset:消息文本结束偏移量
offset:表示的是相对于该分区的记录偏移量,指的是第几条记录,比如0代表第一条记录。
position:表示该记录相对于当前片段文件的偏移量。
CreateTime:记录创建的时间。
isvalid:记录是否有效。
keysize:表示key的长度。
valuesize:表示value的长度
magic:表示本次发布kafka服务程序协议版本号。
compresscodec:压缩工具。
producerId:生产者ID(用于幂等机制)。
sequence:消息的序列号(用于幂等机制)。
payload:表示具体的消息
...

.index文件示例​​​​​​​

offset: 0 position: 0

.index文件解读

offset:消息在log文件中的相对offset,相对于当前index文件命名offset的偏移量,这样能确保offset的值占用的控件不会过大,因此能将offset的值控制在固定大小
position:当前消息物理偏移量,对应log文件中的position

index文件有什么用呢?其实就是一个索引,记录了一条消息在log文件中的位置,查找消息的时候先从index获取位置,然后就可以定位到消息在log文件具体哪个地方 

index采用了稀疏索引的方式去存储,不是每来一条消息就记录一个索引,而是当消息大于某个值的时候,就会记录一次索引,默认是4KB 

稀疏存储也就是选取一些消息的offset以及position进行存储,因为如果把对应片段的所有消息的索引都存储,那么必然会占用大量的内存。

索引分为两类:一种是全量索引,一种是稀疏索引。(1)全量索引指的是:每一条数据对应一条索引;(2)稀疏索引指的是:部分数据有索引,有一部分数据是没有索引。优点:减少了索引存储的数据量,加快索引的检索效率。

index和log写文件流程

(举个例子,假设segment配置了10KB(log.segment.bytes),每1KB记录一次索引(log.index.interval.bytes参数),topic下只有1个partition,一个消息大小为43)

此时发送多个消息,直到发送了24个消息的时候,此时log文件大小为1032,由于每1KB创建一个索引,大于log.index.interval.bytes指定的值,需要增加索引​​​​​​​

offset 0 position 0offset 24 position 1032

当发送了238个消息后,这时log文件的大小为10234,由于配置的segment大小为10KB(10240),那么就是说,当下一个消息过来,必须要新建segment文件

新文件名以238结尾,这个238即为上一个segment文件最后一个消息的offset(这时候索引建立过程和第一个segment一样) 

注意:index文件计算message在log中的位置的公式为offset-baseOffset

offset代表全局的message数,baseOffset为相对数量(即上面第二个segmemnt就为238),那么相减后,index文件内容就和第一个segment一样了

总结一下segment的命名格式:index和log文件以当前segment第一条消息的offset命名(示例:设置segment大小设置为10K,包含2个segment:000.和238) 

0000.log + 0000.index:存储第1条message到第238条message

0238.log + 0238.index:存储第239条message到第476条message,以上一个segment最后一条message的offset作为文件名 

以此类推

index和log读文件流程

举个例子:文件如下,在log文件的定位到offset=600的record

Segment-0 [offset:0-521] 00000000000000000000.index 00000000000000000000.logSegment-1 [offset:522-1044] 00000000000000000522.index  00000000000000000522.log


  1. 根据目标offset定位segment文件(如要找到offset=600则定位到Segment-1 [offset:522-1044]中)

  2. 找到小于等于目标offset的最大offset对应的索引项(index存的都是相对位移,且为稀疏索引,找到小于等于600的最大offset索引项)

  3. 定位log文件

  4. 向下遍历找到record

/   总结   /

本文主要讲了kafka相关介绍,相关主流mq比较,kafka安装部署,相关命令,配置文件说明,底层存储结构,log和index文件等

关注我的公众号,学习技术或投稿

长按上图,识别图中二维码即可关注


推荐阅读
author-avatar
特别要_966
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有