JUC 源码分析 四 wait notify notifyAll 与 条件对象

内置锁 与 wait notify 机制

每个Java对象都有一个内置锁,通过 synchronized 关键字使用。线程之间可以通过 Object 类的 wait, notify, notifyAll 进行协调。

wait, notify, notifyAll 方法说明:

  • wait:在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待。当前线程必须拥有此对象监视器。该线程发布对此监视器的所有权并等待,直到其他线程通过调用 notify 方法,或 notifyAll 方法通知在此对象的监视器上等待的线程醒来。然后该线程将等到重新获得对监视器的所有权后才能继续执行。
  • notify:唤醒在此对象监视器上等待的单个线程,直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意性的,被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。
  • notifyAll:唤醒在此对象监视器上等待的所有线程,直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。

以一个有界缓存为例,展示了内置锁和 wait、notify、notifyAll 的一般用法:

public class BoundedBuffer<T> {
       private final Object[] buffer;
       private final int length;

       public BoundedBuffer(int length ) {
             if (length < 0) {
                   throw new IllegalArgumentException("length < 0");
            }
             this.length = length ;
             buffer = new Object[length ];
      }

       // synchronized 用于方法上,表示一个同步方法,线程进入方法前自动获得内置锁
       public synchronized void put(T obj) throws InterruptedException {
             // 线程被唤醒时,条件不一定满足(虚假唤醒),所以需要在循环里进行测试、等待
             while (isFull()) {
                   // 在当前的对象实例上等待,由其他线程调用 notifyAll 或 notify 方法唤醒
                  wait();
            }

            doPut(obj);

             // 唤醒在此对象监视器上等待(通过wait方法进入等待)的所有线程。
             // 直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。
            notifyAll();

      }

       public synchronized T take() throws InterruptedException {
            T object = null;
             while (isEmpty()) {
                  wait();
            }
            object = doTake();
            notifyAll();
             return object;
      }

       private void doPut(T obj) { // not implemented
      }

       private T doTake() { // not implemented
             return null ;
      }

       private boolean isEmpty() { // not implemented
             return false ;
      }

       private boolean isFull() { // not implemented
             return false ;
      }
}

Condition 接口

java.util.concurrent.locks.ConditionObject 的监视器方法(wait、notifynotifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

Condition 实例实质上被绑定到一个锁上,要获得 Condition 实例时可以在目标 Lock 上调用其 newCondition() 方法。

下面是用显式锁加Condition实现的有界缓存示例:

public class BoundedBufferWithCondition {
       final Lock lock = new ReentrantLock(); // 显式锁

       // 条件对象维护了在此条件上的等待线程
       final Condition notFull = lock.newCondition(); // 非满的等待线程队列
       final Condition notEmpty = lock.newCondition(); // 非空的等待线程队列

       final Object[] items = new Object[100];
       int putptr , takeptr , count ;

       public void put(Object x) throws InterruptedException {
             // 显式锁要显式获取
             lock.lock();

             // 锁保护的代码必须放在 try-finally 语句块中执行,在 finally 语句块中确保显式锁释放
             try {
                   while (count == items.length)
                         notFull.await(); // 缓存是满的,put 线程加入的是非满等待线程队列

                   items[putptr ] = x;
                   if (++putptr == items.length)
                         putptr = 0;
                  ++ count;

                   notEmpty.signal(); // 缓存不空,需要通知的是 非空的等待线程,不会通知到 put 线程

            } finally {
                   // 在 finally 语句块中确保显式锁释放
                   lock.unlock();
            }
      }

       public Object take() throws InterruptedException {
             lock.lock();
             try {
                   while (count == 0)
                         notEmpty.await();
                  Object x = items[takeptr ];
                   if (++takeptr == items.length)
                         takeptr = 0;
                  -- count;
                   notFull.signal();
                   return x;
            } finally {
                   lock.unlock();
            }
      }
}

AbstractQueuedSynchronizer.ConditionObject

AbstractQueuedSynchronizer.ConditionObject 是JUC包里 Condition 接口的一个实现,由于这个实现不是线程安全的,所以它需要在加锁的情况下访问。用法见上节。

属性

属性并没有用 volatile 修饰符修饰。每个条件都维护一个等待队列。

private transient Node firstWaiter; // 条件队列的头结点
private transient Node lastWaiter; // 条件队列的尾结点

队列维护

主要有添加到等待队列和从队列移除已取消结点。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后的结点不处于条件等待,清除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 单向链表的添加操作
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

// 把已取消的结点从队列里移除。这个方法需要避免在缺少信号的时候持有垃圾。
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    // 从头完整遍历队列,移除非等待结点。
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
             if (trail == null) {
                firstWaiter = next;
            } else {
                trail.nextWaiter = next;
            }
            if (next == null) {
                lastWaiter = trail;
                 // break;
            }
        } else {
            trail = t;
        }
        t = next;
    }
}

await

await有个先释放锁,等待、被唤醒后再获取锁的过程。释放锁前要保存锁的状态,用于恢复。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 把结点添加到等待的条件队列

    // 添加到条件队列的操作必须在下面的释放锁之前完成
    int savedState = fullyRelease(node); // 释放锁,返回释放前的状态,因为线程唤醒后要恢复锁的状态。
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 如果结点不在同步获取等待队列上,则进入等待(释放锁之后其他的唤醒线程会把它加入同步队列)
        LockSupport.park( this);

        // checkInterruptWhileWaiting 在结点没有取消的情况下会把结点加入sync队列。
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 重新获取锁
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// 如果结点已经在等待获取的同步队列上就返回true,否则false。
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null )
        return false ;

    // 如果有后继,肯定在队列上
    if (node.next != null ) // If has successor, it must be on queue
        return true ;


    /*
     * node.prev 可以是非空的,但由于CAS失败还没有加入队列。
     * 所以需要从尾部遍历来确认它加入队列了。在调用这个方法时,它将总是在靠近尾部的地方,
     * 除非CAS失败(几乎不可能的),它就在尾部,所以我们几乎不用移动太多。
     */
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true ;
        if (t == null)
            return false ;
        t = t.prev;
    }
}

signal

public final void signal() {
    if (!isHeldExclusively()) // 必须是锁的持有者才能释放锁
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 唤醒条件队列的头结点,如果有
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && // transferForSignal
             (first = firstWaiter) != null);
}

// 把结点从条件队列转移到sync队列
final boolean transferForSignal(Node node) {
       // 不能修改waitStatus说明结点已经被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false ;

    // 把结点加入sync队列,如果可以设置前驱waitStatus 以指明这个线程(可能)在等待。
    // 如果前驱已取消或设置前驱waitStatus失败,唤醒结点去resync
    //(在这种情况下,结点的waitStatus是处于短暂、无害的错误状态)。// 因为前面设置为 0
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

AQS 并没有使用内置的监视器锁来实现同步,而是基于 volatile 和 CAS 操作来实现同步,内存可视性是通过这两个操作所产生的内存屏障指令来实现的。这都是基于 Java 存储模型的,所以要深入理解 JUC 包,还是需要先理解 Java 存储模型,具体可见这里


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

JUC 源码分析 四 wait notify notifyAll 与 条件对象》上有2个想法

  1. java.util.concurrent.locks.Condition 将 Object 的监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)
    一句话就把Condition的作用说清楚了。博主的文章对读源码很有帮助,Thx

    • 这是来自JDK API说明文档。看源码之前可以先看看这些说明文档,这些文档一般会涵盖为什么这样设计、设计的理论基础 还有用法举例。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据