JUC LinkedBlockingQueue

java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。

队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。

由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。

头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
继续阅读

JUC ArrayBlockingQueue

java.util.concurrent.ArrayBlockingQueue 是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类基于 java.util.concurrent.locks.ReentrantLock 来实现线程安全,所以提供了 ReentrantLock 所能支持的公平性选择。

属性

队列的操作主要有读、写,所以用了两个 int 类型的属性作为下一个读写位置的的指针。存放元素的数组是 final 修饰的,所以数组的大小是固定的。对于并发控制,是所有的访问都必须加锁,并用两个条件对象用于协调读写操作。

// 队列存放元素的容器
final Object[] items;

// 下一次读取或移除的位置
int takeIndex;

// 存放下一个放入元素的位置
int putIndex;

// 队列里有效元素的数量
int count;


// 所有访问的保护锁
final ReentrantLock lock;

// 等待获取的条件
private final Condition notEmpty;

// 等待放入的条件
private final Condition notFull;

环绕处理

如果指针一直往前增加或一直往后减小,那么总会超出数组的有效索引范围。所以需要进行一些环绕处理。

// 指针前移
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

// 指针后移
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

注意,上面的处理都是对指针值的直接处理,而不关心是读指针还是写指针,因为是否有可读元素、可写空间的判断是通过对 count 计数来判断的。

这也是 count 的作用,它极大地简化了指针有效性的判断。在下面的 insertextract 方法中根本就不需要对读写指针之间的位置关系进行判断,非常精妙。

通过环绕处理可以把这个数组看成是圆形的缓存。
继续阅读

JUC 源码分析 三 AbstractQueuedSynchronizer 共享模式 与 CountDownLatch

共享模式

共享模式允许一组线程获取同一个许可。为实现共享模式子类需要实现两个方法:

  • tryAcquireShared:返回int类型的值,小于0表示获取失败,等于0表示获取成功但不允许后续更多的获取,大于0表示获取成功且允许更多的后续获取。
  • tryReleaseShared:返回true表示释放许可成功,可以唤醒等待线程;false表示失败,不唤醒等待线程。

共享获取 acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
       // 添加到等待队列,不管是共享模式还是独占模式,都共享同一个等待队列。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); // 尝试获取,返回值表示是否允许获取
                if (r >= 0) {
                   // 获取成功
                   // 把自己设为头结点并传递可以获取的信号
                   // node 把自己设为头结点后,它的后继发现它的前驱是头结点了,就会尝试获取。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 尝试通知队列里的下一个结点,如果:
     *       调用者指示或者之前操作记录显示需要传递
     *       (注意:这里对waitStatus使用单一检查,因为PROPAGATE可能被转换到SIGNAL)
     *   并且
     *       下一个结点以共享模式等待或者我们根本就不知道,因为它是空的。
     *
     * 在这些检查有点保守,可能导致不必要的唤醒,但只是在多重竞争acquires/releases时,
     * 因此,大多数都是现在或不久就需要通知的。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null; // for GC
    node.prev = null;
}

继续阅读

JUC 源码分析 二 ReentrantLock

上一篇文章已完整展示了等待队列的管理(添加结点、移除取消结点)、独占模式下的acquire操作、acquire中断取消、前驱如何通知后继。这些知识已足够用来实现一个可重入锁。

本篇通过java.util.concurrent.locks.ReentrantLock类的源码来分析如何实现可重入锁。

可重入锁

可重入锁就是说当线程拥有这把锁的时候,它再次请求锁是成功的;当线程释放锁时,如果持有锁的线程对锁的请求次数大于释放次数,则该线程仍然拥有锁,直到请求次数与释放次数相等时才真正释放锁。

所以可重入锁需要一个重入计数变量,初始值设为0,当成功请求锁时加1,释放锁时减1,当释放锁之后计数为0则真正释放锁。重入锁还必须持有对锁持有者的引用,用以判断是否可以重入。

锁的公平性

如果锁能够严格按照线程请求锁的先后顺序分配锁,则认为锁具有公平性;如果某一线程能在其他等待线程之前获取到锁,则认为锁不具有公平性。

ReentrantLock

ReentrantLock是JUC包里可重入的独占锁实现,它具有三个内部类:Sync、NonfairSync、FairSync,通过构造函数的参数来指定锁是否是公平的,下面是一些核心代码:

public class ReentrantLock implements Lock, java.io.Serializable {
     private final Sync sync;

     public ReentrantLock(boolean fair) {
         sync = fair ? new FairSync() : new NonfairSync();
     }

     public void lock() {
         sync.lock();
     }

     public void unlock() {
         sync.release(1);     // 这个1表示退出锁1次。
     }

     // 带超时限制的获取
  public boolean tryLock(long timeout, TimeUnit unit)
          throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }

     // 其他代码省略
}

可以看到,ReentrantLock都是把具体实现委托给内部类而不是直接继承自AbstractQueuedSynchronizer,这样的好处是用户不会看到不需要的方法,也避免了用户错误地使用AbstractQueuedSynchronizer的公开方法而导致错误。

ReentrantLock的重入计数是使用AbstractQueuedSynchronizerstate属性的,state大于0表示锁被占用、等于0表示空闲,小于0则是重入次数太多导致溢出了。
继续阅读

JUC 源码分析 一 AbstractQueuedSynchronizer

队列结点

Node类型的waitStatus、prev、next 字段都用volatile 修饰,这样直接的读写操作就具有内存可视性。表示Node状态的waitStatus字段是个int类型,这样通过数值比较就可以判断Node的状态,而不需要很多的分支语句。

它的构造函数也是比较有意思的,有三个,分别用于构建同步队列的初始头结点或共享标识、构造同步队列的有效结点、构造条件队列的结点。也就是说,同步队列和条件队列的结点是相同的类型,所以可以从条件队列转移到同步队列去获取许可。

static final class Node {
       // 表明节点是否以共享模式等待的标记
    static final Node SHARED = new Node();

    // 表明节点是否以独占模式等待的标记
    static final Node EXCLUSIVE = null;

    // 表明线程已被取消
    static final int CANCELLED =  1;

    // 表明后续节点的线程需要unparking
    static final int SIGNAL    = -1;

    // 表明线程正在等待一个条件
    static final int CONDITION = -2;

    // 表明下一次acquireShared应该无条件传播
    static final int PROPAGATE = -3;

    /*
     * 状态字段,只能取下面的值:
     * SIGNAL(-1):    这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点
     *              必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须
     *              首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。
     *               
     * CANCELLED(1):  这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是,
     *                 这种状态的线程从不再次阻塞。
     *
     * CONDITION(-2): 这个结点当前在一个条件队列上。它将不会用于sync队列的结点,
     *               直到被转移,在那时,结点的状态将被设为0.
     *              这个值在这里的使用与其他字段的使用没有关系,仅仅是简化结构。
     *               
     * PROPAGATE(-3): releaseShared应该传递给其他结点。这是在doReleaseShared里设置
     *                 (仅仅是头结点)以确保传递继续,即使其他操作有干涉。
     *
     * 0:             非以上任何值。
     *
     * 值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要
     * 检查特定的值,只需要(检查)符号。
     *
     * 对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION(-2)。
     * 通过CAS操作修改(或者,当允许时,用无条件volatile写。)
     */
    volatile int waitStatus;

    /*
     * 连接到当前结点/线程依赖的用来检查等待状态的前驱结点。
     * 在进入队列时赋值,只在出队列时置为空(为了GC考虑)。
     * 根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出,
     * 因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。
     *
     * 一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。
     */
    volatile Node prev;

    /*
     * 连接到当前结点/线程释放时解除阻塞的后续结点。
     * 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。
     * 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?),
     * 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空,
     * 我们可以从tail向前遍历进行双重检查。
     * 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。
     */
    volatile Node next;

    /*
     * 列队在这个结点的线程,在构造时初始化,用完后置空。
     */
    volatile Thread thread;

    /*
     * 连接到下一个在条件上等待的结点或是特殊的值SHARED。
     * 因为条件队列只在独占模式下持有时访问,我们只需要一个简单的链表队列来持有在条件上等待的结点。
     * 他们然后被转移到队列去re-acquire。
     * 因为条件只能是独占的,我们通过用一个特殊的值来表明共享模式 来节省一个字段。
     */
    Node nextWaiter;

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

添加结点到等待队列

对于添加结点到队列的操作最重要的是要保证:即使添加的CAS操作失败了,也不能影响队列结点现有的连接关系。

对于新结点,它在CAS之前指向它的预期前驱,CAS成功之后再更新预期前驱的后继指针。

在步骤1成功之后、步骤2完成之前,其他线程通过结点的 “next” 连接可能看到“尾结点”(即代码里的 pred)的 “next” 为空,但其实队列里已经加入新的结点,这也是为什么通过 “next” 连接遍历队列时碰到后继为空的,必须从原子地更新的 “tail” 结点向后遍历。
继续阅读

JUC AQS

AQS 是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类的简称,它虽然只是一个类,但也是一个强大的框架,目的是为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,这些类同步器都依赖单个原子 int 值来表示状态。

AQS 实现了控制同步的框架,并定义抽象方法留给子类定义哪种状态意味着被获取或被释放,是个典型的模板方法实现。

概述

同步器一般包含两种方法,一种是acquire,另一种是release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。

同步器的基本思想

acquire操作:

// 循环里不断尝试,典型的失败后重试
while (synchronization state does not allow acquire) {
     // 同步状态不允许获取,进入循环体,也就是失败后的处理
     enqueue current thread if not already queued;     // 如果当前线程不在等待队列里,则加入等待队列
     possibly block current thread;     // 可能的话,阻塞当前线程
}

// 执行到这里,说明已经成功获取,如果之前有加入队列,则出队列。
dequeue current thread if it was queued; 

release操作:

update synchronization state;    //  更新同步状态
if (state may permit a blocked thread to acquire) // 检查状态是否允许一个阻塞线程获取
      unblock one or more queued threads;     // 允许,则唤醒后继的一个或多个阻塞线程。

为了实现上述操作,需要下面三个基本组件的相互协作:

  • 同步状态的原子性管理:怎么判断同步器是否可用的?怎么维护原子状态不会出现非法状态?怎么让其他线程看到当前线程对状态的修改?
  • 线程的阻塞与解除阻塞:同步器不可用时,怎么挂起线程?同步器可用时,怎么恢复挂起线程继续执行?
  • 队列的管理:有多个线程被阻塞时,怎么管理这些被阻塞的线程?同步器可用时,应该恢复哪个阻塞线程继续执行?怎么处理取消获取的线程?

继续阅读

JUC 原子类

volatile 变量

volatile变量具有可见性,也就是说线程能够自动发现volatile 变量的最新值;对volatile变量进行操作不会造成阻塞。

适用于:多个变量之间或者某个变量的当前值与修改后值之间没有约束。

正确使用volatile变量的条件:

  1. 对变量的写操作不依赖于当前值。
  2. 该变量没有包含在具有其他变量的不变式中。

所以,volatile变量不支持像i++这样的原子操作,因为这条语句包含了三个步骤:读取-加1操作-写变量。
继续阅读