AQS(1.0)
AQS
AQS是什么?
AQS 是用来构建锁或者其它同步器组件的重量级基础框架及整个 JUC 体系的基石,通过内置的CLH(FIFO)队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,有一个 int 类变量表示持有锁的状态(private volatile int state),通过 CAS 完成对 status 值的修改(0表示没有,1表示阻塞)
CLH : Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO
AQS为什么是 JUC 内容中最重要的基石
ReentrantLock
CountDownLatch
ReentrantReadWriteLock
面向锁的实现者 (比如 Java 并发大神 Douglee ,提出统一规范并简化了锁的实现 , 屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。)
加锁会导致阻塞、有阻塞就需要排队,实现排队必然需要队列
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及 LockSuport.park() 的方式,维护 state 变量的状态,使并发达到同步的效果
AQS内部体系架构
AQS内部架构图:
AQS自身:
- state : 0 表示空闲 ,1 表示正在有线程在执行 ,大于 1 表示 重入次数
- CLH 队列: 实现排队的 队列
Node :
- Node的两种种类:SHARED、EXCLUSIVE
- waitState : 有五种取值
- 0 :默认值
- CANCELLED(1):表示线程放弃获取锁
- SIGNAL(-1):表示线程已经准备好了,就等资源释放
- CONDITION(-2):表示节点在等待队列中,节点线程等待唤醒
- PROPAGATE(-3):当前线程为 SHARED 时,使用这个状态
ReentrantLock 开始解读AQS
ReentrantLock 中有俩子类 FairSync 、NonfairSync,分别是 公平锁和非公平锁,而具体的获取锁的操作也是由这俩完成的。
1 | static final class NonfairSync extends Sync { |
Lock()
当我们给 ReentrantLock 构造函数传入 true 参数时,代表创建 FairSync 对象 ,反之 NonfairSync 对象
1 | public ReentrantLock(boolean fair) { |
上边的源码可以看到,这两者对于 lock() 方法的实现差不多,其中 NonfairSync 实现稍微复杂一些,所以我以 NonfairSync 的实现为例:
1 | final void lock() { |
首先 cas 修改 state 状态,若成功表示,无线程在占用锁,则当前线程占用
。若 失败 则进入 acquire(1) 方法;
1 | public final void acquire(int arg) { |
这个方法 是在 AbstractQueuedSynchronizer 中实现的,主要是做了三部
tryAcquire() :尝试获取锁
这个方法 在 NonfairSync 中的实现是:
1
2
3protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}点进去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}还是 获取 当前 state 状态,若为 0 ,则 cas 替换,成功后设置当前线程 为 锁的所有者,返回 true,否则 false
若 state==1 ,代表 有人在持有这把锁,判断是否是当前线程持有,若是则给 state 加 1,返回 true,否则 false
目的就是尝试为当前线程获取锁,true 代表 获取成功,false 代表获取失败
若是 true 返回 true ,则 !true 短路不会执行后边,若为 false,则执行
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1
2
3
4
5public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}先执行
addWaiter(Node.EXCLUSIVE)
传入的参数为 独占式类型的 Node ,代表这个节点中的线程是 独占锁的;点进去
1
2
3
4
5
6
7
8
9
10
11
12
13
14private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}为 AbstractQueuedSynchronizer 实现
。当前方法的目的是 将线程加入到等待队列中。代码逻辑很简单:如果 队列空 则直接添加到 哨兵节点后边,如果不为空 则添加到 tail 节点的后边,自己成为 tail 节点。
那么 为什么说 队列为 空 时,我们说 是将第一个节点添加到 哨兵节点后边呢?看下
enq(node)
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}可以看见,整个逻辑是在一个循环中的,当队列为空时,就新建一个啥也没有的
new Node()
并设置为 head 节点当队列不为空时,则将当前 node 链接到 tail 的 next 节点中,并设置 当前 node 为 tail 节点
该方法的目的是 将当前节点加入到 队列尾部,并且采用的是 有哨兵节点的 方式。
添加成功后,进入到
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法。点进去1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//cas
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}为 AbstractQueuedSynchronizer 实现
。node 是当前已加入到 等待队列中的节点 ,arg 是 1;这段代码的核心就在 for 循环中 ,这是一个 cas ,每个等待队列中的 node 节点运行到这里,都会进行 自旋。
循环中,获取当前节点的 前一个节点,若这个节点为首节点,则尝试为 node 节点获取锁:
若获取锁成功,则证明 前一个获取锁的线程已经释放锁,则设置 node 节点为 head 节点,清除 原 head 节点的指针(便于 GC),failed 设置为 false 表示获取锁没有失败,返回 interrupted 为 false 也表示 当前线程没有终止。
若获取失败,则会进入下一个 if() 。
shouldParkAfterFailedAcquire(p, node)
参数是 前一个结点和当前节点,点进去1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//这个节点已经设置了请求释放的状态来发出信号,所以当前 node 可以安全 park。
return true;
if (ws > 0) {//CANCELED
//表示前一个节点被取消了。跳过并指示重试。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//waitStatus必须为 0 或 PROPAGATE。表示需要锁,但先别 park。来电者将需要重试,以确保它不能获得之前停车。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}这个方法的作用是 判断 当前节点是否应该 park。
。先判断 前一个节点的 waitStatus 状态,如果 为 SIGNAL(-1) ,返回 true 表示当前 node 节点确实应该 park。
如果 为 1 ,表示 前一个节点已经 被取消,则重新将 当前 node.prev 设置成前一个结点的前一个结点,直到 pred.waitStatus < 0 ,返回 false 表示 node 不该被 park。
如果 为0、CONDITION(-2) 或者 PROPAGATE(-3) ,则进入
compareAndSetWaitStatus(pred,ws,Node.SIGNAL)
方法,点进去1
2
3
4
5
6private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}发现是 unsafe 方法,cas +自旋 替换前一个节点的 waitStatus 状态为 Node.SIGNAL 也就是 -1;跳出
compareAndSetWaitStatus()
,返回 false ,表示 node 不该被 park。但此时 前一个结点的 waitStatus 已经 等于 -1,在 外层方法acquireQueued
中,自旋第二次时,就会进入到该方法的 第一步,返回 true 允许 park 当前 node 节点。
若最终
shouldParkAfterFailedAcquire
返回的是 true,则会进入parkAndCheckInterrupt()
,内部实现为:1
2
3
4private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}即,阻塞当前线程。返回
Thread.interrupted()
此时 为 true。到外层后,执行interrupted = true
; 然后继续循环,直到前一个节点为首节点,并成功获取到锁,会跳出循环,这是 自旋实现。那么,这里 当前节点已经被 park 了,它是在那里 unpark 的呢?下边就看一下 unlock();
unlock()
当我们调用 lock.unlock() 方法时,不管是 公平锁还是 非公平锁,都会调用到 AbstractQueuedSynchronizer 中实现的 release(1)
方法。
1 | public final boolean release(int arg) { |
可以看到 if 中首先执行了 tryRelease(arg)
方法,点进去,找到 ReentrantLock 实现
1 | protected final boolean tryRelease(int releases) { |
按理说,解锁时,当前线程就是 占有锁的线程。如果 c==0,则代表 当前线程已经不需要在占有锁,则设置 锁的所有者为 null,若 c != 0 ,则代表这是 重入锁的解锁过程,并设置 state 值为 c,返回 free。表示 释放成功或失败。
如果,为 true ,表示锁已经释放。则进入到 if 语句块中 执行 unparkSuccessor(h)
,参数是头节点。
1 | private void unparkSuccessor(Node node) { |
这个方法,主要有三步:
- 判断 head 节点的 waitState 状态 是否为小于 0,SIGNAL(-1) ,CONDITION(-2),PROPAGATE(-3) .若为 其中的一项,则会清楚当前节点的状态 ,cas 变为0
- 找到 head 节点的下一个节点,判断其 waitState 状态是否大于0 ,也就是 CANCELED(1) 被取消,若是真的被取消,则置空 这个节点,然后从后往前找,找到 一个 waitStatus<0 的节点 赋值给 s ;
- 如果,s!=null,则
LockSupport.unpark(s.thread);
唤醒这个节点的线程。
总结
- 业务场景,比如说我们有三个线程A、B、C去银行办理业务了,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队如图所示,注意傀儡结点和B结点的状态都会改为-1
- 当A线程办理好业务,离开的时候,会把傀儡结点的 waitStatus 从-1改为0 ,将 state 从 1 改为 0 ,将当前线程置为null
- 这个时候如果B上位,首先将state 从0改为1(表示占用),把 thread 置为线程B , 会执行如下图的①②③④,会触发GC,然后就把第一个灰色的傀儡结点给清除掉了,这个时候原来的B结点重新成为傀儡结点