一、框架
windows 10
.NETFramework 4.6.1
MQTTnet 3.0.5.0
1、packages.config
原始文档
NuGet包
二、工具
org.eclipse.paho.ui.app-1.0.2-win32.win32.x86_64.zip(jdk-8u211-windows-x64.exe)
遇到问题
Paho
A Java Runtime Environment(JRE)or Java Development Kit (JDK)
must be available in order to run Paho. No Java virtual machine
was found after searching the following location:
...\org.eclipse.paho.ui.app-1.0.2-win32.win32.x86_64\jre\bin\javaw.exe
Javaw.exe in your current Path
解决方案
安装 JDK 并设置环境变量 , 如:Windows 10 x64 安装 "jdk-8u211-windows-x64.exe"
jdk-8u211-windows-x64.exe 配置
JDK 安装路径举例如下,以实际安装为准:
InstallPath JDK : C:\Program Files\Java\jdk1.8.0_211
InstallPath JRE : C:\Program Files\Java\jre1.8.0_211[电脑-属性-高级系统设置-环境变量-系统变量]
1、添加
变量名:JAVA_HOME
变量值:C:\Program Files\Java\jdk1.8.0_211
2、添加
变量名:JRE_HOME
变量值:C:\Program Files\Java\jre1.8.0_211
3、添加
变量名:CLASSPATH
变量值:.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;
4、追加
变量名:Path
变量值:;%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;
三、管理 NuGet 程序包
方法一、VS2013>工具>NuGet包管理器>程序包管理器控制台
NETStandard.Library
Install-Package NETStandard.Library –Version 2.0.3
MQTTnet
Install-Package MQTTnet –Version 3.0.5
方法二、VS2013>解决方案资源管理器>[Project]>引用(右键)>管理NuGet程序包(N)...
NETStandard.Library
MQTTnet
四、编写订阅和发布代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Connecting;namespace CSDN
{class HOSMQTT{private static MqttClient mqttClient &#61; null;private static IMqttClientOptions options &#61; null;private static bool runState &#61; false;private static bool running &#61; false;/// /// 服务器IP/// private static string ServerUrl &#61; "182.61.51.85";/// /// 服务器端口/// private static int Port &#61; 61613;/// /// 选项 - 开启登录 - 密码/// private static string Password &#61; "ruichi8888";/// /// 选项 - 开启登录 - 用户名/// private static string UserId &#61; "admin";/// /// 主题/// China/Hunan/Yiyang/Nanxian/// Hotel/Room01/Tv/// Hospital/Dept01/Room001/Bed001/// Hospital/#/// private static string Topic &#61; "China/Hunan/Yiyang/Nanxian";/// /// 保留/// private static bool Retained &#61; false;/// /// 服务质量/// 0 - 至多一次/// 1 - 至少一次/// 2 - 刚好一次/// private static int QualityOfServiceLevel &#61; 0;public static void Stop(){runState &#61; false;}public static bool IsRun(){return (runState && running);}/// /// 启动客户端/// public static void Start(){try{runState &#61; true;System.Threading.Thread thread &#61; new System.Threading.Thread(new System.Threading.ThreadStart(Work));thread.Start();}catch (Exception exp){Console.WriteLine(exp);}}/// /// /// private static void Work(){running &#61; true;Console.WriteLine("Work >>Begin");try{var factory &#61; new MqttFactory();mqttClient &#61; factory.CreateMqttClient() as MqttClient;options &#61; new MqttClientOptionsBuilder().WithTcpServer(ServerUrl, Port).WithCredentials(UserId, Password).WithClientId(Guid.NewGuid().ToString().Substring(0, 5)).Build();mqttClient.ConnectAsync(options);mqttClient.ConnectedHandler &#61; new MqttClientConnectedHandlerDelegate(new Func(Connected));mqttClient.DisconnectedHandler &#61; new MqttClientDisconnectedHandlerDelegate(new Func(Disconnected));mqttClient.ApplicationMessageReceivedHandler &#61; new MqttApplicationMessageReceivedHandlerDelegate(new Action(MqttApplicationMessageReceived));while (runState){System.Threading.Thread.Sleep(100);}}catch (Exception exp){Console.WriteLine(exp);}Console.WriteLine("Work >>End");running &#61; false;runState &#61; false;}public static void StartMain(){try{var factory &#61; new MqttFactory();var mqttClient &#61; factory.CreateMqttClient();var options &#61; new MqttClientOptionsBuilder().WithTcpServer(ServerUrl, Port).WithCredentials(UserId, Password).WithClientId(Guid.NewGuid().ToString().Substring(0, 5)).Build();mqttClient.ConnectAsync(options);mqttClient.UseConnectedHandler(async e &#61;>{Console.WriteLine("Connected >>Success");// Subscribe to a topicvar topicFilterBulder &#61; new TopicFilterBuilder().WithTopic(Topic).Build();await mqttClient.SubscribeAsync(topicFilterBulder);Console.WriteLine("Subscribe >>" &#43; Topic);});mqttClient.UseDisconnectedHandler(async e &#61;>{Console.WriteLine("Disconnected >>Disconnected Server");await Task.Delay(TimeSpan.FromSeconds(5));try{await mqttClient.ConnectAsync(options);}catch (Exception exp){Console.WriteLine("Disconnected >>Exception" &#43; exp.Message);}});mqttClient.UseApplicationMessageReceivedHandler(e &#61;>{Console.WriteLine("MessageReceived >>" &#43; Encoding.UTF8.GetString(e.ApplicationMessage.Payload));});Console.WriteLine(mqttClient.IsConnected.ToString());}catch (Exception exp){Console.WriteLine("MessageReceived >>" &#43; exp.Message);}}/// /// 发布/// /// 0 - 最多一次/// 1 - 至少一次/// 2 - 仅一次/// /// 发布主题/// 发布内容/// public static void Publish( string Topic,string Message){try{if (mqttClient &#61;&#61; null) return;if (mqttClient.IsConnected &#61;&#61; false)mqttClient.ConnectAsync(options);if (mqttClient.IsConnected &#61;&#61; false){Console.WriteLine("Publish >>Connected Failed! ");return;}Console.WriteLine("Publish >>Topic: " &#43; Topic &#43; "; QoS: " &#43; QualityOfServiceLevel &#43; "; Retained: " &#43; Retained &#43; ";");Console.WriteLine("Publish >>Message: " &#43; Message);MqttApplicationMessageBuilder mamb &#61; new MqttApplicationMessageBuilder().WithTopic(Topic).WithPayload(Message).WithRetainFlag(Retained);if (QualityOfServiceLevel &#61;&#61; 0){mamb &#61; mamb.WithAtMostOnceQoS();}else if (QualityOfServiceLevel &#61;&#61; 1){mamb &#61; mamb.WithAtLeastOnceQoS();}else if (QualityOfServiceLevel &#61;&#61; 2){mamb &#61; mamb.WithExactlyOnceQoS();}mqttClient.PublishAsync(mamb.Build());}catch (Exception exp){Console.WriteLine("Publish >>" &#43; exp.Message);}}/// /// 连接服务器并按标题订阅内容/// /// /// private static async Task Connected(MqttClientConnectedEventArgs e){try{List listTopic &#61; new List();if (listTopic.Count() <&#61; 0){var topicFilterBulder &#61; new TopicFilterBuilder().WithTopic(Topic).Build();listTopic.Add(topicFilterBulder);Console.WriteLine("Connected >>Subscribe " &#43; Topic);}await mqttClient.SubscribeAsync(listTopic.ToArray());Console.WriteLine("Connected >>Subscribe Success");}catch (Exception exp){Console.WriteLine(exp.Message);}}/// /// 失去连接触发事件/// /// /// private static async Task Disconnected(MqttClientDisconnectedEventArgs e){try{Console.WriteLine("Disconnected >>Disconnected Server");await Task.Delay(TimeSpan.FromSeconds(5));try{await mqttClient.ConnectAsync(options);}catch (Exception exp){Console.WriteLine("Disconnected >>Exception " &#43; exp.Message);}}catch (Exception exp){Console.WriteLine(exp.Message);}}/// /// 接收消息触发事件/// /// private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e){try{string text &#61; Encoding.UTF8.GetString(e.ApplicationMessage.Payload);string Topic &#61; e.ApplicationMessage.Topic;string QoS &#61; e.ApplicationMessage.QualityOfServiceLevel.ToString();string Retained &#61; e.ApplicationMessage.Retain.ToString();Console.WriteLine("MessageReceived >>Topic:" &#43; Topic &#43; "; QoS: " &#43; QoS &#43; "; Retained: " &#43; Retained &#43; ";");Console.WriteLine("MessageReceived >>Msg: " &#43; text);}catch (Exception exp){Console.WriteLine(exp.Message);}}}
}
五、测试
1、启动
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;namespace CSDN
{class Program{static void Main(string[] args){HOSMQTT.Start();}}
}
2、使用Paho 工具发布消息
设置服务器IP和端口
设置用户名密码
连接服务
发布消息
监控订阅结果