AQS 是 java.util.concurrent.locks.AbstractQueuedSynchronizer
类的简称,它虽然只是一个类,但也是一个强大的框架,目的是为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,这些类同步器都依赖单个原子 int 值来表示状态。
AQS 实现了控制同步的框架,并定义抽象方法留给子类定义哪种状态意味着被获取或被释放,是个典型的模板方法实现。
概述
同步器一般包含两种方法,一种是acquire,另一种是release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。
同步器的基本思想
acquire操作:
// 循环里不断尝试,典型的失败后重试 while (synchronization state does not allow acquire) { // 同步状态不允许获取,进入循环体,也就是失败后的处理 enqueue current thread if not already queued; // 如果当前线程不在等待队列里,则加入等待队列 possibly block current thread; // 可能的话,阻塞当前线程 } // 执行到这里,说明已经成功获取,如果之前有加入队列,则出队列。 dequeue current thread if it was queued;
release操作:
update synchronization state; // 更新同步状态 if (state may permit a blocked thread to acquire) // 检查状态是否允许一个阻塞线程获取 unblock one or more queued threads; // 允许,则唤醒后继的一个或多个阻塞线程。
为了实现上述操作,需要下面三个基本组件的相互协作:
- 同步状态的原子性管理:怎么判断同步器是否可用的?怎么维护原子状态不会出现非法状态?怎么让其他线程看到当前线程对状态的修改?
- 线程的阻塞与解除阻塞:同步器不可用时,怎么挂起线程?同步器可用时,怎么恢复挂起线程继续执行?
- 队列的管理:有多个线程被阻塞时,怎么管理这些被阻塞的线程?同步器可用时,应该恢复哪个阻塞线程继续执行?怎么处理取消获取的线程?
实现
同步状态的原子性管理
AQS 的状态是通过一个 int
类型的整数来表示的,这个字段是用volatile
关键字修饰的,这样通过简单的原子读写就可以达到内存可视性,减少了同步的需求。子类可以获取和设置状态的值,通过定义状态的值来表示 AQS 对象是否被获取或被释放。
线程的阻塞与解除阻塞
JDK5新增了一个类 java.util.concurrent.locks.LockSupport
用来支持创建锁和其他同步类需要的基本线程阻塞、解除阻塞原语。
这个类最主要的功能有两个:
- park:把线程阻塞。
- unpark:让线程恢复执行。
此类以及每个使用它的线程与一个许可关联。如果该许可可用,并且可在进程中使用,则调用 park 将立即返回;否则可能 阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。(许可不能累积,并且最多只能有一个许可。)
其API的文档如下:
static Object getBlocker(Thread t) 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。 static void park() 为了线程调度,禁用当前线程,除非许可可用。 static void park(Object blocker) 为了线程调度,在许可可用之前禁用当前线程。 static void parkNanos(long nanos) 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。 static void parkNanos(Object blocker, long nanos) 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间。 static void parkUntil(long deadline) 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。 static void parkUntil(Object blocker, long deadline) 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。 static void unpark(Thread thread) 如果给定线程的许可尚不可用,则使其可用。
队列管理
注:翻译自JDK1.7.10 AQS 内部静态类Node源码的DOC。
等待队列是CLH锁队列的一种变体。CLH锁(可参考:CLH锁)通常用于自旋锁,我们反而用于阻塞同步器,但使用相同的基本策略:在(线程)它自己结点的前驱持有关于线程的一些控制信息。每个结点的 “status” 字段跟踪一个线程是否应该阻塞。一个结点在它的前驱释放时被通知。队列的每个结点作为一个特定通知风格(specific-notification-style)的监视器服务,持有单一等待线程。”status” 字段不控制线程是否授予。一个线程可能尝试去获取如果它是第一个进入队列,但成为第一个不保证就成功;它只是获得权利去竞争,所以当前释放的竞争者线程可能需要再次等待(注:这是公平性的问题,子类的实现可以进行控制)。
为了进入CLH锁队列,你只需要原子地把它作为一个新的尾结点拼接;为了出队列,你只需要设置 “head” 字段。
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
插入到CLH队列只要求在 "tail" 上进行仅仅一个原子操作,所以那就有一个从 unqueued 到 queued 划界的原子点。类似地,出队列包含仅更新 "head"。然而,需要额外一点工作用于结点确认它们的后继是谁,部分地为了处理可能的由于超时和中断导致的取消。
"prev" 连接(原始CLH锁没有使用)主要是出于处理取消的需求。如果一个结点被取消,它的后继(一般)是重新连接到一个非取消的前驱。对于自旋锁形式下的类似结构见:http://www.cs.rochester.edu/u/scott/synchronization/
我们使用 "next" 连接来实现阻塞结构。每个结点的线程id是保存在它自己的结点内,这样一个前驱通知下一个结点去唤醒那个通过遍历 "next" 连接决定的线程。决定后继的过程必须避免与新入队结点竞争设置它的前驱的 "next" 字段。这是这样解决的:当一个结点的后继是空时,从原子地更新的 "tail" 向后检查。(换一种说法,"next" 连接是优化的,这样我们一般不需要向后扫描。)(注:这也就是说,从 "tail" 向后检查总是安全的,但可能需要进行比较多的遍历,所以引入 "next" 来进行优化,因为通知是从头部开始,插入是在尾部进行,所以两者的竞争一般是不多的,当出现竞争,即检测到结点的后继是空时,再从 "tail" 向后检查。)
取消引入一些保守到基本算法。因为我们必须轮询其他结点的取消,我们可以错过通知,不管被取消结点是在前或在后。这是通过在取消时总是unparking后继节点来处理的,允许它们稳定在一个新的前驱,除非我们能够识别一个承担这种责任的前驱(注:状态是 Node.SIGNAL 的)。
CLH队列需要一个虚拟头结点作为开始。但我们不在构造时创建它,如果从来没有竞争,这是浪费精力。当第一次竞争出现时,构造这个结点并设置 head
和 tail
指针。(注:队列里的结点尝试获取前也首先判断自己的前驱是不是头结点。)
在一个Conditions上等待的线程使用相同的结点,但用另外的一条链接。Conditions只需要以简单队列连接结点,因为它们只在独占持有时访问。根据await,结点被插入到一个条件队列。根据signal,结点被转换到主队列。一个 "status" 字段的特殊值(Node.CONDITION)用于标记结点在哪个队列。
AQS 使用
AQS实现了模板方法,使用时具体实现的子类只需要根据需要实现下面的方法:
tryAcquire(int) // 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。
tryRelease(int) // 试图设置状态来反映独占模式下的一个释放。
tryAcquireShared(int) // 试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。
tryReleaseShared(int) // 试图设置状态来反映共享模式下的一个释放。
isHeldExclusively() // 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。此方法是在每次调用非等待 AbstractQueuedSynchronizer.ConditionObject 方法时调用的。(等待方法则调用 release(int)。)
这些try*方法的返回值都表示是否成功获取且是否允许继续获取。对于独占模式下,如果成功获取,总是直接返回了;而在共享模式下,如果成功获取了,还要判断是否允许继续获取,如果允许继续获取,则要唤醒后继进行获取。
这个实现类应该作为内部类使用,避免用户通过公有方法改变了状态。这些方法的实现必须是线程安全的,通常应该很短并且不被阻塞。
下面是一个锁的实现示例:
class Mutex {
class Sync extends AbstractQueuedSynchronizer {
public boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
public boolean tryRelease(int ignore) {
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() { sync.acquire(0); }
public void unlock() { sync.release(0); }
}
一点说明
AQS 的实现是建立在Java内存模型的基础上的,特别是 volatile 变量的内存语义及其 happens-before 规则,后续的文章会对 AQS 的源码进行注释、分析,如果想明白为什么那么做,建议先阅读 Java内存模型 和 内存屏障 这两篇文章。
参考资料
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。