本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。ThisarticlewastranslatedfromRabbitMQ
本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
介绍
RabbitMQ 是一个消息代理:它可以接受及转发消息。你可以想象它为一个邮局:当你把要寄出的邮件放进邮箱,你可以确信邮递员最终会把你的邮件发送到你的收件人手上。在这个类比中,RabbitMQ 就是一个邮箱,一个邮局以及一个邮递员。
RabbitMQ 与邮局主要不同的地方在于它并不需要纸张来处理这些过程,相反地它以二进制数据的形式接受、存储及转发消息。
一般来说,RabbitMQ 使用了一些特定的术语(,它们会出现在本教程中)。
教程所需
本教程是在假设RabbitMQ已经在你本地安装并且运行在5672端口。
如果你使用的是另外的地址,端口或者凭证,你需要自行调整连接配置。
从哪儿可以获取帮助
如果你在本教程中遇到了任何困难,你可以通过邮件联系RabbitMQ官方。
Producing就是发送的意思。一个会发送消息的程序是Producer(生产者),它以下面的符号出现:
Producer(生产者)
Queue(队列)是RabbitMQ中的邮箱。尽管消息在RabbitMQ与你的应用之间流通(通信),但他们只能被存储在Queue(队列)之中。一个Queue(队列)(的大小)只受到主机的内存和磁盘大小的限制。它本质上是一个巨大的消息缓冲(message buffer)。一个Queue(队列)可以被许多Producers(生产者)发送消息到里面,也可以被许多Consumers(消费者)从里面接收消息。本教程以下面的符号的描述一个队列:
Queue(队列)
Consuming(消费)与receiving(接收)的意思很相似。一个Consumer(消费者)是一个主要用于等待接受信息的程序。本教程将以下面的符号描述一个Consumer(消费者):
Consumer(消费者)
“Hello World!”
(使用php-amqplib客户端)
在本教程的这一部分,我们将会编写两个PHP程序:一个Pruducer(生产者)用于发送单条消息,以及一个Consumer(消费者)用于接收消息并把他们打印出来。我们将会对php-amqplib API中的一些细节进行解释,我们将从这个非常简单的例子开始。
在下面的这张途中,“P” 就是我们的Producer(生产者),“C”就是我们的Consumer(消费者)。图中央的盒子就是一个Queue(队列)——一个RabbitMQ为Consumer(消费者)维持的消息缓冲。
produce(生产者)->queue(队列)-> consumer消费者
php-amqblib client 库
RabbitMQ适合多种协议。本教程是依据AMQP 0-9-1,它是开源的,通用的消息协议。在不同的编程语言中有许多客户端支持(实现?)RabbitMQ。本教程中我们使用了php-amqplib,并使用Composer作为依赖管理。
添加一个 composer.json 文件在你的项目中:
{
"require":{
"php-amqplib/php-amqplib": ">=2.6.1",
}
}
如果你已经安装并配置好了Composer,你可以运行一下代码:
composer.phar install
这里有一个 windows版本的Composer安装器
现在我们已将把php-amqplib库安装好了,我们可以开始写一些代码了。
Sendding(发送)
producer send a message to hello queue
我们将会运行我们的消息发送者 send.php 以及我们的消息接收者 receiver.php。发送者会连接上RabbitMQ,并发送一条消息,然后退出。
在 send.php 中,我们需要引用库文件以及 use 必要的 classes(类):
require_once __DIR__ . '/vendor/autoload.php';// 引用Composer的自动加载文件
use PhpAmqpLib\Connection\AMQPStreamConnection;// 连接类
use PhpAmqpLib\Message\AMQPMessage;// 消息类
然后我们使用下面的代码创建一个RabbitMQ服务的连接:
$cOnnection=new AMQPStreamConnection(
'localhost',// host地址
5672,// 端口
'guest',// 用户名,默认guest
'guest',// 密码,默认guest
);
$channel = $connection->channel();
这个连接(指的是AMQPStreamConnection)是对socket连接的抽象,它为我们完成了协议的版本协商和认证(?)工作。在此,我们连接了我们本地机器上的一个代理——localhost。如果我们想连接一个在别的机器的代理,我们只需要在这里简单的声明它的名称或者IP地址。
然后我们创建了一个channel(频道),这是 php-amqplib API 处理绝大多数工作的地方。
为了发送(消息),我们必须声明一个我们要往它发送消息的Queue(队列);然后我们把一个消息放进队列之中。
$channel->queue_declare('hello',false,false,false,false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg,'','hello');
echo" [x] Sent 'Hello World!'\n";
我们只在不存在Queue(队列)的时候声明一个新的Quene(队列)(可以复用的意思吧)。消息体是一个byte array(字节组),所以你可以使用任意的编码。
最后,我们关闭channel(频道)与connection(连接):
$channel->close();
$connection->close();
完整的 send.php 在此。
发送不成功?
如果这是你第一次使用RabbitMQ并且你并不能看到“发送”的消息,你可能会很难想出是什么错误导致的。可能 RabbitMQ 代理在磁盘空间不足的情况下启动了(默认地,它需要至少200MB的空间),所以它拒绝了接收消息。检查一下代理的日志文件去确认(是否存在这一情况)并且如果在由必要的情况下,减少这一个空间限制。配置文档将会告诉你怎样设置disk_free_limit。
Receiving(接收)
这(个接收程序)是对应(上面的)Publisher(发送者)。我们的Receiver(接收者)将消息从RabbitMQ(队列)出队,所以不像只发送单一消息的Publisher(发送者),我们将会让Receiver(接收者)一直运行以监听消息及将消息打印出来。
Receiver从Queue读取消息
reveive.php 的代码几乎和 send.php 的代码include 与 use 的一样:
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
(Connection)的设置是和发送者的一样的;我们打开一个Connection(连接)与channel(频道),并且声明一个我们将要consume(消费)(处理)的队列。注意,此处(Connection(连接)与quene(队列)的配置)应该与发送端的相匹配。
$cOnnection= new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();
$channel->queue_declare('hello',false,false,false,false);
echo '[x] Waiting for messages. To exit press CTRL+C',"\n";
注意,我们在这里同样也声明了一个Queue(队列)。因为我们将会在publisher(发送者)之前启动consumer(消费者),我们想确保在我们尝试从Queue(队列)consume(消费)消息前Queue(队列)就已经存在了。
我们就要告诉(RabbitMQ)服务器去从Queue(队列)分发我们的消息。我们将会定义一个用于接收从(RabbitMQ)服务器发出的消息的PHP callable(可调用的php函数)。记住,消息是异步地从(RabbitMQ)服务器发往(RabbitMQ)客户端的。
$callback = function($msg){
echo" [x] Received ", $msg->body,"\n";
};
$channel->basic_consume('hello','',false,true,false,false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
我们的代码将会阻塞。但我们接收到一个消息的时候,我们的 $callback 函数将会把收到的消息传递。
这里是完整的 receive.php 类
把他们放在一起(运行)
现在我们可以运行这两个脚本。在一个终端,运行Consumer(消费者)(接收者):
php receive.php
然后,运行Publisher(发送者):
php send.php
Consumer(消费者)将会打印从发送者得到的通过 RabbitMQ 发送的消息。这个接收者将会一直运行,等待消息(使用 ctrl+c 去停止它),所以,尝试在另外一个终端运行发送者。
列出Queues(队列)
你可能希望看一下RabbitMQ已经由多少个Queues(队列)了以及有多少消息在里面。你可以通过 rabbitmqctl 工具实现(root用户):
sudo rabbitmqctl list_queues
在Windows平台,省略sudo:
rabbitmqctl.bat list_queues