DelayQueue 一个无界阻塞队列,只有在延迟期满时才能从中提取元素。基于 PriorityQueue
实现的延迟队列,用 ReentrantLock
提供线程安全性。
其元素必须实现 Delayed
接口。
该类可用来实现定时调度的功能,当前时间与任务的下次执行时间的距离作为延迟时间。
实现上采用 Leader_Follower 模式 的变体进行优化:leader 进行限时等待,其他线程作为 follower 无限等待。leader 在等待的过程中可能插入一个更快到期的元素,那么旧 leader 就会被作废,如果又有一个线程来获取,那么它会作为新的 leader 根据新的队列头元素进行限时等待。
public interface Delayed extends Comparable<Delayed> {
// 返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 该变量被赋值给等待元素的队列头结点。这是 Leader_Follower 模式的变体,
// 用于减少限时等待。当一个线程成为 leader 时,它只等待下一个延迟到达,
// 但其他的线程都是无限等待的。
// leader 线程必须在从 take() 或 poll(...) 方法返回前通知其他线程,
// 除非其他线程在这个过程中成为了 leader 。
// 每当队列的头被一个更早到期的元素取代时,leader 字段设置为 null 表示作废,
// 同时,一些等待线程、不一定必须是当前的 leader 被通知。
// 因此等待线程必须准备好在等待过程中获取和失去 leadership。
private Thread leader = null;
// 当一个新的元素在队列头部变的可用或新的线程可能需要成为 leader 时发出通知。
private final Condition available = lock.newCondition();
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
// 新元素成为了队列的头,作废已有的 leader,通知等待线程。
// 问题1:为啥要废弃 leader?
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
// 队列为空,只能无限等待
else {
long delay = first.getDelay(NANOSECONDS);
// 队列头到期了
// 此时不一定是 leader 获得了队列头元素
if (delay <= 0)
return q.poll();
// 在等待的过程中不持有引用。
// 问题2:为啥专门放弃引用?
first = null;
if (leader != null)
// 如果已经有 leader 了,作为 follower 进行无限等待
available.await();
else {
// 队列里有未到期的元素、且没有 leader,自己成为 leader。
Thread thisThread = Thread.currentThread();
leader = thisThread;
// 作为 leader 进行限时等待
try {
available.awaitNanos(delay);
} finally {
// 作废 leader。
if (leader == thisThread)
// 问题3:为啥在这里作废 leader?
leader = null;
}
}
}
}
} finally {
// leader 不为空说明 leader 线程还在限时等待,不需要唤醒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
问题回答
-
问题1:此时可能是新元素比原来的 leader 的等待时间更短,原来的 leader 失去了 leadership。此处作废是为了让旧 leader 等待的元素之前的元素能够尽快被处理。比如旧 leader 进行限时等待 1000ms,此时连续进来 30ms/50ms 的两个元素;如果此时不作废,后续就有线程来获取元素,会因为有旧 leader 进入无限等待;作废后,后续的线程可能只需要限时等待 30ms,提高了延迟队列的准确性。
-
问题2:因为在等待过程中,这个元素可能被其他线程处理、需要进行垃圾回收,防止被这个线程的栈引用了而没法垃圾回收。
-
问题3:首先,leader 可能在等待的过程中变了,因此需要先判断
leader == thisThread
。如果在 return 语句前进行作废,对于获取竞争不激烈的场景是不需要作废,这样会多出一些没必要的判断leader == thisThread
。另外是限时等待到时后应该都能获得元素,线程到时唤醒后作废 leader 也是合理的。
小结
引入 leader_follower 模式是为了优化在高并发下的等待,尽量用无限等待替代限时等待,也防止了多个线程同时竞争头结点。
实现上做到了 GC 友好。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。