热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

redis分布式锁java实现

1、为什么要使用分布式锁如果在一个分布式系统中,我们从数据库中读取一个数据,然后修改保存,这种情况很容易遇到并发问题。因为读取和更新保存不是一个原子操作,在并发时就会导致数据的不

1、为什么要使用分布式锁

如果在一个分布式系统中,我们从数据库中读取一个数据,然后修改保存,这种情况很容易遇到并发问题。因为读取和更新保存不是一个原子操作,在并发时就会导致数据的不正确。这种场景其实并不少见,比如电商秒杀活动,库存数量的更新就会遇到。如果是单机应用,直接使用本地锁就可以避免。如果是分布式应用,本地锁派不上用场,这时就需要引入分布式锁来解决。

由此可见分布式锁的目的其实很简单,就是为了保证多台服务器在执行某一段代码时保证只有一台服务器执行

2、为了保证分布式锁的可用性,至少要确保锁的实现要同时满足以下几点:
  • 互斥性。在任何时刻,保证只有一个客户端持有锁。
  • 不能出现死锁。如果在一个客户端持有锁的期间,这个客户端崩溃了,也要保证后续的其他客户端可以上锁。
  • 保证上锁和解锁都是同一个客户端。
3、一般来说,实现分布式锁的方式有以下几种:
  • 使用MySQL,基于唯一索引。
  • 使用ZooKeeper,基于临时有序节点。
  • 使用Redis,基于set命令(2.6.12 版本开始)。

本篇文章主要讲解Redis的实现方式。

4、用到的redis命令

锁的实现主要基于redis的SET命令(SET详细解释参考这里),我们来看SET的解释:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

  • 将字符串值 value 关联到 key 。
  • 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
  • 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
    可选参数

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

加锁:使用SET key value [PX milliseconds] [NX]命令,如果key不存在,设置value,并设置过期时间(加锁成功)。如果已经存在lock(也就是有客户端持有锁了),则设置失败(加锁失败)。

解锁:使用del命令,通过删除键值释放锁。释放锁之后,其他客户端可以通过set命令进行加锁。

5、上面第二项,说了分布式锁,要考虑的问题,下面讲解一下

5.1、互斥性。在任何时刻,保证只有一个客户端持有锁

redis命令是原子性的,只要客户端调用redis的命令SET key value [PX milliseconds] [NX] 执行成功,就算加锁成功了

5.2、不能出现死锁。如果在一个客户端持有锁的期间,这个客户端崩溃了,也要保证后续的其他客户端可以上锁。

set命令px设置了过期时间,key过期失效了,就能避免死锁了

5.3保证上锁和解锁都是同一个客户端。

释放锁(删除key)的时候,只要确保是当前客户端设置的value才去删除key即可,采用lua脚本来实现

在Redis中,执行Lua语言是原子性,也就是说Redis执行Lua的时候是不会被中断的,具备原子性,这个特性有助于Redis对并发数据一致性的支持。


6、java代码实现

先把需要的jar包引入
        
            redis.clients
            jedis
            2.9.3
        
加锁设置参数的实体类
import lombok.Data;

//加锁设置的参数
@Data
public class LockParam {
    //锁的key
    private String lockKey;
    //尝试获得锁的时间(单位:毫秒),默认值:3000毫秒
    private Long tryLockTime;
    //尝试获得锁后,持有锁的时间(单位:毫秒),默认值:5000毫秒
    private Long holdLockTime;

    public LockParam(String lockKey){
        this(lockKey,1000*3L,1000*5L);
    };
    public LockParam(String lockKey,Long tryLockTime){
        this(lockKey,tryLockTime,1000*5L);
    };
    public LockParam(String lockKey,Long tryLockTime,Long holdLockTime){
        this.lockKey = lockKey;
        this.tryLockTime = tryLockTime;
        this.holdLockTime = holdLockTime;
    };
}
redis分布式具体代码实现
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.UUID;

/**
 * redis分布式锁
 */
@Slf4j
public class RedisLock {

    //锁key的前缀
    private final static String prefix_key = "redisLock:";
    //释放锁的lua脚本
    private final static  String unLockScript = "if redis.call(\'get\', KEYS[1]) == ARGV[1] then return redis.call(\'del\', KEYS[1]) else return 0 end";
    //执行unLockScript脚本,释放锁成功值
    private final static  Long unLockSuccess = 1L;


    //加锁设置的参数(key值、超时时间、持有锁的时间)
    private LockParam lockParam;
    //尝试获得锁的截止时间【lockParam.getTryLockTime()+System.currentTimeMillis()】
    private Long tryLockEndTime;
    //redis加锁的key
    private String redisLockKey;
    //redis加锁的vlaus
    private String redisLockValue;
    //redis加锁的成功标示
    private Boolean holdLockSuccess= Boolean.FALSE;


    //jedis实例
    private Jedis jedis;
    //获取jedis实例
    private Jedis getJedis(){
        return this.jedis;
    }
    //关闭jedis
    private void closeJedis(Jedis jedis){
        jedis.close();
        jedis = null;
    }

    public RedisLock(LockParam lockParam){
        if(lockParam==null){
            new RuntimeException("lockParam is null");
        }
        if(lockParam.getLockKey()==null || lockParam.getLockKey().trim().length()==0){
            new RuntimeException("lockParam lockKey is error");
        }
        this.lockParam = lockParam;

        this.tryLockEndTime = lockParam.getTryLockTime()+System.currentTimeMillis();
        this.redisLockKey = prefix_key.concat(lockParam.getLockKey());
        this.redisLockValue = UUID.randomUUID().toString().replaceAll("-","");

        //todo 到时候可以更换获取Jedis实例的实现
        jedis = new Jedis("127.0.0.1",6379);
    }

    /**
     * 加锁
     * @return 成功返回true,失败返回false
     */
    public boolean lock() {
        while(true){
            //判断是否超过了,尝试获取锁的时间
            if(System.currentTimeMillis()>tryLockEndTime){
                return false;
            }
            //尝试获取锁
            holdLockSuccess = tryLock();
            if(Boolean.TRUE.equals(holdLockSuccess)){
                return true;//获取锁成功
            }

            try {
                //获得锁失败,休眠50毫秒再去尝试获得锁,避免一直请求redis,导致redis cpu飙升
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 执行一次加锁操作:成功返回true 失败返回false
     * @return 成功返回true,失败返回false
     */
    private boolean tryLock() {
        try {
            String result = getJedis().set(redisLockKey,redisLockValue, "NX", "PX", lockParam.getHoldLockTime());
            if ("OK".equals(result)) {
                return true;
            }
        }catch (Exception e){
            log.warn("tryLock failure redisLockKey:{} redisLockValue:{} lockParam:{}",redisLockKey,redisLockValue,lockParam,e);
        }
        return false;
    }

    /**
     * 解锁
     * @return 成功返回true,失败返回false
     */
    public Boolean unlock() {
        Object result = null;
        try {
            //获得锁成功,才执行lua脚本
            if(Boolean.TRUE.equals(holdLockSuccess)){
                //执行Lua脚本
                result = getJedis().eval(unLockScript, Collections.singletonList(redisLockKey), Collections.singletonList(redisLockValue));
                if (unLockSuccess.equals(result)) {//释放成功
                    return true;
                }
            }
        } catch (Exception e) {
            log.warn("unlock failure redisLockKey:{} redisLockValue:{} lockParam:{} result:{}",redisLockKey,redisLockValue,lockParam,result,e);
        } finally {
            this.closeJedis(jedis);
        }
        return false;
    }
}
redis分布式锁使用
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class test {
    static String lockKey = "666";
    public static void main(String[] args) throws InterruptedException {
        log.info("下面测试两个线程同时,抢占锁的结果");
        Thread thread1 = new Thread(()->{
            testRedisLock();
        });
        thread1.setName("我是线程1");
        Thread thread2 = new Thread(()->{
            testRedisLock();
        });
        thread2.setName("我是线程2");

        //同时启动线程
        thread1.start();
        thread2.start();

        Thread.sleep(1000*20);
        log.info("-----------------我是一条分割线----------------");
        log.info("");
        log.info("");
        log.info("");


        log.info("下面是测试  一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉");
        Thread thread3 = new Thread(()->{
            testRedisLock2();
        });
        thread3.setName("我是线程3");
        thread3.start();

        Thread.sleep(1000*1);//暂停一秒是为了让线程3获的到锁
        Thread thread4 = new Thread(()->{
            testRedisLock();
        });
        thread4.setName("我是线程4");
        thread4.start();
    }

    public static void testRedisLock(){
        LockParam lockParam = new LockParam(lockKey);
        lockParam.setTryLockTime(2000L);//2秒时间尝试获得锁
        lockParam.setHoldLockTime(1000*10L);//获得锁成功后持有锁10秒时间
        RedisLock redisLock = new RedisLock(lockParam);
        try {
            Boolean lockFlag = redisLock.lock();
            log.info("加锁结果:{}",lockFlag);
            if(lockFlag){
                try {
                    //20秒模拟处理业务代码时间
                    Thread.sleep(1000*5L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }catch (Exception e) {
            log.info("testRedisLock e---->",e);
        }finally {
            boolean unlockResp = redisLock.unlock();
            log.info("释放锁结果:{}",unlockResp);
        }
    }


    public static void testRedisLock2(){
        LockParam lockParam = new LockParam(lockKey);
        lockParam.setTryLockTime(1000*2L);//2秒时间尝试获得锁
        lockParam.setHoldLockTime(1000*2L);//获得锁成功后持有锁2秒时间
        RedisLock redisLock = new RedisLock(lockParam);
        try {
            Boolean lockFlag = redisLock.lock();
            log.info("加锁结果:{}",lockFlag);
            if(lockFlag){
                try {
                    //10秒模拟处理业务代码时间
                    Thread.sleep(1000*10L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }catch (Exception e) {
            log.info("testRedisLock e---->",e);
        }finally {
            boolean unlockResp = redisLock.unlock();
            log.info("释放锁结果:{}",unlockResp);
        }
    }
}


这是代码在执行过程中,通过redis可视化工具看到的效果,可以参考一下~

image

控制台日志打印结果
15:02:28.569 [main] INFO com.test.test - 下面测试两个线程同时,抢占锁的结果
15:02:28.645 [我是线程2] INFO com.test.test - 加锁结果:true
15:02:30.618 [我是线程1] INFO com.test.test - 加锁结果:false
15:02:30.620 [我是线程1] INFO com.test.test - 释放锁结果:false
15:02:33.652 [我是线程2] INFO com.test.test - 释放锁结果:true
15:02:48.614 [main] INFO com.test.test - -----------------我是一条分割线----------------
15:02:48.614 [main] INFO com.test.test - 
15:02:48.614 [main] INFO com.test.test - 
15:02:48.614 [main] INFO com.test.test - 
15:02:48.614 [main] INFO com.test.test - 下面是测试  一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉
15:02:48.616 [我是线程3] INFO com.test.test - 加锁结果:true
15:02:50.645 [我是线程4] INFO com.test.test - 加锁结果:true
15:02:55.647 [我是线程4] INFO com.test.test - 释放锁结果:true
15:02:58.621 [我是线程3] INFO com.test.test - 释放锁结果:false
  • 可以看到多个线程竞争一把锁的时候,保证了只有一个线程持有锁
  • 分割线下面的日志也能看出,一个线程持有了锁,由于处理业务代码时间,超过了设置持有锁的时间,通过lua脚本释放锁的时候,也不会把其他线程持有的锁给释放掉,保证了安全释放了锁

7、分布式锁 实际使用中需要注意的一些问题

假设有这样一个场景: 有一个修改订单状态的接口,订单状态修改为失败,就不允许在修改为其他状态了;
在单台机器上,在代码方法上加了synchronized来做并发控制,由于代码逻辑比较复杂,现在它的TPS是1,一秒就只能处理一个订单。
后面对这个系统做集群,部署了一百台,那么这个接口性能就提升了100倍了。
但是synchronized是进程级别的锁,在集群环境下synchronized没办法控制其他服务器下线程并发访问 临界代码了,后面就采用了分布式锁来做并发控制。

7.1、那么使用分布锁要注意什么了?

7.1.1、锁粒度

如果分布式锁的key 设置的是 redisLock:updateOrderStatus 相当于集群下对这个接口加了相同的一把大锁,按照上面那个场景TPS就变成1了,集群部署就浪费了。

7.1.2、那么如何控制锁粒度了?

平常我们修改订单的时候都有订单号,那么分布式的key可以设置为:redisLock:updateOrderStatus:{orderCode} ,{orderCode}执行的时候动态的替换成订单编号,那么锁粒度就控制到这条订单了,就跟数据库从表锁 变成了行锁一样,接口支持更高的并发了。

7.1.3、获取锁时间

如果时间设置的太长:用户就会等待太久才能得到响应结果
太短:频繁获取锁失败,用户体验性也不好
只能按照不同的业务代码,由开发人员来衡量设置多长的时间

7.1.4、持有锁时间:

如果锁粒度比较小,时间可以设置长一点,就算执行比较慢,影响面比较小可以接受

7.1.5、难道每次想使用分布式锁的时候都需要下面流程一样,在编码一次?有什么办法能优化吗?

1、先创建一个 分布式锁对象;RedisLock redisLock = new RedisLock(lockParam);
2、加锁;Boolean lockFlag = redisLock.lock();
3、finally 解锁;redisLock.unlock();

分布式锁使用的加锁、解锁 流程是固定的,没办法改变了;
这个流程是不是跟spring的 编程式事务一样,spring 有编程式事务,也有声明式事务
那么我们参考这个声明式事务实现一个 声明式的分布式锁

下面是声明式事务的优点:
声明式事务:可知编程式事务每次实现都要单独实现,但业务量大功能复杂时,使用编程式事务无疑是痛苦的,而声明式事务不同,声明式事务属于无侵入式,不会影响业务逻辑的实现。
那声明式的分布式锁 优点就是:
声明式分布式锁:可知编程式分布式锁每次实现都要单独实现,但业务量大功能复杂时,使用编程式分布式锁无疑是痛苦的,而声明式分布式锁不同,声明式分布式锁属于无侵入式,不会影响业务逻辑的实现。

实际上我们工作中开发也是借助spring aop + 自定义注解 来实现这种 声明式分布式锁

声明式分布式锁传送门~


推荐阅读
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 服务器部署中的安全策略实践与优化
    服务器部署中的安全策略实践与优化 ... [详细]
  • 深入理解Redis中的字典实现
    本文详细介绍了Redis中字典的实现机制,包括其底层数据结构、哈希表与哈希节点的关系、元素添加方法及rehash操作的具体流程。 ... [详细]
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • 本文详细介绍了如何在 Linux 系统上安装 JDK 1.8、MySQL 和 Redis,并提供了相应的环境配置和验证步骤。 ... [详细]
  • 本文深入探讨了如何选择适合业务需求的MySQL存储引擎,详细解析了不同存储引擎的特点、适用场景及其在数据存储和管理中的优势。通过对比InnoDB、MyISAM等主流引擎,为读者提供了全面的技术指导和专业建议,帮助开发者在实际应用中做出明智的选择。 ... [详细]
  • 2021年Java开发实战:当前时间戳转换方法详解与实用网址推荐
    在当前的就业市场中,金九银十过后,金三银四也即将到来。本文将分享一些实用的面试技巧和题目,特别是针对正在寻找新工作机会的Java开发者。作者在准备字节跳动的面试过程中积累了丰富的经验,并成功获得了Offer。文中详细介绍了如何将当前时间戳进行转换的方法,并推荐了一些实用的在线资源,帮助读者更好地应对技术面试。 ... [详细]
  • Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ... [详细]
  • 本文详细介绍了使用 Python 进行 MySQL 和 Redis 数据库操作的实战技巧。首先,针对 MySQL 数据库,通过 `pymysql` 模块展示了如何连接和操作数据库,包括建立连接、执行查询和更新等常见操作。接着,文章深入探讨了 Redis 的基本命令和高级功能,如键值存储、列表操作和事务处理。此外,还提供了多个实际案例,帮助读者更好地理解和应用这些技术。 ... [详细]
  • 本文介绍了 Go 语言中的高性能、可扩展、轻量级 Web 框架 Echo。Echo 框架简单易用,仅需几行代码即可启动一个高性能 HTTP 服务。 ... [详细]
  • Cookie学习小结
    Cookie学习小结 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 性能测试中的关键监控指标与深入分析
    在软件性能测试中,关键监控指标的选取至关重要。主要目的包括:1. 评估系统的当前性能,确保其符合预期的性能标准;2. 发现软件性能瓶颈,定位潜在问题;3. 优化系统性能,提高用户体验。通过综合分析这些指标,可以全面了解系统的运行状态,为后续的性能改进提供科学依据。 ... [详细]
  • 本文深入解析了Django框架中的MVT(Model-View-Template)设计模式,详细阐述了其工作原理和应用流程。通过分析URL模式、视图、模型和模板等关键组件,读者将全面理解Django应用程序的架构体系,掌握如何高效地构建和管理Web应用。 ... [详细]
author-avatar
爱是种承诺ml
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有