Future

A Future has been added since Java 5 to describe the result of an asynchronous computation. There are fewer ways to get a result, either by polling isDone and calling GET () to get the value after confirming completion, or by calling GET () to set a timeout. But the get() method blocks the calling thread in a way that clearly defeats the purpose of asynchronous programming. Such as:

@Test public void testFuture() throws InterruptedException { ExecutorService es = Executors.newSingleThreadExecutor(); Future<Integer> f = es.submit(() -> {// long time asynchronous computation thread.sleep (2000L); System.out.println(" long time asynchronous computation "); return 100; }); While (true) {system.out.println (" block "); if (f.isDone()) { try { System.out.println(f.get()); es.shutdown(); break; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } Thread.sleep(100L); }}Copy the code

While Future and related usage methods provide the ability to execute tasks asynchronously, it is very inconvenient to obtain results, which can only be obtained through blocking or polling. Blocking works against the purpose of asynchronous programming, polling uses unnecessary CPU resources, and doesn’t get results in time. Why not use observer design to notify listeners when the results are complete? For example, Netty extends the ChannelFuture interface of Future, and Node.js implements asynchronous programming by means of callback.

To solve this problem, Since Java 8, It has absorbed Guava’s design ideas and added many extension functions of Future to form CompletableFuture. When a Future might need to be completed explicitly, use the CompletionStage interface to support functions and actions that are triggered on completion. When two or more threads simultaneously attempt to complete, exception complete, or cancel a CompletableFuture, only one will succeed. CompletableFuture implements the following strategy for the CompletionStage interface:

  1. Non-asynchronous completion operations are provided for the thread that completes the current CompletableFuture interface or the callback function of another completion method.
  2. No explicit into all the async method and Executor use ForkJoinPool.com monPool () in order to simplify the monitoring, debugging, and tracking, all generated an asynchronous task are marker interface AsynchronousCompletionTask instance.
  3. All CompletionStage methods are implemented independently of the other common methods, so the behavior of one method is not overridden by other methods in the subclass.

CompletableFuture implements the following strategy for the Future interface:

  • CompletableFuture has no direct control over completion, so the cancel operation is treated as another form of exception completion. methodsisCompletedExceptionallyCan be used to determine if a CompletableFuture is completed with any exception.
  • The methods get() and get(long,TimeUnit) throw an ExecutionException, corresponding to the CompletionException. To simplify usage in most contexts, the class also defines methodsjoin()andgetNow(Return the result if the result has been evaluated or throw an exception, otherwise return the given valueIfAbsent value), rather than throwing CompletionException directly in these cases.

CompletableFuture

The CompletableFuture class implements the CompletionStage and Future interface, so you can still get results by blocking or polling as before, although this is not recommended.

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    //...
}Copy the code

Create the CompletableFuture object

Four static methods are provided to create the CompletableFuture object in this class:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}Copy the code

Methods that end Async and do not specify an Executor use ForkJoinPool.commonPool() as a thread pool to execute asynchronous code.

  • The runAsync method is used for tasks that have no return value, and it takes a Runnable functional interface type as an argument, so the CompletableFuture evaluates to null.

  • The supplyAsync method is used for tasks that have a return value, taking the Supplier functional interface type as an argument, and the CompletableFuture calculates the result of type U.

These threads are Daemon threads. The main thread ends and the Daemon thread doesn’t end, except when the JVM is shut down.

@Test public void testForCreate() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); / / set the Date format String result. = CompletableFuture supplyAsync (() - > {return df. The format (new Date ()); }).thenapply (s -> "+ s).join(); System.out.println(result); CompletableFuture.runAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("sleep for 1s :" + df.format(new Date())); // new Date() to get the current system time}).join(); }Copy the code

Processing of calculation results when they are completed

When the CompletableFuture evaluates to complete, or throws an exception, there are four methods:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}Copy the code

You can see that the Action type is BiConsumer
It can handle normal calculation results, or exception cases.

Methods do not end with Async, meaning that actions are executed using the same thread, while Async may be executed using other threads (and may be selected by the same thread if using the same thread pool).

@Test public void testComplete() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); / / set the Date format CompletableFuture. RunAsync (() - > {System. Out. Println (" is the current time is: "+ df. The format (new Date ())); throw new ArithmeticException("illegal exception!" ); }). Exceptionally (e -> {system.out.println (" exception: "+ LLDB etMessage())); return null; }).whenComplete((v, e) -> System.out.println("complete")).join(); }Copy the code

The exceptionally method returns a new CompletableFuture, which is calculated when the original CompletableFuture throws an exception and calls function to calculate the value. Otherwise, if the original CompletableFuture evaluates properly, the new CompletableFuture evaluates as well, and it has the same value as the original CompletableFuture evaluates. The exceptionally method is used to handle exceptions.

convert

You can also concatenate these operations, or combine completableFutures. The key input parameter is only a Function, which is a functional interface, so Lambda is more elegant. The input parameter is the result of the previous calculation, and the return value is the transformed result.

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);Copy the code

The function passes the result to fn after the original CompletableFuture computes, and treats the result of fn as the result of the new CompletableFuture calculation. So it does the equivalent of converting CompletableFuture

to CompletableFuture
.

@Test
public void testFConvert() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
    String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join();
    System.out.println(f);
}Copy the code

Note that these transformations are not executed immediately and do not block, but continue after the previous stage has completed.

They differ from the Handle method in that the Handle method handles both normal computed values and exceptions, so it can mask exceptions from being thrown further. ThenApply only deals with normal values, so any exception is thrown.

consumption

The above method generates a new calculation result (thenApply, Handle) when the calculation is completed, or returns the same calculation result. WhenComplete, CompletableFuture also provides a method to process the result. Only Action is performed on the result and no new calculated value is returned, so the calculated value is null:

@Test
public void testAccept() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello world";
    }).thenAccept(System.out::println);
}Copy the code

ThenAcceptBoth and related methods provide similar functionality in that when both CompletionStages have completed their calculations properly, the provided action is executed, which is used to combine another asynchronous result. RunAfterBoth is the execution of a Runnable that does not use the result of the calculation when both completionstages have completed the calculation properly.

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor); public CompletableFuture<Void> runAfterBoth(CompletionStage<? > other, Runnable action);Copy the code

The following implementation prints the results of both computations after both completionstages have completed normally:

@Test
public void testAcceptBoth() {
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "first";
    }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();
}Copy the code

The following set of methods executes a Runnable when the calculation is complete. Unlike thenAccept, Runnable does not use the result of the CompletableFuture calculation.

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);Copy the code

We carry out the following applications:

@ Test public void testRun () {CompletableFuture. SupplyAsync (() - > {System. Out. Println (" executive CompletableFuture "); return "first"; }).thenRun(() -> System.out.println("finished")).join(); }Copy the code

The result of the previous CompletableFuture calculation is ignored, and this method returns an object of type CompletableFuture

.

Reference documentation

  1. Java8 Doc
  2. Java CompletableFuture,
  3. CompletableFuture,