• You have one thought, I have one thought, and when we exchange, one person has two thoughts

  • If you can NOT explain it simply, you do NOT understand it well enough

Now the Demo code and technical articles are organized together Github practice selection, convenient for everyone to read and view, this article is also included in this, feel good, please also Star🌟


preface

In the last article, I did not use Java Future, but I doubt you can make tea as quickly as I did. The Future can be used to obtain the execution result of the thread. Although it solves the “three noes” of Runnable, it still has its own shortcomings:

Calculations cannot be completed manually

Suppose you use the Future running child thread to call a remote API to get the latest price for a product. The server is down due to a flood. If you want to manually end the calculation and return the price from the last cache, Future can’t do that

Calling the get() method blocks the program

The Future does not notify you of its completion. It provides a get() method that blocks until the result is available. There is no way to append to the Future with a callback function and call it automatically when the result is available

Cannot chain execution

In boiling water and making tea, the chain execution of multiple tasks can be achieved through the constructor parameter passing. In case there are more tasks, or the execution order of the task chain changes, the impact on the original program is very large

It is cumbersome to integrate the results of multiple futures

If multiple Futures are executing in parallel, you need to perform follow-up operations after all of these tasks are completed. You need to use the tool Executors to do so

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
Copy the code

No exception handling

Future also does not provide good exception handling


In the last article, I saw Future as discovering a new world, which makes me feel like I’m back before liberation


For those of you on the Java back end, ReactiveX should be a common solution for asynchronous programming prior to Java1.8, and for those of you who do Android, it should be familiar. If you’re familiar with the front end, ES6 Promises also solve asynchronous programming problems

Languages all over the world are learning from each other’s corresponding advantages, and Java, as an established player, naturally has to solve the above problems. In Java1.8, a new concurrency tool class, CompletableFuture, has been added to help you make a cup of tea. Taste a different taste……

Several important Lambda functions

CompletableFuture is available in Java1.8, so it naturally has to hitch a ride with Lambda. To better understand CompletableFuture, I need to introduce a few Lambda functions. We just need to focus on the following:

  • Parameter acceptance form
  • Return value form
  • The name of the function

Runnable

Runnable we’ve said countless times, no arguments, no return value

@FunctionalInterface
public interface Runnable {
    public abstract void run(a);
}
Copy the code

Function

Function<T, R> takes an argument and returns a value

@FunctionalInterface
public interface Function<T.R> {
    R apply(T t);
}
Copy the code

Consumer

The Consumer takes one argument and returns no value

@FunctionalInterface
public interface Consumer<T> {   
    void accept(T t);
}
Copy the code

Supplier

Supplier has no arguments and has a return value

@FunctionalInterface
public interface Supplier<T> {
    T get(a);
}
Copy the code

BiConsumer

BiConsumer<T, U> takes two arguments (Bi) and returns no value

@FunctionalInterface
public interface BiConsumer<T.U> {
    void accept(T t, U u);
Copy the code

All right, let’s do a little summary


Now, some of you might be wondering, why do I care about these functional interfaces, because the function name of The CompletableFuture and what it does are highly dependent on these functional interfaces, as you’ll see in a minute


With foreplay done, I can finally get down to business CompletableFuture

CompletableFuture

Class structure

As usual, let’s start with the class structure:


The Future interface is implemented

If you implement the Future interface, then you have the related features of the Future interface. Please imagine the poor 5 methods of Future, which will not be described here. Please check that you do not use Java Future

The CompletionStage interface is implemented

The interface CompletionStage is still quite strange. The literal translation in Chinese is “CompletionStage”. If boiling water and making tea is compared to a big project, their CompletionStage is different


  1. Looking at thread 1 or thread 2 is a serial relationship, doing one step and then doing the next

  2. If we look at thread 1 and thread 2, they’re parallel to each other, they’re doing things that are independent and complementary to each other

  3. Tea making is the summary/combination of thread 1 and thread 2, that is, the stage can be reached only after both thread 1 and thread 2 are completed (of course, there are also scenarios where either thread 1 or thread 2 can start the next stage upon completion).

So, the CompletionStage interface does just that, and all the functions describe the sequence of tasks, summed up like this:


CompletableFuture, since it implements two interfaces, will naturally implement corresponding methods to take full advantage of its interface properties, so let’s go into its methods and take a look


CompletableFuture has about 50 different ways of handling serialization, parallelism, composition, and handling errors. Younger brother screen is not up to the mark, the method of many, a screen can not be installed, see so many methods, is not the instant to direct collection — > eat gray 2 even walk? Don’t worry, we’ve sorted them by name and function, and we’ve got over 50 ways to do it in a minute


Serial relationship

The words after then (e.g. Run /apply/accept) are the names of the abstract methods used by the functional interfaces

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)  CompletableFuture<Void> thenAccept(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)  <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) Copy the code

Aggregation And relation

combine… with… And to both… and… So we’re going to have to satisfy both of them, which is the relationship between and

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)  CompletableFuture<Void> runAfterBoth(CompletionStage
                other, Runnable action) CompletableFuture<Void> runAfterBothAsync(CompletionStage
                 other, Runnable action) CompletableFuture<Void> runAfterBothAsync(CompletionStage
                  other, Runnable action, Executor executor) Copy the code

Aggregation Or relation

Either… or… Or, Or, Or, Or, Or, Or

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(, CompletionStage <? extends T> other, Function<?super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)  CompletableFuture<Void> runAfterEither(CompletionStage
                other, Runnable action) CompletableFuture<Void> runAfterEitherAsync(CompletionStage
                 other, Runnable action) CompletableFuture<Void> runAfterEitherAsync(CompletionStage
                  other, Runnable action, Executor executor) Copy the code

Exception handling

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
        
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)   <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) Copy the code

This exception handling can seem intimidating, but the traditional try/catch/finally comparison makes sense


You can see the difference between whenComplete and handle if you look at the functional interface names accepted as arguments. WhenComplete uses Comsumer, there is no return value. The latter uses Function, which naturally returns a value

Not all of them are listed here, but I believe that many students have found the rules:

All the callback methods provided by CompletableFuture have two Async variants, both of which look like this

// a variant of thenApply()
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Copy the code

In addition, the names of the methods are exactly the same as the functional interfaces described in foreplay. After sorting through the rules, doesn’t the 50-plus methods look easy?


Now that the basic methods have been listed, let’s use some examples to demonstrate them in practice:

Case presentation

Create a CompletableFuture object

So there’s nothing unusual about creating a CompletableFuture object, it’s still built from a constructor, right

CompletableFuture<String> completableFuture = new CompletableFuture<String>();
Copy the code

This is the simplest way to create a CompletableFuture object, and since it implements the Future interface, it’s natural to get the result using the get() method

String result = completableFuture.get();
Copy the code

As stated at the beginning of this article, the get() method will block until the task ends, and since the Future created above does not return, calling get() here will permanently block


We need to manually terminate a Future by calling the complete() method

completableFuture.complete("Future's Result Here Manually");
Copy the code

At this point, all clients waiting for the Future will return the specified result of manual termination

runAsync

Use runAsync for asynchronous computations

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
 }  System.out.println("Running in a single thread."); });  future.get(); Copy the code

Since we use Runnable functional expressions, we don’t get results either


supplyAsync

The supplyAsync() method is used to retrieve the result of an asynchronous computation

  CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   try {
    TimeUnit.SECONDS.sleep(3);
   } catch (InterruptedException e) {
    throw new IllegalStateException(e);
 }  log.info("Running in a single thread.");  return "I have a return value.";  });   log.info(future.get()); Copy the code

Because the Supplier functional expression is used, the result is naturally returned


As we have stated several times, the get() method is blocked until the Future is evaluated. For truly asynchronous processing, we would like to pass in a callback function that is automatically called when the Future ends, so that we don’t have to wait for the result

CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
    // You can comment it out to quickly return to start
   try {
    TimeUnit.SECONDS.sleep(3);
   } catch (InterruptedException e) {
 throw new IllegalStateException(e);  }  log.info("👍");  // You can comment it out to quickly return end  return "Praise";  })  .thenApply(first -> {  log.info("Looking at");  return first + "Looking at";  })  .thenApply(second -> second + Forward ",");   log.info("Third company?");  log.info(comboText.get()); Copy the code

The call to thenApply does not block the application from printing the log via a callback notification mechanism. Here you see thenApply using supplyAsync as the thread used by supplyAsync.


ThenApply is using the main thread, so:

Successive operations do not necessarily use the same thread as the preceding operations

thenAccept

If you don’t want to return any results from the callback function, you can use thenAccept

  final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
    // Simulates a remote API call, where only one constructed object is returned
    () -> Product.builder().id(12345L).name("Cervical vertebra/Lumbar spine treatment apparatus").build())
    .thenAccept(product -> {
     log.info("Get remote API product name" + product.getName());
 });  voidCompletableFuture.get(); Copy the code

thenRun

ThenAccept can get the result of the prior execution from the callback function, but thenRun cannot because its callback functional expression definition has no arguments

CompletableFuture.supplyAsync(() -> {
    // Preorder operations
}).thenRun(() -> {
    // Serial afteroperation with no arguments and no return value
});
Copy the code

As we also mentioned earlier, every function that provides a callback method has two Async variants, where Async is a separate thread

  CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
   log.info("Preorder operation");
   return "Previous required operation results";
  }).thenApplyAsync(result -> {
   log.info("Subsequent Operations");
 return "Subsequent Operation Results";  }); Copy the code

At this point, I believe you are very familiar with the serial operation

thenCompose

In everyday tasks, methods are usually defined to return the CompletableFuture type, which gives more leeway for subsequent operations, if there is a business like this. :

// Get user details
 CompletableFuture<User> getUsersDetail(String userId) {
  return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("A soldier of the Sun arch").build());
 }

 // Get user credit rating  CompletableFuture<Double> getCreditRating(User user) {  return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());  } Copy the code

At this point, if we still use thenApply() to describe the serial relationship, the result returned by the CompletableFuture will be nested

  CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
    .thenApply(user -> completableFutureCompose.getCreditRating(user));
Copy the code

This is obviously not what we want, and the thenCompose method comes in handy if we want to “pat” the result back

CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
    .thenCompose(user -> completableFutureCompose.getCreditRating(user));
Copy the code

This works the same way as Lambda maps and flatmaps

thenCombine

ThenCombine comes in handy if you want to aggregate the results of two independent futures

  CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
  CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
  
  CompletableFuture<Double> combinedFuture = weightFuture
    .thenCombine(heightFuture, (weight, height) -> {
 Double heightInMeter = height/100;  return weight/(heightInMeter*heightInMeter);  });   log.info("Body mass index -" + combinedFuture.get()); Copy the code

Of course, most of the time we’re dealing with two futures, but if there are more than two futures, how do we deal with some of their aggregate relationships?

AllOf | anyOf

If you see the signature of the method, you already understand its use, so I won’t introduce it here

static CompletableFuture<Void>  allOf(CompletableFuture
       ... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture
        ... cfs)
Copy the code

The next step is handling the exception

exceptionally

  Integer age = -1;

  CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
   if( age < 0 ) {
    throw new IllegalArgumentException("Who?");
 }  if(age > 18) {  return "We're all adults.";  } else {  return "No minors allowed.";  }  }).thenApply((str) -> {  log.info("Game on.");  return str;  }).exceptionally(ex -> {  log.info("There's gotta be a catch, here." + ex.getMessage());  return "Unknown!";  });   log.info(maturityFuture.get()); Copy the code

Exceptionally is a catch. If an exception occurs, thenApply will be skipped and the exception will be caught directly

handle

With multithreading, it is a good practice to use the try/ Finally paradigm. Handle can act as finally and make a small change to the above program. Handle accepts two arguments, one a normal return value and one an exception

Note: The writing of handle is also a form of paradigm

  Integer age = -1;

  CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
   if( age < 0 ) {
    throw new IllegalArgumentException("Who?");
 }  if(age > 18) {  return "We're all adults.";  } else {  return "No minors allowed.";  }  }).thenApply((str) -> {  log.info("Game on.");  return str;  }).handle((res, ex) -> {  if(ex ! =null) {  log.info("There's gotta be a catch, here." + ex.getMessage());  return "Unknown!";  }  return res;  });   log.info(maturityFuture.get()); Copy the code

Now that you’ve covered the basics of using CompletableFuture, I don’t know if you’ve noticed that the method we mentioned earlier with Sync executes on a single thread, but we didn’t create a thread. How does that work?


If you look closely at the third method of each variant, you may find that it contains an Executor parameter that specifies a thread pool. In the real business, we are strictly creating threads manually. Why use a thread pool? Clearly stated in the article; If no thread pool is specified, there is naturally a default thread pool, ForkJoinPool

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Copy the code

The default number of threads in a ForkJoinPool is the CPU core. However, it was clearly stated in the preface article:

Do not share a thread pool with all services, because once a task performs some slow I/O operations, all threads in the thread pool will block the I/O operations, resulting in thread hunger, which affects the performance of the entire system

conclusion

CompletableFuture method is not completely introduced, there is no need to introduce all, I believe that everyone according to this idea to understand CompletableFuture will not be any big problem, the rest is left to practice/time and their own experience

Afterword.

You think JDK1.8 CompletableFuture is already perfect, but the road to perfection is never ending, Java 9 CompletableFuture has been partially upgraded and revamped

  1. New factory methods have been added

  2. Supports delay and timeout handling

    orTimeout()
    completeOnTimeout()
    Copy the code
  3. Improved support for subclasses

Java 9 CompletableFuture API Improvements. How can I quickly switch between Different Java versions to try something new? SDKMAN gives you a unified and flexible way to manage multiple versions of Java in this article

Finally, let’s make a pot of tea and feel the new change

Soul asking

  1. ForkJoinPool thread pools are more efficient. Why?
  2. Are there any alternatives available for batch processing asynchronous programs?

reference

  1. Java concurrent programming practice
  2. The art of Concurrent programming in Java
  3. The beauty of Concurrent programming in Java
  4. https://www.baeldung.com/java-completablefuture
  5. https://www.callicoder.com/java-8-completablefuture-tutorial/