热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ消息100%不丢失?

消息消费流程:1.生产端发送消息到RabbitMQ;2.RabbitMQ发送消息到消费端;3.消费端消费消息;以上3个步骤每个步骤都

消息消费流程:

1.生产端发送消息到RabbitMQ;

2.RabbitMQ发送消息到消费端;

3.消费端消费消息;

以上3个步骤每个步骤都可能导致消息丢失,消息丢失并不可怕,可怕的是丢失了我们还不知道,所以要有一系列措施来保证我们系统的可靠性。


生产端可靠性投递

一、事务消息机制

事务消息机制由于会严重降低性能,所以一般不采用这种方法。

二、confirm消息确认机制

生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

通过下面这句代码来开启确认模式:

channel.confirmSelect();// 开启发送方确认模式

然后异步监听确认和未确认的消息:

channel.addConfirmListener(new ConfirmListener() {//消息正确到达broker@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("已收到消息");//做一些其他处理}//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);//做一些其他处理,比如消息重发等}
});

三、消息持久化

RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。

 所有需要给exchange、queue和message都进行持久化:

exchange持久化:

//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

四、消息入库

首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。


 消费端消息不丢失

默认情况下,以下3种情况会导致消息丢失:


  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;

  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;

  • 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

 一、自动ack机制改为手动ack机制

因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。

消费端手动确认消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {//接收到消息,做处理//手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息}
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

二、保证消息幂等性

让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:


  1. 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  2. 如果不存在,则正常消费,消费完毕后写入redis/db
  3. 如果存在,则证明消息被消费过,直接丢弃。

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {@RabbitHandlerpublic void receiveMessage(Message message) throws Exception {Jedis jedis = new Jedis("localhost", 6379);String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);String messageIdRedis = jedis.get("messageId");if(messageId == messageIdRedis){return;}JSONObject jsonObject = JSONObject.parseObject(msg);String email = jsonObject.getString("message");jedis.set("messageId",messageId);}
}


推荐阅读
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 在Ubuntu系统中,由于预装了MySQL,因此无需额外安装。通过命令行登录MySQL时,可使用 `mysql -u root -p` 命令,并按提示输入密码。常见问题包括:1. 错误 1045 (28000):访问被拒绝,这通常是由于用户名或密码错误导致。为确保顺利连接,建议检查MySQL服务是否已启动,并确认用户名和密码的正确性。此外,还可以通过配置文件调整权限设置,以增强安全性。 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • 优化后的标题:数据网格视图(DataGridView)在应用程序中的高效应用与优化策略
    在应用程序中,数据网格视图(DataGridView)的高效应用与优化策略至关重要。本文探讨了多种优化方法,包括但不限于:1)通过合理的数据绑定提升性能;2)利用虚拟模式处理大量数据,减少内存占用;3)在格式化单元格内容时,推荐使用CellParsing事件,以确保数据的准确性和一致性。此外,还介绍了如何通过自定义列类型和优化渲染过程,进一步提升用户体验和系统响应速度。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 在进行网络编程时,准确获取本地主机的IP地址是一项基本但重要的任务。Winsock作为20世纪90年代初由Microsoft与多家公司共同制定的Windows平台网络编程接口,为开发者提供了一套高效且易用的工具。通过Winsock,开发者可以轻松实现网络通信功能,并准确获取本地主机的IP地址,从而确保应用程序在网络环境中的稳定运行。此外,了解Winsock的工作原理及其API函数的使用方法,有助于提高开发效率和代码质量。 ... [详细]
  • 本文探讨了如何在C#中实现USB条形码扫描仪的数据读取,并自动过滤掉键盘输入,即使不知道设备的供应商ID(VID)和产品ID(PID)。通过详细的技术指导和代码示例,展示了如何高效地处理条形码数据,确保系统能够准确识别并忽略来自键盘的干扰信号。该方法适用于多种USB条形码扫描仪,无需额外配置设备信息。 ... [详细]
  • 本文介绍了如何通过掌握 IScroll 技巧来实现流畅的上拉加载和下拉刷新功能。首先,需要按正确的顺序引入相关文件:1. Zepto;2. iScroll.js;3. scroll-probe.js。此外,还提供了完整的代码示例,可在 GitHub 仓库中查看。通过这些步骤,开发者可以轻松实现高效、流畅的滚动效果,提升用户体验。 ... [详细]
  • 在Java中,匿名函数作为一种无名的函数结构,无法独立调用;而在JavaScript中,不仅有类似的匿名函数,还有立即执行函数(IIFE)和闭包等高级特性。立即执行函数同样基于匿名函数实现,但会在定义时立即执行,而闭包则通过嵌套函数来捕获外部变量,实现数据封装和持久化。这些不同的函数形式在实际开发中各有应用场景,理解其特点有助于更好地利用语言特性进行编程。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 在 Android 开发中,通过合理利用系统通知服务,可以显著提升应用的用户交互体验。针对 Android 8.0 及以上版本,开发者需首先创建并注册通知渠道。本文将详细介绍如何在应用中实现这一功能,包括初始化通知管理器、创建通知渠道以及发送通知的具体步骤,帮助开发者更好地理解和应用这些技术细节。 ... [详细]
  • 本文深入探讨了数据库性能优化与管理策略,通过实例分析和理论研究,详细阐述了如何有效提升数据库系统的响应速度和处理能力。文章首先介绍了数据库性能优化的基本原则和常用技术,包括索引优化、查询优化和存储管理等。接着,结合实际应用场景,讨论了如何利用容器化技术(如Docker)来部署和管理数据库,以提高系统的可扩展性和稳定性。最后,文章还提供了具体的配置示例和最佳实践,帮助读者在实际工作中更好地应用这些策略。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
author-avatar
你的依靠isme
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有