AQS 详解

AQS是一个用来构建锁同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。

AQS核心思想?

AQS 是用来构建锁或者其它同步器组件的重量级基础框架,通过内置的 CLH(FIFO) 队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,有一个 int 类变量表示持有锁的状态(private volatile int state),通过 CAS 完成对 status 值的修改(0表示没有,1表示阻塞)

1
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过procted类型的getState,setState,compareAndSetState进行操作

1
2
3
4
5
6
7
8
9
10
11
12
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

1662215478189

AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

AQS 对资源的共享方式

通过阅读 AQS 的Node节点类源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}


final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

AQS定义两种资源共享方式:

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

AQS定义了 五种 Node(waitState)状态:

  • 0 :默认值,表示当前节点在sync队列中,等待着获取锁
  • CANCELLED(1):表示线程放弃获取锁
  • SIGNAL(-1):表示当前节点的后继节点包含的线程需要运行,也就是unpark
  • CONDITION(-2):表示当前节点在等待condition,也就是在condition队列中
  • PROPAGATE(-3):当前线程为 SHARED 时,使用这个状态。表示当前场景下后续的 acquireShared 能够得以执行

AQS 模板方法模式

使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

1
2
3
4
5
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的

AQS源码分析

类的继承关系

AbstractQueuedSynchronizer继承自 AbstractOwnableSynchronizer 抽象类,并且实现了 Serializable 接口,可以进行序列化。

1
2
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

其中 AbstractOwnableSynchronizer 抽象类的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 构造方法
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;

// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

// 获取独占线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为 setExclusiveOwnerThread 与 getExclusiveOwnerThread 方法,这两个方法会被子类调用。

AbstractQueuedSynchronizer类有两个内部类,分别为 Node 类与 ConditionObject类。下面分别做介绍。

Node类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;

// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}

// 无参构造方法
Node() { // Used to establish initial head or SHARED marker
}

// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

  • CANCELLED,值为1,表示当前的线程被取消。
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
  • 值为0,表示当前节点在sync queue中,等待着获取锁。

ConditionObject类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// 内部类
public class ConditionObject implements Condition, java.io.Serializable {
// 版本号
private static final long serialVersionUID = 1173984872572414699L;
// condition队列的头节点
private transient Node firstWaiter;
// condition队列的尾结点
private transient Node lastWaiter;

// 构造方法
public ConditionObject() { }
// Internal methods

// 添加新的waiter到wait队列
private Node addConditionWaiter() {
// 保存尾结点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION
// 清除状态为CONDITION的结点
unlinkCancelledWaiters();
// 将最后一个结点重新赋值给t
t = lastWaiter;
}
// 新建一个结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 尾结点为空
// 设置condition队列的头节点
firstWaiter = node;
else // 尾结点不为空
// 设置为节点的nextWaiter域为node结点
t.nextWaiter = node;
// 更新condition队列的尾结点
lastWaiter = node;
return node;
}


private void doSignal(Node first) {
// 循环
do {
if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空
// 设置尾结点为空
lastWaiter = null;
// 设置first结点的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头节点不为空,一直循环
}

private void doSignalAll(Node first) {
// condition队列的头节点尾结点都设置为空
lastWaiter = firstWaiter = null;
// 循环
do {
// 获取first结点的nextWaiter域结点
Node next = first.nextWaiter;
// 设置first结点的nextWaiter域为空
first.nextWaiter = null;
// 将first结点从condition队列转移到sync队列
transferForSignal(first);
// 重新设置first
first = next;
} while (first != null);
}


// 从condition队列中清除状态为CANCEL的结点
private void unlinkCancelledWaiters() {
// 保存condition队列头节点
Node t = firstWaiter;
Node trail = null;
while (t != null) { // t不为空
// 下一个结点
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态
// 设置t节点的nextWaiter域为空
t.nextWaiter = null;
if (trail == null) // trail为空
// 重新设置condition队列的头节点
firstWaiter = next;
else // trail不为空
// 设置trail结点的nextWaiter域为next结点
trail.nextWaiter = next;
if (next == null) // next结点为空
// 设置condition队列的尾结点
lastWaiter = trail;
}
else // t结点的状态为CONDTION状态
// 设置trail结点
trail = t;
// 设置t结点
t = next;
}
}

// public methods

// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
public final void signal() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头节点
Node first = firstWaiter;
if (first != null) // 头节点不为空
// 唤醒一个等待线程
doSignal(first);
}

// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头节点
Node first = firstWaiter;
if (first != null) // 头节点不为空
// 唤醒所有等待线程
doSignalAll(first);
}

// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
public final void awaitUninterruptibly() {
// 添加一个结点到等待队列
Node node = addConditionWaiter();
// 获取释放的状态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞当前线程
LockSupport.park(this);
if (Thread.interrupted()) // 当前线程被中断
// 设置interrupted状态
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}


private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;


private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

// 等待,当前线程在接到信号或被中断之前一直处于等待状态
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 当前线程被中断,抛出异常
throw new InterruptedException();
// 在wait队列上添加一个结点
Node node = addConditionWaiter();
//
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}


final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}


// 查询是否有正在等待此条件的任何线程
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}

// 返回正在等待此条件的线程数估计值
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}

// 返回包含那些可能正在等待此条件的线程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface Condition {

// 等待,当前线程在接到信号或被中断之前一直处于等待状态
void await() throws InterruptedException;

// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();

//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;

// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;

// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
boolean awaitUntil(Date deadline) throws InterruptedException;

// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();

// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}

Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。

属性

属性中包含了头节点head,尾结点tail,状态state、自旋时间 spinForTimeoutThreshold ,还有 AbstractQueuedSynchronizer 抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 头节点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;

// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}
}

类的构造方法

此类构造方法为从抽象构造方法,供子类调用。

1
protected AbstractQueuedSynchronizer() { }    

核心方法 - acquire方法

该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这个方法 是在 AbstractQueuedSynchronizer 中实现的,主要是做了三部

  1. 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。
  2. 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
  3. 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

方法解析

  1. tryAcquire() :尝试获取锁

    这个方法 在 NonfairSync 中的实现是:

    1
    2
    3
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

    点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    还是 获取 当前 state 状态,若为 0 ,则 cas 替换,成功后设置当前线程 为 锁的所有者,返回 true,否则 false

    若 state==1 ,代表 有人在持有这把锁,判断是否是当前线程持有,若是则给 state 加 1,返回 true,否则 false

    目的就是尝试为当前线程获取锁,true 代表 获取成功,false 代表获取失败

  2. 若是 true 返回 true ,则 !true 短路不会执行后边,若为 false,则执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    先执行 addWaiter(Node.EXCLUSIVE) 传入的参数为 独占式类型的 Node ,代表这个节点中的线程是 独占锁的;

    点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 添加等待者
    private Node addWaiter(Node mode) {
    // 新生成一个结点,默认为独占模式
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 保存尾结点
    Node pred = tail;
    if (pred != null) { // 尾结点不为空,即已经被初始化
    // 将node结点的prev域连接到尾结点
    node.prev = pred;
    if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
    // 设置尾结点的next域为node
    pred.next = node;
    return node; // 返回新生成的结点
    }
    }
    enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
    return node;
    }

    为 AbstractQueuedSynchronizer 实现。当前方法的目的是 将线程加入到等待队列中。

    代码逻辑很简单:如果 队列空 则直接添加到 哨兵节点后边,如果不为空 则添加到 tail 节点的后边,自己成为 tail 节点。

    那么 为什么说 队列为 空 时,我们说 是将第一个节点添加到 哨兵节点后边呢?看下 enq(node) :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

    可以看见,整个逻辑是在一个循环中的,当队列为空时,就新建一个啥也没有的 new Node() 并设置为 head 节点

    当队列不为空时,则将当前 node 链接到 tail 的 next 节点中,并设置 当前 node 为 tail 节点

    该方法的目的是 将当前节点加入到 队列尾部,并且采用的是 有哨兵节点的 方式。

  3. 添加成功后,进入到 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法。点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // sync队列中的结点在独占且忽略中断的模式下获取(资源)
    final boolean acquireQueued(final Node node, int arg) {
    // 标志
    boolean failed = true;
    try {
    // 中断标志
    boolean interrupted = false;
    for (;;) { // 无限循环
    // 获取node节点的前驱结点
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) { // 前驱为头节点并且成功获得锁
    setHead(node); // 设置头节点
    p.next = null; // help GC
    failed = false; // 设置标志
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    为 AbstractQueuedSynchronizer 实现。node 是当前已加入到 等待队列中的节点 ,arg 是 1;

    首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用 shouldParkAfterFailedAcquire 和parkAndCheckInterrupt 方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下

    • 若获取锁成功,则证明 前一个获取锁的线程已经释放锁,则设置 node 节点为 head 节点,清除 原 head 节点的指针(便于 GC),failed 设置为 false 表示获取锁没有失败,返回 interrupted 为 false 也表示 当前线程没有终止。]

    若获取失败,则会进入下一个 if() 。shouldParkAfterFailedAcquire(p, node) 参数是 前一个结点和当前节点,点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱结点的状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
    // 可以进行park操作
    return true;
    if (ws > 0) { // 表示状态为CANCELLED,为1
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
    // 赋值pred结点的next域
    pred.next = node;
    } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
    // 比较并设置前驱结点的状态为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 不能进行park操作
    return false;
    }

    这个方法的作用是 判断 当前节点是否应该 park。

    只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt方法

    1
    2
    3
    4
    5
    6
    private static final boolean compareAndSetWaitStatus(Node node,
    int expect,
    int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
    expect, update);
    }

    发现是 unsafe 方法,cas +自旋 替换前一个节点的 waitStatus 状态为 Node.SIGNAL 也就是 -1;跳出 compareAndSetWaitStatus() ,返回 false ,表示 node 不该被 park。但此时 前一个结点的 waitStatus 已经 等于 -1,在 外层方法 acquireQueued 中,自旋第二次时,就会进入到该方法的 第一步,返回 true 允许 park 当前 node 节点。

    若最终 shouldParkAfterFailedAcquire 返回的是 true,则会进入 parkAndCheckInterrupt() ,内部实现为:

    1
    2
    3
    4
    5
    6
    // 进行park操作并且返回该线程是否被中断
    private final boolean parkAndCheckInterrupt() {
    // 在许可可用之前禁用当前线程,并且设置了blocker
    LockSupport.park(this);
    return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
    }

    parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断,再看 finally 块中的cancelAcquire方法,其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 取消继续获取(资源)
private void cancelAcquire(Node node) {
// node为空,返回
if (node == null)
return;
// 设置node结点的thread为空
node.thread = null;
// 保存node的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
node.prev = pred = pred.prev;

// 获取pred结点的下一个结点
Node predNext = pred.next;
// 设置node结点的状态为CANCELLED
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
// 比较并设置pred结点的next节点为null
compareAndSetNext(pred, predNext, null);
} else { // node结点不为尾结点,或者比较设置不成功
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // (pred结点不为头节点,并且pred结点的状态为SIGNAL)或者
// pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
// 保存结点的后继
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
} else {
unparkSuccessor(node); // 释放node的前一个结点
}

node.next = node; // help GC
}
}

该方法的作用就是为了释放node节点的后继结点。

对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:

image

其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。

现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

  • 判断结点的前驱是否为head并且是否成功获取(资源)。
  • 若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
  • 若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
  • 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

核心方法 - release方法

以独占模式释放对象,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放成功
// 保存头节点
Node h = head;
if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
unparkSuccessor(h); //释放头节点的后继结点
return true;
}
return false;
}

其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头节点不为空并且头节点的状态不为0,则释放头节点的后继结点,unparkSuccessor方法已经分析过,不再累赘。

可以看到 if 中首先执行了 tryRelease(arg) 方法,点进去,找到 ReentrantLock 实现

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

按理说,解锁时,当前线程就是 占有锁的线程。如果 c==0,则代表 当前线程已经不需要在占有锁,则设置 锁的所有者为 null,若 c != 0 ,则代表这是 重入锁的解锁过程,并设置 state 值为 c,返回 free。表示 释放成功或失败。

如果,为 true ,表示锁已经释放。则进入到 if 语句块中 执行 unparkSuccessor(h) ,参数是头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
//如果状态为负(即可能需要信号),尝试清除信号。如果失败了,或者状态被等待线程改变了,这是可以的。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

//要取消驻留的线程保存在后继节点中,后继节点通常是下一个节点。但如果被取消或明显为空,则从 tail 向前遍历以找到实际未被取消的后继对象。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这个方法,主要有三步:

  1. 判断 head 节点的 waitState 状态 是否为小于 0,SIGNAL(-1) ,CONDITION(-2),PROPAGATE(-3) .若为 其中的一项,则会清楚当前节点的状态 ,cas 变为0
  2. 找到 head 节点的下一个节点,判断其 waitState 状态是否大于0 ,也就是 CANCELED(1) 被取消,若是真的被取消,则置空 这个节点,然后从后往前找,找到 一个 waitStatus<0 的节点 赋值给 s ;
  3. 如果,s!=null,则 LockSupport.unpark(s.thread); 唤醒这个节点的线程。

AQS总结

对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。

  • 每一个结点都是由前一个结点唤醒
  • 当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
  • condition queue中的结点向sync queue中转移是通过signal操作完成的。
  • 当结点的状态为SIGNAL时,表示后面的结点需要运行