热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

redis续期_部门老大:redis分布式锁再这么用,我就劝退你

-如有不严谨或者错误之处,还望不吝赐教,轻点怼,人家还是个孩子,嘤嘤嘤~引言:最近项目上线的频率颇高ÿ

-如有不严谨或者错误之处,还望不吝赐教,轻点怼,人家还是个孩子,嘤嘤嘤~

引言:

最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。

342a68b2f121b6eb9570707ccb5bd21e.png

由于是做商城业务,要频繁的对商品库存进行扣减,应用是集群部署,为避免并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本以为给扣库存的代码加上锁lock.tryLock就万事大吉了

/** * @author xiaofu * @description 扣减库存 * @date 2020/4/21 12:10 */ public String stockLock() { RLock lock = redissonClient.getLock("stockLock"); try { /** * 获取锁 */ if (lock.tryLock(10, TimeUnit.SECONDS)) { /** * 扣减库存 */ 。。。。。。 } else { LOGGER.info("未获取到锁业务结束.."); } } catch (Exception e) { LOGGER.info("处理异常", e); } return "ok"; }

结果业务代码执行完以后我忘了释放锁lock.unlock(),导致redis线程池被打满,redis服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。

随着 使用redis 锁的时间越长,我发现 redis 锁的坑远比想象中要多。就算在面试题当中redis分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。

今天就分享一下我用redis 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。

一、锁未被释放

这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用Jedis客户端会报如下的错误信息

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

redis线程池已经没有空闲线程来处理客户端命令。

解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。

public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { TODO ......... } else { // 释放当前redis连接 redis.close(); // 休眠1000毫秒 sleep(1000); } } }

二、B的锁被A给释放了

我们知道Redis实现锁的原理在于 SETNX命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key 已经存在,则 SETNX不做任何动作,返回值为 0 。

SETNX key value

我们来设想一下这个场景:A、B两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。

那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。

但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。

为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定value的key,否则就会出现释放锁混乱的场景。

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

@Transaction public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); } }

给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

@Autowired DataSourceTransactionManager dataSourceTransactionManager; @Transaction public void lock() { //手动开启事务 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); //手动提交事务 dataSourceTransactionManager.commit(transactionStatus); } } } catch (Exception e) { //手动回滚事务 dataSourceTransactionManager.rollback(transactionStatus); } }

四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?

那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

要是redis锁的过期时间能够自动续期就好了。

为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。

redisson对分布式锁做了很好封装,只需调用API即可。

RLock lock = redissonClient.getLock("stockLock");

redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。

举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。

&#64;Slf4j&#64;Servicepublic class RedisDistributionLockPlus { /** * 加锁超时时间&#xff0c;单位毫秒&#xff0c; 即&#xff1a;加锁时间内执行完操作&#xff0c;如果未完成会有并发现象 */ private static final long DEFAULT_LOCK_TIMEOUT &#61; 30; private static final long TIME_SECONDS_FIVE &#61; 5 ; /** * 每个key的过期时间 {&#64;link LockContent} */ private Map lockContentMap &#61; new ConcurrentHashMap<>(512); /** * redis执行成功的返回 */ private static final Long EXEC_SUCCESS &#61; 1L; /** * 获取锁lua脚本&#xff0c; k1&#xff1a;获锁key, k2&#xff1a;续约耗时key, arg1:requestId&#xff0c;arg2&#xff1a;超时时间 */ private static final String LOCK_SCRIPT &#61; "if redis.call(&#39;exists&#39;, KEYS[2]) &#61;&#61; 1 then ARGV[2] &#61; math.floor(redis.call(&#39;get&#39;, KEYS[2]) &#43; 10) end " &#43; "if redis.call(&#39;exists&#39;, KEYS[1]) &#61;&#61; 0 then " &#43; "local t &#61; redis.call(&#39;set&#39;, KEYS[1], ARGV[1], &#39;EX&#39;, ARGV[2]) " &#43; "for k, v in pairs(t) do " &#43; "if v &#61;&#61; &#39;OK&#39; then return tonumber(ARGV[2]) end " &#43; "end " &#43; "return 0 end"; /** * 释放锁lua脚本, k1&#xff1a;获锁key, k2&#xff1a;续约耗时key, arg1:requestId&#xff0c;arg2&#xff1a;业务耗时 arg3: 业务开始设置的timeout */ private static final String UNLOCK_SCRIPT &#61; "if redis.call(&#39;get&#39;, KEYS[1]) &#61;&#61; ARGV[1] then " &#43; "local ctime &#61; tonumber(ARGV[2]) " &#43; "local biz_timeout &#61; tonumber(ARGV[3]) " &#43; "if ctime > 0 then " &#43; "if redis.call(&#39;exists&#39;, KEYS[2]) &#61;&#61; 1 then " &#43; "local avg_time &#61; redis.call(&#39;get&#39;, KEYS[2]) " &#43; "avg_time &#61; (tonumber(avg_time) * 8 &#43; ctime * 2)/10 " &#43; "if avg_time >&#61; biz_timeout - 5 then redis.call(&#39;set&#39;, KEYS[2], avg_time, &#39;EX&#39;, 24*60*60) " &#43; "else redis.call(&#39;del&#39;, KEYS[2]) end " &#43; "elseif ctime > biz_timeout -5 then redis.call(&#39;set&#39;, KEYS[2], ARGV[2], &#39;EX&#39;, 24*60*60) end " &#43; "end " &#43; "return redis.call(&#39;del&#39;, KEYS[1]) " &#43; "else return 0 end"; /** * 续约lua脚本 */ private static final String RENEW_SCRIPT &#61; "if redis.call(&#39;get&#39;, KEYS[1]) &#61;&#61; ARGV[1] then return redis.call(&#39;expire&#39;, KEYS[1], ARGV[2]) else return 0 end"; private final StringRedisTemplate redisTemplate; public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { this.redisTemplate &#61; redisTemplate; ScheduleTask task &#61; new ScheduleTask(this, lockContentMap); // 启动定时任务 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); } /** * 加锁 * 取到锁加锁&#xff0c;取不到锁一直等待知道获得锁 * * &#64;param lockKey * &#64;param requestId 全局唯一 * &#64;param expire 锁过期时间, 单位秒 * &#64;return */ public boolean lock(String lockKey, String requestId, long expire) { log.info("开始执行加锁, lockKey &#61;{}, requestId&#61;{}", lockKey, requestId); for (; ; ) { // 判断是否已经有线程持有锁&#xff0c;减少redis的压力 LockContent lockContentOld &#61; lockContentMap.get(lockKey); boolean unLocked &#61; null &#61;&#61; lockContentOld; // 如果没有被锁&#xff0c;就获取锁 if (unLocked) { long startTime &#61; System.currentTimeMillis(); // 计算超时时间 long bizExpire &#61; expire &#61;&#61; 0L ? DEFAULT_LOCK_TIMEOUT : expire; String lockKeyRenew &#61; lockKey &#43; "_renew"; RedisScript script &#61; RedisScript.of(LOCK_SCRIPT, Long.class); List keys &#61; new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long lockExpire &#61; redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); if (null !&#61; lockExpire && lockExpire > 0) { // 将锁放入map LockContent lockContent &#61; new LockContent(); lockContent.setStartTime(startTime); lockContent.setLockExpire(lockExpire); lockContent.setExpireTime(startTime &#43; lockExpire * 1000); lockContent.setRequestId(requestId); lockContent.setThread(Thread.currentThread()); lockContent.setBizExpire(bizExpire); lockContent.setLockCount(1); lockContentMap.put(lockKey, lockContent); log.info("加锁成功, lockKey &#61;{}, requestId&#61;{}", lockKey, requestId); return true; } } // 重复获取锁&#xff0c;在线程池中由于线程复用&#xff0c;线程相等并不能确定是该线程的锁 if (Thread.currentThread() &#61;&#61; lockContentOld.getThread() && requestId.equals(lockContentOld.getRequestId())){ // 计数 &#43;1 lockContentOld.setLockCount(lockContentOld.getLockCount()&#43;1); return true; } // 如果被锁或获取锁失败&#xff0c;则等待100毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // 这里用lombok 有问题 log.error("获取redis 锁失败, lockKey &#61;{}, requestId&#61;{}", lockKey, requestId, e); return false; } } } /** * 解锁 * * &#64;param lockKey * &#64;param lockValue */ public boolean unlock(String lockKey, String lockValue) { String lockKeyRenew &#61; lockKey &#43; "_renew"; LockContent lockContent &#61; lockContentMap.get(lockKey); long consumeTime; if (null &#61;&#61; lockContent) { consumeTime &#61; 0L; } else if (lockValue.equals(lockContent.getRequestId())) { int lockCount &#61; lockContent.getLockCount(); // 每次释放锁&#xff0c; 计数 -1&#xff0c;减到0时删除redis上的key if (--lockCount > 0) { lockContent.setLockCount(lockCount); return false; } consumeTime &#61; (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; } else { log.info("释放锁失败&#xff0c;不是自己的锁。"); return false; } // 删除已完成key&#xff0c;先删除本地缓存&#xff0c;减少redis压力, 分布式锁&#xff0c;只有一个&#xff0c;所以这里不加锁 lockContentMap.remove(lockKey); RedisScript script &#61; RedisScript.of(UNLOCK_SCRIPT, Long.class); List keys &#61; new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long result &#61; redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), Long.toString(lockContent.getBizExpire())); return EXEC_SUCCESS.equals(result); } /** * 续约 * * &#64;param lockKey * &#64;param lockContent * &#64;return true:续约成功&#xff0c;false:续约失败(1、续约期间执行完成&#xff0c;锁被释放 2、不是自己的锁&#xff0c;3、续约期间锁过期了(未解决)) */ public boolean renew(String lockKey, LockContent lockContent) { // 检测执行业务线程的状态 Thread.State state &#61; lockContent.getThread().getState(); if (Thread.State.TERMINATED &#61;&#61; state) { log.info("执行业务的线程已终止,不再续约 lockKey &#61;{}, lockContent&#61;{}", lockKey, lockContent); return false; } String requestId &#61; lockContent.getRequestId(); long timeOut &#61; (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; RedisScript script &#61; RedisScript.of(RENEW_SCRIPT, Long.class); List keys &#61; new ArrayList<>(); keys.add(lockKey); Long result &#61; redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); log.info("续约结果&#xff0c;True成功&#xff0c;False失败 lockKey &#61;{}, result&#61;{}", lockKey, EXEC_SUCCESS.equals(result)); return EXEC_SUCCESS.equals(result); } static class ScheduleExecutor { public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { long delay &#61; unit.toMillis(initialDelay); long period_ &#61; unit.toMillis(period); // 定时执行 new Timer("Lock-Renew-Task").schedule(task, delay, period_); } } static class ScheduleTask extends TimerTask { private final RedisDistributionLockPlus redisDistributionLock; private final Map lockContentMap; public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map lockContentMap) { this.redisDistributionLock &#61; redisDistributionLock; this.lockContentMap &#61; lockContentMap; } &#64;Override public void run() { if (lockContentMap.isEmpty()) { return; } Set> entries &#61; lockContentMap.entrySet(); for (Map.Entry entry : entries) { String lockKey &#61; entry.getKey(); LockContent lockContent &#61; entry.getValue(); long expireTime &#61; lockContent.getExpireTime(); // 减少线程池中任务数量 if ((expireTime - System.currentTimeMillis())/ 1000 { boolean renew &#61; redisDistributionLock.renew(lockKey, lockContent); if (renew) { long expireTimeNew &#61; lockContent.getStartTime() &#43; (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; lockContent.setExpireTime(expireTimeNew); } else { // 续约失败&#xff0c;说明已经执行完 OR redis 出现问题 lockContentMap.remove(lockKey); } }); } } } }}

五、redis主从复制的坑

redis高可用最常见的方案就是主从复制(master-slave)&#xff0c;这种模式也给redis分布式锁挖了一坑。

redis cluster集群环境下&#xff0c;假如现在A客户端想要加锁&#xff0c;它会根据路由规则选择一台master节点写入key mylock&#xff0c;在加锁成功后&#xff0c;master节点会把key异步复制给对应的slave节点。

如果此时redis master节点宕机&#xff0c;为保证集群可用性&#xff0c;会进行主备切换&#xff0c;slave变为了redis master。B客户端在新的master节点上加锁成功&#xff0c;而A客户端也以为自己还是成功加了锁的。

此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁&#xff0c;导致各种脏数据的产生。

至于解决办法嘛&#xff0c;目前看还没有什么根治的方法&#xff0c;只能尽量保证机器的稳定性&#xff0c;减少发生此事件的概率。

总结

上面就是我在使用Redis 分布式锁时遇到的一些坑&#xff0c;有点小感慨&#xff0c;经常用一个方法填上这个坑&#xff0c;没多久就发现另一个坑又出来了&#xff0c;其实根本没有什么十全十美的解决方案&#xff0c;哪有什么银弹&#xff0c;只不过是在权衡利弊后&#xff0c;选一个在接受范围内的折中方案而已。

作者&#xff1a;程序员内点事
链接&#xff1a;https://juejin.im/post/5e9ff1766fb9a03c3b2bddab
来源&#xff1a;掘金




推荐阅读
  • Nacos 0.3 数据持久化详解与实践
    本文详细介绍了如何将 Nacos 0.3 的数据持久化到 MySQL 数据库,并提供了具体的步骤和注意事项。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 本文介绍了 Java 中 io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltSock() 方法的使用及其代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 在分析Android的Audio系统时,我们对mpAudioPolicy->get_input进行了详细探讨,发现其背后涉及的机制相当复杂。本文将详细介绍这一过程及其背后的实现细节。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 深入解析Android GPS机制:第五部分 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 如何在PHP中准确获取服务器IP地址?
    如何在PHP中准确获取服务器IP地址? ... [详细]
author-avatar
Lucky_Leeing
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有