Queue 体系
Queue 是一种先进先出的队列。
ArrayBlockingQueue 和 LinkedBlockingQueue 是带阻塞特性,基于锁来实现。ArrayBlockingQueue 采用同一把锁来控制出、入队列操作;LinkedBlockingQueue 用两把锁来分别控制出、入队列操作,提高了并发性能。
ConcurrentLinkedQueue 非阻塞,采用无锁算法、利用 CAS 操作来实现。
0.1 BlockingQueue
当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素、但队列为空时,消费者会被阻塞。
其实现类必须是线程安全,入队列 happen-before 出队列。
0.2 TransferQueue
继承自 BlockingQueue,更进一步:生产者会一直阻塞直到添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列)。
特别适用于这种应用间传递消息的场景:生产者有时需要等待消费者接收消息,有时只需把消息放进队列、不需要等待消费者接收。
// 传递元素给消费者,如果需要则等待。确保一次传递完成。
void transfer(E e);
// 非阻塞
boolean tryTransfer(E e);
// 基于等待时间的。
boolean tryTransfer(E e, long timeout, TimeUnit unit);
// 返回是否有在等待接收元素的消费者
// (BlockingQueue.take()或带等待时间的 poll 方法调用)
boolean hasWaitingConsumer();
// 返回大概的在等待接收元素的消费者
//(BlockingQueue.take()或带等待时间的 poll 方法调用)
int getWaitingConsumerCount();
0.3 Deque
允许在两端进行插入、删除元素的线性集合。
实现类:
- ArrayDeque:基于数组加头尾两个指针来实现、非线程安全的。
- LinkedList:基于双向链表实现、非线程安全的。
- ConcurrentLinkedDeque:基于双向链表、CAS 元语实现、无界的。
0.4 BlockingDeque
当 Deque 里没有元素时阻塞消费者,当没有空闲空间时阻塞生产者。
目前只有一个实现类 LinkedBlockingDeque,使用双向链表来存储元素,支持容量限制,用一把锁来保证线程安全性。因为允许在两端进行操作,双向链表更合适。
1. ArrayBlockingQueue
1.1 设计思想
ArrayBlockingQueue 由 Array、Blocking、Queue 组成,归根结底是 Queue 。
Array 表明它是基于数组来存储的,Blocking 表示队列存放不下时可以阻塞入队列调用者、为空时阻塞出队列调用者,Queue 表示这是一种先进先出的队列。
队列是先进先出的,基于数组来存储,那么第一个进队列的放在下标 0 位置,第二个放在下标 1 位置,,,出队列的时候,也是从下标 0 开始,这时,如果像 ArrayList 那样,把后续的元素往前拷贝会带来很大的性能开销,这个实现思路是不现实的。
我们不能采用移动元素的方式来处理出队列。那么换种思路,用指针来标记可以读取、写入的位置,在读写时只需修改指针,如下图:
takeIndex putIndex
|-------|-----------|-------|-------|-------|----------|-------|
| |-----5-----|---4---|---3---|---3---| | |
|-------|-----------|-------|-------|-------|----------|-------|
takeIndex 指向可出队列的位置,putIndex 指向可以写入的位置。
由于数组是固定大小的,需要严格控制进入的队列的数量,需要维护元素的数量 count 。count == 0
表示没有元素可出队列,count == items.length
表示满,没有空间可以入队列。
一个入队列操作涉及:更新 array[putIndex] 的内容、更新 putIndex、count 加1 。涉及的步骤多,采用加锁的方式是更合适的。
1.1.0 设计小结
- 用数组和出、入两个指针来实现高效访问。
- 用锁来实现队列访问的线程安全、公平性。
- 利用锁关联的两个条件
notEmpty/notFull
来实现阻塞和唤醒。 - 维护一个普通整型变量
count
来方便统计队列里的元素数量,同时用于协调出入操作(对 count 进行判断),非volatile
修饰的,因为在同一个锁保护下访问。
1.2 实现源码
1.2.1 数据模型
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
出入队列的操作,都必须在持有锁的前提下进行:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
具体的源码解释见 JUC ArrayBlockingQueue 。
2. LinkedBlockingQueue
如果出、入队列几乎以同样的速率进行操作,那么两者之间其实是没有竞争的,可以用不同的锁来分别保护出、入操作,两种操作之间通过原子变量来感知。
LinkedBlockingQueue 是基于链表的、容量任意(最大 Integer.MAX_VALUE)的阻塞队列。
实现上:采用两把锁分别用于控制入队列、出队列,出入队列之间通过一个原子变量 count 来协调。如果 count 等于 0 则认为没有可以出队列的元素,需要等待;如果 count 等于容量,是没有剩余空间可以存放元素,入队列操作需要阻塞等待。
这个的实现有点类似于两阶段提交,以入队列为例,首先获取入队列的锁 putLock:
1. 进行链表队列,此时原子变量 count 没变,出队列操作是看不到的;
2. 原子变量 count 加 1,这一步相当于事务 commit,出队列操作可以看到。
如果不是出入队列操作,比如判等某个对象是否在队列里,则需要同时加两把锁来保证线程安全性。
LinkedBlockingQueue 的吞吐量通常要高于 ArrayBlockingQueue 队列,但是在大多数并发应用程序中,其可预知的性能要低。
出入队列的操作,必须在持有 putLock 或 takeLock 锁的前提下进行:
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
具体的源码解释见 JUC LinkedBlockingQueue 。
3. ConcurrentLinkedQueue
3.1 设计折衷
如果并发进一步提高,出入队列的操作都很频繁,那么线程基本上是不需要等待或者等待时间极短。阻塞一般要借助锁,锁又会明显限制并发能力,因此我们可以放弃阻塞的特性。
并发程序设计,难在共享状态的管理。如果没有共享状态,那么天然是线程安全的。可以尽量减少共享状态来简化程序。
在 ArrayBlockingQueue 和 LinkedBlockingQueue 里都涉及表示队列里元素数量的共享状态 count ,对于高并发的队列,我们其实不是那么关心某一时刻队列里元素的精确数量。为了高并发,我们可以放弃这个 count,通过迭代来统计一个不那么准确的数量。
如果采用基于数组加两个指针来实现,两个指针的判断与更新就会形成一个同步区,很难用 CAS 来实现。
基于链表来实现也也没法像 LinkedBlockingQueue 做到一致的状态,ConcurrentLinkedQueue
允许中间过程出现一些不一致的状态,但能达到最终一致。
3.2 实现
ConcurrentLinkedQueue
是基于链表的、线程安全的、无边界 队列。
注意:无边界的队列可能会导致内存耗尽。
该队列实现参考了 Maged M. Michael 和 Michael L. Scott 提出的算法:《Fast, and Practical Non-Blocking and Blocking Concurrent Queue
Algorithms》。
ConcurrentLinkedQueue
采用的是单向链表来实现,为方便出、入队列,维护了 head/tail
两个指针指向队列的头和尾。
头尾结点是要遵守一定的规则和不一致的状态,具体见 JUC ConcurrentLinkedQueue 。
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
}
4. 对比
类 | 是否基于锁 | 底层存储 | 容量 | 使用场景/备注 |
---|---|---|---|---|
ArrayBlockingQueue | 基于一把重入锁 | 数组+出、入指针 | 固定容量 | 适用于出入队列并发不高的场景 |
LinkedBlockingQueue | 出、入队列各用一把重入锁,用原子变量协调 | 单向链表、头尾指针 | 可指定容量 | Executors.newFixedThreadPool /newSingleThreadExecutor 方法使用 |
ConcurrentLinkedQueue | 无锁/CAS | 单向链表、头尾指针 | 无界 | 允许中间过程出现不一致,node.next==null作为尾结点的判断,而不是 tail 指针 |
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。