作者:GXtingker | 来源:互联网 | 2023-08-21 20:28
利用RetryOperationsInterceptor做重试机制,假如某个消息抛异常重试,会导致这个消息之后的来的消息一直被阻塞吗?还是这个重试的消息被插到队尾,让其他消息消费?
利用RetryOperationsInterceptor做重试机制,假如某个消息抛异常重试,会导致这个消息之后的来的消息一直被阻塞吗?还是这个重试的消息被插到队尾,让其他消息消费?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Bean
SimpleRabbitListenerContainerFactory lowLoadRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
factory.cOnnectionFactory= connectionFactory
factory.cOncurrentConsumers= 1
factory.maxCOncurrentConsumers= 1
factory.recoveryInterval = 1000L
factory.setAdviceChain(retryOperationsInterceptor())
return factory
}
@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
RetryTemplate retryTemplate = new RetryTemplate()
RetryPolicy retryPolicy = new SimpleRetryPolicy(Integer.MAX_VALUE)
retryPolicy.setMaxAttempts(5)
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
backOffPolicy.setInitialInterval(60000)
backOffPolicy.setMultiplier(2)
backOffPolicy.setMaxInterval(3600000)
retryTemplate.setRetryPolicy(retryPolicy)
retryTemplate.setBackOffPolicy(backOffPolicy)
retryTemplate.registerListener(new RetryListener() {
@Override
boolean open(RetryContext context, RetryCallback callback) {
return true
}
@Override
void close(RetryContext context, RetryCallback callback, Throwable throwable) {
if (throwable != null) {
log.error("Failed: Retry count " + context.getRetryCount(), throwable)
}
}
@Override
void onError(RetryContext context, RetryCallback callback, Throwable throwable) {
log.error("Retry count " + context.getRetryCount(), throwable)
}
})
RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
.retryOperations(retryTemplate)
.build()
return interceptor
} |