热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

php实战RabbitMQ四(消息持久化与公平调度)

php实战RabbitMQ四(数据持久化与公平调度)前言分析消息持久化RabbitMQ退出或崩溃应对队列持久化消息持久化消费者异常退出应对消费者消息确认


php实战RabbitMQ四(数据持久化与公平调度)

  • 前言
    • 分析
  • 消息持久化
    • RabbitMQ退出或崩溃应对
      • 队列持久化
      • 消息持久化
    • 消费者异常退出应对
      • 消费者消息确认
    • 持久化注意
  • 公平调度
  • 源码
    • 生产者
    • 消费者
  • 运行


前言

在开始之前先讲个故事吧!
憧憬下未来,假如我有2个孩子,龙凤胎,俊男靓女,emmm哈哈哈停不下来了。

有天,我给家里留了纸条,纸条内容:
1. 孩子们记得吃饭
2. 要洗碗
3. 要拖地
4. 出门时记得锁门
哈哈哈[ 爱你们的爸爸 ]


分析

我:消息生产者
纸条:rabbitMQ
孩子们: 消费者

通过将故事与我们实际开发相结合,在这里有两个问题:

1. 谁洗碗?谁拖地?都是男孩吗?那女孩子就没事干了。反映到开发中,有的消费者很忙,有的消费者很闲。是否公平调度?2. 纸条我放到地方安全显眼吗?会不会被小风吹走?会不会被无视?反映到开发中,数据能持久存在吗?

消息持久化


RabbitMQ退出或崩溃应对

RabbitMQ退出或崩溃会丢失队列与消息,所以在这里我们需要将消息与队列标记为持久。


队列持久化

源码
在这里插入图片描述

根据源码将第三个参数设为true


$channel->queue_declare('hello', false, true, false, false);

消息持久化

源码
在这里插入图片描述
源码
在这里插入图片描述

根据源码在消息传递的过程中设置delivery_mode的值为2或者DELIVERY_MODE_PERSISTENT

$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );

消费者异常退出应对

消费者可能接收到消息后,执行一个耗时任务,结果执行在中途,异常退出。默认情况下,rabbitMQ一旦发送消息给客户端立即删除。这就很尴尬了,反映到现实~~张三刚下的订单突然没了

我们当然希望有消费者异常退出后,赶紧来个接茬的,任务交给另一个消费者来干。


消费者消息确认

为了保证消息不丢失,我们希望消费者完成消息处理后发送ack确认,rabbitMQ收到后才能对消息删除。
源码
在这里插入图片描述
根据源码在消费者方法中,第四个参数设置为false,要求确认
紧接着在回调函数中处理完消息后调用ack()方法,发送确认

$callback = function($msg) {//收到消息了,打印echo " [x] Received ", $msg->body, "\n";//***消息确认***$msg->ack();
};$channel->basic_consume('mq_sms_send_q', 'consumer1', false, false, false, false, $callback);

持久化注意

不能在已存在的队列名上加持久化设置,会报错,可以重新设置队列名称


公平调度

rabbitMQ在分发给消费者任务时,不会智能去监测消费者是否空闲。所以会出现部分消费者在处理一个重任务还未完成,另一个任务已经到来。而另外的消费者之前拿到轻任务很快处理完,闲了很久。

源码在这里插入图片描述
根据源码在basic_qos()方法中,第二个参数设置为1,等待消费者处理完成后再接新消息,不堆积

$channel->basic_qos(null, 1, null);

源码


生产者

<?php
require_once __DIR__ . &#39;/vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;//获取终端提示用户输入的数据
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);//建立生产者与mq之间的连接
//参数:地址,端口,账号,密码
$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);//在已连接基础上建立生产者与mq之间的通道
$channel = $connection->channel();//声明初始化交换机
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare(&#39;mq_sms_send_ex&#39;, &#39;direct&#39;, false, true, false); //声明初始化一条队列
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
$channel->queue_declare(&#39;mq_sms_send_q1&#39;, false, true, false, false);//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind(&#39;mq_sms_send_q1&#39;, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;); //生成消息
$msg = new AMQPMessage($msg_str, [&#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT]);//推送消息到某个交换机
//参数:消息,交换机名,路由键名
$channel->basic_publish($msg, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;);
echo " [x] Sent: $msg_str \n";$channel->close();
$connection->close();

消费者

<?php
require_once __DIR__ . &#39;/vendor/autoload.php&#39;;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;$connection = new AMQPStreamConnection(&#39;localhost&#39;, 5672, &#39;guest&#39;, &#39;guest&#39;);
$channel = $connection->channel();//声明初始化交换机
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare(&#39;mq_sms_send_ex&#39;, AMQPExchangeType::DIRECT, false, true, false);//声明初始化一条队列
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
$channel->queue_declare(&#39;mq_sms_send_q1&#39;, false, true, false, false);//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind(&#39;mq_sms_send_q1&#39;, &#39;mq_sms_send_ex&#39;, &#39;sms_send&#39;);echo &#39; [*] Waiting for messages. To exit press CTRL+C&#39;, "\n";$callback = function($msg) {echo " [x] Received ", $msg->body, "\n";//消息确认$msg->ack();//判断获取到quit后if (trim($msg->body) == &#39;quit&#39;) {$msg->getChannel()->basic_cancel($msg->getConsumerTag());}};$channel->basic_qos(null, 1, null);//
//参数:队列名,消费者标识符,不接收此使用者发布的消息,使用者是否使用自动确认模式,请求独占使用者访问,不等待,消息回调函数
$channel->basic_consume(&#39;mq_sms_send_q1&#39;, &#39;consumer1&#39;, false, false, false, false, $callback);function shutdown($channel, $connection)
{$channel->close();$connection->close();
}register_shutdown_function(&#39;shutdown&#39;, $channel, $connection);while(count($channel->callbacks)) {$channel->wait();
}

运行

在这里插入图片描述


推荐阅读
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
  • 由于同源策略的限制,满足同源的脚本才可以获取资源。虽然这样有助于保障网络安全,但另一方面也限制了资源的使用。那么如何实现跨域呢,以下是实现跨域的一些方法。 ... [详细]
  • 来吹下汽车
    最近帮同事的一个朋友选车,最后他决定了一汽大众的迈腾,也就是海外版(欧洲为主)的帕萨特B8,国内如果加长过的话,应该叫B8L吧。基于大众最新的通用MQB平台(模块化横置发动机平台) ... [详细]
  • 时域|波形_语音处理基于matlab GUI音频数据处理含Matlab源码 1734期
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了语音处理基于matlabGUI音频数据处理含Matlab源码1734期相关的知识,希望对你有一定的参考价值。 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 本文介绍了如何通过维持两个堆来获取一个数据流中的中位数。通过使用最大堆和最小堆,分别保存数据流中较小的一半和较大的一半数值,可以保证两个堆的大小差距为1或0。如果数据流中的数量为奇数,则中位数为较大堆的最大值;如果数量为偶数,则中位数为较大堆的最大值和较小堆的最小值的平均值。可以使用优先队列来实现堆的功能。本文还提供了相应的Java代码实现。 ... [详细]
author-avatar
漂漂雪飘飘业_348
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有