热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Redis实现的延迟消息队列

基于Redis实现的延迟消息队列,Go语言社区,Golang程序员人脉社


  • 1. 设计方案

  • 2. 代码实现
    • 2.1 技术说明

    • 2.2 核心代码
      • 2.2.1 Message对象

      • 2.2.2 Route(消息路由器)

      • 2.2.3 RedisMq(消息队列)

      • 2.2.4 RedisMq消息队列配置:

      • 2.2.5 消费者



  • 4. 测试

  • 3. 总结

  • 4. 源码连接

本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:


  • Spring框架管理对象

  • 有消息需求,但不想维护mq中间件

  • 有使用redis

  • 对消息持久化并没有很苛刻的要求

需要使用rabbitmq实现延迟消息请参考这里



1. 设计方案

设计主要包含以下几点:


  • 将整个Redis当做消息池,以kv形式存储消息

  • 使用ZSET做优先队列,按照score维持优先级

  • 使用LIST结构,以先进先出的方式消费

  • zset和list存储消息地址(对应消息池的每个key)

  • 自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list

  • 使用定时器维持路由

  • 根据TTL规则实现消息延迟

    基于Redis有消息队列设计方案


2. 代码实现

2.1 技术说明



示例使用Springboot,gradle,redis,jdk8。



2.2 核心代码



核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。



2.2.1 Message对象

package git.yampery.msmq;
/**
* @decription Message
*

封装消息元数据


* @author Yampery
* @date 2017/11/2 15:50
*/
public class Message {
/**
* 消息主题
*/

private String topic;
/**
* 消息id
*/

private String id;
/**
* 消息延迟
*/

private long delay;
/**
* 消息优先级
*/

private int priority;
/**
* 消息存活时间
*/

private int ttl;
/**
* 消息体,对应业务内容
*/

private String body;
/**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* 用来消除时间的影响
*/

private long createTime;
/**
* 消息状态(延迟-0;待发送-1;已发送-2;发送失败-3)
*/

private int status;
/**
* getset略...
*/

}

2.2.2 Route(消息路由器)

package git.yampery.msmq;
/**
* @decription Route
*

消息路由器,主要控制将消息从指定的队列路由到待消费的list
* 通过这种方式实现自定义延迟以及优先级发送


* @author Yampery
* @date 2017/11/3 14:33
*/
public class Route {
/**
* 存放消息的队列
*/

private String queue;
/**
* 待消费的列表
*/

private String list;
public Route(String queue, String list) {
this.queue = queue;
this.list = list;
}
/**
* getset略...
*/

}

2.2.3 RedisMq(消息队列)

package git.yampery.msmq;
import git.yampery.utils.JedisUtils;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* @decription RedisMQ
*

基于redis的消息队列


*

将整个redis作为消息池存储消息体,以ZSET为消息队列,LIST作为待消费列表
* 用Spring定时器作为监听器,每次监听ZSET中指定数量的消息
* 根据SCORE确定是否达到发送要求,如果达到,利用消息路由{@link Route}将消息路由到待消费list


* @author Yampery
* @date 2017/11/2 15:49
*/
public class RedisMQ {
/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
* 的消息体body作为值存储
*/

private static final String MSG_POOL = "Message:Pool:";
/**
* 默认监听数量,对应监听zset队列前多少个元素
*/

private static final int DEFAUT_MOnITOR= 10;
@Resource private JedisUtils jedisUtils;
/**
* 每次监听queue中元素的数量,可配置
*/

private int mOnitorCount= DEFAUT_MONITOR;
/**
* 消息路由
*/

private List routes;
/**
* 存入消息池
* @param message
* @return
*/

public boolean addMsgPool(Message message) {
if (null != message) {
return jedisUtils.setex(MSG_POOL + message.getId(), message.getBody(), message.getTtl());
}
return false;
}
/**
* 从消息池中删除消息
* @param id
* @return
*/

public boolean deMsgPool(String id) {
return jedisUtils.del(MSG_POOL + id);
}
/**
* 像队列中添加消息
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/

public String enMessage(String key, long score, String val) {
if (jedisUtils.zadd(key, score, val)) {
return val;
}
return "";
}
/**
* 从队列删除消息
* @param id
* @return
*/

public boolean deMessage(String key, String id) {
return jedisUtils.zdel(key, id);
}
/**
* 消费
* @return
*/

public List consume(String key) {
long count = jedisUtils.countList(key);
if (0 // 可根据需求做限制
List ids = jedisUtils.rangeList(key, 0, count - 1);
if (ids != null) {
List result = new ArrayList<>();
ids.forEach(l -> result.add(jedisUtils.get(MSG_POOL + l, "")));
jedisUtils.removeListValue(key, ids);
return result;
} /// if end~
}
return null;
}
/**
* 消息队列监听器
* 监听所有路由器,将消息队列中的消息路由到待消费列表
*/

@Scheduled(cron="*/5 * * * * *")
public void monitor() {
// 获取消息路由
int route_size;
if (null == routes || 1 > (route_size = routes.size())) return;
String queue, list;
Set set;
for (int i = 0; i queue = routes.get(i).getQueue();
list = routes.get(i).getList();
set = jedisUtils.getSoredSetByRange(queue, 0, monitorCount, true);
if (null != set) {
long current = System.currentTimeMillis();
long score;
for (String id : set) {
score = jedisUtils.getScore(queue, id).longValue();
if (current >= score) {
// 添加到list
if (jedisUtils.insertList(list, id)) {
// 删除queue中的元素
deMessage(queue, id);
} /// if end~
} /// if end~
} /// for end~
} /// if end~
} /// for end~
}
public int getMonitorCount() {
return monitorCount;
}
public void setMonitorCount(int monitorCount) {
this.mOnitorCount= monitorCount;
}
public List getRoutes() {
return routes;
}
public void setRoutes(List routes) {
this.routes = routes;
}
}

2.2.4 RedisMq消息队列配置:


  • mq.properties文件

# 队列的监听数量
mq.monitor.count =30
# 队列一
mq.queue.first =queue:1
# 队列二
mq.queue.second =queue:2
# 消费列表一
mq.consumer.first =list:1
# 消费列表二
mq.consumer.second =list:2

  • MqConfig.java文件

package git.yampery.config;
import git.yampery.msmq.RedisMQ;
import git.yampery.msmq.Route;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.util.ArrayList;
import java.util.List;
/**
* @decription MqConfig
*

消息队列配置


* @author Yampery
* @date 2018/2/9 14:26
*
* 根据不同的架构可选择使用XML配置
* ---------------------------------------------------
*















* ----------------------------------------------------
*/
@Configuration
public class MqConfig {
@Bean(name = "redisMQ")
@Primary
public RedisMQ getRedisMq() {
RedisMQ redisMQ = new RedisMQ();
// 配置监听队列元素数量
redisMQ.setMonitorCount(monitorCount);
// 配置路由表
redisMQ.setRoutes(routeList());
return redisMQ;
}
/**
* 返回路由表
* @return
*/

public List routeList() {
List routeList = new ArrayList<>();
Route routeFirst = new Route(queueFirst, listFirst);
Route routeSecOnd= new Route(queueSecond, listSecond);
routeList.add(routeFirst);
routeList.add(routeSecond);
return routeList;
}
@Value("${mq.monitor.count}")
private int monitorCount;
@Value("${mq.queue.first}")
private String queueFirst;
@Value("${mq.queue.second}")
private String queueSecond;
@Value("${mq.consumer.first}")
private String listFirst;
@Value("${mq.consumer.second}")
private String listSecond;
}

如果使用的是xml配置 请参考:

<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
<property name="monitorCount" value="15"/>
<property name="routes">
<list>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.first}"/>
<property name="list" value="${mq.consumer.first}"/>
bean>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.second}"/>
<property name="list" value="${mq.consumer.second}"/>
bean>
list>
property>
bean>

2.2.5 消费者



并没有内置消费者监听器来实现,可以直接使用定时器实现


package git.yampery.task;
import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.RedisMQ;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* @decription MsgTask
*

发送消息


* @author Yampery
* @date 2018/2/9 18:04
*/
@Component
public class MsgTask {
@Resource private RedisMQ redisMQ;
// @Value("${mq.list.first}") private String MQ_LIST_FIRST;
@Scheduled(cron="*/5 * * * * *")
public void sendMsg() {
// 消费
List msgs = redisMQ.consume(redisMQ.getRoutes().get(0).getList());
int len;
if (null != msgs && 0 <(len = msgs.size())) {
// 将每一条消息转为JSONObject
JSONObject jObj;
for (int i = 0; i if (!StringUtils.isEmpty(msgs.get(i))) {
jObj = JSONObject.parseObject(msgs.get(i));
// 取出消息
System.out.println(jObj.toJSONString());
}
}
}
}
}

4. 测试

测试设置20秒延迟,发布消息到queue:1,在list:1消费。


package git.yampery.mq;
import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.Message;
import git.yampery.msmq.RedisMQ;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @decription TestMQ
*

测试


* @author Yampery
* @date 2018/2/9 18:43
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMQ {
@Resource
private RedisMQ redisMQ;
@Value("${mq.queue.first}")
private String MQ_QUEUE_FIRST;
@Test
public void testMq() {
JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");
String seqId = UUID.randomUUID().toString();
// 将有效信息放入消息队列和消息池中
Message message = new Message();
message.setBody(jObj.toJSONString());
// 可以添加延迟配置
message.setDelay(20);
message.setTopic("SMS");
message.setCreateTime(System.currentTimeMillis());
message.setId(seqId);
// 设置消息池ttl,防止长期占用
message.setTtl(20 * 60);
message.setStatus(0);
message.setPriority(0);
redisMQ.addMsgPool(message);
redisMQ.enMessage(MQ_QUEUE_FIRST,
message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId());
}
}

3. 总结

文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。
需要使用rabbitmq实现延迟消息请参考这里


4. 源码连接

文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。
https://github.com/Yampery/rdsmq.git




推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • jQuery Flot 数据可视化插件:高效绘制图表的专业工具
    jQuery Flot 是一款高效的数据可视化插件,专为绘制各种图表而设计。该工具支持丰富的图表类型和自定义选项,适用于多种应用场景。用户可以通过其官方网站获取示例代码和下载资源,以便快速上手和使用。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • 在启用分层编译的情况下,即时编译器(JIT)的触发条件涉及多个因素,包括方法调用频率、代码复杂度和运行时性能数据。本文将详细解析这些条件,并探讨分层编译如何优化JVM的执行效率。 ... [详细]
  • 软件开发史上最具影响力的十位编程大师(附图解)
    在软件开发领域,有十位编程大师对行业发展产生了深远影响。本文基于国外知名社区的一项评选,通过图文并茂的形式,详细介绍了这十位杰出人物,包括游戏开发先驱John Carmack等,为读者呈现了他们卓越的技术贡献与创新精神。 ... [详细]
  • 本项目在Java Maven框架下,利用POI库实现了Excel数据的高效导入与导出功能。通过优化数据处理流程,提升了数据操作的性能和稳定性。项目已发布至GitHub,当前最新版本为0.0.5。该项目不仅适用于小型应用,也可扩展用于大型企业级系统,提供了灵活的数据管理解决方案。GitHub地址:https://github.com/83945105/holygrail,Maven坐标:`com.github.83945105:holygrail:0.0.5`。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文探讨了在Android应用中实现动态滚动文本显示控件的优化方法。通过详细分析焦点管理机制,特别是通过设置返回值为`true`来确保焦点不会被其他控件抢占,从而提升滚动文本的流畅性和用户体验。具体实现中,对`MarqueeText.java`进行了代码层面的优化,增强了控件的稳定性和兼容性。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • Sublime Text 3 注册密钥及激活方法详解
    本文详细介绍了Sublime Text 3的注册密钥获取与激活方法,旨在帮助用户合法且高效地使用这款强大的文本编辑器。文章不仅提供了最新的注册密钥信息,还涵盖了详细的激活步骤,确保用户能够顺利激活软件,享受其带来的便捷与高效。此外,文中还简要对比了Sublime Text 3与其他主流文本编辑器的功能差异,为用户提供更多选择参考。 ... [详细]
  • 深入解析 org.hibernate.event.spi.EventSource.getFactory() 方法及其应用实例 ... [详细]
  • 前言: 网上搭建k8s的文章很多,但很多都无法按其说明在阿里云ecs服务器成功搭建,所以我就花了些时间基于自己成功搭建k8s的步骤写了个操作手册,希望对想搭建k8s环境的盆友有所帮 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
author-avatar
oo艾丁湖oooo
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有