热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于MQTT的物联网云测量解决方案

1.问题描述最近,本实验室大量上马云测量,云监控方面的项目,大概是属于物联网应用的一个分支。老板也有将旧有仪器改造的想法,所

1. 问题描述

最近,本实验室大量上马云测量,云监控方面的项目,大概是属于物联网应用的一个分支。老板也有将旧有仪器改造的想法,所以要实现仪器设备的云控制。本文是其中的一个解决方案。

2. 技术选型

  1. 消息队列:MQTT,服务器使用centos,安装mosquitto
  2. 客户端使用C#,窗体框架使用WPF,MQTT的客户端使用MQTTNet
  3. 服务端采用spring-cloud微服务框架
  4. 前端采用Vue,使用Element-admin-ui后台框架,使用MQTTJS组件(MQTTJS采用websocket连接方式)
  5. 客户端的测量采集数据程序使用TPL Dataflow

3. 架构设计

这个解决方案加入如图(请原谅我的懒惰):

以上只是本demo的架构,很简单,但是也有很大的问题。

3.1 客户端:

客户端跟随仪器,原则上一台仪器一个控制程序,当然也可以有多个。客户端实现了对仪器所有硬件设备的控制,对所有数据的采集。客户端可以有界面,也可以没有界面,一般来说,我们是需要一个界面的,客户端可以独立完成测量任务。

客户端集成了网络通信功能,可以完全替代用户对客户端的操作使用。具体架构如下(请原谅我的懒惰):

其中,客户端界面和网络接口是等效的,可以单独控制,也可以共同控制。

3.2 服务端:

服务端基于spring-cloud微服务框架,主要提供服务发现,用户管理,权限管理,设备管理,MQTT节点管理等管理功能

3.3 前端网页:

前端网页是用户通过网络操作仪器设备的交互接口。采用日前流行的Vue框架,由于是后台管理模式,就使用Element-admin-ui这个框架。

4. Demo地址

客户端: https://github.com/spartajet/IotWpfClient
服务端:https://github.com/spartajet/iot-demo-server
前端网页:https://github.com/spartajet/iot-demo-web

5. MQTT介绍

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。

MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制;

详细的MQTT协议内容请参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

6. MQTT安装

本文使用开源的MQTT 服务器mosquitto,介绍如下:

Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 3.1 and 3.1.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.

The MQTT protocol provides a lightweight method of carrying out messaging using a publish/subscribe model. This makes it suitable for Internet of Things messaging such as with low power sensors or mobile devices such as phones, embedded computers or microcontrollers.

6.1 centos 安装 mosquitto

CentOS 7没有mosquitto包。要安装它,我们将首先安装一个名为Extra Packages for Enterprise Linux或EPEL的额外软件存储库。

sudo yum -y install epel-release

然后安装

sudo yum -y install mosquitto

6.2 MAC 安装 mosquitto

直接brew开路:

brew install mosquitto

6.3 mosquitto配置

6.3.1 配置文件

配置文件mosquitto.conf路径:

mac :/usr/local/etc/mosquitto/mosquitto.conf

centos:/etc/mosquitto/mosquitto.conf

6.3.2 配置MQTT端口:

bind_address 127.0.0.1
port 1883
protocol mqtt

bind_address可以不配置,

6.3.3 开启websocket支持:

listener 1884
protocol websockets

6.3.4 配置密码

首先,要禁止匿名登录

allow_anonymous false

配置密码文件:

password_file usr/local/etc/mosquitto/pwfile

配置密码的方法:

mosquitto提供了mosquitto_passwd工具设置密码,使用方法如下:

➜ ~ mosquitto_passwd help
mosquitto_passwd is a tool for managing password files for mosquitto.Usage: mosquitto_passwd [-c | -D] passwordfile usernamemosquitto_passwd -b passwordfile username passwordmosquitto_passwd -U passwordfile-b : run in batch mode to allow passing passwords on the command line.-c : create a new password file. This will overwrite existing files.-D : delete the username rather than adding/updating its password.-U : update a plain text password file to use hashed passwords.See http://mosquitto.org/ for more information.

我的做法是:先在usr/local/etc/mosquitto/pwfile中把用的密码写下来,如下:

user1:passwd1
user2:passwd2
user3:passwd3
user4:passwd4
user5:passwd5
user6:passwd6

然后使用mosquitto_passwd -U usr/local/etc/mosquitto/pwfile指令生成密码

更多配置请参考 https://mosquitto.org

6.3.5 完整配置

# Config file for mosquitto
#
# See mosquitto.conf(5) for more information.
#
# Default values are shown, uncomment to change.
#
# Use the
# character to indicate a comment, but only if it is the
# very first character on the line.
# =================================================================
# General configuration
# =================================================================
# 重新发送已经发出去的Qos 为1或者2的消息的等待时间
retry_interval 20
# 系统状态的刷新时间,设置为0表示不刷新
sys_interval 10
#清除在内部消息存储里面的未引用的消息的时间。
#较低的值将占用较少的内存,但处理器时间较长,
#越高的值将产生相反的效果。
#设置值为0意味着未引用的消息将以尽可能快的速度处理。
store_clean_interval 10
#pid_file
# 以什么用户启动 mosquitto,此配置在 windows 下无效,以非 root 运行无效
#user mosquitto
#当前每个客户端正在传输的Qo1和2消息的最大数量。
#这包括通过握手信息,以及那些正在重试的信息。
#默认为20。设置为0表示无上限。
#设置为1将保证QoS 1 和2的消息按顺序传递 max_inflight_messages 20
#当前正在进行的队列中Qos 1和2条消息的最大数量。默认为100。
#设置到0表示没有上限(不推荐)。同样可参见queue_qos0_messages max_queued_messages 100
#设置为true,当一个持久客户端被断开连接时,以Qos为0将消息放到队列中。
#这些消息受max_queued_messages限制 queue_qos0_messages false
#此选项设置被代理允许发布的消息的大小。
#超过这个尺寸的消息将不会被代理接受。
#默认值为0,这意味着所有有效的MQTT消息都被接受。
#MQTT的最大有效大小为268435455字节
message_size_limit 0
# 用于设置客户端长连接的过期时间,默认永不过期,必须以h d w m y为单位
#分别代表 小时,天,星期,月,念
#persistent_client_expiration
# 如果客户端订阅了多个重叠的订阅,例如foo/
#和foo/+/baz,然后MQTT期望当代理接
#收到一个与两个订阅相匹配的主题的消息时,例如foo/bar/baz,那么客户端应该只接
#收一次消息。为了满足这一要求,mosquitto不断跟踪发送给客户的消息。允许重复的
#消息选项允许禁用此行为,如果您有大量的客户端订阅相同的主题集合,并且非常关注
#最小化内存使用的情况,那么这个选项可能是有用的。如果你事先知道你的客户端永不
#会有重叠的订阅,那么你的客户必须能够正确处理重复的信息,即使在Qo = 2的时候,
#你的客户端也必须能够正确地处理重复的信息
#allow_duplicate_messages false
# =================================================================
# Default listener
# =================================================================
# 服务绑定的IP地址
#bind_address
# 服务绑定的端口号
#port 1883
# 允许的最大连接数,-1表示没有限制
#max_connections -1
# cafile:CA证书文件
# capath:CA证书目录
# certfile:PEM证书文件
# keyfile:PEM密钥文件
#cafile
#capath
#certfile
#keyfile
# 必须提供证书以保证数据安全性
#require_certificate false
# 若require_certificate值为true,use_identity_as_username也必须为true #use_identity_as_username false
# 启用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 消息自动保存的间隔时间
#autosave_interval 1800
# 消息自动保存功能的开关
#autosave_on_changes false
# 持久化功能的开关 persistence true
# 持久化DB文件
#persistence_file mosquitto.db
# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能 log_dest none
# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否记录客户端连接信息
#connection_messages true
# 是否记录日志时间
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客户端ID的前缀限制,可用于保证安全性
#clientid_prefixes
# 允许匿名用户
#allow_anonymous true
# 用户/密码文件,默认格式:username:password
#password_file
# PSK格式密码文件,默认格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL权限配置,常用语法如下:
# 用户限制:user
# 话题限制:topic [read|write]
# 正则限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection
#address [:]
#topic [[[out | in | both] qos-level] local-prefix remote-prefix]
# 设置桥接的客户端ID
#clientid
# 桥接断开时,是否清除远程服务器中的消息
#cleansession false
# 是否发布桥接的状态信息
#notifications true
# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection//state #notification_topic
# 设置桥接的keepalive数值
#keepalive_interval 60
# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic
# 桥接模式automatic的超时时间
#restart_timeout 30
# 桥接模式lazy的超时时间
#idle_timeout 60
# 桥接客户端的用户名
#username
# 桥接客户端的密码
#password
# bridge_cafile:桥接客户端的CA证书文件
# bridge_capath:桥接客户端的CA证书目录
# bridge_certfile:桥接客户端的PEM证书文件
# bridge_keyfile:桥接客户端的PEM密钥文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile

参考文献:https://www.imooc.com/article/19459

6.4 开启MQTT

6.4.1 MAC

brew services start/stop mosquitto

6.4.2 CentOs

systemctl start/stop/restart mosquitto

6.5 MQTT的Topic

与消息队列相比,主题非常轻量级。 客户端不需要在发布或订阅之前创建所需的主题,因为代理接受每个有效主题时不需要进行任何预初始化。

主题使用/来分层次,以下是几个主题的示例:

sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME

更重要的是,MQTT提供了通配符

6.5.1 单层通配符 +

+号仅仅匹配一个主题层次。例如,finance/stock/+匹配finance/stock/ibm与finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。因为单层次通配符仅仅匹配一个层次,finance/+不匹配finance。
单层次通配符可用于主题树内任何层次,并与多层次通配符一起使用。必须用于在顶层分隔符之后,除了当自己指定时。因此,+和finance/+ 都是有效的,但是finance+无效。单层通配符可用于主题树最后或者在主题树内,例如,finance/+与finance/+/ibm都是有效的。

示例如下:

* a/b/c/d
* +/b/c/d
* a/+/c/d
* a/+/+/d
* +/+/+/+

6.5.2 多层通配符

#号可以匹配主题内任何层次

单层次通配符可用于主题树内任何层次,并与多层次通配符一起使用。必须用于在顶层分隔符之后,除了当自己指定时。因此,+和finance/+ 都是有效的,但是finance+无效。单层通配符可用于主题树最后或者在主题树内,例如,finance/+与finance/+/ibm都是有效的。

示例如下:

* a/b/c/d
* #
* a/#
* a/b/#
* a/b/c/#
* +/b/c/#

7. 客户端程序

目前,我手上还没有一个趁手的采样设备,所以只能模拟生成数据,但是,预计下周,我就可以解决这个问题了

关于如何生成数据以及使用TPL Dataflow数据采集和处理,请参考我的另一篇博客:

像Labview一样,使用C#构建测量数据流式处理框架

这里重点介绍MQTTnet的使用

7.1 MQTTnet介绍

MQTTnet是一个高性能的MQTT基础连接.NET库。提供了MQTT服务端和客户端支持。

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.

特性如下:

  • Async support
  • TLS 1.2 support for client and server (but not UWP servers)
  • Extensible communication channels (i.e. In-Memory, TCP, TCP+TLS, WS)
  • Lightweight (only the low level implementation of MQTT, no overhead)
  • Performance optimized (processing ~70.000 messages / second)*
  • Interfaces included for mocking and testing
  • Access to internal trace messages
  • Unit tested (~90 tests)

目前支持的版本如下:

  • .NET Standard 1.3+
  • .NET Core 1.1+
  • .NET Core App 1.1+
  • .NET Framework 4.5.2+ (x86, x64, AnyCPU)
  • Mono 5.2+
  • Universal Windows Platform (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU, Windows 10 IoT Core)
  • Xamarin.Android 7.5+
  • Xamarin.iOS 10.14+

重点是:支持异步

7.2 MQTTnet客户端使用

连接MQTT

///


/// 初始化初始化MQTT
///

private void InitialMqtt()
{this._mqttClient = new MqttFactory().CreateMqttClient();this._mqttClient.ConnectAsync(new MqttClientOptionsBuilder().WithClientId(Guid.NewGuid().ToString("N")).WithTcpServer("*****",1883).WithCredentials("admin", "admin").WithCleanSession().Build());
}

可以看到,我们使用的ConnectAsync()方法,是异步连接。

发布消息

首先要构建消息

var message = new MqttApplicationMessageBuilder().WithTopic("measure/force").WithPayload(t.ToString(CultureInfo.InvariantCulture)).WithExactlyOnceQoS().WithRetainFlag().Build();

然后异步发送

this._mqttClient.PublishAsync(message);

详细代码请参考

客户端: https://github.com/spartajet/IotWpfClient

8. 服务端程序

服务端主要是提供用户管理,参考源代码即可,涉及到CORS跨域问题,请参考我的另一篇博客:

Spring boot 和Vue开发中CORS跨域问题

9. 前端MQTTJS使用

9.1 mqttjs 介绍

mqttjs是支持MQTT协议的客户端Javascript库,注意只是客户端,并且通信方式是websockt,所以要在mosquitto服务器开启websocket支持。

MQTT.js is a client library for the MQTT protocol, written in Javascript for node.js and the browser.

9.2 mqttjs安装

npm install mqtt

9.3 mqttjs的API

mqtt.connect()
mqtt.Client()
mqtt.Client#publish()
mqtt.Client#subscribe()
mqtt.Client#unsubscribe()
mqtt.Client#end()
mqtt.Client#removeOutgoingMessage()
mqtt.Client#reconnect()
mqtt.Client#handleMessage()
mqtt.Client#connected
mqtt.Client#reconnecting
mqtt.Client#getLastMessageId()
mqtt.Store()
mqtt.Store#put()
mqtt.Store#del()
mqtt.Store#createStream()
mqtt.Store#close()

9.4 mqttjs的使用

连接服务器:

const client = mqtt.connect('ws://ip:1884', {clientid: 'fdafdafas',username: 'admin',password: 'admin'})

设置连接后的事件,要订阅相关主题的消息

client.on('connect', function() {client.subscribe('measure/force', function(err) {if (!err) {client.publish('measure/force', 'Hello mqtt')}})})

消息推送通知事件

client.on('message', function (topic, message) {// message is Bufferconsole.log(message.toString())client.end()
})

其他内容请参考项目源码:

前端网页:https://github.com/spartajet/iot-demo-web

10 总结&展望

  1. 目前的demo只是完成了mqtt的使用基础范例,没有其他功能
  2. 对于设备管理,用户权限等功能,打算用hsweb大神的物联网框架hsweb-iot-cloud,也不排除自己开发的可能,看我的时间和项目需求
  3. mqtt的压力测试还没有测试,但是从目前的情况来看(我的MQTT服务器用的华为云),我客户端每秒生成100个数据,网页端显示基本没什么延时,但并不代表实时性很好



推荐阅读
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 本文作者分享了在阿里巴巴获得实习offer的经历,包括五轮面试的详细内容和经验总结。其中四轮为技术面试,一轮为HR面试,涵盖了大量的Java技术和项目实践经验。 ... [详细]
  • PostgreSQL 最新动态 —— 2022年4月6日
    了解 PostgreSQL 社区的最新进展和技术分享 ... [详细]
  • Spring Cloud学习指南:深入理解微服务架构
    本文介绍了微服务架构的基本概念及其在Spring Cloud中的实现。讨论了微服务架构的主要优势,如简化开发和维护、快速启动、灵活的技术栈选择以及按需扩展的能力。同时,也探讨了微服务架构面临的挑战,包括较高的运维要求、分布式系统的复杂性、接口调整的成本等问题。最后,文章提出了实施微服务时应遵循的设计原则。 ... [详细]
  • Spring Cloud Config 使用 Vault 作为配置存储
    本文探讨了如何在Spring Cloud Config中集成HashiCorp Vault作为配置存储解决方案,基于Spring Cloud Hoxton.RELEASE及Spring Boot 2.2.1.RELEASE版本。文章还提供了详细的配置示例和实践建议。 ... [详细]
  • 利用YAML配置Resilience4J的Circuit Breaker
    本文探讨了Resilience4j作为现代Java应用程序中不可或缺的容错工具,特别介绍了如何通过YAML文件配置Circuit Breaker以提高服务的弹性和稳定性。 ... [详细]
  • Canvas漫游:碰撞检测与动画模拟
    探索Canvas在Web开发中的应用,通过碰撞检测与动画模拟提升交互体验。 ... [详细]
  • 本文档详细介绍了在 CentOS Linux 7.9 系统环境下,如何从源代码编译安装 libwebsockets 库及其示例程序,并提供了编译过程中可能遇到的问题及解决方案。 ... [详细]
  • Java WebSocket 实时通信服务端实现
    本文介绍了一个基于Java的WebSocket实时通信服务端代码示例,包括客户端连接管理、消息接收与分发等功能。 ... [详细]
  • 经过一段时间的学习与实践,我已经使用D3.js完成了一些项目。鉴于中文D3教程稀缺,而英文资料虽丰富却对英语水平有一定要求,特此撰写一系列D3实战文章,旨在通过具体案例(如统计数据可视化、地图信息展示等)分享D3的使用技巧,促进技术交流。 ... [详细]
  • 本文通过对OkHttp源码的详细解读,旨在帮助读者理解其核心执行流程,特别是同步与异步请求的处理方式。文中不仅涵盖了基本的使用示例,还深入探讨了OkHttp的核心功能——拦截器链的工作原理。 ... [详细]
  • 本文总结了一次针对大厂Java研发岗位的面试经历,探讨了面试中常见的问题及其背后的原因,并分享了一些实用的面试准备资料。 ... [详细]
  • 本文探讨了 Java 中 HttpClient 和 HtmlUnit 的区别,重点介绍了它们的功能和应用场景。 ... [详细]
  • 如何使用 `org.apache.tomcat.websocket.server.WsServerContainer.findMapping()` 方法及其代码示例解析 ... [详细]
author-avatar
淘美国
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有