作者:波Z- | 来源:互联网 | 2024-09-27 14:07
1.消息驱动概述1.SpringCloudStream是什么SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spri
1.消息驱动概述
1.Spring Cloud Stream是什么
Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spring Cloud中的Binder对象进行交互。首先我们需要配置绑定关系,也就是将具体的消息中间件和Stream进行绑定,后面使用Binder来操作消息中间件。此时,我们只需要了解Stream中的Binder对象相关的操作,即可完成消息中间件的交互。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前支持的消息中间件有:RabbitMQ和Kafka。目的在于屏蔽不同消息中间件的差异,统一消息的编程模型。
2.设计思想
对于普通的MQ,生产者和消费者之间依靠消息媒介传输信息,消息通过特定的通道进行传输,此时Spring Boot和消息中间件直接进行交互,因为不同消息中间件的设计不同,在实现细节上会有差异。通过定义Binder作为中间层,实现应用程序和消息中间件实现细节的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,降低了学习成本。
Inputs对应消息生产者,向Binder发送消息,Binder将消息发给消息中间件,Outputs对应消息消费者,从Binder中取消息,Binder去消息中间件中获取。
Stream中的消息通信遵循发布-订阅模式,也就是通过Topic进行传播。
3.Spring Cloud Stream标准流程套路
4.编码API和常用注解
2.案例说明
为了演示Stream,我们需要再创建3个子模块,分别是cloud-stream-rabbitmq-provider8801,cloud-stream-rabbitmq-consumer8802,cloud-stream-rabbitmq-consumer8803,另外,需要把RabbitMQ安装并启动。
3.消息驱动之生产者
创建cloud-stream-rabbitmq-provider8801模块,修改pom.xml,加入spring-cloud-starter-stream-rabbit的坐标。
cloud2020com.atguigu.springcloud1.0-SNAPSHOT4.0.0cloud-stream-rabbitmq-provider8801org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-devtoolsruntimetrueorg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtest
添加application.yml配置文件。
server:port: 8801
spring:application:name: cloud-stream-providercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
添加主启动类。
package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class, args);}
}
添加业务类,这是一个provider模块,所以需要提供send()方法用于发送消息,添加IMessageProvider和MessageProviderImpl。
package com.atguigu.springcloud.service;public interface IMessageProvider {String send();
}
package com.atguigu.springcloud.service.impl;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 这里使用@EnableBinding(Source.class),这是Stream里的注解,定义一个消息推送管道,不需要使用@Service注解
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;// 定义消息发送管道,名称必须是output,否则会报错@Overridepublic String send() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());System.out.println("uuid=" + uuid);return null;}
}
编写Controller用于接收请求。
package com.atguigu.springcloud.controller;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController {@Resourceprivate IMessageProvider iMessageProvider;@GetMapping("/sendMessage")public String sendMessage() {return iMessageProvider.send();}
}
启动Eureka7001,启动RabbitMQ服务,启动Provider8801模块测试。查看RabbitMQ管理后台,在Exchange标签下,可以看到我们定义的exchangeName。浏览器访问http://localhost:8801/sendMessage发送消息,不断刷新页面发送请求,在控制台可以看到消息输出,在RabbitMQ管理后台的Overview可以看到消息发送的速率等信息。
4.消息驱动之消费者
新建cloud-stream-rabbitmq-consumer8802模块,修改pom.xml,依赖和cloud-stream-rabbitmq-provider8801一样。添加application.yml配置文件,注意这里通道名称改成input,之前cloud-stream-rabbitmq-provider8801使用的是output。
server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
添加主启动类。
package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8802.class, args);}
}
添加业务类。
package com.atguigu.springcloud.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;// 这里使用@EnableBinding(Sink.class),这是Stream里的注解,定义一个消息接收管道
@EnableBinding(Sink.class)
public class ReceiveMessageService {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message message) {System.out.println("ServerPort=" + serverPort + "消费者,消费消息:" + message.getPayload());}
}
启动cloud-stream-rabbitmq-consumer8802模块,浏览器请求http://localhost:8801/sendMessage发送消息,在8802的控制台,可以看到消息接收的情况,测试通过。
5.分组消费与持久化
参照cloud-stream-rabbitmq-consumer8802模块,创建cloud-stream-rabbitmq-consumer8803模块并启动。浏览器请求http://localhost:8801/sendMessage,结果8802和8803都消费了消息,存在重复消费的问题。
在Stream中,处于同一个group的多个消费者是竞争关系,一条消息只能消费一次,处于不同group的消费者是可以重复消费的。通过查看RabbitMQ的管理后台,点击exchange标签,找到我们指定的exchange,发现Bindings里面有两个组,所以出现了重复消费问题。所以,解决重复消费的办法也很简单,将不需要重复消费的消息指定分组即可。
修改两个consumer的application.yml,在inputs结点下,加一个group属性,值为myGroup。再次发送消息,查看8802和8803的控制台打印信息,可知同一个组的消费者每次只有一个可以消费消息,而且默认采用轮询的方式。
添加了group属性的消费者,在重启后,会自动获取未消费的消息,而没有添加group属性的消费者,在重启后,即使有未消费的消息,也无法消费了。比如,停掉8802和8803消费者,去掉8802的group属性,保留8803的group属性,生产者发送几条消息,重启8802模块,它并不能处理未消费的消息,重启8803模块,因为它指定了group,它可以拿到未消费的消息,继续消费,实现了消息的持久化。指定过group的消费者重启时候,在RabbitMQ的Exchange中,会继续保留group的名称,用于下次重启消费者时候,将消息再次发送给消费者,未指定group名称的消费者,每次启动,都会随机生成一个group放在Exchange的Bindings里面,当消费者重启,会把这个随机值从Bindings里移除,再加入一个新的随机值,所以说,未指定group的消费者,无法获取到之前未消费的消息,也就没法继续消费了。