JUC 源码分析 三 AbstractQueuedSynchronizer 共享模式 与 CountDownLatch

共享模式

共享模式允许一组线程获取同一个许可。为实现共享模式子类需要实现两个方法:

  • tryAcquireShared:返回int类型的值,小于0表示获取失败,等于0表示获取成功但不允许后续更多的获取,大于0表示获取成功且允许更多的后续获取。
  • tryReleaseShared:返回true表示释放许可成功,可以唤醒等待线程;false表示失败,不唤醒等待线程。

共享获取 acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
       // 添加到等待队列,不管是共享模式还是独占模式,都共享同一个等待队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); // 尝试获取,返回值表示是否允许获取
                if (r >= 0) {
                   // 获取成功
                   // 把自己设为头结点并传递可以获取的信号
                   // node 把自己设为头结点后,它的后继发现它的前驱是头结点了,就会尝试获取。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 尝试通知队列里的下一个结点,如果:
     *       调用者指示或者之前操作记录显示需要传递
     *       (注意:这里对waitStatus使用单一检查,因为PROPAGATE可能被转换到SIGNAL)
     *   并且
     *       下一个结点以共享模式等待或者我们根本就不知道,因为它是空的。
     *
     * 在这些检查有点保守,可能导致不必要的唤醒,但只是在多重竞争acquires/releases时,
     * 因此,大多数都是现在或不久就需要通知的。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null; // for GC
    node.prev = null;
}

共享释放 releaseShared

释放共享许可的时候,最重要的是保证传递唤醒。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true ;
    }
    return false;
}

// 释放共享的核心方法
private void doReleaseShared() {
       // 要确保release传递,即使有其他正在进行的acquires/releases。
       // 这个过程的一般做法是尝试unpark head的后继,如果它(head)需要信号。
       // 如果head不需要信号,把状态设为PROPAGATE来确保一旦release,传递可以继续。
       // 另外,我们必须在循环里做这个,以免有新节点添加进来。
       // 不像unparkSuccessor的其他使用,我们需要知道CAS重置状态失败与否,如果失败,则重新检测。
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // 头结点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    continue;  // 头结点状态被改变,需要重新检测。loop to recheck cases
                }
                // CAS成功说明h结点需要通知后继,唤醒
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                   // 如果是初始头结点,把其状态设为PROPAGATE,确保传递继续
                   // 状态更改失败,需要再次检测。
                continue;                // loop on failed CAS
            }
        }
        if (h == head) {  // loop if head changed
             // 如果操作过程中,头结点被改变(可能新增结点或者一个被唤醒线程把自己设为头结点了),需要再次检测。
            break;
        }
    }
}

CountDownLatch

作用:在完成一组正在其他线程中执行的操作之前,CountDownLatch允许一个或多个线程一直等待。CountDownLatch只阻塞一次,倒数到0之后,调用await方法的将直接通过。

看看实现阻塞与放行的内部类:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync( int count) {
        setState(count); // 需要countDown的次数
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared( int acquires) {
       // 如果倒数到0,返回1表示允许后续获取,这样可以让AQS框架通知后继,
       // 否则返回-1表示失败,不能获取,线程会进入等待。
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared( int releases) {
        // Decrement count; signal when transition to zero
        for (;;) { // CountDownLatch.countDown()可能被多线程调用,需要失败后重试
            int c = getState();
            if (c == 0)
                return false ;

            // 注意:这里是减1,而不是减去releases,因为CountDownLatch是对countDown调用次数的计数
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

代码很简洁,因为AQS框架确实非常强大。

CountDownLatch类的一些方法:

public CountDownLatch( int count) {
    if (count < 0) throw new IllegalArgumentException( "count < 0");
    this.sync = new Sync(count);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await( long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据