Introduction to FutureTask usage

FutureTask is an implementation of the JDK and packages for the Future interface, representing an asynchronous computing task that supports cancellation (cancel). It implements the Future and Runnable interfaces, so it is both a computational task object and a result object. It can be submitted to the thread pool for execution, and the result is placed directly in its own FutureTask, not in another Future. The way we submit a FutureTask to a thread pool to run is by implementing a Callable or Runnable and then passing it to the corresponding FutureTask constructor to create a FutureTask, This FutureTask is then submitted to executor.execution (). The code is as follows:

FutureTask<String> future = new FutureTask<String>( new Callable<String>() { public String call() { return searcher.search(target); }}); executor.execute(future);Copy the code

We use the futureTask.get () method to get the result: if get() is called before the calculation is finished, the current thread blocks; If the calculation is complete, get() immediately returns the result, as shown below; If the calculation fails and an exception is thrown, get() does not block and throws the exception directly (the exception information is encapsulated in the syntax clause for this exception). If cancel() is called followed by get(), a CancellationException is thrown. When you call get(), remember to catch exceptions. FutureTask cannot re-evaluate (reuse) after it has been evaluated, so to recalculate, either call runAndReset() or create a new FutureTask.

The realization of the FutureTask

FutureTask implementation is based on the old version of AbstractQueuedSynchronizer (see article), so there will be a disadvantage, when multiple threads to perform the cancel the same FutureTask, FutureTask will keep the interrupted status, Make the caller feel strange. Therefore, the new implementation of FutureTask is modified to use the state field +CAS operation for synchronization control and a simplified version of the Treiber stack to hold waiting threads. Although FutureTask does not use AQS, its implementation is very similar to AQS.

1. Use a volatile int state as the synchronization state

AQS also has state, but AQS does not use what is saved in state, just performs CAS on it to see if it succeeds. Unlike AQS, FutureTask needs the contents of the state variable. There are seven possible values of state, as follows:

  • NEW = 0; Initial state, where FutureTask has just been created and is being evaluated.
  • COMPLETING = 1; Intermediate state, indicating that the calculation has completed and the result is being assigned or an exception is being handled
  • NORMAL = 2; Terminating state, indicating that the calculation has completed and the result has been assigned.
  • EXCEPTIONAL = 3; Terminate status: indicates that the calculation process has been abnormally interrupted.
  • CANCELLED = 4; Terminate status, indicating that the calculation has been terminated by the Cancel operation.
  • INTERRUPTING = 5; Intermediate state: indicates that the computation process has started and is interrupted, and the state is being modified.
  • INTERRUPTED = 6; Termination status, indicating that the computation process has started and is interrupted, and is now completely stopped.

In FutureTask, state is only changed to a terminated state (NORMAL, EXCEPTIONAL, CANCELED, INTERRUPTED) by set(), setException(), cancel(). Note that the ** state variable is declared volatile, ensuring visibility not only for itself but also for other member attributes of the FutureTask class. ** Assignment to other variables should precede assignment to state, and access to other variables should precede assignment to state. For example,

public FutureTask(Callable<V> callable) {
 if (callable == null)
 throw new NullPointerException();
 this.callable = callable;
 this.state = NEW; // ensure visibility of callable
 }
Copy the code

In the FutureTask constructor, the Callable variable becomes visible to other threads after assigning to the state variable declared volatile.

2. Use a Treiber stack to hold the wait thread

The Treiber stack is a lockless data structure, and the waiters’ variables in FutureTask point to the top of the stack. The name is a little scary, but it’s very simple, a lock free thread-safe stack. The push operation is implemented through a one-step CAS operation, that is, modifying the pointer at the top of the stack. Unloading the stack and performing a deletion in the middle of the stack are done through specific loops.

3. Cancel the implementation of operation Cancel ()

The cancel operation first determines whether state is equal to NEW, that is, whether the computational thread (runner) has started. If execution does not begin, the CAS operation immediately changes the status to CANCELED or INTERRUPTING, preventing the Runner from starting the computation. After determining the state, the state is then modified with compareAndSwap rather than overridden assignment, considering that between determining the state and modifying the state, the state may be modified by other threads. By the way, the state judgment is followed by the CAS operation, which allows two atomic operations to be combined into one atomic operation,

state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
Copy the code

If the status is no longer NEW, the runner has started and decides whether to send an interrupt to the runner based on mayInterruptIfRunning. Next, all the waiting threads in the Treiber stack are woken up and removed. The wake action here allows for multiple threads to call cancel() concurrently, so a loop CAS operation is designed to compete with the Treiber stack operation to ensure that only one thread can enter the inner loop, as follows:

private void finishCompletion() { for (WaitNode q; (q = waiters) ! = null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t ! = null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; }Copy the code

Modifying a data structure based on cyclic CAS operation is a common practice to implement lockless data structure.

4. Implementation of wait operation GET () in wait thread

The waiting thread starts FutureTask execution and calls GET () when it must access the results of its calculations. If the calculation result is already available at this point, get() will not block and will directly return the demerits saved in the outcome variable. If the calculation is not complete, a wait loop is executed. This wait loop is similar to the loop in the AQS lock operation, but differs in that it manages a wait thread. The loop continuously checks for interrupt status and state: exit if an interrupt is encountered; Exit if state indicates the end of calculation; If state= 实 时, spin; If it has timed out, exit. Other cases are blocked. The blocking operation of futureTask.get () is implemented by this loop, as shown in the code for the get() and awaitDone() methods.

5. Calculate the implementation of the run() method called by the thread

Computational threads, such as workers in thread pools, start performing computational work by calling futureTask.run (). See the code for FutureTask.run as follows (it is recommended to read against the code).

public void run() { if (state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c ! = null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code

Before actually calculating, check state to see if it has been cancelled (); The Runner field is then competitively assigned with the CAS operation to prevent multiple threads from entering run(). Stop if the previous two steps fail, otherwise run() in Callable or Runnable is executed. COMPLETING the run() calculation, the state is changed to an intermediate state (COMPLETING), the outcome is assigned, and the state is changed to the termination state NORMAL or EXCEPTIONAL with a delay write assignment. C) COMPLETING those years well, and then COMPLETING COMPLETED. Because modifying state and writing the outcome to the outcome variable are two steps, there may be other operations in between, so define an intermediate state COMPLETING state, which tells you that the result has been calculated, but we cannot currently read from the outcome. We can’t read the outcome until it’s NORMAL. This is a lock free design. Finally, all waiting threads are woken up after successful assignment to the outcome variable. That’s the implementation of FutureTask’s run() method. Futuretask.runandreset is substantially similar to futureTask.run, except that runAndReset takes the result from somewhere else and the state reverts to NEW after the calculation, allowing FutureTask to re-execute.