Introduction to this Article

  • Introduction to the business requirement scenario
  • Thinking through the technical design
  • The Future design pattern in practice
  • CompletableFuture in practice
  • CompletableFuture production recommendations
  • CompletableFuture performance test
  • CompletableFuture extended usage

1. Introduction to the business requirement scenario

The only constant is change.

In your spare time you probably watch videos on apps such as Youku, iQiyi, and Tencent Video.

These video apps run not only on your phone but also on your TV.

The app on the TV is an independently released version, different from the app on the mobile phone.

When we pick a movie, we land on the album details page, and the player starts the video automatically. The album details page you see on your mobile phone is different from the one you see on your TV.

So let’s see what that looks like visually.

Tencent Video album details page on a mobile phone:

The screenshot shows only the top half of the page. Below it are "recommended for you", cast, related recommendations, comments, and other features.

The album details page is displayed differently on TV. Suppose the product manager requests a redesign of the details page, with the style requirements shown below:

Comparing the two terminals, the album details page on TV contains many blocks, and each block displays multiple items in a horizontal row.

The product design requires that some blocks get their content from the recommendation service, some from search, and some from the CMS (content management system). In short, each block has a different content source, and the content from the recommendation and search interfaces must be requested in near real time.

2. Thinking through the technical design

The product requirement itself is not difficult to implement.

The data divides into a static part and a dynamic part. Data that rarely changes can be served by a static interface, and near-real-time data can be served by a dynamic interface.

Static interface design:

The attributes of the album itself, as well as the video data under the album, generally do not change often. The screenshot in the requirement introduction was taken from the movie channel. On the drama channel, the page also shows the episode list (all videos under the album, e.g., Episode 1, Episode 2…). Videos are updated infrequently, so the episode list on the album details page can be retrieved from the static interface.

Static interface data generation process:

The other part has to be served by a dynamic interface, which calls third-party interfaces to obtain data such as recommendations and search results. In addition, content must not be repeated across blocks.

Dynamic interface design:

Option 1:

Serial invocation: call the corresponding third-party interfaces one by one, in the order in which the blocks are displayed.

Option 2:

Parallel invocation: call the interfaces for multiple blocks in parallel to improve the overall response time of the interface.

Each of the two options has advantages and disadvantages.

Option 1, serial invocation, has the advantage of a simple development model: the interfaces are called one after another, the content is de-duplicated, and all the data is aggregated and returned to the client.

However, the overall response time is then the sum of the third-party response times. Third-party interfaces are generally unreliable, so a slow one lengthens the overall response, keeps the thread occupied for a long time, and reduces the overall throughput of the interface.

Option 2, parallel invocation, can in theory shorten the overall response time: when several third-party interfaces are called at the same time, the total time depends on the slowest one.

When making parallel calls, you need to use pooling, which means you cannot create threads in the JVM process without limit. You also need to de-duplicate content across blocks, following the block order in the product design.

For this requirement scenario, option 2 is the more appropriate choice.
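The parallel option with a bounded pool can be sketched as follows. This is a minimal illustration, not the article's implementation: the three sources (`recommend`, `search`, `cms`), their delays, and the class and method names are hypothetical stand-ins for real third-party calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelBlocksSketch {

    // Hypothetical stand-in for a third-party call: sleeps, then returns its name.
    static Callable<String> source(String name, long millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return name;
        };
    }

    // Calls all sources in parallel on a fixed-size pool; results keep the display order.
    static List<String> fetchAll(List<Callable<String>> sources, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done and preserves the input order.
            for (Future<String> f : pool.invokeAll(sources)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("block fetch failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = fetchAll(Arrays.asList(
                source("recommend", 300), source("search", 200), source("cms", 100)), 3);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // Elapsed time tracks the slowest call (about 300 ms), not the 600 ms sum.
        System.out.println(results + " in ~" + elapsedMs + " ms");
    }
}
```

The fixed pool size is the "pooling" constraint: no matter how many blocks are requested, at most `poolSize` threads are created.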

Having chosen option 2, we abstract a simple model, as shown in the figure below:

T1, T2 and T3 are threads that fetch the content of different blocks. T1 returns its result first. The result returned by T2 must not repeat anything returned by T1, and the result returned by T3 must not repeat anything returned by T1 or T2.

From an implementation perspective, when several third-party interfaces are called in parallel, their results have to be collected. The first tool that comes to mind is Future, which makes it possible to obtain task results asynchronously.

In addition, JDK 8 provides CompletableFuture, an easy-to-use class for obtaining asynchronous results. It solves some pain points of Future and supports composable asynchronous programming in a more elegant, functional style.
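The de-duplication rule in this model can be sketched on plain lists, leaving threads aside. The class name, method name, and sample items below are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BlockDedupSketch {

    // Removes from each block any item already shown in an earlier block (display order).
    static List<List<String>> dedupInOrder(List<List<String>> blocks) {
        Set<String> seen = new HashSet<>();
        List<List<String>> result = new ArrayList<>();
        for (List<String> block : blocks) {
            List<String> kept = new ArrayList<>();
            for (String item : block) {
                // Set.add returns false when the item was already seen.
                if (seen.add(item)) {
                    kept.add(item);
                }
            }
            result.add(kept);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> blocks = Arrays.asList(
                Arrays.asList("A", "B"),        // T1
                Arrays.asList("B", "C"),        // T2
                Arrays.asList("A", "C", "D"));  // T3
        System.out.println(dedupInOrder(blocks)); // prints [[A, B], [C], [D]]
    }
}
```

Because earlier blocks win, T2's result can only be finalized once T1's result is known, which is exactly the dependency between tasks that the rest of the article has to express.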

3. The Future design pattern in practice

Future interface design:

The Future interface provides methods for obtaining the task result, cancelling the task, and checking the task status. Calling the method that gets the result blocks the caller until the task has completed.

The Future interface provides the following methods:

```java
// Get the task result, blocking until it is available
V get() throws InterruptedException, ExecutionException;

// Get the task result, waiting at most the given timeout
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

// Check whether the task has completed
boolean isDone();

// Check whether the task has been cancelled
boolean isCancelled();

// Cancel the task
boolean cancel(boolean mayInterruptIfRunning);
```

When we use a Future to obtain the result of a task, we typically rely on ThreadPoolExecutor or FutureTask.

Class diagram of the relationship between ThreadPoolExecutor, FutureTask, and Future:

ThreadPoolExecutor provides three submit methods:
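As a small illustration of the blocking `get` and its timeout variant, here is a sketch that submits a task to an executor and falls back when the wait times out. The class name, the `"data"`/`"fallback"` values, and the delays are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Submits a task that takes taskMillis and waits at most waitMillis for its result.
    static String fetchWithTimeout(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis); // simulated slow call
            return "data";
        });
        try {
            // Blocks the caller, but only up to waitMillis.
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the still-running task
            return "fallback";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithTimeout(50, 1000)); // task finishes in time: data
        System.out.println(fetchWithTimeout(1000, 50)); // wait times out: fallback
    }
}
```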

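```java
// 1. Submit a Runnable task. Runnable.run() does not return a value,
//    so the returned Future can only tell whether the task has finished.
public Future<?> submit(Runnable task) { }

// 2. Submit a Callable task. Callable.call() returns a value,
//    which can be obtained through Future.get().
public <T> Future<T> submit(Callable<T> task) { }

// 3. Submit a Runnable task together with a result object;
//    Future.get() returns that same result object.
public <T> Future<T> submit(Runnable task, T result) { }
```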

The third submit method is used as follows:

```java
// Imports needed: java.util.concurrent.ExecutorService, Executors, Future
static String x = "static String x";

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // Create the Result object r
    Result r = new Result();
    r.setName(x);

    // Submit the task, passing r as the result object
    Future<Result> future = executor.submit(new Task(r), r);
    Result fr = future.get();

    // All three comparisons print true
    System.out.println(fr == r);
    System.out.println(fr.getName() == x);
    System.out.println(fr.getNick() == x);

    executor.shutdown(); // let the JVM exit
}

static class Result {
    private String name;
    private String nick;
    // ... getters and setters omitted
}

static class Task implements Runnable {
    Result r;

    // Pass in the result object
    Task(Result r) {
        this.r = r;
    }

    @Override
    public void run() {
        // Operate on the shared result object
        String name = r.getName();
        r.setNick(name);
    }
}
```

All three comparisons print true.

FutureTask design implementation:

FutureTask implements both the Runnable and Future interfaces. Because it implements Runnable, it can be submitted directly to a ThreadPoolExecutor, or handed to a Thread, as a task object. Because it implements Future, the result of the task can be obtained from it.

Following the product requirement, let's use FutureTask to simulate two threads. The comments in the sample code explain each step:

```java
// Imports needed: java.util.concurrent.Callable, FutureTask, TimeUnit
public static void main(String[] args) throws Exception {
    // Create the FutureTask for task T1, which calls the recommendation interface
    FutureTask<String> ft1 = new FutureTask<>(new T1Task());
    // Create the FutureTask for task T2, which calls the search interface and depends on T1's result
    FutureTask<String> ft2 = new FutureTask<>(new T2Task(ft1));

    // Thread T1 executes task ft1
    Thread t1 = new Thread(ft1);
    t1.start();
    // Thread T2 executes task ft2
    Thread t2 = new Thread(ft2);
    t2.start();

    // Wait for the result of thread T2
    System.out.println(ft2.get());
}

// T1Task calls the recommendation interface to obtain data
static class T1Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("T1: calling the recommendation interface to get data...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T1: got recommendation interface data...");
        TimeUnit.SECONDS.sleep(10);
        return "[T1 block data]";
    }
}

// T2Task calls the search interface, then de-duplicates against T1's result
static class T2Task implements Callable<String> {
    FutureTask<String> ft1;

    T2Task(FutureTask<String> ft1) {
        this.ft1 = ft1;
    }

    @Override
    public String call() throws Exception {
        System.out.println("T2: calling the search interface to get data...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T2: got search interface data...");
        TimeUnit.SECONDS.sleep(5);

        // Blocks until T1 has finished
        System.out.println("T2: calling ft1.get() to get recommendation data");
        String tf1 = ft1.get();
        System.out.println("T2: got recommendation interface data: " + tf1);

        System.out.println("T2: de-duplicating T1 and T2 block data");
        return "[T1 and T2 block data aggregation result]";
    }
}
```

The result is as follows:

```
> Task :FutureTaskTest.main()
T1: calling the recommendation interface to get data...
T2: calling the search interface to get data...
T1: got recommendation interface data...
T2: got search interface data...
T2: calling ft1.get() to get recommendation data
T2: got recommendation interface data: [T1 block data]
T2: de-duplicating T1 and T2 block data
[T1 and T2 block data aggregation result]
```

Summary:

Future, as the name suggests, represents a result that will be available in the future: a time-consuming operation is handed to a separate thread to achieve asynchrony. After submitting the task, the submitting thread can continue with other work instead of waiting for the result, until the moment it actually asks for it.

4. CompletableFuture in practice

With the Future design pattern, submitting a task does not block, but the caller may block until the task completes when it asks for the result.

This limitation has existed since Future was designed in JDK 1.5, and it was only properly addressed with the introduction of CompletableFuture in JDK 1.8.

In the meantime, Google's open source Guava toolkit provided ListenableFuture, which supports callbacks when a task completes. You can look into it on your own.

As described in the business requirement scenario, different blocks have different data sources, and there are data dependencies between blocks.

In other words, there is an ordering relationship between the tasks, and the features provided by CompletableFuture fit this business scenario very well.
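The difference can be sketched with a callback: instead of blocking on `get()`, the caller registers what should happen with the result and carries on. The class name, the 200 ms delay, and the event strings are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CallbackSketch {

    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            sleepMillis(200); // simulated slow third-party call
            return "result";
        });
        // Register a callback instead of blocking on get().
        CompletableFuture<Void> done = task.thenAccept(r -> events.add("callback: " + r));
        events.add("caller continues"); // the calling thread is free in the meantime
        done.join(); // only so that this demo does not return before the callback has run
        return events;
    }

    static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The caller's entry is normally recorded first, since the task sleeps for 200 ms.
        System.out.println(run());
    }
}
```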

CompletableFuture class diagram:

CompletableFuture implements both the Future and CompletionStage interfaces. Implementing Future lets callers track when an asynchronous task ends and obtain its result. Implementing CompletionStage provides a rich set of operations for expressing serial, parallel, and convergence relationships between tasks.

CompletableFuture core advantages:

1) No need to maintain threads manually; assigning threads to tasks requires no attention from the developer.

2) Clearer and more explicit semantics.

For example, t3 = t1.thenCombine(t2, (r1, r2) -> { /* doSomething… */ }) states explicitly that task 3 will not run until both task 1 and task 2 have completed.

3) More concise code with support for chained calls, so you can focus on business logic.

4) Convenient handling of exceptions.

Next, we use CompletableFuture to simulate aggregating the data of multiple blocks under an album.

The code looks like this:

```java
// Imports needed: java.util.List, java.util.concurrent.CompletableFuture,
// java.util.concurrent.TimeUnit, com.google.common.collect.Lists (Guava)
public static void main(String[] args) throws Exception {
    // Temporary storage for data
    List<String> stashList = Lists.newArrayList();

    // Task 1: call the recommendation interface to get data
    CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("T1: getting recommendation interface data...");
        sleepSeconds(5);
        stashList.add("[T1 block data]");
        return "[T1 block data]";
    });

    // Task 2: call the search interface to get data
    CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("T2: calling the search interface to get data...");
        sleepSeconds(3);
        return "[T2 block data]";
    });

    // Task 3: after task 1 and task 2 have both completed, aggregate their results
    CompletableFuture<String> t3 = t1.thenCombine(t2, (t1Result, t2Result) -> {
        System.out.println("De-duplicating " + t1Result + " and " + t2Result);
        return "[T1 and T2 block data aggregation result]";
    });

    // Wait for the result of task 3, at most 6 seconds
    System.out.println(t3.get(6, TimeUnit.SECONDS));
}

static void sleepSeconds(int timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
```

The result is as follows:

```
> Task :CompletableFutureTest.main()
T1: getting recommendation interface data...
T2: calling the search interface to get data...
De-duplicating [T1 block data] and [T2 block data]
[T1 and T2 block data aggregation result]
```

To run the example, create a class in IDEA and paste the code above into it.

5. CompletableFuture production recommendations

Create a reasonable thread pool:

In a production environment, using the sample code above as-is is not recommended. The sample code calls:

```java
CompletableFuture.supplyAsync(() -> {});
```

The supplyAsync() factory method creates the CompletableFuture object, and the default thread pool it uses underneath may not meet business requirements.

Take a look at the underlying source code:

```java
// ForkJoinPool.commonPool() is used by default
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}
```

The default ForkJoinPool thread pool: its default size is Runtime.getRuntime().availableProcessors() - 1 (CPU cores - 1). The size can be changed with the JVM parameter -Djava.util.concurrent.ForkJoinPool.common.parallelism.

The JVM parameter -Djava.util.concurrent.ForkJoinPool.common.threadFactory sets the thread factory class, and -Djava.util.concurrent.ForkJoinPool.common.exceptionHandler sets the exception handler class. When either parameter is set, the class is loaded through the system class loader.

If every CompletableFuture uses the default thread pool, tasks that perform slow I/O can end up occupying all of its threads, leaving every thread blocked on I/O and degrading the performance of the whole system.

Therefore, in a production environment it is recommended to create separate thread pools for different business types so that they do not affect each other.

CompletableFuture also provides overloads that accept a custom thread pool.
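To see what the default pool looks like on a given machine, a small check such as the following may help. The class and method names are made up for this sketch; `ForkJoinPool.getCommonPoolParallelism()` is the standard JDK call. Note that in the JDK 8 source quoted above, `useCommonPool` is true only when the common pool parallelism is greater than 1; otherwise each task gets its own new thread.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Default common-pool parallelism as described above: cores - 1, but never below 1.
    static int defaultParallelism(int cores) {
        return Math.max(1, cores - 1);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors:      " + cores);
        System.out.println("expected default size:     " + defaultParallelism(cores));
        // Reflects any -Djava.util.concurrent.ForkJoinPool.common.parallelism=N override.
        System.out.println("actual common parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

Running it once normally and once with `-Djava.util.concurrent.ForkJoinPool.common.parallelism=16` on the command line shows the effect of the JVM parameter.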

```java
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
```

When customizing a thread pool, it is advisable to follow the Alibaba Java Development Manual: build the pool with ThreadPoolExecutor, use a bounded queue, and set the queue size according to the actual workload.

In his book "Java Concurrency in Practice," Brian Goetz offers a number of suggestions for sizing thread pools. If a pool has too many threads, they compete for CPU and memory, and a lot of time is spent on context switching. Conversely, if it has too few threads, the multi-core CPU cannot be fully used.

The thread pool size for a target CPU utilization can be estimated using the following formula:
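A per-business pool along these lines might look like the sketch below. The pool name, thread count, and queue size are placeholders chosen for illustration, not recommended values, and `BlockExecutors` is a hypothetical helper class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockExecutors {

    // Builds a bounded pool for one business type; the sizes are placeholders.
    static ThreadPoolExecutor newBoundedPool(String namePrefix, int threads, int queueSize) {
        AtomicInteger seq = new AtomicInteger();
        return new ThreadPoolExecutor(
                threads, threads, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize),         // bounded queue
                r -> {
                    // Named threads make thread dumps easier to read.
                    Thread t = new Thread(r, namePrefix + "-" + seq.incrementAndGet());
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy());  // caller runs the task when saturated
    }

    // Runs a supplier on the given pool and reports which thread executed it.
    static String runOn(ThreadPoolExecutor pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor searchPool = newBoundedPool("search", 4, 100);
        try {
            System.out.println(runOn(searchPool)); // prints search-1
        } finally {
            searchPool.shutdown();
        }
    }
}
```

Giving the recommendation and search calls separate pools built this way keeps a slow dependency in one business type from exhausting the threads used by the other.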
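The formula itself did not survive in this copy of the article. The sizing formula given in "Java Concurrency in Practice" is reproduced here on the assumption that it is the one the author meant:

```latex
N_{threads} = N_{cpu} \times U_{cpu} \times \left(1 + \frac{W}{C}\right)
```

Here $N_{cpu}$ is the number of CPU cores, $U_{cpu}$ is the target CPU utilization (between 0 and 1), and $W/C$ is the ratio of wait time to compute time. As a purely hypothetical example, on an 8-core machine with a target utilization of 1 and tasks that wait four times as long as they compute, the estimate is $8 \times 1 \times (1 + 4) = 40$ threads.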

Exception handling:

CompletableFuture provides very simple exception handling, including the following methods, which support chained programming.

```java
// Similar to catch {} in try {} catch {}
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

// Similar to finally {} in try {} finally {}; cannot change the result
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

// Similar to finally {} in try {} finally {}; can return a new result
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
```
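A short sketch of how two of these methods behave in a chain. The `search` method, its failure flag, and the message strings are hypothetical; only `exceptionally`, `handle`, and `join` are JDK API.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionHandlingSketch {

    // Hypothetical search call that either returns data or fails.
    static CompletableFuture<String> search(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("search unavailable");
            }
            return "search data";
        });
    }

    // exceptionally: supply a fallback value only when the upstream stage failed.
    static String withFallback(boolean fail) {
        return search(fail).exceptionally(ex -> "fallback data").join();
    }

    // handle: receives both result and exception, and maps either one to a new value.
    static String withHandle(boolean fail) {
        return search(fail).handle((result, ex) -> {
            if (ex == null) {
                return "ok: " + result;
            }
            // The exception may arrive wrapped in a CompletionException, so unwrap it.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            return "failed: " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(withFallback(false)); // search data
        System.out.println(withFallback(true));  // fallback data
        System.out.println(withHandle(false));   // ok: search data
        System.out.println(withHandle(true));    // failed: search unavailable
    }
}
```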

6. CompletableFuture performance test

The task counts used in the load test are listed in the code below. Each run executes jobNum tasks, sums their results, and measures the elapsed time. The comparison is between CompletableFuture with the default thread pool and CompletableFuture with a custom thread pool. Performance test code:

AsList (-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).forEach(offset -> { int jobNum = PROCESSORS + offset; System.out.println( String.format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s", testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum))); }); / / CompletableFuture using default ForkJoinPool thread pool private static long testCompletableFutureDefaultExecutor (int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; } / / CompletableFuture using a custom thread pool private static long testCompletableFutureCustomExecutor (int jobCount) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(200, 200, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("CUSTOM_DAEMON_COMPLETABLEFUTURE"); thread.setDaemon(true); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob, threadPoolExecutor))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start; }Copy the code

Test machine configuration: 8-core CPU, 16 GB memory

Performance test results:

According to the load test results, performance with the default thread pool degrades as the number of tasks increases.

7. CompletableFuture extended usage

Object creation:

In addition to the supplyAsync method mentioned earlier, CompletableFuture provides the following methods:

```java
// Runs the task on the default thread pool; CompletableFuture<Void> carries no return value
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

// Runs the task on the given thread pool; CompletableFuture<Void> carries no return value
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}
```

In section 4, we mentioned that CompletableFuture implements the CompletionStage interface, which provides very rich functionality.

The CompletionStage interface supports serial relationships, AND-convergence relationships, and OR-convergence relationships. The following is a brief description of the methods for each relationship; consult the JDK API documentation when using them. Each of these methods also has a corresponding xxxAsync() variant, which runs the task asynchronously.
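For comparison with supplyAsync, here is a minimal sketch of runAsync, which suits tasks run only for their side effects. The class name and the counter are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAsyncSketch {

    // Runs a side-effect-only task and returns the counter value afterwards.
    static int incrementAsync(AtomicInteger counter) {
        CompletableFuture<Void> done = CompletableFuture.runAsync(counter::incrementAndGet);
        done.join(); // yields null for CompletableFuture<Void>; it only signals completion
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementAsync(new AtomicInteger())); // prints 1
    }
}
```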

Serial relationship:

CompletionStage describes serial relationships mainly through the thenApply, thenRun, thenAccept, and thenCompose families of methods.

The source code is as follows:

```java
// Transform the result: T -> U
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);

// Run an action that takes no input and returns nothing
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);

// Consume the result and return nothing
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

// Chain another CompletionStage that depends on the result
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
```
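The four serial methods can be chained as in the following sketch. The `loadTitle` lookup, the album id 42, and the log strings are hypothetical; the point is only the order in which the stages run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SerialStagesSketch {

    // Hypothetical asynchronous lookup: album id -> title.
    static CompletableFuture<String> loadTitle(int albumId) {
        return CompletableFuture.supplyAsync(() -> "album-" + albumId);
    }

    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> pipeline = CompletableFuture.supplyAsync(() -> 42)
                .thenCompose(SerialStagesSketch::loadTitle)  // chain a dependent future, flattened
                .thenApply(String::toUpperCase)              // transform the result
                .thenAccept(t -> log.add("accepted " + t))   // consume the result
                .thenRun(() -> log.add("done"));             // run an action with no input
        pipeline.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [accepted ALBUM-42, done]
    }
}
```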

AND-convergence relationship:

CompletionStage describes AND-convergence relationships mainly through the thenCombine, thenAcceptBoth, and runAfterBoth families of methods.

The source code looks like this (Async variants omitted):

```java
// When both the current and the other CompletableFuture have completed,
// both results are passed to fn, which returns a value
public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
                                             BiFunction<? super T, ? super U, ? extends V> fn);

// When both the current and the other CompletableFuture have completed,
// both results are passed to action, which returns nothing
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
                                                BiConsumer<? super T, ? super U> action);

// When both the current and the other CompletableFuture have completed, run action
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
```
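As an illustration of the AND relationship, the sketch below combines two block results with thenCombine and removes from the second block whatever the first already shows. The item lists and names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AndStagesSketch {

    static String combine() {
        CompletableFuture<List<String>> recommend =
                CompletableFuture.supplyAsync(() -> Arrays.asList("A", "B"));
        CompletableFuture<List<String>> search =
                CompletableFuture.supplyAsync(() -> Arrays.asList("B", "C"));

        // Runs only after both futures have completed.
        return recommend.thenCombine(search, (r, s) -> {
            List<String> uniqueSearch = new ArrayList<>(s);
            uniqueSearch.removeAll(r); // drop items the earlier block already shows
            return r + " + " + uniqueSearch;
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(combine()); // prints [A, B] + [C]
    }
}
```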

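OR-convergence relationship:

CompletionStage describes OR-convergence relationships mainly through the applyToEither, acceptEither, and runAfterEither families of methods.

The source code looks like this (Async variants omitted):

```java
// When either the current or the other CompletableFuture completes,
// its result is passed to fn, which returns a value
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
                                            Function<? super T, U> fn);

// When either the current or the other CompletableFuture completes,
// its result is passed to action, which returns nothing
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,
                                          Consumer<? super T> action);

// When either the current or the other CompletableFuture completes, run action
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
```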
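The OR relationship can be sketched as a race between two sources, taking whichever answers first. The source names `primary` and `mirror` and the delays are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class EitherSketch {

    // Hypothetical data source that answers with its own name after a delay.
    static CompletableFuture<String> delayed(String name, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name;
        });
    }

    // applyToEither: continue with whichever source completes first.
    static String fastest(long primaryMillis, long mirrorMillis) {
        return delayed("primary", primaryMillis)
                .applyToEither(delayed("mirror", mirrorMillis), winner -> "served by " + winner)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fastest(500, 20)); // the mirror answers first here
    }
}
```

The slower future is not cancelled by applyToEither; it simply keeps running and its result is ignored.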

That covers the relevant features of CompletableFuture.

Asynchronous programming is gradually maturing, and the Java language itself now officially supports an asynchronous programming model, so it is well worth learning.

Starting from a business requirement, this article walked through the Future design pattern in practice, and then analyzed CompletableFuture in JDK 1.8 further: how to use it, its core advantages, a performance comparison, and extended usage.

I hope this helps!
