AQS 详解
AQS是一个用来构建锁 和同步器 的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS核心思想? AQS 是用来构建锁或者其它同步器组件的重量级基础框架,通过内置的 CLH(FIFO) 队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,有一个 int 类变量表示持有锁的状态(private volatile int state),通过 CAS 完成对 status 值的修改(0表示没有,1表示阻塞)
1 private volatile int state;
状态信息通过procted类型的getState,setState,compareAndSetState进行操作
1 2 3 4 5 6 7 8 9 10 11 12 protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS 对资源的共享方式 通过阅读 AQS 的Node节点类源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter = = SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
AQS定义两种资源共享方式:
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
AQS定义了 五种 Node(waitState)状态:
0 :默认值,表示当前节点在sync队列中,等待着获取锁
CANCELLED(1):表示线程放弃获取锁
SIGNAL(-1):表示当前节点的后继节点包含的线程需要运行,也就是unpark
CONDITION(-2):表示当前节点在等待condition,也就是在condition队列中
PROPAGATE(-3):当前线程为 SHARED 时,使用这个状态。表示当前场景下后续的 acquireShared 能够得以执行
AQS 模板方法模式 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
1 2 3 4 5 isHeldExclusively() tryAcquire(int ) tryRelease(int ) tryAcquireShared(int ) tryReleaseShared(int )
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的
AQS源码分析 类的继承关系 AbstractQueuedSynchronizer继承自 AbstractOwnableSynchronizer 抽象类,并且实现了 Serializable 接口,可以进行序列化。
1 2 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable
其中 AbstractOwnableSynchronizer 抽象类的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public abstract class AbstractOwnableSynchronizer implements java .io.Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为 setExclusiveOwnerThread 与 getExclusiveOwnerThread 方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer类有两个内部类,分别为 Node 类与 ConditionObject类。下面分别做介绍。
Node类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter = = SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
CANCELLED
,值为1,表示当前的线程被取消。
SIGNAL
,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
CONDITION
,值为-2,表示当前节点在等待condition,也就是在condition queue中。
PROPAGATE
,值为-3,表示当前场景下后续的acquireShared能够得以执行。
值为0,表示当前节点在sync queue中,等待着获取锁。
ConditionObject类public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { Node t = firstWaiter; Node trail = null ; while (t != null ) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null ; if (trail == null ) firstWaiter = next; else trail.nextWaiter = next; if (next == null ) lastWaiter = trail; } else trail = t; t = next; } } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private static final int REINTERRUPT = 1 ; private static final int THROW_IE = -1 ; private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil (Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break ; } LockSupport.parkUntil(this , abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await (long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false ; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { timedout = transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return !timedout; } final boolean isOwnedBy (AbstractQueuedSynchronizer sync) { return sync = = AbstractQueuedSynchronizer.this ; } protected final boolean hasWaiters () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true ; } return false ; } protected final int getWaitQueueLength () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int n = 0 ; for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } protected final Collection<Thread> getWaitingThreads () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); ArrayList<Thread> list = new ArrayList <Thread>(); for (Node w = firstWaiter; w != null ; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null ) list.add(t); } } return list; } }
此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public interface Condition { void await () throws InterruptedException; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException; boolean await (long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil (Date deadline) throws InterruptedException; void signal () ; void signalAll () ; }
Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。
属性 属性中包含了头节点head,尾结点tail,状态state、自旋时间 spinForTimeoutThreshold ,还有 AbstractQueuedSynchronizer 抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io.Serializable { private static final long serialVersionUID = 7373984972572414691L ; private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final long spinForTimeoutThreshold = 1000L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state" )); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head" )); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail" )); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus" )); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next" )); } catch (Exception ex) { throw new Error (ex); } } }
类的构造方法 此类构造方法为从抽象构造方法,供子类调用。
1 protected AbstractQueuedSynchronizer () { }
核心方法 - acquire方法 该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个方法 是在 AbstractQueuedSynchronizer 中实现的,主要是做了三部
首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。
若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
方法解析 :
tryAcquire() :尝试获取锁
这个方法 在 NonfairSync 中的实现是:
1 2 3 protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); }
点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
还是 获取 当前 state 状态,若为 0 ,则 cas 替换,成功后设置当前线程 为 锁的所有者,返回 true,否则 false
若 state==1 ,代表 有人在持有这把锁,判断是否是当前线程持有,若是则给 state 加 1,返回 true,否则 false
目的就是尝试为当前线程获取锁,true 代表 获取成功,false 代表获取失败
若是 true 返回 true ,则 !true 短路不会执行后边,若为 false,则执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先执行 addWaiter(Node.EXCLUSIVE)
传入的参数为 独占式类型的 Node ,代表这个节点中的线程是 独占锁的;
点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
为 AbstractQueuedSynchronizer 实现
。当前方法的目的是 将线程加入到等待队列中。
代码逻辑很简单:如果 队列空 则直接添加到 哨兵节点后边,如果不为空 则添加到 tail 节点的后边,自己成为 tail 节点。
那么 为什么说 队列为 空 时,我们说 是将第一个节点添加到 哨兵节点后边呢?看下 enq(node)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
可以看见,整个逻辑是在一个循环中的,当队列为空时,就新建一个啥也没有的 new Node()
并设置为 head 节点
当队列不为空时,则将当前 node 链接到 tail 的 next 节点中,并设置 当前 node 为 tail 节点
该方法的目的是 将当前节点加入到 队列尾部,并且采用的是 有哨兵节点的 方式。
添加成功后,进入到 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法。点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
为 AbstractQueuedSynchronizer 实现
。node 是当前已加入到 等待队列中的节点 ,arg 是 1;
首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用 shouldParkAfterFailedAcquire 和parkAndCheckInterrupt 方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下
若获取锁成功,则证明 前一个获取锁的线程已经释放锁,则设置 node 节点为 head 节点,清除 原 head 节点的指针(便于 GC),failed 设置为 false 表示获取锁没有失败,返回 interrupted 为 false 也表示 当前线程没有终止。]
若获取失败,则会进入下一个 if() 。shouldParkAfterFailedAcquire(p, node)
参数是 前一个结点和当前节点,点进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
这个方法的作用是 判断 当前节点是否应该 park。
。
只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。再看parkAndCheckInterrupt方法
1 2 3 4 5 6 private static final boolean compareAndSetWaitStatus (Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
发现是 unsafe 方法,cas +自旋 替换前一个节点的 waitStatus 状态为 Node.SIGNAL 也就是 -1;跳出 compareAndSetWaitStatus()
,返回 false ,表示 node 不该被 park。但此时 前一个结点的 waitStatus 已经 等于 -1,在 外层方法 acquireQueued
中,自旋第二次时,就会进入到该方法的 第一步,返回 true 允许 park 当前 node 节点。
若最终 shouldParkAfterFailedAcquire
返回的是 true,则会进入 parkAndCheckInterrupt()
,内部实现为:
1 2 3 4 5 6 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断,再看 finally 块中的cancelAcquire方法,其源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
该方法的作用就是为了释放node节点的后继结点。
对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:
其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。
现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:
判断结点的前驱是否为head并且是否成功获取(资源)。
若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。
核心方法 - release方法 以独占模式释放对象,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头节点不为空并且头节点的状态不为0,则释放头节点的后继结点,unparkSuccessor方法已经分析过,不再累赘。
可以看到 if 中首先执行了 tryRelease(arg)
方法,点进去,找到 ReentrantLock 实现
1 2 3 4 5 6 7 8 9 10 11 12 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
按理说,解锁时,当前线程就是 占有锁的线程。如果 c==0,则代表 当前线程已经不需要在占有锁,则设置 锁的所有者为 null,若 c != 0 ,则代表这是 重入锁的解锁过程,并设置 state 值为 c,返回 free。表示 释放成功或失败。
如果,为 true ,表示锁已经释放。则进入到 if 语句块中 执行 unparkSuccessor(h)
,参数是头节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
这个方法,主要有三步:
判断 head 节点的 waitState 状态 是否为小于 0,SIGNAL(-1) ,CONDITION(-2),PROPAGATE(-3) .若为 其中的一项,则会清楚当前节点的状态 ,cas 变为0
找到 head 节点的下一个节点,判断其 waitState 状态是否大于0 ,也就是 CANCELED(1) 被取消,若是真的被取消,则置空 这个节点,然后从后往前找,找到 一个 waitStatus<0 的节点 赋值给 s ;
如果,s!=null,则 LockSupport.unpark(s.thread);
唤醒这个节点的线程。
AQS总结 对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
每一个结点都是由前一个结点唤醒
当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
condition queue中的结点向sync queue中转移是通过signal操作完成的。
当结点的状态为SIGNAL时,表示后面的结点需要运行