热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitmq延迟队列_纯干货|Springboot+RabbitMQ实现消息延迟重试具体实现

一:简介使用场景:调用第三方接口时如果调用失败需要隔几秒再去尝试下一次调用,直到调用N次还失败就停止调用。最常用的场景就是支付成功异步通知

一:简介

使用场景:调用第三方接口时如果调用失败需要隔几秒再去尝试下一次调用,直到调用N次还失败就停止调用。最常用的场景就是支付成功异步通知第三方支付成功。

1. 为什么要调用多次?

如果调用1次就成功了就停止调用,如果失败可能由于网络原因没有请求到服务器需要再次尝试,第二次很可能就会调用成功了。

2. 为什么要间隔几秒再尝试下次调用?

如果是因为网络原因没有请求到服务器如果再立刻调用,很可能此时网络还是没有好,可能等几秒后网络就恢复了,此时再去调用就好了。

实现效果类似于支付宝中的回调延迟重试:

716b773499c74d308a3b68ab5896704c

二: 实现原理

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。消息因为设置了TTL而过期。消息进入了一条已经达到最大长度的队列。 如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档。

通过RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

1. 延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

703231bad13c4aea83f3e786012e537e

2. 延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

b5a74130dda846dbaa77a53f19ccde6a

三:代码示例

功能示例:每隔2秒、4秒、8秒、16秒去重试调用接口,总共调用4次。

02b09d80a4444df3869cb850f935587a
  1. pom.xml

org.springframework.boot spring-boot-starter-amqp org.apache.httpcomponents httpclient 4.5.6com.alibaba fastjson 1.2.62org.projectlombok lombok true

  1. application.yml

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest ## http component http: maxTotal: 100 #最大连接数 defaultMaxPerRoute: 20 # 并发数 connectTimeout: 1000 #创建连接的最长时间 connectionRequestTimeout: 500 #从连接池中获取到连接的最长时间 socketTimeout: 10000 #数据传输的最长时间 validateAfterInactivity: 1000

  1. HttpClient

import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class HttpClientConfig { @Value("${http.maxTotal}") private Integer maxTotal; @Value("${http.defaultMaxPerRoute}") private Integer defaultMaxPerRoute; @Value("${http.connectTimeout}") private Integer connectTimeout; @Value("${http.connectionRequestTimeout}") private Integer connectionRequestTimeout; @Value("${http.socketTimeout}") private Integer socketTimeout; @Value("${http.validateAfterInactivity}") private Integer validateAfterInactivity; @Bean public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager(){ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(maxTotal); connectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute); connectionManager.setValidateAfterInactivity(validateAfterInactivity); return connectionManager; } @Bean public HttpClientBuilder httpClientBuilder(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager){ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager); return httpClientBuilder; } @Bean public CloseableHttpClient closeableHttpClient(HttpClientBuilder httpClientBuilder){ return httpClientBuilder.build(); } @Bean public RequestConfig.Builder builder(){ RequestConfig.Builder builder = RequestConfig.custom(); return builder.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout); } @Bean public RequestConfig requestConfig(RequestConfig.Builder builder){ return builder.build(); } }

import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; @Slf4j @Component public class HttpClient { /** 默认字符集 */ public static final String DEFAULT_CHARSET = "UTF-8"; @Autowired private CloseableHttpClient closeableHttpClient; @Autowired private RequestConfig config; public T doPost(String url, Map requestParameter, Class clazz) throws Exception { HttpResponse httpResponse = this.doPost(url, requestParameter); if (clazz == String.class) { return (T) httpResponse.getBody(); } T response = JSONObject.parseObject(httpResponse.getBody(), clazz); return response; } public HttpResponse doPost(String url, Map requestParameter) throws Exception { HttpPost httpPost = new HttpPost(url); httpPost.setConfig(config); httpPost.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); if (requestParameter != null) { String requestBody = JSONObject.toJSONString(requestParameter); StringEntity postEntity = new StringEntity(requestBody, "UTF-8"); httpPost.setEntity(postEntity); } CloseableHttpResponse response = this.closeableHttpClient.execute(httpPost); // 对请求的响应进行简单的包装成自定义的类型 return new HttpResponse(response.getStatusLine().getStatusCode(), EntityUtils.toString( response.getEntity(), DEFAULT_CHARSET)); } /** * 封装请求的响应码和响应的内容 */ public class HttpResponse { /** http status */ private Integer code; /** http response content */ private String body; public HttpResponse() { } public HttpResponse(Integer code, String body) { this.code = code; this.body = body; } public Integer getCode() { return code; } public void setCode(Integer code) { this.code = code; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } }

  1. MessagePostProcessor

import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; /** * 设置消息过期时间 */ public class ExpirationMessagePostProcessor implements MessagePostProcessor{ private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置失效时间 message.getMessageProperties().setExpiration(ttl.toString()); return message; } }

  1. 声明队列和交换机

package com.example.rabbitmq.retry; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 延迟队列 */ @Configuration public class DelayQueueConfig { /** 缓冲队列名称:过期时间针对于每个消息 */ public final static String DELAY_QUEUE_PER_MESSAGE_TTL_NAME = "delay_queue_per_message_ttl"; /** 死亡交换机:过期消息将通过该死亡交换机放入到实际消费的队列中 */ public final static String DELAY_EXCHANGE_NAME = "delay_exchange"; /** 死亡交换机对应的路由键,通过该路由键路由到实际消费的队列 */ public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue"; /** 路由到 delay_queue_per_message_ttl(统一失效时间的队列)的exchange(用于队列延迟重试) */ public final static String PER_MESSAGE_TTL_EXCHANGE_NAME = "per_message_ttl_exchange"; /** * delay_queue_per_message_ttl * 每个消息都可以控制自己的失效时间 * x-dead-letter-exchange声明了队列里的死信转发到的DLX名称 * x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称 */ @Bean Queue delayQueuePerMessageTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) .build(); } /** * 死亡交换机 DLX * @return */ @Bean DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } /** * 实际消费队列:过期之后会进入到该队列中来 * @return */ @Bean Queue delayProcessQueue() { return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME).build(); } /** * 将死亡交换机delayExchange和实际消费队列delay_process_queue绑定在一起,并携带路由键delay_process_queue * @param delayProcessQueue * @param delayExchange * @return */ @Bean Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME); } /** * 重试交换机:消费失败后通过该交换机转发到队列中 * @return */ @Bean DirectExchange perMessageTtlExchange() { return new DirectExchange(PER_MESSAGE_TTL_EXCHANGE_NAME); } /** * 重试交换机和缓冲队列绑定 * @param delayQueuePerMessageTTL 缓冲队列 * @param perMessageTtlExchange 重试交换机 * @return */ @Bean Binding messageTtlBinding(Queue delayQueuePerMessageTTL, DirectExchange perMessageTtlExchange) { return BindingBuilder.bind(delayQueuePerMessageTTL) .to(perMessageTtlExchange) .with(DELAY_QUEUE_PER_MESSAGE_TTL_NAME); } }

f464a946ca7d4658acaadcc9b03afa76
  1. 消费消息

import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; import java.util.Objects; /** * 支付回调重试 * 2秒、4秒、8秒后分别重试&#xff0c;算上0秒这次总共重试4次 */ &#64;Slf4j &#64;Component public class CallbackConsumer { &#64;Autowired private AmqpTemplate rabbitTemplate; &#64;Autowired private HttpClient httpClient; /** 最大重试次数 */ private Integer maxRetryTime &#61; 4; &#64;RabbitListener(queues &#61; DelayQueueConfig.DELAY_PROCESS_QUEUE_NAME) public void process(String msg) { log.info("----------date &#61; {}, msg &#61;{}", new Date(), msg); Map callbackRequestMap &#61; JSONObject.parseObject(msg, Map.class); // 重试次数 Integer retryTime &#61; (Integer)callbackRequestMap.get("retryTime"); String notifyUrl &#61; (String)callbackRequestMap.get("notifyUrl"); try { if (maxRetryTime <&#61; 4) { log.info("################Callback retry time &#61; {}, date &#61; {}################", retryTime, new Date()); String response &#61; httpClient.doPost(notifyUrl, callbackRequestMap, String.class); if (Objects.equals("SUCCESS", response)) { log.info("ok"); } else { // 下一次重试 log.error("error request&#61;{} response&#61;{}", callbackRequestMap, response); if (retryTime &#61;&#61; maxRetryTime) { lastTimeRetryResult(response, null); return; } retryCallback(callbackRequestMap); } } } catch (Exception e) { if (maxRetryTime

  1. 模拟回调

package com.example.rabbitmq.retry; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; import java.util.UUID; &#64;RestController &#64;RequestMapping("/mock") public class MockController { &#64;Autowired private AmqpTemplate rabbitTemplate; &#64;RequestMapping("/callback") public void callback(String notifyUrl) { Map requestMap &#61; new HashMap<>(); requestMap.put("retryTime", 1); requestMap.put("notifyUrl", notifyUrl); requestMap.put("orderCode", UUID.randomUUID().toString()); Object callbackMessage &#61; JSONObject.toJSONString(requestMap); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, callbackMessage, new ExpirationMessagePostProcessor(0L)); } }

8fb9e14e9b0b4b54a92f8ebfadc1b7aa
503768666654446988e5801ce4fd161a

欢迎关注Java实用技术&#xff0c;每天发布一篇实用技术文章。



推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • GoCD 创建管道时遇到 422 错误:权限不足问题分析与解决
    在 GoCD 创建管道时遇到 422 错误,通常是因为反向代理配置不正确,特别是缺少必要的头部信息。具体来说,需要确保在反向代理配置中添加 `proxy_set_header Host $host;` 和 `proxy_set_header X-Real-IP $remote_addr;` 等关键指令,以确保 GoCD 能够正确识别请求来源并进行权限验证。通过调整这些配置,可以有效解决权限不足的问题,确保管道创建过程顺利进行。 ... [详细]
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
  • Windows环境下详细教程:如何搭建Git服务
    Windows环境下详细教程:如何搭建Git服务 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 本文将详细介绍如何在SSM框架中无缝集成ShardingSphere 4.10,以实现高效的数据分片和读写分离。通过实例演示和代码解析,帮助开发者快速掌握这一复杂但实用的技术。文章从基础概念入手,逐步深入到具体配置和应用实践,旨在为读者提供一个全面、易懂的整合指南。 ... [详细]
  • HTTP请求与响应机制:基础概览
    在Web浏览过程中,HTTP协议通过请求和响应报文实现客户端与服务器之间的通信。当用户访问一个网页时,浏览器会发送一个HTTP请求报文至服务器,服务器接收到请求后,会生成并返回一个HTTP响应报文。这两种报文均包含三个主要部分:起始行、头部字段和消息体,确保了数据的有效传输和解析。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 在生产环境中进行高效部署与优化 ... [详细]
  • MVVM架构~mvc,mvp,mvvm大话开篇
    返回目录百度百科的定义:MVP是从经典的模式MVC演变而来,它们的基本思想有相通的地方:ControllerPresenter负责逻辑的处理,Model提供数据,View负责显示。作为一种新的模 ... [详细]
  • 本章节深入探讨了 Webpack 命令的高级功能,涵盖了官方快速入门教程中未涉及的细节。通过实际操作和案例分析,对官方文档进行了详细解读与补充,帮助读者更好地理解和应用这些进阶技巧。 ... [详细]
author-avatar
Ajax
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有