AQS原理与源码分析

简介

队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组键的基础框架,它使用了一个int成员变量state表示同步状态,通过CLH队列完成获取资源的线程排队工作。

AQS的主要使用方式是继承,字类通过继承同步器并实现它的抽象方法来管理同步状态。AQS本身只是定义若干同步状态获取和释放的方法提供给字类来实现。

锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节。AQS是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待与唤醒等底层操作。

AQS的设计是基于模板方法模式的,使用者继承AQS并重写指定方法,随后同步器组合自定义同步组件的实现,并调用AQS提供的模板方法,模板方法调用使用者重写的方法。

可重写方法如下,arg参数为获取锁的次数。

名称 描述
protected boolean tryAcquire(int arg) 独占方式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后进行CAS设置同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占。

模板方法:

方法名称 描述
void acquire(int arg) 独占锁获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法会调用重写的tryAcquire()方法
void acquireInterruptibly(int arg) 与acquire相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列,如果当前线程被中断,该方法会抛出InterruptedException并返回。
boolean tryAcquireNanos(int arg, long nanosTimeout) 在acquireInterruptibly的基础上增加了超时限制,如果当前线程在超时时间之内没有获取同步状态,那么将会返回false,获取到了返回true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占锁获取的主要区别式同一时刻可以有多个线程获取同步状态
void acquireSharedInterruptibly(int arg) 与acquireShared相同,响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中的第一个节点线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection getQueuedThreads() 获取等待在同步队列上的线程集合

模板方法基本分成3类:独占式获取与释放,共享式获取与释放,查询同步队列中的情况。

AQS整体方法架构可以参照下图(来源:美团技术团队)

AQS方法架构

原理

核心思想:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,就将请求资源的线程加入CLH队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

AQS使用一个volatile修饰的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

原理图如下:

AQS原理图

AQS数据结构

AQS中最基本的数据结构就是CLH队列中的Node节点,该源码位于AQS中的静态内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
static final class Node {
//线程节点的两种状态,独享锁和共享锁
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

//表示当前节点已取消调度
static final int CANCELLED = 1;
//表示后继节点在等待当前节点唤醒,后继节点入队时,会见前继节点状态更新为SIGNAL
static final int SIGNAL = -1;
//表示节点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
static final int CONDITION = -2;
//SHARED模式下,前继节点不仅会唤醒后继节点,也可能唤醒后继的后继节点
static final int PROPAGATE = -3;

//当前节点的状态
volatile int waitStatus;

//前继节点
volatile Node prev;

//后继节点
volatile Node next;

//处于当前节点的线程
volatile Thread thread;

//指向下一个处于CONDITION状态的节点
Node nextWaiter;

//判断是否是SHARED状态
final boolean isShared() {
return nextWaiter == SHARED;
}

//返回前继节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

State

AQS中有一个state字段,为同步状态,用volatile修饰。AQS中提供了几个访问该字段的方法:

1
2
3
4
5
6
7
8
9
10
11
12
//返回当前state
protected final int getState() {
return state;
}
//设置state
protected final void setState(int newState) {
state = newState;
}
//CAS方式更新state
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

方法

acquire

独占模式下获取共享资源,如果当前线程获取共享资源成功,则由该方法返回,否则,将会进入同步队列等待,直到获取资源为止,整个过程忽略中断。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法中出现的其他方法,下文都会详细解释。

tryAcquire

尝试去获取独占资源,如果获取成功,直接返回true,否则返回false。

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

在AQS中只是定义一个接口,具体的资源获取和释放方式交给自定义同步器去实现。

addWaiter

此方法将当前线程加到队尾,并返回当前线程所在的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addWaiter(Node mode) {
//将当前线程和模式构造成节点
Node node = new Node(Thread.currentThread(), mode);
//pred指向尾节点tail
Node pred = tail;
if (pred != null) {
//新构造的节点加入队尾
node.prev = pred;
//compareAndSetTail方法完成尾节点的设置
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果队列为空,使用enq方法入队
enq(node);
return node;
}

enq

此方法将节点加入队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
//CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) {
//队列为空时,创建一个空节点作为head节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队列不空时,执行该方法
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued

如果执行到此方法,说明该线程获取资源失败,已被放入队列尾部。acquireQueued方法具体流程如下:

  1. 节点进入队尾后,判断如果是头节点就尝试获取资源,如果成功,直接返回
  2. 否则就通过shouldParkAfterFailedAcquire判断节点状态是否为SIGNAL,是返回true,不是返回false。
  3. 如果2中返回true,执行parkAndCheckInterrupt,通过park挂起线程,他需要等待一个中断或unpark唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到锁
boolean failed = true;
try {
//标记是否被中断
boolean interrupted = false;
//自旋
for (;;) {
//定义p为该节点地前驱节点
final Node p = node.predecessor();
//如果前驱节点是head,就尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取成功,将头结点设置为当前节点
setHead(node);
p.next = null; // help GC
//成功获取锁
failed = false;
//返回等待过程中是否被中断过
return interrupted;
}
//不是头节点就通过shouldParkAfterFailedAcquire方法判断节点状态是否为SIGNAL
//如果是SIGNAL状态,执行parkAndCheckInterrupt方法挂起线程,如果被唤醒,检查是否被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//是中断的话,将中断标志设置为true
interrupted = true;
}
} finally {
//如果获取资源失败,就取消节点在队列中的等待
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire

此方法用于检查状态,检查是否进入SIGNAL状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//定义pred节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//表示pred节点处于SIGNAL状态
return true;
if (ws > 0) {
//CANCELLED状态,表示获取锁地请求取消
do {
//如果前驱节点放弃了请求,就一直往前找到正常等待状态的节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱节点正常,就把前驱节点地状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt

此方法主要用于挂起当前线程,并返回中断标志

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
//调用park方法使线程进入waiting状态
LockSupport.park(this);
//如果被唤醒,检查是否是被中断
return Thread.interrupted();
}

cancelAcquire

acquireQueued方法中,获取资源失败执行的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void cancelAcquire(Node node) {
//过滤空节点
if (node == null)
return;
//将该节点中保存的线程信息删除
node.thread = null;
//定义pred线程为node的前驱节点
Node pred = node.prev;
//通过前驱节点过滤waitStatus为CANCELLED状态的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

//过滤后的前驱节点的后继节点
Node predNext = pred.next;
//将node状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,将从后往前的第一个非CANCELLED状态的节点设置为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//如果更新成功,将tail的后继节点设置为null
compareAndSetNext(pred, predNext, null);
} else {
//更新失败的话,则进入else
int ws;
//如果pred不是头节点
//判断状态是否为SIGNAL,不是的话,将节点状态设置为SIGNAL看是否成功
//判断当前节点的线程是否为null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//当前节点的前驱节点的后继指针指向当前节点的后继节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

acquire小结

具体流程:

  1. 调用自定义同步器的tryAcquire()方法尝试直接获取资源,如果成功直接返回。
  2. 没有成功就将线程加入等待队列尾部,并标记为独占状态。
  3. acquireQueued()使在等待队列挂起,有机会(被unpark)会去尝试获取资源,获取到资源直接返回,如果这个过程被中断,就返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的,只有获取资源后自我中断selfInterrupt()。

acquire的流程也就是ReentrantLock.lock()方法的流程。通过调用acquire(1);实现。

release

独占模式下释放共享资源,如果释放资源成功(state = 0),它会唤醒同步队列中第一个节点,这也是unlock()的语义。

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
//调用tryRelease
if (tryRelease(arg)) {
//头节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease

tryAcquire()一样,这个方法需要自定义同步器实现。此方法尝试去释放资源

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

unparkSuccessor

此方法用于唤醒队列中最前面的非CANCELED状态的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unparkSuccessor(Node node) {
//判断节点的状态是否为非CANCELLED状态
int ws = node.waitStatus;
if (ws < 0)
//如果是非CANCELLED状态,将状态设置为0
compareAndSetWaitStatus(node, ws, 0);
//定义s为node的后继节点
Node s = node.next;
//判断s是否为空节点或者是否为CANCELLED状态
if (s == null || s.waitStatus > 0) {
s = null;
//从尾节点往前找到最前面那个为非CANCELLED状态的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果该节点不为空,就unpark当前节点
if (s != null)
LockSupport.unpark(s.thread);
}

release小结

release()在独占模式下释放资源。如果release时出现异常,没有unpark队列中的其他节点。会导致线程永远挂起,无法被唤醒。

acquireShared

共享模式的获取共享资源的入口,如果当前线程未获取到共享资源,将会进入同步队列等待。

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

流程:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;

  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

tryAcquireShared

tryAcquireShared由自定义同步器实现。在acquireShared方法中,已经将返回值的语义定义好了,负值表示获取失败,0代表获取成功,但是没有剩余资源,正数表示获取成功,还有剩余资源,其它线程还可以获取。

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

doAcquireShared

此方法将当前线程加入等待队列尾部进行休息,直到其他线程释放资源唤醒自己。自己拿到资源后才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void doAcquireShared(int arg) {
//加入队列尾部
final Node node = addWaiter(Node.SHARED);
//是否获取资源成功标记
boolean failed = true;
try {
//是否被中断标记
boolean interrupted = false;
for (;;) {
//前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点
if (p == head) {
//尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取成功,将head指向node节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果等待过程中被中断
if (interrupted)
//自我中断
selfInterrupt();
failed = false;
return;
}
}
//进入park状态,等待被unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void setHeadAndPropagate(Node node, int propagate) {
//保存老的头节点
Node h = head;
//将头节点指向自己
setHead(node);
//传进来的propagate为线程执行tryAcquireShared的返回值
//大于0代表获取资源成功,并且还有剩余资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

releaseShared

共享模式下的线程释放共享资源的顶层入口。释放掉资源,唤醒后继节点。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

doReleaseShared

此方法用于唤醒后继节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doReleaseShared() {

for (;;) {
//保存头节点
Node h = head;
//如果头节点不为空,并且不是尾节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//判断头节点的线程状态是否为SIGNAL
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒后继节点
unparkSuccessor(h);
}
//不是SIGNAL,就继续自旋
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果是头节点就直接跳出
if (h == head)
break;
}
}

应用

AQS作为并发编程的底层框架,为其它很多同步工具提供了很多应用场景。大致如表所述:

同步工具 与AQS的关联
ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

参考

从ReentrantLock的实现看AQS的原理及应用

《Java并发编程的艺术》

文章作者: L1nker4
文章链接: https://l1n.wang/2020/04/aqs/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 L1nker4's Blog