方案:利用Redis的setnx方法简单实现分布式锁
Redis中setnx方法有如下优点:
时间复杂度: O(1)
返回值:命令在设置成功时返回 1 , 设置失败时返回 0 。
- 只在键 key 不存在的情况下, 将键 key 的值设置为 value 。
- 若键 key 已经存在, 则 SETNX 命令不做任何动作。
- SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
基础方法
public void lock(String key, Object value, long timeout) {key = appendApplicationName(key);// 底层原理就是Redis的setnx方法// TODO 有更好的办法实现,下面会分析boolean isSuccess = false;while (!isSuccess) {//设置分布式锁的过期时间isSuccess = redisTemplate.opsForValue().setIfAbsent(key, value);if (isSuccess) {redisTemplate.expire(key, timeout, TimeUnit.SECONDS);} else {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}}
}public void unLock(String key) {key = appendApplicationName(key);redisTemplate.delete(key);
}
业务逻辑
public void runTask(Task task, StageExecute stageExecute, PlanExecType planExecType) {//生成阶段任务执行记录TaskExecute taskExecute;String pipelineExecId;try {cacheClient.lock("stage-exec-" + stageExecute.getStageId(), stageExecute.getStageId(), 10);pipelineExecId = PipelineUtils.generatePipelineExecuteId();taskExecute = taskExecService.generateTaskExecute(task, null, pipelineExecId, stageExecute);} finally {cacheClient.unLock("stage-exec-" + stageExecute.getStageId());}//生成任务并执行TaskRun taskRun = TaskRun.builder().task(task).taskExecute(taskExecute).pipelineExecId(pipelineExecId).stageExecute(stageExecute).planExecType(planExecType).build();taskRun.setRunType(PipelineRun.RunType.NEW);pipelineScheduledExecutorService.startTask(taskRun);
}
二、Redis分布式锁简单实现
熟悉Redis的同学那么肯定对setNx(set if not exist)方法不陌生,如果不存在则更新,其可以很好的用来实现我们的分布式锁。对于某个资源加锁我们只需要:
setNx resourceName value
这里有个问题,如果加锁了之后机器宕机,那么这个锁就不会得到释放。所以需要加入过期时间,加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。
set resourceName value ex 5 nx
Redission封装了锁的实现,其继承了java.util.concurrent.locks.Lock的接口,让我们像操作我们的本地Lock一样去操作Redission的Lock,下面介绍一下其如何实现分布式锁。
1、尝试加锁:首先会尝试进行加锁,由于保证操作是原子性,那么就只能使用lua脚本,相关的lua脚本如下:
}protected
}
可以看见他并没有使用我们的sexNx来进行操作,而是使用的hash结构。我们的每一个需要锁定的资源都可以看做是一个HashMap,锁定资源的节点信息是Key,锁定次数是value。通过这种方式可以很好的实现可重入的效果,只需要对value进行加1操作,就能进行可重入锁。
2、如果尝试加锁失败,判断是否超时,如果超时则返回false。
3、如果加锁失败之后,没有超时,那么需要在名字为redisson_lock__channel+lockName的channel上进行订阅,用于订阅解锁消息,然后一直阻塞直到超时,或者有解锁消息。
4、重试步骤1,2,3,直到最后获取到锁,或者某一步获取锁超时。
对于我们的unlock方法比较简单也是通过lua脚本进行解锁,如果是可重入锁,只是减1。如果是非加锁线程解锁,那么解锁失败。
protected RFuture
}
Redission还有公平锁的实现,对于公平锁其利用了list结构和hashset结构分别用来保存我们排队的节点,和我们节点的过期时间,用这两个数据结构帮助我们实现公平锁,这里就不展开介绍了,有兴趣可以参考源码。
我们想象一个这样的场景当机器A申请到一把锁之后,如果Redis主宕机了,这个时候从机并没有同步到这一把锁,那么机器B再次申请的时候就会再次申请到这把锁,为了解决这个问题Redis作者提出了RedLock红锁的算法,在Redission中也对RedLock进行了实现。
通过上面的代码,我们需要实现多个Redis集群,然后进行红锁的加锁,解锁。具体的步骤如下:
可以看见RedLock基本原理是利用多个Redis集群,用多数的集群加锁成功,减少Redis某个集群出故障,造成分布式锁出现问题的概率。
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 调用lua脚本进行续期RFuture
}
这里有四个关键点:
protected RFuture
}
很简单,就是看当前线程有没有加锁hexists, KEYS[1], ARGV[2]) == 1,有加锁的话就代表业务线程还没执行完,就给他的锁重新续期pexpire', KEYS[1], ARGV[1],然后返回1,也就是true,没加锁的话返回0,也就是false。
那就是返回1就调用自己准备下一次续期:renewExpiration();,返回0就调用cancelExpirationRenewal(null);取消续期,删除key等操作。