共享模式
共享模式允许一组线程获取同一个许可。为实现共享模式子类需要实现两个方法:
- tryAcquireShared:返回int类型的值,小于0表示获取失败,等于0表示获取成功但不允许后续更多的获取,大于0表示获取成功且允许更多的后续获取。
- tryReleaseShared:返回true表示释放许可成功,可以唤醒等待线程;false表示失败,不唤醒等待线程。
共享获取 acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 添加到等待队列,不管是共享模式还是独占模式,都共享同一个等待队列。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取,返回值表示是否允许获取
if (r >= 0) {
// 获取成功
// 把自己设为头结点并传递可以获取的信号
// node 把自己设为头结点后,它的后继发现它的前驱是头结点了,就会尝试获取。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 尝试通知队列里的下一个结点,如果:
* 调用者指示或者之前操作记录显示需要传递
* (注意:这里对waitStatus使用单一检查,因为PROPAGATE可能被转换到SIGNAL)
* 并且
* 下一个结点以共享模式等待或者我们根本就不知道,因为它是空的。
*
* 在这些检查有点保守,可能导致不必要的唤醒,但只是在多重竞争acquires/releases时,
* 因此,大多数都是现在或不久就需要通知的。
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null; // for GC
node.prev = null;
}
共享释放 releaseShared
释放共享许可的时候,最重要的是保证传递唤醒。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true ;
}
return false;
}
// 释放共享的核心方法
private void doReleaseShared() {
// 要确保release传递,即使有其他正在进行的acquires/releases。
// 这个过程的一般做法是尝试unpark head的后继,如果它(head)需要信号。
// 如果head不需要信号,把状态设为PROPAGATE来确保一旦release,传递可以继续。
// 另外,我们必须在循环里做这个,以免有新节点添加进来。
// 不像unparkSuccessor的其他使用,我们需要知道CAS重置状态失败与否,如果失败,则重新检测。
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 头结点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue; // 头结点状态被改变,需要重新检测。loop to recheck cases
}
// CAS成功说明h结点需要通知后继,唤醒
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// 如果是初始头结点,把其状态设为PROPAGATE,确保传递继续
// 状态更改失败,需要再次检测。
continue; // loop on failed CAS
}
}
if (h == head) { // loop if head changed
// 如果操作过程中,头结点被改变(可能新增结点或者一个被唤醒线程把自己设为头结点了),需要再次检测。
break;
}
}
}
CountDownLatch
作用:在完成一组正在其他线程中执行的操作之前,CountDownLatch允许一个或多个线程一直等待。CountDownLatch只阻塞一次,倒数到0之后,调用await方法的将直接通过。
看看实现阻塞与放行的内部类:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync( int count) {
setState(count); // 需要countDown的次数
}
int getCount() {
return getState();
}
protected int tryAcquireShared( int acquires) {
// 如果倒数到0,返回1表示允许后续获取,这样可以让AQS框架通知后继,
// 否则返回-1表示失败,不能获取,线程会进入等待。
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared( int releases) {
// Decrement count; signal when transition to zero
for (;;) { // CountDownLatch.countDown()可能被多线程调用,需要失败后重试
int c = getState();
if (c == 0)
return false ;
// 注意:这里是减1,而不是减去releases,因为CountDownLatch是对countDown调用次数的计数
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
代码很简洁,因为AQS框架确实非常强大。
CountDownLatch类的一些方法:
public CountDownLatch( int count) {
if (count < 0) throw new IllegalArgumentException( "count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await( long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。