热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloud笔记SpringCloudStream消息驱动(十五)

1.消息驱动概述1.SpringCloudStream是什么SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spri

1.消息驱动概述


1.Spring Cloud Stream是什么

Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spring Cloud中的Binder对象进行交互。首先我们需要配置绑定关系,也就是将具体的消息中间件和Stream进行绑定,后面使用Binder来操作消息中间件。此时,我们只需要了解Stream中的Binder对象相关的操作,即可完成消息中间件的交互。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前支持的消息中间件有:RabbitMQ和Kafka。目的在于屏蔽不同消息中间件的差异,统一消息的编程模型。


2.设计思想

对于普通的MQ,生产者和消费者之间依靠消息媒介传输信息,消息通过特定的通道进行传输,此时Spring Boot和消息中间件直接进行交互,因为不同消息中间件的设计不同,在实现细节上会有差异。通过定义Binder作为中间层,实现应用程序和消息中间件实现细节的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,降低了学习成本。

Inputs对应消息生产者,向Binder发送消息,Binder将消息发给消息中间件,Outputs对应消息消费者,从Binder中取消息,Binder去消息中间件中获取。

Stream中的消息通信遵循发布-订阅模式,也就是通过Topic进行传播。


3.Spring Cloud Stream标准流程套路


4.编码API和常用注解


2.案例说明

为了演示Stream,我们需要再创建3个子模块,分别是cloud-stream-rabbitmq-provider8801,cloud-stream-rabbitmq-consumer8802,cloud-stream-rabbitmq-consumer8803,另外,需要把RabbitMQ安装并启动。


3.消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801模块,修改pom.xml,加入spring-cloud-starter-stream-rabbit的坐标。

cloud2020com.atguigu.springcloud1.0-SNAPSHOT4.0.0cloud-stream-rabbitmq-provider8801org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-devtoolsruntimetrueorg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtest

添加application.yml配置文件。

server:port: 8801
spring:application:name: cloud-stream-providercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class, args);}
}

添加业务类,这是一个provider模块,所以需要提供send()方法用于发送消息,添加IMessageProvider和MessageProviderImpl。

package com.atguigu.springcloud.service;public interface IMessageProvider {String send();
}

package com.atguigu.springcloud.service.impl;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 这里使用@EnableBinding(Source.class),这是Stream里的注解,定义一个消息推送管道,不需要使用@Service注解
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;// 定义消息发送管道,名称必须是output,否则会报错@Overridepublic String send() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());System.out.println("uuid=" + uuid);return null;}
}

编写Controller用于接收请求。

package com.atguigu.springcloud.controller;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController {@Resourceprivate IMessageProvider iMessageProvider;@GetMapping("/sendMessage")public String sendMessage() {return iMessageProvider.send();}
}

启动Eureka7001,启动RabbitMQ服务,启动Provider8801模块测试。查看RabbitMQ管理后台,在Exchange标签下,可以看到我们定义的exchangeName。浏览器访问http://localhost:8801/sendMessage发送消息,不断刷新页面发送请求,在控制台可以看到消息输出,在RabbitMQ管理后台的Overview可以看到消息发送的速率等信息。


4.消息驱动之消费者

新建cloud-stream-rabbitmq-consumer8802模块,修改pom.xml,依赖和cloud-stream-rabbitmq-provider8801一样。添加application.yml配置文件,注意这里通道名称改成input,之前cloud-stream-rabbitmq-provider8801使用的是output。

server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8802.class, args);}
}

添加业务类。

package com.atguigu.springcloud.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;// 这里使用@EnableBinding(Sink.class),这是Stream里的注解,定义一个消息接收管道
@EnableBinding(Sink.class)
public class ReceiveMessageService {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message message) {System.out.println("ServerPort=" + serverPort + "消费者,消费消息:" + message.getPayload());}
}

启动cloud-stream-rabbitmq-consumer8802模块,浏览器请求http://localhost:8801/sendMessage发送消息,在8802的控制台,可以看到消息接收的情况,测试通过。


5.分组消费与持久化

参照cloud-stream-rabbitmq-consumer8802模块,创建cloud-stream-rabbitmq-consumer8803模块并启动。浏览器请求http://localhost:8801/sendMessage,结果8802和8803都消费了消息,存在重复消费的问题。

在Stream中,处于同一个group的多个消费者是竞争关系,一条消息只能消费一次,处于不同group的消费者是可以重复消费的。通过查看RabbitMQ的管理后台,点击exchange标签,找到我们指定的exchange,发现Bindings里面有两个组,所以出现了重复消费问题。所以,解决重复消费的办法也很简单,将不需要重复消费的消息指定分组即可。

修改两个consumer的application.yml,在inputs结点下,加一个group属性,值为myGroup。再次发送消息,查看8802和8803的控制台打印信息,可知同一个组的消费者每次只有一个可以消费消息,而且默认采用轮询的方式。

添加了group属性的消费者,在重启后,会自动获取未消费的消息,而没有添加group属性的消费者,在重启后,即使有未消费的消息,也无法消费了。比如,停掉8802和8803消费者,去掉8802的group属性,保留8803的group属性,生产者发送几条消息,重启8802模块,它并不能处理未消费的消息,重启8803模块,因为它指定了group,它可以拿到未消费的消息,继续消费,实现了消息的持久化。指定过group的消费者重启时候,在RabbitMQ的Exchange中,会继续保留group的名称,用于下次重启消费者时候,将消息再次发送给消费者,未指定group名称的消费者,每次启动,都会随机生成一个group放在Exchange的Bindings里面,当消费者重启,会把这个随机值从Bindings里移除,再加入一个新的随机值,所以说,未指定group的消费者,无法获取到之前未消费的消息,也就没法继续消费了。

 


推荐阅读
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • 可参照github代码:https:github.comrabbitmqrabbitmq-tutorialsblobmasterjavaEmitLogTopic.ja ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • Web动态服务器Python基本实现
    Web动态服务器Python基本实现 ... [详细]
  • 本文档介绍了如何使用ESP32开发板在STA模式下实现与TCP服务器的通信,包括环境搭建、代码解析及实验步骤。 ... [详细]
  • 在Effective Java第三版中,建议在方法返回类型中优先考虑使用Collection而非Stream,以提高代码的灵活性和兼容性。 ... [详细]
  • oracle 对硬件环境要求,Oracle 10G数据库软硬件环境的要求 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • C# 中创建和执行存储过程的方法
    本文详细介绍了如何使用 C# 创建和调用 SQL Server 存储过程,包括连接数据库、定义命令类型、设置参数等步骤。 ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
author-avatar
波Z-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有