内置锁 与 wait notify 机制
每个Java对象都有一个内置锁,通过 synchronized
关键字使用。线程之间可以通过 Object 类的 wait, notify, notifyAll
进行协调。
wait, notify, notifyAll
方法说明:
wait
:在其他线程调用此对象的notify()
方法或notifyAll()
方法前,导致当前线程等待。当前线程必须拥有此对象监视器。该线程发布对此监视器的所有权并等待,直到其他线程通过调用notify
方法,或notifyAll
方法通知在此对象的监视器上等待的线程醒来。然后该线程将等到重新获得对监视器的所有权后才能继续执行。notify
:唤醒在此对象监视器上等待的单个线程,直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意性的,被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。notifyAll
:唤醒在此对象监视器上等待的所有线程,直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。
以一个有界缓存为例,展示了内置锁和 wait、notify、notifyAll
的一般用法:
public class BoundedBuffer<T> {
private final Object[] buffer;
private final int length;
public BoundedBuffer(int length ) {
if (length < 0) {
throw new IllegalArgumentException("length < 0");
}
this.length = length ;
buffer = new Object[length ];
}
// synchronized 用于方法上,表示一个同步方法,线程进入方法前自动获得内置锁
public synchronized void put(T obj) throws InterruptedException {
// 线程被唤醒时,条件不一定满足(虚假唤醒),所以需要在循环里进行测试、等待
while (isFull()) {
// 在当前的对象实例上等待,由其他线程调用 notifyAll 或 notify 方法唤醒
wait();
}
doPut(obj);
// 唤醒在此对象监视器上等待(通过wait方法进入等待)的所有线程。
// 直到当前线程放弃此对象上的锁定,才能继续执行被唤醒的线程。
notifyAll();
}
public synchronized T take() throws InterruptedException {
T object = null;
while (isEmpty()) {
wait();
}
object = doTake();
notifyAll();
return object;
}
private void doPut(T obj) { // not implemented
}
private T doTake() { // not implemented
return null ;
}
private boolean isEmpty() { // not implemented
return false ;
}
private boolean isFull() { // not implemented
return false ;
}
}
Condition 接口
java.util.concurrent.locks.Condition
将 Object
的监视器方法(wait、notify
和 notifyAll
)分解成截然不同的对象,以便通过将这些对象与任意 Lock
实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock
替代了 synchronized
方法和语句的使用,Condition
替代了 Object
监视器方法的使用。
Condition
实例实质上被绑定到一个锁上,要获得 Condition
实例时可以在目标 Lock
上调用其 newCondition()
方法。
下面是用显式锁加Condition实现的有界缓存示例:
public class BoundedBufferWithCondition {
final Lock lock = new ReentrantLock(); // 显式锁
// 条件对象维护了在此条件上的等待线程
final Condition notFull = lock.newCondition(); // 非满的等待线程队列
final Condition notEmpty = lock.newCondition(); // 非空的等待线程队列
final Object[] items = new Object[100];
int putptr , takeptr , count ;
public void put(Object x) throws InterruptedException {
// 显式锁要显式获取
lock.lock();
// 锁保护的代码必须放在 try-finally 语句块中执行,在 finally 语句块中确保显式锁释放
try {
while (count == items.length)
notFull.await(); // 缓存是满的,put 线程加入的是非满等待线程队列
items[putptr ] = x;
if (++putptr == items.length)
putptr = 0;
++ count;
notEmpty.signal(); // 缓存不空,需要通知的是 非空的等待线程,不会通知到 put 线程
} finally {
// 在 finally 语句块中确保显式锁释放
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr ];
if (++takeptr == items.length)
takeptr = 0;
-- count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
AbstractQueuedSynchronizer.ConditionObject
AbstractQueuedSynchronizer.ConditionObject
是JUC包里 Condition 接口的一个实现,由于这个实现不是线程安全的,所以它需要在加锁的情况下访问。用法见上节。
属性
属性并没有用 volatile
修饰符修饰。每个条件都维护一个等待队列。
private transient Node firstWaiter; // 条件队列的头结点
private transient Node lastWaiter; // 条件队列的尾结点
队列维护
主要有添加到等待队列和从队列移除已取消结点。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后的结点不处于条件等待,清除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 单向链表的添加操作
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 把已取消的结点从队列里移除。这个方法需要避免在缺少信号的时候持有垃圾。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// 从头完整遍历队列,移除非等待结点。
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null) {
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
if (next == null) {
lastWaiter = trail;
// break;
}
} else {
trail = t;
}
t = next;
}
}
await
await有个先释放锁,等待、被唤醒后再获取锁的过程。释放锁前要保存锁的状态,用于恢复。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 把结点添加到等待的条件队列
// 添加到条件队列的操作必须在下面的释放锁之前完成
int savedState = fullyRelease(node); // 释放锁,返回释放前的状态,因为线程唤醒后要恢复锁的状态。
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 如果结点不在同步获取等待队列上,则进入等待(释放锁之后其他的唤醒线程会把它加入同步队列)
LockSupport.park( this);
// checkInterruptWhileWaiting 在结点没有取消的情况下会把结点加入sync队列。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 重新获取锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 如果结点已经在等待获取的同步队列上就返回true,否则false。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null )
return false ;
// 如果有后继,肯定在队列上
if (node.next != null ) // If has successor, it must be on queue
return true ;
/*
* node.prev 可以是非空的,但由于CAS失败还没有加入队列。
* 所以需要从尾部遍历来确认它加入队列了。在调用这个方法时,它将总是在靠近尾部的地方,
* 除非CAS失败(几乎不可能的),它就在尾部,所以我们几乎不用移动太多。
*/
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true ;
if (t == null)
return false ;
t = t.prev;
}
}
signal
public final void signal() {
if (!isHeldExclusively()) // 必须是锁的持有者才能释放锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒条件队列的头结点,如果有
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // transferForSignal
(first = firstWaiter) != null);
}
// 把结点从条件队列转移到sync队列
final boolean transferForSignal(Node node) {
// 不能修改waitStatus说明结点已经被取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false ;
// 把结点加入sync队列,如果可以设置前驱waitStatus 以指明这个线程(可能)在等待。
// 如果前驱已取消或设置前驱waitStatus失败,唤醒结点去resync
//(在这种情况下,结点的waitStatus是处于短暂、无害的错误状态)。// 因为前面设置为 0
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
AQS 并没有使用内置的监视器锁来实现同步,而是基于 volatile
和 CAS 操作来实现同步,内存可视性是通过这两个操作所产生的内存屏障指令来实现的。这都是基于 Java 存储模型的,所以要深入理解 JUC 包,还是需要先理解 Java 存储模型,具体可见这里。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。
java.util.concurrent.locks.Condition 将 Object 的监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)
一句话就把Condition的作用说清楚了。博主的文章对读源码很有帮助,Thx
这是来自JDK API说明文档。看源码之前可以先看看这些说明文档,这些文档一般会涵盖为什么这样设计、设计的理论基础 还有用法举例。