java.util.concurrent.ArrayBlockingQueue
是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类基于 java.util.concurrent.locks.ReentrantLock
来实现线程安全,所以提供了 ReentrantLock
所能支持的公平性选择。
属性
队列的操作主要有读、写,所以用了两个 int
类型的属性作为下一个读写位置的的指针。存放元素的数组是 final
修饰的,所以数组的大小是固定的。对于并发控制,是所有的访问都必须加锁,并用两个条件对象用于协调读写操作。
// 队列存放元素的容器
final Object[] items;
// 下一次读取或移除的位置
int takeIndex;
// 存放下一个放入元素的位置
int putIndex;
// 队列里有效元素的数量
int count;
// 所有访问的保护锁
final ReentrantLock lock;
// 等待获取的条件
private final Condition notEmpty;
// 等待放入的条件
private final Condition notFull;
环绕处理
如果指针一直往前增加或一直往后减小,那么总会超出数组的有效索引范围。所以需要进行一些环绕处理。
// 指针前移
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
// 指针后移
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
注意,上面的处理都是对指针值的直接处理,而不关心是读指针还是写指针,因为是否有可读元素、可写空间的判断是通过对 count
计数来判断的。
这也是 count
的作用,它极大地简化了指针有效性的判断。在下面的 insert
和 extract
方法中根本就不需要对读写指针之间的位置关系进行判断,非常精妙。
通过环绕处理可以把这个数组看成是圆形的缓存。
添加元素
所有添加操作最终都是调用到内部方法 insert
。
// 在持有锁的前提下调用
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex); // 指针前移 1
++count; // 有效元素数量加 1
notEmpty.signal(); // 通知在非空条件上等待的读线程
}
读取元素
所有读取操作最终都是调用到内部方法 extract
。
// 在持有锁的前提下调用
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null; // for GC,避免内存泄露;也用于判断元素是否被移除
takeIndex = inc(takeIndex); // 指针前移 1
--count; // 有效元素数量减 1
notFull.signal(); // 通知在非满条件上等待的写线程
return x;
}
移除指定位置元素
// 在持有锁的前提下调用
void removeAt(int i) {
final Object[] items = this.items;
// 如果要移除是元素就是下一个可读数据,直接移除、修改读指针即可。
// 这是一种优化,避免数据拷贝。
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// 如果要移除元素是在有效数据的中间,那么要把它之后添加的元素后移
// 注意:这里不能用读写指针的大小关系作为终结条件,也是因为环绕。
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null; // for GC
putIndex = i; // putIndex 不是直接减 1 还是因为环绕。
break;
}
}
}
--count;
notFull.signal(); //
}
方法加锁
作为线程安全的类,ArrayBlockingQueue
的所有公开方法的逻辑都是在加锁的前提下进行的。这里以put方法为例。
通过 put
方法添加元素时,线程会一直等待,直到有空闲空间可以放入元素。
public void put(E e) throws InterruptedException {
checkNotNull(e); // 不允许存空值,JUC下线程安全的容器都不允许存空值。
// 在JUC的很多类里,都会看到这种写法:把类的属性赋值给方法内的一个变量。
// 这是因为类的属性是存放在堆里的,方法内的变量是存放在方法栈上的,访问方法栈比访问堆要快。
// 在这里,this.lock属性要访问两次,通过赋值给方法的局部变量,就节省了一次堆的访问。
// 其他的类属性只访问一次就不需要这样处理了。优化无处不在!!
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁
try {
// 放在循环里是避免虚假唤醒
// 容器满的时候持续等待
while (count == items.length)
// await 方法会导致当前线程释放锁,等待其他线程唤醒,唤醒后重新获取锁
notFull.await();
insert(e);
} finally {
lock.unlock(); // 释放锁
}
}
迭代
根据DOC文档说明,此类 iterator()
方法每次返回的 Iterator
是一个“弱一致”的迭代器,从不抛出 ConcurrentModificationException
,并且确保可遍历迭代器构造时存在的元素,此外还可能(但并不保证)反映构造后的所有修改。
内部的迭代器类:
private class Itr implements Iterator<E> {
private int remaining; // 剩余可返回元素数量
private int nextIndex; // 下一个可返回元素的位置
private E nextItem; // 下一个可返回的元素
private E lastItem; // 上一次返回的元素
private int lastRet; // 上次返回的元素的位置,-1 表示没有。
Itr() {
final ReentrantLock lock = ArrayBlockingQueue. this.lock;
lock.lock();
try {
lastRet = -1;
// 初始化的时候记录容器的当前存在元素(通过记录数 count 和读指针 takeIndex)。
if ((remaining = count) > 0)
nextItem = itemAt(nextIndex = takeIndex);
} finally {
lock.unlock();
}
}
public boolean hasNext() {
return remaining > 0;
}
public E next() {
final ReentrantLock lock = ArrayBlockingQueue. this.lock;
lock.lock();
try {
if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
E x = itemAt(nextIndex); // 检查元素是否被更新
if (x == null) { // 该位置元素被移除了
x = nextItem; // 只能被迫返回旧值
lastItem = null ; // 用于使移除操作失败
} else {
lastItem = x;
}
// 跳过空元素(也就是迭代器创建之后被移除的元素)
while (--remaining > 0 && // skip over nulls
(nextItem = itemAt(nextIndex = inc(nextIndex))) == null )
// 设置下一次要返回的元素
;
return x;
} finally {
lock.unlock();
}
}
public void remove() {
final ReentrantLock lock = ArrayBlockingQueue. this.lock;
lock.lock();
try {
int i = lastRet;
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
E x = lastItem;
lastItem = null ;
// only remove if item still at index
if (x != null && x == items[i]) {
// 只有在上次调用 next() 方法时候当前遍历位置的元素仍然在那里时才移除
boolean removingHead = (i == takeIndex);
removeAt(i);
if (!removingHead)
nextIndex = dec(nextIndex);
}
} finally {
lock.unlock();
}
}
}
从迭代器的源码可以看出,它只能感知在它创建时有效元素位置上的变化(删除、被替换),而不能感知新增的元素。
关于线程安全类的使用注意
《Java 并发编程实践》提到:就算代码都是用基于线程安全类构建的程序也不一定就是线程安全的。
static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
for (Iterator<Integer> iterator = queue.iterator(); iterator.hasNext(); ) {
iterator.next();
iterator.remove();
}
在上面的代码片段里,如果 queue
是可以被多个线程访问的,那么上面的代码就不是线程安全的,因为 iterator 创建之后,hasNext、next、remove 这三个方法的调用之间,就有可能被其他线程修改了 queue ,导致抛出异常 NoSuchElementException。
线程安全类只保证了它的单个方法是线程安全的,如果要确保多个方法调用之间还是线程安全的,就必须使用另外的同步进行保证,而且要用同一个锁来保护,比如:
synchronized (queue) {
for (Iterator<Integer> iterator = queue.iterator(); iterator.hasNext(); ) {
iterator.next();
iterator.remove();
}
}
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。