作者:MC_炽焰 | 来源:互联网 | 2023-09-13 04:30
本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客
微信公众号小白AI
或者网站 https://xiaobaiai.net
文章目录 1 前言 2 Spring Kafka功能概览 2.1 自动创建主题 2.2 发送消息 2.3 接收消息 2.3.1 消息监听器 2.3.1.1 消息监听器容器 2.3.1.2 使用KafkaMessageListenerContainer 2.3.1.3 使用 ConcurrentMessageListenerContainer 2.3.1.4 提交偏移量 2.3.1.5 侦听器容器自动启动和手动启动 2.3.2 @KafkaListener注解 2.3.2.1 Record Listeners 2.3.2.2 批处理侦听器 2.3.3 @KafkaListener@Payload验证 2.3.4 重新平衡监听者 2.3.5 转发监听者消息 2.3.6 @KafkaListener生命周期管理 2.4 流处理 2.5 附加配置 2.6 使用Embdded Kafka做测试 2.7 Spring Integration支持 3 Spring Kafka配置参数 3.1 全局配置 3.2 生产者 3.3 消费者 3.4 监听器 3.5 管理 3.6 授权服务(JAAS) 3.7 SSL认证 3.8 Stream流处理 4 Kafka订阅发布基本特性回顾 5 发布订阅示例 5.1 使用Embedded Kafka Server 5.2 简单的发布订阅实现(无自定义配置) 5.2.1 添加依赖及配置Kafka 5.2.2 添加生产者 5.2.3 添加消费者 5.2.4 添加WEB控制器 5.2.5 测试 5.3 基于自定义配置发布订阅实现 5.3 基于Spring Integration发布订阅实现 6 总结 7 知识扩展 8 参考资料
1 前言 本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完肯定可以的,有任何问题可以随时交流!
本篇文章主要介绍Spring Kafka的常用配置、主题自动创建、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka做测试配置相关内容,最后通过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration
方式。本文内容基于Spring Kafka2.3.3文档及Spring Boot Kafka相关文档,Spring创建了一个名为Spring kafka
的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,通过Spring Boot的简化配置(以spring.kafka.*
作为前缀的配置参数),在Spring Boot中使用Kafka特别简单。并且Spring Boot还提供了一个嵌入式Kafka代理方便做测试。
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
实现下面的所涉及到的功能实现,需要有如下环境:
Java运行或开发环境(JRE/JDK) Kafka安装成功 更多的配置可以参考《Kafka,ZK集群开发或部署环境搭建及实验》
这一篇文章。
本文尽量做到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,通过Spring Boot中的Kafka几大注解实现发布订阅功能,同时通过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文通过自己实验和整理了较久的时间,涵盖了Spring Kafka大部分内容,希望大家耐心读下来,有什么问题随时反馈,一起学习。
2 Spring Kafka功能概览Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性如下(截至2019年12月9日):
Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients 2.3.x 3.2.x 2.3.1 2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x 2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0 1.3.x 2.3.x 0.11.0.x, 1.0.x
具体更多版本特点可以看官网,spring kafka当前最新为2.3.4版本。
Spring Kafka相关的注解有如下几个:
注解类型 描述 EnableKafka 启用由AbstractListenerContainerFactory
在封面(covers)下创建的Kafka监听器注解端点,用于配置类; EnableKafkaStreams 启用默认的Kafka流组件 KafkaHandler 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解 KafkaListener 将方法标记为指定主题上Kafka消息监听器的目标的注解 KafkaListeners 聚合多个KafkaListener注解的容器注解 PartitionOffset 用于向KafkaListener添加分区/初始偏移信息 TopicPartition 用于向KafkaListener添加主题/分区信息
如使用@EnableKafka
可以监听AbstractListenerContainerFactory
子类目标端点,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子类。
public class ConcurrentKafkaListenerContainerFactory < K, V> extends AbstractKafkaListenerContainerFactory < ConcurrentMessageListenerContainer< K, V> , K, V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory ( ) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory ( ) ; factory. setConsumerFactory ( consumerFactory ( ) ) ; factory. setConcurrency ( 4 ) ; return factory; } }
@EnableKafka
并不是在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,因此不需要使用显式的@EnableKafka
。如果想要自己实现Kafka配置类,则需要加上@EnableKafka
,如果你不想要Kafka自动配置,比如测试中,需要做的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
2.1 自动创建主题