Introduction

FutureTask is a cancellable wrapper for an asynchronous task. Running a FutureTask does not hand back the result directly; the result is retrieved later by calling the get method, which blocks until the task has completed, and the task can also be cancelled partway through. Keep in mind that FutureTask only wraps a task; it is not the task itself.

FutureTask implements the RunnableFuture interface, which extends both Runnable and Future.
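As a minimal sketch of what this means in practice, the example below wraps a Callable in a FutureTask, hands it to a Thread as a Runnable, and reads the result through the Future side. The class and method names (BasicFutureTaskDemo, computeAsync) are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class BasicFutureTaskDemo {
    // Wraps a Callable in a FutureTask, runs it on another thread,
    // and blocks in get() until the result is available.
    static int computeAsync() {
        Callable<Integer> work = () -> 6 * 7;               // the real task
        FutureTask<Integer> task = new FutureTask<>(work);  // the wrapper
        new Thread(task).start();                           // used as a Runnable
        try {
            return task.get();                              // used as a Future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAsync());
    }
}
```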

Implementation

Let’s take a look at the implementation of FutureTask.

FutureTask state

Because FutureTask only wraps the execution of a task, it has to track how far that execution has progressed; otherwise it could not support cancel and get. It does so with a set of state values:

private volatile int state;
private static final int NEW          = 0; // Task created, not yet completed
private static final int COMPLETING   = 1; // Outcome is being set (transient)
private static final int NORMAL       = 2; // Completed normally
private static final int EXCEPTIONAL  = 3; // Completed with an exception
private static final int CANCELLED    = 4; // Task cancelled
private static final int INTERRUPTING = 5; // Runner is being interrupted (transient)
private static final int INTERRUPTED  = 6; // Interrupt complete

As the code comments describe, the state can move along four possible paths:

Transition                            Meaning
NEW -> COMPLETING -> NORMAL           Normal execution, from start to finish
NEW -> COMPLETING -> EXCEPTIONAL      An exception occurred during execution
NEW -> CANCELLED                      Task cancelled
NEW -> INTERRUPTING -> INTERRUPTED    Task interrupted
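The state field is private, but its effect is visible through the public isDone and isCancelled methods. The sketch below is only an illustration (StateObservationDemo and observe are made-up names): it follows one task along the normal path and cancels another before it ever runs.

```java
import java.util.concurrent.FutureTask;

class StateObservationDemo {
    // Returns {isDone before run, isDone after run,
    //          isCancelled after cancel, isDone after cancel}.
    static boolean[] observe() {
        FutureTask<String> normal = new FutureTask<>(() -> "ok");
        boolean doneBeforeRun = normal.isDone();   // state is NEW
        normal.run();                              // NEW -> COMPLETING -> NORMAL
        boolean doneAfterRun = normal.isDone();

        FutureTask<String> cancelled = new FutureTask<>(() -> "never");
        cancelled.cancel(false);                   // NEW -> CANCELLED
        return new boolean[] {
            doneBeforeRun, doneAfterRun, cancelled.isCancelled(), cancelled.isDone()
        };
    }
}
```

A cancelled task also counts as done, because isDone only checks that the state has left NEW.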

FutureTask constructor

FutureTask has two constructors, one for each task type (Callable and Runnable), much like the way a thread pool accepts both kinds of task.

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

The second constructor simply wraps the Runnable in a Callable object that returns the result that was passed in.
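A small sketch of the Runnable constructor, assuming nothing beyond the public API: the Runnable does its work as a side effect, and get returns the preset value. RunnableResultDemo and runWithPresetResult are illustrative names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class RunnableResultDemo {
    // Runs a Runnable through FutureTask and returns "<side effect>:<preset result>".
    static String runWithPresetResult() {
        StringBuilder log = new StringBuilder();
        Runnable work = () -> log.append("ran");
        FutureTask<String> task = new FutureTask<>(work, "preset");
        task.run();                       // executes the Runnable on this thread
        try {
            return log + ":" + task.get(); // get() yields the value passed to the constructor
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```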

The run method

FutureTask has two execution methods, run and runAndReset. Let’s look at the implementation of run first.

public void run() {
    // Tasks that are not in the NEW state cannot be executed
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Check again
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // Execute the task
                ran = true; // The task completed normally
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // Record the exception and change the state
            }
            if (ran) // Normal case
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING) // A cancel(true) call is interrupting the runner
            handlePossibleCancellationInterrupt(s);
    }
}

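The logic of the method is simple and comes down to three things:

1. Check whether the task can be executed: the state must be NEW and the current thread must win the CAS on runner. If not, return immediately.

2. Execute the task. On normal completion call set with the result; if an exception is thrown, call setException to record it and change the state.

3. In the finally block, clear runner and read the state again. If the task is in the middle of being interrupted, wait for the interrupt to complete.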
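The guard in step 1 can be observed from outside. In the sketch below (RunOnceDemo and runTwice are illustrative names), the second call to run returns immediately because the state is no longer NEW, so the callable is invoked only once.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceDemo {
    // Calls run() twice and reports how often the callable actually ran.
    static int runTwice() {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> work = calls::incrementAndGet;
        FutureTask<Integer> task = new FutureTask<>(work);
        task.run();   // state is NEW: the callable runs
        task.run();   // state is NORMAL: returns without running it again
        return calls.get();
    }
}
```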

Exception handling

Next, let’s look at how the exceptional state is handled.

protected void setException(Throwable t) {
    // Try to move to the COMPLETING state
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t; // The outcome is set to the exception object
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // The final state
        finishCompletion();
    }
}

This matches the exceptional path in the state table above. After the exception is recorded, finishCompletion is called. Its main purpose is to wake up all the threads waiting for the result, so we will come back to it after the get method.
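From the caller's point of view, the stored exception resurfaces when get is called, wrapped in an ExecutionException. A minimal sketch, with ExceptionDemo and causeMessage as illustrative names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ExceptionDemo {
    // Runs a failing task and returns the message of the exception stored as its outcome.
    static String causeMessage() {
        Callable<String> failing = () -> {
            throw new IllegalStateException("boom");
        };
        FutureTask<String> task = new FutureTask<>(failing);
        task.run();   // run() does not throw; the exception is stored via setException
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();   // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```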

Normal completion

Now let’s take a look at what the set method does, which should be easy to figure out based on what we described in the table earlier.

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        finishCompletion();
    }
}

The set method follows the same logic as setException; only the final state and the outcome value differ, so I won’t go through it again.
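Because set is protected, a subclass can call it directly. The sketch below uses a hypothetical SettableTask subclass (not part of the JDK) to show the effect of the CAS from NEW to COMPLETING: only the first call to set takes effect.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical subclass for illustration; completed by hand instead of by run().
class SettableTask<V> extends FutureTask<V> {
    SettableTask() {
        super(() -> null);   // placeholder callable; this sketch never calls run()
    }

    void complete(V value) {
        set(value);          // only succeeds while the state is still NEW
    }

    // Completes the task twice and returns the value that was kept.
    static String firstWins() {
        SettableTask<String> task = new SettableTask<>();
        task.complete("first");
        task.complete("second");   // CAS from NEW fails, so this call is ignored
        try {
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```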

Interrupt handling

Now let’s look at the method called in the finally block of run.

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); 
}

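All this method does is wait. If a cancel(true) call is in the middle of interrupting the runner (state INTERRUPTING), the running thread yields the CPU and keeps rechecking until cancel has finished and the state has changed to INTERRUPTED. No lock is involved.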

The runAndReset method

Let’s look at the differences between the runAndReset method and the run method.

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

It is easy to see that runAndReset neither changes the state nor sets the outcome after a normal execution, so the result of a normal run cannot be retrieved. This is deliberate: the method is used for periodic scheduling, where the same task has to be reusable. How it is used in practice can be seen in ScheduledThreadPoolExecutor.
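Since runAndReset is protected, the sketch below reaches it through a hypothetical subclass, ResettableTask. It is only an illustration of the behavior described above: a task that completes normally stays in the NEW state and can run again, while a task that throws ends up in a final state.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass for illustration; exposes the protected runAndReset().
class ResettableTask extends FutureTask<Void> {
    ResettableTask(Runnable work) {
        super(work, null);
    }

    boolean runOnce() {
        return runAndReset();
    }
}

class RunAndResetDemo {
    // Returns "<run count>:<all runs succeeded>:<isDone>".
    static String repeatedRuns() {
        AtomicInteger runs = new AtomicInteger();
        ResettableTask task = new ResettableTask(runs::incrementAndGet);
        boolean allOk = task.runOnce() && task.runOnce() && task.runOnce();
        return runs.get() + ":" + allOk + ":" + task.isDone();
    }

    // Returns "<runAndReset result>:<isDone>" for a task that throws.
    static String failedRun() {
        ResettableTask task = new ResettableTask(() -> {
            throw new IllegalStateException("boom");
        });
        boolean ok = task.runOnce();   // setException moves the state to EXCEPTIONAL
        return ok + ":" + task.isDone();
    }
}
```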

The cancel method

We have covered the state changes that happen on interruption, but not how an interruption is triggered. Interrupting and cancelling actually go through the same method, cancel; only the resulting state values differ.

public boolean cancel(boolean mayInterruptIfRunning) {
    // mayInterruptIfRunning: true interrupts the runner, false only cancels
    // Only a task in the NEW state can be cancelled or interrupted; otherwise return false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        // Interrupt case: interrupt the thread running the task
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

As you can see, cancel covers both cancelling and interrupting:

1. If the task is not in the NEW state, or the CAS that changes the state fails, the method returns false immediately. Otherwise the state becomes INTERRUPTING (when mayInterruptIfRunning is true) or CANCELLED, and the task can no longer complete.

2. If mayInterruptIfRunning is true, the thread running the task is interrupted and the state is then set to its final value, INTERRUPTED.

3. Finally, all waiting threads are released (more on that later).
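The following sketch shows cancel(true) acting on a task that is already running. The latches exist only to make the example deterministic, and CancelDemo and cancelRunningTask are illustrative names. Note that the callable still returns a value after being interrupted, but that value is discarded because the state is no longer NEW.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    // Returns "<cancel result>:<worker saw interrupt>:<what get() did>".
    static String cancelRunningTask() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);        // simulate long-running work
            } catch (InterruptedException e) {
                interrupted.countDown();     // cancel(true) interrupted this thread
            }
            return "finished";               // discarded: the state is no longer NEW
        });
        new Thread(task).start();
        try {
            started.await();                 // make sure the task is really running
            boolean cancelled = task.cancel(true);
            boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
            String outcome;
            try {
                outcome = task.get();
            } catch (CancellationException e) {
                outcome = "CancellationException";
            }
            return cancelled + ":" + sawInterrupt + ":" + outcome;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```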

The get method

There are two get methods: one waits indefinitely, and the other takes a timeout.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

If the state is less than or equal to COMPLETING, the task has not finished yet, whether normally or with an exception. In that case the calling thread has to be added to the list of threads waiting for the result. If the task has already finished, report returns the value directly or throws the matching exception.

Let’s move on to the get method with timeout

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

Both versions delegate the waiting to awaitDone, so let’s look at that method next.

The awaitDone method

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // If the waiting thread was interrupted, remove its node and throw
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // Check the status again
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // The outcome is being set, so there is no need to enqueue
            Thread.yield();
        else if (q == null) // Still in the NEW state
            q = new WaitNode();
        else if (!queued) // Enqueue
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // Becomes the new head
        else if (timed) { // A timeout was set
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q); // Remove the waiting thread
                return state;
            }
            LockSupport.parkNanos(this, nanos); // Otherwise park for the remaining time
        }
        else
            LockSupport.park(this);
    }
}

This one is a bit more complicated, so let’s summarize it briefly. Each pass through the loop does the following:

1. Check whether the waiting thread has been interrupted. If so, remove its node from the queue and end the wait with an InterruptedException.

2. If the state is greater than COMPLETING, the task has finished, so the state is returned and the value can be read.

3. If the state equals COMPLETING, execution is over and the outcome is just being set, so the value will be available almost immediately. The thread yields its time slice and checks again on the next pass.

4. Otherwise the task is still in the NEW state. The thread creates a wait node if it has none, then joins the queue if it has not joined yet. Once queued, a thread with a timeout either removes its node and returns the current state (if the deadline has passed) or parks for the remaining time; a thread without a timeout parks until it is woken up.
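The timeout branch in step 4 can be seen from the caller's side. In this sketch (TimedGetDemo and its methods are illustrative names) nothing has run the task yet, so the first timed get gives up with a TimeoutException; after run, the same call returns the value.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    // Waits at most 50 ms for the result; returns "timeout" if it is not ready.
    static String getOrTimeout(FutureTask<String> task) {
        try {
            return task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        FutureTask<String> task = new FutureTask<>(() -> "late");
        String first = getOrTimeout(task);    // task is still NEW: awaitDone times out
        task.run();
        String second = getOrTimeout(task);   // task is NORMAL: returns without waiting
        return first + ":" + second;
    }
}
```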

As noted earlier, several methods call finishCompletion, and each of them needs to release the threads waiting for the result, so let’s look at the logic behind it.

The finishCompletion method

The logic of the implementation is pretty simple:

private void finishCompletion() {
    // Process the list of waiting threads
    for (WaitNode q; (q = waiters) != null;) {
        // Detach the whole list of waiting threads
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // Wake up the waiting thread
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done(); // Extension hook; does nothing by default
    callable = null;        // to reduce footprint
}

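The logic here is straightforward: detach the whole list of waiters with a single CAS, then walk the list and wake up every waiting thread. If the CAS fails because another waiter was added in the meantime, the outer loop simply tries again.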
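The waiter list is a lock-free stack: awaitDone pushes at the head and finishCompletion detaches everything at once. The sketch below is a simplified model of that idea, not the JDK code. It uses AtomicReference instead of Unsafe, stores a name instead of a Thread, and retries the push in a tight loop where awaitDone retries on its next pass. WaiterStackSketch, Node, push and drain are all made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class WaiterStackSketch {
    // Hypothetical stand-in for WaitNode; holds a name instead of a Thread.
    static final class Node {
        volatile String owner;
        volatile Node next;
        Node(String owner) { this.owner = owner; }
    }

    private final AtomicReference<Node> waiters = new AtomicReference<>();

    // Like the enqueue step in awaitDone: link to the current head, then CAS the head.
    void push(String owner) {
        Node q = new Node(owner);
        do {
            q.next = waiters.get();
        } while (!waiters.compareAndSet(q.next, q));
    }

    // Like finishCompletion: detach the whole list with one CAS, then walk it.
    List<String> drain() {
        List<String> released = new ArrayList<>();
        for (Node q; (q = waiters.get()) != null;) {
            if (waiters.compareAndSet(q, null)) {
                for (;;) {
                    if (q.owner != null) {
                        released.add(q.owner);   // the real code calls LockSupport.unpark here
                        q.owner = null;
                    }
                    Node next = q.next;
                    if (next == null)
                        break;
                    q.next = null;               // unlink to help GC
                    q = next;
                }
                break;
            }
        }
        return released;
    }

    // Pushes three waiters and returns the order in which they are released.
    static String demo() {
        WaiterStackSketch stack = new WaiterStackSketch();
        stack.push("a");
        stack.push("b");
        stack.push("c");
        return stack.drain().toString() + stack.drain().toString();
    }
}
```

Waiters are released in last-in, first-out order, and a second drain finds nothing left.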

Conclusion

This article walked through what FutureTask does by looking at run, get, cancel, and the methods they rely on.

1. FutureTask is just a wrapper class for the task; it does not contain the actual execution logic.

2. It is important to understand the state values of FutureTask and the transitions between them.

3. Only tasks in the NEW state can be executed. The run method normally changes the state, but runAndReset does not, because the two methods serve different scenarios.

4. runAndReset is used for periodically scheduled tasks. If a run throws an exception, subsequent scheduling stops.

FutureTask is mainly used together with thread pools, both to fetch execution results asynchronously and to schedule periodic tasks.
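As a brief sketch of the thread pool usage, a FutureTask can be handed to an executor directly because it is a Runnable, or a Callable can be submitted and the pool returns a Future for it. PoolDemo and sumFromPool are illustrative names, and the pool size of 2 is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class PoolDemo {
    // Runs one explicit FutureTask and one submitted Callable, then adds the results.
    static int sumFromPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            FutureTask<Integer> explicit = new FutureTask<>(() -> 1 + 2);
            pool.execute(explicit);                       // FutureTask used as a Runnable

            Callable<Integer> work = () -> 3 + 4;
            Future<Integer> implicit = pool.submit(work); // the pool wraps the Callable itself

            return explicit.get() + implicit.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```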

Finally

This article reflects my own understanding and notes. If you spot any gaps in the logic or anything poorly expressed, criticism and feedback are welcome!

Let’s encourage each other!