热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloud笔记SpringCloudStream消息驱动(十五)

1.消息驱动概述1.SpringCloudStream是什么SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spri

1.消息驱动概述


1.Spring Cloud Stream是什么

Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spring Cloud中的Binder对象进行交互。首先我们需要配置绑定关系,也就是将具体的消息中间件和Stream进行绑定,后面使用Binder来操作消息中间件。此时,我们只需要了解Stream中的Binder对象相关的操作,即可完成消息中间件的交互。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前支持的消息中间件有:RabbitMQ和Kafka。目的在于屏蔽不同消息中间件的差异,统一消息的编程模型。


2.设计思想

对于普通的MQ,生产者和消费者之间依靠消息媒介传输信息,消息通过特定的通道进行传输,此时Spring Boot和消息中间件直接进行交互,因为不同消息中间件的设计不同,在实现细节上会有差异。通过定义Binder作为中间层,实现应用程序和消息中间件实现细节的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,降低了学习成本。

Inputs对应消息生产者,向Binder发送消息,Binder将消息发给消息中间件,Outputs对应消息消费者,从Binder中取消息,Binder去消息中间件中获取。

Stream中的消息通信遵循发布-订阅模式,也就是通过Topic进行传播。


3.Spring Cloud Stream标准流程套路


4.编码API和常用注解


2.案例说明

为了演示Stream,我们需要再创建3个子模块,分别是cloud-stream-rabbitmq-provider8801,cloud-stream-rabbitmq-consumer8802,cloud-stream-rabbitmq-consumer8803,另外,需要把RabbitMQ安装并启动。


3.消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801模块,修改pom.xml,加入spring-cloud-starter-stream-rabbit的坐标。

cloud2020com.atguigu.springcloud1.0-SNAPSHOT4.0.0cloud-stream-rabbitmq-provider8801org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-actuatororg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.cloudspring-cloud-starter-stream-rabbitorg.springframework.bootspring-boot-devtoolsruntimetrueorg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtest

添加application.yml配置文件。

server:port: 8801
spring:application:name: cloud-stream-providercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class, args);}
}

添加业务类,这是一个provider模块,所以需要提供send()方法用于发送消息,添加IMessageProvider和MessageProviderImpl。

package com.atguigu.springcloud.service;public interface IMessageProvider {String send();
}

package com.atguigu.springcloud.service.impl;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;
import java.util.UUID;// 这里使用@EnableBinding(Source.class),这是Stream里的注解,定义一个消息推送管道,不需要使用@Service注解
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;// 定义消息发送管道,名称必须是output,否则会报错@Overridepublic String send() {String uuid = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(uuid).build());System.out.println("uuid=" + uuid);return null;}
}

编写Controller用于接收请求。

package com.atguigu.springcloud.controller;import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
public class SendMessageController {@Resourceprivate IMessageProvider iMessageProvider;@GetMapping("/sendMessage")public String sendMessage() {return iMessageProvider.send();}
}

启动Eureka7001,启动RabbitMQ服务,启动Provider8801模块测试。查看RabbitMQ管理后台,在Exchange标签下,可以看到我们定义的exchangeName。浏览器访问http://localhost:8801/sendMessage发送消息,不断刷新页面发送请求,在控制台可以看到消息输出,在RabbitMQ管理后台的Overview可以看到消息发送的速率等信息。


4.消息驱动之消费者

新建cloud-stream-rabbitmq-consumer8802模块,修改pom.xml,依赖和cloud-stream-rabbitmq-provider8801一样。添加application.yml配置文件,注意这里通道名称改成input,之前cloud-stream-rabbitmq-provider8801使用的是output。

server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: # 此处配置要绑定的rabbitmq的服务信息binderName: # 表示为binder对象定义一个名称,用于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关环境配置spring:rabbitmq:host: 192.168.0.123port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 一个通道的名称,不能修改destination: exchangeName # 表示RabbitMQ中要使用的Exchange名称,给Exchange起一个名字content-type: application/json # 设置消息类型,本次为json,文本要设置为“text/plain”binder: binderName # 表示binding对应哪个binder
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 # 如果超过5S间隔就注销节点(默认是90s)instance-id: provider-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址

添加主启动类。

package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class StreamMQMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8802.class, args);}
}

添加业务类。

package com.atguigu.springcloud.service;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;// 这里使用@EnableBinding(Sink.class),这是Stream里的注解,定义一个消息接收管道
@EnableBinding(Sink.class)
public class ReceiveMessageService {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message message) {System.out.println("ServerPort=" + serverPort + "消费者,消费消息:" + message.getPayload());}
}

启动cloud-stream-rabbitmq-consumer8802模块,浏览器请求http://localhost:8801/sendMessage发送消息,在8802的控制台,可以看到消息接收的情况,测试通过。


5.分组消费与持久化

参照cloud-stream-rabbitmq-consumer8802模块,创建cloud-stream-rabbitmq-consumer8803模块并启动。浏览器请求http://localhost:8801/sendMessage,结果8802和8803都消费了消息,存在重复消费的问题。

在Stream中,处于同一个group的多个消费者是竞争关系,一条消息只能消费一次,处于不同group的消费者是可以重复消费的。通过查看RabbitMQ的管理后台,点击exchange标签,找到我们指定的exchange,发现Bindings里面有两个组,所以出现了重复消费问题。所以,解决重复消费的办法也很简单,将不需要重复消费的消息指定分组即可。

修改两个consumer的application.yml,在inputs结点下,加一个group属性,值为myGroup。再次发送消息,查看8802和8803的控制台打印信息,可知同一个组的消费者每次只有一个可以消费消息,而且默认采用轮询的方式。

添加了group属性的消费者,在重启后,会自动获取未消费的消息,而没有添加group属性的消费者,在重启后,即使有未消费的消息,也无法消费了。比如,停掉8802和8803消费者,去掉8802的group属性,保留8803的group属性,生产者发送几条消息,重启8802模块,它并不能处理未消费的消息,重启8803模块,因为它指定了group,它可以拿到未消费的消息,继续消费,实现了消息的持久化。指定过group的消费者重启时候,在RabbitMQ的Exchange中,会继续保留group的名称,用于下次重启消费者时候,将消息再次发送给消费者,未指定group名称的消费者,每次启动,都会随机生成一个group放在Exchange的Bindings里面,当消费者重启,会把这个随机值从Bindings里移除,再加入一个新的随机值,所以说,未指定group的消费者,无法获取到之前未消费的消息,也就没法继续消费了。

 


推荐阅读
  • Python自动化处理:从Word文档提取内容并生成带水印的PDF
    本文介绍如何利用Python实现从特定网站下载Word文档,去除水印并添加自定义水印,最终将文档转换为PDF格式。该方法适用于批量处理和自动化需求。 ... [详细]
  • XNA 3.0 游戏编程:从 XML 文件加载数据
    本文介绍如何在 XNA 3.0 游戏项目中从 XML 文件加载数据。我们将探讨如何将 XML 数据序列化为二进制文件,并通过内容管道加载到游戏中。此外,还会涉及自定义类型读取器和写入器的实现。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • Hadoop入门与核心组件详解
    本文详细介绍了Hadoop的基础知识及其核心组件,包括HDFS、MapReduce和YARN。通过本文,读者可以全面了解Hadoop的生态系统及应用场景。 ... [详细]
  • 优化局域网SSH连接延迟问题的解决方案
    本文介绍了解决局域网内SSH连接到服务器时出现长时间等待问题的方法。通过调整配置和优化网络设置,可以显著缩短SSH连接的时间。 ... [详细]
  • 本文详细介绍了Java中的输入输出(IO)流,包括其基本概念、分类及应用。IO流是用于在程序和外部资源之间传输数据的一套API。根据数据流动的方向,可以分为输入流(从外部流向程序)和输出流(从程序流向外部)。此外,还涵盖了字节流和字符流的区别及其具体实现。 ... [详细]
  • 本文介绍了一种从与src同级的config目录中读取属性文件内容的方法。通过使用Java的Properties类和InputStream,可以轻松加载并获取指定键对应的值。 ... [详细]
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 本文详细探讨了Java中的ClassLoader类加载器的工作原理,包括其如何将class文件加载至JVM中,以及JVM启动时的动态加载策略。文章还介绍了JVM内置的三种类加载器及其工作方式,并解释了类加载器的继承关系和双亲委托机制。 ... [详细]
  • 近期我们开发了一款包含天气预报功能的万年历应用,为了满足这一需求,团队花费数日时间精心打造并测试了一个稳定可靠的天气API接口,现正式对外开放。 ... [详细]
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • 本文详细探讨了在Android 8.0设备上使用ChinaCock的TCCBarcodeScanner进行扫码时出现的应用闪退问题,并提供了解决方案。通过调整配置文件,可以有效避免这一问题。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 毕业设计:基于机器学习与深度学习的垃圾邮件(短信)分类算法实现
    本文详细介绍了如何使用机器学习和深度学习技术对垃圾邮件和短信进行分类。内容涵盖从数据集介绍、预处理、特征提取到模型训练与评估的完整流程,并提供了具体的代码示例和实验结果。 ... [详细]
author-avatar
波Z-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有