作者:霹靂一頁書_629 | 来源:互联网 | 2023-06-19 14:49
近期,公司准备使用阿里MQTT,自己做了一个demo。
准备工作
首先,你歹有一个阿里云帐号,去阿里云注册一个MQTT实例,现在有免费试用一个月的
在这我们要记得尤其记一下我们购买的实例是买的哪的 比如我的这个是深圳 可以写进配置文件中
public final static String instanceId = "";public final static String accessKey= "";public final static String secretKey= "";public final static String cOnnectEndpoint= "";;public final static String topicId ="demo";public final static String groupId ="GID_demo";public final static String messageModel="5000";public final static String sendMsgTimeoutMillis="5000";public final static String suspendTimeMillis="5000";/*** 消息消费失败时的最大重试次数*/public final static String maxRecOnsumeTimes="10";/*** QoS参数代表传输质量,可选0,1,2*/public final static int qosLevel = 0;/*** 客户端超时时间*/public final static int timeToWait =5000;/*** MQTT所在地域*/ public final static String mqttAddress= "cn-shenzhen";
MQTT消息队列的实现功能
我们主要完成五个接口功能的实现
1.设备code发送消息到topic中
2.设备code发送消息到指定设备上(P2P模式)
3.服务启动监听队列消息
4.根据设备ID及所在队列查询设备当前状态消息(单独设备)
5.根据设备ID及所在队列批量查询设备当前状态消息(注意最大只是支持十个设备的查询)
设备code发送消息到topic中
主要思路是创建连接MQTT,直接将消息发送到对应的topic中
public void sendMqttMsgTopic(@RequestParam(value = "deviceCode", required = true) String deviceCode,@RequestParam(value = "msg", required = true) String msg) throws InterruptedException, MqttException{ String listenTopic ="earTagTopic";String clientId = MqttConfig.earTagGroupId + "@@@" + deviceCode;String topic = MqttConfig.earTagTopicId + "/" + listenTopic; //发送消息到消息队列MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(MqttConfig.qosLevel);/*** 发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则*/ try {mqttClient = MqtttClient.getProducerConnection(clientId,topic,deviceCode,MqttConfig.earTagGroupId); mqttClient.publish(topic, message);} catch (InvalidKeyException | NoSuchAlgorithmException | MqttException e1) {System.out.printf("消息发送失败:{}", e1.getMessage());e1.printStackTrace();}mqttClient.disconnect();Thread.sleep(Long.MAX_VALUE);return ;}
我们可以看到我们向earTag的eraTagTopic的主题消息发送成功,并且可以看到已经监听到并打印
如果我们不需要发送成功返回的回调,我们直接将callback代码去掉即可
mqttClient.setCallback(new MqttCallbackExtended()
我们可以看到这两条就是去掉成功回调,我们直接监听到返回结果了
设备code发送消息到指定设备上(P2P模式)
主要思路也是连接到MQTT上,然后直接向目标设备发送消息。
public void sendMqttp2pMsg(@RequestParam(value = "deviceCode", required = true) String deviceCode,@RequestParam(value = "msg", required = true) String msg) throws InterruptedException, MqttException{/*** MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的 clientId,则可以直接发送点对点消息。* 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的 topic 格式规范是 {{parentTopic}}/p2p/{{targetClientId}}*///对指定设备发送p2p消息 当前测试只是给监听我这个队列的设备发送消息String listenTopic ="earTagTopic";String clientId = MqttConfig.earTagGroupId + "@@@" + deviceCode+"aa";String topic = MqttConfig.earTagTopicId + "/" + listenTopic; //String msgSendTopic = MqttConfig.earTagTopicId + "/p2p/" + MqttConfig.earTagGroupId + "@@@" + deviceCode;String msgSendTopic = MqttConfig.topicId + "/p2p/" + MqttConfig.groupId + "@@@" + deviceCode;MqttMessage mqttMessage = new MqttMessage(msg.getBytes());mqttMessage.setQos(MqttConfig.qosLevel); try {mqttClient = MqtttClient.getProducerConnection(clientId,topic,deviceCode+"aa",MqttConfig.earTagGroupId); mqttClient.publish(msgSendTopic, mqttMessage);} catch (InvalidKeyException | NoSuchAlgorithmException | MqttException e1) {System.out.printf("消息发送失败:{}", e1.getMessage());e1.printStackTrace();}mqttClient.disconnect();Thread.sleep(Long.MAX_VALUE);return ;}
我向GID_demo@@@test3的设备发送指定的消息,GID_demo@@@test3这个设备成功接收到了。
服务启动监听队列消息
主要思路使用ApplicationListener来进行实现 项目启动自动启动监听
根据设备ID及所在队列查询设备当前状态消息(单独设备)
主要思路传想要查询的主题及设备code,返回当前状态信息
public SourceDataBean querySessionByClientId(@RequestParam(value = "topic", required = true) String topic,@RequestParam(value = "deviceCode", required = true) String deviceCode) throws InterruptedException, MqttException{ SourceDataBean sdb = new SourceDataBean(); List res = queryDeviceStateService.querySessionByClientId(topic, deviceCode);if(res.size()>0) {sdb.setDataSource(new DataSource<>(res,1,1,1));sdb.setMessage(RestCode.SUCCESS.code,PropertiesUtil.getValue("config.properties","QuerySuccessful"));}else {sdb.setMessage(RestCode.FAILED.code,res.get(0).getOnlineStatusName());}return sdb;}
根据设备ID及所在队列批量查询设备当前状态消息(注意最大只是支持十个设备的查询)
主要思路传想要查询的主题及设备codes,返回查询设备的状态信息
注意deviceCodes 使用逗号分隔即可,当前阿里MQTT官方文档写的是最大只是十个设备的查询,因为我这边的分页最大数量是10,未做超过10的测试。
public SourceDataBean querySessionByClientIds(@RequestParam(value = "topic", required = true) String topic,@RequestParam(value = "deviceCodes", required = true) String deviceCodes) throws InterruptedException, MqttException{ SourceDataBean sdb = new SourceDataBean();List res = queryDeviceStateService.querySessionByClientIds(topic, deviceCodes);if(res.size()>0) {sdb.setDataSource(new DataSource<>(res,res.size(),0,0));sdb.setMessage(RestCode.SUCCESS.code,PropertiesUtil.getValue("config.properties","QuerySuccessful"));}else {sdb.setMessage(RestCode.FAILED.code,res.get(0).getOnlineStatusName());}return sdb;}
这样,springMVC结合阿里MQTT的基本使用就完成了。
有不懂的地方可以QQ联系我,外接私活,源码出售,欢迎各位老板。