使用场景:调用第三方接口时如果调用失败需要隔几秒再去尝试下一次调用,直到调用N次还失败就停止调用。最常用的场景就是支付成功异步通知第三方支付成功。
如果调用1次就成功了就停止调用,如果失败可能由于网络原因没有请求到服务器需要再次尝试,第二次很可能就会调用成功了。
如果是因为网络原因没有请求到服务器如果再立刻调用,很可能此时网络还是没有好,可能等几秒后网络就恢复了,此时再去调用就好了。
实现效果类似于支付宝中的回调延迟重试:
在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。消息因为设置了TTL而过期。消息进入了一条已经达到最大长度的队列。 如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档。
通过RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。
针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。
如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。
功能示例:每隔2秒、4秒、8秒、16秒去重试调用接口,总共调用4次。
org.springframework.boot spring-boot-starter-amqp org.apache.httpcomponents httpclient 4.5.6com.alibaba fastjson 1.2.62org.projectlombok lombok true
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest ## http component http: maxTotal: 100 #最大连接数 defaultMaxPerRoute: 20 # 并发数 connectTimeout: 1000 #创建连接的最长时间 connectionRequestTimeout: 500 #从连接池中获取到连接的最长时间 socketTimeout: 10000 #数据传输的最长时间 validateAfterInactivity: 1000
import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class HttpClientConfig { @Value("${http.maxTotal}") private Integer maxTotal; @Value("${http.defaultMaxPerRoute}") private Integer defaultMaxPerRoute; @Value("${http.connectTimeout}") private Integer connectTimeout; @Value("${http.connectionRequestTimeout}") private Integer connectionRequestTimeout; @Value("${http.socketTimeout}") private Integer socketTimeout; @Value("${http.validateAfterInactivity}") private Integer validateAfterInactivity; @Bean public PoolingHttpClientConnectionManager poolingHttpClientConnectionManager(){ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(maxTotal); connectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute); connectionManager.setValidateAfterInactivity(validateAfterInactivity); return connectionManager; } @Bean public HttpClientBuilder httpClientBuilder(PoolingHttpClientConnectionManager poolingHttpClientConnectionManager){ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager); return httpClientBuilder; } @Bean public CloseableHttpClient closeableHttpClient(HttpClientBuilder httpClientBuilder){ return httpClientBuilder.build(); } @Bean public RequestConfig.Builder builder(){ RequestConfig.Builder builder = RequestConfig.custom(); return builder.setConnectTimeout(connectTimeout) .setConnectionRequestTimeout(connectionRequestTimeout) .setSocketTimeout(socketTimeout); } @Bean public RequestConfig requestConfig(RequestConfig.Builder builder){ return builder.build(); } }
import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; @Slf4j @Component public class HttpClient { /** 默认字符集 */ public static final String DEFAULT_CHARSET = "UTF-8"; @Autowired private CloseableHttpClient closeableHttpClient; @Autowired private RequestConfig config; public T doPost(String url, Map requestParameter, Class clazz) throws Exception { HttpResponse httpResponse = this.doPost(url, requestParameter); if (clazz == String.class) { return (T) httpResponse.getBody(); } T response = JSONObject.parseObject(httpResponse.getBody(), clazz); return response; } public HttpResponse doPost(String url, Map requestParameter) throws Exception { HttpPost httpPost = new HttpPost(url); httpPost.setConfig(config); httpPost.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); if (requestParameter != null) { String requestBody = JSONObject.toJSONString(requestParameter); StringEntity postEntity = new StringEntity(requestBody, "UTF-8"); httpPost.setEntity(postEntity); } CloseableHttpResponse response = this.closeableHttpClient.execute(httpPost); // 对请求的响应进行简单的包装成自定义的类型 return new HttpResponse(response.getStatusLine().getStatusCode(), EntityUtils.toString( response.getEntity(), DEFAULT_CHARSET)); } /** * 封装请求的响应码和响应的内容 */ public class HttpResponse { /** http status */ private Integer code; /** http response content */ private String body; public HttpResponse() { } public HttpResponse(Integer code, String body) { this.code = code; this.body = body; } public Integer getCode() { return code; } public void setCode(Integer code) { this.code = code; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } }
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; /** * 设置消息过期时间 */ public class ExpirationMessagePostProcessor implements MessagePostProcessor{ private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置失效时间 message.getMessageProperties().setExpiration(ttl.toString()); return message; } }
package com.example.rabbitmq.retry; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 延迟队列 */ @Configuration public class DelayQueueConfig { /** 缓冲队列名称:过期时间针对于每个消息 */ public final static String DELAY_QUEUE_PER_MESSAGE_TTL_NAME = "delay_queue_per_message_ttl"; /** 死亡交换机:过期消息将通过该死亡交换机放入到实际消费的队列中 */ public final static String DELAY_EXCHANGE_NAME = "delay_exchange"; /** 死亡交换机对应的路由键,通过该路由键路由到实际消费的队列 */ public final static String DELAY_PROCESS_QUEUE_NAME = "delay_process_queue"; /** 路由到 delay_queue_per_message_ttl(统一失效时间的队列)的exchange(用于队列延迟重试) */ public final static String PER_MESSAGE_TTL_EXCHANGE_NAME = "per_message_ttl_exchange"; /** * delay_queue_per_message_ttl * 每个消息都可以控制自己的失效时间 * x-dead-letter-exchange声明了队列里的死信转发到的DLX名称 * x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称 */ @Bean Queue delayQueuePerMessageTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) .build(); } /** * 死亡交换机 DLX * @return */ @Bean DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } /** * 实际消费队列:过期之后会进入到该队列中来 * @return */ @Bean Queue delayProcessQueue() { return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME).build(); } /** * 将死亡交换机delayExchange和实际消费队列delay_process_queue绑定在一起,并携带路由键delay_process_queue * @param delayProcessQueue * @param delayExchange * @return */ @Bean Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME); } /** * 重试交换机:消费失败后通过该交换机转发到队列中 * @return */ @Bean DirectExchange perMessageTtlExchange() { return new DirectExchange(PER_MESSAGE_TTL_EXCHANGE_NAME); } /** * 重试交换机和缓冲队列绑定 * @param delayQueuePerMessageTTL 缓冲队列 * @param perMessageTtlExchange 重试交换机 * @return */ @Bean Binding messageTtlBinding(Queue delayQueuePerMessageTTL, DirectExchange perMessageTtlExchange) { return BindingBuilder.bind(delayQueuePerMessageTTL) .to(perMessageTtlExchange) .with(DELAY_QUEUE_PER_MESSAGE_TTL_NAME); } }
import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; import java.util.Objects; /** * 支付回调重试 * 2秒、4秒、8秒后分别重试&#xff0c;算上0秒这次总共重试4次 */ &#64;Slf4j &#64;Component public class CallbackConsumer { &#64;Autowired private AmqpTemplate rabbitTemplate; &#64;Autowired private HttpClient httpClient; /** 最大重试次数 */ private Integer maxRetryTime &#61; 4; &#64;RabbitListener(queues &#61; DelayQueueConfig.DELAY_PROCESS_QUEUE_NAME) public void process(String msg) { log.info("----------date &#61; {}, msg &#61;{}", new Date(), msg); Map callbackRequestMap &#61; JSONObject.parseObject(msg, Map.class); // 重试次数 Integer retryTime &#61; (Integer)callbackRequestMap.get("retryTime"); String notifyUrl &#61; (String)callbackRequestMap.get("notifyUrl"); try { if (maxRetryTime <&#61; 4) { log.info("################Callback retry time &#61; {}, date &#61; {}################", retryTime, new Date()); String response &#61; httpClient.doPost(notifyUrl, callbackRequestMap, String.class); if (Objects.equals("SUCCESS", response)) { log.info("ok"); } else { // 下一次重试 log.error("error request&#61;{} response&#61;{}", callbackRequestMap, response); if (retryTime &#61;&#61; maxRetryTime) { lastTimeRetryResult(response, null); return; } retryCallback(callbackRequestMap); } } } catch (Exception e) { if (maxRetryTime
package com.example.rabbitmq.retry; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; import java.util.UUID; &#64;RestController &#64;RequestMapping("/mock") public class MockController { &#64;Autowired private AmqpTemplate rabbitTemplate; &#64;RequestMapping("/callback") public void callback(String notifyUrl) { Map requestMap &#61; new HashMap<>(); requestMap.put("retryTime", 1); requestMap.put("notifyUrl", notifyUrl); requestMap.put("orderCode", UUID.randomUUID().toString()); Object callbackMessage &#61; JSONObject.toJSONString(requestMap); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, callbackMessage, new ExpirationMessagePostProcessor(0L)); } }
欢迎关注Java实用技术&#xff0c;每天发布一篇实用技术文章。