public class TestReentrantLock {
private static Integer i = 0;
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
//1.创建一千个线程
Thread[] threads = new Thread[1000];
for (int j = 0; j <1000; j++) {
Thread t1 = new Thread(() -> incr());
threads[j] = t1;
}
for (int j = 0; j <1000; j++) {
threads[j].start();
}
//2.使线程充分运行
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
private static void incr(){
// //(1)
// lock.lock();
try {
i++;
} finally {
// lock.unlock();
}
}
private static void desr() {
lock.lock();
try {
i--;
} finally {
lock.unlock();
}
}
}
incr()
方法中lock.lock()
被注释掉后,运行程序,输出的结果是一个<=1000的数字(基本不可能=1000),这就是常说的线程安全问题。lock()
//ReentrantLock.java
public void lock() {
sync.lock();
}
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//NonfairSync.java
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//FairSync.java
final void lock() {
acquire(1);
}
NonfairSync
和FairSync
,实际上ReenstrantLock的lock方法是委派给了Sync的lock方法NonfairSync.lock()
代码1
//NonfairSync.java
final void lock() {
//尝试抢占锁一次,抢占成功,保存当前线程信息到AQS同步器中
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//抢占失败,再次尝试抢占锁一次(`acquire(1)`)
acquire(1);
}
//CAS操作替换内存
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
前面提到要想实现锁,第一个条件就是共享资源,这里通过AbstractQueuedSynchronizer(以下简称AQS)类的一个volatile修饰的字段来充当private volatile int state
这里通过unsafe的CAS操作来直接操作内存,CAS是一种乐观锁的思想,通过比对当前的值是否匹配来决定是否修改内存,原子操作,没有指令重排序问题。且state字段使用volatile修饰,解决了可见性问题,因此不需要考虑线程安全问题。
acquire(1)
里又有几个方法,其执行顺序依次是
tryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判断锁是否被某线程持有
//1.无锁,尝试抢占锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//抢占成功,设置当前线程到同步器中
setExclusiveOwnerThread(current);
return true;
}
}
//2.有锁且持有锁的线程为当前线程,state+1,标识重入次数。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc <0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这个方法的目的有两个
有锁的情况下判断是否是当前线程,即判断是否是重入的场景
方法返回true,表示抢占锁成功,无需执行下面两个方法;返回false,需要执行下面两个方法。
有锁且持有锁的线程为当前线程,因为当前线程持有的锁,别的线程无法执行这段代码,所有这里无需考虑线程安全问题,就不需要使用CAS操作
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
//1.为当前线程创建一个独占模式的节点,waitStatus = 0
Node node = new Node(Thread.currentThread(), mode);
//2.若tail节点指向不空,尝试把当前节点插入表尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//tail节点指向为空,创建一个链表,表头指向new Node(),表尾指向当前node.
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
//1.取出前继节点,前继节点是head节点,尝试抢占锁
if (p == head && tryAcquire(arg)) {
//1.1.head节点抢占成功,释放head节点,当前节点变成head节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//2.若前继节点不是head节点,或者抢占锁失败,判断是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
//3.阻塞当前线程,并返回中断状态
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//前提:前继节点不是head,或者head节点抢占锁失败
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//1.判断前继节点状态,如果前继节点是阻塞状态,则当前节点肯定是可以被park的,所以返回true
if (ws == Node.SIGNAL)
return true;
//2.若前置节点为cancel(>0的只有一个-1,标识取消状态),向前遍历把前继节点 和其他 取消状态的节点删除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//3.这里节点状态有两种(0,PROPAGATE),无论哪一种,都需要把前继节点状态设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//1.阻塞当前线程(中断方式:1.unpark;2.interrupt)
LockSupport.park(this);
//2.返回当前线程的阻塞状态
return Thread.interrupted();
}
//线程已被取消 static final int CANCELLED = 1; //等待触发状态,前继节点可能是head或者节点为cancel状态 static final int SIGNAL = -1; //等待条件装的,线程在等待队列中 static final int COnDITION= -2; //后继线程获取共享锁的请求应该被无条件传播 static final int PROPAGATE = -3;
unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//1.当前线程释放锁
if (tryRelease(arg)) { //(1)
Node h = head;
//2.若表头 != null && 表头的waitStatus 不是 初始状态,
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //(2)
return true;
}
return false;
}
//(1)
protected final boolean tryRelease(int releases) {
//1.同步器state - 1
//2.如果当前线程 != 获得锁的线程,报错
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//3.如果state -1 = 0,说明释放后处于无锁状态,清空当前同步器中抢占锁的线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//(2)
//前提:当前线程释放锁成功,操作head节点
private void unparkSuccessor(Node node) {
//1.如果head节点的waitStatus <0,尝试清除状态
int ws = node.waitStatus;
if (ws <0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//2.如果head的后继节点为空,或者后继节点的线程已经被取消了,清空后继节点
s = null;
//3.从表尾开始向表头遍历(条件:不是表头,不等于null),找到一个阻塞状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//
if (s != null)
LockSupport.unpark(s.thread);
}
这里为什么从后面遍历,查找了下,在enq(final Node node)
方法中,把cas操作后,t.next = node之前,有可能会有其他的线程进来,导致出现问题,所以从tail开始遍历,能够遍历到所有节点。
//FairSync.java
//在lock方法中没有先去尝试抢占锁,而是先看看队列里是否有阻塞状态的线程
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc <0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}