作者:夜沙 | 来源:互联网 | 2023-06-08 01:54
[Python] MQTT介绍与使用
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
特点
- 开放消息协议,简单易实现
- 发布订阅模式,一对多消息发布
- 基于TCP/IP网络连接,提供有序,无损,双向连接。
- 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
- 消息QoS支持,可靠传输保证
原理
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
MQTT代理
mosquitto是一款开源的MQTT消息代理(服务器)软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。
sudo apt-get install mosquitto -y
sudo apt-get install mosquitto-clients -y
使用
发布使用mosquitto_pub命令,订阅使用mosquitto_sub命令,常用参数如下
参数 | 描述 |
---|
-h | 服务器主机,默认localhost |
-t | 指定主题 |
-u | 用户名 |
-P | 密码 |
-i | 客户端id,唯一 |
-m | 发布的消息内容 |
发布
mosquitto_sub -h localhost -t "test/#" -u user2 -P 123456 -i "client1"
订阅
mosquitto_pub -h localhost -t "test/abc" -u user1 -P 123456 -i "client3" -m "How are you?"
MQTT使用
MQTT 客户端工具
MQTT 客户端工具常用于建立与 MQTT 服务器 的连接,进行主题订阅、消息收发等操作
-
MQTT X 是由 EMQ 开源的一款跨平台 MQTT 5.0 桌面测试客户端,支持 macOS,Linux,Windows
项目地址: MQTT X 官网
下载地址: MQTT X GitHub
-
mqtt-spy是 Eclipse Paho 和 Eclipse IoT 的一部分,它通过直接启动 JAR 文件在 Java 8 和 JavaFX 之上运行,mqtt-spy 有一种很好的交互方式来展现基本的 MQTT发布/订阅机制。
项目地址: GitHub mqtt-spy
下载地址: https://github.com/eclipse/paho.mqtt-spy/releases
-
MQTT Explorer 是一个全面且易于使用的 MQTT 客户端,是目前比较流行的 MQTT 桌面测试客户端之一,基于它提供有关 MQTT Topics 的结构化预览展示,并使其在对 MQTT Broker 上的设备/服务的使用变得非常简单。目前基于 CC BY-NC-ND 4.0 协议开源,用户可随意查看源码和使用。
项目地址: Github MQTT-Explorer
下载地址: https://mqtt-explorer.com/
Python
使用 paho-mqtt 库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能
- 使用connect()/connect_async() 连接MQTT代理
- 频繁的调用loop()来维持与MQTT代理之间的流量或者使用loop_start()来设置一个线程为你调用loop()或者在一个阻塞的函数中调用loop_forever()来为你调用loop()
- 使用subscribe()订阅一个主题(topic)并接受消息(messages)
- 使用publish()来发送消息
- 使用disconnect()来断开与MQTT代理的连接
详细见paho-mqtt项目描述
回调 Callbacks
- on_connect 当代理响应连接请求时调用
- on_disconnect 当与代理断开连接时调用
- on_message 当收到关于客户订阅的主题的消息时调用。
- on_publish 当使用publish()发送的消息已经传输到代理时被调用
- on_subscribe 当代理响应订阅请求时被调用。
- on_unsubscribe 当代理响应取消订阅请求时调用
- on_log 当客户端有日志信息时调用
Client
-
构造函数
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
参数 | 含义 |
---|
client_id | 连接到代理时使用的唯一客户端ID字符串。 如果client_id长度为零或无,则会随机生成一个。 在这种情况下,clean_session参数必须为True。 |
clean_session | 一个决定客户端类型的布尔值。 如果为True,那么代理将在其断开连接时删除有关此客户端的所有信息。 如果为False,则客户端是持久客户端,当客户端断开连接时,订阅信息和排队消息将被保留。 |
userdata | 用户定义的任何类型的数据作为userdata参数传递给回调函数。 它可能会在稍后使用user_data_set()函数进行更新。 |
protocol | 用于此客户端的MQTT协议的版本。 可以是MQTTv31或MQTTv311或MQTT v5.0。 |
transport | 设置为“websockets”通过WebSockets发送MQTT。 保留默认的“tcp”使用原始TCP。 |
-
reinitialise
reinitialise(client_id="", clean_session=True, userdata=None)
reinitialise()函数将客户端重置为其开始状态,就像它刚刚创建一样。 它采用与Client()构造函数相同的参数。
-
connect
将客户端连接到代理。 这是一个阻塞函数。
connect(host, port=1883, keepalive=60, bind_address="")
参数 | 含义 |
---|
host | 远程代理的主机名或IP地址 |
port | 要连接的服务器主机的网络端口。 默认为1883 |
keepalive | 与代理通信之间允许的最长时间段(以秒为单位)。 如果没有其他消息正在交换,则它将控制客户端向代理发送ping消息的速率 |
bind_address | 假设存在多个接口,将绑定此客户端的本地网络接口的IP地址 |
| |
Publish
从客户端发送消息给代理
publish(topic, payload=None, qos=0, retain=False)
消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端
参数 | 含义 |
---|
topic | 该消息发布的主题 |
payload | 要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载 |
qos | 服务的质量级别 |
retain | 如果设置为True,则该消息将被设置为该主题的“最后已知良好”/保留的消息 |
返回以下属性和方法的MQTTMessageInfo
rc:发布的结果
内容 | 含义 |
---|
MQTT_ERR_SUCCESS | 成功 |
MQTT_ERR_NO_CONN | 客户端当前未连接 |
MQTT_ERR_QUEUE_SIZE | 当使用max_queued_messages_set来指示消息既不排队也不发送。 |
mid:发布请求的消息ID。
如果mid已定义,则可以通过检查on_publish()回调中的mid来跟踪发布请求。
wait_for_publish():函数将阻塞,直到消息发布。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。
is_published:如果消息已发布,is_published返回True。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。
如果主题为无,长度为零或无效(包含通配符),qos不是0,1或2之一,或者有效负载长度大于268435455字节,则会引发ValueError。
Subscribe
订阅一个或多个主题
subscribe(topic, qos=0
参数 | 值 |
---|
topic | 一个字符串,指定要订阅的订阅主题 |
qos | 期望的服务质量等级。 默认为0。 |
Example
import paho.mqtt.client as mqtt
import jsonCONN_CONFIG = {'product': {'host': 'mqtt.broker.com','port': 1883,'username': 'username','password': 'password'},'dev': {'host': '127.0.0.1','port': 1883,'username': 'username','password': 'password'}
}class MQTTClient:def __init__(self, client_id, profile='dev', ):self.config = CONN_CONFIG[profile]self.client_id = client_idself.connect_flag = Falseself.client = self.__create_client(client_id)self.__connect()def __create_client(self, client_id):client = mqtt.Client(client_id=client_id)client.username_pw_set(self.config['username'], self.config['password'])client.on_connect = self.__on_connectclient.on_message = self.__on_messageclient.on_publish = self.__on_publishclient.on_disconnect = self.__on_disconnectreturn clientdef __connect(self):self.client.connect(self.config['host'], self.config['port'], 60)self.client.loop_start()def publish(self, msg):if not self.connect_flag:self.__connect()payload = dict(data=msg)self.client.publish(f'topic', payload=json.dumps(payload), qos=0)def __on_connect(self, client, userdata, flags, rc):self.connect_flag = Trueprint("connected with result code: " + str(rc))def __on_message(self, client, userdata, msg):print(msg.topic + " " + str(msg.payload))def __on_subscribe(self, client, userdata, mid, granted_qos):print('on subscribe: mid=' + str(mid))def __on_publish(self, client, userdata, mid):print("on publish: mid=" + str(mid))def __on_disconnect(self, client, userdata, rc):self.connect_flag = Falseprint('disconnecting reason ' + str(rc))def __del__(self):self.client.disconnect()
参考
MQTT 入门介绍
MQTT中文网
ubuntu下Mosquitto安装及配置
常见MQTT 客户端工具比较
Python paho-mqtt 模块使用和API分析