热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Springcloud Stream消息驱动工具使用介绍_java

SpringCloud Stream由一个中间件中立的核组成,应用通过SpringCloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和

springcloud Stream

什么是springcloud Stream

  现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,springcloud Stream可以让我们不再关注消息中间件MQ的具体细节,我们只需要通过适配绑定的方式即可实现不同MQ之间的切换,但是遗憾的是springcloud Stream目前只支持RabbitMQ和Kafka。

  SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!

什么是Binder

  Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码

为什么使用Stream

  比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移;这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这衬候springcloud Stream给我们提供了一种解耦合的方式。

Stream使用案例

前置知识

Stream处理消息的架构

  Source、Sink: 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。Binder: 消息的生产者和消费者中间层,实现了应用程序与消息中间件细节之间的隔离

  通过以上两张图片可知,消息的处理流向是:消息生产者处理完业务逻辑之后消息到达source中,接着前往Channel通道进行排队,然后通过binder绑定器将消息数据发送到底层mq,然后又通过binder绑定器接收到底层mq发送来的消息数据,接着前往Channel通道进行排队,由Sink接收到消息数据,消息消费者拿到消息数据执行相应的业务逻辑

Stream常用注解

消息生产者8801模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖



org.springframework.cloud
spring-cloud-starter-stream-rabbit

  第二步: 配置文件的编写

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
System.out.println("启动成功");
}
}

  第四步: 业务层service代码编写,注意:这里实现类注入的对象由之前的dao层对象换成了channel通道对象,详细的发送由实现类的第12完成

public interface IMessageProviderService {
/**
* 定义消息的推送管道
*
* @return
*/
String send();
}


@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
/**
* 消息发送管道/信道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: " + serial);
return serial;
}
}

  第五步: controller接口

@RestController
public class SendMessageController {
@Resource
private IMessageProviderService messageProviderService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}

消息消费者8802模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖



org.springframework.cloud
spring-cloud-starter-stream-rabbit

  第二步: 配置文件的编写,与生产者的区别就在于bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
System.out.println("启动成功");
}
}

  第四步: controller接口,使用url请求生产者8801,即可在消费者8802端接收到8801发送的消息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message) {
System.out.println("消费者1号 ----> port:" + serverPort + "\t从8801接受到的消息是:" + message.getPayload());
}
}

  两个模块搭建完成进行测试,首先启动注册中心7001,然后分别启动消息生产者8801和消息消费者8802,通过url请求访问8001的发送消息请求,会向指定管道中发送一条消息,如果此时这个管道中有消费者即可接收到这条消息。而如何指定消息的管道归属呢,就是通过配置文件中的indings.input.destination来指定,命名相同的服务就会处在同一条管道中

Stream带来的问题

重复消费问题

  按照之前的使用,会带来重复消费问题: 也就是说一个通道上有不止一个消息消费者,stream上默认每一个消费者都属于不同的组,这样的话就会导致这个消息被多个组的消费者重复消费

  知道了问题出现的原因就很容易解决了,只要我们自定义配置分组,将这些消费者都分配到同一个组中就能避免重复消费的问题出现了(同一个组间的消费者是竞争关系,不管组间有多少的消费者都只会消费一次)

自定义分组

  只需要在配置文件修改一处配置即可实现自定义组名并且自定义分组,组名相同的服务会被分配到同一组,通道内的消息数据会被该组中的所有消费者轮询消费

持久化问题

  上面自定义分组使用的group配置除了可以自定义分组和分组名之外,还可以实现消息的持久化,也就是说使用group配置自定义分组和分组名的消息消费者,就算在消息生产者发送消息的时候挂掉了,等这个消费者重启之后依然是能够消费之前发送的消息

这里一个生产者和两个消费者存在以下十三种情况(生产者发送四次消息):

1、都使用group分组的两个不同组成员,在生产者生产的时候


  • 都没挂(各消费四次)

  • 挂了其中一个(各消费四次)

  • 都挂了(各消费四次)

2、都使用group分组的两个同组成员,在生产者生产的时候


  • 都没挂(各消费两次)

  • 挂了其中一个(没挂的把四次消费完)

  • 都挂了(各消费两次)

3、其中一个使用group分组的两个成员,在生产者生产的时候


  • 都挂了(都不消费)

  • group的挂了(各消费四次)

  • 没group的挂了(没挂的消费四次,挂的由于没有持久化所以不消费)

  • 都没挂(各消费四次)

4、都不使用group分组的两个成员,在生产者生产的时候


  • 都挂了(都不消费)

  • 挂了其中一个(没挂的消费四次,挂的由于没有持久化所以不消费)

  • 都没挂(各消费四次)

  总之一句话,通道里的消息会持久化给使用group配置的消息消费者(每一组都有一份),就算发送消息的时候这些消费者挂了,如果同组的消费者有没挂的就会把这些消息竞争消费完;如果同组没有消费者,等他重启之后还是会消费这些消息


推荐阅读
  • 本文深入探讨了Spring Cloud Eureka在企业级应用中的高级使用场景及优化策略。首先,介绍了Eureka的安全配置,确保服务注册与发现过程的安全性。接着,分析了Eureka的健康检查机制,提高系统的稳定性和可靠性。随后,详细讨论了Eureka的各项参数调优技巧,以提升性能和响应速度。最后,阐述了如何实现Eureka的高可用性部署,保障服务的连续性和可用性。通过这些内容,开发者可以更好地理解和运用Eureka,提升微服务架构的整体效能。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • HTTP请求与响应机制:基础概览
    在Web浏览过程中,HTTP协议通过请求和响应报文实现客户端与服务器之间的通信。当用户访问一个网页时,浏览器会发送一个HTTP请求报文至服务器,服务器接收到请求后,会生成并返回一个HTTP响应报文。这两种报文均包含三个主要部分:起始行、头部字段和消息体,确保了数据的有效传输和解析。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • 在主从复制架构中,Bingo_MySQL 同步工具的应用与优化具有重要意义。为确保高效同步,建议使用相同或兼容的 MySQL 版本,并确保两台服务器位于同一局域网内,且网络连接畅通无阻。若无法 ping 通,请检查 IP 配置及防火墙设置,以保证网络连通性。此外,合理的配置参数和定期维护也是提升同步性能的关键因素。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 抖音AI特效风靡网络,真人瞬间变身动漫角色,吴亦凡、PDD和戚薇纷纷沉迷其中
    近期,抖音推出的一款名为“变身漫画”的AI特效在社交媒体上迅速走红,吸引了大量用户尝试。不仅普通网友积极参与,连吴亦凡、PDD和戚薇等明星也纷纷加入,体验将真人瞬间转化为动漫角色的神奇效果。这一特效凭借其高度的趣味性和创新性,迅速成为网络热议的话题。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 宏基因组学经典文献重现(一):利用ggplot2进行散点图可视化分析
    宏基因组学经典文献重现(一):利用ggplot2进行散点图可视化分析 ... [详细]
  • MySQL:不仅仅是数据库那么简单
    MySQL不仅是一款高效、可靠的数据库管理系统,它还具备丰富的功能和扩展性,支持多种存储引擎,适用于各种应用场景。从简单的网站开发到复杂的企业级应用,MySQL都能提供强大的数据管理和优化能力,满足不同用户的需求。其开源特性也促进了社区的活跃发展,为技术进步提供了持续动力。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
author-avatar
威哥028_438
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有