热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

关于java:RabbitMQ教程2工作队列WorkQueue

关注公众号:Java课代表,每日文章更新,及时获取更多常识。2工作队列(WorkQueue)在第一篇教程中,咱们写了两个程序用来从指定的queue中发送和接管音讯。这篇教程,咱们将
文章目录[隐藏]
  • 筹备工作(Preparation)
  • 轮询散发(Round-robin dispatching)
  • 音讯长久化(Message durability)
  • 偏心散发(Fair dispatch)
  • 代码整合(Putting it all together)

关注公众号:Java课代表,每日文章更新,及时获取更多常识。

2 工作队列(Work Queue)

在第一篇教程中,咱们写了两个程序用来从指定的 queue 中发送和接管音讯。这篇教程,咱们将创立一个工作队列,用来给多个 worker 散发一些”耗时的”工作。

工作队列(或者称之为工作队列)背地的思维,是用来防止立刻解决那些很耗资源并且须要期待其运行完结的工作(课代表注:说白了就是削峰)。取而代之的是,将工作安顿到稍后进行(课代表注:说白了就是异步执行)。一个后盾运行的工作程序将会接管到并执行该工作。当你运行了多个工作程序,工作队列中的工作将会被他们独特分担解决。

这个思维在web利用中十分有用,因为在web利用中,通过一个短的http申请窗口无奈解决简单的工作。

筹备工作(Preparation)

在后面的教程中,咱们发送了一个字符串音讯:“”Hello World!”。接下来咱们发送一些用来代表工作很简单的字符串。咱们并没有真实世界中那些像图片缩放,PDF文件渲染之类的简单工作,所以,让咱们应用Thread.sleep()办法来伪装很忙。用字符串中点号的个数当做工作的复杂度:每个点号代表一秒钟的“工作”。例如:由字符串Hello...代表的工作将耗时3秒钟。

将后面例子中Send.java的代码略微扭转一下,使其容许任意音讯从终端输出。该利用会将工作安顿到咱们的工作队列,所以给它命名为:NewTask.java

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

老的 Recv.java 利用也要做一些改变:它须要为音讯中的每个点号伪造一秒钟的工作。它将负责接管音讯并解决工作,所以将它命名为Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

用来模仿执行工夫的假工作:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

像教程1中那样编译一下(确保须要的jar包都在工作目录中,并且设置了环境变量:CP):

javac -cp $CP NewTask.java Worker.java

Windows下自行将 $CP 替换为 %CP%,下同。——课代表注

轮询散发(Round-robin dispatching)

应用工作队列的劣势之一是不便横向扩大。假如工作积压了,咱们能够减少更多的 worker 程序,轻松扩大。

首先,让咱们同时运行两个 worker 实例。他们都将从队列中获取音讯,但具体是怎么运行的呢?咱们一起探索一下。

你须要关上三个终端。两个用来运行worker程序。这两个将会是消费者——C1和C2

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

第三个终端用来公布新工作。当消费者启动之后,能够发送几个音讯:

# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

让咱们看一看运行 worker 的终端打印了什么:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认状况下,RabbitMQ 会将每个音讯按程序发送给下一个消费者。每个消费者都会被平均分配到雷同数量的音讯。这种音讯散发机制称为轮询

能够多运行几个 worker 实例自行尝试。

音讯确认(Message acknowledgment)

执行工作可能须要一段时间。你有没有想过,如果工作还没执行完,利用挂掉了怎么办?以咱们目前的代码,一旦 RabbitMQ 将音讯分发给了消费者,它会立即将该音讯标记为已删除。如此看来,一旦终止 worker 程序,就会失落它正在解决的音讯,以及它曾经接管,但还没开始解决的音讯。

但咱们并不心愿失落工作。如果一个 worker 利用挂掉了,咱们心愿他所解决的工作能交给给别的 worker 解决。

为了确保音讯不会失落,RabbitMQ 提供音讯确认机制。音讯确认由消费者发回,通知 RabbitMQ 某个指定的音讯曾经被接管、解决,并且 RabbitMQ 能够删掉该音讯了。

如果某个消费者没有返回确认(ack) 就挂掉了(channel 敞开,链接敞开或者TCP连贯失落了),RabbitMQ 将会认为该音讯没有被正确处理,会将其从新入队(re-queue)。如果此时有其余消费者在线,RabbitMQ 会迅速将该音讯发送给他们。这样就能够保障,即便 worker 忽然挂了,音讯也不会失落。

音讯不会超时:RabbitMQ 将会在某个消费者挂掉时从新发送该音讯。即便解决一条音讯须要破来源gaodai#ma#com搞@代~码网费很长时间也无所谓。

手工音讯确认 默认开启。在后面的示例中咱们通过设置autoAck=true将其敞开了。当初咱们将标记位设为false,并让worker 在工作实现时发送确认信息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

下面的代码能够确保即便你应用 CTRL+C 进行一个正在解决音讯的worker,也不会失落任何音讯。worker 挂掉后未被确认的音讯将会很快被从新投递。

确认音讯的发送必须和接管音讯时的 channel 雷同。尝试应用不同的 channel 返回确认将会报 channel 协定异样。具体参见确认机制的参考文档

遗记确认

一个常见的谬误就是遗记调用basicAck。这个简略谬误,将会导致严重后果。当你的程序处理完音讯,却遗记发送确认,音讯将会被从新投递,RabbitMQ 因为无奈删除未被确认的音讯,导致内存占用越来越多。

为了不便排查此类问题,能够应用 rabbitmqctl 工具打印 messages_unacknowledged 字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows上来掉 sudo :

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

音讯长久化(Message durability)

咱们曾经学习了如何在消费者挂掉的状况下保障工作不失落。然而,如果 RabbitMQ 服务进行了,工作还是会丢。

如果没有通过配置,当 RabbitMQ 进行或解体时,它将会失落 队列(queue) 中已有的音讯。为了防止这种状况,咱们须要将队列(queue) 和音讯(message) 都设置为长久化(durable)

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

只管下面的命令是对的,但目前还不能正确工作。因为咱们曾经在 RabbitMQ 中申明了一个名为“hello”的非长久化队列。RabbitMQ 无奈批改已存在队列的参数。咱们能够换个思路,命名一个新的,开启长久化的队列,比方task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

长久化参数为truequeueDeclare 办法须要在生产者和消费者代码中都加上。

此时,咱们能够确定,即便 RabbitMQ 重启,task_queue 这个队列也不会丢。接下来咱们通过将MessageProperties 的值设置为PERSISTENT_TEXT_PLAIN,从而将音讯设置为长久化。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

音讯长久化的注意事项

将音讯标记为长久化并不能齐全保障音讯不失落。只管通知了RabbitMQ将音讯保留到磁盘,依然存在一段小的窗口期RabbitMQ接管了音讯但还没来得及保留。此外,RabbitMQ不会对每条音讯都执行 fsync(2) —— 它可能刚刚被写入缓存,还没真正写到磁盘上。长久化机制并不强壮,但对于task 来说队列足够了。如果须要更牢靠的长久化,你须要应用 publisher confirms。

偏心散发(Fair dispatch)

轮询散发有时候并不能满足咱们的须要。比方在只有两个 worker 的场景下,序号为奇数的音讯波及大量运算,而序号为偶数的音讯都很简略。RabbitMQ 并不知道音讯的难易水平,他只会平均分发给两个 worker。

呈现这种状况是因为,RabbitMQ 只负责将队列中收到的音讯散发进来,他并不关怀消费者未确认的音讯数量。它只是自觉地将第N的音讯发给第N个消费者。

为了解决这个问题,咱们能够调用 basicQos办法,将它的参数 prefetchCount 设置为 1。这将通知 RabbitMQ 同一时间内给 worker 的音讯数量不要超过 1。换句话说,在 worker 没有返回确认之前,不要给他散发新音讯。这样一来,RabbitMQ 会将音讯发送给其余不忙的 worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

对于队列大小

如果所有 worker 都很忙,队列有可能被塞满。你须要实时监控他的大小,或者减少 worker 的数量,或者采纳其余策略(课代表注:比方管制生产者和消费者的比例)

代码整合(Putting it all together)

最终的 NewTask.java 代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection cOnnection= factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

(NewTask.java 源文件)”)

Worker.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection cOnnection= factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

(Worker.java 源文件)”)

应用音讯确认并设置prefetchCount参数建设的工作队列。其长久化设置能够让音讯在 RabbitMQ 重启后仍然存在。

更多对于 ChannelMessageProperties 的内容,请拜访:JavaDocs online.

接下来咱们进入教程3,学习如何将同一个音讯发送给多个消费者。

举荐浏览
RabbitMQ教程 1.“Hello World”

Freemarker 教程(一)-模板开发手册

下载的附件名总乱码?你该去读一下 RFC 文档了!

应用Spring Validation优雅地校验参数

深入浅出 MySQL 优先队列(你肯定会踩到的order by limit 问题)

码字不易,欢送点赞分享。
搜寻:【Java课代表】,关注公众号,及时获取更多Java干货。



推荐阅读
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 本文详细解析了如何使用 jQuery 实现一个在浏览器地址栏运行的射击游戏。通过源代码分析,展示了关键的 JavaScript 技术和实现方法,并提供了在线演示链接供读者参考。此外,还介绍了如何在 Visual Studio Code 中进行开发和调试,为开发者提供了实用的技巧和建议。 ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 进程(Process)是指计算机中程序对特定数据集的一次运行活动,是系统资源分配与调度的核心单元,构成了操作系统架构的基础。在早期以进程为中心的计算机体系结构中,进程被视为程序的执行实例,其状态和控制信息通过任务描述符(task_struct)进行管理和维护。本文将深入探讨进程的概念及其关键数据结构task_struct,解析其在操作系统中的作用和实现机制。 ... [详细]
  • 全面解析Java虚拟机:内存模型深度剖析 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
  • 在软件开发领域,“池”技术被广泛应用,如数据库连接池、线程池等。本文重点探讨Java中的线程池ThreadPoolExecutor,通过详细解析其内部机制,帮助开发者理解如何高效利用线程池管理任务执行。线程池不仅能够显著减少系统资源的消耗,提高响应速度,还能通过合理的配置,如饱和策略,确保在高负载情况下系统的稳定性和可靠性。文章还将结合实际案例,展示线程池在不同应用场景下的具体实现与优化技巧。 ... [详细]
author-avatar
jiajian123
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有