基于 JDK 1.8.0_101 。
AQS 的核心:两种模式、一个状态、一个同步等待队列、条件对象。
- 两种模式:是指支持独占模式或共享模式,两者不能并存,要么独占、要么共享。
- 一个状态:状态是 AQS 的灵魂,根据所处的模式来定义其值的含义。
- 一个同步等待队列:是线程为进入指定的模式、但还不能进入时,需要在同步等待队列上进行等待。该队列是双向链表,以方便支持取消操作。
- 条件对象:只能在独占模式下使用,线程可以在条件对象上进行等待、由其他线程在满足条件时唤醒。
> 每个条件对象其实是一个单向链表实现的条件等待队列,在给定条件对象上等待的线程会进入条件等待队列,等待其他线程来唤醒。
> 进入等待之前,线程会先进入条件等待队列、保存自己持有时的状态值,然后释放独占模式,进入挂起线程、等待唤醒,唤醒后重新尝试进入独占模式,进入后再继续执行。
0. 加入同步等待队列
队列是个双向链表,方便实现取消操作。
AQS 有 head/tail
两个指针,分别指向队头、队尾节点。队列默认是空的,两个指针都指向 null
。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 队列变为空,插入一个空白的头结点,因为等待线程是要检查前驱的状态
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1. 独占模式获取
独占模式获取逻辑框架:
1. 尝试获取,具体的逻辑是由子类实现的;
2. 如果获取成功,直接返回;
3. 获取失败,把创建节点并加入等待队列;
4. 在死循环里继续尝试获取,获取不成功则可能挂起线程、等待释放线程来通知。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 如果线程在等待过程中被中断过,则重新设置中断标记
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // CLH 模式实现:轮询前驱的状态
if (p == head && tryAcquire(arg)) { // 如果前驱是头结点,再次尝试获取。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 获取不成功,挂起并继续等待
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.2 公平与非公平实现
所谓的公平性是指 AQS 的分配是否满足先到先得,满足就是公平的、否则是非公平的。
所以公平性实现的核心就是在获取时是否判断已经有在等待的。
// 非公平实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 不检查是否有在等待的
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平性实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 公平性的核心:有人在等待了就加到末尾
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2. 共享模式
AQS 提供了下面两个方法给子类来自定义共享模式的实现。
/**
* 返回值为 负数 表示获取失败;
* 返回值为 0 表示获取成功,但后续的节点没法立即也获取成功;
* 返回值为 正数 表示获取成功,后续可获取成功的数量。
*/
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);
获取共享模式:如果尝试获取成功,则直接返回;尝试获取失败,则以共享模式加入等待队列,然后循环:
1. 检查前驱是否是头结点,如果是则继续尝试获取,获取成功了把自己设为头结点,通知后续结点。
2. 如果前驱非头结点或尝试获取失败,则挂起线程、等待唤醒。
doAcquireShared
逻辑:
1. 把自己加入等待队列,如果队列为空,则创建一个空白的头结点,再把自己加到队列的末尾。
2. 检查自己的前驱释放是头结点,如果是,则尝试获取:
2.1 获取失败进入步骤 3;
2.2 获取成功,则把自己设为头结点,并通知后续结点。
3. 如果前驱的状态是 SIGNAL,则进入等待,如果前驱是 CANCELED 则消除已取消的节点,否则前驱状态是 0 或 PROPAGATE,那么把前驱状态修改为 SIGNAL,然后进入等待。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 判断前驱是否是头节点
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放共享模式:tryReleaseShared
返回 true 后会调用 doReleaseShared
。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒当前头结点后继节点的线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 如果头结点改变了,继续唤醒新的头结点
break;
}
}
3. ConditionObject, 条件对象
ConditionObject
代表与 AQS 关联的一个条件队列,其实例上的操作只能在持有 AQS 独占模式的前提下进行,这样不需要考虑并发安全问题。
不同的等待条件使用不同的 ConditionObject
,方便进行更细化的控制。与 AQS 等待队列分开可以简化逻辑。
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter; // 不用 volatile 修饰
private transient Node lastWaiter;
}
3.1 添加到条件等待队列
条件的等待队列是单向链表,因为在独占模式下进行访问,因此不需要考虑线程安全问题。
头结点可以是有效的条件队列等待节点。
private Node addConditionWaiter() {
Node t = lastWaiter;
// lastWaiter 已取消,则移除,需从 firstWaiter 开始遍历
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 单向链表
lastWaiter = node;
return node;
}
3.2 条件对象等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加到条件的等待队列
Node node = addConditionWaiter();
// 释放独占模式,并返回释放前的 state 值,以恢复独占模式。
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果 node 不在 acquire 的获取队列上,则等待
// 如果在,说明线程曾经被唤醒,也就是此条件对象上调用过 signal/signalAll
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 挂起当前线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新获取独占模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3.2 条件对象唤醒
如果条件对象的等待队列不为空,则唤醒对象上的第一个或所有节点。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 有唤醒成功的节点、则终止循环。
(first = firstWaiter) != null);
}
// 唤醒队列上的所有节点的线程
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
欢迎关注我的微信公众号: coderbee笔记 。