JUC ConcurrentLinkedQueue

java.util.concurrent.ConcurrentLinkedQueue 是一个基于链接结点的、无界、线程安全的、FIFO队列。它的是实现采用了无等待(wait-free)、无锁(lock-free)算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。

一、设计思路

  1. 使用 CAS 原子指令来处理对数据的并发访问,把同步最小化到单个硬件指令上;这是无锁算法的基础。
  2. 分别用 head、tail 两个原子指针来指向队头和队尾,可以同时进行入队和出队操作,但允许 head、tail 并不总是指向有效的头和尾;把入队、出队需要同步更新的范围最小化到单个原子变量上,这是无锁算法实现的关键。
  3. 在入队、出队操作上并不总是更新 head、tail,而是在多次操作后才进行更新,节省了CAS指令。具有批量更新的效果。
  4. 由于 head、tail 并不总是指向头和尾,这就是说,队列会出现不一致的情况,所以在 head、tail 上定义了一些不变式来维护算法的正确性。

二、不变式

不变式是在执行方法之前和之后,队列必须要保持的。可变式是在执行过程中,队列允许出现的不一致情况。

head 的约束

通过这个结点能够在 O(1) 时间到达第一个存活(非已删除)结点,如果有的话。

不变式
  • 所有存活结点可以 从 head 开始通过 succ() 访问。
  • head != null
  • (tmp = head).next != tmp || tmp != head,这个不变式是说 head 在方法开始之前、之后,head 指向的结点应该是在队列上的。
可变式
  • head.item 可能是、也可能不是 空的。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。

tail 的约束

从它可以在 O(1) 时间访问到队列的最后一个结点(唯一的满足 node.next == null 的结点)。这个也就是说 tail 指向的并不总是最后一个存活结点。

不变式
  • 最后的结点总是可以从 tail 开始通过 succ() 访问。
  • tail != null
可变式
  • tail.item 可能是、也可能不是空。
  • 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。
  • tail.next 可能是、也可能不是 自指向到 tail。

三、具体实现

属性与链表结点类

// 内部结点类
// 作为无锁算法,只能通过 volatile 和 sun.misc.Unsafe UNSAFE 提供的
//  CAS 操作来保证内存可见性和原子性。
private static class Node<E> {
    volatile E item; // 结点存储的实际元素
    volatile Node<E> next; // 指向后继的指针

    Node(E item) {
      // 使用宽松的写是因为item只能在通过casNext发布之后才能看到。
      // (这里也就是说 UNSAFE.putObject 的内存语义比 volatile 写要宽松些,也就高效点)
        UNSAFE.putObject( this, itemOffset, item);
    }

     boolean casItem(E cmp, E val) {
         return UNSAFE.compareAndSwapObject( this, itemOffset, cmp, val);
     }

     void lazySetNext(Node<E> val) {
         UNSAFE.putOrderedObject( this, nextOffset, val);
     }

     boolean casNext(Node<E> cmp, Node<E> val) {
         return UNSAFE.compareAndSwapObject( this, nextOffset, cmp, val);
     }

     // 其他方法及一些属性省略
}

// ConcurrentLinkedQueue 的属性

/* 见上面说明 */
private transient volatile Node<E> head;

/* 见上面说明 */
private transient volatile Node<E> tail;

offer 操作

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
             // p 是队列的最后一个结点。(后继为空的就是最后一个存活结点。这很重要!)
            if (p.casNext(null, newNode)) {
                   // CAS 成功使 e 成为队列的元素,新节点变为 "存活"。

                   // 每次跳过两个结点,也就是不是每次添加一个结点都修改tail属性的。
                   // 这是因为在下面的casTail成功之前,其他线程可能又添加了新的结点,
                   // 所以在并发大的情况下,这个casTail几乎总是失败的,没必要每次都调用。
                   // (这里也就是说 tail 是批量更新的)
                if (p != t) // hop two nodes at a time
                   // 失败是 OK的,因为允许tail不总是指向最后一个结点。
                    casTail(t, newNode);
                return true ;
            }
            // 与其他线程竞争 CAS 失败,重新读取 next 属性。
        }
        else if (p == q)
             // p == q 表示结点 p 已经从队列里移除。如果 tail 没有被修改,它也被从队列移除,
             // 在这种情况下,我们需要跳到 head;如果 tail 被修改,新的tail是个更好的赌注。
             // (tail不是实时更新,而是批量更新,此时 tail 落后于 head,
             //   还可能即使更新了也可能又落后了)
            p = (t != (t = tail)) ? t : head;
        else
             // 结点 p 仍然在队列上,且不是最后结点,p 向前移。
             // 在两跳之后检测 tail 的更新。
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

poll 操作

获取并移除此队列的头,如果此队列为空,则返回 null。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                   // 找到当前第一个存活结点,并从队列移除成功。
                   // 批量更新 head。
                if (p != h) // hop two nodes at a time
                      // 基于不变式:head != null
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                   // p 是最后结点,且 p 的item已出队列,设置 p 为头结点。
                   // 空队列,返回null。
                updateHead(h, p);
                return null ;
            }
            else if (p == q)
                   // p 已从队列移除,重新从 head 开始。
                continue restartFromHead;
            else
                   // 结点 p 还在队列,但 p.item 已出队,前移。
                p = q;
        }
    }
}

取后继操作

在迭代过程中,返回某个结点的后继。

/*
 * 返回 p 的后继,如果 p 已被移除,则返回 head。
 */
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

size 操作

返回此队列中的元素数量。如果此队列包含的元素数大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。确定当前的元素数需要进行一次花费 O(n) 时间的遍历。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

QA

  • Q:为什么要定义不变式、可变式?
    A:不变式用于确保队列在操作开始之前、完成之后处于正确一致的状态。可变式允许队列在操作的执行过程中处于不一致状态,为的是提高并发性,当然也是有代价的,大多数操作方法的代码都是在处理可变式的情况。

  • Q:为什么以 node.next == null 作为尾结点的判断条件?
    A:如果以 tail 指向的结点作为尾结点的判断条件,那么在更新尾结点的 next 属性 与 把新结点设置为 tail 之间就会形成一个同步区,在第一个操作成功之后、第二个操作完成之前,所有的线程都不能进行添加操作。以 node.next == null 作为尾结点的判断条件,那么需要同步的范围就是CAS更新尾结点,而这只是一个原子操作,所有线程失败后都可以立即重试。

  • Q:head、tail 为什么都不能是 null ?
    A:如果 head、tail 允许是null,那么进行添加、移除操作时,就需要对 head、tail 是否为 null 进行判断,使每个操作都依赖于两个变量,这会使操作的状态管理非常复杂。并发编程本质就是对状态的管理,状态越少,越容易实现并发。

四、参考资料


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

JUC ConcurrentLinkedQueue》上有6个想法

  1. ”它的是实现采用了无等待(wait-free)、无锁(lock-free)算法”
    这个说法是正确的,赞一个,但是我想可以解释得更清楚一些:
    Offer和Poll都是Lock-Free的实现,而Contain操作则是Wait-Free(Wait-Free Population Oblivious)的实现,更多的讨论可见:
    http://cs.oswego.edu/pipermail/concurrency-interest/2013-June/011417.html
    此外,关于Wait-Free Population Oblivious的定义,可见:
    http://concurrencyfreaks.blogspot.com/2013/05/lock-free-and-wait-free-definition-and.html
    简单的理解就是这个操作的性能不受线程数影响,而Wait-Free Bounded则受线程数影响,另外Wait-Free的定义要general一些,只是说在一定的步骤内完成。

    • 关于无锁、无等待的定义、区别也可以参考下这篇文章:https://docs.google.com/presentation/d/1JkOUQ07nr0WQ8SKqcWA5D3M0v1gUdAwgNBbUcMhhGis/edit,最近发现的。

  2. 请教下为什么p==q这个条件用来判断p已被移除,一直理解不了这个

    • q 是 p 的后继,如果 p == q 的节点还在队列上,其实就是说 p 自身形成一个循环的链了。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据