热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Springboot实现Mqtt

转载:基于Springboot实现MqttJava端开发:pom.xml:

转载:基于Springboot实现Mqtt

Java端开发:
pom.xml:


<dependency><groupId>org.eclipse.pahogroupId><artifactId>org.eclipse.paho.client.mqttv3artifactId><version>1.2.2version>
dependency>

Client.java

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;&#64;Component
public class Client {Logger logger &#61; LoggerFactory.getLogger(Client.class);&#64;Autowiredprivate OnMessageCallback onMessageCallback;public final String HOST &#61; "tcp://连接ip:端口号"; //连接ippublic final String TOPIC &#61; "duilie"; //订阅队列public final String clientId &#61; "jy"; //连接id&#xff0c;注意不要和已有的连接id重复private MqttClient client;private MqttConnectOptions connOpts;private String userName &#61; "userName";private String passWord &#61; "passWord";&#64;PostConstructpublic void init() {start();}public void start() {try {client &#61; new MqttClient(HOST, clientId, new MemoryPersistence());connOpts &#61; new MqttConnectOptions();connOpts.setCleanSession(true); // 清空sessionconnOpts.setUserName(userName);connOpts.setPassword(passWord.toCharArray());connOpts.setConnectionTimeout(10);// 设置超时时间connOpts.setKeepAliveInterval(20);// 设置会话心跳时间client.setCallback(onMessageCallback); // 设置回调函数System.out.println("Connecting to broker: " &#43; HOST);int Qos &#61; 1;client.connect(connOpts); // 创立连接client.subscribe(TOPIC); // 订阅} catch (Exception e) {e.printStackTrace();}}public void publish(String topic, byte[] payload) {try {this.client.publish(topic, payload, 1, false);} catch (MqttException e) {e.printStackTrace();}}public void close() {try {this.client.close();} catch (MqttException e) {e.printStackTrace();}}public void reConnect() throws Exception {if (this.client !&#61; null) {this.logger.info("开始重连");this.client.connect(this.connOpts);int Qos &#61; 1;this.logger.info("主题&#xff1a;" &#43; TOPIC);this.client.subscribe(TOPIC, Qos);}}}

OnMessageCallback.java
回调消息处理类OnMessageCallback&#xff1a;

import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;&#64;Component
public class OnMessageCallback implements MqttCallback {&#64;Autowiredprivate ServiceImpl service; //修改成自己的业务&#64;Autowiredprivate Client client;public void connectionLost(Throwable cause) {// 连接丢失后&#xff0c;一般在这里面进行重连System.out.println("连接断开&#xff0c;可以做重连");while (true) {try {Thread.sleep(5000L);this.client.reConnect();break;} catch (Exception e) {e.printStackTrace();}}}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" &#43; topic);System.out.println("接收消息Qos:" &#43; message.getQos());System.out.println("接收消息内容:" &#43; new String(message.getPayload()));service.insert(new String(message.getPayload())); //修改成自己的业务}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" &#43; token.isComplete());}
}

这里加入了断线重连。在 messageArrived 方法中调用业务即可。

连接成功会在控制台可以看到连接的id
在这里插入图片描述
Qos参数&#xff1a;
level 0&#xff1a;最多一次的传输

消息是基于TCP/IP网络传输的。没有回应&#xff0c;在协议中也没有定义重传的语义。消息可能到达服务器1次&#xff0c;也可能根本不会到达。
level 1&#xff1a;至少一次的传输&#xff08;一般配置为1&#xff09;

服务器接收到消息会被确认&#xff0c;通过传输一个PUBACK信息。如果有一个可以辨认的传输失败&#xff0c;无论是通讯连接还是发送设备&#xff0c;还是过了一段时间确认信息没有收到&#xff0c;发送方都会将消息头的DUP位置1&#xff0c;然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。
如果客户端没有接收到PUBACK信息&#xff08;无论是应用定义的超时&#xff0c;还是检测到失败然后通讯session重启&#xff09;&#xff0c;客户端都会再次发送PUBLISH信息&#xff0c;并且将DUP位置1。
当它从客户端接收到重复的数据&#xff0c;服务器重新发送消息给订阅者&#xff0c;并且发送另一个PUBACK消息。
level 2&#xff1a; 只有一次的传输

在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输&#xff0c;当重复的消息不被允许的情况下使用。这样增加了网络流量&#xff0c;但是它通常是可以接受的&#xff0c;因为消息内容很重要。
QoS level 2在消息头有Message ID。
接收消息和写入数据库解藕&#xff1a;

这里是直接调用 server 写入数据库的写法。可以改成接收到消息写入到一个 BlockingQueue 里&#xff0c;再由业务层去取数。

遇见问题&#xff1a;
clientId&#xff1a;是每一个去订阅连接的 client 的名称&#xff0c;注意不要和原有的的连接名重复。
Mqtt是不会消息堆积&#xff0c;也就是我这里设备一直发送消息&#xff0c;而我的java客户端断了5分钟&#xff0c;那么这5分钟的消息在重连时是获取不到的&#xff0c;只能获得之后的消息。&#xff08;具体的原理没有研究&#xff09;


推荐阅读
  • springMVC JRS303验证 ... [详细]
  • 使用JS、HTML5和C3创建自定义弹出窗口
    本文介绍如何结合JavaScript、HTML5和C3.js来实现一个功能丰富的自定义弹出窗口。通过具体的代码示例,详细讲解了实现过程中的关键步骤和技术要点。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 烤鸭|本文_Spring之Bean的生命周期详解
    烤鸭|本文_Spring之Bean的生命周期详解 ... [详细]
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • Redux入门指南
    本文介绍Redux的基本概念和工作原理,帮助初学者理解如何使用Redux管理应用程序的状态。Redux是一个用于JavaScript应用的状态管理库,特别适用于React项目。 ... [详细]
  • 深入理解Java多线程并发处理:基础与实践
    本文探讨了Java中的多线程并发处理机制,从基本概念到实际应用,帮助读者全面理解并掌握多线程编程技巧。通过实例解析和理论阐述,确保初学者也能轻松入门。 ... [详细]
  • 本文详细介绍了Java中实现异步调用的多种方式,包括线程创建、Future接口、CompletableFuture类以及Spring框架的@Async注解。通过代码示例和深入解析,帮助读者理解并掌握这些技术。 ... [详细]
  • docker镜像重启_docker怎么启动镜像dock ... [详细]
  • Java EE CDI:解决依赖关系冲突的实例
    在本教程中,我们将探讨如何在Java EE的CDI(上下文和依赖注入)框架中有效解决依赖关系的冲突问题。通过学习如何使用限定符,您将能够为应用程序的不同客户端提供多种接口实现,并确保每个客户端都能正确调用其所需的实现。 ... [详细]
  • Flowable 6.6.0 表单引擎在Web应用中的集成与使用
    本文档提供了Flowable 6.6.0版本中表单引擎在Web应用程序中的配置和使用指南,包括表单引擎的初始化、配置以及在Web环境下的具体实现方法。 ... [详细]
  • 本文介绍了在Java环境中使用PDFBox和XPDF工具从PDF文件中提取文本内容的方法。重点讨论了处理中文字符集及解决相关错误的技术细节,特别是针对某些特定格式的PDF文件(如网上填写的报名表和下载的论文)遇到的问题及解决方案。 ... [详细]
  • 全面解析运维监控:白盒与黑盒监控及四大黄金指标
    本文深入探讨了白盒和黑盒监控的概念,以及它们在系统监控中的应用。通过详细分析基础监控和业务监控的不同采集方法,结合四个黄金指标的解读,帮助读者更好地理解和实施有效的监控策略。 ... [详细]
  • 实用正则表达式有哪些
    小编给大家分享一下实用正则表达式有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下 ... [详细]
  • 12月16日JavaScript变量、函数、流程、循环等***线上九期班
    12月16日JavaScript变量、函数、流程、循环等***线上九期班 ... [详细]
author-avatar
萱筱璧
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有