
Before Java 8, multithreaded code in Java was usually written by implementing the run method of Runnable, which has the obvious disadvantage of not returning a value. To get a result back, you can implement the call method of Callable and retrieve the result through a Future, as follows:

public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> stringFuture = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "async thread";
            }
        });
        Thread.sleep(1000);
        System.out.println("main thread");
        System.out.println(stringFuture.get()); // blocks until the task has finished
        executor.shutdown(); // lets the JVM exit once the task is done
}
  • On the console, main thread is printed first and async thread about one second later, which seems to satisfy our needs. But the main thread is blocked while it waits inside the future's get() method, and that is not what we want to see.

  • Another way to get the result is to poll: call isDone repeatedly and call get once the task is done. That does not satisfy us either, for example in scenarios like these:

1. Many asynchronous tasks may take different amounts of time to finish. The main thread cannot wait forever, so it may want to wait only for the fastest task, or only for the most important one.

2. Two asynchronous tasks are submitted separately, but the second one depends on the result of the first one.
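The polling approach mentioned above can be sketched roughly as follows. This is only an illustration: the class name PollingExample, the method pollForResult and the sleep intervals are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PollingExample {

    // Submits a slow task and polls isDone() until the result is ready.
    static String pollForResult() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(200); // simulated work
                return "async thread";
            });
            int polls = 0;
            while (!future.isDone()) {
                polls++;          // the main thread could do other work here
                Thread.sleep(50);
            }
            System.out.println("polled " + polls + " times");
            return future.get();  // returns immediately: the task is already done
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pollForResult());
    }
}
```

The loop avoids a long blocking get(), but the caller still has to keep coming back to check, and there is no natural place to say what should happen next once the result arrives.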

Java 8's CompletableFuture makes its debut in this messy and imperfect multithreaded world. It greatly extends what Future can do and where it can be used: it supports a functional programming style that makes the code more elegant, lets results be processed through callbacks, and offers better means of handling exceptions.

The CompletableFuture source contains four static methods to perform asynchronous tasks:

Create a task

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor){..}
  • If you know the basics of multithreading, it is easy to see that the two methods starting with run execute tasks that have no return value, because their input is a Runnable.

  • The methods starting with supply execute tasks that do have a return value. As for the parameters, if no Executor is passed in, ForkJoinPool.commonPool() is used as the thread pool that runs the asynchronous code. In practice, we usually pass in a thread pool we created ourselves, so that our tasks do not compete with everything else that shares the common pool.

Running an asynchronous task is as simple as calling one of the methods above:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // ... perform a task
    return "hello";
}, executor);
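To put the run and supply variants side by side, here is a small self-contained sketch. The class name CreateTaskExample, the method runBoth and the pool size of 2 are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CreateTaskExample {

    static String runBoth() {
        // A pool we create ourselves instead of relying on the common pool.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // runAsync takes a Runnable, so the future carries no value (Void).
            CompletableFuture<Void> noResult = CompletableFuture.runAsync(
                    () -> System.out.println("running on " + Thread.currentThread().getName()),
                    executor);
            // supplyAsync takes a Supplier, so the future carries its return value.
            CompletableFuture<String> withResult =
                    CompletableFuture.supplyAsync(() -> "hello", executor);
            noResult.join();
            return withResult.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoth());
    }
}
```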

Let’s look at a few methods to get the results of execution.

V get();
V get(long timeout, TimeUnit unit);
T getNow(T valueIfAbsent);
T join();
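  • The first two methods are declared in the Future interface. get() blocks the current thread, which creates a problem: if the executing thread never returns a result, get() waits forever. The overload with a timeout gives up and throws a TimeoutException once the timeout has elapsed.

  • The getNow() method is interesting because it never waits: if the result is already available it returns it, and otherwise it returns the default value passed in. join() waits like get(), but reports a failed task as an unchecked CompletionException instead of a checked exception.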
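The differences between these methods can be tried out with a sketch like the one below. The class and method names are invented for the example, and a manually created, never completed CompletableFuture stands in for a slow task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GetResultExample {

    // getNow never waits: a pending future yields the supplied default.
    static String getNowOnPending() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow("default");
    }

    // get with a timeout gives up with a TimeoutException.
    static boolean getTimesOut() throws Exception {
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            pending.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    // join reports a failed task as an unchecked CompletionException.
    static String joinOnFailure() {
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            return failed.join();
        } catch (CompletionException e) {
            return "caught " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getNowOnPending());
        System.out.println(getTimesOut());
        System.out.println(joinOnFailure());
    }
}
```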

Let's look at some of the other commonly used CompletableFuture methods, each with an example scenario.

thenAccept()
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
  • Function: after the current task completes, runs the next action with the current task's result as its input parameter. The action has no return value.
  • Scenario: task A and task B are executed asynchronously. After task B returns normally, its return value is used to execute task C, and task C has no return value.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "task A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "task B");
// thenAccept consumes the result of task B and produces no value of its own
CompletableFuture<Void> futureC = futureB.thenAccept(b -> {
      System.out.println("Executing task C.");
      System.out.println("Parameter: " + b); // Parameter: task B
});
thenRun(..)
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • Function: runs the next action without caring about the result of the previous step
  • Scenario: task A is executed, and task B is executed after task A completes. Task B does not receive the return value of task A (whether or not A has one), and task B itself has no return value
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "task A");
futureA.thenRun(() -> System.out.println("Executing task B"));
thenApply(..)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • Function: after the current task completes, runs the next task with the current task's result as its input parameter. The next task returns a value
  • Scenario: several tasks are executed in series. Each task depends on the result of the previous one, and each has an input and an output

Task A is executed asynchronously. When it completes, the resultA it returns is passed as the input parameter to task B. Any number of tasks can be chained in series this way

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");
CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);
System.out.println(future3.join());

Without thenApply, we would have to call futureA.join() to get the return value of task A and then pass that value in to execute task B. thenApply removes this step: we don't have to block the calling thread while waiting for a computation to complete. Instead, we tell the CompletableFuture what to do next once it is done, and in this way connect multiple tasks.

thenCombine(..)  thenAcceptBoth(..)  runAfterBoth(..)
public <U,V> CompletableFuture<V>  thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
  • Function: combines the results of two CompletionStages and returns the converted result
  • Scenario: you need to compute the current price of a product from its product ID, which takes two queries: the original price and the discount. The two queries are independent of each other, so use thenCombine(..)
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("Final price is :" + futureResult.join()); // Final price is :80.0
  • thenCombine(..) combines the two results and returns a value. If no return value is needed, use thenAcceptBoth(..) instead.
  • By the same token, if the next step does not care about the results of the two tasks at all, use runAfterBoth(..). If you understand thenApply, thenAccept and thenRun above, these two methods need no separate explanation, so they are only mentioned here.
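For completeness, the two siblings of thenCombine might be used like this. The sketch reuses the price and discount values from the scenario above; the class name BothExample and the log list are assumptions added for the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class BothExample {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
        CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);

        // thenAcceptBoth: consumes both results and returns nothing.
        CompletableFuture<Void> accepted = futurePrice.thenAcceptBoth(futureDiscount,
                (price, discount) -> log.add("final price: " + price * discount));

        // runAfterBoth: ignores both results and just runs once both are done.
        CompletableFuture<Void> ran = futurePrice.runAfterBoth(futureDiscount,
                () -> log.add("both queries finished"));

        accepted.join();
        ran.join();
        return log; // the order of the two entries is not guaranteed
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```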
thenCompose(..)
public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
  • Function: This method takes the computed value of the current CompletableFuture as input and returns a new CompletableFuture

This method is much like thenApply in that it accepts the result of the previous task as an input, performs its own operation, and then returns. So what’s the difference?

  • thenApply(): the function returns a plain value, so it converts a CompletableFuture<T> into a CompletableFuture<U>, changing only the type of the value carried by the future
  • thenCompose(): the function itself returns a CompletableFuture, so it is used to connect two CompletableFutures, and it returns a new CompletableFuture with the nesting flattened
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world"));
CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));
System.out.println(future3.join());
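The difference between the two shows up most clearly in the types. In the sketch below, appendWorld is a hypothetical asynchronous step invented for the example: passing it to thenApply produces a nested future, while thenCompose flattens it.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {

    // Hypothetical asynchronous step used only for this illustration.
    static CompletableFuture<String> appendWorld(String s) {
        return CompletableFuture.supplyAsync(() -> s + " world");
    }

    static String withThenApply() {
        // thenApply wraps whatever the function returns, so the result is nested.
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.supplyAsync(() -> "hello")
                        .thenApply(ComposeVsApply::appendWorld);
        return nested.join().join(); // two joins are needed to reach the value
    }

    static String withThenCompose() {
        // thenCompose flattens the nesting into a single future.
        CompletableFuture<String> flat =
                CompletableFuture.supplyAsync(() -> "hello")
                        .thenCompose(ComposeVsApply::appendWorld);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(withThenApply());
        System.out.println(withThenCompose());
    }
}
```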
applyToEither(..)  acceptEither(..)  runAfterEither(..)
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
  • Function: of two CompletionStages, whichever finishes first supplies the return value that is used for the next operation

  • Scenario: suppose commodity A can be queried in two ways, a and b, which do not run at the same speed, and we want to use the return value of whichever finishes first.

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Obtain commodity A by means A";
        });
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Obtain commodity A by means B";
        });

CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "Result: " + product);
System.out.println(futureC.join()); // Result: Obtain commodity A by means A

Similarly, applyToEither has two siblings, acceptEither() and runAfterEither(). They relate to it in the same way that thenAccept and thenRun relate to thenApply, so which one to use should need no further explanation.
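As a quick illustration of the two siblings, here is a sketch in which a fast and a slow query race each other. The helper delayed, the class name EitherExample and the delays are all assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class EitherExample {

    // Hypothetical helper: returns the value after sleeping for the given time.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> fast = delayed("method a", 100);
        CompletableFuture<String> slow = delayed("method b", 1000);

        // acceptEither: consumes the first available result, returns nothing.
        CompletableFuture<Void> accepted =
                fast.acceptEither(slow, winner -> log.add("winner: " + winner));

        // runAfterEither: ignores the result, runs once either one is done.
        CompletableFuture<Void> ran =
                fast.runAfterEither(slow, () -> log.add("one query finished"));

        accepted.join();
        ran.join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```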

exceptionally(..)
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
  • Function: when an exception occurs during execution, this method is called to perform some compensation, such as supplying a default value.
  • Scenario: task A is executed asynchronously to obtain a result. If an exception is thrown while task A is running, the default value 100 is returned.
CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + (100 / 0))
                .thenApply(s -> "futureA result:" + s)
                .exceptionally(e -> {
                    System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
CompletableFuture<String> futureB = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + 50)
                .thenApply(s -> "futureB result:" + s)
                .exceptionally(e -> "futureB result: 100");
System.out.println(futureA.join()); // futureA result: 100
System.out.println(futureB.join()); // futureB result:Result of execution :50

The code above shows both the normal flow and the flow when an exception occurs. exceptionally can be understood as a catch block; comparing the two return values makes its effect clear.

whenComplete(..)
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

Function: when a CompletableFuture completes, whether normally or with an exception, the action passed to whenComplete is executed. For example:

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .whenComplete((s, e) -> {
                    if (s != null) {
                        System.out.println(s); // not executed
                    }
                    if (e == null) {
                        System.out.println(s); // not executed
                    } else {
                        System.out.println(e.getMessage()); // java.lang.ArithmeticException: / by zero
                    }
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
System.out.println(futureA.join()); // futureA result: 100

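From the console we can see that the execution order is supplyAsync -> whenComplete -> exceptionally. thenApply is skipped because supplyAsync threw an exception, and thenApply is only executed when the previous stage returns normally. whenComplete, by contrast, runs whether or not the previous stage succeeded. Also note that whenComplete does not return a value.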

Above we used a functional programming style and called whenComplete first and exceptionally second. Here is what happens if we call exceptionally first and whenComplete second.

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .whenComplete((s, e) -> {
                    if (e == null) {
                        System.out.println(s); // futureA result: 100
                    } else {
                        System.out.println(e.getMessage()); // not executed
                    }
                });
System.out.println(futureA.join()); // futureA result: 100

This code calls exceptionally first and whenComplete second. The result received in whenComplete is a normal result: exceptionally has already handled the exception and returned the default value, so whenComplete sees no exception. This is worth noting.

handle(..)
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
  • Function: When the result of a CompletableFuture is completed, or when an exception is thrown, the result can be handled by the handle method
CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage()); // not executed
                    }
                    return "handle result:" + (s == null ? "500" : s);
                });
System.out.println(futureA.join());//handle result:futureA result: 100

From the console, we can see that the final output is handle result:futureA result: 100. exceptionally runs first, absorbs the exception and returns the default value, so handle then receives a normal result.

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "Result of execution :" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s); // not executed
                    } else {
                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
                    }
                    return "handle result:" + (s == null ? "500" : s);
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); // not executed
                    return "futureA result: 100";
                });
System.out.println(futureA.join());//handle result:500

From the console output, we can see that handle runs first, prints the exception, and returns the default value 500. exceptionally is not executed, because the stage it is attached to completed normally with the value that handle returned.

The differences between handle and whenComplete:

1. handle returns a value; whenComplete does not.
2. Because of point 1, handle can do what exceptionally does: it can replace an exception with a normal result.
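These two points can be checked with a small sketch. The class name, the method names and the "boom" exception are invented for the example: handle turns the failure into a normal value, while whenComplete only observes it and the failure still reaches join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HandleVsWhenComplete {

    static String withHandle() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                // handle returns a value, so it can replace the exception.
                .handle((s, e) -> e == null ? s : "recovered")
                .join();
    }

    static String withWhenComplete() {
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                // whenComplete only observes; the exception is passed on unchanged.
                .whenComplete((s, e) -> System.out.println("observed: " + e));
        try {
            return future.join();
        } catch (CompletionException ex) {
            return "still failed: " + ex.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withHandle());
        System.out.println(withWhenComplete());
    }
}
```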

allOf(..)  anyOf(..)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf: returns a CompletableFuture that completes when all of the given CompletableFutures have completed
  • anyOf: returns a CompletableFuture that completes as soon as any one of the given CompletableFutures completes, carrying that one's result

Scenario 2: to show the details of a product, we need to query the product information, seller information, inventory information, order information and so on. These queries are independent of each other. Assume each query takes one to two seconds on a different service, and the overall query time is required to be less than two seconds.

// Needs java.util.concurrent.* (ThreadLocalRandom replaces the third-party RandomUtils)
public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                // simulate a query that takes between one and two seconds
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Commodity Details";
        },executorService);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Seller information";
        },executorService);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Inventory Information";
        },executorService);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Order Information";
        },executorService);

        // wait until all four queries have finished; they run in parallel
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
        allFuture.join();

        System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());
        System.out.println("Total time :" + (System.currentTimeMillis() - start));
        executorService.shutdown(); // otherwise the pool threads keep the JVM alive
    }
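The example above covers allOf. For anyOf, a minimal sketch could look like this; the helper delayed, the service names and the delays are assumptions made for the illustration. Note that anyOf returns a CompletableFuture<Object>, so the result usually has to be cast.

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {

    // Hypothetical helper: returns the value after sleeping for the given time.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static Object firstResult() {
        CompletableFuture<String> fast = delayed("fast service", 100);
        CompletableFuture<String> slow = delayed("slow service", 1000);
        // Completes as soon as either future completes, with that future's result.
        CompletableFuture<Object> any = CompletableFuture.anyOf(fast, slow);
        return any.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```

This matches the first scenario from the beginning of the article: the caller waits only for the fastest task instead of all of them.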