作者:袁甲2012_498 | 来源:互联网 | 2023-09-13 08:43
关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。
市面上的消息队列产品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年底阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我自己的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此之前先看下消息队列的相关概念。
什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为何用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。
为什么会产生消息队列?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
关于消息队列的详细介绍请参阅:
《Java帝国之消息队列》
《一个故事告诉你什么是消息队列》
《到底什么时候该使用MQ》
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
本教程pdf及代码下载地址:
代码:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092
2.RabbitMQ
![](https://img.php1.cn/3cd4a/94ce/a6e/865776f6fe3d5f59.jpeg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3BocF9senI=,size_16,color_FFFFFF,t_70)
2.接下来要安装php的amqp扩展
先用phpinfo()查看php版本信息,及
,
信息
最后根据上面的信息去下载相应的amqp版本:http://pecl.php.net/package/amqp
据上面信息我们的是32位非线程安全版本![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzI1Mjk0OTktMzUzMjk0NDg3LnBuZw)
加压后:
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzI2MTEzMTItNDI5Nzk1MjE1LnBuZw)
将php_amqp.dll复制到php/ext,同时在php.ini中添加如下代码:
[amqp]
extension=php_amqp.dll
然后将rabbitmq.1.dll复制到php根目录C:/xampp/php/,同时修改apache配置文件httpd.conf,添加如下代码:
# rabbitmq
LoadFile "C:/xampp/php/rabbitmq.1.dll"
最后重启看看是否已经加载了amqp模块:
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzI5MTAyNDktNzgwNTEwMzc4LnBuZw)
---------------------------------到这里为止,安装已经结束-------------------------------------------------
RabbitMQ+PHP展示实例
新建rabbit_consumer.php作为消费者
//配置信息
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = 'key_1'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";
//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";
//阻塞模式接收消息
echo "Message:\n";
while(True){
$q->consume('processMessage');
//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
?>
新建rabbit_publisher.php作为生产者
//配置信息
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
//$q_name = 'q_linvo'; //无需队列名
$k_route = 'key_1'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
date_default_timezone_set("Asia/Shanghai");
//发送消息
//$channel->startTransaction(); //开始事务
for($i&#61;0; $i<5; &#43;&#43;$i){
sleep(1);//休眠1秒
//消息内容
$message &#61; "TEST MESSAGE!".date("h:i:sa");
echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事务
$conn->disconnect();
?>
测试一下&#xff1a;
先起一个窗口同样切换到php目录&#xff0c;输入:php c:/xampp/htdocs/RabbitMQ/rabbit_consumer.php
运行消费者
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzM2NTQyNjUtMTQwOTIyNDUyMi5wbmc)
然后再起一个dos窗口&#xff0c;切换到php根目录&#xff0c;输入以下命令&#xff1a;php c:/xampp/htdocs/RabbitMQ/rabbit_publisher.php
运行生产者
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzMzMDAyMzMtMTQwOTgwNzYwLnBuZw)
消费者接收到消息
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9pbWFnZXMyMDE3LmNuYmxvZ3MuY29tL2Jsb2cvNDg5MDg2LzIwMTcwOC80ODkwODYtMjAxNzA4MjkxMzM3MDU3NDktMzk3NzEwMDMzLnBuZw)
这样就模拟了队列对消息的处理&#xff0c;希望我们通过这篇文章对RabbitMQ的认识都能有一定的提升。
另外给出官网的php使用指南&#xff1a;http://www.rabbitmq.com/tutorials/tutorial-one-php.html
相关原文链接&#xff1a;
https://www.cnblogs.com/miketwais/p/RabbitMQ.html
https://www.jianshu.com/p/79ca08116d57
https://blog.csdn.net/hellozpc/article/details/81436980