作者:萱筱璧 | 来源:互联网 | 2023-10-13 09:12
转载:基于Springboot实现Mqtt
Java端开发: pom.xml:
< dependency> < groupId> org.eclipse.paho groupId> < artifactId> org.eclipse.paho.client.mqttv3 artifactId> < version> 1.2.2 version> dependency>
Client.java
import org. eclipse. paho. client. mqttv3. *; import org. eclipse. paho. client. mqttv3. persist. MemoryPersistence; import org. slf4j. Logger; import org. slf4j. LoggerFactory; import org. springframework. beans. factory. annotation. Autowired; import org. springframework. stereotype. Component; import javax. annotation. PostConstruct; &#64;Component public class Client { Logger logger &#61; LoggerFactory. getLogger ( Client. class ) ; &#64;Autowired private OnMessageCallback onMessageCallback; public final String HOST &#61; "tcp://连接ip:端口号" ; public final String TOPIC &#61; "duilie" ; public final String clientId &#61; "jy" ; private MqttClient client; private MqttConnectOptions connOpts; private String userName &#61; "userName" ; private String passWord &#61; "passWord" ; &#64;PostConstruct public void init ( ) { start ( ) ; } public void start ( ) { try { client &#61; new MqttClient ( HOST, clientId, new MemoryPersistence ( ) ) ; connOpts &#61; new MqttConnectOptions ( ) ; connOpts. setCleanSession ( true ) ; connOpts. setUserName ( userName) ; connOpts. setPassword ( passWord. toCharArray ( ) ) ; connOpts. setConnectionTimeout ( 10 ) ; connOpts. setKeepAliveInterval ( 20 ) ; client. setCallback ( onMessageCallback) ; System. out. println ( "Connecting to broker: " &#43; HOST) ; int Qos &#61; 1 ; client. connect ( connOpts) ; client. subscribe ( TOPIC) ; } catch ( Exception e) { e. printStackTrace ( ) ; } } public void publish ( String topic, byte [ ] payload) { try { this . client. publish ( topic, payload, 1 , false ) ; } catch ( MqttException e) { e. printStackTrace ( ) ; } } public void close ( ) { try { this . client. close ( ) ; } catch ( MqttException e) { e. printStackTrace ( ) ; } } public void reConnect ( ) throws Exception { if ( this . client !&#61; null) { this . logger. info ( "开始重连" ) ; this . client. connect ( this . connOpts) ; int Qos &#61; 1 ; this . logger. info ( "主题&#xff1a;" &#43; TOPIC) ; this . client. subscribe ( TOPIC, Qos) ; } } }
OnMessageCallback.java 回调消息处理类OnMessageCallback&#xff1a;
import org. eclipse. paho. client. mqttv3. MqttCallback; import org. eclipse. paho. client. mqttv3. IMqttDeliveryToken; import org. eclipse. paho. client. mqttv3. MqttMessage; import org. springframework. beans. factory. annotation. Autowired; import org. springframework. stereotype. Component; &#64;Component public class OnMessageCallback implements MqttCallback { &#64;Autowired private ServiceImpl service; &#64;Autowired private Client client; public void connectionLost ( Throwable cause) { System. out. println ( "连接断开&#xff0c;可以做重连" ) ; while ( true ) { try { Thread. sleep ( 5000 L) ; this . client. reConnect ( ) ; break ; } catch ( Exception e) { e. printStackTrace ( ) ; } } } public void messageArrived ( String topic, MqttMessage message) throws Exception { System. out. println ( "接收消息主题:" &#43; topic) ; System. out. println ( "接收消息Qos:" &#43; message. getQos ( ) ) ; System. out. println ( "接收消息内容:" &#43; new String ( message. getPayload ( ) ) ) ; service. insert ( new String ( message. getPayload ( ) ) ) ; } public void deliveryComplete ( IMqttDeliveryToken token) { System. out. println ( "deliveryComplete---------" &#43; token. isComplete ( ) ) ; } }
这里加入了断线重连。在 messageArrived 方法中调用业务即可。
连接成功会在控制台可以看到连接的id Qos参数&#xff1a; level 0&#xff1a;最多一次的传输
消息是基于TCP/IP网络传输的。没有回应&#xff0c;在协议中也没有定义重传的语义。消息可能到达服务器1次&#xff0c;也可能根本不会到达。 level 1&#xff1a;至少一次的传输&#xff08;一般配置为1&#xff09;
服务器接收到消息会被确认&#xff0c;通过传输一个PUBACK信息。如果有一个可以辨认的传输失败&#xff0c;无论是通讯连接还是发送设备&#xff0c;还是过了一段时间确认信息没有收到&#xff0c;发送方都会将消息头的DUP位置1&#xff0c;然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。 如果客户端没有接收到PUBACK信息&#xff08;无论是应用定义的超时&#xff0c;还是检测到失败然后通讯session重启&#xff09;&#xff0c;客户端都会再次发送PUBLISH信息&#xff0c;并且将DUP位置1。 当它从客户端接收到重复的数据&#xff0c;服务器重新发送消息给订阅者&#xff0c;并且发送另一个PUBACK消息。 level 2&#xff1a; 只有一次的传输
在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输&#xff0c;当重复的消息不被允许的情况下使用。这样增加了网络流量&#xff0c;但是它通常是可以接受的&#xff0c;因为消息内容很重要。 QoS level 2在消息头有Message ID。 接收消息和写入数据库解藕&#xff1a;
这里是直接调用 server 写入数据库的写法。可以改成接收到消息写入到一个 BlockingQueue 里&#xff0c;再由业务层去取数。
遇见问题&#xff1a; clientId&#xff1a;是每一个去订阅连接的 client 的名称&#xff0c;注意不要和原有的的连接名重复。 Mqtt是不会消息堆积&#xff0c;也就是我这里设备一直发送消息&#xff0c;而我的java客户端断了5分钟&#xff0c;那么这5分钟的消息在重连时是获取不到的&#xff0c;只能获得之后的消息。&#xff08;具体的原理没有研究&#xff09;