AQS 详解
AQS是一个用来构建锁 和同步器 的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS核心思想? AQS 是用来构建锁或者其它同步器组件的重量级基础框架,通过内置的 CLH(FIFO) 队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,有一个 int 类变量表示持有锁的状态(private volatile int state),通过 CAS 完成对 status 值的修改(0表示没有,1表示阻塞)
1 private volatile int state;
状态信息通过procted类型的getState,setState,compareAndSetState进行操作
1 2 3 4 5 6 7 8 9 10 11 12 protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS 对资源的共享方式 通过阅读 AQS 的Node节点类源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter = = SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
AQS定义两种资源共享方式:
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
AQS定义了 五种 Node(waitState)状态:
0 :默认值,表示当前节点在sync队列中,等待着获取锁
CANCELLED(1):表示线程放弃获取锁
SIGNAL(-1):表示当前节点的后继节点包含的线程需要运行,也就是unpark
CONDITION(-2):表示当前节点在等待condition,也就是在condition队列中
PROPAGATE(-3):当前线程为 SHARED 时,使用这个状态。表示当前场景下后续的 acquireShared 能够得以执行
AQS 模板方法模式 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
1 2 3 4 5 isHeldExclusively() tryAcquire(int ) tryRelease(int ) tryAcquireShared(int ) tryReleaseShared(int )
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的
AQS源码分析 类的继承关系 AbstractQueuedSynchronizer继承自 AbstractOwnableSynchronizer 抽象类,并且实现了 Serializable 接口,可以进行序列化。
1 2 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable
其中 AbstractOwnableSynchronizer 抽象类的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public abstract class AbstractOwnableSynchronizer implements java .io.Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为 setExclusiveOwnerThread 与 getExclusiveOwnerThread 方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer类有两个内部类,分别为 Node 类与 ConditionObject类。下面分别做介绍。
Node类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter = = SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
CANCELLED
,值为1,表示当前的线程被取消。
SIGNAL
,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
CONDITION
,值为-2,表示当前节点在等待condition,也就是在condition queue中。
PROPAGATE
,值为-3,表示当前场景下后续的acquireShared能够得以执行。
值为0,表示当前节点在sync queue中,等待着获取锁。
ConditionObject类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private static final int REINTERRUPT = 1 ; private static final int THROW_IE = -1 ; private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil (Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break ; } LockSupport.parkUntil(this , abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await (long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { timedout = transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; } final boolean isOwnedBy (AbstractQueuedSynchronizer sync) { return sync = = AbstractQueuedSynchronizer.this ; } protected final boolean hasWaiters () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true ; } return false ; } protected final int getWaitQueueLength () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int n = 0 ; for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } protected final Collection<Thread> getWaitingThreads () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); ArrayList<Thread> list = new ArrayList <Thread>(); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null ) list.add(t); } } return list; } }
此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface Condition { void await () throws InterruptedException; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException; boolean await (long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil (Date deadline) throws InterruptedException; void signal () ; void signalAll () ; }
Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。
属性 属性中包含了头节点head,尾结点tail,状态state、自旋时间 spinForTimeoutThreshold ,还有 AbstractQueuedSynchronizer 抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { private static final long serialVersionUID = 7373984972572414691L ; private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final long spinForTimeoutThreshold = 1000L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state" )); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head" )); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail" )); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus" )); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next" )); } catch (Exception ex) { throw new Error (ex); } } }
类的构造方法 此类构造方法为从抽象构造方法,供子类调用。
1 protected AbstractQueuedSynchronizer () { }
核心方法 - acquire方法 该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个方法 是在 AbstractQueuedSynchronizer 中实现的,主要是做了三部
首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。
若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
方法解析 :
tryAcquire() :尝试获取锁
这个方法 在 NonfairSync 中的实现是:
1 2 3 protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); }
点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
还是 获取 当前 state 状态,若为 0 ,则 cas 替换,成功后设置当前线程 为 锁的所有者,返回 true,否则 false
若 state==1 ,代表 有人在持有这把锁,判断是否是当前线程持有,若是则给 state 加 1,返回 true,否则 false
目的就是尝试为当前线程获取锁,true 代表 获取成功,false 代表获取失败
若是 true 返回 true ,则 !true 短路不会执行后边,若为 false,则执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先执行 addWaiter(Node.EXCLUSIVE)
传入的参数为 独占式类型的 Node ,代表这个节点中的线程是 独占锁的;
点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
为 AbstractQueuedSynchronizer 实现
。当前方法的目的是 将线程加入到等待队列中。
代码逻辑很简单:如果 队列空 则直接添加到 哨兵节点后边,如果不为空 则添加到 tail 节点的后边,自己成为 tail 节点。
那么 为什么说 队列为 空 时,我们说 是将第一个节点添加到 哨兵节点后边呢?看下 enq(node)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
可以看见,整个逻辑是在一个循环中的,当队列为空时,就新建一个啥也没有的 new Node()
并设置为 head 节点
当队列不为空时,则将当前 node 链接到 tail 的 next 节点中,并设置 当前 node 为 tail 节点
该方法的目的是 将当前节点加入到 队列尾部,并且采用的是 有哨兵节点的 方式。
添加成功后,进入到 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法。点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
为 AbstractQueuedSynchronizer 实现
。node 是当前已加入到 等待队列中的节点 ,arg 是 1;
首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用 shouldParkAfterFailedAcquire 和parkAndCheckInterrupt 方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下
若获取锁成功,则证明 前一个获取锁的线程已经释放锁,则设置 node 节点为 head 节点,清除 原 head 节点的指针(便于 GC),failed 设置为 false 表示获取锁没有失败,返回 interrupted 为 false 也表示 当前线程没有终止。]
若获取失败,则会进入下一个 if() 。shouldParkAfterFailedAcquire(p, node)
参数是 前一个结点和当前节点,点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
这个方法的作用是 判断 当前节点是否应该 park。
。
只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt方法
1 2 3 4 5 6 private static final boolean compareAndSetWaitStatus (Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
发现是 unsafe 方法,cas +自旋 替换前一个节点的 waitStatus 状态为 Node.SIGNAL 也就是 -1;跳出 compareAndSetWaitStatus()
,返回 false ,表示 node 不该被 park。但此时 前一个结点的 waitStatus 已经 等于 -1,在 外层方法 acquireQueued
中,自旋第二次时,就会进入到该方法的 第一步,返回 true 允许 park 当前 node 节点。
若最终 shouldParkAfterFailedAcquire
返回的是 true,则会进入 parkAndCheckInterrupt()
,内部实现为:
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断,再看 finally 块中的cancelAcquire方法,其源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
该方法的作用就是为了释放node节点的后继结点。
对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:
其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。
现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:
判断结点的前驱是否为head并且是否成功获取(资源)。
若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。
核心方法 - release方法 以独占模式释放对象,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头节点不为空并且头节点的状态不为0,则释放头节点的后继结点,unparkSuccessor方法已经分析过,不再累赘。
可以看到 if 中首先执行了 tryRelease(arg)
方法,点进去,找到 ReentrantLock 实现
1 2 3 4 5 6 7 8 9 10 11 12 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
按理说,解锁时,当前线程就是 占有锁的线程。如果 c==0,则代表 当前线程已经不需要在占有锁,则设置 锁的所有者为 null,若 c != 0 ,则代表这是 重入锁的解锁过程,并设置 state 值为 c,返回 free。表示 释放成功或失败。
如果,为 true ,表示锁已经释放。则进入到 if 语句块中 执行 unparkSuccessor(h)
,参数是头节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
这个方法,主要有三步:
判断 head 节点的 waitState 状态 是否为小于 0,SIGNAL(-1) ,CONDITION(-2),PROPAGATE(-3) .若为 其中的一项,则会清楚当前节点的状态 ,cas 变为0
找到 head 节点的下一个节点,判断其 waitState 状态是否大于0 ,也就是 CANCELED(1) 被取消,若是真的被取消,则置空 这个节点,然后从后往前找,找到 一个 waitStatus<0 的节点 赋值给 s ;
如果,s!=null,则 LockSupport.unpark(s.thread);
唤醒这个节点的线程。
AQS总结 对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
每一个结点都是由前一个结点唤醒
当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
condition queue中的结点向sync queue中转移是通过signal操作完成的。
当结点的状态为SIGNAL时,表示后面的结点需要运行