作者:漂漂雪飘飘业_348 | 来源:互联网 | 2023-09-14 12:43
php实战RabbitMQ四(数据持久化与公平调度)
- 前言
- 消息持久化
- RabbitMQ退出或崩溃应对
- 消费者异常退出应对
- 持久化注意
- 公平调度
- 源码
- 运行
前言
在开始之前先讲个故事吧!
憧憬下未来,假如我有2个孩子,龙凤胎,俊男靓女,emmm哈哈哈停不下来了。
有天,我给家里留了纸条,纸条内容:
1. 孩子们记得吃饭
2. 要洗碗
3. 要拖地
4. 出门时记得锁门
哈哈哈[ 爱你们的爸爸 ]
分析
我:消息生产者
纸条:rabbitMQ
孩子们: 消费者
通过将故事与我们实际开发相结合,在这里有两个问题:
1. 谁洗碗?谁拖地?都是男孩吗?那女孩子就没事干了。反映到开发中,有的消费者很忙,有的消费者很闲。是否公平调度?2. 纸条我放到地方安全显眼吗?会不会被小风吹走?会不会被无视?反映到开发中,数据能持久存在吗?
消息持久化
RabbitMQ退出或崩溃应对
RabbitMQ退出或崩溃会丢失队列与消息,所以在这里我们需要将消息与队列标记为持久。
队列持久化
源码
根据源码将第三个参数设为true
$channel->queue_declare('hello', false, true, false, false);
消息持久化
源码
源码
根据源码在消息传递的过程中设置delivery_mode的值为2或者DELIVERY_MODE_PERSISTENT
$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );
消费者异常退出应对
消费者可能接收到消息后,执行一个耗时任务,结果执行在中途,异常退出。默认情况下,rabbitMQ一旦发送消息给客户端立即删除。这就很尴尬了,反映到现实~~张三刚下的订单突然没了。
我们当然希望有消费者异常退出后,赶紧来个接茬的,任务交给另一个消费者来干。
消费者消息确认
为了保证消息不丢失,我们希望消费者完成消息处理后发送ack确认,rabbitMQ收到后才能对消息删除。
源码
根据源码在消费者方法中,第四个参数设置为false,要求确认
紧接着在回调函数中处理完消息后调用ack()方法,发送确认
$callback = function($msg) {echo " [x] Received ", $msg->body, "\n";$msg->ack();
};$channel->basic_consume('mq_sms_send_q', 'consumer1', false, false, false, false, $callback);
持久化注意
不能在已存在的队列名上加持久化设置,会报错,可以重新设置队列名称
公平调度
rabbitMQ在分发给消费者任务时,不会智能去监测消费者是否空闲。所以会出现部分消费者在处理一个重任务还未完成,另一个任务已经到来。而另外的消费者之前拿到轻任务很快处理完,闲了很久。
源码
根据源码在basic_qos()方法中,第二个参数设置为1,等待消费者处理完成后再接新消息,不堆积
$channel->basic_qos(null, 1, null);
源码
生产者
<?php
require_once __DIR__ . &#39;/vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);
$channel = $connection->channel();
$channel->exchange_declare(&#39;mq_sms_send_ex&#39;, &#39;direct&#39;, false, true, false);
$channel->queue_declare(&#39;mq_sms_send_q1&#39;, false, true, false, false);
$channel->queue_bind(&#39;mq_sms_send_q1&#39;, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;);
$msg = new AMQPMessage($msg_str, [&#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;);
echo " [x] Sent: $msg_str \n";$channel->close();
$connection->close();
消费者
<?php
require_once __DIR__ . &#39;/vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);
$channel = $connection->channel();
$channel->exchange_declare(&#39;mq_sms_send_ex&#39;, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare(&#39;mq_sms_send_q1&#39;, false, true, false, false);
$channel->queue_bind(&#39;mq_sms_send_q1&#39;, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;);echo &#39; [*] Waiting for messages. To exit press CTRL+C&#39;, "\n";$callback = function($msg) {echo " [x] Received ", $msg->body, "\n";$msg->ack();if (trim($msg->body) == &#39;quit&#39;) {$msg->getChannel()->basic_cancel($msg->getConsumerTag());}};$channel->basic_qos(null, 1, null);
$channel->basic_consume(&#39;mq_sms_send_q1&#39;, &#39;consumer1&#39;, false, false, false, false, $callback);function shutdown($channel, $connection)
{$channel->close();$connection->close();
}register_shutdown_function(&#39;shutdown&#39;, $channel, $connection);while(count($channel->callbacks)) {$channel->wait();
}
运行