再读 AQS

基于 JDK 1.8.0_101 。

AQS 的核心:两种模式、一个状态、一个同步等待队列、条件对象。

  • 两种模式:是指支持独占模式或共享模式,两者不能并存,要么独占、要么共享。
  • 一个状态:状态是 AQS 的灵魂,根据所处的模式来定义其值的含义。
  • 一个同步等待队列:是线程为进入指定的模式、但还不能进入时,需要在同步等待队列上进行等待。该队列是双向链表,以方便支持取消操作。
  • 条件对象:只能在独占模式下使用,线程可以在条件对象上进行等待、由其他线程在满足条件时唤醒。
    > 每个条件对象其实是一个单向链表实现的条件等待队列,在给定条件对象上等待的线程会进入条件等待队列,等待其他线程来唤醒。
    > 进入等待之前,线程会先进入条件等待队列、保存自己持有时的状态值,然后释放独占模式,进入挂起线程、等待唤醒,唤醒后重新尝试进入独占模式,进入后再继续执行。

0. 加入同步等待队列

队列是个双向链表,方便实现取消操作。

AQS 有 head/tail 两个指针,分别指向队头、队尾节点。队列默认是空的,两个指针都指向 null

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 队列变为空,插入一个空白的头结点,因为等待线程是要检查前驱的状态
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

1. 独占模式获取

独占模式获取逻辑框架:
1. 尝试获取,具体的逻辑是由子类实现的;
2. 如果获取成功,直接返回;
3. 获取失败,把创建节点并加入等待队列;
4. 在死循环里继续尝试获取,获取不成功则可能挂起线程、等待释放线程来通知。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();    // 如果线程在等待过程中被中断过,则重新设置中断标记
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  // CLH 模式实现:轮询前驱的状态
            if (p == head && tryAcquire(arg)) { // 如果前驱是头结点,再次尝试获取。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())    // 获取不成功,挂起并继续等待
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1.2 公平与非公平实现

所谓的公平性是指 AQS 的分配是否满足先到先得,满足就是公平的、否则是非公平的。

所以公平性实现的核心就是在获取时是否判断已经有在等待的。

//  非公平实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // 不检查是否有在等待的
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 公平性实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&     // 公平性的核心:有人在等待了就加到末尾
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

2. 共享模式

AQS 提供了下面两个方法给子类来自定义共享模式的实现。

/**
 * 返回值为 负数 表示获取失败;
 * 返回值为 0 表示获取成功,但后续的节点没法立即也获取成功;
 * 返回值为 正数 表示获取成功,后续可获取成功的数量。
*/
protected int tryAcquireShared(int arg);

protected boolean tryReleaseShared(int arg);

获取共享模式:如果尝试获取成功,则直接返回;尝试获取失败,则以共享模式加入等待队列,然后循环:
1. 检查前驱是否是头结点,如果是则继续尝试获取,获取成功了把自己设为头结点,通知后续结点。
2. 如果前驱非头结点或尝试获取失败,则挂起线程、等待唤醒。

doAcquireShared 逻辑:
1. 把自己加入等待队列,如果队列为空,则创建一个空白的头结点,再把自己加到队列的末尾。
2. 检查自己的前驱释放是头结点,如果是,则尝试获取:
  2.1 获取失败进入步骤 3;
  2.2 获取成功,则把自己设为头结点,并通知后续结点。
3. 如果前驱的状态是 SIGNAL,则进入等待,如果前驱是 CANCELED 则消除已取消的节点,否则前驱状态是 0 或 PROPAGATE,那么把前驱状态修改为 SIGNAL,然后进入等待。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {    // 判断前驱是否是头节点
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

释放共享模式:tryReleaseShared 返回 true 后会调用 doReleaseShared

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);     // 唤醒当前头结点后继节点的线程
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // 如果头结点改变了,继续唤醒新的头结点
            break;
    }
}

3. ConditionObject, 条件对象

ConditionObject 代表与 AQS 关联的一个条件队列,其实例上的操作只能在持有 AQS 独占模式的前提下进行,这样不需要考虑并发安全问题。

不同的等待条件使用不同的 ConditionObject,方便进行更细化的控制。与 AQS 等待队列分开可以简化逻辑。

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter; // 不用 volatile 修饰
    private transient Node lastWaiter;
}

3.1 添加到条件等待队列

条件的等待队列是单向链表,因为在独占模式下进行访问,因此不需要考虑线程安全问题。

头结点可以是有效的条件队列等待节点。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // lastWaiter 已取消,则移除,需从 firstWaiter 开始遍历
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;    // 单向链表
    lastWaiter = node;
    return node;
}

3.2 条件对象等待

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到条件的等待队列
    Node node = addConditionWaiter();

    // 释放独占模式,并返回释放前的 state 值,以恢复独占模式。
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    // 如果 node 不在 acquire 的获取队列上,则等待
    // 如果在,说明线程曾经被唤醒,也就是此条件对象上调用过 signal/signalAll
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);     // 挂起当前线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // 重新获取独占模式
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3.2 条件对象唤醒

如果条件对象的等待队列不为空,则唤醒对象上的第一个或所有节点。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&       // 有唤醒成功的节点、则终止循环。
             (first = firstWaiter) != null);
}

// 唤醒队列上的所有节点的线程
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

欢迎关注我的微信公众号: coderbee笔记

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据