热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[Python]MQTT介绍与使用

[Python]MQTT介绍与使用MQTT协议MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输协议&#x

[Python] MQTT介绍与使用


MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。


特点


  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接,提供有序,无损,双向连接。
  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
  • 消息QoS支持,可靠传输保证

原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:


  • (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)

  • (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在这里插入图片描述


MQTT代理

mosquitto是一款开源的MQTT消息代理(服务器)软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。


  • 安装mosquitto

sudo apt-get install mosquitto -y

  • 安装mosquitto命令行客户端

sudo apt-get install mosquitto-clients -y

使用

发布使用mosquitto_pub命令,订阅使用mosquitto_sub命令,常用参数如下


参数描述
-h服务器主机,默认localhost
-t指定主题
-u用户名
-P密码
-i客户端id,唯一
-m发布的消息内容

发布

mosquitto_sub -h localhost -t "test/#" -u user2 -P 123456 -i "client1"

订阅

mosquitto_pub -h localhost -t "test/abc" -u user1 -P 123456 -i "client3" -m "How are you?"

MQTT使用


MQTT 客户端工具

MQTT 客户端工具常用于建立与 MQTT 服务器 的连接,进行主题订阅、消息收发等操作


  • MQTT X 是由 EMQ 开源的一款跨平台 MQTT 5.0 桌面测试客户端,支持 macOS,Linux,Windows

    MQTTX

    项目地址: MQTT X 官网

    下载地址: MQTT X GitHub

  • mqtt-spy是 Eclipse Paho 和 Eclipse IoT 的一部分,它通过直接启动 JAR 文件在 Java 8 和 JavaFX 之上运行,mqtt-spy 有一种很好的交互方式来展现基本的 MQTT发布/订阅机制。

    项目地址: GitHub mqtt-spy

    下载地址: https://github.com/eclipse/paho.mqtt-spy/releases

  • MQTT Explorer 是一个全面且易于使用的 MQTT 客户端,是目前比较流行的 MQTT 桌面测试客户端之一,基于它提供有关 MQTT Topics 的结构化预览展示,并使其在对 MQTT Broker 上的设备/服务的使用变得非常简单。目前基于 CC BY-NC-ND 4.0 协议开源,用户可随意查看源码和使用。

    explorer

    项目地址: Github MQTT-Explorer

    下载地址: https://mqtt-explorer.com/


Python

使用 paho-mqtt 库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能


  • 使用connect()/connect_async() 连接MQTT代理
  • 频繁的调用loop()来维持与MQTT代理之间的流量或者使用loop_start()来设置一个线程为你调用loop()或者在一个阻塞的函数中调用loop_forever()来为你调用loop()
  • 使用subscribe()订阅一个主题(topic)并接受消息(messages)
  • 使用publish()来发送消息
  • 使用disconnect()来断开与MQTT代理的连接

详细见paho-mqtt项目描述


回调 Callbacks


  • on_connect 当代理响应连接请求时调用
  • on_disconnect 当与代理断开连接时调用
  • on_message 当收到关于客户订阅的主题的消息时调用。
  • on_publish 当使用publish()发送的消息已经传输到代理时被调用
  • on_subscribe 当代理响应订阅请求时被调用。
  • on_unsubscribe 当代理响应取消订阅请求时调用
  • on_log 当客户端有日志信息时调用

Client


  • 构造函数
    Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

    参数含义
    client_id连接到代理时使用的唯一客户端ID字符串。 如果client_id长度为零或无,则会随机生成一个。 在这种情况下,clean_session参数必须为True。
    clean_session一个决定客户端类型的布尔值。 如果为True,那么代理将在其断开连接时删除有关此客户端的所有信息。 如果为False,则客户端是持久客户端,当客户端断开连接时,订阅信息和排队消息将被保留。
    userdata用户定义的任何类型的数据作为userdata参数传递给回调函数。 它可能会在稍后使用user_data_set()函数进行更新。
    protocol用于此客户端的MQTT协议的版本。 可以是MQTTv31或MQTTv311或MQTT v5.0。
    transport设置为“websockets”通过WebSockets发送MQTT。 保留默认的“tcp”使用原始TCP。
  • reinitialise

    reinitialise(client_id="", clean_session=True, userdata=None)

    reinitialise()函数将客户端重置为其开始状态,就像它刚刚创建一样。 它采用与Client()构造函数相同的参数。

  • connect
    将客户端连接到代理。 这是一个阻塞函数。

    connect(host, port=1883, keepalive=60, bind_address="")

    参数含义
    host远程代理的主机名或IP地址
    port要连接的服务器主机的网络端口。 默认为1883
    keepalive与代理通信之间允许的最长时间段(以秒为单位)。 如果没有其他消息正在交换,则它将控制客户端向代理发送ping消息的速率
    bind_address假设存在多个接口,将绑定此客户端的本地网络接口的IP地址

Publish

从客户端发送消息给代理
publish(topic, payload=None, qos=0, retain=False)

消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端


参数含义
topic该消息发布的主题
payload要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载
qos服务的质量级别
retain如果设置为True,则该消息将被设置为该主题的“最后已知良好”/保留的消息

返回以下属性和方法的MQTTMessageInfo
rc:发布的结果


内容含义
MQTT_ERR_SUCCESS成功
MQTT_ERR_NO_CONN客户端当前未连接
MQTT_ERR_QUEUE_SIZE当使用max_queued_messages_set来指示消息既不排队也不发送。

mid:发布请求的消息ID。
如果mid已定义,则可以通过检查on_publish()回调中的mid来跟踪发布请求。
wait_for_publish():函数将阻塞,直到消息发布。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。
is_published:如果消息已发布,is_published返回True。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。


如果主题为无,长度为零或无效(包含通配符),qos不是0,1或2之一,或者有效负载长度大于268435455字节,则会引发ValueError。



Subscribe

订阅一个或多个主题

subscribe(topic, qos=0


参数
topic一个字符串,指定要订阅的订阅主题
qos期望的服务质量等级。 默认为0。

Example

#!/usr/bin/env python3
# -*- coding: utf-8 -*-import paho.mqtt.client as mqtt
import jsonCONN_CONFIG = {'product': {'host': 'mqtt.broker.com','port': 1883,'username': 'username','password': 'password'},'dev': {'host': '127.0.0.1','port': 1883,'username': 'username','password': 'password'}
}class MQTTClient:def __init__(self, client_id, profile='dev', ):self.config = CONN_CONFIG[profile]self.client_id = client_idself.connect_flag = Falseself.client = self.__create_client(client_id)self.__connect()def __create_client(self, client_id):client = mqtt.Client(client_id=client_id)client.username_pw_set(self.config['username'], self.config['password'])client.on_connect = self.__on_connectclient.on_message = self.__on_messageclient.on_publish = self.__on_publishclient.on_disconnect = self.__on_disconnectreturn clientdef __connect(self):self.client.connect(self.config['host'], self.config['port'], 60)self.client.loop_start()def publish(self, msg):if not self.connect_flag:self.__connect()payload = dict(data=msg)self.client.publish(f'topic', payload=json.dumps(payload), qos=0)def __on_connect(self, client, userdata, flags, rc):self.connect_flag = Trueprint("connected with result code: " + str(rc))def __on_message(self, client, userdata, msg):print(msg.topic + " " + str(msg.payload))def __on_subscribe(self, client, userdata, mid, granted_qos):print('on subscribe: mid=' + str(mid))def __on_publish(self, client, userdata, mid):print("on publish: mid=" + str(mid))def __on_disconnect(self, client, userdata, rc):self.connect_flag = Falseprint('disconnecting reason ' + str(rc))def __del__(self):self.client.disconnect()

参考

MQTT 入门介绍
MQTT中文网
ubuntu下Mosquitto安装及配置
常见MQTT 客户端工具比较
Python paho-mqtt 模块使用和API分析


推荐阅读
author-avatar
夜沙
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有