热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

.NetCore&RabbitMQ限制循环消费

前言当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。循环场景生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为f

前言

当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。

图片

循环场景

生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。

consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。
图片

最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题。

图片

解决方案

RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。

  • 一次消费
    • 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。
    • 消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false)。
  • 限定重试次数
    • 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
    • 可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
    • 队列使用Quorum类型,限制投递次数,超过次数消息被删除。
  • 队列消息过期
    • 设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。
  • 也许还有更多好的方案...

一次消费

对外总是Ack

消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次。

var queueName = "alwaysack_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    try
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            throw new Exception("模拟异常");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
    finally
    {
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    }
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。图片

消息不重入队列

在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。

图片

此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。

var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

var queueName = "nackorreject_queue";
var arguments = new Dictionary
{
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定,则消息丢失。
图片

限定重试次数

设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环。

消息头设定次数

在消息头中设置次数记录作为标记,但是,消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下

  1. 原消息设置不重入队列。
  2. 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数。
  3. 消费端再次消费时,便可从消息头中查看消息被消费的次数。
    图片

此处假定接收10条消息,在接收到第5条消息时设置拒收, 当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费。

var queueName = "messageheaderretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
    {
        var maxRetryCount = 3;

        Console.WriteLine($"拒收 {DateTime.Now}");

        //初次消费
        if (ea.BasicProperties.Headers == null)
        {
            //原消息设置为不重入队列
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);

            //发送新消息到队列中
            RetryPublishMessage(channel, queueName, message.ToArray(), 1);
            return;
        }

        //获取重试次数
        var retryCount = ParseRetryCount(ea);
        if (retryCount ();
    basicProperties.Headers.Add("retryCount", retryCount);
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
}

static int ParseRetryCount(BasicDeliverEventArgs ea)
{
    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
    if (!existRetryRecord)
    {
        throw new Exception("没有设置重试次数");
    }

    return (int)retryCount;
}

消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部。
图片

存储重试次数

在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在。

图片

与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)

//模拟外部存储服务
var MessageRetryCounts = new Dictionary();

var queueName = "storageretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        var maxRetryCount = 3;
        Console.WriteLine("拒收");
    
        //重试次数判断
        var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
        if (!existRetryRecord)
        {
            //重入队列,继续重试
            MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        if (MessageRetryCounts[ea.BasicProperties.MessageId] 

除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话)。
图片

队列使用Quorum类型

第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数。

图片

设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃。

var queueName = "quorumtype_queue";
var arguments = new Dictionary()
{
    { "x-queue-type", "quorum"},
    { "x-delivery-limit", 3 }
};
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息,如此一来也限制了消息的循环消费。
图片

队列消息过期

当为消息设置了过期时间时,当消息没有受到Ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃。

聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中。

图片

设定消费者端第五十条消息会被拒收,且队列的TTL设置为5秒。

//死信交换机和死信队列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

//常规队列
var queueName = "normalmessage_queue";
var arguments = new Dictionary
{
    { "x-message-ttl", 5000},
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var cOnsumer= new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");

        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

当消费者端拒收消息后消息重入队列,再次消费,反复进行超过5秒后,消息在队列中达到了过期时间,则被挪入到死信队列中。
图片

从Web管理中死信队列中可查看该条过期的消息。

图片

参考资料

  1. https://www.jianshu.com/p/f77a0b10c140
  2. https://www.jianshu.com/p/4904c609632f
  3. https://***.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq

2022-10-29,望技术有成后能回来看见自己的脚步


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 基于Node.js的高性能实时消息推送系统通过集成Socket.IO和Express框架,实现了高效的高并发消息转发功能。该系统能够支持大量用户同时在线,并确保消息的实时性和可靠性,适用于需要即时通信的应用场景。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • Prim算法在处理稠密图时表现出色,尤其适用于边数远多于顶点数的情形。传统实现的时间复杂度为 \(O(n^2)\),但通过引入优先队列进行优化,可以在点数为 \(m\)、边数为 \(n\) 的情况下显著降低时间复杂度,提高算法效率。这种优化方法不仅能够加速最小生成树的构建过程,还能在大规模数据集上保持良好的性能表现。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 在《PHP应用性能优化实战指南:从理论到实践的全面解析》一文中,作者分享了一次实际的PHP应用优化经验。文章回顾了先前进行的一次优化项目,指出即使系统运行时间较长后出现的各种问题和性能瓶颈,通过采用一些通用的优化策略仍然能够有效解决。文中不仅详细阐述了优化的具体步骤和方法,还结合实例分析了优化前后的性能对比,为读者提供了宝贵的参考和借鉴。 ... [详细]
  • 在Java中,匿名函数作为一种无名的函数结构,无法独立调用;而在JavaScript中,不仅有类似的匿名函数,还有立即执行函数(IIFE)和闭包等高级特性。立即执行函数同样基于匿名函数实现,但会在定义时立即执行,而闭包则通过嵌套函数来捕获外部变量,实现数据封装和持久化。这些不同的函数形式在实际开发中各有应用场景,理解其特点有助于更好地利用语言特性进行编程。 ... [详细]
  • 本文详细解析了 MySQL 5.7.20 版本中二进制日志(binlog)崩溃恢复机制的工作流程。假设使用 InnoDB 存储引擎,并且启用了 `sync_binlog=1` 配置,文章深入探讨了在系统崩溃后如何通过 binlog 进行数据恢复,确保数据的一致性和完整性。 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • 本文介绍了一种简化版的在线购物车系统,重点探讨了用户登录和购物流程的设计与实现。该系统通过优化界面交互和后端逻辑,提升了用户体验和操作便捷性。具体实现了用户注册、登录验证、商品浏览、加入购物车以及订单提交等功能,旨在为用户提供高效、流畅的购物体验。 ... [详细]
  • MongoDB Aggregates.group() 方法详解与编程实例 ... [详细]
  • 深入解析:RKHunter与AIDE在入侵检测中的应用与优势
    本文深入探讨了RKHunter与AIDE在入侵检测领域的应用及其独特优势。通过对比分析,详细阐述了这两种工具在系统完整性验证、恶意软件检测及日志文件监控等方面的技术特点和实际效果,为安全管理人员提供了有效的防护策略建议。 ... [详细]
  • RancherOS 是由 Rancher Labs 开发的一款专为 Docker 设计的轻量级 Linux 发行版,提供了一个全面的 Docker 运行环境。其引导镜像仅 20MB,非常适合在资源受限的环境中部署。本文将详细介绍如何在 ESXi 虚拟化平台上安装和配置 RancherOS,帮助用户快速搭建高效、稳定的容器化应用环境。 ... [详细]
  • 最大化两个非空子集之间的和的差异:集合划分策略分析 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
author-avatar
郑越与焕柳的88_679
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有