在Java中,关于锁我想大家都很熟悉。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常我们以进程锁synchronized 、Lock来实现它,对于分布式程序,就不能用进程锁了,这时候常用的是分布式锁。
什么是分布式锁分布式锁,是一种思想,它的实现方式有很多。比如,我们将沙滩当做分布式锁的组件,那么它看起来应该是这样的:
在沙滩上踩一脚,留下自己的脚印,就对应了加锁操作。其他进程或者线程,看到沙滩上已经有脚印,证明锁已被别人持有,则等待。
把脚印从沙滩上抹去,就是解锁的过程。
为了避免死锁,我们可以设置一阵风,在单位时间后刮起,将脚印自动抹去。
分布式锁的实现有很多,比如基于数据库、memcached、Redis、系统文件、zookeeper等。它们的核心的理念跟上面的过程大致相同。基于数据库可以用乐观锁和悲观锁处理分页式锁,乐观锁使用对比记录version号来实现,悲观锁使用类似“select * where * for update”行锁实现。
本文讨论的是基于redis实现分页式锁的问题,别的方面不做详说,有相关需求可以参考和查阅别的资料。
Redis分布式锁原理加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。
SET lock_key random_value NX PX 5000
值得注意的是:
random_value 是客户端生成的唯一的字符串。
NX 代表只在键不存在时,才对键进行设置操作。
PX 5000 设置键的过期时间为5000毫秒。
这样,如果上面的命令执行成功,则证明客户端获取到了锁。
解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉。这时候random_value的作用就体现出来。
为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1])
elsereturn 0
end
jedis实现(单节点)
/*** 获取分布式锁:一分命令,保证事务的一致性。* @param lockKey* @param requestId* @param expireTime* @return*/public static boolean getDistributeLock(String lockKey, String requestId, long expireTime) {Jedis jedis = null;try {jedis = getResource();String result = jedis.set(lockKey,requestId,"NX","PX",expireTime);if ("OK".equals(result)) {return true;}} catch (Exception e) {logger.error("getDistributeLock {}", lockKey, e);} finally {returnResource(jedis);}return false;}/*** 释放分布式锁:使用lua脚本,一个命令实现对带有标志的锁的释放* @param lockKey* @param requestId* @return*/public static boolean releaseDistributeLock(String lockKey, String requestId) {Jedis jedis = null;try {jedis = getResource();String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));Long RELEASE_SUCCESS = 1L;if (RELEASE_SUCCESS.equals(result)) {return true;}} catch (Exception e) {logger.error("releaseDistributeLock {}", lockKey, e);} finally {returnResource(jedis);}return false;}
注意:这里的requestId,类似客户端口请求id,每次请求都是不同的可以使用uuid,测试和使用可以参考后面的”测试和说明“部分。
缺点:在集群包括主从、哨兵模式、集群模式不可用;锁不具有可重入性。
redisson实现(通用)Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。它里面也实现了分布式锁,而且包含多种类型的锁:可重入锁,公平锁等。
具体实现如下:
JedisUtil提供
//从配置类中获取redisson对象private static Redisson redisson = JedisConfig.getRedisson();//加锁 Redisson:适用单机、主从、哨兵和集群//同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行public static boolean acquire(String lockName){//声明key对象String key = lockName;//获取锁对象RLock mylock = redisson.getLock(key);//加锁,并且设置锁过期时间,防止死锁的产生mylock.lock(2, TimeUnit.MINUTES); // 分钟//加锁成功return true;}//锁的释放 Redisson:适用单机、主从、哨兵和集群//同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行public static void release(String lockName){//必须是和加锁时的同一个keyString key = lockName;//获取所对象RLock mylock = redisson.getLock(key);//释放锁(解锁)mylock.unlock();}
JedisConfig提供
private static Config config = new Config();
//声明redisso对象
private static Redisson redisson = null;static{//可以用"redis://"来启用SSL连接if (IS_CLUSTER.equals(CLUSTER_USED)) {//集群log.info("Redisson redis lock init cluster config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);config.useClusterServers().addNodeAddress("redis://".concat(server1),"redis://".concat(server2), "redis://".concat(server3),"redis://".concat(server4),"redis://".concat(server5), "redis://".concat(server6)).setScanInterval(5000);} else {//单机log.info("Redisson redis lock init single node config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);config.useSingleServer().setAddress("redis://".concat(poolHost).concat(":").concat(poolPort));}//得到redisson对象redisson = (Redisson) Redisson.create(config);}/*** Redisson redis分布式锁处理对象* @return*/public static Redisson getRedisson() {return redisson;}
测试和说明
测试和使用,可以参考下面的junit测试用例。
https://blog.csdn.net/u014353343/article/details/88921212 https://www.jianshu.com/p/828aa3b44564 https://www.jianshu.com/p/47fd7f86c848 臭味相投的朋友们,我在这里:@Slf4j
public class JedisUtilTest extends SpringTxTestCase {private static Logger logger &#61; LoggerFactory.getLogger(JedisUtils.class);/*** 单机版本&#xff1a;加解锁功能*/&#64;Testpublic void testSingleRedisLockAndUnlock(){JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);{for (int i &#61; 0; i <5; i&#43;&#43;) {boolean result &#61; JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);System.out.println(Thread.currentThread().getName()&#43;"&#xff1a;lock result:"&#43;result);JedisUtils.releaseDistributeLock("lockKey","requestId");boolean result1 &#61; JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);System.out.println(Thread.currentThread().getName()&#43;"&#xff1a;unlock result1:"&#43;result1);}}}/*** 单机版本&#xff1a;锁测试*/&#64;Testpublic void testSingleRedisLock(){{final CyclicBarrier cbRef &#61; new CyclicBarrier(10);final ReentrantLock reentrantLock&#61;new ReentrantLock();for(int i&#61;0;i<10;i&#43;&#43;){Thread t&#61; new Thread(new Runnable() {&#64;Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() &#43; "准备");cbRef.await();//10个线程等待在这里 才开始执行下面的//reentrantLock.lock();//tryGetDistributedLock("hello","hello",10000);boolean result &#61; JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);System.out.println(Thread.currentThread().getName()&#43;"&#61;&#61;&#61;lock result:"&#43;result);JedisUtils.releaseDistributeLock("lockKey",UUID.randomUUID().toString());boolean result1 &#61; JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);System.out.println(Thread.currentThread().getName()&#43;"&#61;&#61;&#61;lock result1:"&#43;result);} catch (Exception e) {e.printStackTrace();}finally {//reentrantLock.unlock();}}});t.start();}//这一段可以不要try {Thread.sleep(2000);System.out.println(Thread.currentThread().getName() &#43; "起跑");System.out.println( cbRef.getParties()&#43;"--" &#43;cbRef.getNumberWaiting());} catch (Exception e){e.printStackTrace();}}}/*** 单机版本redis&#xff1a;测试分布式锁的使用方法*/&#64;Testpublic void testUseOfSingleRedisLock() throws InterruptedException {final CountDownLatch countDownLatch &#61; new CountDownLatch(10);String data2Deal &#61; "data to deal";final CyclicBarrier cbRef &#61; new CyclicBarrier(10);for(int i&#61;0;i<10;i&#43;&#43;){Thread t&#61; new Thread(new Runnable() {&#64;Overridepublic void run() {System.out.println(Thread.currentThread().getName() &#43; "准备");try {cbRef.await();//10个线程等待在这里 才开始执行下面的&#43;} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}final ReentrantLock reentrantLock&#61;new ReentrantLock();reentrantLock.lock();try {String data2Deal &#61; "data to deal&#xff1a;" &#43; Thread.currentThread().getName();useOfSingleRedisLock(data2Deal);} catch (Exception e){e.printStackTrace();} finally {reentrantLock.unlock();}countDownLatch.countDown();}});t.start();}countDownLatch.await();System.out.println("所有线程都执行完了……");}/*** 分布式锁的使用方法&#xff1a;单机redis cluster包括(集群和哨兵)不适用。* &#64;param data2Deal*/public void useOfSingleRedisLock(String data2Deal){String requestId &#61; UUID.randomUUID().toString();if(JedisPoolUtils.getDistributeLock("lock_key", requestId, 1000*60*5)){try {methonNeedDisLock(data2Deal);} catch (Exception e) {logger.error("分布式锁业务处理失败&#xff01;",e);e.printStackTrace();} finally {JedisPoolUtils.releaseDistributeLock("lock_key",requestId);}} else {try {Thread.sleep(1000);useOfSingleRedisLock(data2Deal);} catch (InterruptedException e) {logger.error(e.getMessage());}}}/** 需要分布式锁的业务代码*/public void methonNeedDisLock(String data2Deal){System.out.println("分布式锁业务处理方法:"&#43;data2Deal);}/*** 测试分布式锁(Redisson)的使用方法&#xff1a;redis单机和哨兵、集群都适用* 测试说明&#xff1a;开启1000个线程&#xff0c;对count进行累加*/int count &#61; 0;&#64;Testpublic void testRedisLock() throws InterruptedException {int clientcount &#61;1000;final CountDownLatch countDownLatch &#61; new CountDownLatch(clientcount);ExecutorService executorService &#61; Executors.newFixedThreadPool(clientcount);long start &#61; System.currentTimeMillis();for (int i &#61; 0;i
参考
猿in小站:http://www.yuanin.net
csdn博客:https://blog.csdn.net/jiabeis
简书:https://www.jianshu.com/u/4cb7d664ec4b
微信免费订阅号“猿in”