作者:mobiledu2502886233 | 来源:互联网 | 2023-08-20 17:38
==pom依赖==
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.1
==Java样例代码==
package com.rexel.core.common.utils.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* MQTT Utils
*
* @author admin
* @date 2022-2-24
*/
public class MqttUtils {
public static void main(String[] args) {
String serverUrl = "tcp://36.137.154.92:1883";
String clientId = "client111";
String username = "root";
String password = "test123";
String topic = "qch_test1";
try {
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置是否清空session
// 设置为false表示服务器会保留客户端的连接记录
// 设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置会话心跳时间
options.setAutomaticReconnect(true);
// 设置超时时间,单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间,单位为秒
// 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// host:服务连接地址
// clientIId:连接MQTT的客户端ID,一般以唯一标识符表示
// MemoryPersistence:clientId的保存形式,默认为以内存保存
MqttClient client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
// 设置回调
MyMqttCallback callback = new MyMqttCallback(topic, client);
client.setCallback(callback);
// 建立连接
client.connect(options);
// 发送测试消息
MqttMessage pubMsg = new MqttMessage();
pubMsg.setPayload("我的第一个MQTT消息".getBytes());
pubMsg.setQos(2);
// 当订阅消费端服务器重新连接MQTT服务器时,总能拿到该主题最新消息, 这个时候我们需要把retained设置为true;
// 当订阅消费端服务器重新连接MQTT服务器时,不能拿到该主题最新消息,只能拿连接后发布的消息,这个时候我们需要把 retained设置为false;
pubMsg.setRetained(true);
client.publish(topic, pubMsg);
} catch (MqttException e) {
e.printStackTrace();
}
}
static class MyMqttCallback implements MqttCallbackExtended {
private final String topic;
private final MqttClient client;
public MyMqttCallback(String topic, MqttClient client) {
this.topic = topic;
this.client = client;
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("连接成功");
// 订阅消息
// QoS0,At most once,至多一次;
// QoS1,At least once,至少一次;
// QoS2,Exactly once,确保只有一次。
int[] qos = {2};
String[] topics = {topic};
try {
client.subscribe(topics, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接断开");
}
@Override
public void messageArrived(String topics, MqttMessage message) {
System.out.println("接收消息主题:" + topics);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
}
}
==执行结果==
连接成功
deliveryComplete---------true
接收消息主题:qch_test1
接收消息Qos:2
接收消息内容:我的第一个MQTT消息