热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Hoxton.SR1版本】SpringCloudStream消息驱动

目录一、简介二、搭建消息生产者端三、搭建消息消费者端四、消息重复消费问题五、消息持久化六、总结一、简介在实际项目中,服务与服务之间的通信往往我们会采用消

目录

一、简介

二、搭建消息生产者端

三、搭建消息消费者端

四、消息重复消费问题

五、消息持久化

六、总结




一、简介

在实际项目中,服务与服务之间的通信往往我们会采用消息中间件方式来处理,比如引入RabbitMQ、Kafka等,但这会有一个问题,就是我们的应用程序跟消息中间件耦合在一块了,还有就是如果我们要替换为Kafka,那么变动会比较大,Spring Cloud官网提供了Spring Cloud Stream组件,用来给我们整合消息中间件,Spring Cloud Stream底层屏蔽了消息中间件的差异,降低了切换成本,统一消息的编程模型,这样就可以降低我们系统和消息中间件的耦合度。总结一句话:就是Spring Cloud Stream有利于应用程序与消息中间件的解耦。

 


  • Spring Cloud Stream是什么?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs和outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅,消费组、分区的三个核心概念。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

 


  • Spring Cloud Stream官方文档地址

https://spring.io/projects/spring-cloud-stream

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.6.RELEASE/reference/html/

 


  • Spring Cloud Stream官网结构图

可以看到,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。


  • Spring Cloud Stream编码API和常用注解

  1. Middleware:中间件,目前只支持RabbitMQ和Kafka。
  2. Binder:Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便地连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
  3. @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
  4. @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
  5. @StreamListener:监听队列,用于消费者的队列的消息接收。
  6. @EnableBinding:指信道channel和exchange绑定在一起。

  • Stream几个重要概念

  1. Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder;
  2. Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建);
  3. Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信;

二、搭建消息生产者端

新建一个module【springcloud-stream-rabbitmq-provider8801】

【a】pom.xml:引入spring-cloud-starter-stream-rabbit依赖


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-provider8801org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-testtest

【b】application.yml:相关配置都在配置文件里面做了较详细的说明

server:port: 8801
spring:application:name: springcloud-stream-rabbitmq-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # rabbitmq相关环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理output: # 输出通道,表示消息生产方destination: rabbitmq_stream_exchange # 指定输出的交换器名称content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx中的xxx绑定器名称对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class SpringCloudStreamMQServiceApplicaiton8801 {public static void main(String[] args) {SpringApplication.run(SpringCloudStreamMQServiceApplicaiton8801.class, args);}
}

【d】定义消息发送的接口

/*** @Description 消息发送接口* @Date 2020/8/27 21:37* @Author weishihuai* 说明:*/
public interface IMessageProvider {/*** 发送消息*/String sendMessage();
}

消息发送实现类:

package com.wsh.springcloud.service.impl;import com.wsh.springcloud.service.IMessageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;/*** @Description 消息发送实现类* @Date 2020/8/27 21:38* @Author weishihuai* 说明: @EnableBinding表示信道channel和exchange绑定在一起.*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {private static final Logger logger = LoggerFactory.getLogger(MessageProviderImpl.class);/*** 消息发送管道*/@Resourceprivate MessageChannel output;@Overridepublic String sendMessage() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());logger.info("消息发送者发送消息: {}", uuid);return "消息发送者发送消息: " + uuid;}}

 【e】定义消息发送Controller

package com.wsh.springcloud.controller;import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Description 消息发送测试Controller* @Date 2020/8/27 21:39* @Author weishihuai* 说明:*/
@RestController
public class SendMessageController {@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage() {return messageProvider.sendMessage();}}

【f】测试

启动Eureka注册中心以及消息驱动发送方服务,浏览器访问:http://localhost:8801/sendMessage 测试发送消息,然后我们去RabbitMQ界面观察流量情况:

注意:下图中的rabbitmq_stream_exchange就是我们在application.yml中指定的将消息输出到哪个desitination交换机上面。

观察后台日志:

 可见,消息成功发送到MQ中,正在等待消费方进行消费消息,至此,消息发送者端搭建成功,接下来搭建消息消费方服务。


三、搭建消息消费者端

新建module【springcloud-stream-rabbitmq-consumer8802】

【a】pom.xml


springcloud2020com.wsh.springcloud1.0-SNAPSHOT4.0.0springcloud-stream-rabbitmq-consumer8802org.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.bootspring-boot-starter-actuatororg.springframework.bootspring-boot-starter-testtest

【b】applicaiton.yml

server:port: 8802
spring:application:name: springcloud-stream-rabbitmq-consumercloud:stream:binders:rabbitmq_binder: # binder绑定器名称,用于binding整合type: rabbit # 消息组件类型 如果消息中间件是kafka,则type:kafkaenvironment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhost #rabbitmq主机port: 5672 #rabbitmq端口username: guest #rabbitmq用户名password: guest #rabbitmq用户密码bindings: # 服务的整合处理input: # 输入通道,表示消息消费方destination: rabbitmq_stream_exchange # 指定接收的交换器名称,需与消息发送方的destination对应上content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: rabbitmq_binder # 指定binder的名称,需与上面spring.cloud.stream.binders.xxx对应中的xxx对应
eureka:client:service-url:defaultZone: http://springcloud-eureka7001.com:7001/eureka/,http://springcloud-eureka7002.com:7002/eureka/ #集群版Eureka注册中心

【c】主启动类

@SpringBootApplication
public class RabbitMQStreamServiceApplication8802 {public static void main(String[] args) {SpringApplication.run(RabbitMQStreamServiceApplication8802.class, args);}
}

【d】新增接收消息发送方发送消息的方法

package com.wsh.springcloud.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @version V1.0* @ClassName: com.wsh.springcloud.controller.ReceiveMessageController.java* @Description: 接收消息发送方发送的消息* @author: weishihuai* @date: 2020/8/28 10:55*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageController.class);@Value("${server.port}")private String serverPort;/*** 接收消息发送方发送的消息** @param message 消息* @StreamListener 通过@StreamListener注解来监听exchange中的消息*/@StreamListener(Sink.INPUT)private void receiveMessage(Message message) {String payload = message.getPayload();logger.info("消息接收方接收消息: {}, 服务端口号:{}", payload, serverPort);}}

 【e】测试

启动消息消费端,浏览器访问两次:http://localhost:8801/sendMessage 模拟发送两条消息到RabbitMQ中,查看消费者端是否成功消费此消息。

下图是消息发送方的日志:

下图是消息消费方的日志:

由此可见,成功实现了消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息:


四、消息重复消费问题

为了模拟消息重复消费的问题,这里我们还需要一个消息消费端,所以我们新建一个module【springcloud-stream-rabbitmq-consumer8803】,此子模块跟【springcloud-stream-rabbitmq-consumer8802】除了端口号,其他一模一样,这里不再过多阐述。

启动8803消费者和8802消费者,浏览器访问两次:http://localhost:8801/sendMessage,模拟发送两条消息。

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

由此可见,同一条消息同时被两个消费者处理,这是不对的。

比如在如下场景中,假如订单服务调用支付服务,支付服务我们做集群部署,那如果支付服务重复消费了订单服务发送过来的支付消息,那么就会造成数据错误,我们得避免这种情况。试想一下重复扣用户的款,这肯定不行的。

接下来,我们谈谈怎么利用Stream来处理重复消费的问题。Spring Cloud Stream提供了Group组的概念,我们可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

导致原因:默认分组group是不同的,组流水号不一样,被认为不同组,可以消费。

从RabbitMQ可视化界面中,我们可以看到【8802】和【8803】被分配的默认分组信息:

可以看到,两个消费者的group组名是不一样的,所以导致了重复消费。Spring Cloud Stream提供了自定义分组配置的功能,我们可以将【8802】和【8803】分配相同的组名,具体配置如下:

 在【8802】和【8803】的application.yml配置文件中都加入: group:group1  指定相同的分组名称,如下图所示:

重启【8802】和【8803】,浏览器访问两次:http://localhost:8801/sendMessage  模拟发送两条消息

(1)、消息发送端日志

(2)、消息接收端【8802】日志

(3)、消息接收端【8803】日志

可以看到,同一条消息同时只能被一个消费者处理,成功防止了消息的重复消费问题。同时我们在RabbitMQ的web界面也可以看到相关的信息:


五、消息持久化

除了使用group能防止消息重复消费,其实group还能将消息进行持久化,下面我们来测试一下。

(1)、停掉【8802】和【8803】两个消息消费者服务

(2)、注释掉【8802】服务中的group分组属性,注意【8803】需要保留group分组属性

(3)、浏览器访问两次http://localhost:8801/sendMessage,模拟发送两条消息。

     (a)、消息发送端日志

接着我们重启【8802】和【8803】服务,注意观察后台日志:

     (b)、消息接收方【8802】日志

可见,没有消息消费的日志信息。 

     (c)、消息接收方【8803】日志 

可以看到,保留group属性的【8803】服务实现了对消息的持久化,当重启之后会自动去拉取未消费的消息来进行消费;而【8802】由于未保留group属性,所以并没有重新去拉取最新消息进行消费。 


六、总结

本篇文章总结了如何使用Spring Cloud Stream消息驱动屏蔽消息中渐渐的底层实现,极大地方便我们开发者。同时讲解了如何使用分组来避免消息重复消费的问题以及消息持久化。Spring Cloud Stream实现了消息中间件和应用程序的高度解耦以上相关项目的代码我已经放在Gitee上,有需要的小伙伴可以去拉取进行学习:https://gitee.com/weixiaohuai/springcloud_Hoxton,由于笔者水平有限,如有不对之处,还请小伙伴们指正,相互学习,一起进步。


推荐阅读
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • 2017-2018年度《网络编程与安全》第五次实验报告
    本报告详细记录了2017-2018学年《网络编程与安全》课程第五次实验的具体内容、实验过程、遇到的问题及解决方案。 ... [详细]
  • 近期我们开发了一款包含天气预报功能的万年历应用,为了满足这一需求,团队花费数日时间精心打造并测试了一个稳定可靠的天气API接口,现正式对外开放。 ... [详细]
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 本题探讨如何通过最大流算法解决农场排水系统的设计问题。题目要求计算从水源点到汇合点的最大水流速率,使用经典的EK(Edmonds-Karp)和Dinic算法进行求解。 ... [详细]
  • 本文详细介绍了Java中的输入输出(IO)流,包括其基本概念、分类及应用。IO流是用于在程序和外部资源之间传输数据的一套API。根据数据流动的方向,可以分为输入流(从外部流向程序)和输出流(从程序流向外部)。此外,还涵盖了字节流和字符流的区别及其具体实现。 ... [详细]
  • 不确定性|放入_华为机试题 HJ9提取不重复的整数
    不确定性|放入_华为机试题 HJ9提取不重复的整数 ... [详细]
  • 在软件开发过程中,MD5加密是一种常见的数据保护手段。本文将详细介绍如何在C#中使用两种不同的方式来实现MD5加密:字符串加密和流加密。 ... [详细]
  • Nginx 反向代理与负载均衡实验
    本实验旨在通过配置 Nginx 实现反向代理和负载均衡,确保从北京本地代理服务器访问上海的 Web 服务器时,能够依次显示红、黄、绿三种颜色页面以验证负载均衡效果。 ... [详细]
  • 本文将详细探讨 Java 中提供的不可变集合(如 `Collections.unmodifiableXXX`)和同步集合(如 `Collections.synchronizedXXX`)的实现原理及使用方法,帮助开发者更好地理解和应用这些工具。 ... [详细]
  • 本文详细探讨了Java中的ClassLoader类加载器的工作原理,包括其如何将class文件加载至JVM中,以及JVM启动时的动态加载策略。文章还介绍了JVM内置的三种类加载器及其工作方式,并解释了类加载器的继承关系和双亲委托机制。 ... [详细]
  • 本文探讨了如何在Java中使用JAXB解组两个具有相同名称但不同结构的对象。我们将介绍一个抽象类Bar及其具体实现,并展示如何正确地解析XML文档以获取正确的对象实例。 ... [详细]
  • 尽管使用TensorFlow和PyTorch等成熟框架可以显著降低实现递归神经网络(RNN)的门槛,但对于初学者来说,理解其底层原理至关重要。本文将引导您使用NumPy从头构建一个用于自然语言处理(NLP)的RNN模型。 ... [详细]
  • 深入解析ESFramework中的AgileTcp组件
    本文详细介绍了ESFramework框架中AgileTcp组件的设计与实现。AgileTcp是ESFramework提供的ITcp接口的高效实现,旨在优化TCP通信的性能和结构清晰度。 ... [详细]
author-avatar
回__复卷轴
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有