java.util.concurrent.ConcurrentLinkedQueue
是一个基于链接结点的、无界、线程安全的、FIFO队列。它的是实现采用了无等待(wait-free)、无锁(lock-free)算法,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。
一、设计思路
- 使用 CAS 原子指令来处理对数据的并发访问,把同步最小化到单个硬件指令上;这是无锁算法的基础。
- 分别用 head、tail 两个原子指针来指向队头和队尾,可以同时进行入队和出队操作,但允许 head、tail 并不总是指向有效的头和尾;把入队、出队需要同步更新的范围最小化到单个原子变量上,这是无锁算法实现的关键。
- 在入队、出队操作上并不总是更新 head、tail,而是在多次操作后才进行更新,节省了CAS指令。具有批量更新的效果。
- 由于 head、tail 并不总是指向头和尾,这就是说,队列会出现不一致的情况,所以在 head、tail 上定义了一些不变式来维护算法的正确性。
二、不变式
不变式是在执行方法之前和之后,队列必须要保持的。可变式是在执行过程中,队列允许出现的不一致情况。
head 的约束
通过这个结点能够在 O(1) 时间到达第一个存活(非已删除)结点,如果有的话。
不变式
- 所有存活结点可以 从 head 开始通过 succ() 访问。
head != null
。(tmp = head).next != tmp || tmp != head
,这个不变式是说 head 在方法开始之前、之后,head 指向的结点应该是在队列上的。
可变式
head.item
可能是、也可能不是 空的。- 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。
tail 的约束
从它可以在 O(1) 时间访问到队列的最后一个结点(唯一的满足 node.next == null
的结点)。这个也就是说 tail 指向的并不总是最后一个存活结点。
不变式
- 最后的结点总是可以从 tail 开始通过
succ()
访问。 tail != null
。
可变式
- tail.item 可能是、也可能不是空。
- 允许 tail 落后于 head,那是因为 tail 不能从 head 到达。
- tail.next 可能是、也可能不是 自指向到 tail。
三、具体实现
属性与链表结点类
// 内部结点类
// 作为无锁算法,只能通过 volatile 和 sun.misc.Unsafe UNSAFE 提供的
// CAS 操作来保证内存可见性和原子性。
private static class Node<E> {
volatile E item; // 结点存储的实际元素
volatile Node<E> next; // 指向后继的指针
Node(E item) {
// 使用宽松的写是因为item只能在通过casNext发布之后才能看到。
// (这里也就是说 UNSAFE.putObject 的内存语义比 volatile 写要宽松些,也就高效点)
UNSAFE.putObject( this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject( this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject( this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject( this, nextOffset, cmp, val);
}
// 其他方法及一些属性省略
}
// ConcurrentLinkedQueue 的属性
/* 见上面说明 */
private transient volatile Node<E> head;
/* 见上面说明 */
private transient volatile Node<E> tail;
offer 操作
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p 是队列的最后一个结点。(后继为空的就是最后一个存活结点。这很重要!)
if (p.casNext(null, newNode)) {
// CAS 成功使 e 成为队列的元素,新节点变为 "存活"。
// 每次跳过两个结点,也就是不是每次添加一个结点都修改tail属性的。
// 这是因为在下面的casTail成功之前,其他线程可能又添加了新的结点,
// 所以在并发大的情况下,这个casTail几乎总是失败的,没必要每次都调用。
// (这里也就是说 tail 是批量更新的)
if (p != t) // hop two nodes at a time
// 失败是 OK的,因为允许tail不总是指向最后一个结点。
casTail(t, newNode);
return true ;
}
// 与其他线程竞争 CAS 失败,重新读取 next 属性。
}
else if (p == q)
// p == q 表示结点 p 已经从队列里移除。如果 tail 没有被修改,它也被从队列移除,
// 在这种情况下,我们需要跳到 head;如果 tail 被修改,新的tail是个更好的赌注。
// (tail不是实时更新,而是批量更新,此时 tail 落后于 head,
// 还可能即使更新了也可能又落后了)
p = (t != (t = tail)) ? t : head;
else
// 结点 p 仍然在队列上,且不是最后结点,p 向前移。
// 在两跳之后检测 tail 的更新。
p = (p != t && t != (t = tail)) ? t : q;
}
}
poll 操作
获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 找到当前第一个存活结点,并从队列移除成功。
// 批量更新 head。
if (p != h) // hop two nodes at a time
// 基于不变式:head != null
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
// p 是最后结点,且 p 的item已出队列,设置 p 为头结点。
// 空队列,返回null。
updateHead(h, p);
return null ;
}
else if (p == q)
// p 已从队列移除,重新从 head 开始。
continue restartFromHead;
else
// 结点 p 还在队列,但 p.item 已出队,前移。
p = q;
}
}
}
取后继操作
在迭代过程中,返回某个结点的后继。
/*
* 返回 p 的后继,如果 p 已被移除,则返回 head。
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
size 操作
返回此队列中的元素数量。如果此队列包含的元素数大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。确定当前的元素数需要进行一次花费 O(n) 时间的遍历。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
QA
-
Q:为什么要定义不变式、可变式?
A:不变式用于确保队列在操作开始之前、完成之后处于正确一致的状态。可变式允许队列在操作的执行过程中处于不一致状态,为的是提高并发性,当然也是有代价的,大多数操作方法的代码都是在处理可变式的情况。 -
Q:为什么以
node.next == null
作为尾结点的判断条件?
A:如果以 tail 指向的结点作为尾结点的判断条件,那么在更新尾结点的 next 属性 与 把新结点设置为 tail 之间就会形成一个同步区,在第一个操作成功之后、第二个操作完成之前,所有的线程都不能进行添加操作。以node.next == null
作为尾结点的判断条件,那么需要同步的范围就是CAS更新尾结点,而这只是一个原子操作,所有线程失败后都可以立即重试。 -
Q:head、tail 为什么都不能是 null ?
A:如果 head、tail 允许是null,那么进行添加、移除操作时,就需要对 head、tail 是否为 null 进行判断,使每个操作都依赖于两个变量,这会使操作的状态管理非常复杂。并发编程本质就是对状态的管理,状态越少,越容易实现并发。
四、参考资料
- JDK 1.7.0_10 源码
- 非阻塞算法在并发容器中的实现
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。
”它的是实现采用了无等待(wait-free)、无锁(lock-free)算法”
这个说法是正确的,赞一个,但是我想可以解释得更清楚一些:
Offer和Poll都是Lock-Free的实现,而Contain操作则是Wait-Free(Wait-Free Population Oblivious)的实现,更多的讨论可见:
http://cs.oswego.edu/pipermail/concurrency-interest/2013-June/011417.html
此外,关于Wait-Free Population Oblivious的定义,可见:
http://concurrencyfreaks.blogspot.com/2013/05/lock-free-and-wait-free-definition-and.html
简单的理解就是这个操作的性能不受线程数影响,而Wait-Free Bounded则受线程数影响,另外Wait-Free的定义要general一些,只是说在一定的步骤内完成。
关于无锁、无等待的定义、区别也可以参考下这篇文章:https://docs.google.com/presentation/d/1JkOUQ07nr0WQ8SKqcWA5D3M0v1gUdAwgNBbUcMhhGis/edit,最近发现的。
请教下为什么p==q这个条件用来判断p已被移除,一直理解不了这个
q 是 p 的后继,如果 p == q 的节点还在队列上,其实就是说 p 自身形成一个循环的链了。
什么情况下会出现 p == q 的情况呢
q 是 p 的后继,如果 p == q 的节点还在队列上,其实就是说 p 自身形成一个循环的链了。