JUC 延迟队列 DelayQueue

DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。基于 PriorityQueue 实现的延迟队列,用 ReentrantLock 提供线程安全性。

其元素必须实现 Delayed 接口。

该类可用来实现定时调度的功能,当前时间与任务的下次执行时间的距离作为延迟时间。

实现上采用 Leader_Follower 模式 的变体进行优化:leader 进行限时等待,其他线程作为 follower 无限等待。leader 在等待的过程中可能插入一个更快到期的元素,那么旧 leader 就会被作废,如果又有一个线程来获取,那么它会作为新的 leader 根据新的队列头元素进行限时等待。

public interface Delayed extends Comparable<Delayed> {
    // 返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
    long getDelay(TimeUnit unit);
}

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 该变量被赋值给等待元素的队列头结点。这是 Leader_Follower 模式的变体,
    // 用于减少限时等待。当一个线程成为 leader 时,它只等待下一个延迟到达,
    // 但其他的线程都是无限等待的。
    // leader 线程必须在从 take() 或 poll(...) 方法返回前通知其他线程,
    // 除非其他线程在这个过程中成为了 leader 。
    // 每当队列的头被一个更早到期的元素取代时,leader 字段设置为 null 表示作废,
    // 同时,一些等待线程、不一定必须是当前的 leader 被通知。
    // 因此等待线程必须准备好在等待过程中获取和失去 leadership。
    private Thread leader = null;

    // 当一个新的元素在队列头部变的可用或新的线程可能需要成为 leader 时发出通知。
    private final Condition available = lock.newCondition();

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // 新元素成为了队列的头,作废已有的 leader,通知等待线程。
                // 问题1:为啥要废弃 leader?
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                    // 队列为空,只能无限等待
                else {
                    long delay = first.getDelay(NANOSECONDS);

                    // 队列头到期了
                    // 此时不一定是 leader 获得了队列头元素
                    if (delay <= 0)
                        return q.poll();

                    // 在等待的过程中不持有引用。
                    // 问题2:为啥专门放弃引用?
                    first = null;

                    if (leader != null)
                        // 如果已经有 leader 了,作为 follower 进行无限等待
                        available.await();

                    else {
                        // 队列里有未到期的元素、且没有 leader,自己成为 leader。
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        // 作为 leader 进行限时等待
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // 作废 leader。
                            if (leader == thisThread)
                                // 问题3:为啥在这里作废 leader?
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader 不为空说明 leader 线程还在限时等待,不需要唤醒
            if (leader == null && q.peek() != null)
                available.signal();

            lock.unlock();
        }
    }
}

问题回答

  • 问题1:此时可能是新元素比原来的 leader 的等待时间更短,原来的 leader 失去了 leadership。此处作废是为了让旧 leader 等待的元素之前的元素能够尽快被处理。比如旧 leader 进行限时等待 1000ms,此时连续进来 30ms/50ms 的两个元素;如果此时不作废,后续就有线程来获取元素,会因为有旧 leader 进入无限等待;作废后,后续的线程可能只需要限时等待 30ms,提高了延迟队列的准确性。

  • 问题2:因为在等待过程中,这个元素可能被其他线程处理、需要进行垃圾回收,防止被这个线程的栈引用了而没法垃圾回收。

  • 问题3:首先,leader 可能在等待的过程中变了,因此需要先判断 leader == thisThread。如果在 return 语句前进行作废,对于获取竞争不激烈的场景是不需要作废,这样会多出一些没必要的判断 leader == thisThread。另外是限时等待到时后应该都能获得元素,线程到时唤醒后作废 leader 也是合理的。

小结

引入 leader_follower 模式是为了优化在高并发下的等待,尽量用无限等待替代限时等待,也防止了多个线程同时竞争头结点。

实现上做到了 GC 友好。


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据