2 工作队列(Work Queue)关注公众号:Java课代表,每日文章更新,及时获取更多常识。
在第一篇教程中,咱们写了两个程序用来从指定的 queue 中发送和接管音讯。这篇教程,咱们将创立一个工作队列,用来给多个 worker 散发一些”耗时的”工作。
工作队列(或者称之为工作队列)背地的思维,是用来防止立刻解决那些很耗资源并且须要期待其运行完结的工作(课代表注:说白了就是削峰)。取而代之的是,将工作安顿到稍后进行(课代表注:说白了就是异步执行)。一个后盾运行的工作程序将会接管到并执行该工作。当你运行了多个工作程序,工作队列中的工作将会被他们独特分担解决。
这个思维在web利用中十分有用,因为在web利用中,通过一个短的http申请窗口无奈解决简单的工作。
在后面的教程中,咱们发送了一个字符串音讯:“”Hello World!”。接下来咱们发送一些用来代表工作很简单的字符串。咱们并没有真实世界中那些像图片缩放,PDF文件渲染之类的简单工作,所以,让咱们应用Thread.sleep()
办法来伪装很忙。用字符串中点号的个数当做工作的复杂度:每个点号代表一秒钟的“工作”。例如:由字符串Hello...
代表的工作将耗时3秒钟。
将后面例子中Send.java的代码略微扭转一下,使其容许任意音讯从终端输出。该利用会将工作安顿到咱们的工作队列,所以给它命名为:NewTask.java
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
老的 Recv.java 利用也要做一些改变:它须要为音讯中的每个点号伪造一秒钟的工作。它将负责接管音讯并解决工作,所以将它命名为Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
用来模仿执行工夫的假工作:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
像教程1中那样编译一下(确保须要的jar包都在工作目录中,并且设置了环境变量:CP):
javac -cp $CP NewTask.java Worker.java
Windows下自行将 $CP 替换为 %CP%,下同。——课代表注
应用工作队列的劣势之一是不便横向扩大。假如工作积压了,咱们能够减少更多的 worker 程序,轻松扩大。
首先,让咱们同时运行两个 worker
实例。他们都将从队列中获取音讯,但具体是怎么运行的呢?咱们一起探索一下。
你须要关上三个终端。两个用来运行worker
程序。这两个将会是消费者——C1和C2
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
第三个终端用来公布新工作。当消费者启动之后,能够发送几个音讯:
# shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message..' java -cp $CP NewTask Third message... # => [x] Sent 'Third message...' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message....' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message.....'
让咱们看一看运行 worker 的终端打印了什么:
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认状况下,RabbitMQ 会将每个音讯按程序发送给下一个消费者。每个消费者都会被平均分配到雷同数量的音讯。这种音讯散发机制称为轮询。
能够多运行几个 worker 实例自行尝试。
执行工作可能须要一段时间。你有没有想过,如果工作还没执行完,利用挂掉了怎么办?以咱们目前的代码,一旦 RabbitMQ 将音讯分发给了消费者,它会立即将该音讯标记为已删除。如此看来,一旦终止 worker 程序,就会失落它正在解决的音讯,以及它曾经接管,但还没开始解决的音讯。
但咱们并不心愿失落工作。如果一个 worker 利用挂掉了,咱们心愿他所解决的工作能交给给别的 worker 解决。
为了确保音讯不会失落,RabbitMQ 提供音讯确认机制。音讯确认由消费者发回,通知 RabbitMQ 某个指定的音讯曾经被接管、解决,并且 RabbitMQ 能够删掉该音讯了。
如果某个消费者没有返回确认(ack) 就挂掉了(channel 敞开,链接敞开或者TCP连贯失落了),RabbitMQ 将会认为该音讯没有被正确处理,会将其从新入队(re-queue)。如果此时有其余消费者在线,RabbitMQ 会迅速将该音讯发送给他们。这样就能够保障,即便 worker 忽然挂了,音讯也不会失落。
音讯不会超时:RabbitMQ 将会在某个消费者挂掉时从新发送该音讯。即便解决一条音讯须要破来源gaodai#ma#com搞@代~码网费很长时间也无所谓。
手工音讯确认 默认开启。在后面的示例中咱们通过设置autoAck=true
将其敞开了。当初咱们将标记位设为false
,并让worker 在工作实现时发送确认信息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
下面的代码能够确保即便你应用 CTRL+C 进行一个正在解决音讯的worker,也不会失落任何音讯。worker 挂掉后未被确认的音讯将会很快被从新投递。
确认音讯的发送必须和接管音讯时的 channel 雷同。尝试应用不同的 channel 返回确认将会报 channel 协定异样。具体参见确认机制的参考文档
遗记确认
一个常见的谬误就是遗记调用
basicAck
。这个简略谬误,将会导致严重后果。当你的程序处理完音讯,却遗记发送确认,音讯将会被从新投递,RabbitMQ 因为无奈删除未被确认的音讯,导致内存占用越来越多。为了不便排查此类问题,能够应用
rabbitmqctl
工具打印messages_unacknowledged
字段:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledgedWindows上来掉 sudo :
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们曾经学习了如何在消费者挂掉的状况下保障工作不失落。然而,如果 RabbitMQ 服务进行了,工作还是会丢。
如果没有通过配置,当 RabbitMQ 进行或解体时,它将会失落 队列(queue) 中已有的音讯。为了防止这种状况,咱们须要将队列(queue) 和音讯(message) 都设置为长久化(durable):
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
只管下面的命令是对的,但目前还不能正确工作。因为咱们曾经在 RabbitMQ 中申明了一个名为“hello”的非长久化队列。RabbitMQ 无奈批改已存在队列的参数。咱们能够换个思路,命名一个新的,开启长久化的队列,比方task_queue
:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
长久化参数为true
的queueDeclare
办法须要在生产者和消费者代码中都加上。
此时,咱们能够确定,即便 RabbitMQ 重启,task_queue
这个队列也不会丢。接下来咱们通过将MessageProperties
的值设置为PERSISTENT_TEXT_PLAIN
,从而将音讯设置为长久化。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
音讯长久化的注意事项
将音讯标记为长久化并不能齐全保障音讯不失落。只管通知了
RabbitMQ
将音讯保留到磁盘,依然存在一段小的窗口期RabbitMQ接管了音讯但还没来得及保留。此外,RabbitMQ
不会对每条音讯都执行fsync(2)
—— 它可能刚刚被写入缓存,还没真正写到磁盘上。长久化机制并不强壮,但对于task
来说队列足够了。如果须要更牢靠的长久化,你须要应用 publisher confirms。
轮询散发有时候并不能满足咱们的须要。比方在只有两个 worker 的场景下,序号为奇数的音讯波及大量运算,而序号为偶数的音讯都很简略。RabbitMQ 并不知道音讯的难易水平,他只会平均分发给两个 worker。
呈现这种状况是因为,RabbitMQ 只负责将队列中收到的音讯散发进来,他并不关怀消费者未确认的音讯数量。它只是自觉地将第N的音讯发给第N个消费者。
为了解决这个问题,咱们能够调用 basicQos
办法,将它的参数 prefetchCount 设置为 1。这将通知 RabbitMQ 同一时间内给 worker 的音讯数量不要超过 1。换句话说,在 worker 没有返回确认之前,不要给他散发新音讯。这样一来,RabbitMQ 会将音讯发送给其余不忙的 worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
对于队列大小
如果所有 worker 都很忙,队列有可能被塞满。你须要实时监控他的大小,或者减少 worker 的数量,或者采纳其余策略(课代表注:比方管制生产者和消费者的比例)
最终的 NewTask.java
代码如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
(NewTask.java 源文件)”)
Worker.java:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection cOnnection= factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
(Worker.java 源文件)”)
应用音讯确认并设置prefetchCount
参数建设的工作队列。其长久化设置能够让音讯在 RabbitMQ 重启后仍然存在。
更多对于 Channel
和 MessageProperties
的内容,请拜访:JavaDocs online.
接下来咱们进入教程3,学习如何将同一个音讯发送给多个消费者。
举荐浏览
RabbitMQ教程 1.“Hello World”
Freemarker 教程(一)-模板开发手册
下载的附件名总乱码?你该去读一下 RFC 文档了!
应用Spring Validation优雅地校验参数
深入浅出 MySQL 优先队列(你肯定会踩到的order by limit 问题)
码字不易,欢送点赞分享。
搜寻:【Java课代表】,关注公众号,及时获取更多Java干货。