热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Redis消息队列|黑马点评

目录一、认识消息队列二、List模拟消息队列三、PubSub的消息队列四、Stream的消息队列(重点)1、单消费模式2、消费者组五、redi

目录

一、认识消息队列

二、List模拟消息队列

三、PubSub的消息队列

四、Stream的消息队列(重点)

        1、单消费模式

        2、消费者组

五、redis三种消息队列对比 

 六、优化秒杀实战

1、创建消息队列

2、修改下单脚本

 3、接收消息处理




一、认识消息队列

消息队列,字面意思就存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理

生产者:发送消息到消息队列

消费者:从消息队列获取消息并处理消息

解决了jvm堵塞队列内存不足的问题,而且消息队列是可以持久化的,宕机了依然能够保存。

redis提供三种不同方式实现消息队列:


  • list结构:基于list结构模拟消息队列
  • PubSub:基于的点对点消息队列
  • Stream:比较完善的消息队列模型(推荐)


二、List模拟消息队列

redis的list结构是一个双向链表,很容易模拟出队列效果

队列是入口和出口不在一边,因此可以用LPUSH结合RPOP、或者RPUSH结合LPOP实现

但是,当队列没有消息时pop就会返回null,并不会jvm堵塞队列那样堵塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现堵塞队列。

缺点:

无法避免消息丢失。从消息队列取到消息,还没来得及处理就挂掉了,这个消息就消失了。

只支持单消费者。一个人拿走就从队列里面弹出了。


三、PubSub的消息队列

PubSub(发布订阅)是redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

支持多生产、多消费

缺点:

不支持数据持久化(刚刚的list本质是做存储的我们拿来当队列所以可以持久化)

无法避免消息丢失。

消息堆积有上限,超出时数据丢失。(缓存空间是有上限的)


四、Stream的消息队列(重点)

Stream是redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。


1、单消费模式


特点:


  • 消息可回溯。不消失永久保存在队列里。
  • 一个消息可以被多个消费者读取。读完不消失的,可以多个读
  • 可以堵塞读取
  • 有消息漏读的风险

2、消费者组

消费者组(Consumer Group):将多个消费者划分到一个组,监听同一个队列。

 消费者监听消息的基本思路

stream类型消息队列的消费者组特点:


  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏镀的风险
  • 有消息确认机制,保证消息至少被消费一次

五、redis三种消息队列对比 


 六、优化秒杀实战

1、创建消息队列

创建一个stream类型的消息队列,名为stream.orders


2、修改下单脚本

修改之前秒杀下单lua脚本,认定有抢购资格后,直接向steam.orders中添加消息,内容包含voucher、userId、orderId

-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- 库存key
local stockKey = "seckill:stock:"..voucherId
-- 订单key
local orderKey = "seckill:order:"..voucherId
-- 判断库存是否充足
if(tonumber(redis.call(&#39;get&#39;, stockKey)) <&#61; 0) then
return 1
end
-- 判断用户是否已经下过单
if(redis.call(&#39;sismember&#39;, orderKey, userId) &#61;&#61; 1) then
return 2
end
-- 扣减库存
redis.call(&#39;incrby&#39;, stockKey, -1)
-- 将 userId 存入当前优惠券的 set 集合
redis.call(&#39;sadd&#39;, orderKey, userId)
-- 将订单信息存入到消息队列中 xadd stream.orders * k1 v1 k2 v2
redis.call(&#39;xadd&#39;, &#39;stream.orders&#39;, &#39;*&#39;, &#39;userId&#39;, userId, &#39;voucherId&#39;, voucherId, &#39;id&#39;, orderId)
return 0

 3、接收消息处理

项目启动时&#xff0c;开启一个线程任务&#xff0c;尝试获取stream.orders中的消息&#xff0c;完成下单

/***
* 创建线程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR &#61; Executors.newSingleThreadExecutor();
/***
* 容器启动时&#xff0c;便开始创建独立线程&#xff0c;从队列中读取数据&#xff0c;创建订单
*/
&#64;PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
&#64;Override
public void run() {
while(true){
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List> list &#61; stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判断订单信息是否为空
if(list &#61;&#61; null || list.isEmpty()){
// 如果为 null&#xff0c;说明没有消息&#xff0c;继续下一次循环
continue;
}
// 解析消息
MapRecord record &#61; list.get(0);
Map value &#61; record.getValue();
VoucherOrder voucherOrder &#61; BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
createVoucherOrder(voucherOrder);
// 确认消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常&#xff01;", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
// 获取 pending-list 中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List> list &#61; stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判断订单信息是否为空
if(list &#61;&#61; null || list.isEmpty()){
break;
}
// 解析消息
MapRecord record &#61; list.get(0);
Map value &#61; record.getValue();
VoucherOrder voucherOrder &#61; BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
createVoucherOrder(voucherOrder);
// 确认消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常&#xff01;", e);
try {
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}


推荐阅读
  • Java中不同类型的常量池(字符串常量池、Class常量池和运行时常量池)的对比与关联分析
    在研究Java虚拟机的过程中,笔者发现存在多种类型的常量池,包括字符串常量池、Class常量池和运行时常量池。通过查阅CSDN、博客园等相关资料,对这些常量池的特性、用途及其相互关系进行了详细探讨。本文将深入分析这三种常量池的差异与联系,帮助读者更好地理解Java虚拟机的内部机制。 ... [详细]
  • 在当前的软件开发领域,Lua 作为一种轻量级脚本语言,在 .NET 生态系统中的应用逐渐受到关注。本文探讨了 Lua 在 .NET 环境下的集成方法及其面临的挑战,包括性能优化、互操作性和生态支持等方面。尽管存在一定的技术障碍,但通过不断的学习和实践,开发者能够克服这些困难,拓展 Lua 在 .NET 中的应用场景。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 在本文中,我们将探讨如何在Docker环境中高效地管理和利用数据库。首先,需要安装Docker Desktop以确保本地环境准备就绪。接下来,可以从Docker Hub中选择合适的数据库镜像,并通过简单的命令将其拉取到本地。此外,我们还将介绍如何配置和优化这些数据库容器,以实现最佳性能和安全性。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 线程能否先以安全方式获取对象,再进行非安全发布? ... [详细]
  • 分享一款基于Java开发的经典贪吃蛇游戏实现
    本文介绍了一款使用Java语言开发的经典贪吃蛇游戏的实现。游戏主要由两个核心类组成:`GameFrame` 和 `GamePanel`。`GameFrame` 类负责设置游戏窗口的标题、关闭按钮以及是否允许调整窗口大小,并初始化数据模型以支持绘制操作。`GamePanel` 类则负责管理游戏中的蛇和苹果的逻辑与渲染,确保游戏的流畅运行和良好的用户体验。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 技术日志:使用 Ruby 爬虫抓取拉勾网职位数据并生成词云分析报告
    技术日志:使用 Ruby 爬虫抓取拉勾网职位数据并生成词云分析报告 ... [详细]
  • 本文详细探讨了Java事件处理机制的核心概念与实现原理,内容浅显易懂,适合初学者逐步掌握。通过具体的示例和详细的解释,读者可以深入了解Java事件模型的工作方式及其在实际开发中的应用。 ... [详细]
  • 本文对常见的字符串哈希函数进行了全面分析,涵盖了BKDRHash、APHash、DJBHash、JSHash、RSHash、SDBMHash、PJWHash和ELFHash等多种算法。这些哈希函数在不同的应用场景中表现出各异的性能特点,通过对比其算法原理、计算效率和碰撞概率,为实际应用提供了有价值的参考。 ... [详细]
  • 在前文探讨了Spring如何为特定的bean选择合适的通知器后,本文将进一步深入分析Spring AOP框架中代理对象的生成机制。具体而言,我们将详细解析如何通过代理技术将通知器(Advisor)中包含的通知(Advice)应用到目标bean上,以实现切面编程的核心功能。 ... [详细]
author-avatar
yu2531958135
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有