Introduction to the

This article introduces the features and use cases of the CompletableFuture class, which was added in Java 8 as an improvement to the Concurrency API. Java 9 brought further improvements to CompletableFuture, which are covered later.

Asynchronous computation with Future

Asynchronous computation is difficult to reason about. We usually want to think of any computation as a series of steps, but with asynchronous code the steps, represented as callbacks, tend to be either scattered across the code or deeply nested inside each other. Things get even more complicated when we need to handle errors that might occur in one of these steps.

The Future interface was added in Java 5 to represent the result of an asynchronous computation, but it has no methods for combining computations or handling errors.

In Java 8, the CompletableFuture class was introduced. Along with the Future interface, it also implements the CompletionStage interface, which defines the contract for an asynchronous computation step that can be combined with other steps.

CompletableFuture is both a building block and a framework, with about 50 methods for composing, combining, and executing asynchronous computation steps and for handling errors.

Such a large API can be overwhelming, but most of the methods fall into a few distinct use cases, which are highlighted below.

Use CompletableFuture as the Future implementation

First, the CompletableFuture class implements the Future interface, so you can use it as a Future implementation with additional completion logic.

For example, you can create an instance of this class with the no-arg constructor, hand it out to consumers, and complete it later using the complete method. Consumers can use the get method to block the current thread until the result is available.

In the example below, we have a method that creates a CompletableFuture instance, spins off a computation in another thread, and returns the Future immediately.

Once the computation is done, the method completes the Future by passing the result to the complete method:


public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture 
      = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });
 
    return completableFuture;
}


To spin off the computation, we use the Executor API. This way of creating and completing a CompletableFuture works with any concurrency mechanism or API, including raw threads.

Notice that the calculateAsync method returns a Future instance.

We simply call the method, receive the Future instance, and call its get method when we are ready to block for the result.

Also note that the get method throws checked exceptions, namely ExecutionException (which wraps an exception that occurred during the computation) and InterruptedException (which signals that the thread waiting in get was interrupted):

Future<String> completableFuture = calculateAsync();
 
// ... 
 
String result = completableFuture.get();
assertEquals("Hello", result);

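To make the two checked exceptions concrete, here is a minimal sketch of how a caller might handle them. The class name GetExceptionExample and the helper describe are hypothetical and not part of the original example; the failure is simulated with completeExceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class GetExceptionExample {
    // Hypothetical helper: returns the value on success,
    // or the message of the underlying cause on failure.
    static String describe(CompletableFuture<String> future) {
        try {
            return "value: " + future.get();
        } catch (ExecutionException e) {
            // The exception raised by the computation is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("Hello");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(ok));     // value: Hello
        System.out.println(describe(failed)); // failed: boom
    }
}
```

Restoring the interrupt flag in the InterruptedException branch is a common convention, not something CompletableFuture requires.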

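If you already know the result of the computation, you can use the static completedFuture method with an argument that represents the result. The get method of such a Future never blocks and returns the result immediately: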

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");
 
// ...
 
String result = completableFuture.get();
assertEquals("Hello", result);

In some scenarios, you might want to cancel the execution of a Future.

Suppose we didn't manage to find a result and decide to cancel the asynchronous task altogether. This can be done with the Future's cancel method. The method takes a boolean argument, mayInterruptIfRunning, but for CompletableFuture it has no effect, because interrupts are not used to control processing in CompletableFuture.

Here is a modified version of the asynchronous method:


public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
 
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
 
    return completableFuture;
}


When we block on the result using the Future.get() method, it throws a CancellationException if the future has been cancelled:

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

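The following sketch shows what a caller can observe after cancellation without any background thread. The class CancelExample and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class CancelExample {
    // Hypothetical helper: cancels a pending future and reports what the caller observes.
    static String cancelPending() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // The boolean flag is accepted but has no effect for CompletableFuture.
        boolean cancelled = future.cancel(true);
        try {
            future.join();
            return "completed";
        } catch (CancellationException e) {
            return "cancelled=" + cancelled
                + ", isCancelled=" + future.isCancelled()
                + ", isCompletedExceptionally=" + future.isCompletedExceptionally();
        }
    }

    // Cancelling a future that already holds a value changes nothing and returns false.
    static boolean cancelCompleted() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("Hello");
        return done.cancel(false);
    }

    public static void main(String[] args) {
        System.out.println(cancelPending());
        System.out.println(cancelCompleted());
    }
}
```

Cancellation is treated as a form of exceptional completion, which is why isCompletedExceptionally() also reports true for a cancelled future.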

API overview

Static factory methods

The code above is simple, but CompletableFuture also provides a few static methods that create an instance directly from a task.


CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor);


  • The runAsync method receives a Runnable and returns a CompletableFuture<Void>, since the task produces no value.
  • The supplyAsync method receives a Supplier, a JDK 8 functional interface that takes no arguments and returns a result.
  • Both methods have an overload that accepts an Executor, so the task runs in the specified thread pool; without it, the task runs in ForkJoinPool.commonPool().
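
As a rough illustration of the Executor overloads listed above, the sketch below runs both factory methods on a caller-supplied pool. The class FactoryMethodsExample, the helper supplyOnCustomPool, and the thread name "custom-pool-thread" are assumptions made for this example only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FactoryMethodsExample {
    // Hypothetical helper: runs both factory methods on a custom pool and
    // returns the name of the thread that executed the Supplier.
    static String supplyOnCustomPool() {
        ExecutorService pool = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "custom-pool-thread"));
        try {
            CompletableFuture<Void> noValue =
                CompletableFuture.runAsync(() -> { }, pool);
            CompletableFuture<String> threadName =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);

            noValue.join(); // yields null, since a Runnable produces no value
            return threadName.join();
        } finally {
            pool.shutdown(); // lets the JVM exit once the tasks are done
        }
    }

    public static void main(String[] args) {
        System.out.println(supplyOnCustomPool()); // custom-pool-thread
    }
}
```

Passing an explicit Executor is mainly useful when tasks block or when they should not compete with other work in the common pool.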

Using supplyAsync()

The static methods runAsync and supplyAsync allow us to create a CompletableFuture instance from the Runnable and Supplier functional types, respectively.

Runnable is the same old interface that is used in threads, and it does not allow returning a value.

The Supplier interface is a generic functional interface with a single method that takes no arguments and returns a value of the parameterized type.

This allows an instance of Supplier to be supplied as a lambda expression that performs the calculation and returns the result:


CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
 
// ...
 
assertEquals("Hello", future.get());


Using thenRun()

Given two tasks, A and B, you can pass a Runnable lambda to thenRun() if task B neither needs the value computed by task A nor returns a value itself. In the practical example below, we simply print a line to the console once the preceding computation has finished:

Template

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});

  • In the first line, task A returns no value; thenRun(Runnable runnable) runs task B after task A completes, and B does not need the result of A.
  • In the second line, task A returns resultA; thenRun(Runnable runnable) still runs task B after task A completes, but B does not need the result of A.

In practice


CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));
 
future.get();


Using thenAccept()

Given two tasks, A and B, if task B needs the result of task A but does not return a value itself, use thenAccept and pass it a Consumer that receives the result. The final future.get() call then returns an instance of the Void type, that is, null.

Template

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});

  • In the first line, runAsync returns no value, so the resultA received by thenAccept is null; task B returns no result.
  • In the second line, supplyAsync returns a value, which thenAccept receives as resultA; task B returns no result.

In practice


CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));
 
future.get();


Using thenApply()

Given two tasks, A and B, if task B needs the result of task A and returns a value of its own, use thenApply. It accepts a Function instance, uses it to process the result, and returns a Future that holds the value returned by the function:

Template

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");


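  • In the first line, runAsync returns no value, so the resultA received by thenApply(Function fn) is null; task B returns "resultB".
  • In the second line, thenApply(Function fn) receives resultA from task A, and task B returns a new value derived from it.

In practice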


CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");
 
assertEquals("Hello World", future.get());


Of course, when there are more tasks, for example task B followed by task C, you can keep chaining further thenXxx() calls.
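
A longer chain might look like the sketch below, which strings thenApply, thenAccept, and thenRun together. The class ChainExample and the log list are hypothetical and exist only to make the order of the steps observable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainExample {
    // Hypothetical helper: runs tasks A -> B -> C -> D and records what C and D saw.
    static List<String> chainWithSideEffects() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture.supplyAsync(() -> "resultA")      // task A produces a value
            .thenApply(a -> a + " resultB")                 // task B transforms it
            .thenAccept(ab -> log.add("accepted: " + ab))   // task C consumes it
            .thenRun(() -> log.add("done"))                 // task D needs no input
            .join();                                        // wait for the whole chain

        return log;
    }

    public static void main(String[] args) {
        System.out.println(chainWithSideEffects());
        // [accepted: resultA resultB, done]
    }
}
```

Each step only starts after the previous one has completed, so the two log entries always appear in this order.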

Using thenCompose()

Next comes an interesting design pattern.

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.

The result of this chaining is itself a CompletableFuture, which allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as the monadic design pattern.

Simply put, a monad is a design pattern in which a process is broken down into steps that are connected through functions. You only supply the function needed for the next step, and the whole process is carried out automatically.

In the following example, we chain two Futures sequentially using the thenCompose method.

Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step, which allows us to use this value inside the next CompletableFuture's lambda:


CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
 
assertEquals("Hello World", completableFuture.get());


Like thenApply, the thenCompose method combines computation steps, but the two differ in how they treat the function's result. They are closely related to the map and flatMap methods of the Stream and Optional classes, also available in Java 8.

Both methods receive a function and apply it to the computation result, but thenCompose (flatMap) receives a function that returns another object of the same type, here a CompletableFuture. This functional structure allows instances of these classes to be composed as building blocks.
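
The analogy can be sketched side by side with Optional. The class MapFlatMapAnalogy and its method names are hypothetical; each pair of methods performs the same "Hello" to "Hello World" transformation.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class MapFlatMapAnalogy {
    // map: the function returns a plain value, which Optional wraps for us.
    static Optional<String> optionalMap() {
        return Optional.of("Hello").map(s -> s + " World");
    }

    // flatMap: the function already returns an Optional, so no extra wrapping occurs.
    static Optional<String> optionalFlatMap() {
        return Optional.of("Hello").flatMap(s -> Optional.of(s + " World"));
    }

    // thenApply plays the role of map.
    static CompletableFuture<String> futureApply() {
        return CompletableFuture.supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World");
    }

    // thenCompose plays the role of flatMap.
    static CompletableFuture<String> futureCompose() {
        return CompletableFuture.supplyAsync(() -> "Hello")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
    }

    public static void main(String[] args) {
        System.out.println(optionalMap().get());
        System.out.println(optionalFlatMap().get());
        System.out.println(futureApply().join());
        System.out.println(futureCompose().join());
    }
}
```

All four calls print "Hello World"; the only difference is whether the supplied function returns a plain value or an already wrapped one.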

Using thenCombine()

Taking the results of both tasks

If you want to execute two independent tasks and do something with both of their results, use the thenCombine method, which accepts a Future and a function of two arguments to process both results:

Template


CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

In practice


CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2);
 
assertEquals("Hello World", completableFuture.get());


A simpler case is when you want to do something with the results of two Futures but do not need to pass a resulting value down the chain. thenAcceptBoth is for processing that returns nothing, whereas thenCombine is for processing that returns a value:


CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));


The difference between thenApply() and thenCompose()

In the previous sections, we showed examples of thenApply() and thenCompose(). Both APIs help chain CompletableFuture calls, but they are used differently.

thenApply()

This method is used to work with the result of the previous call. The key thing to remember is that the return type is still a CompletableFuture, with its type parameter changed to the type returned by the function.

So, when we want to transform the result of a CompletableFuture call, it looks like this:


CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);



thenCompose()

The thenCompose() method is similar to thenApply() in that both return a new CompletableFuture. However, thenCompose() takes a function that itself returns a CompletableFuture. It flattens the result and returns a CompletableFuture holding the value directly, rather than the nested future that thenApply() would produce with the same function. It is therefore used to connect two CompletableFutures:


CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);


Therefore, if you want to chain methods that return a CompletableFuture, it is best to use thenCompose().
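
To see the nesting that thenCompose() avoids, the sketch below passes the same function to both methods. The compute() and computeAnother() bodies here are hypothetical stand-ins, since the article does not show the original compute() implementation.

```java
import java.util.concurrent.CompletableFuture;

class NestedFutureExample {
    // Hypothetical stand-in for compute(): asynchronously produces 1.
    static CompletableFuture<Integer> compute() {
        return CompletableFuture.supplyAsync(() -> 1);
    }

    // Returns a future itself, as in the snippet above.
    static CompletableFuture<Integer> computeAnother(Integer i) {
        return CompletableFuture.supplyAsync(() -> 10 + i);
    }

    // thenApply wraps the returned future, so the result is nested
    // and has to be unwrapped twice.
    static int viaThenApply() {
        CompletableFuture<CompletableFuture<Integer>> nested =
            compute().thenApply(NestedFutureExample::computeAnother);
        return nested.join().join();
    }

    // thenCompose flattens the returned future into a single level.
    static int viaThenCompose() {
        CompletableFuture<Integer> flat =
            compute().thenCompose(NestedFutureExample::computeAnother);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());   // 11
        System.out.println(viaThenCompose()); // 11
    }
}
```

Both variants produce the same value, but only the thenCompose() version can be chained further without unwrapping.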

Run multiple tasks in parallel

When we need to perform multiple tasks in parallel, we often want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows us to wait for the completion of all the Futures passed to it:

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { ... }

In practice


CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);
 
// ...
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());


Note that the return type of CompletableFuture.allOf() is CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all the Futures. Instead, you have to get the results from each Future manually. Fortunately, the CompletableFuture.join() method and the Java 8 Streams API make this simple:


String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);


CompletableFuture provides the join() method, which is similar to get() in that it blocks until the value is available. The difference is that join() throws an unchecked exception if the Future does not complete normally. This makes it possible to use it as a method reference in Stream.map().
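
One possible way to combine allOf() and join() without blocking the caller is sketched below. The class AllOfExample and the helper allResults are hypothetical names; the idea is to collect the values in a thenApply step that only runs after every input has completed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfExample {
    // Hypothetical helper: returns a future holding all results,
    // completed once every input future has completed.
    static CompletableFuture<List<String>> allResults(List<CompletableFuture<String>> futures) {
        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // join() does not block here: this step only runs after all inputs are done.
        return allDone.thenApply(ignored -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> "Hello"),
            CompletableFuture.supplyAsync(() -> "Beautiful"),
            CompletableFuture.supplyAsync(() -> "World"));

        System.out.println(allResults(futures).join()); // [Hello, Beautiful, World]
    }
}
```

The results keep the order of the input list, regardless of the order in which the tasks finish.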

Exception handling

With that said, let’s talk a little bit about exception handling for CompletableFuture. Here we will introduce two methods:


public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);


Look at the code


CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");


In the code above, tasks A, B, C, and D execute in sequence. If task A throws an exception (which the code above does not), none of the subsequent tasks are executed. Likewise, if task C throws an exception, task D is not executed.

So how do we handle exceptions? Looking at the following code, we throw an exception in task A and handle it:


CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "errorResultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());


In the code above, task A throws an exception, which is handled by the exceptionally() method. That method returns a new result, which is then passed to task B. So the final output is:

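errorResultA resultB resultC resultD

The handle method, in contrast, is called whether or not the computation failed. It receives two arguments, the result and the exception, and returns the value to continue with: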

String name = null;
 
// ...
 
CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");
 
assertEquals("Hello, Stranger!", completableFuture.get());


Of course, both arguments of the handle function, s and t, can be null, because s is also null when the CompletableFuture it is called on completes without returning a value.
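
The difference between the two methods can be sketched as follows: exceptionally() only runs on failure, while handle() runs in both cases. The class HandleVsExceptionally and its helpers are hypothetical, and the failure is simulated with completeExceptionally.

```java
import java.util.concurrent.CompletableFuture;

class HandleVsExceptionally {
    // Hypothetical helper: builds a future that has already failed.
    static CompletableFuture<String> failed(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }

    // exceptionally: the function is only invoked when the stage failed.
    static String withExceptionally(CompletableFuture<String> future) {
        return future.exceptionally(ex -> "fallback").join();
    }

    // handle: the function is always invoked; exactly one of value/ex is meaningful.
    static String withHandle(CompletableFuture<String> future) {
        return future
            .handle((value, ex) -> ex == null ? "ok: " + value : "failed: " + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally(CompletableFuture.completedFuture("A"))); // A
        System.out.println(withExceptionally(failed("boom")));                         // fallback
        System.out.println(withHandle(CompletableFuture.completedFuture("A")));        // ok: A
        System.out.println(withHandle(failed("boom")));                                // failed: boom
    }
}
```

When the failure originates in an earlier asynchronous stage such as supplyAsync, the exception passed to these functions may be wrapped in a CompletionException, so inspecting getCause() is often safer than relying on the message.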

Async suffix methods

Most methods of the fluent API in the CompletableFuture class have two additional variants with the Async suffix. These variants are intended for running the corresponding step in another thread.

Methods without the Async suffix run the next stage in the thread that completes the previous stage, or in the calling thread if that stage has already completed. An Async method without an Executor argument runs the step in the common fork/join pool, accessed through ForkJoinPool.commonPool(). An Async method with an Executor argument runs the step using the passed Executor.

Here is a modified example that uses the thenApplyAsync method. Under the hood, the application of the function is wrapped into a ForkJoinTask instance. This allows you to parallelize your computation further and use system resources more efficiently.


CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");
 
CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");
 
assertEquals("Hello World", future.get());

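The variant that takes an Executor can be sketched as follows. The class AsyncVariantExample, the helper applyOnExecutor, and the thread name "step-pool-thread" are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantExample {
    // Hypothetical helper: runs the second step on a dedicated executor
    // and reports which thread executed it.
    static String applyOnExecutor() {
        ExecutorService stepPool = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "step-pool-thread"));
        try {
            return CompletableFuture.supplyAsync(() -> "Hello")  // runs in the common pool
                .thenApplyAsync(
                    s -> s + " from " + Thread.currentThread().getName(),
                    stepPool)                                    // runs on stepPool
                .join();
        } finally {
            stepPool.shutdown(); // lets the JVM exit once the step is done
        }
    }

    public static void main(String[] args) {
        System.out.println(applyOnExecutor()); // Hello from step-pool-thread
    }
}
```

With the plain thenApply method, the thread that runs the function depends on timing, which is why the executor-based variant is used here to get a predictable result.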

JDK 9 CompletableFuture API

In Java 9, the CompletableFuture API has been further enhanced with the following changes:

  • New factory methods were added
  • Delay and timeout are supported
  • Improved support for subclassing.

The following instance methods have been introduced:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

There are also some static utility methods:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

Finally, to address timeouts, Java 9 introduced two new methods:

  • orTimeout()
  • completeOnTimeout()
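
A minimal sketch of the two timeout methods, assuming Java 9 or later, is shown below. The class TimeoutExample, its helpers, and the 100 ms timeout are arbitrary choices for illustration; the futures are deliberately never completed by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutExample {
    // completeOnTimeout: falls back to a default value if nothing arrives in time.
    static String withDefault() {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        return neverCompleted
            .completeOnTimeout("default", 100, TimeUnit.MILLISECONDS)
            .join();
    }

    // orTimeout: fails the future with a TimeoutException if nothing arrives in time.
    static boolean timesOut() {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            neverCompleted.orTimeout(100, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in an unchecked CompletionException.
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withDefault()); // default
        System.out.println(timesOut());    // true
    }
}
```

Which of the two to use depends on whether a sensible default exists; orTimeout() is the better fit when a missing result should be treated as an error.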

Conclusion

In this article, we described the methods and typical use cases of the CompletableFuture class.