作者:yu2531958135 | 来源:互联网 | 2023-09-08 14:44
目录
一、认识消息队列
二、List模拟消息队列
三、PubSub的消息队列
四、Stream的消息队列(重点)
1、单消费模式
2、消费者组
五、redis三种消息队列对比
六、优化秒杀实战
1、创建消息队列
2、修改下单脚本
3、接收消息处理
一、认识消息队列
消息队列,字面意思就存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
解决了jvm堵塞队列内存不足的问题,而且消息队列是可以持久化的,宕机了依然能够保存。
redis提供三种不同方式实现消息队列:
- list结构:基于list结构模拟消息队列
- PubSub:基于的点对点消息队列
- Stream:比较完善的消息队列模型(推荐)
二、List模拟消息队列
redis的list结构是一个双向链表,很容易模拟出队列效果
队列是入口和出口不在一边,因此可以用LPUSH结合RPOP、或者RPUSH结合LPOP实现
但是,当队列没有消息时pop就会返回null,并不会jvm堵塞队列那样堵塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现堵塞队列。
缺点:
无法避免消息丢失。从消息队列取到消息,还没来得及处理就挂掉了,这个消息就消失了。
只支持单消费者。一个人拿走就从队列里面弹出了。
三、PubSub的消息队列
PubSub(发布订阅)是redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
支持多生产、多消费
缺点:
不支持数据持久化(刚刚的list本质是做存储的我们拿来当队列所以可以持久化)
无法避免消息丢失。
消息堆积有上限,超出时数据丢失。(缓存空间是有上限的)
四、Stream的消息队列(重点)
Stream是redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
1、单消费模式
特点:
- 消息可回溯。不消失永久保存在队列里。
- 一个消息可以被多个消费者读取。读完不消失的,可以多个读
- 可以堵塞读取
- 有消息漏读的风险
2、消费者组
消费者组(Consumer Group):将多个消费者划分到一个组,监听同一个队列。
消费者监听消息的基本思路
stream类型消息队列的消费者组特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏镀的风险
- 有消息确认机制,保证消息至少被消费一次
五、redis三种消息队列对比
六、优化秒杀实战
1、创建消息队列
创建一个stream类型的消息队列,名为stream.orders
2、修改下单脚本
修改之前秒杀下单lua脚本,认定有抢购资格后,直接向steam.orders中添加消息,内容包含voucher、userId、orderId
-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- 库存key
local stockKey = "seckill:stock:"..voucherId
-- 订单key
local orderKey = "seckill:order:"..voucherId
-- 判断库存是否充足
if(tonumber(redis.call(&#39;get&#39;, stockKey)) <&#61; 0) then
return 1
end
-- 判断用户是否已经下过单
if(redis.call(&#39;sismember&#39;, orderKey, userId) &#61;&#61; 1) then
return 2
end
-- 扣减库存
redis.call(&#39;incrby&#39;, stockKey, -1)
-- 将 userId 存入当前优惠券的 set 集合
redis.call(&#39;sadd&#39;, orderKey, userId)
-- 将订单信息存入到消息队列中 xadd stream.orders * k1 v1 k2 v2
redis.call(&#39;xadd&#39;, &#39;stream.orders&#39;, &#39;*&#39;, &#39;userId&#39;, userId, &#39;voucherId&#39;, voucherId, &#39;id&#39;, orderId)
return 0
3、接收消息处理
项目启动时&#xff0c;开启一个线程任务&#xff0c;尝试获取stream.orders中的消息&#xff0c;完成下单
/***
* 创建线程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR &#61; Executors.newSingleThreadExecutor();
/***
* 容器启动时&#xff0c;便开始创建独立线程&#xff0c;从队列中读取数据&#xff0c;创建订单
*/
&#64;PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
&#64;Override
public void run() {
while(true){
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List> list &#61; stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判断订单信息是否为空
if(list &#61;&#61; null || list.isEmpty()){
// 如果为 null&#xff0c;说明没有消息&#xff0c;继续下一次循环
continue;
}
// 解析消息
MapRecord record &#61; list.get(0);
Map value &#61; record.getValue();
VoucherOrder voucherOrder &#61; BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
createVoucherOrder(voucherOrder);
// 确认消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常&#xff01;", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
// 获取 pending-list 中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List> list &#61; stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判断订单信息是否为空
if(list &#61;&#61; null || list.isEmpty()){
break;
}
// 解析消息
MapRecord record &#61; list.get(0);
Map value &#61; record.getValue();
VoucherOrder voucherOrder &#61; BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 创建订单
createVoucherOrder(voucherOrder);
// 确认消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常&#xff01;", e);
try {
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}