JUC 源码分析 二 ReentrantLock

上一篇文章已完整展示了等待队列的管理(添加结点、移除取消结点)、独占模式下的acquire操作、acquire中断取消、前驱如何通知后继。这些知识已足够用来实现一个可重入锁。

本篇通过java.util.concurrent.locks.ReentrantLock类的源码来分析如何实现可重入锁。

可重入锁

可重入锁就是说当线程拥有这把锁的时候,它再次请求锁是成功的;当线程释放锁时,如果持有锁的线程对锁的请求次数大于释放次数,则该线程仍然拥有锁,直到请求次数与释放次数相等时才真正释放锁。

所以可重入锁需要一个重入计数变量,初始值设为0,当成功请求锁时加1,释放锁时减1,当释放锁之后计数为0则真正释放锁。重入锁还必须持有对锁持有者的引用,用以判断是否可以重入。

锁的公平性

如果锁能够严格按照线程请求锁的先后顺序分配锁,则认为锁具有公平性;如果某一线程能在其他等待线程之前获取到锁,则认为锁不具有公平性。

ReentrantLock

ReentrantLock是JUC包里可重入的独占锁实现,它具有三个内部类:Sync、NonfairSync、FairSync,通过构造函数的参数来指定锁是否是公平的,下面是一些核心代码:

public class ReentrantLock implements Lock, java.io.Serializable {
     private final Sync sync;

     public ReentrantLock(boolean fair) {
         sync = fair ? new FairSync() : new NonfairSync();
     }

     public void lock() {
         sync.lock();
     }

     public void unlock() {
         sync.release(1);     // 这个1表示退出锁1次。
     }

     // 带超时限制的获取
  public boolean tryLock(long timeout, TimeUnit unit)
          throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }

     // 其他代码省略
}

可以看到,ReentrantLock都是把具体实现委托给内部类而不是直接继承自AbstractQueuedSynchronizer,这样的好处是用户不会看到不需要的方法,也避免了用户错误地使用AbstractQueuedSynchronizer的公开方法而导致错误。

ReentrantLock的重入计数是使用AbstractQueuedSynchronizerstate属性的,state大于0表示锁被占用、等于0表示空闲,小于0则是重入次数太多导致溢出了。
继续阅读

JUC 源码分析 一 AbstractQueuedSynchronizer

队列结点

Node类型的waitStatus、prev、next 字段都用volatile 修饰,这样直接的读写操作就具有内存可视性。表示Node状态的waitStatus字段是个int类型,这样通过数值比较就可以判断Node的状态,而不需要很多的分支语句。

它的构造函数也是比较有意思的,有三个,分别用于构建同步队列的初始头结点或共享标识、构造同步队列的有效结点、构造条件队列的结点。也就是说,同步队列和条件队列的结点是相同的类型,所以可以从条件队列转移到同步队列去获取许可。

static final class Node {
       // 表明节点是否以共享模式等待的标记
    static final Node SHARED = new Node();

    // 表明节点是否以独占模式等待的标记
    static final Node EXCLUSIVE = null;

    // 表明线程已被取消
    static final int CANCELLED =  1;

    // 表明后续节点的线程需要unparking
    static final int SIGNAL    = -1;

    // 表明线程正在等待一个条件
    static final int CONDITION = -2;

    // 表明下一次acquireShared应该无条件传播
    static final int PROPAGATE = -3;

    /*
     * 状态字段,只能取下面的值:
     * SIGNAL(-1):    这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点
     *              必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须
     *              首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。
     *               
     * CANCELLED(1):  这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是,
     *                 这种状态的线程从不再次阻塞。
     *
     * CONDITION(-2): 这个结点当前在一个条件队列上。它将不会用于sync队列的结点,
     *               直到被转移,在那时,结点的状态将被设为0.
     *              这个值在这里的使用与其他字段的使用没有关系,仅仅是简化结构。
     *               
     * PROPAGATE(-3): releaseShared应该传递给其他结点。这是在doReleaseShared里设置
     *                 (仅仅是头结点)以确保传递继续,即使其他操作有干涉。
     *
     * 0:             非以上任何值。
     *
     * 值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要
     * 检查特定的值,只需要(检查)符号。
     *
     * 对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION(-2)。
     * 通过CAS操作修改(或者,当允许时,用无条件volatile写。)
     */
    volatile int waitStatus;

    /*
     * 连接到当前结点/线程依赖的用来检查等待状态的前驱结点。
     * 在进入队列时赋值,只在出队列时置为空(为了GC考虑)。
     * 根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出,
     * 因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。
     *
     * 一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。
     */
    volatile Node prev;

    /*
     * 连接到当前结点/线程释放时解除阻塞的后续结点。
     * 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。
     * 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?),
     * 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空,
     * 我们可以从tail向前遍历进行双重检查。
     * 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。
     */
    volatile Node next;

    /*
     * 列队在这个结点的线程,在构造时初始化,用完后置空。
     */
    volatile Thread thread;

    /*
     * 连接到下一个在条件上等待的结点或是特殊的值SHARED。
     * 因为条件队列只在独占模式下持有时访问,我们只需要一个简单的链表队列来持有在条件上等待的结点。
     * 他们然后被转移到队列去re-acquire。
     * 因为条件只能是独占的,我们通过用一个特殊的值来表明共享模式 来节省一个字段。
     */
    Node nextWaiter;

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

添加结点到等待队列

对于添加结点到队列的操作最重要的是要保证:即使添加的CAS操作失败了,也不能影响队列结点现有的连接关系。

对于新结点,它在CAS之前指向它的预期前驱,CAS成功之后再更新预期前驱的后继指针。

在步骤1成功之后、步骤2完成之前,其他线程通过结点的 “next” 连接可能看到“尾结点”(即代码里的 pred)的 “next” 为空,但其实队列里已经加入新的结点,这也是为什么通过 “next” 连接遍历队列时碰到后继为空的,必须从原子地更新的 “tail” 结点向后遍历。
继续阅读

JUC AQS

AQS 是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类的简称,它虽然只是一个类,但也是一个强大的框架,目的是为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,这些类同步器都依赖单个原子 int 值来表示状态。

AQS 实现了控制同步的框架,并定义抽象方法留给子类定义哪种状态意味着被获取或被释放,是个典型的模板方法实现。

概述

同步器一般包含两种方法,一种是acquire,另一种是release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。

同步器的基本思想

acquire操作:

// 循环里不断尝试,典型的失败后重试
while (synchronization state does not allow acquire) {
     // 同步状态不允许获取,进入循环体,也就是失败后的处理
     enqueue current thread if not already queued;     // 如果当前线程不在等待队列里,则加入等待队列
     possibly block current thread;     // 可能的话,阻塞当前线程
}

// 执行到这里,说明已经成功获取,如果之前有加入队列,则出队列。
dequeue current thread if it was queued; 

release操作:

update synchronization state;    //  更新同步状态
if (state may permit a blocked thread to acquire) // 检查状态是否允许一个阻塞线程获取
      unblock one or more queued threads;     // 允许,则唤醒后继的一个或多个阻塞线程。

为了实现上述操作,需要下面三个基本组件的相互协作:

  • 同步状态的原子性管理:怎么判断同步器是否可用的?怎么维护原子状态不会出现非法状态?怎么让其他线程看到当前线程对状态的修改?
  • 线程的阻塞与解除阻塞:同步器不可用时,怎么挂起线程?同步器可用时,怎么恢复挂起线程继续执行?
  • 队列的管理:有多个线程被阻塞时,怎么管理这些被阻塞的线程?同步器可用时,应该恢复哪个阻塞线程继续执行?怎么处理取消获取的线程?

继续阅读