热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MQTT:java客户端库Paho

简介EclipsePahoJavaClient(opensnewwindow)是用Java编写的MQTT客户端库(MQTTJavaClient),可用于JVM或其他Java兼容平台

简介

Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。

emqx官方文档中有相关介绍:https://docs.emqx.cn/enterprise/v4.3/development/java.html#通过-maven-安装-paho-java

paho客户端对象初始化

先导入paho依赖:


  org.eclipse.paho
	org.eclipse.paho.client.mqttv3
	1.2.2

配置MqttProperties.java:

@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;
}

application.yml

mqtt:
  broker-url: tcp://192.168.40.128:1883
  client-id: emq-client
  username: admin
  password: public

EmqClient.java

@Component
public class EmqClient {

    private static final Logger logger = LoggerFactory.getLogger(EmqClient.class);

    @Autowired
    private MqttProperties mqttProperties;

    private IMqttClient mqttClient;

    @PostConstruct
    public void init(){

        MemoryPersistence memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),
                    mqttProperties.getClientId(),
                    memoryPersistence);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 创建失败");
        }
    }
}

编写客户端一些方法

创建Qos枚举类:

public enum QosEnum {
    Qos0(0),Qos1(1),Qos2(2);

    private final int value;

    QosEnum(int value) {
        this.value = value;
    }

    public int value(){
        return this.value;
    }
}
    @Autowired
    private MqttCallback mqttCallback;

    public void connect(String username, String password){
        MqttConnectOptions optiOns= new MqttConnectOptions();
        options.setAutomaticReconnect(true);//自动重连
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);//临时会话

        mqttClient.setCallback(mqttCallback);//设置方法回调
        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 连接失败");
        }
    }

    @PreDestroy
    public void disConnect(){
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 断开连接失败");
        }
    }

    public void reConnect(){
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 重新连接失败");
        }
    }

    /**
     * 发布消息
     * @author wen.jie
     * @date 2021/7/27 16:11
     */
    public void publish(String topic, String msg, QosEnum qosEnum, boolean retained){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qosEnum.value());
        mqttMessage.setRetained(retained);

        try {
            mqttClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("发布消息失败");
        }
    }

    /**
     * 添加订阅
     * @author wen.jie
     * @date 2021/7/27 16:18
     */
    public void subscribe(String topicFilters, QosEnum qosEnum){
        try {
            mqttClient.subscribe(topicFilters, qosEnum.value());
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("添加订阅失败");
        }
    }

编写MqttCallback回调类

MessageCallback.java

@Component
public class MessageCallback implements MqttCallback {

    private static final Logger logger = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * 丢失了对服务端连接后的回调
     * @author wen.jie
     * @date 2021/7/27 16:27
     */
    @Override
    public void connectionLost(Throwable cause) {
        //丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连
        logger.info("丢失了对服务端连接");
    }

    /**
     * 订阅到消息后的回调
     * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
     * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的消息都将由broker服务器再次发送到客户端
     * @author wen.jie
     * @date 2021/7/27 16:28
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        logger.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));
    }

    /**
     * 消息发布完成且收到ack确认后的回调
     * QoS0:消息被网络发出后触发一次
     * QoS1:当收到broker的PUBACK消息后触发
     * QoS2:当收到broker的PUBCOMP消息后触发
     * @author wen.jie
     * @date 2021/7/27 16:34
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        logger.info("消息发送完成,messageId={},topics={}",messageId,topics);
    }
}

主启动类

@SpringBootApplication
public class EmqDemoApplication {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties mqttProperties;

    @PostConstruct
    public void init(){
        emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
        emqClient.subscribe("topictest/#", QosEnum.Qos2);
        new Thread(()->{
            while (true){
                emqClient.publish("topictest/123", "hello world", QosEnum.Qos2, true);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }).start();
    }

    public static void main(String[] args) {
        SpringApplication.run(EmqDemoApplication.class, args);
    }

}

测试

启动主启动类,观察效果

MQTT:java客户端库Paho

每五秒钟收发一次消息,测试成功


推荐阅读
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 代理模式的详细介绍及应用场景
    代理模式是一种在软件开发中常用的设计模式,通过在客户端和目标对象之间增加一层中间层,让代理对象代替目标对象进行访问,从而简化系统的复杂性。代理模式可以根据不同的使用目的分为远程代理、虚拟代理、Copy-on-Write代理、保护代理、防火墙代理、智能引用代理和Cache代理等几种。本文将详细介绍代理模式的原理和应用场景。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
author-avatar
枇杷语1314
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有