热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

redis分布式锁案例分析

Case1未使用锁:RequestMapping(deduct_stock1)publicStringdeductStock1(){获取库存值intstockIn

Case 1
未使用锁:

@RequestMapping("/deduct_stock1")public String deductStock1() {//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}return "end";}

假设:key = stock ; value = 500
存在并发问题:会发现如果大量线程同时访问,扣减库存的方法时。在某个很小的时间内。获取的库存都是相同的值500.如果此时有10线程调用该方法时。库存为500, 那么这10个线程执行完过后。库存量就为499.
这就出现超卖问题了。
Case 2

添加Jvm级别的锁: @RequestMapping("/deduct_stock2")public String deductStock2() {synchronized (this){//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}return "end";}

synchronized 此时如果当前的项目是部署在单机上的(只部署在一台服务器上),那就可以实现一个。如果是集群,锁的生效只有在当前服务器的进程上生效。
Case 3
使用redis中的setnx();设计一个简单的入门级别分布式锁

/*** 使用redis中的setnx();设计一个简单的入门级别分布式锁* * @return*/@RequestMapping("/deduct_stock3")public String deductStock3() {String localKey = "lock:product:0001";Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");if (!aBoolean){return "当前系统繁忙";}try{//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}finally {stringRedisTemplate.delete(localKey);}return "end";}

存在的问题:
如果中间的任何一个一部分逻辑抛出了异常,那么就不会执行delete(localKey);的操作。那之后所有的线程都将加锁不成功。也就不会执行后面的业务代码。
优化:
在finally{}中进行delete(localKey)操作。
存在问题:
锁没有释放,宕机了的情况
Case 4
解决case3中存在的宕机没有释放锁的问题

@RequestMapping("/deduct_stock4")public String deductStock4() {String localKey = "lock:product:0001";Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);if (!aBoolean){return "当前系统繁忙";}try{//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}finally {stringRedisTemplate.delete(localKey);}return "end";}

设置一个过期时间:
存在的问题:存在原子性问题。
原因:还没有执行到expire()时就宕机了

Case 5
解决枷锁时的原子性问题
解决办法:在枷锁时就设置超时时间,也就是枷锁和设置超时时间是原子操作

@RequestMapping("/deduct_stock5")public String deductStock5() {String localKey = "lock:product:0001";//这条命令能够保证原子性Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);if (!aBoolean){return "当前系统繁忙";}try{//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}finally {stringRedisTemplate.delete(localKey);}return "end";}

存在问题:如果系统并发量不是特别的大,问题不大。并发特别大的时候依然存在超卖问题。
高并发(每秒几千上万访问量)的场景下存在严重的并发问题:
lock-------------- > -----------delete
假设某个请求A的时间超过了超时时间(10s)(锁失效了),此时该线程A还没有执行delete方法。
另一个线程B这时候就可以加锁成功了,但是这时候线程A执行了delete方法。但是这时候线程A释放的锁是线程B的。
这时候在极端情况下就会出现 请求A释放请求B的锁,B释放C的,C释放D的,… 最后就会导致大量的超卖问题。
Case 6
该如何解决 deductStock5()中存在的问题。
分析:问题存在的根本原因就是在执行delete方法的时候。自己的锁被其他的线程释放了。
解决办法:给每个线程生成一个唯一id.例如使用uuid. 在最后释放锁的时候判断是否是自己的锁。如果是自己的才释放。
注意:不要使用线程id,不同的服务器可能有相同的线程id

@RequestMapping("/deduct_stock6")public String deductStock6() {String localKey = "lock:product:0001";String uuid = UUID.randomUUID().toString();//这条命令能够保证原子性Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);if (!aBoolean){return "当前系统繁忙";}try{//获取库存值int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}finally {if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){stringRedisTemplate.delete(localKey);}}return "end";}

存在问题:存在原子性问题

if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){// stringRedisTemplate.delete(localKey);}

上面的代码中不是原子的。在当前线程执行完if判断却还没有执行delete操作的时候。当前锁过期了。
又可能会出现超卖问题。当前的线程释放了其他线程的锁

解决方式:
1.锁续命(实现不容易)
使用一个分线程,使用定时任务,每过一段时间,判断业务的主线程有没有结束(是否还加着锁)。如果还加着锁,将锁的超时时间重新设置。
2.使用现成的 例如redisson
Case 7

@RequestMapping("/deduct_stock7")public String deductStock7() {String lockKey = "lock:product:0001";//获取锁对象RLock redissonLock = redisson.getLock(lockKey);//加分布式锁redissonLock.lock();try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {//解锁redissonLock.unlock();}return "end";}

核心使用lua脚本

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

Redis Lua脚本
Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:
1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。
2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的。
3、替代redis的事务功能:redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规的事务功能,官方推荐如果要使用redis的事务功能可以用redis lua替代。


推荐阅读
author-avatar
Jasonscw
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有