作者:榜榜爱打球 | 来源:互联网 | 2023-07-09 13:46
ApacheapolloMqtt是一个基于TcpIp协议的消息代理服务器,在物联网方面应用十分广泛。Mqtt下载这是我的项目结构,PushCallBack.java是消息处理类,Serve
Apache apollo Mqtt 是一个基于Tcp/Ip协议的消息代理服务器,在物联网方面应用十分广泛。Mqtt 下载
这是我的项目结构,PushCallBack.java 是消息处理类,ServeMqtt.java 是Mqtt代理服务器的配置用的。各个文件内容如下:
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.err.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}
public class ServerMqtt {
public static final String HOST = "tcp://127.0.0.1:61613";
public static MqttClient client;
private MqttTopic topic11;
private String topicStr;
private String userName = "admin";
private String passWord = "password";
private MqttMessage message;
private static ServerMqtt serverMqtt ;
public static ServerMqtt getServerMQTTInstance() {
if (null != serverMqtt) {
return serverMqtt;
}
serverMqtt = new ServerMqtt();
return serverMqtt;
}
public void connect( MqttClient client) {
MqttConnectOptions optiOns= new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
try {
client.setCallback(new PushCallback());
client.connect(options);
MqttTopic topic = client.getTopic("154");
options.setWill(topic, "close".getBytes(), 2, true);
int[] Qos = { 1 };
String[] topic1 = { "1008611" };
client.subscribe(topic1, Qos);
} catch (Exception e) {
e.printStackTrace();
}
}
public static MqttClient getMqttClient(String clientId) throws MqttException {
if (client != null && clientId.equals(client.getClientId())) {
return client;
}
return new MqttClient(HOST, clientId);
}
public void MqttInit( String clientId) throws MqttException {
MqttClient client1 = ServerMqtt.getMqttClient(clientId);
if(this.client !=client1){
this.client = client1;
connect(client);
}
}
public void publish(String topicStr, JSONObject object) throws MqttPersistenceException, MqttException {
message = new MqttMessage();
message.setQos(2);
message.setRetained(true);
message.setPayload(object.toString().getBytes());
topic11 = client.getTopic(topicStr);
MqttDeliveryToken token = topic11.publish(message);
token.waitForCompletion();
System.out.println("message is published completely! " + token.isComplete());
}
}
test1.java 是测试类
note:
1、开始我看了很多的关于mqtt的博客,都只是实现了一些功能,但是在实际项目开发中,代码就不适用了,实际开发中,需要又很多不通的终端,它们的Clientd不尽相同,在这里要注意了,一个ClientId 只能同时new 一个Client出来,否者会把你之前的挤掉。