AQS

AQS是什么?

AQS 是用来构建锁或者其它同步器组件的重量级基础框架及整个 JUC 体系的基石,通过内置的CLH(FIFO)队列的变种来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,有一个 int 类变量表示持有锁的状态(private volatile int state),通过 CAS 完成对 status 值的修改(0表示没有,1表示阻塞)

CLH : Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO

1662215478189

AQS为什么是 JUC 内容中最重要的基石

  • ReentrantLock

    1662215613076

  • CountDownLatch

    1662215664603

  • ReentrantReadWriteLock

    1662215722431

  • 面向锁的实现者 (比如 Java 并发大神 Douglee ,提出统一规范并简化了锁的实现 , 屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。)

  • 加锁会导致阻塞、有阻塞就需要排队,实现排队必然需要队列

  • 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及 LockSuport.park() 的方式,维护 state 变量的状态,使并发达到同步的效果

AQS内部体系架构

AQS内部架构图:

1662216419804

1662216474422

AQS自身:

  • state : 0 表示空闲 ,1 表示正在有线程在执行 ,大于 1 表示 重入次数
  • CLH 队列: 实现排队的 队列

Node :

  • Node的两种种类:SHARED、EXCLUSIVE
  • waitState : 有五种取值
    • 0 :默认值
    • CANCELLED(1):表示线程放弃获取锁
    • SIGNAL(-1):表示线程已经准备好了,就等资源释放
    • CONDITION(-2):表示节点在等待队列中,节点线程等待唤醒
    • PROPAGATE(-3):当前线程为 SHARED 时,使用这个状态

ReentrantLock 开始解读AQS

ReentrantLock 中有俩子类 FairSync 、NonfairSync,分别是 公平锁和非公平锁,而具体的获取锁的操作也是由这俩完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

Lock()

当我们给 ReentrantLock 构造函数传入 true 参数时,代表创建 FairSync 对象 ,反之 NonfairSync 对象

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

上边的源码可以看到,这两者对于 lock() 方法的实现差不多,其中 NonfairSync 实现稍微复杂一些,所以我以 NonfairSync 的实现为例:

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

首先 cas 修改 state 状态,若成功表示,无线程在占用锁,则当前线程占用。若 失败 则进入 acquire(1) 方法;

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这个方法 是在 AbstractQueuedSynchronizer 中实现的,主要是做了三部

  1. tryAcquire() :尝试获取锁

    这个方法 在 NonfairSync 中的实现是:

    1
    2
    3
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

    点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    还是 获取 当前 state 状态,若为 0 ,则 cas 替换,成功后设置当前线程 为 锁的所有者,返回 true,否则 false

    若 state==1 ,代表 有人在持有这把锁,判断是否是当前线程持有,若是则给 state 加 1,返回 true,否则 false

    目的就是尝试为当前线程获取锁,true 代表 获取成功,false 代表获取失败

  2. 若是 true 返回 true ,则 !true 短路不会执行后边,若为 false,则执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    先执行 addWaiter(Node.EXCLUSIVE) 传入的参数为 独占式类型的 Node ,代表这个节点中的线程是 独占锁的;

    点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }

    为 AbstractQueuedSynchronizer 实现。当前方法的目的是 将线程加入到等待队列中。

    代码逻辑很简单:如果 队列空 则直接添加到 哨兵节点后边,如果不为空 则添加到 tail 节点的后边,自己成为 tail 节点。

    那么 为什么说 队列为 空 时,我们说 是将第一个节点添加到 哨兵节点后边呢?看下 enq(node) :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

    可以看见,整个逻辑是在一个循环中的,当队列为空时,就新建一个啥也没有的 new Node() 并设置为 head 节点

    当队列不为空时,则将当前 node 链接到 tail 的 next 节点中,并设置 当前 node 为 tail 节点

    该方法的目的是 将当前节点加入到 队列尾部,并且采用的是 有哨兵节点的 方式。

  3. 添加成功后,进入到 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法。点进去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {//cas
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    为 AbstractQueuedSynchronizer 实现。node 是当前已加入到 等待队列中的节点 ,arg 是 1;

    这段代码的核心就在 for 循环中 ,这是一个 cas ,每个等待队列中的 node 节点运行到这里,都会进行 自旋。

    循环中,获取当前节点的 前一个节点,若这个节点为首节点,则尝试为 node 节点获取锁:

    • 若获取锁成功,则证明 前一个获取锁的线程已经释放锁,则设置 node 节点为 head 节点,清除 原 head 节点的指针(便于 GC),failed 设置为 false 表示获取锁没有失败,返回 interrupted 为 false 也表示 当前线程没有终止。

    • 若获取失败,则会进入下一个 if() 。shouldParkAfterFailedAcquire(p, node) 参数是 前一个结点和当前节点,点进去

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
      //这个节点已经设置了请求释放的状态来发出信号,所以当前 node 可以安全 park。
      return true;
      if (ws > 0) {//CANCELED
      //表示前一个节点被取消了。跳过并指示重试。
      do {
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
      } else {
      //waitStatus必须为 0 或 PROPAGATE。表示需要锁,但先别 park。来电者将需要重试,以确保它不能获得之前停车。
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
      }

      这个方法的作用是 判断 当前节点是否应该 park。

      1. 先判断 前一个节点的 waitStatus 状态,如果 为 SIGNAL(-1) ,返回 true 表示当前 node 节点确实应该 park。

      2. 如果 为 1 ,表示 前一个节点已经 被取消,则重新将 当前 node.prev 设置成前一个结点的前一个结点,直到 pred.waitStatus < 0 ,返回 false 表示 node 不该被 park。

      3. 如果 为0、CONDITION(-2) 或者 PROPAGATE(-3) ,则进入 compareAndSetWaitStatus(pred,ws,Node.SIGNAL) 方法,点进去

        1
        2
        3
        4
        5
        6
        private static final boolean compareAndSetWaitStatus(Node node,
        int expect,
        int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
        expect, update);
        }

        发现是 unsafe 方法,cas +自旋 替换前一个节点的 waitStatus 状态为 Node.SIGNAL 也就是 -1;跳出 compareAndSetWaitStatus() ,返回 false ,表示 node 不该被 park。但此时 前一个结点的 waitStatus 已经 等于 -1,在 外层方法 acquireQueued 中,自旋第二次时,就会进入到该方法的 第一步,返回 true 允许 park 当前 node 节点。

      若最终 shouldParkAfterFailedAcquire 返回的是 true,则会进入 parkAndCheckInterrupt() ,内部实现为:

      1
      2
      3
      4
      private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);
      return Thread.interrupted();
      }

      即,阻塞当前线程。返回 Thread.interrupted() 此时 为 true。到外层后,执行 interrupted = true; 然后继续循环,直到前一个节点为首节点,并成功获取到锁,会跳出循环,这是 自旋实现。那么,这里 当前节点已经被 park 了,它是在那里 unpark 的呢?下边就看一下 unlock();

unlock()

当我们调用 lock.unlock() 方法时,不管是 公平锁还是 非公平锁,都会调用到 AbstractQueuedSynchronizer 中实现的 release(1) 方法。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

可以看到 if 中首先执行了 tryRelease(arg) 方法,点进去,找到 ReentrantLock 实现

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

按理说,解锁时,当前线程就是 占有锁的线程。如果 c==0,则代表 当前线程已经不需要在占有锁,则设置 锁的所有者为 null,若 c != 0 ,则代表这是 重入锁的解锁过程,并设置 state 值为 c,返回 free。表示 释放成功或失败。

如果,为 true ,表示锁已经释放。则进入到 if 语句块中 执行 unparkSuccessor(h) ,参数是头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
//如果状态为负(即可能需要信号),尝试清除信号。如果失败了,或者状态被等待线程改变了,这是可以的。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

//要取消驻留的线程保存在后继节点中,后继节点通常是下一个节点。但如果被取消或明显为空,则从 tail 向前遍历以找到实际未被取消的后继对象。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这个方法,主要有三步:

  1. 判断 head 节点的 waitState 状态 是否为小于 0,SIGNAL(-1) ,CONDITION(-2),PROPAGATE(-3) .若为 其中的一项,则会清楚当前节点的状态 ,cas 变为0
  2. 找到 head 节点的下一个节点,判断其 waitState 状态是否大于0 ,也就是 CANCELED(1) 被取消,若是真的被取消,则置空 这个节点,然后从后往前找,找到 一个 waitStatus<0 的节点 赋值给 s ;
  3. 如果,s!=null,则 LockSupport.unpark(s.thread); 唤醒这个节点的线程。

总结

  1. 业务场景,比如说我们有三个线程A、B、C去银行办理业务了,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队如图所示,注意傀儡结点和B结点的状态都会改为-1
  2. 当A线程办理好业务,离开的时候,会把傀儡结点的 waitStatus 从-1改为0 ,将 state 从 1 改为 0 ,将当前线程置为null
  3. 这个时候如果B上位,首先将state 从0改为1(表示占用),把 thread 置为线程B , 会执行如下图的①②③④,会触发GC,然后就把第一个灰色的傀儡结点给清除掉了,这个时候原来的B结点重新成为傀儡结点

1662298448868