热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式消息系统之Kafka集群部署

kafka是基于发布订阅模式的一个分布式消息队列系统,用java语言研发,是ASF旗下的一个开源项目;类似的消息队列服务还有rabbitmq、activemq、zeromq;kaf
kafka是基于发布/订阅模式的一个分布式消息队列系统,用java语言研发,是ASF旗下的一个开源项目;类似的消息队列服务还有rabbitmq、activemq、zeromq;kafka最主要的优势具备分布式功能,并且结合zookeeper可以实现动态扩容;kafka对消息保存是通过Topic进行分类,发送消息一方称为producer(生产者),接收消息一方称为consumer(消费者);一个kafka集群有多个kafka server组成,我们把每个kafka server称为broker(消息掮客);

  一、kafka简介

  kafka是基于发布/订阅模式的一个分布式消息队列系统,用java语言研发,是ASF旗下的一个开源项目;类似的消息队列服务还有rabbitmq、activemq、zeromq;kafka最主要的优势具备分布式功能,并且结合zookeeper可以实现动态扩容;kafka对消息保存是通过Topic进行分类,发送消息一方称为producer(生产者),接收消息一方称为consumer(消费者);一个kafka集群有多个kafka server组成,我们把每个kafka server称为broker(消息掮客);

  ActiveMQ、RabbitMQ、kafka对比

  二、kafka集群部署

  环境说明

主机名 ip地址
node04 192.168.0.44
node05 192.168.0.45
node06 192.168.0.46

 

 

 

 

 

  

  提示:在部署kafka集群之前,我们要先把zk集群部署起来,因为kafka是强依赖zk集群;zk集群部署请参考上一篇博客https://www.cnblogs.com/qiuhom-1874/p/13841371.html;上面3台server只是kafka集群的三台server;

  1、安装jdk

[root@node04 ~]# yum install -y java-1.8.0-openjdk-devel

  验证java环境

[root@node04 ~]# java -version
openjdk version "1.8.0_262"
OpenJDK Runtime Environment (build 1.8.0_262-b10)
OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)
[root@node04 ~]# 

  提示:以上安装Java环境,在kafka集群的每个server都要做一遍;除了上面的java环境,还有基础环境像时间同步,主机名解析,关闭selinux,关闭防火墙,主机免密这些都要提前做好;

  2、下载kafka二进制压缩包

[root@node04 ~]# ll
total 0
[root@node04 ~]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
--2020-10-21 20:06:28--  https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1
Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 65671917 (63M) [application/octet-stream]
Saving to: ‘kafka_2.12-2.6.0.tgz’

100%[================================================================================>] 65,671,917  6.38MB/s   in 13s    

2020-10-21 20:06:41 (4.96 MB/s) - ‘kafka_2.12-2.6.0.tgz’ saved [65671917/65671917]

[root@node04 ~]# ll
total 64136
-rw-r--r-- 1 root root 65671917 Aug  5 06:01 kafka_2.12-2.6.0.tgz
[root@node04 ~]# 

  3、解压二进制包,并做软连接

[root@node04 ~]# tar xf kafka_2.12-2.6.0.tgz -C /usr/local/
[root@node04 ~]# ln -sv /usr/local/kafka_2.12-2.6.0 /usr/local/kafka
‘/usr/local/kafka’ -> ‘/usr/local/kafka_2.12-2.6.0’
[root@node04 ~]# 

  提示:其他server也是相同的操作;

  4、配置node04上的kafka

  提示:broker.id是配置broker的id,这个id在kafka集群中必须唯一;listeners是用来指定当前节点监听的socket;log.dirs用来指定kafka的日志文件路径;log.retention.hours用来指定保存多少小时的日志;zookeeper.conect用来指定zk集群各节点信息,通常是把zk所有节点都写上,用逗号隔开;其他的参数都可以不用变;我这里用到主机名,是因为我在hosts文件对所有节点都做了主机名解析;

  创建日志目录

[root@node04 config]# mkdir -pv /data/kafka
mkdir: created directory ‘/data’
mkdir: created directory ‘/data/kafka’
[root@node04 config]# 

  提示:后面的kafka-logs目录在kafka启动时会自动创建;到此node04就配置好了;

  把node04上的配置文件拷贝到node05

[root@node04 config]# scp server.properties  node05:/usr/local/kafka/config/
server.properties                                                                       100% 6882     2.0MB/s   00:00    
[root@node04 config]# 

  修改broker.id和listeners配置

  创建日志目录

[root@node05 ~]#  mkdir -pv /data/kafka
mkdir: created directory ‘/data’
mkdir: created directory ‘/data/kafka’
[root@node05 ~]# 

  把node05的配置文件,复制到node06的kafka配置文件目录

[root@node05 ~]# scp /usr/local/kafka/config/server.properties node06:/usr/local/kafka/config/server.properties
The authenticity of host 'node06 (192.168.0.46)' can't be established.
ECDSA key fingerprint is SHA256:lE8/Vyni4z8hsXaa8OMMlDpu3yOIRh6dLcIr+oE57oE.
ECDSA key fingerprint is MD5:14:59:02:30:c0:16:b8:6c:1a:84:c3:0f:a7:ac:67:b3.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'node06,192.168.0.46' (ECDSA) to the list of known hosts.
server.properties                                                                       100% 6882     1.9MB/s   00:00    
[root@node05 ~]# 

  修改broker.id和listeners配置,并创建日志目录

  到此,三个节点的kafka就配置好了;

  启动各节点上的kafka

[root@node04 config]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@node04 config]# ss -tnl
State      Recv-Q Send-Q                Local Address:Port                               Peer Address:Port              
LISTEN     0      128                               *:22                                            *:*                  
LISTEN     0      100                       127.0.0.1:25                                            *:*                  
LISTEN     0      128                              :::22                                           :::*                  
LISTEN     0      100                             ::1:25                                           :::*                  
LISTEN     0      50                               :::39779                                        :::*                  
LISTEN     0      50              ::ffff:192.168.0.44:9092                                         :::*                  
[root@node04 config]# 

  提示:可以看到node04上的9092处于监听状态;用同样的命令把node05,node06上的kafka都启动起来;

  查看日志

  提示:kafka的启动日志放在安装目录下的logs目录,有个server.log;我们刚才创建的日志目录,主要用来保存集群事务的日志;

  测试kafka

  1、在各节点验证kafka进程是否启动

[root@node04 config]# jps
1797 Kafka
2485 Jps
[root@node04 config]# ssh node05 'jps'
1840 Jps
1772 Kafka
[root@node04 config]# ssh node06 'jps'
2321 Kafka
2388 Jps
[root@node04 config]# 

  2、在zk集群上查看,是否有kafka节点注册到上面

zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls -R /
/
/admin
/brokers
/cluster
/config
/consumers
/controller
/controller_epoch
/isr_change_notification
/latest_producer_id_block
/log_dir_event_notification
/zookeeper
/admin/delete_topics
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/cluster/id
/config/brokers
/config/changes
/config/clients
/config/topics
/config/users
/zookeeper/config
/zookeeper/quota
[zk: localhost:2181(CONNECTED) 2] 

  提示:可以看到在zk集群上多了很多节点;

  3、创建名为test,partitions为3,replication为3的topic

[root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic test
Created topic test.
[root@node04 config]# 

  在kafka集群的任意节获取topic

[root@node06 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181,node01:2181,node03:2181 --topic test
Topic: test     PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: test     Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
[root@node06 ~]# 

  提示:从上面的返回的状态信息可以看到test topic有三个分区分别为0、1、2,分区0的leader是2(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。

  4、删除topic

  6、创建topic,并发送消息

[root@node04 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3 --topic msgtest
Created topic msgtest.
[root@node04 config]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list node04:9092,node05:9092,node06:9092 --topic msgtest
>hello               
>hi
>  

  在其他节点获取消息

  使用图形工具kafka-tool工具获取消息

  ok,到此kafka这个消息系统就搭建好了;


推荐阅读
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • 电商高并发解决方案详解
    本文以京东为例,详细探讨了电商中常见的高并发解决方案,包括多级缓存和Nginx限流技术,旨在帮助读者更好地理解和应用这些技术。 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • RocketMQ 运维监控实践指南
    本文详细介绍了如何实现 RocketMQ 的运维监控,包括监控平台的搭建、常用运维命令及其具体用法。适合对 RocketMQ 监控感兴趣的读者参考。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 前言:由于Android系统本身决定了其自身的单线程模型结构。在日常的开发过程中,我们又不能把所有的工作都交给主线程去处理(会造成UI卡顿现象)。因此,适当的创建子线程去处理一些耗 ... [详细]
  • 1.创建目录mkdir-phomerocketmqnamesvr1data&&mkdir-phomerocketmqnamesvr1log&&mkdir-phomerocketm ... [详细]
  • 从单机存储进化为接口和存储的分离概述接口服务层对外提供REST服务,数据服务层提供数据存储功能。两者之间通过消息队列进行通信,数据服务层的所有数据服 ... [详细]
  • Windows下安装RaibbitMQ
    1.软件准备1.1erlang语言包到http:www.erlang.orgdownload.html下载,并且运行!安装目录C:\ProgramFil ... [详细]
author-avatar
菜蔸蔸
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有