作者:枇杷语1314 | 来源:互联网 | 2023-07-22 10:33
简介EclipsePahoJavaClient(opensnewwindow)是用Java编写的MQTT客户端库(MQTTJavaClient),可用于JVM或其他Java兼容平台
简介
Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。
emqx官方文档中有相关介绍:https://docs.emqx.cn/enterprise/v4.3/development/java.html#通过-maven-安装-paho-java
paho客户端对象初始化
先导入paho依赖:
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.2
配置MqttProperties.java:
@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties {
private String brokerUrl;
private String clientId;
private String username;
private String password;
}
application.yml
mqtt:
broker-url: tcp://192.168.40.128:1883
client-id: emq-client
username: admin
password: public
EmqClient.java
@Component
public class EmqClient {
private static final Logger logger = LoggerFactory.getLogger(EmqClient.class);
@Autowired
private MqttProperties mqttProperties;
private IMqttClient mqttClient;
@PostConstruct
public void init(){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),
mqttProperties.getClientId(),
memoryPersistence);
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 创建失败");
}
}
}
编写客户端一些方法
创建Qos枚举类:
public enum QosEnum {
Qos0(0),Qos1(1),Qos2(2);
private final int value;
QosEnum(int value) {
this.value = value;
}
public int value(){
return this.value;
}
}
@Autowired
private MqttCallback mqttCallback;
public void connect(String username, String password){
MqttConnectOptions optiOns= new MqttConnectOptions();
options.setAutomaticReconnect(true);//自动重连
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true);//临时会话
mqttClient.setCallback(mqttCallback);//设置方法回调
try {
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 连接失败");
}
}
@PreDestroy
public void disConnect(){
try {
mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 断开连接失败");
}
}
public void reConnect(){
try {
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
logger.error("mqttClient 重新连接失败");
}
}
/**
* 发布消息
* @author wen.jie
* @date 2021/7/27 16:11
*/
public void publish(String topic, String msg, QosEnum qosEnum, boolean retained){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(msg.getBytes());
mqttMessage.setQos(qosEnum.value());
mqttMessage.setRetained(retained);
try {
mqttClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
logger.error("发布消息失败");
}
}
/**
* 添加订阅
* @author wen.jie
* @date 2021/7/27 16:18
*/
public void subscribe(String topicFilters, QosEnum qosEnum){
try {
mqttClient.subscribe(topicFilters, qosEnum.value());
} catch (MqttException e) {
e.printStackTrace();
logger.error("添加订阅失败");
}
}
编写MqttCallback回调类
MessageCallback.java
@Component
public class MessageCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageCallback.class);
/**
* 丢失了对服务端连接后的回调
* @author wen.jie
* @date 2021/7/27 16:27
*/
@Override
public void connectionLost(Throwable cause) {
//丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连
logger.info("丢失了对服务端连接");
}
/**
* 订阅到消息后的回调
* 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
* 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的消息都将由broker服务器再次发送到客户端
* @author wen.jie
* @date 2021/7/27 16:28
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
topic,
message.getId(),
message.getQos(),
new String(message.getPayload()));
}
/**
* 消息发布完成且收到ack确认后的回调
* QoS0:消息被网络发出后触发一次
* QoS1:当收到broker的PUBACK消息后触发
* QoS2:当收到broker的PUBCOMP消息后触发
* @author wen.jie
* @date 2021/7/27 16:34
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
logger.info("消息发送完成,messageId={},topics={}",messageId,topics);
}
}
主启动类
@SpringBootApplication
public class EmqDemoApplication {
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties mqttProperties;
@PostConstruct
public void init(){
emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
emqClient.subscribe("topictest/#", QosEnum.Qos2);
new Thread(()->{
while (true){
emqClient.publish("topictest/123", "hello world", QosEnum.Qos2, true);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) {
SpringApplication.run(EmqDemoApplication.class, args);
}
}
测试
启动主启动类,观察效果
每五秒钟收发一次消息,测试成功