热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloudStream集成rabbitMQ(延时队列)

上节咱们讲了SpringCloudStream集成rabbitMQ,本章节咱们将实现延时队列功能。在实际开发中我们有些场景用延时队列实现非常方便。下面列举延时队列适合

上节咱们讲了SpringCloud Stream集成rabbitMQ,本章节咱们将实现延时队列功能。在实际开发中我们有些场景用延时队列实现非常方便。下面列举延时队列适合使用的场景:


  1. 用户下单30分钟后未付款自动关闭订单
  2. 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时
  3. 邮箱编辑好邮件定时发送等等

上节已经讲过SpringCloud Stream集成rabbitMQ步骤,下面就不在重复赘述了,不明白的同学可以去看上节的内容。


1.主要的变化是配置文件

spring:application:name: rabbitmq-domecloud:function:definition: source;sink; #函数名称,对应服务中的注入的Beanstream:binders: #需要绑定的rabbitmq的服务信息default-binder: #定义的名称,用于bidding整合type: rabbit #消息组件类型environment: #配置rabbimq连接环境spring:rabbitmq:addresses: 10.0.1.141:5672 #服务器的地址和端口username: xt3dev #用户名password: 4V9prcFbRoYxLCMd #密码bindings:source-out-0: #自定义消息通道的名称destination: QUEUE_DOME #exchange名称,交换模式默认是topic,创建时同时会创建QUEUE_DOME.${spring.application.name}队列content-type: application/json #设置消息的类型为jsongroup: ${spring.application.name} #分组binder: default-binder #绑定的binder名称sink-in-0:destination: QUEUE_DOMEcontent-type: application/jsongroup: ${spring.application.name}binder: default-binderrabbit:bindings:source-out-0:producer:ttl: 5000 #延时队列的延时时间,单位毫秒auto-bind-dlq: true #为true是开启死信队列dead-letter-exchange: QUEUE_DOME_IN #死信队列的交换机dead-letter-queueName: QUEUE_DOME_IN.${spring.application.name} #死信队列名称

2.启动项目测试

浏览器访问http://localhost:8080/sendMessage?message=hello rabbitMQ,从控制台我看可以看出生产者发送消息到消费者接收到消息是有5秒钟的延时,我们配置文件配置的也是5000毫秒,说明延时队列功能是可以的。


3.延时队列原理分析

RabbitMQ本身没有直接支持延迟队列功能,两种方式可以实现延迟队列,一种是利用对的TTL特性来实现,另外一种是使用RabbitMQ延迟插件来实现。本文主要讲述的是TTL的方式来实现的

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。


  • A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • B: 对消息进行单独设置,每条消息TTL可以不同。
    如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

延迟队列原理分析图


 4.设置单条消息过期时间

使用spring cloud stream的方式是没办法设置单条消息过期时间的,只能利用RabbitTemplate来设置单条消息过期时间。


4.1 创建RabbitConfig类

@Configuration
@RequiredArgsConstructor
public class RabbitConfig {private final BinderFactory binderFactory;private final static String binderName = "default-binder";@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) throws NoSuchFieldException, IllegalAccessException {// 获取目标binderBinder binder =binderFactory.getBinder(binderName, BinderConfiguration.class);Assert.notNull(binder, binderName + " is null");// 获取binder的connectionFactoryField field = binder.getClass().getDeclaredField("connectionFactory");field.setAccessible(true);connectionFactory = (ConnectionFactory) field.get(binder);// newreturn new RabbitTemplate(connectionFactory);}
}


4.2 改造生产者实现类

@Slf4j
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {private final RabbitTemplate rabbitTemplate;private final Sinks.Many> sinks =Sinks.many().multicast().onBackpressureBuffer();@Beanpublic Supplier>> source(){return sinks::asFlux;}@Overridepublic void sendMessage(String message, String expiration) {log.info("发送时间:{}, 生产者发送消息:{}", new Date(), message);rabbitTemplate.convertAndSend("QUEUE_DOME_OUT", "#", message, msg -> {MessageProperties messageProperties = msg.getMessageProperties();// 设置过期时间,单位:毫秒messageProperties.setExpiration(expiration);return msg;});}
}

4.3 改造MessageController类

@RestController
@RequiredArgsConstructor
public class MessageController {private final ProducerService producerService;@GetMapping("/sendMessage")public void sendMessage(@RequestParam(value = "message", defaultValue = "hello world") String message,@RequestParam(value = "expiration", defaultValue = "5000") String expiration) {producerService.sendMessage(message, expiration);}
}

4.4 启动项目测试

浏览器访问http://localhost:8080/sendMessage?message=hello%20rabbitMQ&expiration=3000,设置消息过期时间为3秒

 结果:

浏览器访问http://localhost:8080/sendMessage?message=hello%20rabbitMQ&expiration=3000,设置消息过期时间为5秒 

结果:


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 电商高并发解决方案详解
    本文以京东为例,详细探讨了电商中常见的高并发解决方案,包括多级缓存和Nginx限流技术,旨在帮助读者更好地理解和应用这些技术。 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • 本文详细介绍如何安装和配置DedeCMS的移动端站点,包括新版本安装、老版本升级、模板适配以及必要的代码修改,以确保移动站点的正常运行。 ... [详细]
  • JavaScript 跨域解决方案详解
    本文详细介绍了JavaScript在不同域之间进行数据传输或通信的技术,包括使用JSONP、修改document.domain、利用window.name以及HTML5的postMessage方法等跨域解决方案。 ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • Redis:缓存与内存数据库详解
    本文介绍了数据库的基本分类,重点探讨了关系型与非关系型数据库的区别,并详细解析了Redis作为非关系型数据库的特点、工作模式、优点及持久化机制。 ... [详细]
  • 关于进程的复习:#管道#数据的共享Managerdictlist#进程池#cpu个数1#retmap(func,iterable)#异步自带close和join#所有 ... [详细]
  • 大华股份2013届校园招聘软件算法类试题D卷
    一、填空题(共17题,每题3分,总共51分)1.设有inta5,*b,**c,执行语句c&b,b&a后,**c的值为________答:5 ... [详细]
  • 我自己做了一个网站图片的抓取,感觉速度有点慢抓取4000张图片可能得用15分钟左右的时间,我百度看用线程可以加快抓取,然后创建了5个线程抓取,但是5个线程是同步执行同样的操作一个图片就 ... [详细]
  • 在iOS开发中,多线程技术的应用非常广泛,能够高效地执行多个调度任务。本文将重点介绍GCD(Grand Central Dispatch)在多线程开发中的应用,包括其函数和队列的实现细节。 ... [详细]
  • Java虚拟机及其发展历程
    Java虚拟机(JVM)是每个Java开发者日常工作中不可或缺的一部分,但其背后的运作机制却往往显得神秘莫测。本文将探讨Java及其虚拟机的发展历程,帮助读者深入了解这一关键技术。 ... [详细]
  • WebBenchmark:强大的Web API性能测试工具
    本文介绍了一款名为WebBenchmark的Web API性能测试工具,该工具不仅支持HTTP和HTTPS服务的测试,还提供了丰富的功能来帮助开发者进行高效的性能评估。 ... [详细]
  • 本文详细介绍了如何在 Ubuntu 14.04 系统上搭建仅使用 CPU 的 Caffe 深度学习框架,包括环境准备、依赖安装及编译过程。 ... [详细]
  • 前言:由于Android系统本身决定了其自身的单线程模型结构。在日常的开发过程中,我们又不能把所有的工作都交给主线程去处理(会造成UI卡顿现象)。因此,适当的创建子线程去处理一些耗 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有