Future 与 FutureTask

Future

来自 Java DOC 文档:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

也就是说Future具有这样的特性:

  • 异步执行,可用 get 方法获取执行结果。
  • 如果计算还没完成,get 方法是会阻塞的,如果完成了,是可以多次获取并立即得到结果的。
  • 如果计算还没完成,是可以取消计算的。
  • 可以查询计算的执行状态。

埋两个小问题用于设想下怎么实现Future:

  1. Future在计算完成前阻塞 get 访问,完成后可以自由访问,如何实现 get 方法?
  2. 计算的取消是怎么实现的?被取消的计算会终止执行吗?

FutureTask

FutureTask 是JUC里对Future的一个实现,还实现了 Runnable 接口,所以可以提交给线程池执行。

FutureTask 只有一个自定义的同步器 Sync 的属性,所有的方法都是委派给此同步器来实现。这也是JUC里使用AQS的通用模式。

下面的代码是 JDK 1.7.10 的源码。

FutureTask 实现

FutureTask的定义,省略了很多代码。

public class FutureTask<V> implements RunnableFuture<V> {
    // FutureTask 用于同步控制
    private final Sync sync;

    public boolean isCancelled() {
        return sync.innerIsCancelled();
    }

    public boolean isDone() {
        return sync.innerIsDone();
    }

    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    // 省略其他方法
}

FutureTask 的同步器

由于Future在任务完成后,可以多次自由获取结果,因此,用于控制同步的AQS使用共享模式。

FutureTask 底层任务的执行状态保存在AQS的状态里。AQS是否允许线程获取(是否阻塞)是取决于任务是否执行完成,而不是具体的状态值。

private final class Sync extends AbstractQueuedSynchronizer {
    // 定义表示任务执行状态的常量。由于使用了位运算进行判断,所以状态值分别是2的幂。

    // 表示任务已经准备好了,可以执行
    private static final int READY     = 0;

    // 表示任务正在执行中
    private static final int RUNNING   = 1;

    // 表示任务已执行完成
    private static final int RAN       = 2;

    // 表示任务已取消
    private static final int CANCELLED = 4;


    // 底层的表示任务的可执行对象
    private final Callable<V> callable;

    // 表示任务执行结果,用于get方法返回。
    private V result;

    // 表示任务执行中的异常,用于get方法调用时抛出。
    private Throwable exception;

     /*
     * 用于执行任务的线程。在 set/cancel 方法后置为空,表示结果可获取。
     * 必须是 volatile的,用于确保完成后(result和exception)的可见性。
     * (如果runner不是volatile,则result和exception必须都是volatile的)
     */
    private volatile Thread runner;


     /**
     * 已完成或已取消 时成功获取
     */
    protected int tryAcquireShared( int ignore) {
        return innerIsDone() ? 1 : -1;
    }

    /**
     * 在设置最终完成状态后让AQS总是通知,通过设置runner线程为空。
     * 这个方法并没有更新AQS的state属性,
     * 所以可见性是通过对volatile的runner的写来保证的。
     */
    protected boolean tryReleaseShared( int ignore) {
        runner = null;
        return true;
    }


     // 执行任务的方法
    void innerRun() {
        // 用于确保任务不会重复执行
        if (!compareAndSetState(READY, RUNNING))
            return;

        // 由于Future一般是异步执行,所以runner一般是线程池里的线程。
        runner = Thread.currentThread();

        // 设置执行线程后再次检查,在执行前检查是否被异步取消
        // 由于前面的CAS已把状态设置RUNNING,
        if (getState() == RUNNING) { // recheck after setting thread
            V result;
            //
            try {
                result = callable.call();
            } catch (Throwable ex) {
                // 捕获任务执行过程中抛出的所有异常
                setException(ex);
                return;
            }
            set(result);
        } else {
      // 释放等待的线程
            releaseShared(0); // cancel
        }
    }

    // 设置结果
    void innerSet(V v) {
        // 放在循环里进行是为了失败后重试。
        for (;;) {
            // AQS初始化时,状态值默认是 0,对应这里也就是 READY 状态。
            int s = getState();

            // 已完成任务不能设置结果
            if (s == RAN)
                return;

            // 已取消 的任务不能设置结果
            if (s == CANCELLED) {
                // releaseShared 会设置runner为空,
                // 这是考虑到与其他的取消请求线程 竞争中断 runner
                releaseShared(0);
                return;
            }

            // 先设置已完成,免得多次设置
            if (compareAndSetState(s, RAN)) {
                result = v;
                releaseShared(0); // 此方法会更新 runner,保证result的可见性
                done();
                return;
            }
        }
    }

    // 获取异步计算的结果
    V innerGet() throws InterruptedException, ExecutionException {
        acquireSharedInterruptibly(0);// 获取共享,如果没有完成则会阻塞。

        // 检查是否被取消
        if (getState() == CANCELLED)
            throw new CancellationException();

        // 异步计算过程中出现异常
        if (exception != null)
            throw new ExecutionException(exception);

        return result;
    }

    // 取消执行任务
    boolean innerCancel( boolean mayInterruptIfRunning) {
        for (;;) {
            int s = getState();

            // 已完成或已取消的任务不能再次取消
            if (ranOrCancelled(s))
                return false;

            // 任务处于 READY 或 RUNNING
            if (compareAndSetState(s, CANCELLED))
                break;
        }
        // 任务取消后,中断执行线程
        if (mayInterruptIfRunning) {
            Thread r = runner;
            if (r != null)
                r.interrupt();
        }
        releaseShared(0); // 释放等待的访问结果的线程
        done();
        return true;
    }

    /**
     * 检查任务是否处于完成或取消状态
     */
    private boolean ranOrCancelled( int state) {
        return (state & (RAN | CANCELLED)) != 0;
    }

     // 其他方法省略
}

从 innerCancel 方法可知,取消操作只是改变了任务对象的状态并可能会中断执行线程。如果任务的逻辑代码没有响应中断,则会一直异步执行直到完成,只是最终的执行结果不会被通过get方法返回,计算资源的开销仍然是存在的。

总的来说,Future 是线程间协调的一种工具。


欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据