Preface

Suppose you are uploading a video of your life. The file is large, so you start a new thread to handle the upload and get on with something else. The catch is that you have to keep checking every few seconds to see whether the upload has finished.

import java.util.Random;
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> callable = () -> {
            int progress = 0;
            while (progress < 100) {
                progress = Math.min(100, progress + new Random().nextInt(10)); // cap at 100%
                System.out.println(progress + "%");
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return "http://www.xxxx.mp4";
        };
        Future<String> submit = executor.submit(callable);
        while (!submit.isDone()) {
            // Do something else, but keep polling
        }
        System.out.println("Upload completed: " + submit.get());
        executor.shutdown(); // otherwise the executor thread keeps the JVM alive
    }
}

You soon realize that this polling is cumbersome and inefficient, so you look for something more convenient and find CompletableFuture. It addresses what a plain Future cannot: it supports asynchronous callbacks, so it notifies you when the task is done and you no longer need to keep asking.

import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest {
    public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            int progress = 0;
            while (progress < 100) {
                progress = Math.min(100, progress + new Random().nextInt(10)); // cap at 100%
                System.out.println(progress + "%");
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return "http://www.xxxx.mp4";
        });
        // The callback runs once the upload finishes; join() keeps main alive until it has run
        completableFuture.thenAccept(s -> System.out.println("Upload completed: " + s)).join();
    }
}

CompletableFuture overview

So what exactly is CompletableFuture?

Asynchronous programming is a way of writing non-blocking code: a task runs on a thread other than the main thread, and the main thread is notified of its progress, completion, or failure. The main thread therefore does not block waiting for the task to complete and can execute other tasks in parallel. This parallelism can improve the performance of a program considerably, and CompletableFuture is the tool Java provides for this style of asynchronous programming.

CompletableFuture has emerged to solve asynchronous problems in a more elegant way.

supplyAsync, runAsync

A CompletableFuture is created with the supplyAsync() method when the asynchronous task returns a value. If the task has no return value, use runAsync() instead, as follows:

 CompletableFuture
         .runAsync(() -> System.out.println(Thread.currentThread().getName()))
         .thenAccept(unused -> System.out.println(Thread.currentThread().getName() + " Complete"));

// Output (the thread that runs the callback can vary)
ForkJoinPool.commonPool-worker-1
main Complete

If the task returns a value, there must be a way to retrieve it. As with Future, that method is get().

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "string");

System.out.println(completableFuture.get());

thenAccept

When a task finishes and you want to take its return value and do something with it, you can use thenAccept() on the CompletableFuture. thenAccept() itself returns a CompletableFuture (a CompletableFuture<Void>), so calls can be chained and each callback is invoked in turn.

 CompletableFuture
         .runAsync(() -> System.out.println("Deal with tasks " + Thread.currentThread().getName()))
         .thenAccept(unused -> System.out.println("Complete 1"))
         .thenAccept(unused -> System.out.println("Complete 2"))
         .thenAccept(unused -> System.out.println("Complete 3"))
         .thenAccept(unused -> System.out.println("Complete 4"));

thenAccept() can access the result of the CompletableFuture. thenRun(), which runs a Runnable after the asynchronous task has finished, cannot.

thenAccept() simply consumes the result of the previous asynchronous task synchronously, that is, without submitting the callback to an executor as a new task.
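
The difference between thenAccept() and thenRun() can be sketched as follows. This is a minimal illustration, not code from the original example: the class name AcceptVersusRun and the log list are hypothetical, and the URL is the placeholder used earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AcceptVersusRun {
    // Returns the messages recorded by the two callbacks, in the order they ran.
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture
                .supplyAsync(() -> "http://www.xxxx.mp4")
                // thenAccept receives the result of the previous stage
                .thenAccept(url -> log.add("accepted " + url))
                // thenRun receives nothing; it only knows the previous stage has finished
                .thenRun(() -> log.add("done"))
                .join(); // wait until both callbacks have run
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [accepted http://www.xxxx.mp4, done]
    }
}
```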

Combine two CompletableFutures together (chain execution)

When a requirement needs two interfaces to complete, we chain them together. For example, to get the list of users who share user A's hobby, you first retrieve user A's hobby from server A, then query server B for the users with that hobby. The second stage needs the value produced by the first.


import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Move {
    public static void main(String[] args) {
        CompletableFuture<CompletableFuture<List<String>>> result = getUsersHobby("userId")
                .thenApply(s -> getHobbyList(s));
        try {
            System.out.println(result.get().get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    static CompletableFuture<String> getUsersHobby(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // Initiate a network request to get the userId's hobby
            return "Basketball";
        });
    }

    static CompletableFuture<List<String>> getHobbyList(String hobby) {
        // hobby is the result of the asynchronous processing of getUsersHobby()
        return CompletableFuture.supplyAsync(() -> {
            // Initiate a network request to obtain all users with this hobby
            return Arrays.asList("Zhang", "Bill");
        });
    }
}

Using thenApply() here produces a nested result, so we have to call get() twice to reach the final value. If we want the end result to be a top-level future, we can use thenCompose() instead:

public static void main(String[] args) {
    CompletableFuture<List<String>> result = getUsersHobby("userId")
            .thenCompose(s -> getHobbyList(s));
    try {
        System.out.println(result.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

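thenApply() simply continues processing the result of the previous asynchronous task synchronously: its function runs on the thread that completed that task, or on the calling thread if the task has already finished. thenApplyAsync() instead submits the function to an executor, so the result is processed asynchronously.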
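
A small sketch of the two variants side by side. The class name, the executor, and the thread name "hobby-worker" are assumptions made for illustration; passing an explicit executor to thenApplyAsync() just makes the thread that runs the function easy to see.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ApplyVersusApplyAsync {
    static String demo() {
        // Hypothetical single-thread executor with a recognizable thread name
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "hobby-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Basketball")
                    // Runs on whichever thread completed the previous stage (or on the caller)
                    .thenApply(hobby -> hobby.toUpperCase())
                    // Always submitted to the given executor
                    .thenApplyAsync(
                            hobby -> hobby + " on " + Thread.currentThread().getName(),
                            executor)
                    .join();
        } finally {
            executor.shutdown(); // let the JVM exit
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // BASKETBALL on hobby-worker
    }
}
```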

Combine the results after both CompletableFutures complete

This scenario applies when A and B are obtained asynchronously and both are needed to calculate C. For example, to find the hobbies two users have in common, fetch the hobbies of user A and user B asynchronously, then calculate the intersection once both are available.


 public static void main(String[] args) throws ExecutionException, InterruptedException {
     CompletableFuture<List<String>> aUser = CompletableFuture.supplyAsync(() -> {
         return Stream.of("Basketball", "Badminton").collect(Collectors.toList());
     });
     CompletableFuture<List<String>> bUser = CompletableFuture.supplyAsync(() -> {
         return Stream.of("Basketball", "Volleyball").collect(Collectors.toList());
     });
     CompletableFuture<List<String>> combinedFuture = aUser
             .thenCombine(bUser, (aHobby, bHobby) -> {
                 // Copy before intersecting so neither input list is modified
                 List<String> common = new ArrayList<>(aHobby);
                 common.retainAll(bHobby);
                 return common;
             });
     System.out.println(combinedFuture.get());
 }

Another method is runAfterBoth(), which executes a Runnable after both tasks are complete. Its runAfterBothAsync() variant submits the Runnable to an executor instead of running it synchronously.

 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
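
A minimal usage sketch of runAfterBoth(), reusing the hobby values from the earlier example. The class name and the bothDone flag are hypothetical and exist only to make the effect observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class RunAfterBothExample {
    static boolean demo() {
        CompletableFuture<String> aUser = CompletableFuture.supplyAsync(() -> "Basketball");
        CompletableFuture<String> bUser = CompletableFuture.supplyAsync(() -> "Volleyball");
        AtomicBoolean bothDone = new AtomicBoolean(false);
        // The Runnable receives neither result; it only runs once both futures have completed
        aUser.runAfterBoth(bUser, () -> bothDone.set(aUser.isDone() && bUser.isDone()))
                .join();
        return bothDone.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```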

Combine multiple CompletableFutures together

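CompletableFuture.allOf() combines any number of CompletableFutures and completes when all of them have completed. The catch is that it returns CompletableFuture<Void>, which carries no result set. With a few extra lines of code, however, we can still get the results of all the wrapped CompletableFutures.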


 public static void main(String[] args) throws ExecutionException, InterruptedException {
     CompletableFuture<Void> completableFuture = CompletableFuture.allOf(
             getUserName("1"),
             getUserName("2"),
             getUserName("3")).thenAccept(unused -> System.out.println("Complete"));
     completableFuture.join();
 }

 static CompletableFuture<String> getUserName(String userId) {
     return CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(new Random().nextInt(1000));
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "Users -" + userId;
     });
 }
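
The "few extra lines" for collecting the results could look like the sketch below. It is one possible approach under stated assumptions: getUserName() is simplified (the random sleep is dropped), and the class name AllOfResults is hypothetical. Calling join() on each future inside thenApply() does not block, because allOf() has already guaranteed that every future is complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfResults {
    // Simplified stand-in for the getUserName() method above (no random delay)
    static CompletableFuture<String> getUserName(String userId) {
        return CompletableFuture.supplyAsync(() -> "Users -" + userId);
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                getUserName("1"), getUserName("2"), getUserName("3"));
        CompletableFuture<List<String>> all = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is complete here, so join() returns immediately
                .thenApply(unused -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        return all.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [Users -1, Users -2, Users -3]
    }
}
```

The results come back in the order of the futures list, not in the order the tasks happened to finish.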

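If you only need the result of whichever task in a batch finishes first, use the anyOf() method. It returns a CompletableFuture<Object> that completes as soon as any one of the given futures completes, with that future's result.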
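
A rough sketch of anyOf(), assuming a variant of getUserName() that takes a fixed delay so that one task is clearly the fastest. The delay parameter and the class name are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {
    // Hypothetical variant of getUserName() with a fixed delay in milliseconds
    static CompletableFuture<String> getUserName(String userId, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Users -" + userId;
        });
    }

    static Object demo() {
        // The second task sleeps far less than the others, so it completes first
        CompletableFuture<Object> first = CompletableFuture.anyOf(
                getUserName("1", 500),
                getUserName("2", 10),
                getUserName("3", 500));
        return first.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Users -2
    }
}
```

Because anyOf() accepts futures of any type, the result is typed as Object and usually needs a cast. The slower tasks are not cancelled; they keep running in the background.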

CompletableFuture exception handling

If an error occurs in the supplyAsync() task, none of the thenApply() callbacks are called. If an error occurs in the first thenApply() callback, the second and subsequent callbacks are not called, and so on.

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});
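
To make the skipping behaviour observable, here is a small sketch that counts how many callbacks actually run when the first stage throws. The class name, the counter, and the exception message are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class SkippedCallbacks {
    static String demo() {
        AtomicInteger callbacksRun = new AtomicInteger();
        CompletableFuture<String> future = CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("upload failed");
                })
                .thenApply(result -> {
                    callbacksRun.incrementAndGet(); // never reached
                    return "processed " + result;
                })
                .thenApply(result -> {
                    callbacksRun.incrementAndGet(); // never reached
                    return result + " after further processing";
                });
        try {
            future.join();
            return "no exception";
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            return callbacksRun.get() + " callbacks ran, cause: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0 callbacks ran, cause: upload failed
    }
}
```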

Handling exceptions with the exceptionally() callback

If the program wants to catch errors raised during an asynchronous task, it can handle them with exceptionally(), which can return a default value.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> maturityFuture = CompletableFuture.supplyAsync(() -> {
        return 1 / 0;
    }).exceptionally(ex -> {
        ex.printStackTrace();
        return 1;
    });
    System.out.println(maturityFuture.get());
}