Preface

Earlier I shared an article about asynchronous programming with FutureTask, which covered the various ways to obtain the result of an asynchronous task. At the time I suspected that FutureTask's implementation would not be very complicated. Over the past two days I read its source code and found it simple and clear. It is also a good way to understand the multithreading ideas in Java's JUC (java.util.concurrent) package, so I am sharing my understanding here. This article covers FutureTask's basic fields and its key methods.

Main text

Using FutureTask

Here is a demo that uses FutureTask without a thread pool, followed by the FutureTask class diagram:

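import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 1;
            }
        });
        Thread thread = new Thread(futureTask);
        thread.start();
        Integer result = futureTask.get();              // block until the task finishes
        result = futureTask.get(10, TimeUnit.SECONDS);  // block for at most 10 seconds
        futureTask.cancel(true);                        // cancel the task (returns false here: it already finished)
        System.out.println(futureTask.isDone());        // true
        System.out.println(futureTask.isCancelled());   // false
    }
}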

As the class diagram shows, FutureTask implements RunnableFuture, which extends both Runnable and Future. Because it is a Runnable, it can be passed to a Thread constructor. The work itself is supplied to the FutureTask constructor, either as a Callable or as a Runnable plus a result value. When execution finishes, the result can be retrieved with get.
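Because FutureTask is a Runnable, it can also be handed to a thread pool instead of a raw Thread. The sketch below runs the same kind of task through a single-thread ExecutorService. The class name PoolDemo is illustrative only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Illustrative PoolDemo: the same FutureTask, executed by a pool thread instead of a raw Thread.
class PoolDemo {
    static int runInPool() throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Integer> futureTask = new FutureTask<>(() -> 1);
            pool.execute(futureTask);    // accepted because FutureTask is a Runnable
            return futureTask.get();     // blocks until the pool thread has run it
        } finally {
            pool.shutdown();
        }
    }
}
```

In everyday code, ExecutorService.submit performs this wrapping for you. AbstractExecutorService creates a FutureTask through newTaskFor and returns it as the Future.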

Before diving into the source code, let's think about how we would design FutureTask ourselves. Reading source code with questions in mind is more efficient.

  1. First, FutureTask implements Runnable, so it provides a run method. The child thread started by thread.start() executes the logic in that run method.
  2. Second, the main thread can call get to block until the result is available, optionally with a timeout.
  3. A task can be cancelled, and its status can be queried.

From this, we can guess at the main points of the implementation:

  1. FutureTask's run method calls the call method of the Callable passed to its constructor. The thread that executes run is created through Thread, which relies on the operating system's threads.
  2. FutureTask maintains the various states of task execution internally, and state transitions must be thread-safe.
  3. Finally, it implements a plain get method and an overloaded get with a timeout parameter. While the task is unfinished, get blocks the calling thread. When the task completes, its state is updated and the return value is stored in a field of type Object, so get can simply read and return that object.
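To make these guesses concrete, here is a naive hand-rolled future. It assumes a plain monitor (synchronized plus wait/notifyAll) is an acceptable stand-in for the CAS and WaitNode machinery that FutureTask actually uses. The class SimpleFuture is hypothetical and not part of the JDK. It does not support cancellation and does not guard against run being called twice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

// Hypothetical SimpleFuture: a naive design matching the guesses above.
// It uses a monitor (synchronized + wait/notifyAll) instead of CAS and WaitNode.
class SimpleFuture<V> implements Runnable {
    private final Callable<V> callable;
    private boolean done;          // guarded by this
    private V result;              // guarded by this
    private Throwable failure;     // guarded by this

    SimpleFuture(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V r = null;
        Throwable t = null;
        try {
            r = callable.call();
        } catch (Throwable ex) {
            t = ex;
        }
        synchronized (this) {
            result = r;
            failure = t;
            done = true;
            notifyAll();           // wake every thread blocked in get()
        }
    }

    public synchronized V get() throws InterruptedException, ExecutionException {
        while (!done) {
            wait();                // releases the monitor while blocked; the loop handles spurious wakeups
        }
        if (failure != null) {
            throw new ExecutionException(failure);
        }
        return result;
    }

    public synchronized boolean isDone() {
        return done;
    }
}
```

This monitor-based version is simple and correct. However, every get and every completion contends for the same lock. FutureTask avoids that with a volatile state field, CAS, and an explicit list of parked waiters.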

Basic fields of FutureTask

Next, let's check our guesses against the actual implementation by looking at FutureTask's basic fields:

// Current execution state of the task
private volatile int state;
// The task to run
private Callable<V> callable;
// Result of the task (or the exception it threw), stored as an Object
private Object outcome;
// Thread that is running the task
private volatile Thread runner;
// Threads blocked in get, wrapped as WaitNode objects
private volatile WaitNode waiters;

This largely matches our guess. To support running the task, querying its status, and retrieving the result, FutureTask keeps fields for the task state, the task itself, the result, and the running thread. When the main thread calls get, it blocks until the result is ready. My own design would have had the main thread poll for the result, and I had not realized how much CPU that wastes. WaitNode is a data structure that wraps a waiting thread so it can be woken up later; more on that below. Let's look at the key fields:

Task status field: state

The state field is volatile to guarantee visibility across threads. As the comments show, every task starts in the NEW state and may end in NORMAL, EXCEPTIONAL, CANCELLED, or INTERRUPTED.

The name COMPLETING suggests the task has finished, and in a sense it has. However, the outcome field still has to be filled in with the result. COMPLETING is therefore a very brief intermediate state before the task moves on to NORMAL or EXCEPTIONAL.

/*
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
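The NEW -> COMPLETING -> NORMAL/EXCEPTIONAL transitions can be sketched with an AtomicInteger standing in for the Unsafe CAS on the state field. This is a simplified illustration. It assumes AtomicInteger.compareAndSet and lazySet are acceptable substitutes for UNSAFE.compareAndSwapInt and UNSAFE.putOrderedInt. The class StateMachineSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical StateMachineSketch: shows how FutureTask-style transitions guard the outcome field.
class StateMachineSketch<V> {
    static final int NEW = 0;
    static final int COMPLETING = 1;
    static final int NORMAL = 2;
    static final int EXCEPTIONAL = 3;
    static final int CANCELLED = 4;

    final AtomicInteger state = new AtomicInteger(NEW);
    private Object outcome; // not volatile: published by the later write to state

    // Same shape as FutureTask.set: only one caller can win NEW -> COMPLETING.
    boolean set(V v) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = v;             // fill in the result during the short COMPLETING window
            state.lazySet(NORMAL);   // final state; an ordered write, like putOrderedInt
            return true;
        }
        return false;                // already completed or cancelled
    }

    // Same shape as FutureTask.setException: NEW -> COMPLETING -> EXCEPTIONAL.
    boolean setException(Throwable t) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = t;
            state.lazySet(EXCEPTIONAL);
            return true;
        }
        return false;
    }

    // NEW -> CANCELLED (the non-interrupting branch of cancel).
    boolean cancel() {
        return state.compareAndSet(NEW, CANCELLED);
    }

    // Read state first (volatile read), then outcome.
    Object outcomeIfDone() {
        return state.get() > COMPLETING ? outcome : null;
    }
}
```

Only the thread that wins the CAS out of NEW may write outcome, so a late set after cancel simply fails. This is also why the outcome field itself does not need to be volatile.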

WaitNode: the wrapper for threads blocked in get

WaitNode is a static nested class of FutureTask, shown below. Its main purpose is to keep get from busy-waiting. Without WaitNode, a thread calling get would have to keep polling and retrying. If the task runs for a long time, or many threads are waiting in get, this inevitably burns a lot of CPU. WaitNode instead links all threads waiting for the result into a list. The JDK source calls it a Treiber stack, because new nodes are pushed at the head. When a thread calls get and the task has not finished, a new WaitNode is added to the list and the thread parks. When the task completes, every thread in the list is woken up. The structure itself is simple: it holds the waiting thread and a next pointer to the following node.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
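The sketch below shows how a list of such nodes can be pushed onto with CAS and then drained to wake every waiter. It uses AtomicReference instead of Unsafe. The class WaiterStack and its push and releaseAll methods are hypothetical; releaseAll is modeled loosely on FutureTask's finishCompletion.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical WaiterStack: a Treiber stack of waiting threads, similar in shape to FutureTask.waiters.
class WaiterStack {
    static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    final AtomicReference<WaitNode> head = new AtomicReference<>();

    // Push a node for the calling thread at the head, retrying until the CAS wins.
    WaitNode push() {
        WaitNode node = new WaitNode();
        WaitNode h;
        do {
            h = head.get();
            node.next = h;
        } while (!head.compareAndSet(h, node));
        return node;
    }

    // Detach the whole list with one CAS, then unpark every thread in it.
    // Returns the number of threads unparked.
    int releaseAll() {
        int woken = 0;
        for (WaitNode q; (q = head.get()) != null; ) {
            if (head.compareAndSet(q, null)) {
                while (q != null) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;      // mark as released before unparking
                        LockSupport.unpark(t);
                        woken++;
                    }
                    q = q.next;
                }
                break;
            }
        }
        return woken;
    }
}
```

Pushing at the head means a single CAS on one field is enough to enqueue. Detaching the whole list with one CAS to null ensures that, for a given batch of waiters, only one releasing thread walks the list.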

Key methods of FutureTask

FutureTask's methods are also fairly simple. The run method, which executes the task, and the get method, which retrieves its result, contain the core logic.

The constructor

FutureTask can be constructed from a Callable or from a Runnable, which has no return value. In the Runnable case, the Runnable is adapted into a Callable, and the caller passes in the value of generic type V to return as the task's result.

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
// Also supports a Runnable; the result to return is passed in as a generic value
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
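The Runnable constructor relies on Executors.callable(runnable, result). That method wraps the Runnable in an adapter whose call runs it and then returns the fixed result. The hypothetical RunnableAdapterSketch below shows roughly what such an adapter does. It is not the JDK's own adapter class. For comparison, it sits next to the real JDK helper and the real two-argument constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical RunnableAdapterSketch, similar in spirit to the adapter behind Executors.callable(Runnable, T).
final class RunnableAdapterSketch<T> implements Callable<T> {
    private final Runnable task;
    private final T result;

    RunnableAdapterSketch(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }

    @Override
    public T call() {
        task.run();       // run the side effect
        return result;    // then hand back the caller-supplied result
    }

    // Three equivalent ways to build a FutureTask<String> from a Runnable.
    static FutureTask<String> viaConstructor(Runnable r) {
        return new FutureTask<>(r, "done");                        // real two-argument constructor
    }

    static FutureTask<String> viaJdk(Runnable r) {
        return new FutureTask<>(Executors.callable(r, "done"));   // real JDK adapter
    }

    static FutureTask<String> viaSketch(Runnable r) {
        return new FutureTask<>(new RunnableAdapterSketch<>(r, "done"));
    }
}
```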

The run method

FutureTask implements the Runnable interface and overrides its run method. When the thread is started, the new thread calls run, so run contains FutureTask's main task-execution logic. The code is as follows:

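public void run() {
    // Only a NEW task whose runner field is CASed from null to the current thread may run
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();      // execute the task
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);       // NEW -> COMPLETING -> EXCEPTIONAL, wake waiters
            }
            if (ran)
                set(result);            // NEW -> COMPLETING -> NORMAL, wake waiters
        }
    } finally {
        runner = null;                  // release the runner slot
        int s = state;
        if (s >= INTERRUPTING)          // a cancel(true) is in progress: wait for it to finish
            handlePossibleCancellationInterrupt(s);
    }
}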

The source is fairly simple and relies heavily on CAS (compare-and-swap). In this version of the source, CAS is accessed through sun.misc.Unsafe and backed by the processor's atomic instructions, and it is used to update the task state and the WaitNode list. The basic flow is:

  1. Claim the task by CAS-ing the runner field to the current thread.
  2. Execute the task.
  3. Update the task state and store the result.
  4. Wake up the threads in the WaitNode list once the task has finished.
  5. Finally, if a cancel(true) is interrupting the task, wait for the state to reach INTERRUPTED.
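The flow above can be condensed into a simplified run method. This sketch uses AtomicReference<Thread> for runner and AtomicInteger for state instead of Unsafe field offsets. It omits waking waiters and the interrupt handling. The class MiniRunner and its callCount counter are hypothetical, added only to make the single-execution guarantee observable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical MiniRunner: a simplified version of FutureTask.run's control flow.
class MiniRunner<V> implements Runnable {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2, EXCEPTIONAL = 3;

    final AtomicInteger state = new AtomicInteger(NEW);
    final AtomicReference<Thread> runner = new AtomicReference<>();
    final AtomicInteger callCount = new AtomicInteger(); // illustration only: how many times call() ran
    private final Callable<V> callable;
    volatile Object outcome;

    MiniRunner(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        // Only a NEW task whose runner slot we win by CAS may execute.
        if (state.get() != NEW || !runner.compareAndSet(null, Thread.currentThread())) {
            return;
        }
        try {
            if (state.get() == NEW) {            // re-check, as FutureTask does
                Object result;
                int finalState;
                try {
                    callCount.incrementAndGet();
                    result = callable.call();
                    finalState = NORMAL;
                } catch (Throwable ex) {
                    result = ex;                 // the exception becomes the outcome
                    finalState = EXCEPTIONAL;
                }
                if (state.compareAndSet(NEW, COMPLETING)) {
                    outcome = result;
                    state.set(finalState);
                    // FutureTask would call finishCompletion() here to unpark waiters.
                }
            }
        } finally {
            runner.set(null); // release the runner slot; FutureTask also checks for a pending interrupt here
        }
    }
}
```

Because a second call to run finds the state already past NEW, the task body runs at most once, even if the same instance is submitted twice.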

The get method

get is how the main thread retrieves the task result. It checks the task state and, based on that state, decides whether to enter the awaitDone method.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)          // not finished yet: wait
        s = awaitDone(false, 0L);
    return report(s);             // map the final state to a value or an exception
}
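get ends by calling report(s), which maps the final state to a return value or an exception. NORMAL returns outcome, CANCELLED or INTERRUPTED throws CancellationException, and EXCEPTIONAL wraps the stored Throwable in an ExecutionException. The demo below observes the EXCEPTIONAL case through the public API only. The class name ReportDemo is illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Illustrative ReportDemo: observes how get() reports the EXCEPTIONAL state.
class ReportDemo {
    static Throwable causeOfFailedTask() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalArgumentException("bad input");
        });
        task.run();                       // runs in the calling thread; call() throws
        try {
            task.get();                   // state is EXCEPTIONAL, so get throws
            return null;
        } catch (ExecutionException e) {
            return e.getCause();          // the original exception that was stored in outcome
        }
    }
}
```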

The main logic of get is encapsulated in private int awaitDone(boolean timed, long nanos). The timed parameter indicates whether get was called with a timeout. The method loops, repeatedly checking the current state of the task and acting on the waiting thread accordingly:

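  1. Completed (state greater than COMPLETING): return the final state immediately. get then passes it to report, which returns the data.
  2. COMPLETING: call Thread.yield() and spin until the state reaches its final value.
  3. A timeout is set and the task is not complete: the thread, already enqueued in a WaitNode as in case 4, parks for the remaining time. After waking up it continues the loop. If the deadline has passed, it removes its node and returns.
  4. Other cases: wrap the thread in a WaitNode, push it onto the waiters list, and park it. When the task completes, the thread is woken through the WaitNode list, continues the loop, and ends up in case 1 or 2.

At the start of each iteration, awaitDone also checks whether the waiting thread has been interrupted. If so, it removes its node and throws InterruptedException.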
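The cases above can be put together into a simplified awaitDone-style wait loop. This is a sketch, not the JDK code. It uses AtomicInteger and AtomicReference in place of Unsafe, and it throws TimeoutException directly instead of returning the state to get. On timeout or interrupt it only clears its node's thread rather than calling removeWaiter. The names MiniFuture, complete, and await are hypothetical.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical MiniFuture: FutureTask-style waiting (spin on COMPLETING, otherwise enqueue and park).
class MiniFuture<V> {
    static final int NEW = 0, COMPLETING = 1, NORMAL = 2;

    static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    final AtomicInteger state = new AtomicInteger(NEW);
    final AtomicReference<WaitNode> waiters = new AtomicReference<>();
    private volatile V outcome;

    // Completing side: publish the result, then wake every queued waiter.
    void complete(V v) {
        if (state.compareAndSet(NEW, COMPLETING)) {
            outcome = v;
            state.set(NORMAL);
            for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
            }
        }
    }

    // Waiting side, modeled on awaitDone; nanos is ignored when timed is false.
    V await(boolean timed, long nanos) throws InterruptedException, TimeoutException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                if (q != null) q.thread = null;   // FutureTask calls removeWaiter(q) here
                throw new InterruptedException();
            }
            int s = state.get();
            if (s > COMPLETING) {                 // case 1: done
                if (q != null) q.thread = null;
                return outcome;
            } else if (s == COMPLETING) {         // case 2: result is being written, just yield
                Thread.yield();
            } else if (q == null) {
                q = new WaitNode();
            } else if (!queued) {                 // push onto the waiter stack with CAS
                q.next = waiters.get();
                queued = waiters.compareAndSet(q.next, q);
            } else if (timed) {                   // case 3: park for the remaining time
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    q.thread = null;              // FutureTask calls removeWaiter(q) here
                    throw new TimeoutException();
                }
                LockSupport.parkNanos(this, remaining);
            } else {                              // case 4: park until complete() unparks us
                LockSupport.park(this);
            }
        }
    }
}
```

The state is re-read after the node is queued and before every park. A completion that races with enqueueing is therefore either seen directly, or it finds the node and unparks the thread, so no wakeup is lost.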

The cancel method

The cancel method is fairly simple. It changes the task state to INTERRUPTING or CANCELLED, depending on mayInterruptIfRunning. If interruption was requested, it interrupts the runner thread executing the task and then sets the state to INTERRUPTED. Finally, it calls finishCompletion to wake up all waiting threads. This is the same routine that runs after the task completes in run.
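Here is a small demo of cancel(true) on a running task, using only real JDK APIs. The task sleeps so that it is still running when cancel is called. cancel then interrupts the runner, isCancelled and isDone both become true, and a later get throws CancellationException. The class name CancelDemo and the latch-based coordination are illustrative choices.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Illustrative CancelDemo: cancel(true) on a FutureTask whose task is still running.
class CancelDemo {
    // Returns {cancelReturned, isCancelled, isDone, getThrewCancellation, taskSawInterrupt}.
    static boolean[] run() throws Exception {
        CountDownLatch startedLatch = new CountDownLatch(1);
        CountDownLatch interruptedLatch = new CountDownLatch(1);
        FutureTask<String> task = new FutureTask<>(() -> {
            startedLatch.countDown();
            try {
                Thread.sleep(10_000);         // long-running work
                return "finished";
            } catch (InterruptedException e) {
                interruptedLatch.countDown(); // cancel(true) interrupted the runner thread
                throw e;
            }
        });
        Thread worker = new Thread(task);
        worker.start();
        startedLatch.await();                 // make sure the task is running

        boolean cancelResult = task.cancel(true);
        boolean getThrew;
        try {
            task.get();
            getThrew = false;
        } catch (CancellationException e) {
            getThrew = true;
        }
        boolean sawInterrupt = interruptedLatch.await(5, TimeUnit.SECONDS);
        worker.join();
        return new boolean[] {cancelResult, task.isCancelled(), task.isDone(), getThrew, sawInterrupt};
    }
}
```

If the task has already finished, cancel returns false and leaves the state unchanged. That is why a cancel call made after get, as in the demo at the start of this article, has no effect.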

Removing a waiting node: removeWaiter

The WaitNode list is the wait queue for threads blocked in get. When a wait ends by timeout or interrupt, removeWaiter is used to remove the current thread's node from that list.

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;                        // mark the node for removal
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;                      // live node: remember it as the predecessor
                else if (pred != null) {
                    pred.next = s;                 // interior node: unlink without CAS
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))   // head node: CAS the head
                    continue retry;
            }
            break;
        }
    }
}

In a multithreaded environment, removal works in two steps. First, the node is marked by setting node.thread = null, so any node in the list whose thread is null is a node to be removed. The list is then traversed, and each marked node is unlinked in one of two ways:

  1. The node is the head: CAS the waiters field from this node to its successor.
  2. The node is not the head: point its predecessor pred directly at its successor, using a plain write rather than CAS.

Why no CAS for interior nodes? According to the comment in the JDK source, unsplicing internal nodes without CAS is harmless. A releasing thread that still traverses an unlinked node just finds thread == null and skips it. What removeWaiter must guard against is unsplicing from a node that another thread has already removed. If pred.thread turns out to be null after the unlink, pred itself was removed concurrently and the splice may be lost. If the CAS on the head fails, the head has changed. In both cases the method restarts the traversal from the head (continue retry). It only breaks out after a full pass that detects no such race.
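The mark-then-unlink idea can be exercised single-threaded with the same loop shape as removeWaiter. It runs over an AtomicReference head: nodes with thread == null are spliced out, the head is removed with CAS, and a detected race restarts from the head. The names WaiterList, Node, push, remove, and size are hypothetical, chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical WaiterList: mirrors the loop structure of FutureTask.removeWaiter.
class WaiterList {
    static final class Node {
        volatile Thread thread;
        volatile Node next;

        Node(Thread thread, Node next) {
            this.thread = thread;
            this.next = next;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    // Push a node for t at the head of the list.
    void push(Thread t) {
        Node h;
        Node n;
        do {
            h = head.get();
            n = new Node(t, h);
        } while (!head.compareAndSet(h, n));
    }

    // Mark the node, then unlink every node whose thread is null.
    void remove(Node node) {
        if (node == null) return;
        node.thread = null;                               // step 1: mark for removal
        retry:
        for (;;) {
            for (Node pred = null, q = head.get(), s; q != null; q = s) {
                s = q.next;
                if (q.thread != null) {
                    pred = q;                             // live node: becomes the predecessor
                } else if (pred != null) {
                    pred.next = s;                        // interior node: plain write, no CAS
                    if (pred.thread == null) {            // pred was removed concurrently: restart
                        continue retry;
                    }
                } else if (!head.compareAndSet(q, s)) {   // head node: must CAS the head
                    continue retry;
                }
            }
            break;
        }
    }

    int size() {
        int n = 0;
        for (Node q = head.get(); q != null; q = q.next) n++;
        return n;
    }
}
```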

Conclusion

That concludes the analysis of the FutureTask source. Compared with other utility classes, its logic and fields are fairly simple, but many details are worth noting. These include volatile fields for cross-thread visibility, thread-safe updates built on volatile plus CAS, and constructors that accept both Callable and Runnable. Two ideas stand out in particular as worth learning from in multithreaded programming. One is introducing WaitNode so that threads blocked in get do not waste CPU. The other is handling concurrency when removing waiting nodes from the list.

Wait-queue structures like WaitNode are common elsewhere too. Examples include the synchronization queue in AQS (AbstractQueuedSynchronizer), and the cxq queue and WaitSet behind synchronized. They all reduce unnecessary waiting and wasted computation when threads compete for resources, using the same strategy: a thread enters a wait queue and is woken up (unpark) when the resource is ready. The ideas are all connected.
