热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

高吞吐量消息系统—kafka

现在基本上大数据的场景中都会有kafka的身影,那么为什么这些场景下要用kafka而不用其他传统的消息队列呢?例如rabbitmq。主要的原因是因为kafka天然的百万级TPS,以

现在基本上大数据的场景中都会有kafka的身影,那么为什么这些场景下要用kafka而不用其他传统的消息队列呢?例如rabbitmq。主要的原因是因为kafka天然的百万级TPS,以及它对接其他大数据组件的流处理功能,比如可以更好的对接Apache storm。本文只是讨论kafka作为消息队列的功能及一些用法。

 

丑话说在前头

Kafka本身比较重,强依赖于zookeeper,所以使用Kafka必须要先搭建zookeeper(虽然topic offset已经不在zookeeper管理,但是其他重要的meta信息都是保存在zookeeper),本身的topic/partition/replicate/offset等概念需要学习成本,异常情况下存在重复消费数据的风险,需要用户自行规避,例如将消息设计为幂等消息,或者用户层维护一个index自行记录有没有消费过。同一个topic下的不同partition间不能保证消息顺序,只能保证同一个partition的数据是有序的。另外kafka消费者组的rebalance一直是一个用户诟病的点,topic/parition/消费者数量变化都会引发rebalance,rebalance期间整个消费者组不能消费数据(即STW,stop the world)。所以我个人建议,如果不要求百万级TPS的消息队列并且不强依赖kafka的某些特性,可以优先考虑传统的消息队列,比如rabbitmq。

 

消息队列的优势

1.削峰填谷

用于上下游数据处理时长差别很大的应用场景。比如购物网站,前端需要快速返回给用户,后端需要处理一系列的动作(查库存,扣费,发货等等,很有可能需要依赖其他第三方系统),所以如果前端和后端如果没有一个消息队列,前端的流量可能会压垮后端。

2.松耦合设计

生产者只要将数据放进消息队列就完成了任务,至于消费者何时消费数据生产者都不需要关心。这样带来的一个好处是生产者如何发生异常或者变更都不会影响生产者。

 

kafka的优势

1.百万级TPS

Kafka轻松就能达到百万级的TPS,也是为什么大数据场景下kafka受欢迎的最主要的原因。具体的压测方法参考:

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Broker端的参数优化可以参考:

https://gist.github.com/jkreps/c7ddb4041ef62a900e6c#file-server-config-properties-L53

 

2.数据强一致性

Kafka收到消息后会立马落地,可以配置所有replicate都落地后再让producer返回。CAP原则,kafka提供了充分的参数让用户选择,数据一致性越强吞吐量越低,需要根据业务场景评估。

 

3.数据可以重复消费

不同于传统的消息队列,队列中的数据只能消费一次。kafka数据能重复消费,队列中的数据消费后,每个消费者通过offset控制自己的消费,多个消费者可以同时消费同一个队列。队列的数据什么时候清理是由broker保存时间配置决定。该特性适合于需要重复加载数据或者多个消费者同时消费一份数据的场景。

高吞吐量消息系统—kafka

 4.只要存活一个broker就能提供服务

对于n个broker组成的kafka集群,意外宕机n-1个broke都能保证对外提供服务。

 

整体介绍

kafka的topic主题是个逻辑意义的队列,或者说是一类队列,内部可以有一个或者多个partition,kafka只保证同一个partition内的消息顺序,也就是说partition是物理意义的消息队列,不同的partition是不同的消息队列。每个partition可以指定一个或者多个副本(replicate),一个leader replicate和n个follower replicate,只有leader replicate提供读写服务,其他的副本只会向leader replicate拉取数据做同步,如果leader replicate异常退出将会从剩下的followe replicate重新选举一个leader。至于其他的副本为什么不像mysql一样提供只读服务?主要原因是kafka是消息队列,一般是一写一读,mysql数据库一般是一写多读,应用场景不一样。

高吞吐量消息系统—kafka

 

上图是两个生产者往一个topic内的不同partition中写入数据。每个partition会维护一个offset,一般从0开始,每append一条消息+1。offset信息之前版本的kafka是存储在zookeeper,由于频繁读写offset触发zookeeper性能瓶颈,所以较新版本的kafka将这些信息维护在kafka内部的topic中。 kafka也会为每个消费者/消费者组保存offset,记录这个消费者/消费者组上一次的消费位置,以便于消费者/消费者组重启后接着消费,消费者/消费者组也可以指定offset进行消费。

 

更多细节参考:https://kafka.apache.org/documentation/#introduction

 

使用注意事项

生产消息

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer producer = new KafkaProducer<>(props);
 for (int i = 0; i <100; i++)
     producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

 

这里acks指定了all,即需要等待所有的ISR拉取到record之后再返回,是kafka吞吐量最低但是数据一致性最高的做法。ProducerRecord指定了topic,以及record的key和value,但是没有指定partition,如果我们需要指定paritiion可以在topic的后面加上partition,参考下面的方法。

ProducerRecord有多种构造方式:

ProducerRecord(String topic, Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, K key, V value)
Create a record to be sent to Kafka
ProducerRecord(String topic, V value)
Create a record with no key

 

如果ProducerRecord没有指定record的时间戳,producer默认会添加当前时间的时间戳。kafka最终是否采用record中的时间取决于topic的配置,如果配置为CreateTime将会采用record中的timestamp,如果配置为LogAppendTime则采用kafka broker添加该record时的本机时间。

 

  

生产者批量发送消息

producer会为每个partition在本地维护一个buffer,作批量发送数据用,producer调用close方法时会释放buffer。buffer的大小由配置batch.size指定。生产者端指定batch.size 和linger.ms 搭配使用,提升客户端和服务端性能。batch.size值默认为16k,即16k以内的record会打包发送。linger.ms默认为0,即不延时发送。可以适当调大batch.size的大小,会增加批量发送的条数,副作用是会消耗一些本地内存,batch.size是每个partition的批量发送大小。

例如指定batch.size=32k linger.ms=5,那么在5ms内batch.size没有满也会等到5ms再发送,所以linger.ms决定了消息延时的上限。

生产者buffer总内存量大小由配置buffer.memory 决定,默认是32M。如果producer生产数据的速度大于发送给server的速度就会满,写满后producer send数据会阻塞,阻塞等待的最大时长由配置max.block.ms(默认1分钟)决定,超过max.block.ms后会抛TimeoutException异常。

 

多线程生产者

kafka producer对象是线程安全的,可以多线程共享一个或者多个producer对象。

 

更多细节参考:https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 

 

消费者

消费者组:指定相同group.id的消费者属于同一个消费者组。一个partition只会分配给同一个group中的其中一个消费者。例如下图中A消费者组中的C1消费者消费P0和P3两个partition,C2消费者消费P1和P2两个partition。B消费者组中的C3,C4,C5,C6分别消费P0,P3,P1,P2 四个partition,如果B消费组有5个消费者,那么会有一个消费者轮空,即没有partition可以消费。使用消费者组一定要注意的一个地方是:当topic/partition/改消费者组内消费者数量任一数量发生变化时,都会触发kafka rebalance,即重新进行负载均衡,在rebalance期间,改消费者组的消费者都不能进行消费。

高吞吐量消息系统—kafka

kafka是如何知道消费者已经异常/退出从而发起rebalance?有两种机制发现:

1.物理链路异常。通过配置session.timeout.ms心跳保活机制,消费者周期性向kafka发送心跳,超过session.timeout.ms时间没有收到心跳则认为消费者已经异常,从而剔除改消费者,重新rebalance,将改消费者负责的partition分给其他消费者。

2.逻辑异常。消费者和kafka server的心跳仍然存活,但是消费者由于内部逻辑异常,比如死锁等,一直没有poll数据。可以通过设置max.poll.interval.ms来限定两次拉取数据间隔的最大值,超过这个时间kafka会判定消费者已经异常/不活跃。从而rebalance。

设置这两个配置一定要慎重,毕竟kafka的rebalance还是有成本的,rebalance期间整个消费者组不能消费数据。

 

消费场景一:自动提交offset

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer cOnsumer= new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

bootstrap.servers kafka server地址,可以是一个或者多个,用于发现其他kafka broker,所以没有必要填写所有的kafka地址,为了高可用写几个就行。

enable.auto.commit/auto.commit.interval.ms 设置自动提交offset和自动提交的周期。

这里需要特别指出的是,设置自动提交有可能会丢失消费数据,有可能poll回来,数据还没有正式消费,但是offset 已经自动提交了,结果消费者异常退出。消费者进程重启后读取kafka存储的offset,那么之前崩溃没有处理的数据将会漏掉,无法感知消费。所以一个可行的办法是,将enable.auto.commit设置为false,while循环消费完后再调用commit。这样做异常崩溃情况下会重复消费部分数据,需要用户自行规避,可以将消息设置为幂等,或者消费体中有序号字段,用户层能够感知到这个消息已经消费过,从而丢弃。即下面的场景二。

 

消费场景二:手动提交offset

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "false");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer cOnsumer= new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

  

手动管理partition分配

为了避免topic/partition/消费者数量变化频繁引起的rebalance,用户层可以自行管理partition的分配,不用消费者组。

    String topic = "foo";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));

然后可以像上面的示例使用while循环poll数据。

 

多线程consumer

kafka consumer对象不是线程安全的,换言之,不能多个线程用同一个consumer去poll数据。如果一定要这样做,需要用户自行实现多线程同步访问consumer。建议还是一个线程一个独立的consumer,多线程共享一个或者多个consumer对象还涉及到消费数据顺序的问题。

 

更多细节参考:https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

 

高水平API VS 低水平API

kakfa提供high-level 和low-level api供用户使用,可以根据不同的使用场景选择不同的api。

高水平API(high-level api):API已经屏蔽底层topic/partition/offset管理细节,用户调用API只需要指定kafka地址topic名称就能生产和消费数据。比较简单,用户管理力度较弱。

低水平API(low-level api):API暴露底层topic/partition/offset,需要用户自行管理,包括offset保存,然后根据offset seek到特定的位置开始消费,寻找partition leader副本等。比较复杂,用户管理力度较强。

 

总结

本文介绍了kafka的优缺点,以及围绕生产和消费消息两种场景展开kafka的使用说明以及一些注意事项。下一篇将会介绍代码级别的demo应用。


推荐阅读
  • 本文深入解析了 Apache 配置文件 `httpd.conf` 和 `.htaccess` 的优化方法,探讨了如何通过合理配置提升服务器性能和安全性。文章详细介绍了这两个文件的关键参数及其作用,并提供了实际应用中的最佳实践,帮助读者更好地理解和运用 Apache 配置。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 多种实现 Windows 定时自动执行任务的专业技巧与方案
    在Windows系统中,实现定时自动执行任务有多种专业技巧和方案。常见的方法包括:使用Windows任务计划程序、开发Windows服务以及利用SQL Server Agent作业。这些方法被广泛应用于各种自动化场景,多数技术人员对此都有所了解。 ... [详细]
  • 本文深入探讨了 C# 中 `SqlCommand` 和 `SqlDataAdapter` 的核心差异及其应用场景。`SqlCommand` 主要用于执行单一的 SQL 命令,并通过 `DataReader` 获取结果,具有较高的执行效率,但灵活性较低。相比之下,`SqlDataAdapter` 则适用于复杂的数据操作,通过 `DataSet` 提供了更多的数据处理功能,如数据填充、更新和批量操作,更适合需要频繁数据交互的场景。 ... [详细]
  • 手机上编写和运行PHP代码的最佳软件推荐 ... [详细]
  • C#中实现高效UDP数据传输技术
    C#中实现高效UDP数据传输技术 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • 如何利用Apache与Nginx高效实现动静态内容分离
    如何利用Apache与Nginx高效实现动静态内容分离 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 当前,众多初创企业对全栈工程师的需求日益增长,但市场中却存在大量所谓的“伪全栈工程师”,尤其是那些仅掌握了Node.js技能的前端开发人员。本文旨在深入探讨全栈工程师在现代技术生态中的真实角色与价值,澄清对这一角色的误解,并强调真正的全栈工程师应具备全面的技术栈和综合解决问题的能力。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 本文深入探讨了 Python Watchdog 库的使用方法和应用场景。通过详细的代码示例,展示了如何利用 Watchdog 监控文件系统的变化,包括文件的创建、修改和删除等操作。文章不仅介绍了 Watchdog 的基本功能,还探讨了其在实际项目中的高级应用,如日志监控和自动化任务触发。读者将能够全面了解 Watchdog 的工作原理及其在不同场景下的应用技巧。 ... [详细]
  • 本文深入探讨了Spring Cloud Eureka在企业级应用中的高级使用场景及优化策略。首先,介绍了Eureka的安全配置,确保服务注册与发现过程的安全性。接着,分析了Eureka的健康检查机制,提高系统的稳定性和可靠性。随后,详细讨论了Eureka的各项参数调优技巧,以提升性能和响应速度。最后,阐述了如何实现Eureka的高可用性部署,保障服务的连续性和可用性。通过这些内容,开发者可以更好地理解和运用Eureka,提升微服务架构的整体效能。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
author-avatar
Jerrefy是不会游泳的鱼_177
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有