Preface

I recently read through my company's codebase and found that it relies heavily on multi-threaded programming, including CompletableFuture, so I decided to write an article summarizing it.

In everyday Java 8 development, CompletableFuture is a powerful tool for parallel programming. Its API fits Java 8's functional style, and it works well with Stream, which keeps the code concise.

You can apply it in your own work to improve interface performance and tidy up code.

This article was first published on the official account Yuebanfeiyu and then synced to my personal website: xiaoflyfish.cn/

If you find it useful, please like and share it. Thank you!

Basic introduction

CompletableFuture is a class introduced in Java 8 for asynchronous programming. It implements both the Future and CompletionStage interfaces.

Future provides the ability to obtain the result of an asynchronous request independently. CompletionStage provides stream-style processing: the stages of an asynchronous request can be combined or chained. As a result, CompletableFuture lets you flatten an entire asynchronous call flow into a fluent chain, which removes the convoluted code that the old Future required when handling a series of chained asynchronous requests.

Limitations of Future

1. You cannot act on a Future's result without blocking

A Future only lets you poll isDone() or block in get() until the result is available. There is no way to register further processing that runs once the result arrives.

2. You cannot combine the results of multiple Futures

Suppose you have several asynchronous Future tasks and want to do something when the fastest one finishes, or when all of them finish. Future offers no direct way to express this.

3. Multiple Futures cannot be chained

When asynchronous tasks depend on one another, a Future cannot pass the result of one task on to the next, so multiple Futures cannot form a chained workflow.

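4. No exception handling

Future has no callback-style hook for handling errors; a failure only surfaces as an ExecutionException thrown from get().

CompletableFuture addresses all of these points, letting us write more powerful and elegant asynchronous programs.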
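
To make the contrast concrete, here is a minimal sketch of the same transformation written against a plain Future and against a CompletableFuture. The class name, method names, and the "hello" payload are made up for illustration; they do not come from the original project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Plain Future: the caller must block in get() before it can touch the result,
    // and failures only show up as checked exceptions thrown from get().
    static String withFuture(ExecutorService pool) {
        Future<String> future = pool.submit(() -> "hello");
        try {
            String value = future.get(); // blocks the calling thread
            return value.toUpperCase();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // CompletableFuture: the transformation and the error fallback are registered
    // as callbacks, so nothing blocks until the caller decides to join.
    static CompletableFuture<String> withCompletableFuture(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> "hello", pool)
                .thenApply(String::toUpperCase)
                .exceptionally(ex -> "fallback");
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(withFuture(pool));
            System.out.println(withCompletableFuture(pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the second method the caller can keep composing further stages on the returned future and only wait (if at all) at the very end.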

Basic usage

Creating an Asynchronous Task

An asynchronous task is usually created with one of the following static methods of CompletableFuture

public static CompletableFuture<Void> runAsync(Runnable runnable);              // Create an asynchronous task with no return value
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);     // No return value; lets you specify the thread pool (ForkJoinPool.commonPool() is used by default)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);           // Create an asynchronous task with a return value
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); // With a return value; lets you specify the thread pool

Example:

Executor executor = Executors.newFixedThreadPool(10);
// No return value: run the task on a custom thread pool
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
    // do something
}, executor);

int poiId = 111;
// With a return value: runs on the default thread pool
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
    PoiDTO poi = poiService.loadById(poiId);
    return poi.getName();
});
// Block and get the result of the future
String poiName = nameFuture.get();

Using callback methods

Retrieving the result of an asynchronous task with future.get() still blocks until the task completes.

CompletableFuture provides several callback methods. The code in the callback runs automatically after the asynchronous task completes, without blocking the main thread.

public CompletableFuture<Void> thenRun(Runnable runnable);            // No argument, no return value
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);         // Accepts arguments with no return value
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); // accept argument T, return value U

Example:

CompletableFuture<Void> runFuture = CompletableFuture.supplyAsync(() -> "Hello")
                           .thenRun(() -> System.out.println("Do other things. Like print logs or send messages asynchronously."));
// If you only need follow-up processing after a CompletableFuture task finishes, with no input and no return value, use the thenRun callback.
// If the main thread does not depend on the code in thenRun completing, there is no need to block it with get().
CompletableFuture<Void> acceptFuture = CompletableFuture.supplyAsync(() -> "Hello")
                           .thenAccept((s) -> System.out.println(s + " world"));
// Output: Hello world
// If the callback needs the result of the asynchronous task but does not return a value, use thenAccept.
CompletableFuture<Boolean> applyFuture = CompletableFuture.supplyAsync(() -> {
  PoiDTO poi = poiService.loadById(poiId);
  return poi.getMainCategory();
}).thenApply((s) -> isMainPoi(s));   // boolean isMainPoi(int poiId);

applyFuture.get();
// If you want to process the result of an asynchronous task further and return a value, use thenApply.
// If the main thread needs the value returned by the callback, it still has to block with get().

Combine two asynchronous tasks

// The asynchronous task in thenCompose depends on the asynchronous task that calls the method
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); 
// Combines the results of two independent asynchronous tasks once both have completed
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, 
                                              BiFunction<? super T,? super U,? extends V> fn); 

Example:

CompletableFuture<List<Integer>> poiFuture = CompletableFuture.supplyAsync(
  () -> poiService.queryPoiIds(cityId, poiId)
);
// The second task is an asynchronous method that returns a CompletableFuture
CompletableFuture<List<DealGroupDTO>> getDeal(List<Integer> poiIds){
  return CompletableFuture.supplyAsync(() ->  poiService.queryPoiIds(poiIds));
}
//thenCompose
CompletableFuture<List<DealGroupDTO>> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds));
resultFuture.get();

thenCompose and thenApply serve a similar purpose, except that thenCompose accepts a Function that itself returns a CompletableFuture. Use thenCompose when you want to obtain the result U directly from the CompletableFuture returned by the callback.

If you used thenApply here, the type of resultFuture would be CompletableFuture<CompletableFuture<List<DealGroupDTO>>> rather than CompletableFuture<List<DealGroupDTO>>
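
The nesting difference is easy to see in a small self-contained sketch. The lengthOf helper below is a hypothetical asynchronous method invented for this illustration; it stands in for something like getDeal.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyVsThenCompose {

    // Hypothetical asynchronous lookup, used only for illustration.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // thenApply wraps whatever the function returns, so a function that
    // already returns a future produces a future of a future.
    static CompletableFuture<CompletableFuture<Integer>> nested() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(ThenApplyVsThenCompose::lengthOf);
    }

    // thenCompose flattens the inner future into the outer chain.
    static CompletableFuture<Integer> flat() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(ThenApplyVsThenCompose::lengthOf);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // two joins needed to unwrap
        System.out.println(flat().join());          // a single join is enough
    }
}
```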

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2);
// future.get() returns "Helloworld"

Combine multiple CompletableFutures

The allOf method can be used when multiple asynchronous tasks are required to complete before further processing

CompletableFuture<Void> poiIDTOFuture = CompletableFuture
 .supplyAsync(() -> poiService.loadPoi(poiId))
  .thenAccept(poi -> {
    model.setModelTitle(poi.getShopName());
    //do more thing
  });

CompletableFuture<Void> productFuture = CompletableFuture
 .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId))
  .thenAccept(list -> {
    model.setDefaultCount(list.size());
    model.setMoreDesc("more");
  });
// future3 and any further asynchronous tasks are omitted here
CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join(); // allOf combines all the asynchronous tasks; join() waits until every one of them has completed

This approach suits consumer-facing (C-end) services: for example, fetching store information asynchronously from several services by POI ID, assembling it into the required model, and returning once all of the store information has been filled in.

The join method is used here to wait for the results. Like get(), it blocks until the task completes; unlike get(), it throws an unchecked CompletionException on failure instead of checked exceptions.
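
Because allOf returns a CompletableFuture<Void>, the individual results still have to be read from the original futures. The sketch below shows one common way to do that and also demonstrates how join reports a failure. The class, the allAsList helper, and the sample values are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class AllOfJoin {

    // Waits for every future, then gathers the individual results in input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // returns at once: every future is already complete
                        .collect(Collectors.toList()));
    }

    // Small demo with three independent tasks.
    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        return allAsList(futures).join();
    }

    // join() surfaces a failure as an unchecked CompletionException
    // whose cause is the exception thrown by the task.
    static String failureCause() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            failing.join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(failureCause());
    }
}
```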

The anyOf method can be used to return the result when any one of multiple asynchronous tasks has completed

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

Exception handling

Integer age = -1;

CompletableFuture<Void> maturityFuture = CompletableFuture.supplyAsync(() -> {
  if(age < 0) {
    throw new IllegalArgumentException("Age can not be negative");
  }
  if(age > 18) {
    return "Adult";
  } else {
    return "Child";
  }
}).exceptionally(ex -> {
  System.out.println("Oops! We have an exception - " + ex.getMessage());
  return "Unknown!";
}).thenAccept(s -> System.out.print(s));
// Prints the "Oops!" line with the exception message, then: Unknown!

The exceptionally method handles exceptions thrown by an asynchronous task. If an exception occurs, it gives the task chain a chance to recover, for example by logging the exception or returning a default value.

Exceptions can also be handled with the handle method, which is called whether or not an exception occurred
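
One detail worth knowing when logging ex.getMessage(): as far as I can tell, the Throwable handed to exceptionally right after supplyAsync usually arrives wrapped in a CompletionException, so the message includes the original exception's class name. The sketch below unwraps it defensively; the unwrap and describe helpers are illustrative names, not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyUnwrap {

    // Returns the original exception if it arrived wrapped in a CompletionException.
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    // Same age check as in the article, with the fallback built from the unwrapped cause.
    static String describe(int age) {
        return CompletableFuture.supplyAsync(() -> {
            if (age < 0) {
                throw new IllegalArgumentException("Age can not be negative");
            }
            return age > 18 ? "Adult" : "Child";
        }).exceptionally(ex -> "Unknown: " + unwrap(ex).getMessage())
          .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(-1));
        System.out.println(describe(30));
    }
}
```

Unwrapping in one place keeps log messages and fallback logic independent of whether the exception was wrapped along the way.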

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

Shard processing

Sharding with parallel processing works in three steps: split the input into shards, run each shard in parallel with CompletableFuture, and finally aggregate the results with a Stream.

CompletableFuture does not provide a dedicated sharding API, but the same effect can be achieved by combining a list-partitioning utility with Stream-based aggregation.

Here's an example:

// Process in asynchronous batches when too many items are requested
List<List<Long>> skuBaseIdsList = ListUtils.partition(skuIdList, 10); // shard
// run the shards in parallel
List<CompletableFuture<List<SkuSales>>> futureList = Lists.newArrayList();
for (List<Long> skuId : skuBaseIdsList) {
  CompletableFuture<List<SkuSales>> tmpFuture = getSkuSales(skuId);
  futureList.add(tmpFuture);
}
// aggregate the results
futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
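
For readers without the utility libraries on the classpath, the same shard, run, aggregate flow can be sketched with the JDK alone. Everything here is an assumption for illustration: the partition helper replaces the library call, and querySales is a stand-in for a remote batch query such as getSkuSales.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ShardedLookup {

    // Splits a list into consecutive chunks of at most `size` elements.
    static <T> List<List<T>> partition(List<T> items, int size) {
        return IntStream.range(0, (items.size() + size - 1) / size)
                .mapToObj(i -> items.subList(i * size, Math.min(items.size(), (i + 1) * size)))
                .collect(Collectors.toList());
    }

    // Hypothetical batch query: stands in for a remote call.
    static CompletableFuture<List<String>> querySales(List<Long> skuIds) {
        return CompletableFuture.supplyAsync(() ->
                skuIds.stream().map(id -> "sales-" + id).collect(Collectors.toList()));
    }

    static List<String> queryAll(List<Long> skuIds, int batchSize) {
        // Start every batch first, so the queries run in parallel...
        List<CompletableFuture<List<String>>> futures = partition(skuIds, batchSize).stream()
                .map(ShardedLookup::querySales)
                .collect(Collectors.toList());
        // ...then wait for each one and flatten the per-batch lists into one result.
        return futures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(queryAll(Arrays.asList(1L, 2L, 3L, 4L, 5L), 2));
    }
}
```

Collecting the futures into a list before joining matters: mapping and joining in a single stream pipeline would process one batch at a time, because streams are lazy.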

A worked example

Let's look at the advantages of asynchronous programming with CompletableFuture.

Here we implement the classic "boil water and make tea" program with CompletableFuture.

First we need to divide up the work. The program below has three tasks:

  • Task 1: Wash the kettle and boil the water
  • Task 2: Wash the teapot, wash the teacup, and fetch the tea leaves
  • Task 3: Make the tea. Task 3 can start only after task 1 and task 2 are completed

Here is the code. Even if you skip over the less familiar methods runAsync(), supplyAsync() and thenCombine(), the big picture is clear:

  1. There is no need to manage threads by hand; assigning tasks to threads is handled for us;
  2. The semantics are clearer; for example, f3 = f1.thenCombine(f2, (__, tf) -> {...}) states explicitly that task 3 starts only after task 1 and task 2 have completed;
  3. The code is more concise and focused on business logic; almost all of it is business code
// Task 1: Wash the kettle -> boil water
CompletableFuture<Void> f1 =
  CompletableFuture.runAsync(()->{
  System.out.println("T1: Wash the kettle...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1: Boil water...");
  sleep(15, TimeUnit.SECONDS);
});
// Task 2: Wash the teapot -> wash the teacup -> get the tea
CompletableFuture<String> f2 =
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: Wash the teapot...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2: Wash the teacup...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2: Get the tea...");
  sleep(1, TimeUnit.SECONDS);
  return "Longjing";
});
// Task 3: Make tea after task 1 and task 2 are completed
CompletableFuture<String> f3 =
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: Get the tea: " + tf);
    System.out.println("T1: Make tea...");
    return "Serving tea: " + tf;
  });
// Wait for the result of task 3
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
  }
}
// Result of a single execution:
// T1: Wash the kettle...
// T2: Wash the teapot...
// T1: Boil water...
// T2: Wash the teacup...
// T2: Get the tea...
// T1: Get the tea: Longjing
// T1: Make tea...
// Serving tea: Longjing

Things to watch out for

1. Check whether CompletableFuture's default thread pool meets your needs

As mentioned earlier, the static methods runAsync and supplyAsync that create an asynchronous task can take the thread pool to use. If none is specified, CompletableFuture falls back to its default thread pool

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

As you can see, CompletableFuture's default thread pool is obtained by calling ForkJoinPool.commonPool(). The number of threads in that pool is determined by the number of CPU cores, using the formula Runtime.getRuntime().availableProcessors() - 1. For a dual-socket machine with 4 cores per CPU, that gives 4 * 2 - 1 = 7 threads

This setting suits CPU-intensive applications but is risky for IO-intensive ones: under high QPS the pool may have too few threads to keep up, which can lead to online failures.

Therefore, customize the thread pool according to your workload
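
A minimal sketch of passing a dedicated pool to supplyAsync follows. The pool sizes, queue capacity, rejection policy, and the "io-pool-" thread name prefix are illustrative assumptions, not recommendations; they should be tuned against your own latency and throughput figures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolExample {

    // Bounded pool for IO-bound work; all numbers here are assumptions for the example.
    static ExecutorService newIoPool() {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                16, 32, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(200),                               // bounded queue
                r -> new Thread(r, "io-pool-" + counter.incrementAndGet()),  // named threads help debugging
                new ThreadPoolExecutor.CallerRunsPolicy());                  // apply back-pressure when saturated
    }

    // Runs one task on the custom pool and reports which thread executed it.
    static String runOnCustomPool() {
        ExecutorService pool = newIoPool();
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("task ran on: " + runOnCustomPool());
    }
}
```

In a real service the pool would normally be created once and shared, rather than created and shut down per call as in this demo.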

2. When calling get() with a timeout, do not call it serially on one future after another with the full timeout each time. Otherwise the worst-case interface latency becomes number of threads * timeout
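
One way to avoid the multiplied delay is to give all the futures a single shared deadline and pass only the remaining time to each get call. The sketch below is one possible approach under that assumption; the getAll helper, the fallback value, and the demo futures are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SharedDeadline {

    // Collects results against one overall deadline. A future that fails or
    // misses the deadline contributes the fallback value instead.
    static <T> List<T> getAll(List<CompletableFuture<T>> futures, long timeoutMillis, T fallback) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            // Only the time left until the shared deadline is granted to this get().
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                results.add(future.get(remaining, TimeUnit.NANOSECONDS));
            } catch (TimeoutException | ExecutionException e) {
                results.add(fallback);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                results.add(fallback);
            }
        }
        return results;
    }

    // One completed future and two that never complete: the whole call waits
    // roughly one timeout in total, not one timeout per future.
    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.completedFuture("fast"),
                new CompletableFuture<String>(),
                new CompletableFuture<String>());
        return getAll(futures, 100, "timed out");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```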

Finally

Writing articles and drawing diagrams takes effort. If you enjoyed this one, please like and share it. Thanks!

Yuebanfeiyu: let's be friends