在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具,也是 Lock 用来实现线程同步的核心组件。
独占和共享。
AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。
每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS 队列中去; 当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
Node组成:
static final class Node {
// 排他锁的标识
static final Node EXCLUSIVE = null;
// 如果带有这个标识,证明是失效了
static final int CANCELLED = 1;
// 具有这个标识,说明后继节点需要被唤醒
static final int SIGNAL = -1;
// Node对象存储标识的地方
volatile int waitStatus;
// 指向上一个节点
volatile Node prev;
// 指向下一个节点
volatile Node next;
// 当前Node绑定的线程
volatile Thread thread;
// 存储在Condition队列中的后继节点
Node nextWaiter;
// 返回前驱节点,如果前驱节点为null,抛出NPE
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 将线程构造成一个Node,添加到等待队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 在Condition队列中使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点:
设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通过 cas 乐观锁的方式来做比较并替换,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false。这个操作是原子的,不会出现线程安全问题。
private volatile int state;
state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样, 对于重入
锁的实现来说,表示一个同步状态。它有两个含义的表示
Unsafe 类是在 sun.misc 包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、
Hadoop、 Kafka 等;
Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程的挂起和恢复、 CAS、线程同步、内存屏障
而 CAS 就是 Unsafe 类中提供的一个原子操作:
public final native boolean compareAndSwapInt(Object obj, long stateOffset, int expect, int update);
整个方法的作用是如果当前时刻的值等于预期值 expect 相等,则更新为新的期望值 update,如果更新成功,则返回 true,否则返回false;
stateOffset:
一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapInt 中,去根据偏移量找到对象在内存中的具体位置
所以 stateOffset 表示 state 这个字段在 AQS 类的内存中相对于该类首地址的偏移量
compareAndSwapInt
unsafe.cpp 文件中compareAndSwarpInt 的实现:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset,jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj); //将 Java 对象解析成 JVM 的 oop(普通对象指针)
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根据对象 p 和地址偏移量找到地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基于 cas 比较并替换, x 表示需要更新的值, addr 表示 state在内存中的地址, e 表示预期值
UNSAFE_END
ReentrantLock时序图:
public void lock() {
// sync分为了公平和非公平
sync.lock();
}
sync是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑。
AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒, 但它并不具备业务功能, 所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能。
Sync 有两个具体的实现类:
NonfairSync#lock():
final void lock() {
// 通过CAS的方式尝试将state从0修改为1,如果返回true,代表修改成功,如果修改失败,返回false
if (compareAndSetState(0, 1))
// 将一个属性设置为当前线程,这个属性是AQS的父类提供的
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此时 acquire(1)
操作
public final void acquire(int arg) {
// tryAcquire再次尝试获取锁资源,如果尝试成功,返回true
if (!tryAcquire(arg) &&
// 获取锁资源失败后,需要将当前线程封装成一个Node,追加到AQS的队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程中断
selfInterrupt();
}
方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。
它是重写 AQS 类中的 tryAcquire 方法。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的state的值
int c = getState();
// 如果state为0,表示无锁状态,尝试再次获取锁资源
if (c == 0) {
// CAS尝试修改state,从0-1,如果成功,设置ExclusiveOwnerThread属性为当前线程
if (compareAndSetState(0, acquires)) {
// 保存当前获得锁的线程,下次再来的时候不要再尝试竞争锁
setExclusiveOwnerThread(current);
return true;
}
}
// 当前占有锁资源的线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 将state + 1
int nextc = c + acquires;
// 如果加1后,小于0,超所锁可重入的最大值,抛出Error
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 没问题&#xff0c;就重新对state进行复制
setState(nextc);
// 锁重入成功
return true;
}
return false;
}
当 tryAcquire 方法获取锁失败以后&#xff0c;则会先调用 addWaiter 将当前线程封装成Node.
入参 mode 表示当前节点的状态&#xff0c;传递的参数是 Node.EXCLUSIVE&#xff0c;表示独占状态。意味着重入锁用到了 AQS 的独占锁功能
// 说明前面获取锁资源失败&#xff0c;放到队列中等待
private Node addWaiter(Node mode) {
// 创建Node类&#xff0c;并且设置thread为当前线程&#xff0c;设置为排它锁
Node node &#61; new Node(Thread.currentThread(), mode);
// 获取AQS中队列的尾部节点&#xff0c;默认是 null
Node pred &#61; tail;
// 如果tail !&#61; null&#xff0c;说明队列中存在节点
if (pred !&#61; null) {
// 把当前线程的 Node 的 prev 指向 tail
node.prev &#61; pred;
// 通过 cas 把 node加入到 AQS 队列&#xff0c;也就是设置为 tail
if (compareAndSetTail(pred, node)) {
// 设置成功以后&#xff0c;把原 tail 节点的 next指向当前 node
pred.next &#61; node;
return node;
}
}
// tail&#61;null,把 node 添加到同步队列
enq(node);
return node;
}
// enq &#xff1a;通过自旋操作把当前节点加入到队列中
// 队列没有节点&#xff0c;我是第一个&#xff0c; 如果前面CAS失败&#xff0c;也会进到这个位置重新往队尾进入。
private Node enq(final Node node) {
// 死循环
for (;;) {
// 重新获取当前的tail节点为t
Node t &#61; tail;
if (t &#61;&#61; null) {
// 队列没有节点, 我是第一个&#xff0c;没头没尾&#xff0c;都是空
if (compareAndSetHead(new Node())) // 初始化一个Node作为head&#xff0c;而这个head没有意义。
// 将头尾都指向了这个初始化的Node
tail &#61; head;
} else {
// 有节点&#xff0c;往队尾入
// 当前节点的上一个指向tail
node.prev &#61; t;
// 基于CAS的方式&#xff0c;将tail节点设置为当前节点
if (compareAndSetTail(t, node)) {
// 将之前的为节点的next&#xff0c;设置为当前节点
t.next &#61; node;
return t;
}
}
}
}
通过 addWaiter 方法把线程添加到链表后&#xff0c; 会接着把 Node 作为参数传递给acquireQueued 方法&#xff0c;去竞争锁&#xff1a;
// 已经将node加入到了双向队列中&#xff0c;然后执行当前方法
final boolean acquireQueued(final Node node, int arg) {
// 标识
boolean failed &#61; true;
try {
// 标识
boolean interrupted &#61; false;
for (;;) {
// 获取当前节点的上一个节点p
final Node p &#61; node.predecessor();
// 如果p是头&#xff0c;说明有资格去争抢锁&#xff0c;尝试获取锁资源&#xff08;state从0-1&#xff0c;锁重入操作&#xff09;&#xff0c;成功返回true&#xff0c;失败返回false
if (p &#61;&#61; head && tryAcquire(arg)) {
// 获取锁成功&#xff0c;设置head节点为当前节点&#xff0c;将thread&#xff0c;prev设置为null&#xff0c;因为拿到锁资源了 &#xff1b;
setHead(node);
p.next &#61; null; // 把原 head 节点从链表中移除&#xff0c;帮助GC回收
failed &#61; false; // 将标识修改为false
return interrupted; // 返回interrupted
}
// 保证上一个节点是-1&#xff0c;才会返回true&#xff0c;才会将线程阻塞&#xff0c;等待唤醒获取锁资源
if (shouldParkAfterFailedAcquire(p, node) &&
// 基于Unsafe类的park方法&#xff0c;挂起线程
parkAndCheckInterrupt(); // 针对fail属性&#xff0c;这里是唯一可能出现异常的地方&#xff0c;JVM内部出现问题时&#xff0c;可以这么理解&#xff0c;fianlly代码块中的内容&#xff0c;执行的几率约等于0
interrupted &#61; true; // 返回当前线程在等待过程中有没有中断过
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
如果 ThreadA 的锁还没有释放的情况下&#xff0c; ThreadB 和 ThreadC 来争抢锁肯定是会失败&#xff0c;那么失败以后会调用 shouldParkAfterFailedAcquire 方法
Node 有 5 中状态
// node是当前节点&#xff0c;pred是上一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws &#61; pred.waitStatus;
// 如果上一个节点状态为SIGNAL&#xff0c;意味着只需要等待其他前置节点的线程被释放
if (ws &#61;&#61; Node.SIGNAL)
return true; // 返回 true&#xff0c;意味着可以直接放心的挂起了
// ws 大于 0&#xff0c;意味着 prev 节点取消了排队&#xff0c;直接移除这个节点
if (ws > 0) {
do {
// 将当前节点的prev指针指向了上一个的上一个
node.prev &#61; pred &#61; pred.prev;
} while (pred.waitStatus > 0); // 一直找到小于等于0的,从双向列表中移除 CANCELLED 的节点
// 将重新标识好的最近的有效节点的next
pred.next &#61; node;
} else {
// 小于等于0&#xff0c;不等于-1&#xff0c;将上一个有效节点状态修改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
使用 LockSupport.park 挂起当前线程变成 WATING 状态
Thread.interrupted&#xff0c;返回当前线程是否被其他线程触发过中断请求&#xff0c;也就是thread.interrupt(); 如果有触发过中断请求&#xff0c;那么这个方法会返回当前的中断标识true&#xff0c;并且对中断标识进行复位标识已经响应过了中断请求。 如果返回 true&#xff0c;意味着在 acquire 方法中会执行 selfInterrupt()。
selfInterrupt:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
标识如果当前线程在 acquireQueued 中被中断过&#xff0c;则需要产生一个中断请求&#xff0c;原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。
cancelAcquire&#xff1a;
// cancelAcquire方法
private void cancelAcquire(Node node) {
// 如果当前节点为null&#xff0c;结束&#xff0c;健壮性判断
if (node &#61;&#61; null)
return;
// node不为null的前提下执行
// 将当前node的线程置位null &#xff0c; 竞争锁资源跟我没有关系了&#xff0c;
node.thread &#61; null;
// 获取当前节点的前驱节点
Node pred &#61; node.prev;
// 前驱节点的状态 > 0
while (pred.waitStatus > 0)
// 找到前驱中最近的非失效节点
node.prev &#61; pred &#61; pred.prev;
// 将第一个不是失效节点的后继节点声明出来
Node predNext &#61; pred.next;
// 将当前节点置位失效节点。给别的Node看的。
node.waitStatus &#61; Node.CANCELLED;
// 如果当前节点是尾节点&#xff0c;将尾节点设置为最近的有效节点&#xff08;如果当前节点为尾节点的操作&#xff09;
if (node &#61;&#61; tail && compareAndSetTail(node, pred)) {
// 用CAS方式将尾节点的next设置null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 中间节点操作
// 如果上一个节点不是头节点
if (pred !&#61; head &&
获取上一届点状态&#xff0c;是不是有效
((ws &#61; pred.waitStatus) &#61;&#61; Node.SIGNAL || // pred需要唤醒后继节点的
(ws <&#61; 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread !&#61; null) {
Node next &#61; node.next;
if (next !&#61; null && next.waitStatus <&#61; 0)
compareAndSetNext(pred, predNext, next); // 尝试将pred的前驱节点的next指向当前节点的next&#xff08;必须是有效的next节点&#xff09;
} else {
// 头结点&#xff0c;唤醒后继节点
unparkSuccessor(node);
}
node.next &#61; node; // help GC
}
}