FutureTask is one of the core building blocks of the thread pool implementation. When a task is submitted, the user-implemented Callable instance task is wrapped in a FutureTask instance ftask. After submission the task runs asynchronously and the user does not need to care about it; when the result (or the exception) is needed, the user calls FutureTask#get().

The question, then, is how to retrieve the result of ftask and handle its exceptions elegantly. This article discusses the proper way to use FutureTask.

JDK version: Oracle Java 1.8.0_102

Change your style today.

Source code analysis

Start by submitting a Callable instance task.

submit()

ThreadPoolExecutor inherits the submit() implementation directly from AbstractExecutorService.

public abstract class AbstractExecutorService implements ExecutorService {
    ...
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
}

For the rest of the flow, see "Source | From serial thread confinement to object pools and thread pools". Eventually, task.run() is executed in ThreadPoolExecutor#runWorker().

That task is the ftask created by the newTaskFor(task) call in submit(); see newTaskFor().
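As a quick check of the claim that submit() hands back the wrapping ftask, here is a minimal sketch. The class and method names (SubmitDemo, submitReturnsFutureTask) are made up for illustration; it assumes a plain ThreadPoolExecutor obtained from Executors.newFixedThreadPool(), which does not override newTaskFor().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SubmitDemo {
    /** Submits a trivial Callable and reports whether the returned Future is a FutureTask. */
    static boolean submitReturnsFutureTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> task = () -> 42;
            Future<Integer> ftask = pool.submit(task); // wrapped by newTaskFor(task)
            return ftask instanceof FutureTask;
        } finally {
            pool.shutdown(); // let the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsFutureTask()); // prints "true"
    }
}
```

A custom executor that overrides newTaskFor() could return a different RunnableFuture, so the instanceof check is only meaningful for the default behavior.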

newTaskFor()

AbstractExecutorService#newTaskFor() creates a FutureTask and returns it as a RunnableFuture.

public abstract class AbstractExecutorService implements ExecutorService {
    ...
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    ...
}

Look at the implementation of FutureTask.

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    ...
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    ...
}

The key point of the constructor is that it initializes the state of ftask to NEW.

The state machine

There are only a few state transitions, so the possible sequences are listed directly:

* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

These states matter later on.
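The state field itself is private, but two of the sequences can be observed through the public API. The following is a minimal sketch with made-up names (StateDemo, normalPath, cancelledPath); it runs the FutureTask directly on the calling thread instead of through a pool, purely to keep the example deterministic.

```java
import java.util.concurrent.FutureTask;

class StateDemo {
    /** Returns "isDone before run/isDone after run" for a task that completes normally. */
    static String normalPath() {
        FutureTask<String> ftask = new FutureTask<>(() -> "ok");
        boolean before = ftask.isDone();  // state is still NEW
        ftask.run();                      // NEW -> COMPLETING -> NORMAL
        return before + "/" + ftask.isDone();
    }

    /** Returns "cancel result/isCancelled/isDone" for a task cancelled before it runs. */
    static String cancelledPath() {
        FutureTask<String> ftask = new FutureTask<>(() -> "ok");
        boolean cancelled = ftask.cancel(false); // NEW -> CANCELLED
        ftask.run();                             // does nothing: state is no longer NEW
        return cancelled + "/" + ftask.isCancelled() + "/" + ftask.isDone();
    }

    public static void main(String[] args) {
        System.out.println(normalPath());    // prints "false/true"
        System.out.println(cancelledPath()); // prints "true/true/true"
    }
}
```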

run()

Simplified as follows:

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    public void run() {
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    ...
}

If no exception is thrown at execution time

If no exception is thrown, ran==true and FutureTask#set() sets the result.

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    ...
}

  • The result is stored in outcome
  • The state is moved to NORMAL in two consecutive steps (NEW -> COMPLETING -> NORMAL)
  • finishCompletion() performs cleanup, including waking up threads blocked in get()

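Keep outcome in mind.

In effect, the CAS from NEW to COMPLETING acts like acquiring an exclusive lock, and the assignment to outcome together with the write of the final state are the operations performed while holding it (note that finishCompletion() runs after the "lock" has been released).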
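The lock analogy can be reproduced outside the JDK with public atomics. The sketch below is not the FutureTask implementation: OneShotResult and peek() are hypothetical names, and AtomicInteger.compareAndSet()/lazySet() stand in for the UNSAFE calls. It only illustrates that a single winner of the CAS gets to write outcome.

```java
import java.util.concurrent.atomic.AtomicInteger;

class OneShotResult<V> {
    private static final int NEW = 0, COMPLETING = 1, NORMAL = 2;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private V outcome; // non-volatile; made visible by the later write to state

    /** Stores v if nobody has stored a value yet; returns whether this call won. */
    boolean set(V v) {
        if (state.compareAndSet(NEW, COMPLETING)) { // like taking an exclusive lock
            outcome = v;                            // protected write
            state.lazySet(NORMAL);                  // ordered write of the final state
            return true;
        }
        return false; // another caller already completed (or is completing) the result
    }

    /** Returns the stored value, or null if no value has been fully published. */
    V peek() {
        return state.get() == NORMAL ? outcome : null;
    }
}
```

Usage: the first set("a") returns true, a later set("b") returns false, and peek() keeps returning "a".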

If an exception is thrown during execution

If the task throws an exception while running, it is caught by the catch (Throwable ex) block in run(), and FutureTask#setException() records it. In addition, ran==false, so FutureTask#set() is not executed.

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    ...
}

  • The exception t is stored in outcome
  • The state is moved to EXCEPTIONAL in two consecutive steps (NEW -> COMPLETING -> EXCEPTIONAL)
  • finishCompletion() performs cleanup, including waking up threads blocked in get()

If no exception is thrown, outcome records the normal result. If an exception is thrown, outcome records the exception.

If both the normal result and the exception are regarded as "the output of the task", it is reasonable to record them in the same variable, outcome. The different final states (NORMAL, EXCEPTIONAL) tell the two kinds of content apart.

run() summary

FutureTask wraps the user-implemented task as ftask and uses the state machine and outcome to manage its execution. All of this is invisible to the user until the user calls get().

This also explains how a Callable instance is executed, and why an implementation of Callable#call() may throw checked exceptions (whereas an implementation of Runnable#run() has to handle them inside the method and cannot throw them).
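To make the point about checked exceptions concrete, here is a minimal sketch. The names (CheckedExceptionDemo, describeFailure) and the IOException message are invented for illustration, and the FutureTask is run on the calling thread to keep the example deterministic.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CheckedExceptionDemo {
    /** Runs a Callable that throws a checked exception and describes what get() reports. */
    static String describeFailure() {
        // call() declares "throws Exception", so the lambda may throw IOException directly.
        Callable<String> failing = () -> { throw new IOException("disk unavailable"); };
        FutureTask<String> ftask = new FutureTask<>(failing);

        ftask.run(); // the exception is caught inside run() and stored; nothing escapes here

        try {
            return ftask.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the original exception thrown by call()
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "IOException: disk unavailable"
    }
}
```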

get()

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    ...
}

  • The comparison s <= COMPLETING relies on the numeric values of the state constants to decide whether ftask has finished. If it has not (NEW, COMPLETING), get() waits for it to finish; the wait may exit with InterruptedException.
  • Once ftask has finished, report() is called to report the final state.

Comparing against the raw state values like this hurts readability and is not a style worth copying.
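One possible way to keep such checks readable is to hide the numeric comparison behind named predicates. This is only a sketch of the idea, not JDK code: StatePredicates, isStillRunning and isCancelled are hypothetical names, and the constants are copied from the FutureTask listing above.

```java
class StatePredicates {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3,
                     CANCELLED = 4, INTERRUPTING = 5, INTERRUPTED = 6;

    /** Same meaning as "s <= COMPLETING", but the intent is spelled out. */
    static boolean isStillRunning(int s) {
        return s == NEW || s == COMPLETING;
    }

    /** Same meaning as "s >= CANCELLED". */
    static boolean isCancelled(int s) {
        return s == CANCELLED || s == INTERRUPTING || s == INTERRUPTED;
    }
}
```

The range comparisons in the JDK are presumably chosen for compactness; the named form trades a few extra comparisons for code that does not depend on the ordering of the constants.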

report()

public class FutureTask<V> implements RunnableFuture<V> {
    ...
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    ...
}

  • If the final state is NORMAL, outcome holds the normal result; it is cast to the generic type V and returned.
  • The comparison s >= CANCELLED relies on the numeric values of the state constants to decide whether ftask was canceled (CANCELLED, INTERRUPTING, INTERRUPTED); if so, a CancellationException is thrown.
  • Otherwise ftask was not canceled, so the task itself threw an exception during execution (EXCEPTIONAL); outcome holds the exception t, which is wrapped in an ExecutionException and thrown.

The exception t is wrapped as the cause of the ExecutionException; see "Do you really read Java exception messages?".

CancellationException and ExecutionException

  • CancellationException is an unchecked exception. In principle you do not have to handle it, but handling it is still recommended.
  • ExecutionException is a checked exception and must be handled by the caller.
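The difference between the two exceptions shows up at the call site. The sketch below (CancellationDemo and getAfterCancel are made-up names) cancels a task before it ever runs and then calls get(); the compiler forces the InterruptedException and ExecutionException handlers, while the CancellationException handler is optional but is the one that actually fires.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CancellationDemo {
    /** Cancels a task that never ran and describes what get() throws. */
    static String getAfterCancel() {
        FutureTask<String> ftask = new FutureTask<>(() -> "never returned");
        ftask.cancel(true); // NEW -> INTERRUPTING -> INTERRUPTED

        try {
            return ftask.get();
        } catch (CancellationException e) {
            // Unchecked: this handler is optional, and the exception carries no cause.
            return "cancelled, cause=" + e.getCause();
        } catch (InterruptedException | ExecutionException e) {
            // Checked: the compiler requires these to be handled or declared.
            return "unexpected: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(getAfterCancel()); // prints "cancelled, cause=null"
    }
}
```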

Source code summary

  • An implementation of Callable#call() may throw checked exceptions.
  • Always catch ExecutionException when calling FutureTask#get(), whether or not your Callable#call() implementation throws a checked exception; catching CancellationException is also recommended.
  • FutureTask#get() calls a blocking method, so InterruptedException must be caught as well.
  • CancellationException does not carry the reason for the cancellation, not even whether the task was interrupted.
  • In real projects, prefer the timed version of FutureTask#get(); it can throw TimeoutException, which must be handled as well.
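The last point can be tried in isolation. This is a minimal sketch under stated assumptions: TimedGetDemo and getWithTimeout are invented names, and the 5-second sleep and the 100 ms timeout are arbitrary values chosen so that the timeout always fires first.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    /** Waits at most 100 ms for a deliberately slow task and cancels it on timeout. */
    static String getWithTimeout() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> ftask = pool.submit(() -> {
            Thread.sleep(5_000); // stand-in for slow work
            return "done";
        });
        try {
            return ftask.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            boolean cancelled = ftask.cancel(true); // interrupt the sleeping worker
            return "timeout, cancelled=" + cancelled;
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout()); // prints "timeout, cancelled=true"
    }
}
```

Without the timeout, a task that never finishes would block the caller in get() indefinitely.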

Compare this with the API declaration of Future#get():

public interface Future<V> {
    ...
    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;
    ...
}

The declaration matches the analysis above.

Proper usage

Here is a fairly comprehensive usage pattern, for reference only.

int timeoutSec = 30;
try {
  MyResult result = ftask.get(timeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
  Throwable t = e.getCause();
  // handle some checked exceptions
  if (t instanceof IOException) {
    xxx;
  } else if (...) {
    xxx;
  } else {
    // handle remaining checked exceptions and unchecked exceptions
    throw new RuntimeException("xxx", t);
  }
} catch (CancellationException e) {
  xxx;
  throw new UnknownException(String.format("Task %s canceled unexpectedly", taskId));
} catch (TimeoutException e) {
  xxx;
  LOGGER.error(String.format("Timeout for %ds, trying to cancel task: %s", timeoutSec, taskId));
  ftask.cancel(true); // cancel() takes mayInterruptIfRunning; true interrupts a running task
  LOGGER.debug(String.format("Succeed to cancel task: %s", taskId));
} catch (InterruptedException e) {
  xxx;
}

  • Trim the branches according to your actual needs.
  • The author likes to add an assert or throw UnknownException where others would leave a semantically vague comment.
  • The handling of InterruptedException is omitted here (it is one of the rare exceptions used for control flow, and the author finds it somewhat obscure); see "Handling InterruptedException".
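For the InterruptedException branch left open above, one widely used convention (not necessarily what the author has in mind) is to restore the interrupt flag so that callers further up the stack can still see the interruption. A minimal sketch, with the hypothetical names InterruptDemo, getOrNull and demo:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class InterruptDemo {
    /** Returns the result, or null if the wait was interrupted (the flag is restored). */
    static <T> T getOrNull(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // get() cleared the flag; set it again
            return null;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    /** Simulates an interrupt arriving while the caller waits on a task that never runs. */
    static boolean demo() {
        FutureTask<String> neverRun = new FutureTask<>(() -> "x");
        Thread.currentThread().interrupt();          // pretend someone interrupted us
        String result = getOrNull(neverRun);         // get() throws InterruptedException
        boolean flagRestored = Thread.interrupted(); // reads and clears the flag
        return result == null && flagRestored;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true"
    }
}
```

Whether to return a default, rethrow, or cancel the task at that point depends on the application; the only part that generalizes is not swallowing the interrupt silently.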

Nice change of style. It’s a lot faster.


Link to this article: Source | The proper way to use FutureTask. Author: monkey 007. Source: monkeysayhi.github.io. This article is published under the Creative Commons Attribution-ShareAlike 4.0 International License. You are welcome to republish, reproduce, or use it for commercial purposes.