1、消息可靠性投递
- 当生产者向交换机发送消息的时候,可能会发生消息泄露。比如:当交换器重启的时候,生产者这事向交换机发送消息,交换机没有接受到消息,那么消息就会被丢失
- 当交换机向队列发送消息的时候,也可能发生消息的泄露。
为了确保消息的可靠性传递,提供了两种方式:
实现的代码如下:
server:port: 8080spring:rabbitmq:host: 192.168.31.70publisher-confirm-type: correlatedpublisher-returns: true
@RestController
public class ProductController {&#64;Autowiredprivate RabbitTemplate rabbitTemplate;&#64;GetMapping("/hello")public String hello(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack){System.out.println("aaaaaa");}}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {&#64;Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println(returned);}});Map map &#61; new HashMap();map.put("id","1");map.put("num","10");String s &#61; JSON.toJSONString(map);for (int i &#61; 0; i < 10; i&#43;&#43;) {rabbitTemplate.convertAndSend("rabbit_exchange_Topic","aaa.orange.rabbit",s);}return "下单成功";}
}
2、Consumer ACK
ACK&#xff1a;表示消费者收到消息后的确认方式
- 自动确认&#xff1a;一旦消息被消费者收到&#xff0c;则自动确认收到消息&#xff0c;并将信息从队列中移除
- 手动确认&#xff1a;需要调用channel.basicAck()手动签收&#xff0c;如果出现异常&#xff0c;则调用channel.basicNack()方法&#xff0c;让自动重新发送消息
在实际开发过程中&#xff0c;如果在消费者消费消息的时候&#xff0c;业务处理的时候&#xff0c;出现异常&#xff0c;那么该消息就可能丢失。需要使用手动模式&#xff0c;在业务处理完之后再使用channel.basicAck()方法手动签收。
代码实现&#xff1a;
server:port: 8082
spring:rabbitmq:host: 192.168.31.70listener:simple:acknowledge-mode: manual
&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{System.out.println("业务逻辑");
}catch (Exception e){try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}
3、消费端限流
服务器消费的信息是有限制的&#xff0c;假如队列里面有10000条信息&#xff0c;这10000条信息全部被消费者一次性消费&#xff0c;可能会导致消费者直接宕机
- 必须设置为手动确认
- 必须配置限流的个数
- 配置文件如下&#xff1a;
spring:rabbitmq:host: 192.168.213.188listener:simple:#表示手动确认acknowledge-mode: manual# 表示自动确认模式# acknowledge-mode: none# 设置每次消费的个数。prefetch: 5
- 测试如下&#xff1a;
- 我们可以将手动设置为不确认&#xff0c;看是否只是收到5条信息
&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{System.out.println("业务逻辑");}catch (Exception e){try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}
4、TTL
- 1、可以设置队列的过期时间&#xff0c;所有放到该队列中的消息&#xff0c;只要时间过了就直接消失&#xff0c;并且该消息必须在头部
- 2、给消息设置过期时间&#xff0c;该消息时间到了之后&#xff0c;必须在队列的头部才能消失
代码测试&#xff1a;
&#64;Testpublic void testSend(){rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");}&#64;Testpublic void testSend02(){for(int i&#61;0;i<10;i&#43;&#43;) {if(i&#61;&#61;3){MessagePostProcessor messagePostProcessor &#61; new MessagePostProcessor() {&#64;Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("20000");return message;}};rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i, messagePostProcessor);}else {rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i);}}}
5、通过代码的方式去创建队列以及绑定
&#64;Configuration
public class RabbitConfig {private final static String EXCHANEG_NAME&#61;"kaiwanxiao";private final static String QUEUE_NAME&#61;"xiaokai";&#64;Beanpublic Exchange getExchange(){Exchange build &#61; ExchangeBuilder.directExchange(EXCHANEG_NAME).durable(true).autoDelete().build();return build;}&#64;Beanpublic Queue getQueue(){Queue build &#61; QueueBuilder.durable(QUEUE_NAME).autoDelete().withArgument("x-message-ttl", 20000).build();return build;}&#64;Beanpublic Binding getBinding(Queue queue ,Exchange exchange){Binding info &#61; BindingBuilder.bind(queue).to(exchange).with("info").noargs();return info;}}