热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Springboot配置使用RabbitMQ并实现延时队列

Springboot配置使用

Springboot 配置使用 RabbitMQ 并实现延时队列

  • 前言
  • 一、安装 RabbitMQ
    • Windows安装
    • Linux安装
  • 二、新建项目
    • 1、引入依赖
    • 2、配置 yml
    • 3、启动类开启 Rabbitmq 注解
    • 4、配置 provider 的 RabbitmqConfig
    • 5、provider 发送消息
    • 6、consumer 接收消息
    • 7、演示
  • 三、延时队列
    • 1、在 provider 的 RabbitmqConfig 中增加配置
    • 2、在 provider 增加一个接口
    • 3、consumer 接收延时消息
  • 总结


前言

RabbitMQ作用:举几个例子,1、系统解耦,A系统无需关心B系统是否执行成功,无需等待B系统响应,直接把操作扔给mq就可以干其他事情了。2、系统使用高峰期,每秒产生10000条消息需要存储,一次性存入数据库恐怕不太行,所以先把数据发送到 RabbitMQ ,然后设置延时队列,每秒从队列取出1000条存入数据库,这样可以减少数据库压力。3、购买商品下订单以后,发送到延时队列,如果20分钟后没有付款,则从队列删除订单,也就是自动取消订单,如果支付了,则取出存入数据库,下单成功。


一、安装 RabbitMQ

Windows安装

太简单,自己bing一下

Linux安装

rabbitmq需要erlang语言环境
更新 apt 库,安装 erlang 环境,然后执行 erl 查看是否安装成功

apt update
apt install erlang
erl

安装 rabbitmq

apt install rabbitmq-server

查看 rabbitmq 运行状态

systemctl status rabbitmq-server

开启图形化管理界面,然后就可以访问 ip:15672,默认账号密码是 guest

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

默认的guest用户是只能通过本机访问的,所以远程管理后台界面登录需要配置个用户,才能通过外网浏览器访问

#账号root,密码root
rabbitmqctl add_user root root
# 设置为管理员账户
rabbitmqctl set_user_tags root administrator
# 分配所有权限
rabbitmqctl set_permissions -p / root “.*” “.*” “.*”

开放防火墙 5672 和 15672 端口

# Debian/Ubuntu ufw
ufw allow 5672
ufw allow 15672
ufw reload
# Debian/Ubuntu iptables(这个叼毛防火墙好麻烦,我没用过,不知道是不是这样)
iptables -A INPUT -p tcp --dport 5672 -j ACCEPT
iptables -A INPUT -p tcp --dport 15672 -j ACCEPT
iptables-restore
# CentOS
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload


二、新建项目

新建一个 provider 一个 consumer,两个 springboot 项目,都需要引入下面的依赖,或者新建的时候勾选自动添加 rabbitmq 的依赖
在这里插入图片描述

1、引入依赖

<dependency>
<groupId>org.springframework.amqpgroupId>
<artifactId>spring-rabbit-testartifactId>
<scope>testscope>
dependency>

2、配置 yml

provider 和 consumer 都这样配置,端口改成不一样就行了

server:
port: 8081
spring:
rabbitmq:
host: 192.168.0.105
port: 5672
username: root
password: root
virtualHost: /
# 确认机制
publisher-confirm-type: correlated
# 发布确认,如果不配置确认机制,发布确认也不用配置
publisher-returns: true

3、启动类开启 Rabbitmq 注解

consumer 和 provider 都需要这个注解

在这里插入图片描述

4、配置 provider 的 RabbitmqConfig

大家可以根据 15672 那个图形化管理界面看看下面的一些概念

  • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息的载体,每个消息都会被投到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
  • Producer:消息生产者,就是投递消息的程序.
  • Consumer:消息消费者,就是接受消息的程序.
  • Channel:消息通道,在客户端的每个连接里,可建立多个channel.

package icu.xuyijie.provider.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.web.filter.CharacterEncodingFilter;
import java.util.HashMap;
import java.util.Map;
/**
* @author 徐一杰
* @date 2022/9/30 9:38
* @description
*/

@SpringBootConfiguration
public class RabbitmqConfig {
public static final String QUEUE_MESSAGE = "queue_message";
public static final String QUEUE_ORDER = "queue_order";
public static final String EXCHANGE_A = "exchange_A";
/**
* # 是通配符,可以匹配任意,如下面的可以匹配到 aa.bb.message.cc.dd
* 还有 * 是匹配一个.里面的字符,如 .*.message 只能匹配 .aa.message,不能匹配 .aa.bb.message
*/

public static final String ROUTING_KEY_MESSAGE = "#.message.#";
public static final String ROUTING_KEY_ORDER = "#.order.#";
/**
* 这个可以不写,写了的话,以这个为准,会覆盖掉 yml 的配置
* @return
*/

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.105", 5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setVirtualHost("/");
//确认机制
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//发布确认,如果不配置确认机制,发布确认也不用配置
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
/**
* 使用 yml 连接的话这个可以不写,这个是配合 connectionFactory 的
* RabbitMQ的使用入口。scope必须是prototype类型
* @return
*/

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
template.setMessageConverter(this.jsonMessageConverter());
template.setMandatory(true);
return template;
}
/**
* 序列化json
* @return
*/

@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public CharacterEncodingFilter characterEncodingFilter() {
CharacterEncodingFilter filter = new CharacterEncodingFilter();
filter.setEncoding("UTF-8");
filter.setForceEncoding(true);
return filter;
}
/**
* 声明交换机
* 针对消费者配置
* 设置交换机类型
* 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routing key的概念
* HeadersExchange:通过添加属性key-value匹配
* DirectExchange: 按照routing key分发到指定队列
* TopicExchange: 多关键字匹配
* @return
*/

@Bean
public Exchange exchangeA(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_A).durable(true).build();
}
/**
* 声明QUEUE_MESSAGE队列
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
* @return
*/

@Bean
public Queue queueMessage(){
return new Queue(QUEUE_MESSAGE, true, false, false);
}
/**
* 声明QUEUE_ORDER队列
* @return
*/

@Bean
public Queue queueOrder(){
return new Queue(QUEUE_ORDER);
}
/**
* 队列绑定交换机,指定routingKey
* @return
*/

@Bean
public Binding bindingQueueMessage(){
return BindingBuilder.bind(queueMessage()).to(exchangeA()).with(ROUTING_KEY_MESSAGE).noargs();
}
/**
* 队列绑定交换机,指定routingKey
* @return
*/

@Bean
public Binding bindingQueueOrder(){
return BindingBuilder.bind(queueOrder()).to(exchangeA()).with(ROUTING_KEY_ORDER).noargs();
}
}

5、provider 发送消息

我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback,sendExchange 方法没有使用回调方法,使用回调方法的话需要像 sendCallback 方法一样多传一个值 correlationId

package icu.xuyijie.provider.controller;
import icu.xuyijie.provider.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
* @author 徐一杰
* @date 2022/9/30 10:08
* @description
*/

@RestController
@RequestMapping("/provider")
public class ProviderController implements RabbitTemplate.ConfirmCallback {
/**
* 我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置
* 回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback
*/

private final RabbitTemplate rabbitTemplate;
public ProviderController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
//设置回调为当前类对象
this.rabbitTemplate.setConfirmCallback(this);
}
@RequestMapping("/sendExchange")
public void sendExchange(){
//使用rabbitTemplate发送消息
String message = "这是一条发送到exchangeA的消息";
/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/

rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message);
}

/**
* 如果使用回调方法,则需要多传一个参数 correlationId
*/

@RequestMapping("/sendCallback")
public void sendCallback(){
String message = "这是一条发送到exchangeA的消息";
//构建回调id为uuid
String callBackId = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(callBackId);
//发送消息到消息队列
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message, correlationId);
System.out.println("发送回调id: " + callBackId);
}
/**
* 消息回调确认方法
* @param correlationData 请求数据对象
* @param ack 是否发送成功
* @param s
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
assert correlationData != null;
System.out.println("这是回调方法打印的:回调id: " + correlationData.getId());
try {
System.out.println("这是回调方法打印的:回调message: " + correlationData.getFuture().get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
if (ack) {
System.out.println("这是回调方法打印的:消息发送成功");
} else {
System.out.println("消息发送失败" + s);
}
}
}

6、consumer 接收消息

@RabbitListener就是监听的队列,可以监听多个

package icu.xuyijie.consumer.consumer;
import com.rabbitmq.client.Channel;
import icu.xuyijie.consumer.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 徐一杰
* @date 2022/9/30 10:10
* @description
*/

@Component
public class ReceiveHandler {
@RabbitListener(queues = {"queue_message", "queue_order"})
public void receiveMessage(Message message, Channel channel){
System.out.println("接收 queue_message 和 queue_order 队列的消息 " + message);
System.out.println("对应Channel " + channel);
}
@RabbitListener(queues = {"queue_order"})
public void receiveOrder(Message message, Channel channel){
System.out.println("接收 queue_order 队列的消息" + message);
System.out.println("对应Channel " + channel);
}
}

7、演示

我们直接调用 sendCallback 这个接口

在这里插入图片描述

consumer 接收到消息

在这里插入图片描述

provider 触发回调方法

在这里插入图片描述


三、延时队列

1、在 provider 的 RabbitmqConfig 中增加配置

增加了一个一个交换机、一个队列、一个路由键的配置,注意 delayQueue() 方法,我的注释有解释

public static final String QUEUE_DELAY = "queue_delay";
public static final String EXCHANGE_DELAY = "exchange_delay";
public static final String ROUTER_DELAY_KEY = "router_delay_key";
/**
* 延迟交换机
*
* @return
*/

@Bean
public DirectExchange delayExchange() {
return new DirectExchange(EXCHANGE_DELAY);
}
/**
* 延迟队列
* map 的设置意思是接收此队列的延迟消息需要监听 EXCHANGE_RECEIVE 队列,直接监听 QUEUE_DELAY 无法实现延时队列
*
* @return
*/

@Bean
public Queue delayQueue() {
Map<String, Object> map = new HashMap<>(16);
map.put("x-dead-letter-exchange", EXCHANGE_A);
map.put("x-dead-letter-routing-key", ROUTING_KEY_ORDER);
return new Queue(QUEUE_DELAY, true, false, false, map);
}
/**
* 给延迟队列绑定交换机
*
* @return
*/

@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTER_DELAY_KEY);
}

2、在 provider 增加一个接口

发送消息到我们刚刚配置的延时交换机

/**
* 给延迟队列发送消息
*/

@RequestMapping("/sendDelay")
public void sendDelay(){
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_DELAY, RabbitmqConfig.ROUTER_DELAY_KEY, "这是一条延时队列的消息", message -> {
message.getMessageProperties().setExpiration("3000");
return message;
}, new CorrelationData(UUID.randomUUID().toString()));
System.out.println("延时队列发送成功");
}

3、consumer 接收延时消息

注意,上面我们把延时消息发送到了延时队列 EXCHANGE_DELAY,但是我们接收,要监听 RabbitmqConfig中 delayQueue() 方法 配置的 EXCHANGE_A,路由键为 ROUTING_KEY_ORDER,也就是说无需改动 consumer ,consumer 的两个方法都能收到延时消息,因为他们都监听了 ROUTING_KEY_ORDER 对应的队列

直接调用 sendDelay 方法,2次(因为两个方法都监听 queue_order,所以他们会交替获得消息)

在这里插入图片描述
3秒后 consumer 的两个方法都能接收到延时消息

在这里插入图片描述


总结
推荐阅读
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • Docker安装Rabbitmq(配合宝塔)
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Docker安装Rabbitmq(配合宝塔)相关的知识,希望对你有一定的参考价值。一、事前准备 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • maven阿里云镜像一路繁花似锦绣前程
    重点:找到maven引用的settings.xml配置文件,将以下代码复制至&amp;lt;mirrors&amp;gt;&amp;lt;mirrors&a ... [详细]
author-avatar
杨芷寻找最真的自己
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有