在业务中有时会碰到延迟操作,如下单后半小时未支付则取消订单、下单后十五分钟未支付则发短信提醒等等。那么这样的需求该如何实现呢?
相关学习推荐:PHP编程从入门到精通
我们以“订单创建15分钟后仍未支付,则给用户发送邮件”为场景进行学习。
// Order creation logic.

/**
 * Create an order with random demo data.
 */
$order = [
    'order_number' => mt_rand(100, 10000) . date("YmdHis"),
    'user_id' => mt_rand(1, 100),
    'order_amount' => mt_rand(100, 1000),
];

/** @var $manager Illuminate\Database\Capsule\Manager */
// PHP variable names are case-sensitive: the handle must be assigned to
// $conn because that is the name used by the query below.
$conn = $manager;

// Insert the row into the "order" table and print the insert result.
$insertResult = $conn::table("order")
    ->insert($order);
print_r($insertResult);
延迟处理逻辑
// Polling worker: every 10 seconds, look for orders created at least
// 15 minutes ago that are still unpaid and have not been notified yet,
// e-mail the user, then flag the order as notified.
// NOTE(review): $conn, sendEmail() and logs() are defined outside this snippet.
while (true) {
    // List of unpaid orders (status = 1) older than 15 minutes.
    // NOTE(review): sended_need_pay_notify = 2 presumably means "not yet
    // notified" (it is set to 1 after sending) -- confirm against the schema.
    $orderList = $conn::table("order")
        ->where("created_time", '<=', date("Y-m-d H:i:s", strtotime("-15 minutes")))
        ->where('sended_need_pay_notify', '=', 2)
        ->where('status', '=', 1)
        ->select(['user_id', 'id'])
        ->orderBy("id", 'asc')
        ->get();

    // Round-trip through JSON to turn the result set into plain nested arrays.
    $orderList = json_decode(json_encode($orderList), true);

    foreach ($orderList as $orderInfo) {
        sendEmail($orderInfo['user_id']);

        // Mark the order as notified so the next polling pass skips it.
        $conn::table('order')
            ->where('id', '=', $orderInfo['id'])
            ->update(['sended_need_pay_notify' => 1]);

        logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
    }

    // Wait before the next polling pass.
    sleep(10);
}
执行处理脚本
gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php send email to 73 success ... 2020-06-24 11:37:36:update-success-orderId-3-userId-73
这种方式实现简单,但是不够优雅,同时在大批量订单产生时也会遇到问题。
// Order creation logic: insert the order, then publish a scheduled
// (delayed) message that carries the new order id.
// NOTE(review): $manager, $this->producer and the TopicMessage import are
// defined outside this snippet.
try {
    /**
     * Create an order with random demo data.
     */
    $order = [
        'order_number' => mt_rand(100, 10000) . date("YmdHis"),
        'user_id' => mt_rand(1, 100),
        'order_amount' => mt_rand(100, 1000),
    ];

    /** @var $manager Illuminate\Database\Capsule\Manager */
    // PHP variable names are case-sensitive: assign to $conn, the name used below.
    $conn = $manager;
    $insertId = $conn::table("order")
        ->insertGetId($order);

    // Message payload: the new order id plus the creation timestamp.
    $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
    $publishMessage = new TopicMessage(
        $body
    );

    // Set the message key.
    $publishMessage->setMessageKey("MessageKey");

    // Scheduled message: deliver 3 minutes from now (timestamp in milliseconds).
    $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);

    $result = $this->producer->publishMessage($publishMessage);
    print "Send mq message success. msgId is:" . $result->getMessageId()
        . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
} catch (\Exception $e) {
    // Report the failure message instead of aborting the script.
    print_r($e->getMessage() . "\n");
}
消费逻辑 同样是在消费者中处理
// Handle each delayed message: if the referenced order is still unpaid,
// e-mail the user and flag the order as notified.
// NOTE(review): $messages, $manager, sendEmail() and logs() are defined
// outside this snippet.
foreach ($messages as $message) {
    // Collect the receipt handle; presumably used later to ack the batch -- confirm.
    $receiptHandles[] = $message->getReceiptHandle();

    $messageBody = $message->getMessageBody();
    $orderInfo = json_decode($messageBody, true);

    if (!empty($orderInfo['order_id'])) {
        $orderId = $orderInfo['order_id'];

        /** @var $manager Illuminate\Database\Capsule\Manager */
        // PHP variable names are case-sensitive: assign to $conn, the name used below.
        $conn = $manager;

        // Only orders that still have status = 1 are selected; a missing row
        // means there is nothing to notify.
        $orderInfo = $conn::table("order")
            ->select(['id', 'user_id'])
            ->where('id', '=', $orderId)
            ->where('status', '=', 1)
            ->first();

        if (!empty($orderInfo)) {
            // Round-trip through JSON to turn the row object into a plain array.
            $orderInfo = json_decode(json_encode($orderInfo), true);

            sendEmail($orderInfo['user_id']);

            // Mark the order as notified.
            $conn::table('order')
                ->where('id', '=', $orderInfo['id'])
                ->update(['sended_need_pay_notify' => 1]);

            logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
        }
    }
}
启动生产一条消息
gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 is:63448B50AA7B8AF47B07AA7CE807E3D3 gaoz@nobodyMBP delay_mq_demo %
启动消费者慢慢等待
gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php No message, contine long polling!RequestId:5EF752583441411C74869BA9 No message, contine long polling!RequestId:5EF7525B3441411C74869FE2 No message, contine long polling!RequestId:5EF7525E3441411C7486A42C No message, contine long polling!RequestId:5EF752613441411C7486A7D9 consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95 Array( [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw) ack
这种方式有现有的服务可以使用,减少开发时间
// Producer: creates an order and publishes a message to a queue whose
// messages expire after a TTL and are then routed to a dead-letter exchange.
// NOTE(review): $manager and the php-amqplib imports (AMQPStreamConnection,
// AMQPExchangeType, AMQPMessage) are defined outside this snippet.
$exchange = 'order15min_notify_exchange';
$queue = 'order15minx_notify_queue';
$dlxExchange = "dlx_order15min_exchange";
$dlxQueue = "dlx_order15min_queue";

// PHP variable names are case-sensitive: assign to $connection, the name used below.
$connection = new AMQPStreamConnection(
    getenv('RABBIT_HOST'),
    getenv('RABBIT_PORT'),
    getenv("RABBIT_USER"),
    getenv("RABBIT_PASS"),
    getenv("RABBIT_VHOST")
);
$channel = $connection->channel();

// Declare the normal exchange and the dead-letter exchange (both direct).
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);

// Configure expiry on the normal queue.
$table = new \PhpAmqpLib\Wire\AMQPTable();
// Message time-to-live in milliseconds (3 minutes in this demo).
$table->set('x-message-ttl', 3 * 60 * 1000);
// Expired messages are re-published to the dead-letter exchange.
$table->set("x-dead-letter-exchange", $dlxExchange);

// Normal queue: nobody consumes it, so messages sit here until they expire.
$channel->queue_declare($queue, false, true, false, false, false, $table);
$channel->queue_bind($queue, $exchange);

// Dead-letter queue: receives the expired messages; this is the queue the
// consumer reads from.
$channel->queue_declare($dlxQueue, false, true, false, false, false);
$channel->queue_bind($dlxQueue, $dlxExchange);

/**
 * Create an order with random demo data.
 */
$order = [
    'order_number' => mt_rand(100, 10000) . date("YmdHis"),
    'user_id' => mt_rand(1, 100),
    'order_amount' => mt_rand(100, 1000),
];

/** @var $manager Illuminate\Database\Capsule\Manager */
// PHP variable names are case-sensitive: assign to $conn, the name used below.
$conn = $manager;
$insertId = $conn::table("order")
    ->insertGetId($order);

// Message payload: the new order id plus the creation timestamp.
$messageBody = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
$message = new AMQPMessage(
    $messageBody,
    array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

// Publish to the normal exchange; the TTL on the bound queue provides the delay.
$channel->basic_publish($message, $exchange);
消费者
// Consumer: reads expired messages from the dead-letter queue and sends the
// "please pay" e-mail for orders that are still unpaid.
// NOTE(review): getdb(), sendEmail(), logs() and the php-amqplib imports are
// defined outside this snippet.
$dlxExchange = "dlx_order15min_exchange";
$dlxQueue = "dlx_order15min_queue";

// PHP variable names are case-sensitive: assign to $connection, the name used below.
$connection = new AMQPStreamConnection(
    getenv('RABBIT_HOST'),
    getenv('RABBIT_PORT'),
    getenv("RABBIT_USER"),
    getenv("RABBIT_PASS"),
    getenv("RABBIT_VHOST")
);
$channel = $connection->channel();

// Declare the dead-letter queue and exchange and bind them.
$channel->queue_declare($dlxQueue, false, true, false, false);
$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($dlxQueue, $dlxExchange);

/**
 * Handle one dead-lettered message: notify the user if the order is still
 * unpaid, then acknowledge the message.
 *
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $orderInfo = json_decode($message->body, true);

    if (!empty($orderInfo['order_id'])) {
        $orderId = $orderInfo['order_id'];

        /** @var $conn Illuminate\Database\Capsule\Manager */
        // PHP variable names are case-sensitive: assign to $conn, the name used below.
        $conn = getdb();

        // Only orders that still have status = 1 are selected.
        $orderInfo = $conn::table("order")
            ->select(['id', 'user_id'])
            ->where('id', '=', $orderId)
            ->where('status', '=', 1)
            ->first();

        if (!empty($orderInfo)) {
            // Round-trip through JSON to turn the row object into a plain array.
            $orderInfo = json_decode(json_encode($orderInfo), true);

            sendEmail($orderInfo['user_id']);

            // Mark the order as notified.
            $conn::table('order')
                ->where('id', '=', $orderInfo['id'])
                ->update(['sended_need_pay_notify' => 1]);

            logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
        }
    }

    // Acknowledge in every case (including "nothing to do") so the message
    // is removed from the queue.
    $message->delivery_info['channel']->basic_ack(
        $message->delivery_info['delivery_tag']
    );
}

// An empty consumer tag asks the broker to generate one. The original
// snippet passed $consumerTag without ever defining it.
$consumerTag = '';
$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');

// Block and dispatch deliveries to process_message(); without this loop the
// script would exit right after registering the consumer.
while ($channel->is_consuming()) {
    $channel->wait();
}
启动消费者
gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php -------- {"order_id":7,"created_time":"2020-06-27 11:50:08"} -------- send email to 2 success ... 2020-06-27 11:56:55:update-success-orderId-7-userId-2
分别启动消费者、生产者就可以了,这里面消息的流转可以看到
消息先进入到正常队列,过期后进入了死信队列而被消费
代码示例:github.com/nobody05/delay_mq_demo
以上就是PHP 简单实现延时操作的详细内容,更多请关注 第一PHP社区 其它相关文章!