热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

07RabbitMQ任务模式

概述WorkQueues,也被称为(TaskQueues)任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处

概述

Work Queues,也被称为(Task Queues)任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

以上的角色分别为如下所解释的:



  • P:生产者:任务的发布者

  • C1:消费者1,领取任务并且完成任务,假设完成速度较慢

  • C2:消费者2:领取任务并完成任务,假设完成速度较快


创建生产者

代码如下所示:



java

/**
* @author: BNZeng
**/
public class Producer {

@Test
public void sendMessage() throws Exception {
Connection cOnnection= RabbitMQUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare("hello", false, false, false, null);

for (int i = 1; i <= 100; i++) {
channel.basicPublish("", "hello", null, ("hello rabbitmq Work Queue → " + i).getBytes());
}

// 关闭通道和连接
RabbitMQUtil.closeChannelAndConnection(channel, connection);

System.out.println("消息发送成功");
}
}

创建消费者 1

代码如下所示:



java

/**
* @author BNZeng
*/
public class Consumer1 {
@Test
public void receiveMessage() throws Exception {
Connection cOnnection= RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者【1】收到消息 → " + new String(body));
}
});
System.out.println("消费者【1】启动成功");
// 不能让程序结束
System.in.read();
// 释放资源
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}
}

创建消费者 2

代码如下所示:



java

/**
* @author BNZeng
*/
public class Consumer2 {
@Test
public void receiveMessage() throws Exception {
Connection cOnnection= RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
// 把签收模式变成 false
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者【2】收到消息 → " + new String(body));
}
});
System.out.println("消费者【2】启动成功");
// 不能让程序结束
System.in.read();
// 释放资源
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}
}

进行测试

先启动消费者1和消费者2,再启动消息生产者发送消息,发现结果如下:

他们是平均消费的,官网有说明:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

那么实际开发中可能有消费者处理的慢,有的处理的快,那么如何配置呢,引入自动确认机制,


消息的自动确认机制

  • 官方的说明

完成一项任务可能需要几秒钟。您可能想知道,如果一个消费者开始了一项很长的任务,但只完成了一部分就去世了,会发生什么。在我们当前的代码中,一旦 RabbitMQ 向使用者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们就会丢失它正在处理的信息。我们还将丢失所有发送给这个特定 worker 但尚未处理的消息。

但我们不想失去任何任务。如果一个工人死亡,我们希望任务被交付给另一个工人。

接下来改造消费者2用来模拟一下某一个消费者消费慢的情况下会怎么样,改造之后的代码如下:



java

/**
* @author BNTang
*/
public class Consumer2 {
@Test
public void receiveMessage() throws Exception {
Connection cOnnection= RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 一次只处理一条消息
channel.basicQos(1);
channel.queueDeclare("hello", false, false, false, null);
// 把签收模式变成 false
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("消费者【2】收到消息 → " + new String(body));
}
});
System.out.println("消费者【2】启动成功");
// 不能让程序结束
System.in.read();
// 释放资源
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}
}

运行起来进行测试,结果如下所示:

我常常觉得我在为一个没时间享受的人生奋斗,我希望它能有价值



推荐阅读
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • Spring Batch 异常处理与任务限制优化策略 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 池子比率:BSV 区块链上的去中心化金融应用——Uniswap 分析
    池子比率:BSV 区块链上的去中心化金融应用——Uniswap 分析 ... [详细]
  • 开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用
    开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 在软件开发领域,“池”技术被广泛应用,如数据库连接池、线程池等。本文重点探讨Java中的线程池ThreadPoolExecutor,通过详细解析其内部机制,帮助开发者理解如何高效利用线程池管理任务执行。线程池不仅能够显著减少系统资源的消耗,提高响应速度,还能通过合理的配置,如饱和策略,确保在高负载情况下系统的稳定性和可靠性。文章还将结合实际案例,展示线程池在不同应用场景下的具体实现与优化技巧。 ... [详细]
  • 题目旨在解决树上的路径最优化问题,具体为在给定的树中寻找一条长度介于L到R之间的路径,使该路径上的边权平均值最大化。通过点分治策略,可以有效地处理此类问题。若无长度限制,可采用01分数规划模型,将所有边权减去一个常数m,从而简化计算过程。此外,利用单调队列优化动态规划过程,进一步提高算法效率。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • Python与R语言在功能和应用场景上各有优势。尽管R语言在统计分析和数据可视化方面具有更强的专业性,但Python作为一种通用编程语言,适用于更广泛的领域,包括Web开发、自动化脚本和机器学习等。对于初学者而言,Python的学习曲线更为平缓,上手更加容易。此外,Python拥有庞大的社区支持和丰富的第三方库,使其在实际应用中更具灵活性和扩展性。 ... [详细]
  • 智能网联汽车自动驾驶功能测试场地的方法与标准要求
    智能网联汽车自动驾驶功能测试场地的方法与标准要求 ... [详细]
  • Java服务问题快速定位与解决策略全面指南 ... [详细]
author-avatar
hareleemu_699
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有