热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitMQ的消息可靠性投递,手动确认,消费端限流,队列过期时间的实现

1、消息可靠性投递当生产者向交换机发送消息的时候,可能会发生消息泄露。比如:当交换器重启的时候,生产者这事向交换机发送消息,
1、消息可靠性投递

在这里插入图片描述

  • 当生产者向交换机发送消息的时候,可能会发生消息泄露。比如:当交换器重启的时候,生产者这事向交换机发送消息,交换机没有接受到消息,那么消息就会被丢失
  • 当交换机向队列发送消息的时候,也可能发生消息的泄露。

为了确保消息的可靠性传递,提供了两种方式:

  • confirm 确认模式
  • return 返回模式

实现的代码如下:

  • 启动了中的配置:

server:port: 8080spring:rabbitmq:# rabbitmq的ip地址host: 192.168.31.70# 开启确认模式,默认为nonepublisher-confirm-type: correlated# 开启返回模式,默认为falsepublisher-returns: true

  • 测试类

@RestController
public class ProductController {&#64;Autowiredprivate RabbitTemplate rabbitTemplate;&#64;GetMapping("/hello")public String hello(){// 生产者向交换机传递消息的时候会走这个方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// ack表示传递的消息是否收到&#xff0c;false为未收到&#xff0c;true为收到if (!ack){System.out.println("aaaaaa");}}});// 交换机向队列传递消息的时候会走这个方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {//ReturnedMessage里面包含传递的消息内容、交换机的信息、路由key的信息&#64;Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println(returned);}});Map map &#61; new HashMap();map.put("id","1");map.put("num","10");String s &#61; JSON.toJSONString(map);for (int i &#61; 0; i < 10; i&#43;&#43;) {rabbitTemplate.convertAndSend("rabbit_exchange_Topic","aaa.orange.rabbit",s);}return "下单成功";}
}

2、Consumer ACK

ACK&#xff1a;表示消费者收到消息后的确认方式

  • 自动确认&#xff1a;一旦消息被消费者收到&#xff0c;则自动确认收到消息&#xff0c;并将信息从队列中移除
  • 手动确认&#xff1a;需要调用channel.basicAck()手动签收&#xff0c;如果出现异常&#xff0c;则调用channel.basicNack()方法&#xff0c;让自动重新发送消息

在实际开发过程中&#xff0c;如果在消费者消费消息的时候&#xff0c;业务处理的时候&#xff0c;出现异常&#xff0c;那么该消息就可能丢失。需要使用手动模式&#xff0c;在业务处理完之后再使用channel.basicAck()方法手动签收。

代码实现&#xff1a;

  • 配置文件

server:port: 8082
spring:rabbitmq:host: 192.168.31.70listener:simple:# 开启手动确认模式acknowledge-mode: manual

  • 代码测试&#xff1a;

&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{//long deliveryTag, 消息发送的标志// boolean multiple 是否允许多确认&#xff0c;指的是&#xff0c;如果之前有消息没有被确认&#xff0c;那么这里设置为true&#xff0c;就可以将之前的消息一并确认System.out.println("业务逻辑");
// channel.basicAck(deliveryTag,true);}catch (Exception e){//long deliveryTag,// boolean multiple,// boolean requeue 是否允许队列重新发布信息try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}

3、消费端限流

服务器消费的信息是有限制的&#xff0c;假如队列里面有10000条信息&#xff0c;这10000条信息全部被消费者一次性消费&#xff0c;可能会导致消费者直接宕机

  • 必须设置为手动确认
  • 必须配置限流的个数
  • 配置文件如下&#xff1a;

spring:rabbitmq:host: 192.168.213.188listener:simple:#表示手动确认acknowledge-mode: manual# 表示自动确认模式# acknowledge-mode: none# 设置每次消费的个数。prefetch: 5

  • 测试如下&#xff1a;
  • 我们可以将手动设置为不确认&#xff0c;看是否只是收到5条信息

&#64;RestController
public class ConsumerOneController {&#64;RabbitListener(queues &#61; {"rabbit_hello_topics_one"})public void test(Message message , Channel channel){long deliveryTag &#61; message.getMessageProperties().getDeliveryTag();byte[] body &#61; message.getBody();System.out.println(new String(body));try{//long deliveryTag, 消息发送的标志// boolean multiple 是否允许多确认System.out.println("业务逻辑");//channel.basicAck(deliveryTag,true);}catch (Exception e){//long deliveryTag,// boolean multiple,// boolean requeue 是否允许队列重新发布信息try {channel.basicNack(deliveryTag,true,true);} catch (IOException ex) {ex.printStackTrace();}}}
}

4、TTL

  • 1、可以设置队列的过期时间&#xff0c;所有放到该队列中的消息&#xff0c;只要时间过了就直接消失&#xff0c;并且该消息必须在头部
  • 2、给消息设置过期时间&#xff0c;该消息时间到了之后&#xff0c;必须在队列的头部才能消失

代码测试&#xff1a;

//为队列设置过期时间 相当于该队列里面的消息都由过期时间&#64;Testpublic void testSend(){rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");}//设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。//该消息必须在头部才能从队列中移除。&#64;Testpublic void testSend02(){for(int i&#61;0;i<10;i&#43;&#43;) {if(i&#61;&#61;3){MessagePostProcessor messagePostProcessor &#61; new MessagePostProcessor() {&#64;Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息的过期时间 message.getMessageProperties().setExpiration("20000");return message;}};//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessorrabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i, messagePostProcessor);}else {//String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessorrabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"&#43;i);}}}

5、通过代码的方式去创建队列以及绑定

&#64;Configuration
public class RabbitConfig {private final static String EXCHANEG_NAME&#61;"kaiwanxiao";private final static String QUEUE_NAME&#61;"xiaokai";&#64;Beanpublic Exchange getExchange(){Exchange build &#61; ExchangeBuilder.directExchange(EXCHANEG_NAME).durable(true).autoDelete().build();return build;}&#64;Beanpublic Queue getQueue(){Queue build &#61; QueueBuilder.durable(QUEUE_NAME).autoDelete().withArgument("x-message-ttl", 20000).build();return build;}&#64;Beanpublic Binding getBinding(Queue queue ,Exchange exchange){Binding info &#61; BindingBuilder.bind(queue).to(exchange).with("info").noargs();return info;}}


推荐阅读
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 在开发中,有时候一个业务上要求的原子操作不仅仅包括数据库,还可能涉及外部接口或者消息队列。此时,传统的数据库事务无法满足需求。本文介绍了Java中如何利用java.lang.Runtime.addShutdownHook方法来保证业务线程的完整性。通过添加钩子,在程序退出时触发钩子,可以执行一些操作,如循环检查某个线程的状态,直到业务线程正常退出,再结束钩子程序。例子程序展示了如何利用钩子来保证业务线程的完整性。 ... [详细]
author-avatar
傻要傻到嗨样
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有