热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【MQTT】SpringBoot集成MQTT

写在前面的话:计划梳理MQTT集成至Java、Vue的系列文档,详见收录专栏。该示例文章,已将相关方法封装至工具类,已实现断

写在前面的话:

        计划梳理MQTT集成至Java、Vue的系列文档,详见收录专栏。

        该示例文章,已将相关方法封装至工具类,已实现断线重连,已将相关参数提取至配置文件。

-- 真积力久则入



 

目录

一、前情提要

二、环境说明

三、Pom文件集成

四、代码集成

1、在配置文件中,预设参数值

2、新建枚举类,用于配置默认订阅话题

3、MQTT配置类

4、MQTT回调类

5、MQTT工具类封装

6、设置项目系统,进行MQTT初始化

7、新建测试Controller

五、基于MQTTX,联调测试

1、测试Java项目sub

2、测试Java项目Pub

六、关于断线重连趟过的一些坑

1、可能出现断线的几种情况

2、断线重连的方案




一、前情提要

【MQTT】Linux(CentOS 7.5):通过docker安装MQTT_Francis X的博客-CSDN博客

【MQTT】Windows:安装MQTT_Francis X的博客-CSDN博客



二、环境说明

        开发平台:windows

        开发工具:IDEA

        联测工具:MQTTX

        SpringBoot版本:2.2.3.RELEASE



三、Pom文件集成

org.springframework.integrationspring-integration-streamorg.springframework.bootspring-boot-starter-integrationorg.springframework.integrationspring-integration-mqtt


四、代码集成

1、在配置文件中,预设参数值

# Mqtt Config
# 以下配置,详见 前情提要
# client_id 是设置一个前缀,在应用的时候需要加上时间戳,以此保证唯一性
spring.mqtt.url=tcp://192.168.3.30:1882
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.client.id=code_dev

/****************************************************** Mqtt Config* 备注:以下刻意将 defaultClientId、clientId 区分使用。* 防止断线重连的时候,clientId 被重复拼接时间戳*** @author Francis* @date 2022/4/27 22:05* @version 1.0**************************************************/
@Getter
@Configuration
public class MqttProperties {@ApiModelProperty("服务端地址")@Value("${spring.mqtt.url}")private String hostUrl;@ApiModelProperty("用户名")@Value("${spring.mqtt.username}")private String username;@ApiModelProperty("密码")@Value("${spring.mqtt.password}")private String password;@ApiModelProperty("初始化的客户端id")@Value("${spring.mqtt.client.id}")private String defaultClientId;@ApiModelProperty("客户端id")private String clientId;public String getClientId() {clientId = defaultClientId + Constants.DEFAULT_SPLIT_SYMBOL + System.currentTimeMillis();return clientId;}
}

2、新建枚举类,用于配置默认订阅话题

/****************************************************** 默认订阅的话题 -- 枚举类* 备注:此类不可为空,若无默认订阅话题,MQTT在连接上后,会立即断开。*** @author Francis* @date 2022/7/27 10:14* @version 1.0**************************************************/
@Getter
@AllArgsConstructor
@ApiModel("默认订阅的话题 -- 枚举类")
public enum DefineSubTopicEnum {// 设备输出订阅TEST("test", 0);private final String topic;private final int qos;/*** 获取所有话题名* * @return topicArr*/public static String[] queryAllTopic() {List topicList = new ArrayList<>();for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {topicList.add(item.getTopic());}String[] topicArr = new String[topicList.size()];topicArr = topicList.toArray(topicArr);return topicArr;}/*** 获取所有qos* * @return qosArr*/public static int[] queryAllQos() {List qosList = new ArrayList<>();for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {qosList.add(item.getQos());}int[] qosArr = new int[qosList.size()];qosArr = qosList.stream().mapToInt(Integer::intValue).toArray();return qosArr;}}

3、MQTT配置类

/****************************************************** Mqtt 工厂类*** @author Francis* @date 2022/4/28 21:55* @version 1.0**************************************************/
@Slf4j
@Component
public class MqttFactory {@Autowiredprivate MqttProperties config;private static MqttFactory factory;private static MqttClient client;@PostConstructpublic void init() {factory = this;factory.cOnfig= this.config;}/*** 获取客户端实例* 单例模式:存在即返回,不存在则初始化** @return client* @throws MqttException 此处刻意抛出异常,否则无法执行断线重连*/public static MqttClient getInstance() throws MqttException {if (client == null) {connect();}return client;}/*** 清空客户端实例* 当 mqtt 断开连接时,需清空 clientId,再执行断线重连*/public static void clear() {client = null;}/*** 断线重连方法*/public static void reconnect() {int count = 0;while (true) {clear();++ count;try {log.info("----------------即将执行自动重连----------------");getInstance();log.info("----------------自动重连成功----------------");break;} catch (MqttException e) {log.error("----------------自动重连失败,当前为第 {} 次尝试----------------", count);try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException ex) {log.error("----------------自动重连,休眠失败!----------------", e);}}}}/*** 客户端连接服务端** @throws MqttException 此处刻意抛出异常,否则无法执行断线重连*/private static void connect() throws MqttException {// 创建MQTT客户端对象client = new MqttClient(factory.config.getHostUrl(), factory.config.getClientId(), new MemoryPersistence());// 连接设置MqttConnectOptions optiOns= new MqttConnectOptions();// 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息// 设置为true表示每次连接服务器都是以新的身份options.setCleanSession(true);// 设置连接用户名options.setUserName(factory.config.getUsername());// 设置连接密码options.setPassword(factory.config.getPassword().toCharArray());// 设置超时时间,单位为秒options.setConnectionTimeout(100);// 设置心跳时间 单位为秒,表示服务器每隔 20 秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);// 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic", (factory.config.getClientId() + "与服务器断开连接").getBytes(), 0, false);// 设置回调client.setCallback(new MqttCallBack());client.connect(options);// 设置默认订阅主题// 消息等级,与主题数组一一对应int[] qos = DefineSubTopicEnum.queryAllQos();// 主题String[] topics = DefineSubTopicEnum.queryAllTopic();// 订阅主题client.subscribe(topics, qos);}}

4、MQTT回调类

/****************************************************** Mqtt 回调*** @author Francis* @since 2022/4/27 22:11* @version 1.0**************************************************/
@Slf4j
@Configuration
public class MqttCallBack implements MqttCallback {/*** 与服务器断开的回调*/@Overridepublic void connectionLost(Throwable throwable) {log.info("MQTT断开了与服务端的连接。考虑是否服务端掉线 or 回调参数解析报错 or 无默认sub");// 执行自动重连MqttFactory.reconnect();}/*** 消息到达的回调** @param topic 话题* @param mqttMessage 消息内容*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String msg = new String(mqttMessage.getPayload());log.info("已获取返回数据,当前数据为:{}", msg);// 自己的业务处理}/*** 消息发布成功的回调** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();log.info("{}:消息发布成功!", client.getClientId());}}

5、MQTT工具类封装

/****************************************************** Mqtt工具类*** @author Francis* @date 2022/4/28 21:54* @version 1.0**************************************************/
public class MqttUtils {/*** 发布消息** @param qos 0-至多1次、1-至少1次、2-一次* @param retained 是否保留:true-sub重新连接mqtt服务端时,总能拿到该主题的最新消息、false-sub重新连接mqtt服务端时,只能拿到连接后发布的消息* @param topic 话题* @param message 消息内容*/public static void pub(int qos, boolean retained, String topic, String message) throws MqttException {// 获取客户端实例MqttClient client = MqttFactory.getInstance();MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());// 主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = client.getTopic(topic);// 提供一种机制来跟踪消息的传递进度// 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {// 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态// 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅话题** @param topic 话题* @param qos 0-至多1次、1-至少1次、2-一次*/public static void sub(String topic, int qos) throws MqttException {// 获取客户端实例MqttClient client = MqttFactory.getInstance();client.subscribe(topic, qos);}/*** 断开连接*/public static void disConnect() {try {// 获取客户端实例MqttClient client = MqttFactory.getInstance();client.disconnect();} catch (MqttException e) {e.printStackTrace();}}
}

6、设置项目系统,进行MQTT初始化

        注:此处按需取舍。图示只粘贴了关于MQTT初始化的核心代码。

@EnableScheduling
@EnableTransactionManagement
@MapperScan("com.francis.modules.*.mapper")
@SpringBootApplication(exclude = {DruidDataSourceAutoConfigure.class})
public class Application implements ApplicationListener {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {try {// 初始化Mqtt连接MqttFactory.getInstance();} catch (MqttException e) {// 连接失败,疑似 mqtt服务端 掉线,开启线程,执行自动重连MqttReconnectThread thread = new MqttReconnectThread();FutureTask result = new FutureTask<>(thread);ExecutorService executorService = Executors.newFixedThreadPool(10);executorService.execute(new Runnable() {@Overridepublic void run() {result.run();}});}}static class MqttReconnectThread implements Callable {@Overridepublic String call() throws Exception {MqttFactory.reconnect();return null;}}
}

7、新建测试Controller

/****************************************************** Mqtt测试 -- controller*** @author Francis* @date 2022/4/28 22:28* @version 1.0**************************************************/
@Slf4j
@RestController
@RequestMapping("/mqtt")
public class MqttTestController {@PostMapping("/pub")public JsonResult pub(int qos, boolean retained, String topic, String msg) {try {MqttUtils.pub(qos, retained, topic, msg);return JsonResult.success("发送成功!");} catch (Exception e) {log.error("消息发送失败!", e);return JsonResult.success("发送失败!");}}@PostMapping("/sub")public JsonResult sub(String topic, int qos) throws MqttException {MqttUtils.sub(topic, qos);return JsonResult.success("动态订阅成功!");}}


五、基于MQTTX,联调测试

1、测试Java项目sub

2、测试Java项目Pub



六、关于断线重连趟过的一些坑

1、可能出现断线的几种情况

1、clientId冲突,后来的会把先来的干掉。此处通过固定clientId前缀 + 时间戳避免该问题,实现clientId的唯一。
2、Mqtt回调解析,若解析过程中有代码报错,且没有try...catch...,会出现掉线情况。
3、Mqtt服务端断开,会出现掉线的情况。
4、Mqtt连接后,没有默认订阅话题,会在连接好后,立即断开。

2、断线重连的方案

1、在断线后,需先清空MqttFactory的client,否则再次连接时,client依旧存在,不会重新走connect方法。
2、断线重连的重试的方案选择:a. 可通过配置文件,设置断线重连的重试次数。因项目业务需求,故本文中未采用该方案。但在count处,已预留相关拓展口。b. 通过休眠固定时间,永久请求重连,直到连接成功。本文中采用此方案。另:休眠时间可提取至配置文件。


推荐阅读
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • SpringBoot简单日志配置
     在生产环境中,只打印error级别的错误,在测试环境中,可以调成debugapplication.properties文件##默认使用logbacklogging.level.r ... [详细]
  • 本文介绍了一个Magento模块,其主要功能是实现前台用户利用表单给管理员发送邮件。通过阅读该模块的代码,可以了解到一些有关Magento的细节,例如如何获取系统标签id、如何使用Magento默认的提示信息以及如何使用smtp服务等。文章还提到了安装SMTP Pro插件的方法,并给出了前台页面的代码示例。 ... [详细]
  • 本文详细介绍了在Linux虚拟化部署中进行VLAN配置的方法。首先要确认Linux系统内核是否已经支持VLAN功能,然后配置物理网卡、子网卡和虚拟VLAN网卡的关系。接着介绍了在Linux配置VLAN Trunk的步骤,包括将物理网卡添加到VLAN、检查添加的VLAN虚拟网卡信息以及重启网络服务等。最后,通过验证连通性来确认配置是否成功。 ... [详细]
  • ps:写的第一个,不足之处,欢迎拍砖---只是想用自己的方法一步步去实现一些框架看似高大上的小功能(比如说模型中的toArraytoJsonsetAtt ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • Dockerfile构建镜像的指令和说明
    本文介绍了Dockerfile是用来构建镜像的文本文件,其中包含了构建镜像所需的指令和说明。通过创建一个Dockerfile文件并编写内容,可以快速创建自定义的镜像。文章还提供了一个示例,展示了如何使用Dockerfile创建一个本地构建的nginx镜像,并通过docker images命令查看镜像的版本。希望本文对大家的学习有所帮助,并希望大家多多支持编程笔记。 ... [详细]
  • VUE中引用路径的配置
    在vue项目开发中经常引用JS、CSS、IMG文件。当项目较大时文件层级很多,导致路径很长,我们可以通过在bulidwebpack.base.conf.js设置简便的引用路径一、 ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
author-avatar
似水年华的梦想_818
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有