目录
一、ReentrantLock
二、AQS
1.和ReentrantLock的关系
2.AQS队列同步器源码分析
同步队列:
独占式同步队列状态获取和释放:
释放锁并且唤醒一下一个处于part挂起状态的线程:
在Lock接口出现之前,Java程序主要是靠Synchronized关键字来实现锁的功能,但是在Java 5之后在并发包中增加了Lock接口来实现锁的功能,其中ReentrantLock就是Lock接口的一个比较常用的实现类
相比Synchronized(Synchronized的学习可以看本系列博客“”)来说,ReentrantLock有跟多的优势:
在ReentrantLock聚合了同步器,利用同步器来真实的实现了锁的语义,也可以说ReentrantLock是面向使用者的,同步器是锁的真实实现
结构图:
整理来看AQS使用了模板方法,该图是本人对于AQS使用整理的模板方法关系图,关系读者于评论区一起讨论
同步队列:
同步队列是一个FIFO(先进先出)的双向队列,当线程获取锁资源失败的时候会变为NODE节点,并且加入到同步队列中,同时会堵塞当前线程,当有线程释放锁资源的时候,会唤醒同步队列的首节点
1. 线程获取锁失败,线程加入队列:
当有新的线程获取锁资源失败的时候会加入到队列的尾,但是由于是多线程执行的,可能会有多个线程同时需要加入到同步队列的尾,所以我们这个时候利用CAS来保证线程安全,他需要传递当前线程认为的尾节点和当前节点,否则会由于多个线程共同插入,队列混乱。
CompareAndSetTail(Node expect,Node update)
2. 线程获取到锁,并将释放锁的节点移除同步队列:
首节点是获取锁成功的节点,首节点的线程在释放锁时,会唤醒后续节点,而后继节点在成功获取到锁后,会把自己设置成首节点,设置首节点是由获取锁成功的线程来完成的,由于只有一个线程能成功获取到锁,所以设置首节点不需要CAS
//lock方法先通过CAS尝试将同步状态(AQS的state属性)从0修改为1。
//若直接修改成功了,则将占用锁的线程设置为当前线程
final void lock() {if (compareAndSetState(0, 1))//用来保存当前占用同步状态的线程。setExclusiveOwnerThread(Thread.currentThread());else//独占式获取同步状态,如果获取成功就直接返回,否则加入同步队列acquire(1);}
解释: 同步队列中维护了一个 state 变量,当state 变量为0的时候表示当前锁没有被获取,当有一个线程想要获取到锁的时候,首先会通过CAS去判断当前state是否为0 ,如果为0就抢占锁资源,设置当前线程为同步状态的线程,因为ReentrantLock是支持可重入的,所以当某一个线程判断当前状态不为0,他是有机会继续获取同步状态的(同一个线程可以重入)
看acquire方法:
/**
* tryAcquire方法尝试获取锁,如果成功就返回,
* 如果不成功,则把当前线程和等待状态信息构适成一个Node节点,
* 并将结点放入同步队列的尾部。然后为同步队列中的当前节点循环等待获取锁,直到成功
*
*/public final void acquire(int arg) {//这里调用的是被子类重写的方法,如果没有获取成功if (!tryAcquire(arg) &&//将线程转变为NODE节点,并且加入到同步队列中acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire()方法:
tryAcquire方法是AQS提供的一个可重写的方法,被ReentrantLock的重写成了公平和非公平锁,我们默认走非公平锁.
final boolean nonfairTryAcquire(int acquires) {//获取当前线程final Thread current &#61; Thread.currentThread();//获取状态int c &#61; getState();//如果状态为0表示没有被加锁if (c &#61;&#61; 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果发现当前加锁的是当前线程&#xff0c;那么就可以重入else if (current &#61;&#61; getExclusiveOwnerThread()) {//将重入次数&#43;1int nextc &#61; c &#43; acquires;if (nextc <0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
addWaiter(Node.EXCLUSIVE), arg)方法&#xff1a;
private Node addWaiter(Node mode) {Node node &#61; new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred &#61; tail;//如果当前同步队列不为空if (pred !&#61; null) {node.prev &#61; pred;//从尾部加入节点if (compareAndSetTail(pred, node)) {pred.next &#61; node;return node;}}//构建同步队列空的首节点enq(node);return node;}
首先创建一个Node对象&#xff0c;Node中包含了当前线程和Node模式(这时是排他模式)。tail是AQS的中表示同步队列队尾的属性&#xff0c;刚开始为null&#xff0c;所以进行enq(node)方法&#xff0c;从字面可以看出这是一个入队操作&#xff0c;来看下具体入队细节
private Node enq(final Node node) {for (;;) {Node t &#61; tail;if (t &#61;&#61; null) { // Must initialize//比较并且设置一个空的首节点if (compareAndSetHead(new Node()))tail &#61; head;} else {node.prev &#61; t;if (compareAndSetTail(t, node)) {t.next &#61; node;return t;}}}}
解析&#xff1a;
方法体是一个死循环&#xff0c;本身没有锁&#xff0c;可以多个线程并发访问&#xff0c;假如某个线程进入方法&#xff0c;此时head, tail都为null, 进入if(t&#61;&#61;null)区域&#xff0c;从方法名可以看出这里是用CAS的方式创建一个空的Node作为头结点&#xff0c;因为此时队列中只一个头结点&#xff0c;所以tail也指向它&#xff0c;第一次循环执行结束。注意这里使用CAS是防止多个线程并发执行到这儿时&#xff0c;只有一个线程能够执行成功&#xff0c;防止创建多个同步队列。
进行第二次循环时(或者是其他线程enq时)&#xff0c;tail不为null&#xff0c;进入else区域。将当前线程的Node结点(简称CNode)的prev指向tail&#xff0c;然后使用CAS将tail指向CNode。&#xff08;真实的入队情况&#xff09;
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法解析&#xff1a;
final boolean acquireQueued(final Node node, int arg) {boolean failed &#61; true;try {boolean interrupted &#61; false;for (;;) {//获得当前节点的前驱节点final Node p &#61; node.predecessor();//当前节点的前一个节点是首节点 并且 当前节点能够获取到同步状态if (p &#61;&#61; head && tryAcquire(arg)) {//设置当前节点为首节点setHead(node);p.next &#61; null; // help GCfailed &#61; false;return interrupted;}//否则就判断是否应该挂起if (shouldParkAfterFailedAcquire(p, node) &&//如果应该挂起则执行 挂起parkAndCheckInterrupt())interrupted &#61; true;}} finally {if (failed)cancelAcquire(node);}}
可以看到&#xff0c;acquireQueued方法也是一个死循环&#xff0c;直到进入 if (p &#61;&#61; head && tryAcquire(arg))条件方法块。
如果当前节点的前一个节点不是头节点&#xff0c;就无需循环抢锁。
如果抢锁成功&#xff1a;
1) 将CNode设置为头节点。
2) 将CNode的前置节点设置的next设置为null。
上面操作即完成了FIFO的出队操作。
从上面的分析可以看出&#xff0c;只有队列的第二个节点可以有机会争用锁&#xff0c;如果成功获取锁&#xff0c;则此节点晋升为头节点。对于第三个及以后的节点&#xff0c;if (p &#61;&#61; head)条件不成立&#xff0c;首先进行shouldParkAfterFailedAcquire(p, node)操作&#xff08;争用锁失败的第二个节点也如此&#xff09;&#xff0c; 来看下源码&#xff1a;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws &#61; pred.waitStatus;//如果节点的状态为-1 那就说明状态已经是可以被唤醒的状态&#xff0c;就直接返回true即可if (ws &#61;&#61; Node.SIGNAL)return true;if (ws > 0) {do {node.prev &#61; pred &#61; pred.prev;} while (pred.waitStatus > 0);pred.next &#61; node;} else {//把前置结点变为-1&#xff0c;当前置结点为-1的时候我当前节点就挂起compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL&#xff0c;如果是&#xff0c;是说明此节点已经将状态设置如果锁释放则应当通知它&#xff0c;所以它可以安全的阻塞了&#xff0c;返回true。
如果前节点的状态大于0&#xff0c;即为CANCELLED状态时&#xff0c;则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点&#xff0c;返回false。在下次循环执行shouldParkAfterFailedAcquire时&#xff0c;返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
前节点状态小于0的情况是对应ReentrantLock的Condition条件等待的&#xff0c;这里不进行展开。
private final boolean parkAndCheckInterrupt() {//线程挂起LockSupport.park(this);//返回一个中断标志是否被中断过&#xff0c;并且复位&#xff0c;为了响应中断return Thread.interrupted();}
如果shouldParkAfterFailedAcquire返回了true&#xff0c;则会执行&#xff1a;“parkAndCheckInterrupt()”方法&#xff0c;它是通过LockSupport.park(this)将当前线程挂起到WATING状态&#xff0c;它需要等待一个中断、unpark方法来唤醒它&#xff0c;通过这样一种FIFO的机制的等待&#xff0c;来实现了Lock的操作。
获取锁的时序图&#xff1a;
public void unlock() {sync.release(1);}
unlock调用AQS的release()来完成, AQS的如果tryRelease方法由具体子类实现。tryRelease返回true,则会将head传入到unparkSuccessor(Node)方法中并返回true&#xff0c;否则返回false。
public final boolean release(int arg) {//如果成功if (tryRelease(arg)) {Node h &#61; head;if (h !&#61; null && h.waitStatus !&#61; 0)//唤醒自己的后继节点unparkSuccessor(h);return true;}return false;}
protected final boolean tryRelease(int releases) {//状态值减1int c &#61; getState() - releases;//如果当前线程不等于独占线程if (Thread.currentThread() !&#61; getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free &#61; false;if (c &#61;&#61; 0) {free &#61; true;//设置当前独占线程为空setExclusiveOwnerThread(null);}setState(c);return free;}
private void unparkSuccessor(Node node) {int ws &#61; node.waitStatus;if (ws <0)compareAndSetWaitStatus(node, ws, 0);Node s &#61; node.next;if (s &#61;&#61; null || s.waitStatus > 0) {s &#61; null;for (Node t &#61; tail; t !&#61; null && t !&#61; node; t &#61; t.prev)if (t.waitStatus <&#61; 0)s &#61; t;}//如果有后继节点&#xff0c;就通过unpark来释放被挂起的线程if (s !&#61; null)LockSupport.unpark(s.thread);}
内部首先会发生的动作是获取head节点的next节点&#xff0c;如果获取到的节点不为空&#xff0c;则直接通过&#xff1a;“LockSupport.unpark()”方法来释放对应的被挂起的线程&#xff0c;这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁。
以上ReentrantLock的释放锁的过程就分析完毕了。