作者:手机用户2502911617_428 | 来源:互联网 | 2023-08-25 20:53
1.1什么是kafka?Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息
1.1 什么是kafka?
Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。随着kafka的发展,功能并不局限于消息系统。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer。kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
1.2 Kafka的特性:
– 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
– 可扩展性:kafka集群支持热扩展
– 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
– 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
– 高并发:支持数千个客户端同时读写
1.3 Kafka适用场景
由于 Kafka存在高容错、高扩展、扩展分布式等特性, Kafka主要应用场景如下:
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和storm
1.4 Kafka中的相关概念
Kafka中发布订阅的对象是topic。可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
Broker(代理者 ):Kafka集群中的机器 /服务被称为broker, 是一个物理概念。一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。注意:一台节点上可以有多个broker,这和HDFS上一个节点是一个namenode或者datanode有所区别。一台机器上的broker数量由server.properties的数量决定。
Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。一个broker可以容纳多个topic。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。在数据的 产生和消费过程中,不需要关注具体存储的Partition在哪个Broker上,只需要指定 Topic 即可,由 KafkaKafka Kafka负责将数据和对应的 Partition关联上
Segment:partition物理上由多个segment组成,每个Segment存着message信息
Message (消息 ):传递的数据对象,主要由四部分构成(offset(偏移量 )、key、value、 timestamp(插入时间 )
消息和数据不是一个概念,消息是对数据的封装,其中的value部分是可以看成是数据。
Producer : 生产message发送到topic
Consumer : 订阅topic消费message, consumer作为一个线程来消费
Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观锁(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
1.4 Kafka的拓扑图:
Kafka是显示的分布式消息发布和订阅系统,除了有多个producer, broker,consumer外,还有一个zookeeper集群用于管理producer,broker和consumer之间的协同调用。
从图中可以看出,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Producer到Broker的过程是push,也就是有数据就推送到Broker,Consumer到Broker的过程是pull,是通过Consumer主动去拉数据的,而不是Broker把数据主动发送到Consumer端的。
1.5 Kafka安装部署及测试
Kafka的开发语言为 Scala。提供了各种不同语言的 API API。 Kafka的安装方式主要由三种,分别是:单机、伪分布式、完全分布式;其中伪分布式和完全分布式几乎完全一样。
安装步骤:
- 安装 JAVA和Scala
- 安装 Zookeeper
- 安装 Kafka
Kafka安装 (伪分布式 )
解压
$ tar -zxf kafka_2.11-2.0.0.tgz
配置
修改配置文件 {KAFKA_HOME}/conf/server.properties。如果是伪分布式,那么需要在单台机器上 copy多个 server.properties文件;如果是完全分布式,那么需要将修改好的 KAFKA 完全copy到其它机器上。
${KAFKA_HOME}/config/server.properties
port=9092
host.name=haotname
advertised.host.name=haotname
advertised.port=9092
zookeeper.cOnnect=hostname:2181
单节点多broker,就应该将上面的这个server.properties,复制几份出来
cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
$ mv config/server.properties config/server0.properties
修改${KAFKA_HOME}/config/server1.properties
broker.id=1
listeners=PLAINTEXT://:9093
port=9093
log.dirs=/usr/local/kafka_2.11-0.9.0.1/data/log1
// 因为是同一节点上开启了多个broker,所以只能通过端口区分,若是broker都在不同的节点上,不用通过端口区分
启动测试 1.启动zookeeper
$ bin/zkServer.sh start
查看是否正常
$ bin/zkServer.sh status
2.启动kafaka的broker进程
$bin/kafka-server-start.sh -daemon config/server0.properties
关闭服务使用
{KAFKA_HOME}/bin/kafka-server serverserver -stop.sh