热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

C#MQTT通讯实现

最近做了应该MQTT通讯的服务,参考了网上的一些例子,代码分享一下。MQTT通讯帮助类:usingSystem;usingSystem

最近做了应该MQTT通讯的服务,参考了网上的一些例子,代码分享一下。

MQTT通讯帮助类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using Newtonsoft.Json;

namespace MQTTService.Common
{
    ///


    /// MQTT通讯
    ///

    public partial class MQTTnetHelper : BaseJobLog
    {
        private  MqttClient mqttClient = null;

        public  bool ConnectState { get { return mqttClient.IsConnected; } }

        public  async Task ConnectMqttServerAsync(string url,int? port, string clientId, string username, string password)
        {
            if (mqttClient == null)
            {
                mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += MqttClient_Disconnected;
            }

            try
            {
                var options = new MqttClientTcpOptions
                {
                    Server = url,
                    Port= port,
                    ClientId = clientId,
                    UserName = username,
                    Password = password,
                    CleanSession = true
                };

                await mqttClient.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                var msg =$"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine;
            }
        }

        private  void MqttClient_Connected(object sender, EventArgs e)
        {
            var msg = "已连接到MQTT服务器!" + Environment.NewLine;
        }

        private  void MqttClient_Disconnected(object sender, EventArgs e)
        {
            var msg = "已断开MQTT连接!" + Environment.NewLine;
        }

        private  void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            var msg = $">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}";
        }

        private  void Subscribe_ClickAsync(object sender, EventArgs e)
        {
            mqttClient.SubscribeAsync(new List {
                new TopicFilter("", MqttQualityOfServiceLevel.AtMostOnce)
            });
        }

        public  Task DataPublishAsync(string topic, object data)
        {
            var strdata = JsonConvert.SerializeObject(data);
            var byteData = Encoding.UTF8.GetBytes(strdata);
            var appMsg = new MqttApplicationMessage(topic, byteData, MqttQualityOfServiceLevel.AtMostOnce, false);
            var  result = mqttClient.PublishAsync(appMsg);
            return result;

        }

    }
}
 

MQTT连接上报类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTService.Models;
using Newtonsoft.Json;

namespace MQTTService.Common
{
    public class ConnectOrPushHelper : BaseJob
    {
        private MQTTnetHelper mqttHelper = new MQTTnetHelper();

        ///


        /// MQTT连接
        ///

        ///
        public bool MQTTConnect(DeviceAuthModel deviceAuthModel)
        {
            if (deviceAuthModel.IsNull()) return false;
            //MQTT连接
            var device = deviceAuthModel.deviceInfo;
            var mq = deviceAuthModel.mqInfo;
            var IP = CommonConfig.ApiIP_MQTT.IsNotNullOrWhiteSpace() ? CommonConfig.ApiIP_MQTT : mq.mqIp;
            var port = CommonConfig.ApiPort_MQTT.IsNotNullOrWhiteSpace() ? CommonConfig.ApiPort_MQTT : mq.mqPort;
            _log.Info(string.Format("MQTT连接,IP:{0},Port:{1},clientId:{2},mqUsername:{3},mqPassword:{4}", IP, port, device.deviceName + "@" + device.deviceSecret, mq.mqUsername, mq.mqPassword));
            var res = mqttHelper.ConnectMqttServerAsync(IP, int.Parse(port), device.deviceName + "@" + device.deviceSecret, mq.mqUsername, mq.mqPassword);
            System.Threading.Thread.Sleep(120);

            if (!mqttHelper.ConnectState)
            {
                _log.Info("MQTT连接失败,DeviceAuthModel:" + JsonConvert.SerializeObject(deviceAuthModel));
                return false;
            }
            else
            {
                return true;
            }
        }

        ///


        /// MQTT上报
        ///

        ///
        ///
        ///
        public bool Push(string url,string deviceName, T reportModel)
        {
            try
            {
                _log.Info(string.Format("MQTT推送数据,url:{0},数据:{1}", url, JsonConvert.SerializeObject(reportModel)));

                var result = mqttHelper.DataPublishAsync(url, reportModel);
                if (result.Status.ToString() == "Faulted")
                {
                    _log.Info(string.Format("MQTT推送数据失败,url:{0},错误信息:{1},推送数据:{2}", url, result.Exception.Message, JsonConvert.SerializeObject(reportModel)));
                    return false;
                }
                System.Threading.Thread.Sleep(50);
                return true;
            }
            catch (Exception ex)
            {
                _log.Info(string.Format("发布失败,编号:{0},异常信息:{1}", deviceName, ex.Message));
                return false;
            }
        }

    }
}
 

Http请求帮助类:

using log4net;
using Newtonsoft.Json;
using RestSharp;
using System;
using System.Diagnostics;

namespace MQTTService.Common
{
    public class BaseJob
    {
        private string host;
        public ILog _log;

        public BaseJob()
        {
            _log = LogManager.GetLogger(typeof(BaseJob));
        }

        public string Post(string url, object data)
        {
            var client = new RestClient(url);
            if (url.Contains("https://"))
                System.Net.ServicePointManager.ServerCertificateValidationCallback += (sender, certificate, chain, sslPolicyErrors) => true; //SSL证书验证
            var request = new RestRequest(Method.POST);
            try
            {
                var str = JsonConvert.SerializeObject(data);
                request.AddHeader("cache-control", "no-cache");
                request.AddHeader("content-type", "application/json");
                request.AddParameter("application/json", JsonConvert.SerializeObject(data), ParameterType.RequestBody);

                Stopwatch sw = new Stopwatch();
                sw.Start();
                var response = client.Execute(request);
                sw.Stop();
                if(response.ErrorMessage.IsNotNullOrWhiteSpace()) _log.Info(url + ",传参:" + JsonConvert.SerializeObject(data) + ",错误信息:" + response.ErrorMessage+ ",接口执行时间:" + sw.ElapsedMilliseconds);
                return response.Content;
            }
            catch (Exception e)
            {
                _log.Error(e);
                _log.Info(url + ",传参:" + JsonConvert.SerializeObject(request.Parameters));
                return string.Empty;
            }
        }

        public T Post(string url, object data) where T : new()
        {
            url = host + url;
            var client = new RestClient(url);
            var request = new RestRequest(Method.POST);
            try
            {
                request.AddHeader("cache-control", "no-cache");
                request.AddHeader("content-type", "application/json");
                request.AddParameter("application/json", JsonConvert.SerializeObject(data), ParameterType.RequestBody);

                Stopwatch sw = new Stopwatch();
                sw.Start();
                var response = client.Execute(request);
                sw.Stop();

                _log.Info(url + ",传参:" + JsonConvert.SerializeObject(data) + ",返回结果:" + response.Content + ",接口执行时间:" + sw.ElapsedMilliseconds);
                return response.Data;
            }
            catch (Exception e)
            {
                _log.Error(e);
                _log.Info(url + ",传参: " + JsonConvert.SerializeObject(request.Parameters));
                return default(T);
            }
        }

    }
}
 



推荐阅读
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 移动端常用单位——rem的使用方法和注意事项
    本文介绍了移动端常用的单位rem的使用方法和注意事项,包括px、%、em、vw、vh等其他常用单位的比较。同时还介绍了如何通过JS获取视口宽度并动态调整rem的值,以适应不同设备的屏幕大小。此外,还提到了rem目前在移动端的主流地位。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 如何在HTML中获取鼠标的当前位置
    本文介绍了在HTML中获取鼠标当前位置的三种方法,分别是相对于屏幕的位置、相对于窗口的位置以及考虑了页面滚动因素的位置。通过这些方法可以准确获取鼠标的坐标信息。 ... [详细]
author-avatar
ZZDXP
学 無
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有