热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka依赖_SpringBoot系列:关于Kafka消息发送的事儿

世界上最快的捷径,就是脚踏实地,本文已收录【架构技术专栏】关注这个喜欢分享的地方。开源项目:分布式监控(GiteeGVP最有价值开源项目)

世界上最快的捷径,就是脚踏实地,本文已收录【架构技术专栏】关注这个喜欢分享的地方。

开源项目:

  • 分布式监控(Gitee GVP最有价值开源项目 ):https://gitee.com/sanjiankethree/cubic
  • 个人主站:http://www.jiagoujishu.com

0、简述

Spring Boot 版本:2.3.4.RELEASE

随着大数据的发展,目前Kafka可以说在我们项目中的使用是越来越多了。其高性能的特点也是满足了我们大部分的场景,所以对于学习Kafka的兼容使用也是一件很重要的事情。

下面我们从几个点来说:

  • 发送消息
  • 发送回调
  • 实现原理
  • 异步和同步

1、添加依赖

 org.springframework.kafkaspring-kafka

2、添加配置

在Spring Boot 中kafka的配置属性都是spring.kafka.* 开头的,最简配置如下(application.properties中 )

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

3、发送代码

Spring Boot kafka 依然沿用老套路 XXXTemplate,所以这里发送自然就使用了KafkaTemplate

@Component
public class Producer {

   private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Resource
    private KafkaTemplate kafkaTemplate;public void send(String msg) {
         kafkaTemplate.send("testTOPIC", msg);
    }
}

4、参数说明

ListenableFuture> sendDefault(V data);
ListenableFuture> sendDefault(K key, V data);
ListenableFuture> sendDefault(Integer partition, K key, V data);
ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture> send(String topic, V data);
ListenableFuture> send(String topic, K key, V data);
ListenableFuture> send(String topic, Integer partition, K key, V data);
ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture> send(ProducerRecord record);
ListenableFuture> send(Message> message);

使用KafkaTemplate.send 会有很多不同的发送参数,这里说明下:

  • topic : 填写要发送的topic名称
  • partition : 要发送的分区id,从0开始
  • timestamp:时间戳
  • key:消息的key
  • data:消息数据
  • ProducerRecord:消息的封装类,包含了上面的参数
  • Message> :Spring自带的Message封装类,包含消息和消息头

5、发送回调

Spring Boot KafkaAutoConfiguration 为我们提供了处理消息回调的handler,以供我们来处理结果。成功调用onSuccess,失败调用onError,增加如下类:

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}

6、实现原理

从上面来看,我们基本两三行代码就完成了kafka消息的发送,那他们到底是怎么加载实现的呢。

熟悉Spring Boot 的小伙伴想必也能猜到,基于其扩展SPI的机制,spring-boot-autoconfigure包下一定会有一个KafkaAutoConfiguration配置类。

1、KafkaAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })public class KafkaAutoConfiguration {
 
  
}

通过KafkaAutoConfiguration我们可以看出几件事情

1、@ConditionalOnClass(KafkaTemplate.class) 表示我们必须依赖了spring-kafka 包才会加载KafkaAutoConfiguration

2、此类配置了KafkaProperties属性供我们使用

3、这里在加载时帮我们引入了两个类KafkaAnnotationDrivenConfiguration 提供消费注解支持 和KafkaStreamsAnnotationDrivenConfiguration 提供stream 注解支持

2、KafkaTemplate

这里就不大段贴代码了,可以去看完整的KafkaAutoConfiguration

  @Bean
 @ConditionalOnMissingBean(KafkaTemplate.class)public KafkaTemplate, ?> kafkaTemplate(ProducerFactory kafkaProducerFactory,ProducerListener kafkaProducerListener,ObjectProvider messageConverter) {
  KafkaTemplate kafkaTemplate &#61; new KafkaTemplate<>(kafkaProducerFactory);
  messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
  kafkaTemplate.setProducerListener(kafkaProducerListener);
  kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;
 }&#64;Bean&#64;ConditionalOnMissingBean(ProducerListener.class)public ProducerListener kafkaProducerListener() {return new LoggingProducerListener<>();
 }&#64;Bean&#64;ConditionalOnMissingBean(ProducerFactory.class)public ProducerFactory, ?> kafkaProducerFactory() {return factory;
 }

这里摘取了KafkaTemplate 主要相关的方法&#xff0c;关键的几个点如下&#xff1a;

  • 三个方法使用了&#64;ConditionalOnMissingBean 注解&#xff0c;根本原因就是为了方便我们进行扩展而存在的
  • kafkaProducerListener 方法是为了在调用doSend 方法是构建Callback 使用的&#xff0c;方便我们来监控发送成功或失败的信息(KafkaTemplate 的305行buildCallback)
  • ProducerFactory 是真正用于创建producer的&#xff0c;如果配置了 transactionIdPrefix 那就代表开启了producer 对于事物的支持。如果开启了事物那就会先从本地ThreadLocal 中获取producer&#xff0c;拿不到才去创建。(KafkaTemplate 的 341 行 getTheProducer)
  • 这里如果配置了spring.kafka.producer.transaction-id-prefix还会创建一个KafkaTransactionManager事务管理器

加载流程&#xff1a;

1、因为我们加入spring-kafka jar&#xff0c;所以在启动的时候会通过SPI 机制加载到 KafkaAutoConfiguration

2、这时配置类通过&#64;ConditionalOnMissingBean 发现我们没有独立配置 KafkaTemplate时&#xff0c;会依次加载默认的ProducerListenerProducerFactory来构建KafkaTemplate

发送流程&#xff1a;

1、调用KafkaTemplate.send(String topic, &#64;Nullable V data)方法

2、调用KafkaTemplate 内部 doSend(ProducerRecord producerRecord)方法

3、调用KafkaTemplate 内部 getTheProducer() 方法&#xff0c;如果是事物发送就从Threadlocal 获取&#xff0c;否则创建一个Producer

4、构造SettableListenableFuture回调

5、调用最终的发送方法KafkaProducer 内的 doSend(ProducerRecord record, Callback callback)

7、异步和同步发送

通过KafkaTemplate 的源码我们可以发现&#xff0c;其实发送消息都是采用异步发送的。

KafkaTemplate会把我们传入的参数封装成ProducerRecord&#xff0c;然后调用doSend方法&#xff0c;源码如下&#xff1a;

public ListenableFuture> send(String topic, K key, &#64;Nullable V data) {
        ProducerRecord producerRecord &#61; new ProducerRecord(topic, key, data);return this.doSend(producerRecord);
    }

doSend(producerRecord)方法先检查了下是否开启事物&#xff0c;调用this.getTheProducer()获取到 producer。

后面主要是构造了一个SettableListenableFuture 回调&#xff0c;最后在使用KafkaProducer.send(ProducerRecord record, Callback callback) 进行数据发送&#xff0c;返回一个Future

protected ListenableFuture> doSend(ProducerRecord producerRecord) {if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with &#64;Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }
        Producer producer &#61; this.getTheProducer();this.logger.trace(() -> {return "Sending: " &#43; producerRecord;
        });
        SettableListenableFuture> future &#61; new SettableListenableFuture();
        producer.send(producerRecord, this.buildCallback(producerRecord, producer, future));if (this.autoFlush) {this.flush();
        }this.logger.trace(() -> {return "Sent: " &#43; producerRecord;
        });return future;
    }

同步发送消息

因为在某些业务场景下我需要同步发送消息&#xff0c;实现其实也很简单。因为返回了一个Future 所以我们只需要调用get方法就行了

public void syncSend() throws ExecutionException, InterruptedException {
        kafkaTemplate.send("demo", "test sync message").get();
    }

往期推荐

Spring Boot 系列&#xff1a;日志动态配置详解

Spring Boot 系列&#xff1a;最新版优雅停机详解

(最新 9000字)  Spring Boot 配置特性解析

Spring Boot 知识清单(一)SpringApplication

何时用多线程&#xff1f;多线程需要加锁吗&#xff1f;线程数多少最合理&#xff1f;

3c6e838bd59381458eb8c9741fd73a47.png




推荐阅读
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了MVP架构模式及其在国庆技术博客中的应用。MVP架构模式是一种演变自MVC架构的新模式,其中View和Model之间的通信通过Presenter进行。相比MVC架构,MVP架构将交互逻辑放在Presenter内部,而View直接从Model中读取数据而不是通过Controller。本文还探讨了MVP架构在国庆技术博客中的具体应用。 ... [详细]
  • 本文介绍了C++中省略号类型和参数个数不确定函数参数的使用方法,并提供了一个范例。通过宏定义的方式,可以方便地处理不定参数的情况。文章中给出了具体的代码实现,并对代码进行了解释和说明。这对于需要处理不定参数的情况的程序员来说,是一个很有用的参考资料。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了响应式页面的概念和实现方式,包括针对不同终端制作特定页面和制作一个页面适应不同终端的显示。分析了两种实现方式的优缺点,提出了选择方案的建议。同时,对于响应式页面的需求和背景进行了讨论,解释了为什么需要响应式页面。 ... [详细]
author-avatar
手机用户2502858383_827
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有