MQTT协议就不多说了,百度一下很多,官网 mqtt.org上也有很多说明. 今天记录的是在物联网设备上连接mosquitto, 发布消息。 JAVA写的应用程序订阅设备发送过来的消息。就本身应用来说是很简单的,与通常用的MQ没多大差别。我关注的重点是 mosquitto 对离线消息的处理。通常网上的例子是没有这些的. 看了下官网文档。还有fusesource客户端mqtt-client API,发现是可以很简单实现的。记录下过程。
1. 修改配置文件 mosquitto.confpersistence true
persistence_file mosquitto.db
persistence_location d:/tmp/mosquitto
max_queued_messages 100000
2. 客户端publish消息时需要设置clientID以及clean session设置为false
public static void main(String[] args) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
mqtt.setCleanSession(false);
mqtt.setClientId("AH-JAVA-CLIENT-PUB-001");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
CountDownLatch count = new CountDownLatch(88);
for(int i&#61;0; i<88; i&#43;&#43;) {
String message &#61; "Message_Test_" &#43; i;
connection.publish(MQTTConstants.TOPIC_001, message.getBytes(), QoS.AT_LEAST_ONCE, false);
System.out.println("publish: " &#43; message);
count.countDown();
//Thread.sleep(10000);
}
count.await();
connection.disconnect();
}
3. 客户端订阅消息时需要设置clientID以及clean session设置为false
public static void main(String[] args) throws Exception {
MQTT mqtt &#61; new MQTT();
mqtt.setClientId("AH-JAVA-CLIENT-100");
mqtt.setCleanSession(false);
mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
BlockingConnection connection &#61; mqtt.blockingConnection();
connection.connect();
Topic[] topics &#61; {new Topic(MQTTConstants.TOPIC_001, QoS.AT_LEAST_ONCE), new Topic("$SYS/broker/clients/total", QoS.AT_LEAST_ONCE)};
connection.subscribe(topics);
int i &#61; 0;
while(true){
Message message &#61; connection.receive();
if(message!&#61;null){
System.out.println("Received messages. topic: " &#43; message.getTopic() &#43; "---" &#43; new String(message.getPayload()));
message.ack();
i&#43;&#43;;
if (i>&#61; 50) {
break;
}
}
}
}
用的JAVA写的测试&#xff0c;但真实场景设备那边是C语言的API&#xff0c; 应该类似处理就可以。java, pom包如下&#xff1a;
org.fusesource.mqtt-client
mqtt-client
1.12