热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

simpsmqtt适用于PHP的MQTT协议解析和协程客户端

MQTT是一种基于公布订阅(publishsubscribe)模式的轻量级通信协定,作为一种低开销、低带宽占用的即时通讯协定,曾经成为物联网的重要组成部分Swoole也给PHP
文章目录[隐藏]
  • 订阅
  • 公布

MQTT 是一种基于公布/订阅(publish/subscribe)模式的”轻量级”通信协定,作为一种低开销、低带宽占用的即时通讯协定,曾经成为物联网的重要组成部分

Swoole 也给 PHP 提供了开发物联网我的项目的能力,只须要设置一个 open_mqtt_protocol 选项,启用后就会解析 MQTT 包头,在 Worker 过程的 onReceive 事件每次都会返回一个残缺的 MQTT 数据包

当然其余的也有,例如 Workerman 之前提供的 异步 mqtt 客户端库 ,还有其余的开源库,这里就不一一介绍了

Simps 的第一个版本 MQTT 库 就是参考了 Workerman 的实现,使其可能应用 Swoole 的协程能力,同时也修复了一些问题

在此也要感激 @walkor 对 PHP 生态作出的奉献

第一个版本的实现是放在了框架当中,限度了一些用户的应用。于是又开始了重构,将 MQTT 独立为一个 library ,不便用户应用的同时也丰盛了 PHP 生态,让 PHP 程序员不再局限于 Web 开发

在第一个版本公布之后,Simps 的交换群中也有不少用户询问 MQTT 的问题,Swoole 也修复了一些相干的 Bug,当初应用 PHP + Swoole 去开发物联网相干的我的项目应该是锦上添花

同时第一个版本的 MQTT 库,只反对 MQTT 3.x,不反对 MQTT 5.0,在 GitHub 上也没有找到相干反对的类库,所以在重构了 3.x 版本之后,也反对了一下 MQTT 5.0

兴许这是第一个反对 MQTT v5.0 协定的 PHP library…

反对 MQTT 协定 3.1、3.1.1 和 5.0 版本,反对 QoS 0、QoS 1、QoS 2,那么它来了,应用 composer 来装置

composer require simps/mqtt

装置胜利之后咱们来看一下订阅和公布的应用,以 MQTT5.0 为例

订阅

首先应该是订阅,订阅胜利之后能力收到对应主题的公布音讯,创立一个subscribe.php写入以下内容

include __DIR__ . '/vendor/autoload.php';

use Simps\MQTT\Hex\ReasonCode;
use Swoole\Coroutine;
use Simps\MQTT\Client;
use Simps\MQTT\Types;

$cOnfig= [
    'host' => 'broker.emqx.io',
    'port' => 1883,
    'time_out' => 5,
    'user_name' => 'user001',
    'password' => 'hLXQ9ubnZGzkzf',
    'client_id' => Client::genClientID(),
    'keep_alive' => 10,
    'properties' => [
        'session_expiry_interval' => 60,
        'receive_maximum' => 200,
        'topic_alias_maximum' => 200,
    ],
    'protocol_level' => 5,
];

Coroutine\run(function () use ($config) {
    $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]);
    while (!$data = $client->connect()) {
        Coroutine::sleep(3);
        $client->connect();
    }
    $topics['simps-mqtt/user001/get'] = [
        'qos' => 1,
        'no_local' => true,
        'retain_as_published' => true,
        'retain_handling' => 2,
    ];
    $timeSincePing = time();
    $res = $client->subscribe($topics);
    // 订阅的后果
    var_dump($res);
    while (true) {
        $buffer = $client->recv();
        if ($buffer && $buffer !== true) {
            $timeSincePing = time();
            // 收到的数据包
            var_dump($buffer);
        }
        if (isset($config[&#039;keep_alive&#039;]) && $timeSincePing <(time() - $config[&#039;keep_alive&#039;])) {
            $buffer = $client->ping();
            if ($buffer) {
                echo &#039;send ping success&#039; . PHP_EOL;
                $timeSincePing = time();
            } else {
                $client->close();
                break;
            }
        }
        // QoS1 公布回复
        if ($buffer[&#039;type&#039;] === Types::PUBLISH && $buffer[&#039;qos&#039;] === 1) {
            $client->send(
                [
                    &#039;type&#039; => Types::PUBACK,
                    &#039;message_id&#039; => $buffer[&#039;message_id&#039;],
                    &#039;code&#039; => ReasonCode::SUCCESS
                ]
            );
        }
    }
});

执行php subscribe.php,就会失去这样的输入

array(3) {
  ["type"]=>
  int(9)
  ["message_id"]=>
  int(1)
  ["codes"]=>
  array(1) {
    [0]=>
    int(1)
  }
}

示意订阅胜利,codes 对应的是对应订阅主题的 QoS 等级

公布

订阅胜利之后,创立一个publish.php来测试公布

include __DIR__ . &#039;/vendor/autoload.php&#039;;

use Swoole\Coroutine;
use Simps\MQTT\Client;

$cOnfig= [
    &#039;host&#039; => &#039;broker.emqx.io&#039;,
    &#039;port&#039; => 1883,
    &#039;time_out&#039; => 5,
    &#039;user_name&#039; => &#039;user002&#039;,
    &#039;password&#039; => &#039;adIJS1D482sd&#039;,
    &#039;client_id&#039; => Client::genClientID(),
    &#039;keep_alive&#039; => 20,
    &#039;properties&#039; => [
        &#039;session_expiry_interval&#039; => 60,
        &#039;receive_maximum&#039; => 200,
        &#039;topic_alias_maximum&#039; => 200,
    ],
    &#039;protocol_level&#039; => 5,
];

Coroutine\run(function () use ($config) {
    $client = new Client($config, [&#039;open_mqtt_protocol&#039; => true, &#039;package_max_length&#039; => 2 * 1024 * 1024]);
    while (!$client->connect()) {
        Coroutine::sleep(3);
        $client->connect();
    }
    while (true) {
        $respOnse= $client->publish(
            &#039;simps-mqtt/user001/get&#039;,
            &#039;{"time":&#039; . time() . &#039;}&#039;,
            1,
            0,
            0,
            [&#039;topic_alias&#039; => 1]
        );
        var_dump($response);
        Coroutine::sleep(3);
    }
});

代码的意思是每隔 3 秒给订阅的主题simps-mqtt/user001/get公布一次音讯

关上一个新的终端窗口,执行php publish.php就会失去输入:

array(4) {
  ["type"]=>
  int(4)
  ["message_id"]=>
  int(1)
  ["code"]=>
  int(0)
  ["message"]=>
  string(7) "Success"
}

这里减少了 message,为了用户可读,不须要去查找对应的 code 含意是什么

返回到订阅的窗口,就会看到所打印的公布信息

array(8) {
  ["type"]=>
  int(3)
  ["topic"]=>
  string(0) ""
  ["message"]=>
  string(19) "{"time":1608017156}"
  ["dup"]=>
  int(1)
  ["qos"]=>
  int(1)
  ["retain"]=>
  int(0)
  ["message_id"]=>
  int(4)
  ["properties"]=>
  array(1) {
    ["topic_alias"]=>
    int(1)
  }
}

这样一个简略的公布订阅性能就实现了

在这个库中还有一些值得优化和还未实现的局部,如还没有反对 MQTT5 的Auth type,以及局部的properties还未反对

想参加的同学能够提交 PR,如果有问题也能够提交 Issue,让咱们独特去建设 PHP 的生态

仓库地址:simps/mqtt ,反对记得点个 Star 哦



推荐阅读
author-avatar
810526猪肝
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有