热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何确保RabbitMQ中发送端消息的可靠传输与投递

在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。

消息发布者向RabbitMQ进行消息投递时默认情况下是不返回发布者该条消息在broker中的状态的,也就是说发布者不知道这条消息是否真的抵达RabbitMQ的broker之上,也因此会发生消息丢失的情况。

对此,RabbitmQ提供了两种解决方案(以官方提供的SDK为例)

1.通过AMOP提供的事务机制:

C#代码:

                try
{
channel.TxSelect();
channel.BasicPublish(
"yu.exchange", "yu.1", props, msg);
channel.TxCommit();
}
catch (Exception ex)
{
channel.TxRollback();
}

java代码是一样的操作。。。

        byte[] msg = "hello".getBytes();
try {
channel.txSelect();
channel.basicPublish(
"yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
channel.txCommit();
}
catch (Exception ex) {
channel.txRollback();
}

事务开启,提交,回滚都有了。。。

2.Conform模式,也就是官网推荐的消息批量提交的方式

Conform模式主要包含两种编程模式,一种同步的,一种异步通知的:

同步回调的调用方式与事务模式差不多

C#代码:

                channel.ConfirmSelect();
byte[] msg = Encoding.UTF8.GetBytes("hello");
channel.BasicPublish(
"yu.exchange1", "yu.1", props, msg);
bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10));
Console.WriteLine(success);

Java代码:

        byte[] msg = "hello".getBytes();
channel.confirmSelect();
channel.basicPublish(
"yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
boolean success
= channel.waitForConfirms(10);

通道Channel开启Conform,在发送完消息后可以通过WaitForConfirm等待消息的投递结果,这里有个可选参数,就是阻塞等待的时间,如果返回结果为false则表示消息投递失败,则发送端这时候也就可以采取重试之类的策略了。

异步回调的方式也就是通道订阅RabbitMQ的发送完毕确认事件,消息投递成功会回调这个方法给发送方,回调的参数包含当前消息在该通道中发送的编号DeliveryTag(批量提交的时候可以根据这个编号对应提交集合的索引,保证对应集合索引上的消息可靠投递),的最大值是9223372036854775807

C#代码:

           channel.BasicAcks += (sender, eventArgs) =>
{
ulong tag = eventArgs.DeliveryTag;
};
channel.BasicReturn
+= (sender, eventArgs) =>
{

};
byte[] msg = Encoding.UTF8.GetBytes("hello");
channel.BasicPublish(
"yu.exchange1", "yu.1", true, props, msg);

Java代码:

 channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
System.
out.println(l);
}

public void handleNack(long l, boolean b) throws IOException {
System.
out.println(l);
}
});
channel.addReturnListener(
new ReturnListener() {
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.
out.println("响应状态码-ReplyCode:"+i);
System.
out.println("响应内容-ReplyText:"+s);
System.
out.println("Exchange:"+s1);
System.
out.println("RouteKey"+s2);
System.out.println("投递失败的消息:"+ new String(bytes,"UTF-8") );
            }
});
byte[] msg = "hello".getBytes();
channel.confirmSelect();
channel.basicPublish(
"yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);

代码中多订阅了一个BasicReturn事件(addReturnListener),当消息被RabbitMQ拒绝或者说没有成功投递的时候,则会触发这个方法,当然想要获取详细信息则需要设置mandatory参数为true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三个参数。

队列中新建一个yu.exchange1的交换机然后不绑定队列的情况下则会投递失败的时间;

 


推荐阅读
  • 深入解析SpringMVC核心组件:DispatcherServlet的工作原理
    本文详细探讨了SpringMVC的核心组件——DispatcherServlet的运作机制,旨在帮助有一定Java和Spring基础的开发人员理解HTTP请求是如何被映射到Controller并执行的。文章将解答以下问题:1. HTTP请求如何映射到Controller;2. Controller是如何被执行的。 ... [详细]
  • 本文介绍如何使用MFC和ADO技术调用SQL Server中的存储过程,以查询指定小区在特定时间段内的通话统计数据。通过用户界面选择小区ID、开始时间和结束时间,系统将计算并展示小时级的通话量、拥塞率及半速率通话比例。 ... [详细]
  • 实用正则表达式有哪些
    小编给大家分享一下实用正则表达式有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下 ... [详细]
  • 本文介绍了如何使用JavaScript的Fetch API与Express服务器进行交互,涵盖了GET、POST、PUT和DELETE请求的实现,并展示了如何处理JSON响应。 ... [详细]
  • Redux入门指南
    本文介绍Redux的基本概念和工作原理,帮助初学者理解如何使用Redux管理应用程序的状态。Redux是一个用于JavaScript应用的状态管理库,特别适用于React项目。 ... [详细]
  • 历经三十年的开发,Mathematica 已成为技术计算领域的标杆,为全球的技术创新者、教育工作者、学生及其他用户提供了一个领先的计算平台。最新版本 Mathematica 12.3.1 增加了多项核心语言、数学计算、可视化和图形处理的新功能。 ... [详细]
  • 在 Android 开发中,通过 Intent 启动 Activity 或 Service 时,可以使用 putExtra 方法传递数据。接收方可以通过 getIntent().getExtras() 获取这些数据。本文将介绍如何使用 RoboGuice 框架简化这一过程,特别是 @InjectExtra 注解的使用。 ... [详细]
  • 本文介绍如何从字符串中移除大写、小写、特殊、数字和非数字字符,并提供了多种编程语言的实现示例。 ... [详细]
  • 黑马头条项目:Vue 文章详情模块与交互功能实现
    本文详细介绍了如何在黑马头条项目中配置文章详情模块的路由、获取和展示文章详情数据,以及实现关注、点赞、不喜欢和评论功能。通过这些步骤,您可以全面了解如何开发一个完整的前端文章详情页面。 ... [详细]
  • 本文详细介绍了 Java 中 org.geotools.data.shapefile.ShapefileDataStore 类的 getCurrentTypeName() 方法,并提供了多个代码示例,帮助开发者更好地理解和使用该方法。 ... [详细]
  • CSS高级技巧:动态高亮当前页面导航
    本文介绍了如何使用CSS实现网站导航栏中当前页面的高亮显示,提升用户体验。通过为每个页面的body元素添加特定ID,并结合导航项的类名,可以轻松实现这一功能。 ... [详细]
  • 深入解析Spring启动过程
    本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ... [详细]
  • 本文详细介绍了如何在Kendo UI for jQuery的数据管理组件中,将行标题字段呈现为锚点(即可点击链接),帮助开发人员更高效地实现这一功能。通过具体的代码示例和解释,即使是新手也能轻松掌握。 ... [详细]
  • 在尝试使用C# Windows Forms客户端通过SignalR连接到ASP.NET服务器时,遇到了内部服务器错误(500)。本文将详细探讨问题的原因及解决方案。 ... [详细]
  • 本文深入探讨了SQL数据库中常见的面试问题,包括如何获取自增字段的当前值、防止SQL注入的方法、游标的作用与使用、索引的形式及其优缺点,以及事务和存储过程的概念。通过详细的解答和示例,帮助读者更好地理解和应对这些技术问题。 ... [详细]
author-avatar
空心悟心
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有