热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Hoxton.SR1版本】SpringCloudStream消息驱动

目录一、简介二、搭建消息生产者端三、搭建消息消费者端四、消息重复消费问题五、消息持久化六、总结一、简介在实际项目中,服务与服务之间的通信往往我们会采用消

目录

一、简介

二、搭建消息生产者端

三、搭建消息消费者端

四、消息重复消费问题

五、消息持久化

六、总结




一、简介

在实际项目中,服务与服务之间的通信往往我们会采用消息中间件方式来处理,比如引入RabbitMQ、Kafka等,但这会有一个问题,就是我们的应用程序跟消息中间件耦合在一块了,还有就是如果我们要替换为Kafka,那么变动会比较大,Spring Cloud官网提供了Spring Cloud Stream组件,用来给我们整合消息中间件,Spring Cloud Stream底层屏蔽了消息中间件的差异,降低了切换成本,统一消息的编程模型,这样就可以降低我们系统和消息中间件的耦合度。总结一句话:就是Spring Cloud Stream有利于应用程序与消息中间件的解耦。

 


  • Spring Cloud Stream是什么?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs和outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅,消费组、分区的三个核心概念。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

 


  • Spring Cloud Stream官方文档地址

https://spring.io/projects/spring-cloud-stream

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/

 


  • Spring Cloud Stream官网结构图

可以看到,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。


  • Spring Cloud Stream编码API和常用注解

  1. Middleware:中间件,目前只支持RabbitMQ和Kafka。
  2. Binder:Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便地连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
  3. @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
  4. @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
  5. @StreamListener:监听队列,用于消费者的队列的消息接收。
  6. @EnableBinding:指信道channel和exchange绑定在一起。

  • Stream几个重要概念

  1. Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder;
  2. Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建);
  3. Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信;

二、搭建消息生产者端

新建一个module【springcloud-stream-rabbitmq-provider8801】

【a】pom.xml:引入spring-cloud-starter-stream-rabbit依赖


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-provider8801org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-testtest

【b】application.yml:相关配置都在配置文件里面做了较详细的说明

server:port: 8801
spring:application:name: springcloud-stream-rabbitmq-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # rabbitmq相关环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理output: # 输出通道,表示消息生产方destination: rabbitmq_stream_exchange # 指定输出的交换器名称content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx中的xxx绑定器名称对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class SpringCloudStreamMQServiceApplicaiton8801 {public static void main(String[] args) {SpringApplication.run(SpringCloudStreamMQServiceApplicaiton8801.class, args);}
}

【d】定义消息发送的接口

/*** @Description 消息发送接口* @Date 2020/8/27 21:37* @Author weishihuai* 说明:*/
public interface IMessageProvider {/*** 发送消息*/String sendMessage();
}

消息发送实现类:

package com.wsh.springcloud.service.impl;import com.wsh.springcloud.service.IMessageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;/*** @Description 消息发送实现类* @Date 2020/8/27 21:38* @Author weishihuai* 说明: @EnableBinding表示信道channel和exchange绑定在一起.*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {private static final Logger logger = LoggerFactory.getLogger(MessageProviderImpl.class);/*** 消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String sendMessage() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());logger.info("消息发送者发送消息: {}", uuid);return "消息发送者发送消息: " + uuid;}}

 【e】定义消息发送Controller

package com.wsh.springcloud.controller;import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description 消息发送测试Controller* @Date 2020/8/27 21:39* @Author weishihuai* 说明:*/
@RestController
public class SendMessageController {@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage() {return messageProvider.sendMessage();}}

【f】测试

启动Eureka注册中心以及消息驱动发送方服务,浏览器访问:http://localhost:8801/sendMessage 测试发送消息,然后我们去RabbitMQ界面观察流量情况:

注意:下图中的rabbitmq_stream_exchange就是我们在application.yml中指定的将消息输出到哪个desitination交换机上面。

观察后台日志:

 可见,消息成功发送到MQ中,正在等待消费方进行消费消息,至此,消息发送者端搭建成功,接下来搭建消息消费方服务。


三、搭建消息消费者端

新建module【springcloud-stream-rabbitmq-consumer8802】

【a】pom.xml


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-consumer8802org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-actuatororg.springframework.bootspring-boot-starter-testtest

【b】applicaiton.yml

server:port: 8802
spring:application:name: springcloud-stream-rabbitmq-consumercloud:stream:binders:rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理input: # 输入通道,表示消息消费方destination: rabbitmq_stream_exchange # 指定接收的交换器名称,需与消息发送方的destination对应上content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx对应中的xxx对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class RabbitMQStreamServiceApplication8802 {public static void main(String[] args) {SpringApplication.run(RabbitMQStreamServiceApplication8802.class, args);}
}

【d】新增接收消息发送方发送消息的方法

package com.wsh.springcloud.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @version V1.0* @ClassName: com.wsh.springcloud.controller.ReceiveMessageController.java* @Description: 接收消息发送方发送的消息* @author: weishihuai* @date: 2020/8/28 10:55*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageController.class);@Value("${server.port}")private String serverPort;/*** 接收消息发送方发送的消息** @param message 消息* @StreamListener 通过@StreamListener注解来监听exchange中的消息*/@StreamListener(Sink.INPUT)private void receiveMessage(Message message) {String payload = message.getPayload();logger.info("消息接收方接收消息: {}, 服务端口号:{}", payload, serverPort);}}

 【e】测试

启动消息消费端,浏览器访问两次:http://localhost:8801/sendMessage 模拟发送两条消息到RabbitMQ中,查看消费者端是否成功消费此消息。

下图是消息发送方的日志:

下图是消息消费方的日志:

由此可见,成功实现了消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息:


四、消息重复消费问题

为了模拟消息重复消费的问题,这里我们还需要一个消息消费端,所以我们新建一个module【springcloud-stream-rabbitmq-consumer8803】,此子模块跟【springcloud-stream-rabbitmq-consumer8802】除了端口号,其他一模一样,这里不再过多阐述。

启动8803消费者和8802消费者,浏览器访问两次:http://localhost:8801/sendMessage,模拟发送两条消息。

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

由此可见,同一条消息同时被两个消费者处理,这是不对的。

比如在如下场景中,假如订单服务调用支付服务,支付服务我们做集群部署,那如果支付服务重复消费了订单服务发送过来的支付消息,那么就会造成数据错误,我们得避免这种情况。试想一下重复扣用户的款,这肯定不行的。

接下来,我们谈谈怎么利用Stream来处理重复消费的问题。Spring Cloud Stream提供了Group组的概念,我们可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

导致原因:默认分组group是不同的,组流水号不一样,被认为不同组,可以消费。

从RabbitMQ可视化界面中,我们可以看到【8802】和【8803】被分配的默认分组信息:

可以看到,两个消费者的group组名是不一样的,所以导致了重复消费。Spring Cloud Stream提供了自定义分组配置的功能,我们可以将【8802】和【8803】分配相同的组名,具体配置如下:

 在【8802】和【8803】的application.yml配置文件中都加入: group:group1  指定相同的分组名称,如下图所示:

重启【8802】和【8803】,浏览器访问两次:http://localhost:8801/sendMessage  模拟发送两条消息

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

可以看到,同一条消息同时只能被一个消费者处理,成功防止了消息的重复消费问题。同时我们在RabbitMQ的web界面也可以看到相关的信息:


五、消息持久化

除了使用group能防止消息重复消费,其实group还能将消息进行持久化,下面我们来测试一下。

(1)、停掉【8802】和【8803】两个消息消费者服务

(2)、注释掉【8802】服务中的group分组属性,注意【8803】需要保留group分组属性

(3)、浏览器访问两次http://localhost:8801/sendMessage,模拟发送两条消息。

     (a)、消息发送端日志

接着我们重启【8802】和【8803】服务,注意观察后台日志:

     (b)、消息接收方【8802】日志

可见,没有消息消费的日志信息。 

     (c)、消息接收方【8803】日志 

可以看到,保留group属性的【8803】服务实现了对消息的持久化,当重启之后会自动去拉取未消费的消息来进行消费;而【8802】由于未保留group属性,所以并没有重新去拉取最新消息进行消费。 


六、总结

本篇文章总结了如何使用Spring Cloud Stream消息驱动屏蔽消息中渐渐的底层实现,极大地方便我们开发者。同时讲解了如何使用分组来避免消息重复消费的问题以及消息持久化。Spring Cloud Stream实现了消息中间件和应用程序的高度解耦以上相关项目的代码我已经放在Gitee上,有需要的小伙伴可以去拉取进行学习:https://gitee.com/weixiaohuai/springcloud_Hoxton,由于笔者水平有限,如有不对之处,还请小伙伴们指正,相互学习,一起进步。


推荐阅读
  • 深入解析动态代理模式:23种设计模式之三
    在设计模式中,动态代理模式是应用最为广泛的一种代理模式。它允许我们在运行时动态创建代理对象,并在调用方法时进行增强处理。本文将详细介绍动态代理的实现机制及其应用场景。 ... [详细]
  • 深入解析Spring启动过程
    本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ... [详细]
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
  • 本文探讨了如何在 F# Interactive (FSI) 中通过 AddPrinter 和 AddPrintTransformer 方法自定义类型(尤其是集合类型)的输出格式,提供了详细的指南和示例代码。 ... [详细]
  • 在尝试使用C# Windows Forms客户端通过SignalR连接到ASP.NET服务器时,遇到了内部服务器错误(500)。本文将详细探讨问题的原因及解决方案。 ... [详细]
  • 本文详细介绍了流编辑器sed中的G、H、g、h命令,探讨了它们的工作原理及应用场景。通过实例解析和图解分析,帮助读者掌握这些高级命令的使用方法。 ... [详细]
  • 本文深入探讨了面向切面编程(AOP)的概念及其在Spring框架中的应用。通过详细解释AOP的核心术语和实现机制,帮助读者理解如何利用AOP提高代码的可维护性和开发效率。 ... [详细]
  • 本文介绍了如何在 Node.js 中使用 `setDefaultEncoding` 方法为可写流设置默认编码,并提供了详细的语法说明和示例代码。 ... [详细]
  • 本文详细介绍了在不同操作系统中查找和设置网卡的方法,涵盖了Windows系统的具体步骤,并提供了关于网卡位置、无线网络设置及常见问题的解答。 ... [详细]
  • 使用Nginx反向代理实现多域名端口映射
    本文介绍如何通过配置本地hosts文件和Nginx反向代理,实现多个虚拟域名的端口映射,使用户可以通过标准HTTP端口80访问不同后端服务。 ... [详细]
  • 历经三十年的开发,Mathematica 已成为技术计算领域的标杆,为全球的技术创新者、教育工作者、学生及其他用户提供了一个领先的计算平台。最新版本 Mathematica 12.3.1 增加了多项核心语言、数学计算、可视化和图形处理的新功能。 ... [详细]
  • 深入解析Serverless架构模式
    本文将详细介绍Serverless架构模式的核心概念、工作原理及其优势。通过对比传统架构,探讨Serverless如何简化应用开发与运维流程,并介绍当前主流的Serverless平台。 ... [详细]
  • Linux环境下C语言实现定时向文件写入当前时间
    本文介绍如何在Linux系统中使用C语言编程,实现在每秒钟向指定文件中写入当前时间戳。通过此示例,读者可以了解基本的文件操作、时间处理以及循环控制。 ... [详细]
  • 本文详细探讨了如何通过分析单个或多个线程在瓶颈情况下的表现,来了解处理器资源的消耗。无论是单进程还是多进程环境,监控关键指标如线程数量、占用时间及调度优先级等,有助于揭示潜在的性能问题。 ... [详细]
  • SDN网络拓扑发现机制解析
    本文深入探讨了SDN(软件定义网络)中拓扑发现的原理与实现方法,重点介绍了LLDP协议在OpenFlow环境中的应用,并讨论了非OpenFlow设备存在时的链路发现策略。 ... [详细]
author-avatar
回__复卷轴
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有