一、安装
pecl install amqp
二、操作
1.生产者
$params = ['exchangeName' => 'ticketcert','queueName' => 'synticketcert','routeKey' => 'synticketcert',
];$connectConfig = ['host' => '127.0.0.1','port' => 5672,'login' => 'kcwc','password' => 'tiger','vhost' => '/'
];//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展//exit();
try {$conn = new AMQPConnection($connectConfig);$conn->connect();if (!$conn->isConnected()) {//die('Conexiune esuata');//TODO 记录日志echo 'rabbit-mq 连接错误:', json_encode($connectConfig);exit();}$channel = new AMQPChannel($conn);if (!$channel->isConnected()) {// die('Connection through channel failed');//TODO 记录日志echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);exit();}$exchange = new AMQPExchange($channel);$exchange->setFlags(AMQP_DURABLE); //持久化$exchange->setName($params['exchangeName'] ?: '');$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型$exchange->declareExchange();//$channel->startTransaction();$queue = new AMQPQueue($channel);$queue->setName($params['queueName'] ?: '');$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();//绑定$queue->bind($params['exchangeName'], $params['routeKey']);
} catch (Exception $e) { }$num = mt_rand(100, 500);//生成消息
for ($i &#61; $num; $i <&#61; $num &#43; 5; $i&#43;&#43;) {$exchange->publish("this is {$i} message..", $params[&#39;routeKey&#39;], AMQP_MANDATORY, array(&#39;delivery_mode&#39; &#61;> 2));
}
2.消费者
$params &#61; [&#39;exchangeName&#39; &#61;> &#39;ticketcert&#39;,&#39;queueName&#39; &#61;> &#39;synticketcert&#39;,&#39;routeKey&#39; &#61;> &#39;synticketcert&#39;,
];$connectConfig &#61; [&#39;host&#39; &#61;> &#39;127.0.0.1&#39;,&#39;port&#39; &#61;> 5672,&#39;login&#39; &#61;> &#39;kcwc&#39;,&#39;password&#39; &#61;> &#39;tiger&#39;,&#39;vhost&#39; &#61;> &#39;/&#39;
];//var_dump(extension_loaded(&#39;amqp&#39;));//exit();try {$conn &#61; new AMQPConnection($connectConfig);$conn->connect();if (!$conn->isConnected()) {//die(&#39;Conexiune esuata&#39;);//TODO 记录日志echo &#39;rabbit-mq 连接错误:&#39;, json_encode($connectConfig);exit();}$channel &#61; new AMQPChannel($conn);if (!$channel->isConnected()) {// die(&#39;Connection through channel failed&#39;);//TODO 记录日志echo &#39;rabbit-mq Connection through channel failed:&#39;, json_encode($connectConfig);exit();}$exchange &#61; new AMQPExchange($channel);$exchange->setFlags(AMQP_PASSIVE); //声明一个已存在的交换器的&#xff0c;如果不存在将抛出异常&#xff0c;这个一般用在consume端$exchange->setName($params[&#39;exchangeName&#39;] ?: &#39;&#39;);$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型$exchange->declareExchange();//$channel->startTransaction();$queue &#61; new AMQPQueue($channel);$queue->setName($params[&#39;queueName&#39;] ?: &#39;&#39;);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();//绑定$queue->bind($params[&#39;exchangeName&#39;], $params[&#39;routeKey&#39;]);
} catch (Exception $e) {echo $e->getMessage();exit();
}function callback(AMQPEnvelope $message)
{global $queue;if ($message) {$body &#61; $message->getBody();echo $body . PHP_EOL;$queue->ack($message->getDeliveryTag());} else {echo &#39;no message&#39; . PHP_EOL;}
}//$queue->consume(&#39;callback&#39;); 第一种消费方式,但是会阻塞,程序一直会卡在此处//第二种消费方式,非阻塞
$message &#61; $queue->get();
if (!empty($message)) {echo $message->getBody();$queue->ack($message->getDeliveryTag()); //应答&#xff0c;代表该消息已经消费
}
//这里可以考虑用swoole 用while进程会僵死如&#xff1a;Swoole\Timer::tick(1000, function () use ($queue) {echo ".";$message &#61; $queue->get();if (!empty($message)) {echo $message->getBody() . PHP_EOL;$queue->ack($message->getDeliveryTag()); //应答&#xff0c;代表该消息已经消费}
});
参考&#xff1a;
https://blog.51cto.com/chinalx1/2150793
其它&#xff1a;
一定要注意php安装AMQP的版本&#xff0c;版本不同使用的方法不一样。在官方网站就有2个版本的AMQP
第一版本&#xff1a;xxx&#xff0c;详细的url找不到了
第二版本&#xff1a;http://docs.php.net/manual/da/book.amqp.php
千万不要出现这种情况&#xff0c;找到一个官方的版本&#xff0c;然后按照example&#xff0c;怎么调试都不通….按照PHP安装 AMQP扩展 安装的AMQP扩展是最新的&#xff0c;现在和PHP官方给出的第二版本&#xff0c;也有一些区别。主要体现在exchange和queue中有个declare的方法&#xff0c;分别更改成declarExchange()和declarQueue().一、AMQPConnection
::__construct ([ array $credentials &#61; array() ] )这方法比较简单&#xff0c;credentials的英文含义是“凭证”&#xff0c;但我喜欢把它理解为config。这些配置包括如下&#xff1a;
host、vhost、port、login、password、read_timeout、write_timeout、connect_timeout&#xff0c;如果config省略&#xff0c;php会使用默认的这5个值&#xff0c;初始化&#xff0c;强烈建议带上config初始化&#xff0c;其中read_timeout、write_timeout、connect_timeout这三个值是最新版本才有的&#xff0c;PHP官方现在还没有&#xff0c;单位都是(秒)::connect ( void ) 、 ::pconnect ( void )
::disconnect ( void ) 、::pdisconnect ( void )
::reconnect ( void ) 、::preconnect ( void ) #注意和__construct 的区别&#xff0c;该方法不带参数。
::isConnected ( void )
::isPersistent ( void ) #是否持久链接 &#xff0c;为什么不是isPconnected &#xff1f;-_-如果使用持久链接&#xff0c;在类方法__destruct中释放(断开链接)::setHost ( string $host )
::setPort ( int $port )
::setVhost ( string $vhost )
::setLogin ( string $login )
::setPassword ( string $password )
::getHost ( void )
::getPort ( void )
::getVhost ( void )
::getLogin ( void )
::getPassword ( void )
还有类似的::setTimeout ( int $timeout ) 、::getTime ( void )、::getReadTimeout ( void )、::setReadTimeout ( int $timeout )、::getWriteTimeout (void ) 、setWriteTimeout ( int $timeout)上面的这些方法&#xff0c;就不细说了&#xff0c;还有2个不常见的方法::getUsedChannels ( void ) #在链接会话期间最后一次使用的channel id
::getMaxChannels ( void ) #单次链接能创建最大channel数量二、AMQPChannel
为了一个AMQP的链接&#xff0c;创建一个channel instance&#xff08;通道实例&#xff09;&#xff0c;这里需要了解一下几个概念&#xff0c;以及她们之间的关系&#xff0c;本人做了一个罗列&#xff0c;水平有限&#xff0c;不对的地方请指正。client &#xff1a;客户端Broker&#xff1a;英文含义是代理&#xff0c;这里指代理服务器&#xff0c;也是通常说的server&#xff0c;也可以指Rabbit cluster的节点&#xff0c;
也就是在AMQPConnection的配置文件中的host、portvirtual_host&#xff1a;虚拟host&#xff0c;这个可以理解为在Broker中&#xff0c;虚拟出来的消息处理模块。一个Broker中
可以创建多个virtual_host.exchange&#xff1a;逻辑上属于virtual_host的子模块&#xff0c;负责消息转发到queue中。queue&#xff1a;逻辑上属于virtual_host的子模块。channel&#xff1a;通道。链接virtual_host的通道。
routingkey&#xff1a;virtual_host和channel的设置都是为了增加并发性能&#xff0c;降低资源消耗的。
参考文章&#xff1a;RabbitMQ中的AMQP协议规范
::__construct ( AMQPConnection $amqp_connection ) &#xff03;创建channel
::isConnected ( void ) &#xff03;这里和AMQPConnection::isConnected一样.::getConnection ( void )
&#xff03;返回的结果是AMQPConnection&#xff0c;从概念上个人理解&#xff0c;channel是建立在connection之上的&#xff0c;是个独立的链接。connection是指物理的连接&#xff0c;一个client与一个server之间有一个连接&#xff1b;一个连接上可以建立多个channel&#xff0c;可以理解为逻辑上的连接。一般应用的情况下&#xff0c;有一个channel就够用了&#xff0c;不需要创建更多的channel::getChannelId ( void )
&#xff03;返回channel id&#xff0c;在PHP中循环创建通道&#xff0c;发现通道ID始终只有1和2&#xff0c;不知是否和服务器的设置有关&#xff1f;::setPrefetchSize ( int $size )、::getPrefetchSize ( void )
这个地方有个坑&#xff0c;设置prefetchSize不等于0&#xff0c;会提示not_implemented: “prefetch_size!&#61;0 (65535)”&#xff0c;也就是说目前prefetchSize只能设置为0&#xff0c;还未实现不等于0的情况。::setPrefetchCount ( int $count )、::getPrefetchCount ( void )
这个参数设置&#xff0c;接收消息端&#xff0c;接收的最大消息数量&#xff08;包括使用get、consume&#xff09;,一旦到达这个数量&#xff0c;客户端不在接收消息。0为不限制。默认值为3.::qos ( int $size , int $count )
同时设置prefetchSize、prefetchCount&#xff0c;但prefetchSize目前只能设置为0::basicRecover ( boolen )
默认为true&#xff0c;没有确认的消息会被再次投递&#xff0c;这个地方和消息确认机制有关&#xff0c;后面再来说这个问题。三、 AMQPExchange
::__construct ( AMQPChannel $amqp_channel )::setName ( string $exchange_name )、::getName ( void )
这个命令的最好使用数字、字母、- _ . 组合(区分大小写&#xff0c;但最好使用小写)
也不要使用amq带头&#xff08;内置的exchange都是使用amq.开头&#xff09;
如果申明一个已存在的exchange name&#xff0c;如果type不一致会抛出异常.::setType ( string $exchange_type )、::getType ( void )
这个是rabbitmq的核心&#xff0c;exchange的类型&#xff0c;决定了rabbit实现何种业务模型。
这个地方尤其要注意&#xff0c;AMQ本身没有默认的exchangeType&#xff0c;所以在申明exchange必需要设置type&#xff0c;否则会抛出异常 Could not declare exchange. Exchanges must have a type&#xff0c;
目前exchange支持4种类型 AMQP_EX_TYPE_DIRECT、 AMQP_EX_TYPE_FANOUT、AMQP_EX_TYPE_HEADERS 、 AMQP_EX_TYPE_TOPIC
可参考&#xff1a;RabbitMQ AMQP 消息模型攻略
记忆方法&#xff1a;amqp_exchange_type_direct、fanout、topic、headers::setFlags ( int $flags )、::getFlags ( void )
默认AMQP_NOPARAM&#xff1b; 可选AMQP_DURABLE, AMQP_PASSIVE、AMQP_AUTODELETEpassive&#xff1a;声明一个已存在的交换器的&#xff0c;如果不存在将抛出异常&#xff0c;这个一般用在consume端。因为一般produce端创建,在consume端建议设置成AMQP_PASSIVE,防止consume创建exchangedurable&#xff1a;持久exchange&#xff0c;该交换器将在broker重启后生效&#xff0c;一般常使用该选项.auto_delete&#xff1a;该交换器将在没有消息队列绑定时自动删除。一个从未绑定任何队列的交换器不会自动删除。解释有点绕。说明下吧&#xff0c;当有队列bind到auto_delete的交换器上之后&#xff0c;删除该队列。此时交换器也会删除。一般创建为临时交换器。::setArgument ( string $key , mixed $value )
::setArguments ( array $arguments )
::getArgument ( string $key )
::getArguments ( void )::declareExchange( void ) //和PHP的官方有差别::delete ($exchangeName &#61; null, $flags &#61; AMQP_NOPARAM) //和PHP的官方有差别
未指定$exchangeName&#xff0c;删除当前链接的exchange&#xff0c;但无法删除system中内置的exchange.::bind($exchange_name, $routing_key &#61; ”, array $arguments &#61; array()) //和PHP的官方有差别
::unbind($exchange_name, $routing_key &#61; ”, array $arguments &#61; array()) //和PHP的官方有差别
可以将消息同时发送到多个exchange&#xff0c;稍后补充实际的用法。::publish ( string $message , string $routing_key [, int $flags &#61; AMQP_NOPARAM [, array $attributes &#61; array() ]] ) //发送消息方法 $attributes参考&#xff1a;RabbitMQ的PHP教程之RPC (六)参数&#xff1a;$routint_key&#xff0c;在fanout下忽略。在direct类型下&#xff0c;如果该值为空&#xff0c;则bind到exchange都能收到消息&#xff0c;这个有点像fanout的广播模式。参数&#xff1a;$flags可选值 AMQP_MANDATORY、AMQP_IMMEDIATE,默认&#xff1a;AMQP_NOPARAM&#xff0c;此处一般都设置为false&#xff0c;本人还未测出这2个flag的区别&#xff0c;这里有一篇文章关于这2个flag的介绍&#xff1a;
http://blog.csdn.net/jiao_fuyou/article/details/21594947::getChannel()、getConnection() //不再介绍&#xff0c;见上面四、AMQPQueue
::__construct(AMQPChannel $amqp_channel) //有PHP官方有区别::setName ( string $queue_name )、::getName ( void ) //队列名称::setFlags ( int $flags )、::getFlags ( void ) //重点
可选参数&#xff1a;AMQP_DURABLE, AMQP_PASSIVE,AMQP_EXCLUSIVE, AMQP_AUTODELETE
默认&#xff1a;auto_deleteauto_delete&#xff1a;当队列中有消费者&#xff0c;则队列存在&#xff0c;当没有消费者链接&#xff0c;则队列删除durable&#xff1a;持久化&#xff0c;队列不删除&#xff0c;注意仅仅是队列持久&#xff0c;消息不持久&#xff08;消息的持久在publish时的增加属性delivery_mode&#xff09;。消费的消息&#xff0c;从队列里删除&#xff0c;未消费的消息保存在队列中&#xff0c;不需要关注是否有消费者。最实用passive&#xff1a;声明一个1个已存在的队列。意义不大&#xff0c;如果队列不存在会抛出异常。exclusive&#xff1a;排他队列&#xff0c;如果一个队列被声明为排他队列&#xff0c;该队列仅对首次声明它的连接可见&#xff0c;并在连接断开时自动删除。这里需要注意三点&#xff1a;排他队列是基于连接可见的&#xff0c;同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。"首次"&#xff0c;如果一个连接已经声明了一个排他队列&#xff0c;其他连接是不允许建立同名的排他队列的&#xff0c;这个与普通队列不同。即使该队列是持久化的&#xff0c;一旦连接关闭或者客户端退出&#xff0c;该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。::setArgument ( string $key , mixed $value )
::setArguments ( array $arguments )
::getArgument ( string $key )
::getArguments ( void )::declareQueue ( void )
::bind ( string $exchange_name , string $routing_key )
::unbind ( string $exchange_name , string $routing_key )::get ([ int $flags ] )
//非阻塞&#xff0c;从队列中检索下一个可用的消息&#xff0c;flags为空默认为amqp.auto_ack&#xff08;在php.ini中可用设置默认值&#xff09;,可选参数&#xff1a;AMQP_AUTOACK::consume ( callable $callback [, int $flags &#61; AMQP_NOPARAM ] )
//阻塞&#xff0c;$callback支持数组的写法&#xff0c;flags为空默认为amqp.auto_ack&#xff08;在php.ini中可用设置默认值&#xff09;,可选参数&#xff1a;AMQP_AUTOACK::ack ( int $delivery_tag [, int $flags &#61; AMQP_NOPARAM ] ) //复杂的业务逻辑&#xff0c;最好采用手动应答&#xff0c;防止消费处理业务逻辑复杂时&#xff0c;消息丢失。指定务必指定为AMQP_NOPARAM&#xff0c; 本人测试使用参数 AMQP_MULTIPLE&#xff0c;达不到预期的效果。此处有待高人解决。
更人性化的应答&#xff0c;在使用get、consume时&#xff0c;指定AMQP_NOPARAM时&#xff08;未指定参数&#xff0c;则会调用PHP.ini的默认的设置&#xff09;&#xff0c;此时未对消费的消息做应答。在处理完复杂的业务逻辑后&#xff0c;使用ack做应答。此时队列中&#xff0c;会删除该条消息&#xff0c;如未应答&#xff0c;则会一直存在队列中。::nack($delivery_tag, $flags &#61; AMQP_NOPARAM)、reject($delivery_tag, $flags &#61; AMQP_NOPARAM)
用法ack相反&#xff0c;表示not ack.::purge ( void )
清空队列中的消息。::delete ( void )
删除队列::getChannel()、getConnection() //不再介绍&#xff0c;见上面::cancel ([ string $consumer_tag &#61; “” ] ) //暂时不知该如何使用五、AMQPEnvelope
该类没有set的方法&#xff0c;各种get::getBody ( void )
::getRoutingKey ( void )
::getDeliveryTag ( void )
::getDeliveryMode ( void )
::getExchangeName ( void )
::isRedelivery ( void )
::getContentType ( void )
::getContentEncoding ( void )
::getType ( void )
::getTimeStamp ( void )
::getPriority ( void )
::getExpiration ( void )
::getUserId ( void )
::getAppId ( void )
::getMessageId ( void )
::getReplyTo ( void )
::getCorrelationId ( void )
::getHeaders ( void )
::getHeader($header_key).
docker rabbit环境
镜像安装
官方镜像地址 https://registry.hub.docker.com/_/rabbitmq/
下载 3.7.28 management 镜像
docker pull rabbitmq:3.7.28-management 有管理后台
环境变量
RABBITMQ_DEFAULT_USER 用户名
RABBITMQ_DEFAULT_PASS 密码
RABBITMQ_DEFAULT_VHOST default vhost
创建容器并运行
设置默认账户密码
docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER&#61;user -e RABBITMQ_DEFAULT_PASS&#61;password rabbitmq:3.7.28-management
设置默认 vhost
ocker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_VHOST&#61;my_vhost rabbitmq:3.7.28-management
通用命令
docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER&#61;admin -e RABBITMQ_DEFAULT_PASS&#61;admin -e RABBITMQ_DEFAULT_VHOST&#61;my_vhost -p 15672:15672 -p 5672:5672 rabbitmq:3.7.28-management
主要端口
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口如果程序不能连接新建帐号并给权限