热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Redis分布式锁分析

方案:利用Redis的setnx方法简单实现分布式锁Redis中setnx方法有如下优点:时间复杂度:O(1)返回值:命令

方案:利用Redis的setnx方法简单实现分布式锁

Redis中setnx方法有如下优点:

时间复杂度: O(1)
返回值:命令在设置成功时返回 1 , 设置失败时返回 0 。

  1. 只在键 key 不存在的情况下, 将键 key 的值设置为 value 。
  2. 若键 key 已经存在, 则 SETNX 命令不做任何动作。
  3. SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。


一、分布式锁实际案例

基础方法

public void lock(String key, Object value, long timeout) {key = appendApplicationName(key);// 底层原理就是Redis的setnx方法// TODO 有更好的办法实现,下面会分析boolean isSuccess = false;while (!isSuccess) {//设置分布式锁的过期时间isSuccess = redisTemplate.opsForValue().setIfAbsent(key, value);if (isSuccess) {redisTemplate.expire(key, timeout, TimeUnit.SECONDS);} else {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}}
}public void unLock(String key) {key = appendApplicationName(key);redisTemplate.delete(key);
}

 业务逻辑

public void runTask(Task task, StageExecute stageExecute, PlanExecType planExecType) {//生成阶段任务执行记录TaskExecute taskExecute;String pipelineExecId;try {cacheClient.lock("stage-exec-" + stageExecute.getStageId(), stageExecute.getStageId(), 10);pipelineExecId = PipelineUtils.generatePipelineExecuteId();taskExecute = taskExecService.generateTaskExecute(task, null, pipelineExecId, stageExecute);} finally {cacheClient.unLock("stage-exec-" + stageExecute.getStageId());}//生成任务并执行TaskRun taskRun = TaskRun.builder().task(task).taskExecute(taskExecute).pipelineExecId(pipelineExecId).stageExecute(stageExecute).planExecType(planExecType).build();taskRun.setRunType(PipelineRun.RunType.NEW);pipelineScheduledExecutorService.startTask(taskRun);
}

二、Redis分布式锁简单实现 

熟悉Redis的同学那么肯定对setNx(set if not exist)方法不陌生,如果不存在则更新,其可以很好的用来实现我们的分布式锁。对于某个资源加锁我们只需要:

setNx resourceName value

这里有个问题,如果加锁了之后机器宕机,那么这个锁就不会得到释放。所以需要加入过期时间,加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。

set resourceName value ex 5 nx


 三、Redission实现分布式锁

Redission封装了锁的实现,其继承了java.util.concurrent.locks.Lock的接口,让我们像操作我们的本地Lock一样去操作Redission的Lock,下面介绍一下其如何实现分布式锁。


加锁过程

1、尝试加锁:首先会尝试进行加锁,由于保证操作是原子性,那么就只能使用lua脚本,相关的lua脚本如下:

RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}protected RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) {CommandBatchService executorService = createCommandBatchService();RFuture result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);if (!(commandExecutor instanceof CommandBatchService)) {executorService.executeAsync();}return result;
}

可以看见他并没有使用我们的sexNx来进行操作,而是使用的hash结构。我们的每一个需要锁定的资源都可以看做是一个HashMap,锁定资源的节点信息是Key,锁定次数是value。通过这种方式可以很好的实现可重入的效果,只需要对value进行加1操作,就能进行可重入锁。

2、如果尝试加锁失败,判断是否超时,如果超时则返回false。

3、如果加锁失败之后,没有超时,那么需要在名字为redisson_lock__channel+lockName的channel上进行订阅,用于订阅解锁消息,然后一直阻塞直到超时,或者有解锁消息。

4、重试步骤1,2,3,直到最后获取到锁,或者某一步获取锁超时。
 

释放锁过程

对于我们的unlock方法比较简单也是通过lua脚本进行解锁,如果是可重入锁,只是减1。如果是非加锁线程解锁,那么解锁失败。

protected RFuture unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Redission还有公平锁的实现,对于公平锁其利用了list结构和hashset结构分别用来保存我们排队的节点,和我们节点的过期时间,用这两个数据结构帮助我们实现公平锁,这里就不展开介绍了,有兴趣可以参考源码。


四、RedLock 

我们想象一个这样的场景当机器A申请到一把锁之后,如果Redis主宕机了,这个时候从机并没有同步到这一把锁,那么机器B再次申请的时候就会再次申请到这把锁,为了解决这个问题Redis作者提出了RedLock红锁的算法,在Redission中也对RedLock进行了实现。

 通过上面的代码,我们需要实现多个Redis集群,然后进行红锁的加锁,解锁。具体的步骤如下:

  1. 首先生成多个Redis集群的Rlock,并将其构造成RedLock。
  2. 依次循环对三个集群进行加锁&#xff0c;加锁的过程和<第三节>里面一致。
  3. 如果循环加锁的过程中加锁失败&#xff0c;那么需要判断加锁失败的次数是否超出了最大值&#xff0c;这里的最大值是根据集群的个数&#xff0c;比如三个那么只允许失败一个&#xff0c;五个的话只允许失败两个&#xff0c;要保证多数成功。
  4. 加锁的过程中需要判断是否加锁超时&#xff0c;有可能我们设置加锁只能用3ms&#xff0c;第一个集群加锁已经消耗了3ms了。那么也算加锁失败。
  5. 3&#xff0c;4步里面加锁失败的话&#xff0c;那么就会进行解锁操作&#xff0c;解锁会对所有的集群在请求一次解锁。

可以看见RedLock基本原理是利用多个Redis集群&#xff0c;用多数的集群加锁成功&#xff0c;减少Redis某个集群出故障&#xff0c;造成分布式锁出现问题的概率。


五、分布式锁看门狗机制

private void renewExpiration() {ExpirationEntry ee &#61; EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee &#61;&#61; null) {return;}Timeout task &#61; commandExecutor.getConnectionManager().newTimeout(new TimerTask() {&#64;Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent &#61; EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent &#61;&#61; null) {return;}Long threadId &#61; ent.getFirstThreadId();if (threadId &#61;&#61; null) {return;}// 调用lua脚本进行续期RFuture future &#61; renewExpirationAsync(threadId);future.onComplete((res, e) -> {// 报异常就移除keyif (e !&#61; null) {log.error("Can&#39;t update lock " &#43; getName() &#43; " expiration", e);return;}// 续期成功的话就下一轮续期 if (res) {// reschedule itselfrenewExpiration();}else {// 续期失败的话就取消续期&#xff0c;移除key等操作cancelExpirationRenewal(null);}});}// 这里是个知识点&#xff0c;续期线程在过期时间达到三分之一的时候工作&#xff0c;比如9s过期时间&#xff0c;那么续期会在第3秒的时候工作&#xff0c;也就是还剩余6s的时候进行续期}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

这里有四个关键点&#xff1a;

  1. 续期核心lua脚本在renewExpirationAsync里
  2. 续期成功自己调用自己&#xff0c;也就是为下一次续期做准备
  3. 续期失败就取消续期&#xff0c;移除key等操作
  4. 续期的开始时间是超过过期时间的三分之一&#xff0c;比如9s过期时间&#xff0c;那么第3s的时候开始续期。

protected RFuture renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) &#61;&#61; 1) then " &#43;"redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " &#43;"return 1; " &#43;"end; " &#43;"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}

很简单&#xff0c;就是看当前线程有没有加锁​​hexists, KEYS[1], ARGV[2]) &#61;&#61; 1​​&#xff0c;有加锁的话就代表业务线程还没执行完&#xff0c;就给他的锁重新续期​​pexpire&#39;, KEYS[1], ARGV[1]​​&#xff0c;然后返回1&#xff0c;也就是true&#xff0c;没加锁的话返回0&#xff0c;也就是false。

那就是返回1就调用自己准备下一次续期&#xff1a;​​renewExpiration();​​&#xff0c;返回0就调用​​cancelExpirationRenewal(null);​​取消续期&#xff0c;删除key等操作。



推荐阅读
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • 开发日志:201521044091 《Java编程基础》第11周学习心得与总结
    开发日志:201521044091 《Java编程基础》第11周学习心得与总结 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 在Java基础中,私有静态内部类是一种常见的设计模式,主要用于防止外部类的直接调用或实例化。这种内部类仅服务于其所属的外部类,确保了代码的封装性和安全性。通过分析JDK源码,我们可以发现许多常用类中都包含了私有静态内部类,这些内部类虽然功能强大,但其复杂性往往让人感到困惑。本文将深入探讨私有静态内部类的作用、实现方式及其在实际开发中的应用,帮助读者更好地理解和使用这一重要的编程技巧。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 本文详细介绍了一种利用 ESP8266 01S 模块构建 Web 服务器的成功实践方案。通过具体的代码示例和详细的步骤说明,帮助读者快速掌握该模块的使用方法。在疫情期间,作者重新审视并研究了这一未被充分利用的模块,最终成功实现了 Web 服务器的功能。本文不仅提供了完整的代码实现,还涵盖了调试过程中遇到的常见问题及其解决方法,为初学者提供了宝贵的参考。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • SQL 查询实体优化与实战技巧分享 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • Java中不同类型的常量池(字符串常量池、Class常量池和运行时常量池)的对比与关联分析
    在研究Java虚拟机的过程中,笔者发现存在多种类型的常量池,包括字符串常量池、Class常量池和运行时常量池。通过查阅CSDN、博客园等相关资料,对这些常量池的特性、用途及其相互关系进行了详细探讨。本文将深入分析这三种常量池的差异与联系,帮助读者更好地理解Java虚拟机的内部机制。 ... [详细]
  • 本文深入解析了 jQuery 中用于扩展功能的三个关键方法:`$.extend()`、`$.fn` 和 `$.fn.extend()`。其中,`$.extend()` 用于扩展 jQuery 对象本身,而 `$.fn.extend()` 则用于扩展 jQuery 的原型对象,使自定义方法能够作为 jQuery 实例的方法使用。通过这些方法,开发者可以轻松地创建和集成自定义插件,增强 jQuery 的功能。文章详细介绍了每个方法的用法、参数及实际应用场景,帮助读者更好地理解和运用这些强大的工具。 ... [详细]
  • 本文将继续探讨 JavaScript 函数式编程的高级技巧及其实际应用。通过一个具体的寻路算法示例,我们将深入分析如何利用函数式编程的思想解决复杂问题。示例中,节点之间的连线代表路径,连线上的数字表示两点间的距离。我们将详细讲解如何通过递归和高阶函数等技术实现高效的寻路算法。 ... [详细]
  • 在多年使用Java 8进行新应用开发和现有应用迁移的过程中,我总结了一些非常实用的技术技巧。虽然我不赞同“最佳实践”这一术语,因为它可能暗示了通用的解决方案,但这些技巧在实际项目中确实能够显著提升开发效率和代码质量。本文将深入解析并探讨这四大高级技巧的具体应用,帮助开发者更好地利用Java 8的强大功能。 ... [详细]
author-avatar
齐鲁墨_931
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有