JUC 并发 Queue 设计与介绍

Queue 体系

Queue 是一种先进先出的队列。

ArrayBlockingQueue 和 LinkedBlockingQueue 是带阻塞特性,基于锁来实现。ArrayBlockingQueue 采用同一把锁来控制出、入队列操作;LinkedBlockingQueue 用两把锁来分别控制出、入队列操作,提高了并发性能。

ConcurrentLinkedQueue 非阻塞,采用无锁算法、利用 CAS 操作来实现。

0.1 BlockingQueue

当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素、但队列为空时,消费者会被阻塞。

其实现类必须是线程安全,入队列 happen-before 出队列。

0.2 TransferQueue

继承自 BlockingQueue,更进一步:生产者会一直阻塞直到添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列)。

特别适用于这种应用间传递消息的场景:生产者有时需要等待消费者接收消息,有时只需把消息放进队列、不需要等待消费者接收。

// 传递元素给消费者,如果需要则等待。确保一次传递完成。
void transfer(E e);

// 非阻塞
boolean tryTransfer(E e);

// 基于等待时间的。
boolean tryTransfer(E e, long timeout, TimeUnit unit);

// 返回是否有在等待接收元素的消费者
// (BlockingQueue.take()或带等待时间的 poll 方法调用)
boolean hasWaitingConsumer();

// 返回大概的在等待接收元素的消费者
//(BlockingQueue.take()或带等待时间的 poll 方法调用)
int getWaitingConsumerCount();

0.3 Deque

允许在两端进行插入、删除元素的线性集合。

实现类:

  • ArrayDeque:基于数组加头尾两个指针来实现、非线程安全的。
  • LinkedList:基于双向链表实现、非线程安全的。
  • ConcurrentLinkedDeque:基于双向链表、CAS 元语实现、无界的。

0.4 BlockingDeque

当 Deque 里没有元素时阻塞消费者,当没有空闲空间时阻塞生产者。

目前只有一个实现类 LinkedBlockingDeque,使用双向链表来存储元素,支持容量限制,用一把锁来保证线程安全性。因为允许在两端进行操作,双向链表更合适。

1. ArrayBlockingQueue

1.1 设计思想

ArrayBlockingQueue 由 Array、Blocking、Queue 组成,归根结底是 Queue 。

Array 表明它是基于数组来存储的,Blocking 表示队列存放不下时可以阻塞入队列调用者、为空时阻塞出队列调用者,Queue 表示这是一种先进先出的队列。

队列是先进先出的,基于数组来存储,那么第一个进队列的放在下标 0 位置,第二个放在下标 1 位置,,,出队列的时候,也是从下标 0 开始,这时,如果像 ArrayList 那样,把后续的元素往前拷贝会带来很大的性能开销,这个实现思路是不现实的。

我们不能采用移动元素的方式来处理出队列。那么换种思路,用指针来标记可以读取、写入的位置,在读写时只需修改指针,如下图:

          takeIndex                           putIndex
|-------|-----------|-------|-------|-------|----------|-------|
|       |-----5-----|---4---|---3---|---3---|          |       |
|-------|-----------|-------|-------|-------|----------|-------|

takeIndex 指向可出队列的位置,putIndex 指向可以写入的位置。

由于数组是固定大小的,需要严格控制进入的队列的数量,需要维护元素的数量 count 。count == 0 表示没有元素可出队列,count == items.length 表示满,没有空间可以入队列。

一个入队列操作涉及:更新 array[putIndex] 的内容、更新 putIndex、count 加1 。涉及的步骤多,采用加锁的方式是更合适的。

1.1.0 设计小结

  1. 用数组和出、入两个指针来实现高效访问。
  2. 用锁来实现队列访问的线程安全、公平性。
  3. 利用锁关联的两个条件 notEmpty/notFull 来实现阻塞和唤醒。
  4. 维护一个普通整型变量 count 来方便统计队列里的元素数量,同时用于协调出入操作(对 count 进行判断),非 volatile 修饰的,因为在同一个锁保护下访问。

1.2 实现源码

1.2.1 数据模型

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
}

出入队列的操作,都必须在持有锁的前提下进行:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

具体的源码解释见 JUC ArrayBlockingQueue

2. LinkedBlockingQueue

如果出、入队列几乎以同样的速率进行操作,那么两者之间其实是没有竞争的,可以用不同的锁来分别保护出、入操作,两种操作之间通过原子变量来感知。

LinkedBlockingQueue 是基于链表的、容量任意(最大 Integer.MAX_VALUE)的阻塞队列。

实现上:采用两把锁分别用于控制入队列、出队列,出入队列之间通过一个原子变量 count 来协调。如果 count 等于 0 则认为没有可以出队列的元素,需要等待;如果 count 等于容量,是没有剩余空间可以存放元素,入队列操作需要阻塞等待。

这个的实现有点类似于两阶段提交,以入队列为例,首先获取入队列的锁 putLock:
1. 进行链表队列,此时原子变量 count 没变,出队列操作是看不到的;
2. 原子变量 count 加 1,这一步相当于事务 commit,出队列操作可以看到。

如果不是出入队列操作,比如判等某个对象是否在队列里,则需要同时加两把锁来保证线程安全性。

LinkedBlockingQueue 的吞吐量通常要高于 ArrayBlockingQueue 队列,但是在大多数并发应用程序中,其可预知的性能要低。

出入队列的操作,必须在持有 putLock 或 takeLock 锁的前提下进行:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

具体的源码解释见 JUC LinkedBlockingQueue

3. ConcurrentLinkedQueue

3.1 设计折衷

如果并发进一步提高,出入队列的操作都很频繁,那么线程基本上是不需要等待或者等待时间极短。阻塞一般要借助锁,锁又会明显限制并发能力,因此我们可以放弃阻塞的特性。

并发程序设计,难在共享状态的管理。如果没有共享状态,那么天然是线程安全的。可以尽量减少共享状态来简化程序。

在 ArrayBlockingQueue 和 LinkedBlockingQueue 里都涉及表示队列里元素数量的共享状态 count ,对于高并发的队列,我们其实不是那么关心某一时刻队列里元素的精确数量。为了高并发,我们可以放弃这个 count,通过迭代来统计一个不那么准确的数量。

如果采用基于数组加两个指针来实现,两个指针的判断与更新就会形成一个同步区,很难用 CAS 来实现。

基于链表来实现也也没法像 LinkedBlockingQueue 做到一致的状态,ConcurrentLinkedQueue 允许中间过程出现一些不一致的状态,但能达到最终一致。

3.2 实现

ConcurrentLinkedQueue 是基于链表的、线程安全的、无边界 队列。

注意:无边界的队列可能会导致内存耗尽。

该队列实现参考了 Maged M. Michael 和 Michael L. Scott 提出的算法:《Fast, and Practical Non-Blocking and Blocking Concurrent Queue
Algorithms》。

ConcurrentLinkedQueue 采用的是单向链表来实现,为方便出、入队列,维护了 head/tail 两个指针指向队列的头和尾。

头尾结点是要遵守一定的规则和不一致的状态,具体见 JUC ConcurrentLinkedQueue

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }

    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
}

4. 对比

是否基于锁 底层存储 容量 使用场景/备注
ArrayBlockingQueue 基于一把重入锁 数组+出、入指针 固定容量 适用于出入队列并发不高的场景
LinkedBlockingQueue 出、入队列各用一把重入锁,用原子变量协调 单向链表、头尾指针 可指定容量 Executors.newFixedThreadPool
/newSingleThreadExecutor 方法使用
ConcurrentLinkedQueue 无锁/CAS 单向链表、头尾指针 无界 允许中间过程出现不一致,node.next==null作为尾结点的判断,而不是 tail 指针

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据