preface

Before diving into the source code, let’s take a look at what FutureTask actually does. As the name suggests, FutureTask consists of two parts: Future and Task. FutureTask implements the RunnableFuture interface, which in turn inherits both Runnable and Future interfaces, so FutureTask has both Runnable to perform tasks and Futrue to retrieve results. State, queue, and CAS are the three axes of Java concurrency tools, such as AQS, Condition, FutureTask, and others are implemented on this basis.

state

In FutrueTask, the state is represented by the state attribute, which is modified by Valatile, indicating that the modified result is immediately visible to other threads and reordering is not allowed.

private volatile int state; / / state
private static final int NEW          = 0; // Create a state
private static final int COMPLETING   = 1; // Set the result
private static final int NORMAL       = 2; // Normal end
private static final int EXCEPTIONAL  = 3; / / exception
private static final int CANCELLED    = 4; / / cancel
private static final int INTERRUPTING = 5; // Set interrupt
private static final int INTERRUPTED  = 6; / / has been interrupted
Copy the code

State is the core attribute running through the whole FutureTask. The value of the attribute represents different execution states. As the task is executed, the states change constantly. FutureTask defines seven states: 1 initial state, 2 intermediate states, and 3 final states.

Although there are so many states, there are only four paths to state transition:The meaning of each state has been commented out in the code above and will not be explained here.

The queue

The FutureTask queue is a one-way list based on the Treeber Stack implementation, and all threads waiting for results are eventually added to the Treeber Stack. If you’re not familiar with the Treeber Stack, you can see it here.

 static final class WaitNode {
    volatile Thread thread; // Corresponding thread
    volatile WaitNode next; // Next node
    WaitNode() { thread = Thread.currentThread(); } // Sets the current thread of execution
}
Copy the code

There is a waiters attribute in FutureTask that points to the top node of the stack.

private volatile WaitNode waiters; // The Treiber stack points to the top node of the stack
Copy the code

Queue structure:

Source code analysis

The main properties

/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED * /
private volatile int state; / / state
private static final int NEW          = 0; // Create a state
private static final int COMPLETING   = 1; // Set the result
private static final int NORMAL       = 2; // Normal end
private static final int EXCEPTIONAL  = 3; / / exception
private static final int CANCELLED    = 4; / / cancel
private static final int INTERRUPTING = 5; // Set interrupt
private static final int INTERRUPTED  = 6; / / has been interrupted

private Callable<V> callable; / / task

private Object outcome; // non-volatile, protected by state reads/writes //

private volatile Thread runner; // The thread that executes the task

private volatile WaitNode waiters; // The Treiber stack points to the top node of the stack

private static final sun.misc.Unsafe UNSAFE; / / magic class
private static final long stateOffset; // State offset
private static final long runnerOffset; // The offset of the task execution thread
private static final long waitersOffset; // The offset to the top node
Copy the code

The main method

FutureTask implements RunnableFuture, which in turn inherits Runnable and Future, so FutureTask also implements Runnable indirectly.

run

public void run(a) {
    if(state ! = NEW ||// If state is not the initial state or the current thread of execution fails to be updated, the task can only be executed by one thread! UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread())) // Atomic update executes the task thread
        return;
    try {
        Callable<V> c = callable; / / task
        if(c ! =null && state == NEW) { // The task is not null and the execution state is initial, again indicating that a task can have only one thread to execute
            V result; // Save the execution result
            boolean ran; // Check whether the task is successfully completed
            try {
                result = c.call(); // The call method to execute the task
                ran = true;
            } catch (Throwable ex) { // An exception occurred when executing the business code
                result = null;
                ran = false;
                setException(ex); // Set the exception thrown
            }
            if (ran)
            /** * If cancle (false) is CANCELLED while the task is executing, the business will execute normally * only CANCELLED when the assignment fails because state has changed from "NEW" to "CANCELLED" */
                set(result); // The task is complete}}finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        PutOrderedInt () {putOrderedInt () {putOrderedInt (); * Reasons for not operating with CAS -- We cannot be sure whether we are in a COMPLETING or INTERRUPTING state before setting state. * /
        int s = state;
        if (s >= INTERRUPTING) // If the state is intermediate, wait to become final
            handlePossibleCancellationInterrupt(s); // Handle interrupts}}protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // If the thread is updated from the "initial" state atom to "finished"
        outcome = v; // Assign the execution result
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // Final state is set to complete
        finishCompletion(); // The loop wakes up all threads waiting for the result of the task}}protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // If the thread is updated from "initial" to "Finished"
        outcome = t; // Assign an abnormal result
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion(); // The loop wakes up all threads waiting for the result of the task}}Copy the code

Internally, the run method is relatively simple.

  1. If the state is initial and no other thread has executed it
  2. At the end of NORMAL execution, call the set method, assign the result to the outcome attribute, update state to “NORMAL”, and call finishCompletion method to wake up the waiting thread. 2.2 An exception occurs during the execution of the business code. Call setException, assign the exception to the outcome attribute, update state to “EXCEPTIONAL”, and call the finishCompletion method to wake up the waiting thread
  3. The finishing touches, the waiting state becomes the final state

finishCompletion

private void finishCompletion(a) {
    // assert state > COMPLETING;
    for(WaitNode q; (q = waiters) ! =null;) { // Start at the top of the stack
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // Clear the Treiber stack
            for (;;) { // Loop to wake up all threads waiting on the task
                Thread t = q.thread;
                if(t ! =null) {
                    q.thread = null;
                    LockSupport.unpark(t); // Wake up the waiting thread
                }
                WaitNode next = q.next;
                if (next == null) // Exit the loop if you have reached the end of the stack
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
Copy the code

The job of this method is to wake up the thread waiting for the result

get

public V get(a) throws InterruptedException, ExecutionException {
    int s = state; / / state
    if (s <= COMPLETING) // If it is in the initial state or in the result of setting, the queue is added to wait
        s = awaitDone(false.0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome; / / the result
    if (s == NORMAL) // Normal end status
        return (V)x;
    if (s >= CANCELLED) // If the call is cancelled or interrupted, there is no intermediate state because either the result is returned normally, or an exception occurs when the call is executed or
        // Other threads cancel the task and wait for updates to the final state before waking up the thread waiting for the result
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
Copy the code

The awaitDone method is used to wait for the task to terminate while the task is not finished or the result is being set. Note that the return value of awaitDone is the state of the task, not the result. After the task enters the terminating state, we either return the result of the calculation or throw an exception depending on the execution result of the task. Let’s start with the awaitDone method, which is the core method for fetching the result, suspending the thread, responding to an interrupt, and so on:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // If the timeout period is set, calculate how long it will take to time out
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { // If interrupted, the node is deleted
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) { // The task is complete
            if(q ! =null) // If a holiday point has been created
                q.thread = null;
            return s; // Return status
        }
        else if (s == COMPLETING) // cannot time out Yet If the state is in the set result, the CPU is discarded
            Thread.yield();
        else if (q == null)
            q = new WaitNode(); // Create a waiting node
        else if(! queued)/* Here is the Treeber stack used to update the new node atom to the top node */
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) { // Set timeout
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // Delete the node
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos); // Suspend over time
        }
        else
            LockSupport.park(this); // Suspend the thread}}Copy the code

cancel

/* Cancel a task: The task cannot be cancelled in several cases: 1: the task has been completed; 2: the task has been cancelled; 3: the task cannot be cancelled in other cases */
public boolean cancel(boolean mayInterruptIfRunning) {
    if(! (state == NEW &&// If the task is already complete, it cannot be cancelled
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // If a task has already been completed or cancelled, it cannot be cancelled again
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) { / / the interrupt
            try {
                Thread t = runner; // The thread that executes the task
                if(t ! =null)
                /** * interrupts the thread executing the task. This interrupt only sends an interrupt signal, depending on whether the task code supports thread interrupt operation */
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // Set it to interrupted}}}finally {
        finishCompletion(); // Wake up the thread in the waiting queue
    }
    return true; // Return true, does not represent a true break
}
Copy the code

conclusion

FutureTask implements the Runnable and Future interfaces. It represents a task with a task state and a task result, and its various operations are carried out around the task state. It is worth noting that in all seven task states, any state that is not NEW indicates that the task has completed or is no longer executed. It does not indicate a task in progress status.

In addition to the Callable object representing the task and the outcome attribute representing the task’s execution, FutureTask also contains a Treiber stack representing all threads waiting for the task to end. This is very similar to the wait queue for various locks, i.e. if the lock is not obtained, the current thread will be thrown into the wait queue. In this case, if the task is not finished, all threads waiting for the completion of the task will be thrown into the Treiber stack and will not be woken up until the completion of the task.

While FutureTask provides us with a way to get the result of a task’s execution, unfortunately, the current thread spins or hangs and waits if the task has not completed its execution.

Reference: blog.csdn.net/qq_42499188… Segmentfault.com/a/119000001…