作者:尹子 | 来源:互联网 | 2023-08-31 15:37
零、前言这又是学习尚硅谷springcloud中遇到的一个坑因为之前有一个所以这里索引从2开始2、springcloudstream3.1StreamListener官方不建议
零、前言 这又是学习尚硅谷 spring cloud中遇到的一个坑 因为之前有一个 所以这里索引从2开始
2、spring cloud stream 3.1 @StreamListener 官方不建议使用了 在使用stream 整合rabbitMQ的时候 突然发现新版本不建议使用@Binding(Source.class)
、@StreamListener(Sink.class)
查看了 [官方文档][https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function],和一些网站 摸索了一会发现了这个新的写法,使用java.util.function.[Supplier/Function/Consumer]
的方式
下面通过代码来细说:有些需要注意的地方在代码下方单独提出来了
这里的前提需要大家先把rabbitMQ安装好 这里就不赘述怎么安装的了
2.1、首先是消息提供者8801 pom.xml
com.sxt.sc2021 org.example 1.0-SNAPSHOT 4.0.0 cloud-stram-rabbitmq-consumer8802 org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok 1.18.18 provided true org.springframework.boot spring-boot-starter-test test org.example cloud-api-commons ${project.version} compile
yaml
server:port: 8801 spring:application:name: cloud-stream-privider #端口和微服务名称 老两样#中间cloud部分和8802一样cloud:stream:binders: #自此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称,用于binding整合type: rabbit #消息组件类型environment: # 设置rabbitmq的相关的环境配置 stream3.1只需要配置这些即可spring:rabbitmq:host: localhostport: 5672 #mq调用的端口为5672username: guestpassword: guesteureka:client:service-url:defaultZone: http://localhost:7001/eureka/instance:lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔instance-id: send-8801.comprefer-ip-address: true #访问的路径变为IP地址
service
package com.peach.springcloud.service;import java.util.function.Supplier;public interface IMessage {void sendMethod(); }
serviceImpl
package com.peach.springcloud.impl;import com.peach.springcloud.service.IMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component;import java.util.UUID;&#64;Component public class MessageProviderImpl implements IMessage {//这里直接装配一个桥 用来连接rabbit或者kafka&#64;AutowiredStreamBridge streamBridge;&#64;Overridepublic void sendMethod() {String message &#61; UUID.randomUUID().toString();//这里说明一下这个 streamBridge.send 方法的参数 第一个参数是exchange或者topic 就是主题名称//默认的主题名称是通过//输入: <方法名> &#43; -in- &#43; //输出: <方法名> &#43; -out- &#43; //这里我们接收的时候就要用send方法 参数是consumer接收 详情看8802的controller//consumer的参数类型是这里message的类型streamBridge.send("send-in-0", message);System.out.println("************发送了message&#xff1a;"&#43;message);} }
controller
package com.peach.springcloud.controller;import com.peach.springcloud.impl.MessageProviderImpl; import com.peach.springcloud.service.IMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;&#64;RestController public class SendMessageController {&#64;Resourceprivate MessageProviderImpl messageProvider;&#64;RequestMapping("/send")public void send(){messageProvider.sendMethod();} }
2.2 然后是8802 消息消费者 pom 和上面一样
yaml 除了端口和eureka实例其他的和8801一样
server:port: 8802spring:application:name: cloud-stream-privider#中间cloud部分和8802一样cloud:stream:binders: #自此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称&#xff0c;用于binding整合type: rabbit #消息组件类型environment: # 设置rabbitmq的相关的环境配置 stream3.1只需要配置这些即可spring:rabbitmq:host: localhostport: 5672 #mq调用的端口为5672username: guestpassword: guesteureka:client:service-url:defaultZone: http://localhost:7001/eureka/instance:lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔instance-id: send-8802.comprefer-ip-address: true #访问的路径变为IP地址
controller
package com.peach.springcloud.controller;import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;import java.util.function.Consumer;&#64;Component public class ReceiveMessageController {&#64;Value("${server.port}")private String serverPort;&#64;Bean//这里接收rabbitmq的条件是参数为Consumer 并且 方法名和supplier方法名相同//这里的返回值是一个匿名函数 返回类型是consumer 类型和提供者的类型一致//supplier发送的exchange是 send-in-0 这里只需要用send方法名即可Consumer send() {return str -> {System.out.println("我是消费者"&#43;serverPort&#43;"&#xff0c;我收到了消息&#xff1a;"&#43;str);};}}
先启动注册中心和8801 8802 之后访问我们的15672 也就是rabbitMQ的管理页面看看有没有这个exchange
展示效果&#xff1a;
看来是没问题的 名字和我们想的一样
绑定信息也有了
然后我们访问supplier的send方法发几个消息先&#xff0c;之后看8801控制台
8802控制台
之后看管理页面
也可以看到峰值的变动
2.3总结一下 1、在stream3.1中 我们不需要像以前一样用&#64;Binding &#64;StreamListener来监听了 这样少写了很多代码和配置 我们可以使用StreamBrige来进行发送
2、StreamBrige.send() 方法的参数拼写规则&#xff1a;
可以直接看官方文档[命名规则][https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names]
//输入: <方法名> &#43; -in- &#43; //输出: <方法名> &#43; -out- &#43;
3、接收的时候直接用前面的方法名即可
4、看官方文档还是有些用的&#xff0c;虽然有时候看不懂但是是成为大佬的必经之路