• You have one thought, I have one thought, and when we exchange, one person has two thoughts

  • If you can NOT explain it simply, you do NOT understand it well enough

Now the Demo code and technical articles are organized together Github practice selection, convenient for everyone to read and view, this article is also included in this, feel good, please also Star🌟


preface

How many ways can you create a thread? The answer to this question is easy to blurt out

  • Thread class inheritance
  • Implement the Runnable interface

But these two ways to create threads are “three products” :

  • No parameters
  • No return value
  • No way to throw an exception
class MyThread implements Runnable{
   @Override
   public void run(a) {
      log.info("my thread");
   }
} Copy the code

The Runnable interface is a core artifact of JDK1.0

 / * * * @since   JDK1.0
* /
@FunctionalInterface
public interface Runnable {
 public abstract void run(a); } Copy the code

There are always some drawbacks to using a “three nothing product”, of which not being able to get a return value is one of the most annoying, and Callable was born

Callable

It’s Master Doug Lea again, and the magic of Java 1.5

 / * * * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call} * / @FunctionalInterface public interface Callable<V> {   V call(a) throws Exception; } Copy the code

Callable is a generic interface with a single call() method that returns the generic value V and is used like this:

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};
Copy the code

Both Runnable and Callable are functional interfaces that have only one method in them.

Runnable VS Callable

Both interfaces are designed for multithreading tasks, but there are significant differences between them


Enforcement mechanism

Runnable can be used in both the Thread class and the ExecutorService class with Thread pools. Bu~~~~t, Callable is available only from the ExecutorService. You cannot find Callable from the Thread class


Exception handling

There is no throws on the signature of the RUN method on the Runnable interface, so the checked exceptions cannot be propagated upward. However, Callable’s Call () method signature has throws, so it can handle checked exceptions.

So the main differences are as follows:


The overall difference is small, but the difference is significant

Returning values and handling exceptions is easy to understand, and in practice we often use thread pools to manage threads (the reason is already why? ExecutorService), so how do you use both

ExecutorService

Take a look at the ExecutorService class diagram first


I have put the method marked above in its own place here

void execute(Runnable command);

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);Copy the code

As you can see, the execute() method of the ExecutorService still returns no value, while the Submit () method uniformly returns a Future value

The submit() method has a silly distinction between CountDownLatch and CyclicBarrier. It’s used several times in this article, but we didn’t get the return value, so

  • What exactly is Future?
  • How do I get the return value from that?

Let’s take these questions one by one

Future

The Future is an interface that contains only five methods:


You can already see what these methods do from their names

// Cancel the task
boolean cancel(boolean mayInterruptIfRunning);

// Obtain the task execution result
V get(a) throws InterruptedException, ExecutionException;
 // Get the execution result of the task with a timeout limit V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;  // Check whether the task has been canceled boolean isCancelled(a);  // Determine whether the task has ended boolean isDone(a);  Copy the code

So with all this stuff going on, you might be a little bit confused by this, but let’s just go through an example and show you how some of these things work

@Slf4j
public class FutureAndCallableExample {

   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = Executors.newSingleThreadExecutor();
  // Use Callable to get the return value  Callable<String> callable = () -> {  log.info("Call method into Callable");  // Simulate subthread task, sleep 2s here,  // Minor detail: Since the call method throws exceptions, there is no need to try/catch like the Runnable run method  Thread.sleep(5000);  return "Hello from Callable";  };   log.info("Submit Callable to thread pool");  Future<String> future = executorService.submit(callable);   log.info("Main thread continues execution");   log.info("Main thread waiting for Future result");  // Future.get() blocks until the result is available  String result = future.get();  log.info("Main thread gets Future result: {}", result);   executorService.shutdown();  } } Copy the code

The running results of the program are as follows:


If you run the example code above, the main thread calling the future.get() method will block itself until the subtask completes. We can also use the isDone method provided by the Future method, which can be used to check whether the task has completed. Let’s make some minor changes to the above procedure:

// If the child thread does not terminate, sleep 1s to re-check
while(! future.isDone()) {   System.out.println("Task is still not done...");
   Thread.sleep(1000);
}
Copy the code

To see the results:


If the subroutine is running too long, or for some other reason, and we want to cancel the subroutine, we can continue to make some changes to the program using the Cancel method provided by the Future

while(! future.isDone()) {   System.out.println("Child thread task not finished yet...");
   Thread.sleep(1000);

   double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;
  // Cancel the child thread if the program runs for more than 1s  if(elapsedTimeInSec > 1) {  future.cancel(true);  } } Copy the code

To see the results:


Why does a CancellationException appear when you call the Cancel method? Because when the get() method is called, it explicitly states:

When the get() method is called, a CancellationException is thrown if the result of the calculation is cancelled (as you will see in the source code analysis below).


It is very unprofessional not to handle exceptions, so we need to further modify the program to handle exceptions in a friendlier way

// Determine whether the program has been cancelled by isCancelled. If cancelled, the log is printed. If not, get() is called normally
if(! future.isCancelled()){   log.info("Child thread task completed");
   String result = future.get();
   log.info("Main thread gets Future result: {}", result);
}else {  log.warn("Child thread task cancelled"); } Copy the code

View the running results of the program:


While Future is an interface, the executorService.submit () method always returns FutureTask, the implementation class of Future


Let’s dive into the core implementation class to see what’s going on

FutureTask

Again, let’s look at the class structure


public interface RunnableFuture<V> extends Runnable.Future<V> {
    void run(a);
}
Copy the code

FutureTask implements the RunnableFuture interface. The RunnableFuture interface implements the Runnable and Future interfaces respectively, so it can be inferred that FutureTask has the characteristics of both interfaces:

  • There areRunnableProperties, so can be used inExecutorServiceIn conjunction with thread pools
  • There areFutureProperty, so you can get execution results from it

FutureTask source code analysis

If you’ve read the full AQS analysis, you may have noticed that reading the source code of the Java concurrent utility class is all about the following three points:

- State (primary control of code logic) - queue (waiting queue) - CAS (secure set value)Copy the code

With these three points in mind, let’s take a look at the FutureTask source code and see how it implements the logic around these three points

As mentioned earlier in this article, threads that implement Runnable do not get the return value, but those that implement Callable do. Therefore, FutureTask must be associated with Callable if it wants to get the return value. As can be seen from the constructor:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
} Copy the code

The FutureTask constructor converts the thread to the Callable type by running the Executors. Callable factory method:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
Copy the code

But FutureTask implements the Runnable interface, which means that the run() method can only be overridden, and the run() method does not return a value.

  • How does FutureTask get the return value in the run() method?
  • Where does it put the return value?
  • How does the get() method get this return value?

Let’s take a look at the run() method (the key code is annotated)

public void run(a) {
   // If the status is not NEW, the task has been executed or cancelled
   // If the state is NEW, try to save the thread of execution in runnerOffset (runner field), and return directly if assignment fails
    if(state ! = NEW ||! UNSAFE.compareAndSwapObject(this, runnerOffset,
 null, Thread.currentThread()))  return;  try {  // Get the Callable value passed in by the constructor  Callable<V> c = callable;  if(c ! =null && state == NEW) {  V result;  boolean ran;  try {  // The return value can be obtained by calling the call method of Callable normally  result = c.call();  ran = true;  } catch (Throwable ex) {  result = null;  ran = false;  Save the exception thrown by the call method  setException(ex);  }  if (ran)  // Save the result of the call method  set(result);  }  } finally {  runner = null;  int s = state;  // If the task is interrupted, interrupt processing is performed  if (s >= INTERRUPTING)  handlePossibleCancellationInterrupt(s);  } } Copy the code

The run() method does not return a value. How does the run() method save both the call() method’s return and the exception? In fact, it is very simple to use set(result) to save the normal program running results, or use setException(ex) to save the program exception information

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

// Save the exception result
protected void setException(Throwable t) {
 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = t;  UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  finishCompletion();  } }  // Save normal results protected void set(V v) {  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  outcome = v;  UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  finishCompletion();  } } Copy the code

SetException and SET methods are very similar in that they both save exceptions or results in the Outcome variable of Object type. The outcome is a member variable, which requires thread safety, so they need to set the value of the outcome variable through CAS. Since the value of the outcome is changed after CAS is successful, this is why the outcome is not modified by volatile.


Save the normal result value (set method) and save the abnormal result value (setException method) two method code logic, the only difference is the state passed by CAS is different. As we mentioned above, state is mostly used to control code logic, as is FutureTask, so to understand code logic, we need to understand state changes

 / * *
 * Possible state transitions:
* NEW -> COMPLETING -> NORMAL //* NEW -> EXCEPTIONAL (实 习* NEW -> CANCELLED //* NEW -> interrupt -> INTERRUPTED // The thread is INTERRUPTED during execution* / private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; Copy the code

Seven states, don’t panic, the whole state flow is really only four lines


The FutureTask object is created, and the state is NEW. As you can see from the constructor above, the four final states are NORMAL, EXCEPTIONAL, CANCELLED, and INTERRUPTED. Two intermediate states are slightly confusing:

  • COMPLETING: Outcome is being set
  • INTERRUPTING: When a thread is being interrupted by the cancel(true) method

In general, these two intermediate states represent an instantaneous state. Let’s graphically show several states:


Now that we know how the run() method saves results, and now that we know how to store normal/abnormal outcomes in the outcome variable, we need to look at how FutureTask gets results using the get() method:

public V get(a) throws InterruptedException, ExecutionException {
    int s = state;
   If state has not reached the set outcome, the awaitDone() method is called to block itself
    if (s <= COMPLETING)
        s = awaitDone(false.0L);
 // Return the result  return report(s); } Copy the code

The awaitDone method is one of the core methods of FutureTask

The // get method supports timeout limits, and if no timeout is passed, the arguments taken are false and 0L
InterruptedException (InterruptedException) InterruptedException (InterruptedException
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
   // Calculate the wait cutoff time
 final long deadline = timed ? System.nanoTime() + nanos : 0L;  WaitNode q = null;  boolean queued = false;  for (;;) {  // If the current thread is interrupted, if so, the node is deleted while waiting and InterruptedException is thrown  if (Thread.interrupted()) {  removeWaiter(q);  throw new InterruptedException();  }   int s = state;  // A final state (normal end/abnormal end/cancellation) has been achieved  // Empty thread and return the result  if (s > COMPLETING) {  if(q ! =null)  q.thread = null;  return s;  }  COMPLETING the task (实 实) : COMPLETING the task well (实 实); COMPLETING the task well (实 实) : COMPLETING the task well (实 实); COMPLETING the task well (实 实); COMPLETING the task well (实 实);  else if (s == COMPLETING) // cannot time out yet  Thread.yield();  // Wait node is empty  else if (q == null)  // Construct the node from the current thread  q = new WaitNode();  // If the server is not yet queued, please add the current node to the first node and replace the original waiters  else if(! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,  q.next = waiters, q);  // If the timeout period is set  else if (timed) {  nanos = deadline - System.nanoTime();  // When the time is up, no longer wait for the result  if (nanos <= 0L) {  removeWaiter(q);  return state;  }  // Block waiting for a specific time  LockSupport.parkNanos(this, nanos);  }  else  // Suspend the current thread until it is woken up by another thread  LockSupport.park(this);  } } Copy the code

In general, entering this method usually takes three rounds

  1. In the first for loop, the logic isq == null, a new node q will be created, and the first cycle will end.
  2. In the second for loop, the logic is! queue“, the next pointer to the waiters generated in the first run of the loop points to the waiters, and the CAS node replaces node Q with the waiters, which means that a newly generated node is added to the first node in the waiters. If the replacement succeeds, queued=true. The second cycle ends.
  3. The third for loop blocks and waits. Either block for a certain amount of time, or block until it is woken up by another thread.

For the second loop, if you’re a little bit confused, as we said earlier, if there’s a block, there’s a queue, and if there’s a queue, there’s a queue, and FutureTask also maintains a queue

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
Copy the code

The wait queue is really a Treiber stack, and as a stack, it works like a pistol’s magazine (imagine the bullet inside the magazine), so the second loop adds a newly generated node to the front of the waiters Stack

If the program is running properly, the usual call to get() suspends the current thread. Who wakes it up? Naturally, the run() method will wake up after it has run, and the finishCompletion method will be called in either of the methods that return the result (set method) or the exception (setException method), which will wake up the thread in the waiting queue

private void finishCompletion(a) {
    // assert state > COMPLETING;
    for(WaitNode q; (q = waiters) ! =null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
 Thread t = q.thread;  if(t ! =null) {  q.thread = null;  // Wake up the thread in the wait queue  LockSupport.unpark(t);  }  WaitNode next = q.next;  if (next == null)  break;  q.next = null; // unlink to help gc  q = next;  }  break;  }  }   done();   callable = null; // to reduce footprint } Copy the code

There are only three ways to set the state of a task to terminated:

  • set
  • setException
  • cancel

Now that the first two methods have been analyzed, let’s look at the Cancel method

Look at the Future Cancel (), which is annotated to specify three cases in which the Cancel operation must fail

  1. The mission has been carried out
  2. The mission has already been canceled
  3. The task cannot be cancelled for some reason

In other cases, cancel returns true. It is worth noting that the fact that the cancel operation returns true does not mean that the task is actually canceled, depending on the state in which the task is at the time the Cancel state is invoked

  • If the task is not already running when cancel is issued, the task will not be executed subsequently;
  • This is required if the task is already running when cancel is initiatedmayInterruptIfRunningThe parameters are:
    • If mayInterruptIfRunning is true, the current task is interrupted
    • If mayInterruptIfRunning is false, the task in progress can be allowed to continue running until it finishes

With that in mind, take a look at the logic of the Cancel code

public boolean cancel(boolean mayInterruptIfRunning) {
  
    if(! (state == NEW &&          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 return false;  try { // in case call to interrupt throws exception  // The task execution thread needs to be interrupted  if (mayInterruptIfRunning) {  try {  Thread t = runner;  // Interrupt the thread  if(t ! =null)  t.interrupt();  } finally { // final state  // Change to the final status INTERRUPTED  UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);  }  }  } finally {  // Wake up the waiting thread  finishCompletion();  }  return true; } Copy the code

Now that the core method has been analyzed, let’s take a tea break


I mean, use FutureTask to practice the classic procedure of boiling water and making tea


As shown above:

  • Wash the kettle for one minute
  • Boil water for 15 minutes
  • Wash the teapot for 1 minute
  • Wash the cup for 1 minute
  • Hold the tea for 2 minutes

In the end make tea

Let me do the math in my head, it would take a total of 20 minutes in serial, but obviously we can wash the teapot/cup/tea while the water is boiling


In this era of high concurrency, there are too many things that can be done in 4 minutes. It is inevitable to learn how to use Future optimization program (in fact, optimization program is to find the critical path, and the critical path was found. Non-critical path tasks can usually be executed in parallel with the contents of the critical path.

@Slf4j
public class MakeTeaExample {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
  // Create thread 1's FutureTask  FutureTask<String> ft1 = new FutureTask<String>(new T1Task());  // Create thread 2's FutureTask  FutureTask<String> ft2 = new FutureTask<String>(new T2Task());   executorService.submit(ft1);  executorService.submit(ft2);   log.info(ft1.get() + ft2.get());  log.info("Start making tea");   executorService.shutdown();  }   static class T1Task implements Callable<String> {   @Override  public String call(a) throws Exception {  log.info("T1: Wash the kettle...");  TimeUnit.SECONDS.sleep(1);   log.info("T1: Boil water...");  TimeUnit.SECONDS.sleep(15);   return T1: Boiling water is ready.;  }  }   static class T2Task implements Callable<String> {  @Override  public String call(a) throws Exception {  log.info("T2: Wash the teapot...");  TimeUnit.SECONDS.sleep(1);   log.info("T2: Wash the teacup...");  TimeUnit.SECONDS.sleep(2);   log.info("T2: Get the tea...");  TimeUnit.SECONDS.sleep(1);  return "T2: Fuding White Tea got it";  }  } } Copy the code

Thread 1 takes longer to boil the water. Thread 1 wants to get the tea when the water boils. How to make the tea?


We just need to get the result of thread 2’s FutureTask from thread 1. Let’s modify the program slightly:

@Slf4j
public class MakeTeaExample1 {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
  // Create thread 2's FutureTask  FutureTask<String> ft2 = new FutureTask<String>(new T2Task());  // Create thread 1's FutureTask  FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2));   executorService.submit(ft1);  executorService.submit(ft2);   executorService.shutdown();  }   static class T1Task implements Callable<String> {   private FutureTask<String> ft2;  public T1Task(FutureTask<String> ft2) {  this.ft2 = ft2;  }   @Override  public String call(a) throws Exception {  log.info("T1: Wash the kettle...");  TimeUnit.SECONDS.sleep(1);   log.info("T1: Boil water...");  TimeUnit.SECONDS.sleep(15);   String t2Result = ft2.get();  log.info("T1 gets the {} of T2 and starts making tea.", t2Result);  return "T1: 上茶!!!";  }  }   static class T2Task implements Callable<String> {  @Override  public String call(a) throws Exception {  log.info("T2: Wash the teapot...");  TimeUnit.SECONDS.sleep(1);   log.info("T2: Wash the teacup...");  TimeUnit.SECONDS.sleep(2);   log.info("T2: Get the tea...");  TimeUnit.SECONDS.sleep(1);  return "Fuding White Tea";  }  } } Copy the code

To see the results of the program:


With this change in mind, let’s go back to the three submit methods of the ExecutorService:

<T> Future<T> submit(Runnable task, T result);
Future<? > submit(Runnable task);<T> Future<T> submit(Callable<T> task);
Copy the code

The first method, layer by layer code, is shown here:


You’ll notice that, in a similar way to the way we thought about our program for boiling water and making tea, you can pass in a result that acts as a bridge between the main thread and the child thread, through which the main child thread can share data

The second method argument is of type Runnable, which returns null even when the get() method is called, so it can only be used to declare that the task has ended, similar to thread.join ().

The third method argument is of type Callable, and the return value of the call() method can be explicitly retrieved through the get() method

That’s the end of the whole lecture on Future, which needs to be digested briefly


conclusion

For those of you familiar with Javascript, Future’s features are similar to Javascript promises, and private banter often compares it to a boyfriend’s Promise

Back to Java, let’s take a look at the evolution of the JDK, and talk about the birth of Callable, which makes up for the absence of Runnable returns, through a simple demo of Callable and Future use. FutureTask is the core implementation class of the Future interface, through reading the source code to understand the entire implementation logic, and finally combined with FutureTask and thread pool demonstration of boiling water and tea procedures, I believe that here, you can easily obtain the results of the thread

It is very simple to boil water and make tea. If the business logic is more complex, using Future in this way will definitely bring great chaos (there is no active notification at the end of the program, and the link and integration of Future need to be manually operated). In order to solve this shortcoming, yes, Doug Lea again, the CompletableFuture utility class appears in Java1.8, and with the use of Lambda, it makes writing asynchronous programs as easy as writing serial code


Now let’s look at the use of CompletableFuture

Soul asking

  1. How do you divide tasks and collaborate on a daily basis? Are there any basic rules?
  2. How do you batch asynchronous tasks?

reference

  1. Java concurrent programming practice
  2. The art of Concurrent programming in Java
  3. The beauty of Concurrent programming in Java