This blog post reviews JAVA8’s CompletionStageAPI and its standard implementation in the JAVA library, CompletableFuture. Several examples will be presented to illustrate the various behaviors of the API.

Because the CompletableFuture is an implementation of the CompletionInterface interface, we first need to understand the contract for that interface. It represents a phase of a synchronous or asynchronous computation. You can think of it as a unit on an assembly line of computations designed to produce a valuable end result. This means that multiple ComletionStage directives can be linked so that the completion of one stage triggers the execution of the next.

In addition to implementing the CompletionStage interface, Completion also inherits the Future, which is used to implement an asynchronous event before it starts. It is named CompletableFuture because it can explicitly complete the Future.

1. Create a new CompletableFuture

This simple example creates a CompletableFuture with pre-set results that are already complete. Usually used as the starting point of a calculation.

static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}
Copy the code

The getNow method returns the finished result (message in this case), or the passed default value null if it has not done so.

2. Run a simple asynchronous stage

The following example explains how to create a stage that runs Runnable asynchronously.

static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}
Copy the code

This example illustrates two things:

  1. CompletableFutureIn order toAsyncMethods ending with are executed asynchronously
  2. By default (that is, no incomingExecutorAsynchronous execution will be usedForkJoinPoolImplementation, the thread pool uses a background thread to executeRunnableTask. Note that this is only specific toCompletableFutureImplementation, other thingsCompletableStageImplementations can override this default behavior.

3. Apply the method to the previous Stage

The following example references the CompletableFuture already completed in the first example, which references the result of the generated string and capitalizes the string.

static void thenApplyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}
Copy the code

The key word here is thenApply:

  1. thenAn operation that takes place after the current phase completes normal execution (normal execution means no exceptions are thrown). In this case, the current phase has been completed and values have been obtainedmessage.
  2. ApplyIt means to put aFunctionStudent: The result of the previous phase

Function is blocked, which means that the getNow() method is executed only after the uppercase operation has completed.

4. Asynchronously apply the method to the previous Stage

By adding an Async suffix to the end of the method, the CompletableFuture chain is executed asynchronously (using ForkJoinPool.commonPool()).

static void thenApplyAsyncExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
Copy the code

Use a custom Executor to execute the method asynchronously

One advantage of the asynchronous approach is that you can provide an Executor to execute the CompletableStage. This example shows how to use a thread pool of a fixed size to implement capitalization.

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-"+ count++); }}); static voidthenApplyAsyncWithExecutorExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}
Copy the code

6. Consume the results of the previous Stage

If the next Stage receives the result of the current Stage but does not return a value in the evaluation (such as its return value of void), then it uses the method thenAccept and passes in a Consumer interface.

static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}
Copy the code

The Consumer will execute synchronously, so we don’t need to join on the returned CompletableFuture.

7. Run Comsume asynchronously

Again, use the Asyn suffix to implement:

static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}
Copy the code

8. When the calculation is abnormal

Let’s now simulate a scenario where an exception occurs. For brevity, we’ll capitalize a string, but we’ll simulate a delay. We will use thenApplyAsyn(Function, Executor). The first parameter is an uppercase conversion method, and the second parameter is a delayed Executor that will wait one second before submitting the operation to ForkJoinPool.

static void completeExceptionallyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return(th ! = null) ?"message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}
Copy the code

  1. First, we create a new one that is done with a return valuemessagetheCompletableFutureObject. And then we callthenApplyAsyncMethod, which returns a newCompletableFuture. This method performs capitalization asynchronously. And it shows you how to use it, rightdelayedExecutor(timeout, timeUnit)Method to delay asynchronous operations.
  2. Then we created a handler stage,exceptionHandler, which handles any exceptions and returns another messagemessage upon cancel.
  3. Finally, we explicitly complete the second phase and throw an exception, which causes the uppercase phase to be thrownCompletionException. It also triggershandlerPhase.

API added:



<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)


Returns a new CompletionStage, whether or not the previous Stage completed normally. The parameters passed in include the results of the previous phase and the exception thrown.

9. Cancel the calculation

Much like computation-time exception handling, we can cancel the calculation with cancel(Boolean mayInterruptIfRunning) in the Future interface.

static void cancelExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}
Copy the code

API supplement



public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)


Returns a new CompletableFuture, the result of execution in this method if an exception occurs, otherwise the result of normal execution.

10. Apply Function to one of the results of two completed stages

The following example creates a CompletableFuture object and applies Function to either of the two completed stages (there is no guarantee which one will be passed to Function). The two phases are as follows: one capitalizes the string, the other lowercase.

static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}
Copy the code

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn)


Return a brand new CompletableFuture containing fn executed on either this or Other after the operation is complete

11. Consume the results of either phase

As in the previous example, replace Function with Consumer

static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}
Copy the code

12. Run Runnable after both phases are complete

Note that both stages run synchronously: after the first Stage converts the string to uppercase, the second Stage converts it to lowercase.

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}
Copy the code

13. Receive the results of two stages with Biconsumer

BiConsumer supports simultaneous manipulation of the results of two stages.

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}
Copy the code

14. Bifunction is applied simultaneously to the results of both stages

If CompletableFuture wants to combine the results of both phases and return values, we can use the method thenCombine. The flow is synchronous, so the final getNow() method yields the final result, a concatenation of the uppercase and lowercase results.

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}
Copy the code

15. The Bifunction is asynchronously applied to both stages

Similar to the previous example, but in a different way: both phases are asynchronous. Then thenCombine will also execute asynchronously, even if it does not have the Async suffix.

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}
Copy the code

16.Compose CompletableFuture

We can use thenCompose to do the same for the first two examples.

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}
Copy the code

17. Create a new completion phase when any of the phases is completed

static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a"."b"."c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); }}); assertTrue("Result was empty", result.length() > 0);
}
Copy the code

18. When all the phases are complete, create a new completion phase

static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a"."b"."c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
        .whenComplete((v, th) -> {
            futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
            result.append("done");
        });
    assertTrue("Result was empty", result.length() > 0);
}
Copy the code

19. When all phases are complete, create an asynchronous completion phase

static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a"."b"."c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}
Copy the code

20. Real life

Here is a scenario where CompletableFuture is practiced:

  1. First by callingcars()Method acquisition asynchronouslyCarList. It’s going to return oneCompletionStage<List<Car>>.cars()Methods should be implemented using a remote REST endpoint.
  2. We combine this Stage with another Stage that passes the callrating(manufactureId)To asynchronously get the score for each vehicle.
  3. When all Car objects have been scored, we callallOf()To enter the final Stage, which is executed after the two phases are complete
  4. Used on the final StagewhenComplete()To print out the vehicle’s rating.
cars().thenCompose(cars -> {
    List<CompletionStage> updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
    CompletableFuture done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();
Copy the code


Original link:
20 Examples of Using Java’s CompletableFuture – DZone Java


Translation link:
20 examples of using JAVA CompletableFuture