热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloud笔记SpringCloudStream消息驱动(十五)

1.消息驱动概述1.SpringCloudStream是什么SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spri

1.消息驱动概述


1.Spring Cloud Stream是什么

Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spring Cloud中的Binder对象进行交互。首先我们需要配置绑定关系,也就是将具体的消息中间件和Stream进行绑定,后面使用Binder来操作消息中间件。此时,我们只需要了解Stream中的Binder对象相关的操作,即可完成消息中间件的交互。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前支持的消息中间件有:RabbitMQ和Kafka。目的在于屏蔽不同消息中间件的差异,统一消息的编程模型。


2.设计思想

对于普通的MQ,生产者和消费者之间依靠消息媒介传输信息,消息通过特定的通道进行传输,此时Spring Boot和消息中间件直接进行交互,因为不同消息中间件的设计不同,在实现细节上会有差异。通过定义Binder作为中间层,实现应用程序和消息中间件实现细节的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,降低了学习成本。

Inputs对应消息生产者,向Binder发送消息,Binder将消息发给消息中间件,Outputs对应消息消费者,从Binder中取消息,Binder去消息中间件中获取。

Stream中的消息通信遵循发布-订阅模式,也就是通过Topic进行传播。


3.Spring Cloud Stream标准流程套路


4.编码API和常用注解


2.案例说明

为了演示Stream,我们需要再创建3个子模块,分别是cloud-stream-rabbitmq-provider8801,cloud-stream-rabbitmq-consumer8802,cloud-stream-rabbitmq-consumer8803,另外,需要把RabbitMQ安装并启动。


3.消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801模块,修改pom.xml,加入spring-cloud-starter-stream-rabbit的坐标。

cloud2020com.atguigu.springcloud1.0-SNAPSHOT4.0.0cloud-stream-rabbitmq-provider8801org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-devtoolsruntimetrueorg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtest

添加application.yml配置文件。

server:port: 8801
spring:application:name: cloud-stream-providercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class, args);}
}

添加业务类,这是一个provider模块,所以需要提供send()方法用于发送消息,添加IMessageProvider和MessageProviderImpl。

package com.atguigu.springcloud.service;public interface IMessageProvider {String send();
}

package com.atguigu.springcloud.service.impl;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 这里使用@EnableBinding(Source.class),这是Stream里的注解,定义一个消息推送管道,不需要使用@Service注解
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;// 定义消息发送管道,名称必须是output,否则会报错@Overridepublic String send() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());System.out.println("uuid=" + uuid);return null;}
}

编写Controller用于接收请求。

package com.atguigu.springcloud.controller;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController {@Resourceprivate IMessageProvider iMessageProvider;@GetMapping("/sendMessage")public String sendMessage() {return iMessageProvider.send();}
}

启动Eureka7001,启动RabbitMQ服务,启动Provider8801模块测试。查看RabbitMQ管理后台,在Exchange标签下,可以看到我们定义的exchangeName。浏览器访问http://localhost:8801/sendMessage发送消息,不断刷新页面发送请求,在控制台可以看到消息输出,在RabbitMQ管理后台的Overview可以看到消息发送的速率等信息。


4.消息驱动之消费者

新建cloud-stream-rabbitmq-consumer8802模块,修改pom.xml,依赖和cloud-stream-rabbitmq-provider8801一样。添加application.yml配置文件,注意这里通道名称改成input,之前cloud-stream-rabbitmq-provider8801使用的是output。

server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8802.class, args);}
}

添加业务类。

package com.atguigu.springcloud.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;// 这里使用@EnableBinding(Sink.class),这是Stream里的注解,定义一个消息接收管道
@EnableBinding(Sink.class)
public class ReceiveMessageService {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message message) {System.out.println("ServerPort=" + serverPort + "消费者,消费消息:" + message.getPayload());}
}

启动cloud-stream-rabbitmq-consumer8802模块,浏览器请求http://localhost:8801/sendMessage发送消息,在8802的控制台,可以看到消息接收的情况,测试通过。


5.分组消费与持久化

参照cloud-stream-rabbitmq-consumer8802模块,创建cloud-stream-rabbitmq-consumer8803模块并启动。浏览器请求http://localhost:8801/sendMessage,结果8802和8803都消费了消息,存在重复消费的问题。

在Stream中,处于同一个group的多个消费者是竞争关系,一条消息只能消费一次,处于不同group的消费者是可以重复消费的。通过查看RabbitMQ的管理后台,点击exchange标签,找到我们指定的exchange,发现Bindings里面有两个组,所以出现了重复消费问题。所以,解决重复消费的办法也很简单,将不需要重复消费的消息指定分组即可。

修改两个consumer的application.yml,在inputs结点下,加一个group属性,值为myGroup。再次发送消息,查看8802和8803的控制台打印信息,可知同一个组的消费者每次只有一个可以消费消息,而且默认采用轮询的方式。

添加了group属性的消费者,在重启后,会自动获取未消费的消息,而没有添加group属性的消费者,在重启后,即使有未消费的消息,也无法消费了。比如,停掉8802和8803消费者,去掉8802的group属性,保留8803的group属性,生产者发送几条消息,重启8802模块,它并不能处理未消费的消息,重启8803模块,因为它指定了group,它可以拿到未消费的消息,继续消费,实现了消息的持久化。指定过group的消费者重启时候,在RabbitMQ的Exchange中,会继续保留group的名称,用于下次重启消费者时候,将消息再次发送给消费者,未指定group名称的消费者,每次启动,都会随机生成一个group放在Exchange的Bindings里面,当消费者重启,会把这个随机值从Bindings里移除,再加入一个新的随机值,所以说,未指定group的消费者,无法获取到之前未消费的消息,也就没法继续消费了。

 


推荐阅读
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • EasyMock实战指南
    本文介绍了如何使用EasyMock进行单元测试,特别是当测试对象的合作者依赖于外部资源或尚未实现时。通过具体的示例,展示了EasyMock在模拟对象行为方面的强大功能。 ... [详细]
  • 本文介绍了如何使用 Spring Boot DevTools 实现应用程序在开发过程中自动重启。这一特性显著提高了开发效率,特别是在集成开发环境(IDE)中工作时,能够提供快速的反馈循环。默认情况下,DevTools 会监控类路径上的文件变化,并根据需要触发应用重启。 ... [详细]
  • 本文探讨了领域驱动设计(DDD)的核心概念、应用场景及其实现方式,详细介绍了其在企业级软件开发中的优势和挑战。通过对比事务脚本与领域模型,展示了DDD如何提升系统的可维护性和扩展性。 ... [详细]
  • 实体映射最强工具类:MapStruct真香 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • Struts与Spring框架的集成指南
    本文详细介绍了如何将Struts和Spring两个流行的Java Web开发框架进行整合,涵盖从环境配置到代码实现的具体步骤。 ... [详细]
  • 本文作者分享了在阿里巴巴获得实习offer的经历,包括五轮面试的详细内容和经验总结。其中四轮为技术面试,一轮为HR面试,涵盖了大量的Java技术和项目实践经验。 ... [详细]
  • 本文介绍了如何利用 Spring Boot 和 Groovy 构建一个灵活且可扩展的动态计算引擎,以满足钱包应用中类似余额宝功能的推广需求。我们将探讨不同的设计方案,并最终选择最适合的技术栈来实现这一目标。 ... [详细]
  • Nginx 反向代理与负载均衡实验
    本实验旨在通过配置 Nginx 实现反向代理和负载均衡,确保从北京本地代理服务器访问上海的 Web 服务器时,能够依次显示红、黄、绿三种颜色页面以验证负载均衡效果。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 字节跳动夏季招聘面试经验分享
    本文详细记录了字节跳动夏季招聘的面试经历,涵盖了一、二、三轮面试的技术问题及项目讨论,旨在为准备类似面试的求职者提供参考。 ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • Spring Cloud学习指南:深入理解微服务架构
    本文介绍了微服务架构的基本概念及其在Spring Cloud中的实现。讨论了微服务架构的主要优势,如简化开发和维护、快速启动、灵活的技术栈选择以及按需扩展的能力。同时,也探讨了微服务架构面临的挑战,包括较高的运维要求、分布式系统的复杂性、接口调整的成本等问题。最后,文章提出了实施微服务时应遵循的设计原则。 ... [详细]
  • Spring Cloud Config 使用 Vault 作为配置存储
    本文探讨了如何在Spring Cloud Config中集成HashiCorp Vault作为配置存储解决方案,基于Spring Cloud Hoxton.RELEASE及Spring Boot 2.2.1.RELEASE版本。文章还提供了详细的配置示例和实践建议。 ... [详细]
author-avatar
波Z-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有