热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

重入锁(ReentrantLock)学习及实现原理

本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。

JUC之ReentrantLock学习

  • 在学习synchronized的时候,知道要实现一个锁,需要一下几点因素:
    • 共享资源
    • 线程阻塞和唤醒
    • 阻塞线程存储:wait池
  • 其次,我们知道synchronized是一个非公平的锁,当一个持有锁的线程刚刚释放掉锁的同时,恰好有个线程来抢占锁是可以抢占到锁的。而不是wait池里面的第一个线程抢占到锁。这样就产生的插队的效果。
  • 重入锁,不需要关心锁类型,只需要关心锁的范围。既然是锁,也要遵循锁的特性,需要满足上面说的3点因素,另外
    • 重入锁支持重入,就是对于同一个线程,同一把锁,可以在上一次未释放锁之前,再次抢占到这把锁
    • 重入锁有有两种实现:公平与非公平,非公平和synchronized差不多,公平锁的话,则不允许出现插队的情况

一、类图

image-20200609075246869

二、测试

public class TestReentrantLock {

    private static Integer i = 0;
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        //1.创建一千个线程
        Thread[] threads = new Thread[1000];
        for (int j = 0; j <1000; j++) {
            Thread t1 = new Thread(() -> incr());
            threads[j] = t1;
        }

        for (int j = 0; j <1000; j++) {
            threads[j].start();
        }

        //2.使线程充分运行
        TimeUnit.SECONDS.sleep(1);
        System.out.println(i);
    }

    private static void incr(){
//        //(1)
//        lock.lock();
        try {
            i++;
        } finally {
//            lock.unlock();
        }
    }

    private static void desr() {
        lock.lock();
        try {
            i--;
        } finally {
            lock.unlock();
        }
    }
}

  • 上述代码,在incr()方法中lock.lock()被注释掉后,运行程序,输出的结果是一个<=1000的数字(基本不可能=1000),这就是常说的线程安全问题。
  • 这里通常可以通过两种方式来处理,synchronized 和 lock。放开(1)后,发现每次运行的结果都是1000。

三、整体流程

image-20200612073833268

四、方法

1、lock()

//ReentrantLock.java
public void lock() {
    sync.lock();
}

private final Sync sync;
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

//NonfairSync.java
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

//FairSync.java
final void lock() {
    acquire(1);
}

  • 通过类图和上述ReenstrantLock的构造函数发现,ReenstrantLock里有个同步器Sync,而这个同步器又有两种实现:NonfairSyncFairSync,实际上ReenstrantLock的lock方法是委派给了Sync的lock方法

2、NonfairSync.lock()

  • 代码1

    //NonfairSync.java
    final void lock() {
        //尝试抢占锁一次,抢占成功,保存当前线程信息到AQS同步器中
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //抢占失败,再次尝试抢占锁一次(`acquire(1)`)
            acquire(1);
    }
    
    //CAS操作替换内存
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    
    • 前面提到要想实现锁,第一个条件就是共享资源,这里通过AbstractQueuedSynchronizer(以下简称AQS)类的一个volatile修饰的字段来充当private volatile int state

      这里通过unsafe的CAS操作来直接操作内存,CAS是一种乐观锁的思想,通过比对当前的值是否匹配来决定是否修改内存,原子操作,没有指令重排序问题。且state字段使用volatile修饰,解决了可见性问题,因此不需要考虑线程安全问题。

    • acquire(1)里又有几个方法,其执行顺序依次是

      • tryAcquire(arg)

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //判断锁是否被某线程持有
            //1.无锁,尝试抢占锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    //抢占成功,设置当前线程到同步器中
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //2.有锁且持有锁的线程为当前线程,state+1,标识重入次数。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc <0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        
        
      • 这个方法的目的有两个

        • 无锁情况下尝试抢占锁一次
      • 有锁的情况下判断是否是当前线程,即判断是否是重入的场景

    • 方法返回true,表示抢占锁成功,无需执行下面两个方法;返回false,需要执行下面两个方法。

      有锁且持有锁的线程为当前线程,因为当前线程持有的锁,别的线程无法执行这段代码,所有这里无需考虑线程安全问题,就不需要使用CAS操作

    • addWaiter(Node.EXCLUSIVE)

      private Node addWaiter(Node mode) {
          //1.为当前线程创建一个独占模式的节点,waitStatus = 0 
          Node node = new Node(Thread.currentThread(), mode);
          //2.若tail节点指向不空,尝试把当前节点插入表尾
          Node pred = tail;
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          //tail节点指向为空,创建一个链表,表头指向new Node(),表尾指向当前node.
          enq(node);
          return node;
      }
      
      private Node enq(final Node node) {
          //自旋
          for (;;) {
              Node t = tail;
              if (t == null) { // Must initialize
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                }
              }
        }
      }
      
      
      • 方法目的:创建一个双向链表,节点为独享模式,把当前节点插到表尾
    • acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              //自旋
              for (;;) {
                  final Node p = node.predecessor();
                  //1.取出前继节点,前继节点是head节点,尝试抢占锁
                  if (p == head && tryAcquire(arg)) {
                      //1.1.head节点抢占成功,释放head节点,当前节点变成head节点
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  //2.若前继节点不是head节点,或者抢占锁失败,判断是否需要阻塞
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      //3.阻塞当前线程,并返回中断状态
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      
      
      //前提:前继节点不是head,或者head节点抢占锁失败
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;
          //1.判断前继节点状态,如果前继节点是阻塞状态,则当前节点肯定是可以被park的,所以返回true
          if (ws == Node.SIGNAL)
              return true;
          //2.若前置节点为cancel(>0的只有一个-1,标识取消状态),向前遍历把前继节点 和其他 取消状态的节点删除
          if (ws > 0) {
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              //3.这里节点状态有两种(0,PROPAGATE),无论哪一种,都需要把前继节点状态设置为-1
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }
      
      
      private final boolean parkAndCheckInterrupt() {
          //1.阻塞当前线程(中断方式:1.unpark;2.interrupt)
          LockSupport.park(this);
          //2.返回当前线程的阻塞状态
          return Thread.interrupted();
      }
      
      
      • 方法目的
        • 拦截所有不是head下一个节点的节点,保证队列中线程获取锁的顺序的公平
        • 标记队列中所有的线程为阻塞状态
      //线程已被取消
      static final int CANCELLED =  1;
      //等待触发状态,前继节点可能是head或者节点为cancel状态
      static final int SIGNAL    = -1;
      //等待条件装的,线程在等待队列中
      static final int COnDITION= -2;
      //后继线程获取共享锁的请求应该被无条件传播
      static final int PROPAGATE = -3;
      
      

3、unlock()

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    //1.当前线程释放锁
    if (tryRelease(arg)) { //(1)
        Node h = head;
        //2.若表头 != null && 表头的waitStatus 不是 初始状态,
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //(2)
        return true;
    }
    return false;
}

//(1)
protected final boolean tryRelease(int releases) {
    //1.同步器state - 1
    //2.如果当前线程 != 获得锁的线程,报错
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //3.如果state -1 = 0,说明释放后处于无锁状态,清空当前同步器中抢占锁的线程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

//(2)
//前提:当前线程释放锁成功,操作head节点
private void unparkSuccessor(Node node) {
    //1.如果head节点的waitStatus <0,尝试清除状态
    int ws = node.waitStatus;
    if (ws <0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        //2.如果head的后继节点为空,或者后继节点的线程已经被取消了,清空后继节点
        s = null;
        //3.从表尾开始向表头遍历(条件:不是表头,不等于null),找到一个阻塞状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //
    if (s != null)
        LockSupport.unpark(s.thread);
}

这里为什么从后面遍历,查找了下,在enq(final Node node)方法中,把cas操作后,t.next = node之前,有可能会有其他的线程进来,导致出现问题,所以从tail开始遍历,能够遍历到所有节点。

4、公平锁与非公平锁的差别

//FairSync.java
//在lock方法中没有先去尝试抢占锁,而是先看看队列里是否有阻塞状态的线程
final void lock() {
    acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc <0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

五、总结

  • 重入锁实际上是通过CAS操作一个volatile修饰的共享变量,来实现抢占锁的成功与否。并把抢占锁失败的线程放到等待队列中,等待被唤醒,重新加入抢占锁的队列中。
  • 重入的实现是通过保存持有当前锁的线程来判断是否是当前线程再一次抢占锁。

推荐阅读
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有