热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mica-mqtt1.1.2发布,添加基于redispub/submqtt集群实现

一、简介mica-mqtt基于t-io实现的简单、低延迟、高性能的mqtt物联网开源组件。使用详见mica-mqttgitee源码mica-mqtt-example模块。在多个朋友咨询mica-mqtt集群怎么实现之后,添加了一个mica-mqt

一、简介

 

mica-mqtt 基于 t-io 实现的简单低延迟高性能 的 mqtt 物联网开源组件。使用详见 mica-mqtt gitee 源码 mica-mqtt-example 模块

 

在多个朋友咨询 mica-mqtt 集群怎么实现之后,添加了一个 mica-mqtt-broker 模块演示了基于 redis pub/sub 实现集群实现。

 

二、功能

 

  • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
  • 支持 websocket mqtt 子协议(支持 mqtt.js)。
  • 支持 http rest api,http api 文档详见
  • 支持 MQTT client 客户端。
  • 支持 MQTT server 服务端。
  • 支持 MQTT 遗嘱消息。
  • 支持 MQTT 保留消息。
  • 支持自定义消息(mq)处理转发实现集群。
  • MQTT 客户端 阿里云 mqtt 连接 demo。
  • 支持 GraalVM 编译成本机可执行程序。
  • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
  • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。
  • 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块

 

三、待办

 

  • ????优化处理 mqtt session,以及支持部分 mqtt v5.0 新特性。

 

四、更新记录

 

  • ✨ 添加 mica-mqtt-broker 模块,基于 redis pub/sub 实现 mqtt 集群。
  • ✨ mica-mqtt-broker 基于 redis 实现客户端状态存储。
  • ✨ mica-mqtt-broker 基于 redis 实现遗嘱、保留消息存储。
  • ✨ mqtt-server http api 调整订阅和取消订阅,方便集群处理。
  • ✨ mica-mqtt-spring-boot-example 添加 mqtt 和 http api 认证示例。
  • ✨ 添加 mqtt 5 所有 ReasonCode。
  • ✨ 优化解码 PacketNeededLength 计算。
  • ???? 修复遗嘱消息,添加消息类型。
  • ???? 修复 mqtt-server 保留消息匹配规则。

 

五、Spring boot 快速接入

 

5.1 添加依赖

 


    net.dreamlu
    mica-mqtt-spring-boot-starter
    1.1.2

 

5.2 服务端配置示例

 

mqtt:
  server:
    enabled: true               # 是否开启,默认:true
    ip: 127.0.0.1               # 服务端 ip 默认:127.0.0.1
    port: 5883                  # 端口,默认:1883
    name: Mica-Mqtt-Server      # 名称,默认:Mica-Mqtt-Server
    buffer-allocator: HEAP      # 堆内存和堆外内存,默认:堆内存
    heartbeat-timeout: 120000   # 心跳超时,单位毫秒,默认: 1000 * 120
    read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092
    max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092
    debug: true                 # 如果开启 prometheus 指标收集建议关闭
    websocket-enable: true      # 开启 websocket 子协议,默认开启
    websocket-port: 8083        # websocket 端口,默认:8083

 

5.3 服务端可实现接口(注册成 Spring Bean 即可)

接口

是否必须

说明

IMqttServerAuthHandler

用于客户端认证

IMqttMessageListener

消息监听

IMqttConnectStatusListener

连接状态监听

IMqttSessionManager

session 管理

IMqttMessageStore

集群是,单机否

遗嘱和保留消息存储

AbstractMqttMessageDispatcher

集群是,单机否

消息转发,(遗嘱、保留消息转发)

IpStatListener

t-io ip 状态监听

 

5.4 Prometheus + Grafana 监控对接

 

得益于 t-io 良好的设计,监控指标直接对接的 t-iostat,目前支持下列指标,后期会不断完善。

支持得指标

说明

mqtt_connections_accepted

共接受过连接数

mqtt_connections_closed

关闭过的连接数

mqtt_connections_size

当前连接数

mqtt_messages_handled_packets

已处理消息数

mqtt_messages_handled_bytes

已处理消息字节数

mqtt_messages_received_packets

已接收消息数

mqtt_messages_received_bytes

已处理消息字节数

mqtt_messages_send_packets

已发送消息数

mqtt_messages_send_bytes

已发送消息字节数

 

mica-mqtt 1.1.2 发布,添加基于 redis pub/sub  mqtt 集群实现

 

关于 mica-mqtt-spring-boot-starter 更多请查看文档:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter

 

六、普通 java 项目接入

 

6.1 maven 依赖

 

 
   net.dreamlu
   mica-mqtt-core
   1.1.2
 

 

6.2 mica-mqtt 客户端

 

 // 初始化 mqtt 客户端
 MqttClient client = MqttClient.create()
     .ip("127.0.0.1")
     .port(1883)                     // 默认:1883
     .username("admin")
     .password("123456")
     .version(MqttVersion.MQTT_5)    // 默认:3_1_1
     .clientId("xxxxxx")             // 默认:MICA-MQTT- 前缀和 36进制的纳秒数
     .connect();                     // 连接
 
     // 消息订阅,同类方法 subxxx
     client.subQos0("/test/#", (topic, payload) -> {
         logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
     });
     // 取消订阅
     client.unSubscribe("/test/#");
 
     // 发送消息
     client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
 
     // 断开连接
     client.disconnect();
     // 重连
     client.reconnect();
     // 停止
     client.stop();

 

6.3 mica-mqtt 服务端

 

 // 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
 MqttServer mqttServer = MqttServer.create()
     // 默认:127.0.0.1
     .ip("127.0.0.1")
     // 默认:1883
     .port(1883)
     // 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
     .readBufferSize(512)
     // 自定义认证
     .authHandler((clientId, userName, password) -> true)
     // 消息监听
     .messageListener((clientId, topic, mqttQoS, payload) -> {
         logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
     })
     .debug() // 开启 t-io debug 信息日志
     .start();
 
 // 发送给某个客户端
 mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
 
 // 发送给所有在线监听这个 topic 的客户端
 mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
 
 // 停止服务
 mqttServer.stop();

 

七、集群演示

 

mica-mqtt 1.1.2 发布,添加基于 redis pub/sub  mqtt 集群实现


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 我们


推荐阅读
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 本文介绍了在使用Python中的aiohttp模块模拟服务器时出现的连接失败问题,并提供了相应的解决方法。文章中详细说明了出错的代码以及相关的软件版本和环境信息,同时也提到了相关的警告信息和函数的替代方案。通过阅读本文,读者可以了解到如何解决Python连接服务器失败的问题,并对aiohttp模块有更深入的了解。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • WebSocket与Socket.io的理解
    WebSocketprotocol是HTML5一种新的协议。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 面试:Websocket简介WebSocket是一种与HTTP不同的协议。两者都位于OSI模型的应用层,并且都依赖于传输层的TCP协议。虽然它们不同& ... [详细]
  • 【系列二】长连接,短连接及WebSocket介绍(含http1.0,1.1,2.0相关)
    前言上一节讲了长轮询和轮询及其实现,这节讲一讲长连接、短连接及webSocket,在讲这些之前,我们先来普及一下http相关的一 ... [详细]
  • 本文介绍了作者在开发过程中遇到的问题,即播放框架内容安全策略设置不起作用的错误。作者通过使用编译时依赖注入的方式解决了这个问题,并分享了解决方案。文章详细描述了问题的出现情况、错误输出内容以及解决方案的具体步骤。如果你也遇到了类似的问题,本文可能对你有一定的参考价值。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
author-avatar
黒彡白小彩电邦
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有