作者:鲁尼杰 | 来源:互联网 | 2023-10-10 11:59
一:什么是AQS AQS(AbstractQueuedSynchronizer)是JUC中的基础框架,例如ReentrantLock,CountDownLatch等组件都是基
一:什么是AQS
AQS(AbstractQueuedSynchronizer)是JUC中的基础框架,例如ReentrantLock,CountDownLatch等组件都是基于AQS实现同步控制。
二:AQS独占式获取和释放同步状态
在了解AQS中线程如何获取同步状态前,需要了解Node类, AQS内部由一个双向队列实现对同步状态的管理,可以看成多生产者单消费者模型,当线程没有获取到同步状态,线程会被封装成一个Node节点,利用CAS添加至队列的尾部,当节点的前驱节点是头节点才有机会获取同步状态。
AQS获取同步状态分为独占式和共享式获取同步状态。
独占式:某一时刻只有一个线程能获取同步状态(ReentrantLock依靠AQS的独占式实现)。
共享式:则可以多个线程获取同步状态(ReentrantReadWriteLock依靠AQS的共享式实现)。
由于独占式和共享式获取同步状态的代码大体相同,本文主要分析独占式获取同步状态,弄懂独占式获取同步状态代码两者肯定都能懂。
static final class Node {
// 共享
static final Node SHARED = new Node();
// 独占
static final Node EXCLUSIVE = null;
/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int COnDITION= -2;
/**
* 表示下一次共享式同步状态获取,将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点,当节点添加到同步队列时被设置(尾部添加) */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待队列中的后续节点。如果当前节点是共享的,那么字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后续节点共用同一个字段 */
Node nextWaiter;
/** 获取同步状态的线程 */
volatile Thread thread;
}
waitStatus 字段,等待状态,用来控制线程的阻塞和唤醒,并且可以避免不必要的调用LockSupport的 #park(...) 和 #unpark(...) 方法。。目前有 4 种:CANCELLED SIGNAL CONDITION PROPAGATE 。
实际上,有第 5 种,INITAL ,值为 0 ,初始状态。
thread 字段,Node 节点对应的线程 Thread 。
#acquire(int arg) 方法,为 AQS 提供的模板方法。该方法为独占式获取同步状态,但是该方法对中断不敏感。也就是说,由于线程获取同步状态失败而加入到 CLH 同步队列中,后续对该线程进行中断操作时,线程不会从 CLH 同步队列中移除。代码如下:
1: public final void acquire(int arg) {
2: if (!tryAcquire(arg) &&
3: acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4: selfInterrupt();
5: }
第 2 行:调用 #tryAcquire(int arg) 方法,去尝试获取同步状态,获取成功则设置锁状态并返回 true ,否则获取失败,返回 false 。若获取成功,#acquire(int arg) 方法,直接返回,不用线程阻塞,自旋直到获得同步状态成功。
#tryAcquire(int arg) 方法,需要自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。代码如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
第 3 行:如果 #tryAcquire(int arg) 方法返回 false ,即获取同步状态失败,则调用 #addWaiter(Node mode) 方法,将当前线程加入到 CLH 同步队列尾部。并且, mode 方法参数为 Node.EXCLUSIVE ,表示独占模式。
第 3 行:调用 boolean #acquireQueued(Node node, int arg) 方法,自旋直到获得同步状态成功。详细解析,见下文#acquireQueued中。另外,该方法的返回值类型为 boolean ,当返回 true 时,表示在这个过程中,发生过线程中断。但是呢,这个方法又会清理线程中断的标识,所以在种情况下,需要调用【第 4 行】的 #selfInterrupt() 方法,恢复线程中断的标识,代码如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
#acquireQueued(Node node, int arg) 方法,为一个自旋的过程,也就是说,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。
1: final boolean acquireQueued(final Node node, int arg) {
2: // 记录是否获取同步状态成功
3: boolean failed = true;
4: try {
5: // 记录过程中,是否发生线程中断
6: boolean interrupted = false;
7: /*
8: * 自旋过程,其实就是一个死循环而已
9: */
10: for (;;) {
11: // 当前线程的前驱节点
12: final Node p = node.predecessor();
13: // 当前线程的前驱节点是头结点,且同步状态成功
14: if (p == head && tryAcquire(arg)) {
15: setHead(node);
16: p.next = null; // help GC
17: failed = false;
18: return interrupted;
19: }
20: // 获取失败,线程等待--具体后面介绍
21: if (shouldParkAfterFailedAcquire(p, node) &&
22: parkAndCheckInterrupt())
23: interrupted = true;
24: }
25: } finally {
26: // 获取同步状态发生异常,取消获取。
27: if (failed)
28: cancelAcquire(node);
29: }
30: }
第 3 行:failed 变量,记录是否获取同步状态成功。
第 6 行:interrupted 变量,记录获取过程中,是否发生线程中断。
========== 第 7 至 24 行:“死”循环,自旋直到获得同步状态成功。==========
第 12 行:调用 Node#predecessor() 方法,获得当前线程的前一个节点 p 。
第 14 行:p == head 代码块,若满足,则表示当前线程的前一个节点为头节点,因为 head 是最后一个获得同步状态成功的节点,此时调用 #tryAcquire(int arg) 方法,尝试获得同步状态。