热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spring Boot MQTT Too many publishes in progress错误的解决方案_java

本文介绍Spring Boot MQTT Too many publishes in progress错误的解决方案,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣

前言

最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。

项目中采用的是Spring Boot2.0集成的MQTT引入的版本为1.2.0,消息发送用的是MessagingGateway的方式实现,不熟悉的朋友可以查看这篇文章Spring boot 集成 MQTT详情

原因分析

出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义

0 最多一次的传输

发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。

1 至少一次的传输

发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.

2 只有一次的传输

 Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。

项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:

当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。

源码分析

关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。

MQTT的Push消息到缓存中时序图

MqttPahoMessageHandler的publish方法

说明: checkConection检查连接后,在发送消息。

MqttAsyncClient的publish方法

ClientComms的internalSend方法

ClientState的send方法

MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0

SaveToken的源码实现如下:

通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。

异步发送消息时序图

说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。

ClientComms的conncect方法

ConnectBG的run方法

CommsSender的run方法

1.从clientState中获取消息 2.通过消息id去hashtable中获取缓存消息 3.消息不为空,执行消息发送 4.调用notifySent方法删除消息,且actualInFlight执行递减操作。

CommsSender的notifySent方法

小结

在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。

//存放待发送消息的Vector数组
volatile private Vector pendingMessages;

解决方案

方案1:发送消息时设置为Qos=1

此方案虽然可以解决此问题,但存在如下的缺点:


  • 网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。

  • 发送消息需要进行消息确认,网络资源消耗过大。

方案2: 修改maxInflight的默认值,例如将其修改为50


```
MqttConnectOptions mqttCOnnectOptions= new MqttConnectOptions();
mqttConnectOptions.setMaxInflight(50);
```

此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。

方案3:将消息配置为多客户端模式

由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。

方案4:升级mqtt版本为1.2.1

在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。

引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。

解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。

mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

总结

本文通过定位MQTT错误,详细的讲解了MQtt消息的发送流程,解决的方案虽然有多种,我们需要结合实际的业务情况来解决问题,关于MQtt如果还有其他疑问,可以随时反馈,大家共同学习,共同进步。


推荐阅读
  • 本文介绍了一种算法,用于在一个给定的二叉树中找到一个节点,该节点的子树包含最大数量的值小于该节点的节点。如果存在多个符合条件的节点,可以选择任意一个。 ... [详细]
  • 本文详细介绍如何在Spring Boot项目中集成和使用JPA,涵盖JPA的基本概念、Spring Data JPA的功能以及具体的操作步骤,帮助开发者快速掌握这一强大的持久化技术。 ... [详细]
  • 微信小程序支付官方参数小程序中代码后端发起支付代码支付回调官方参数文档地址:https:developers.weixin.qq.comminiprogramdeva ... [详细]
  • 时序数据是指按时间顺序排列的数据集。通过时间轴上的数据点连接,可以构建多维度报表,揭示数据的趋势、规律及异常情况。 ... [详细]
  • 本文详细解析了Java中流的概念,特别是OutputStream和InputStream的区别,并通过实际案例介绍了如何实现Java对象的序列化。文章不仅解释了流的基本概念,还探讨了序列化的重要性和具体实现步骤。 ... [详细]
  • 探讨了生成时间敏感的一次性伪随机密码的方法,旨在通过加入时间因素防止重放攻击。 ... [详细]
  • 优雅地记录API调用时长
    本文旨在探讨如何高效且优雅地记录API接口的调用时长,通过实际案例和代码示例,帮助开发者理解并实施这一技术,提高系统的可观测性和调试效率。 ... [详细]
  • SpringBoot底层注解用法及原理
    2.1、组件添加1、Configuration基本使用Full模式与Lite模式示例最佳实战配置类组件之间无依赖关系用Lite模式加速容器启动过程,减少判断配置类组 ... [详细]
  • 解决JavaWeb项目中因IPv6导致的IP转换错误
    本文探讨了在JavaWeb项目中,当尝试将客户端IP地址从字符串形式转换为整数时遇到的问题,并提供了详细的解决方案。具体问题表现为在本地环境中通过`request.getRemoteHost()`获取到的IP地址为IPv6格式,而非预期的IPv4格式。 ... [详细]
  • 圣诞夜的驯鹿之谜:鲁道夫是公是母?
    野生动物专家探讨圣诞老人座驾上的驯鹿性别之谜 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • Web网络基础
    目录儿1使用HTTP协议访问Web2HTTP的诞生2.1因特网的起源2.2互联网、因特网与万维网2.3万维网与HTTP3网络基础TCPIP3.1TCPIP协议族3.2TCPIP的分 ... [详细]
  • 本文探讨了在Node.js环境中如何有效地捕获标准输出(stdout)的内容,并将其存储到变量中。通过具体的示例和解决方案,帮助开发者解决常见的输出捕获问题。 ... [详细]
  • 深入探讨Web服务器与动态语言的交互机制:CGI、FastCGI与PHP-FPM
    本文详细解析了Web服务器(如Apache、Nginx等)与动态语言(如PHP)之间通过CGI、FastCGI及PHP-FPM进行交互的具体过程,旨在帮助开发者更好地理解这些技术背后的原理。 ... [详细]
  • 本文介绍了如何使用Workman框架构建一个功能全面的即时通讯系统,该系统不仅支持一对一聊天、群组聊天,还集成了视频会议和实时音视频通话功能,同时提供了红包发送等附加功能。 ... [详细]
author-avatar
手机用户2502883075
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有