Future
来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
也就是说Future具有这样的特性:
- 异步执行,可用 get 方法获取执行结果。
- 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
- 如果计算还没完成,是可以取消计算的。
- 可以查询计算的执行状态。
埋两个小问题用于设想下怎么实现Future:
- Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
- 计算的取消是怎么实现的?被取消的计算会终止执行吗?
FutureTask
FutureTask 是JUC里对Future的一个实现,还实现了 Runnable 接口,所以可以提交给线程池执行。
FutureTask 只有一个自定义的同步器 Sync 的属性,所有的方法都是委派给此同步器来实现。这也是JUC里使用AQS的通用模式。
下面的代码是 JDK 1.7.10 的源码。
FutureTask 实现
FutureTask的定义,省略了很多代码。
public class FutureTask<V> implements RunnableFuture<V> {
// FutureTask 用于同步控制
private final Sync sync;
public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
// 省略其他方法
}
FutureTask 的同步器
由于Future在任务完成后,可以多次自由获取结果,因此,用于控制同步的AQS使用共享模式。
FutureTask 底层任务的执行状态保存在AQS的状态里。AQS是否允许线程获取(是否阻塞)是取决于任务是否执行完成,而不是具体的状态值。
private final class Sync extends AbstractQueuedSynchronizer {
// 定义表示任务执行状态的常量。由于使用了位运算进行判断,所以状态值分别是2的幂。
// 表示任务已经准备好了,可以执行
private static final int READY = 0;
// 表示任务正在执行中
private static final int RUNNING = 1;
// 表示任务已执行完成
private static final int RAN = 2;
// 表示任务已取消
private static final int CANCELLED = 4;
// 底层的表示任务的可执行对象
private final Callable<V> callable;
// 表示任务执行结果,用于get方法返回。
private V result;
// 表示任务执行中的异常,用于get方法调用时抛出。
private Throwable exception;
/*
* 用于执行任务的线程。在 set/cancel 方法后置为空,表示结果可获取。
* 必须是 volatile的,用于确保完成后(result和exception)的可见性。
* (如果runner不是volatile,则result和exception必须都是volatile的)
*/
private volatile Thread runner;
/**
* 已完成或已取消 时成功获取
*/
protected int tryAcquireShared( int ignore) {
return innerIsDone() ? 1 : -1;
}
/**
* 在设置最终完成状态后让AQS总是通知,通过设置runner线程为空。
* 这个方法并没有更新AQS的state属性,
* 所以可见性是通过对volatile的runner的写来保证的。
*/
protected boolean tryReleaseShared( int ignore) {
runner = null;
return true;
}
// 执行任务的方法
void innerRun() {
// 用于确保任务不会重复执行
if (!compareAndSetState(READY, RUNNING))
return;
// 由于Future一般是异步执行,所以runner一般是线程池里的线程。
runner = Thread.currentThread();
// 设置执行线程后再次检查,在执行前检查是否被异步取消
// 由于前面的CAS已把状态设置RUNNING,
if (getState() == RUNNING) { // recheck after setting thread
V result;
//
try {
result = callable.call();
} catch (Throwable ex) {
// 捕获任务执行过程中抛出的所有异常
setException(ex);
return;
}
set(result);
} else {
// 释放等待的线程
releaseShared(0); // cancel
}
}
// 设置结果
void innerSet(V v) {
// 放在循环里进行是为了失败后重试。
for (;;) {
// AQS初始化时,状态值默认是 0,对应这里也就是 READY 状态。
int s = getState();
// 已完成任务不能设置结果
if (s == RAN)
return;
// 已取消 的任务不能设置结果
if (s == CANCELLED) {
// releaseShared 会设置runner为空,
// 这是考虑到与其他的取消请求线程 竞争中断 runner
releaseShared(0);
return;
}
// 先设置已完成,免得多次设置
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0); // 此方法会更新 runner,保证result的可见性
done();
return;
}
}
}
// 获取异步计算的结果
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);// 获取共享,如果没有完成则会阻塞。
// 检查是否被取消
if (getState() == CANCELLED)
throw new CancellationException();
// 异步计算过程中出现异常
if (exception != null)
throw new ExecutionException(exception);
return result;
}
// 取消执行任务
boolean innerCancel( boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
// 已完成或已取消的任务不能再次取消
if (ranOrCancelled(s))
return false;
// 任务处于 READY 或 RUNNING
if (compareAndSetState(s, CANCELLED))
break;
}
// 任务取消后,中断执行线程
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0); // 释放等待的访问结果的线程
done();
return true;
}
/**
* 检查任务是否处于完成或取消状态
*/
private boolean ranOrCancelled( int state) {
return (state & (RAN | CANCELLED)) != 0;
}
// 其他方法省略
}
从 innerCancel 方法可知,取消操作只是改变了任务对象的状态并可能会中断执行线程。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。
总的来说,Future 是线程间协调的一种工具。
欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。