Cabbage Java self study room covers core knowledge

1. What is asynchronous programming

Most of the time, we use a single thread in a process to execute a program from beginning to end. This simple model can lead to unacceptable performance and a poor user experience.

For example, suppose the program sends a request to another server. For external reasons such as network latency, this kind of communication often takes a long time. If the process can do nothing but wait for the response during this period, performance drops severely. Instead of wasting the waiting time, the program should use it efficiently: perform other tasks while waiting, and continue with the first task when the response arrives.

A call is synchronous if the program calls a method and waits for it to finish processing before continuing. Conversely, a call is asynchronous if the method returns to the caller before processing is complete. When this kind of asynchronous control is added to a program's flow, the result can be called asynchronous programming. The traditional solutions for asynchronous programming are callback functions and event listeners.

For people who have never been exposed to programming, asynchrony is actually more intuitive than linear execution. The real world itself is organized as a large number of parallel lines of activity (processes) held together by asynchrony.
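The difference between a synchronous call and an asynchronous call with a callback can be sketched as follows. This is only an illustration: `fetch`, `fetchSync`, and `fetchAsync` are hypothetical names, and the 100 ms sleep stands in for a slow network request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

class SyncVsAsyncDemo {
    // Hypothetical stand-in for a slow remote call.
    static String fetch(String request) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response to " + request;
    }

    // Synchronous: the caller is blocked until the result is available.
    static String fetchSync(String request) {
        return fetch(request);
    }

    // Asynchronous: returns immediately; the callback receives the result
    // later, on another thread.
    static void fetchAsync(String request, Consumer<String> callback) {
        new Thread(() -> callback.accept(fetch(request))).start();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetchSync("A"));

        CountDownLatch done = new CountDownLatch(1);
        fetchAsync("B", response -> {
            System.out.println(response);
            done.countDown();
        });
        System.out.println("fetchAsync returned; the caller is free to do other work");
        done.await(); // only so that the demo does not exit early
    }
}
```

Starting a raw thread per request is used here only to keep the sketch short; the rest of the article relies on thread pools and CompletableFuture instead.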

2. CompletableFuture, a powerful tool

In Java 8, CompletableFuture provides a powerful extension of Future that helps reduce the complexity of asynchronous programming. It supports a functional style in which results are processed through callbacks, and it also provides methods to transform and compose CompletableFutures.

Asynchronous processing is, at its core, a callback (at the system level this is implemented with pointers, or more exactly function pointers). You provide a callback function; that function is not called directly by the code that implements it, but is called by another party when a specific event or condition occurs, in order to respond to that event or condition. From a "macro" perspective, the implementation of CompletableFuture is simply a callback: an action that runs after a task has been executed, and which may in turn trigger other actions, such as the next callback or the execution of the next task.

3. CompletableFuture implementation mechanism

Leaving CompletableFuture aside, if a thread pool is used in your program, how do you perform some actions after a task has been executed? The Java thread pool itself already provides hook methods before and after task execution (beforeExecute and afterExecute) as follows:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    // ...
}

We just need a custom thread pool that extends ThreadPoolExecutor and overrides the beforeExecute and afterExecute methods; the actions to run after a task completes go in afterExecute.

import java.util.concurrent.*;

import lombok.Setter; // @Setter is assumed to be Lombok's annotation

// IListenable is the author's own callback interface; its definition is not shown in this article.
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor {

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Setter
    private IListenable listenable;

    // Maps each submitted task to the extra argument passed along with it
    private ConcurrentHashMap<Object, Object> argHashMap = new ConcurrentHashMap<>();

    @SuppressWarnings("all")
    public void execute(Runnable command, Object arg) {
        Runnable task = () -> command.run();
        argHashMap.put(task, arg);
        execute(task);
    }

    @SuppressWarnings("all")
    public <T> Future<T> submit(Callable<T> task, Object arg) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<T> ftask = newTaskFor(task);
        argHashMap.put(ftask, arg);
        execute(ftask);
        return ftask;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (this.listenable == null) {
            return;
        }
        this.listenable.beforeExecute(t, r, this.argHashMap.get(r));
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Object arg = this.argHashMap.remove(r);
        if (this.listenable == null) {
            return;
        }
        if (r instanceof Future) {
            this.listenable.callableCallback((Future) r, r, t);
        } else {
            this.listenable.runnableCallback(r, t);
        }
        this.listenable.afterCallback(this, arg, r, t);
    }
}

So what is the implementation mechanism for CompletableFuture? It is similar to the afterExecute mechanism described above: some actions are performed, in the form of a callback, after a task has been executed, as the following code shows:

CompletableFuture.supplyAsync(() -> {
    // callable task
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    // action after the task completes (callback method), similar to the ThreadPoolExecutor.afterExecute method
    System.out.println(r);
    return r;
});

The above example code basically completes three steps, which are actually the implementation flow of CompletableFuture:

  1. Perform a task
  2. Add actions after the task completes (callback method)
  3. Execute the callback

3.1. Perform tasks

The main logic for executing the task is the AsyncSupply.run method:

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    // dep is the current CompletableFuture, fn is the task execution logic
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                // 1. execute the task and set the result with CAS
                d.completeValue(f.get());
            } catch (Throwable ex) {
                // 1.1 set the exceptional result with CAS
                d.completeThrowable(ex);
            }
        }
        // 2. the task is complete; this may trigger the execution of callbacks
        d.postComplete();
    }
}

3.2. Callback method

The process for adding callback methods starts with thenApply:

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        // When the previous CompletableFuture is not yet complete, push the new
        // CompletableFuture (wrapped in a Completion) onto the stack of the
        // previous CompletableFuture.
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

3.3. Perform a callback

Callback execution starts from CompletableFuture.postComplete:

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // CAS sets h.next as the stack of the current CompletableFuture
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

// UniAccept
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    // Return the current CompletableFuture or recursively call postComplete
    return d.postFire(a, mode);
}
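The three steps (run the task, register a callback, fire the callbacks) can be illustrated with a toy future. This is a deliberately simplified sketch and not the JDK implementation: `MiniFuture` and `getNow` are hypothetical names, a plain lock replaces the CAS operations, exceptions are not handled, and dependents are fired by nested calls rather than by the non-recursive loop shown above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// A simplified illustration only; NOT how java.util.concurrent.CompletableFuture is written.
class MiniFuture<T> {
    private T result;
    private boolean done;
    // Callbacks registered before completion, kept in LIFO order like the "stack" field.
    private final Deque<Runnable> stack = new ArrayDeque<>();

    // Register a callback; returns a new future that receives the callback's result.
    <U> MiniFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        MiniFuture<U> dep = new MiniFuture<>();
        Runnable completion = () -> dep.complete(fn.apply(result));
        boolean runNow;
        synchronized (this) {
            runNow = done;
            if (!runNow) {
                stack.push(completion); // not complete yet: remember the callback
            }
        }
        if (runNow) {
            completion.run();           // already complete: run the callback immediately
        }
        return dep;
    }

    // Set the result (first caller wins), then fire the registered callbacks.
    void complete(T value) {
        synchronized (this) {
            if (done) {
                return;
            }
            result = value;
            done = true;
        }
        postComplete();
    }

    // Pop and run callbacks until the stack is empty.
    private void postComplete() {
        while (true) {
            Runnable next;
            synchronized (this) {
                next = stack.poll();
            }
            if (next == null) {
                return;
            }
            next.run();
        }
    }

    // Returns the result, or null if this future is not complete yet.
    synchronized T getNow() {
        return result;
    }

    public static void main(String[] args) {
        MiniFuture<String> f1 = new MiniFuture<>();
        MiniFuture<Integer> f2 = f1.thenApply(String::length);
        f1.complete("hello");
        System.out.println(f2.getNow()); // prints 5
    }
}
```

The real class avoids the lock and the nested calls: it pushes Completion nodes with CAS and walks them iteratively in postComplete, which is what keeps long chains from overflowing the call stack.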

If it is still not clear how multiple CompletableFutures are executed, let's switch to another example and illustrate it with a diagram:

The following code corresponds to the relationships between the CompletableFutures and their Completions:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world f1");
    sleep(1); // TimeUnit.SECONDS.sleep(1)
    return "result f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f3 = f2.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f3";
});

CompletableFuture<String> f4 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f4";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f5";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f6";
});

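Combining the figure above with the postComplete process, you can see that the callbacks run in the order f1 -> f4 -> f5 -> f6 -> f2 -> f3. Dependents are pushed onto a stack, so the branch registered last on f1 (f4) is popped first and followed to its end before the f2 branch runs. This assumes all callbacks are registered before f1 completes, which the one-second sleep in f1 makes very likely here.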
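The order can be checked with a small experiment. The sketch below is an assumption-laden variant of the example: instead of supplyAsync and sleep, f1 is created directly and completed by hand, so that every callback is certain to be registered before f1 completes. `CallbackOrderDemo` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CallbackOrderDemo {
    // Records the order in which the stages run.
    static List<String> run() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<String> f1 = new CompletableFuture<>();
        CompletableFuture<String> f2 = f1.thenApply(r -> { order.add("f2"); return "f2"; });
        CompletableFuture<String> f3 = f2.thenApply(r -> { order.add("f3"); return "f3"; });
        CompletableFuture<String> f4 = f1.thenApply(r -> { order.add("f4"); return "f4"; });
        CompletableFuture<String> f5 = f4.thenApply(r -> { order.add("f5"); return "f5"; });
        CompletableFuture<String> f6 = f5.thenApply(r -> { order.add("f6"); return "f6"; });

        order.add("f1");
        f1.complete("result f1"); // fires all dependents on the calling thread

        f3.join();
        f6.join();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the OpenJDK implementation walked through above, this should print [f1, f4, f5, f6, f2, f3]. The order is a consequence of the internal stack and is not something the API documentation promises, so code should not depend on it.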

4. CompletableFuture basic use

CompletableFuture, new in Java 8, provides support for asynchronous computation whose results can be processed by callbacks.

4.1. Creation of CompletableFuture

runAsync and supplyAsync

Typically we hand asynchronous tasks over to another thread or to a thread pool. In addition to creating a CompletableFuture object directly (which is not recommended), you can create one using the following four methods:

// runAsync takes a Runnable task and has no return value. If an executor is passed in, that Executor is used to execute the asynchronous task.
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync takes a Supplier task and has a return value.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Example:

// usage example
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}, executor);
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});

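If no executor is passed in, ForkJoinPool.commonPool() is used by default as the thread pool that executes the asynchronous task (unless the common pool's parallelism is less than two, in which case a new thread is created for each task). Otherwise, the given Executor performs the task.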
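The effect of the executor argument can be observed by returning the name of the thread that runs the task. This is a minimal sketch: `ExecutorChoiceDemo` and the thread name "demo-pool-worker" are hypothetical choices for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorChoiceDemo {
    // No executor given: the task runs on the default asynchronous pool.
    static String threadNameWithDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Executor given: the task runs on a thread owned by that executor.
    static String threadNameWithCustomPool() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown(); // a custom pool must be shut down by its owner
        }
    }

    public static void main(String[] args) {
        System.out.println("default pool: " + threadNameWithDefaultPool());
        System.out.println("custom pool:  " + threadNameWithCustomPool());
    }
}
```

The name printed for the default pool depends on the machine and the JDK, so no particular value is shown here. Passing a dedicated executor is a common choice for blocking work, since the common pool is shared by the whole JVM.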

4.2. Completion action of the CompletableFuture

whenComplete

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

Example:

// usage example
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + " " + e);
}).exceptionally((e) -> {
    System.out.println("exception " + e);
    return "exception";
});

whenComplete executes the action on the thread that completes the task, or on the calling thread if the task has already completed by the time whenComplete is called. The Async variant without an executor hands the action to the default asynchronous pool, so the action may or may not run on the same thread as the asynchronous task. The variant that takes an executor hands the action to that Executor. The exceptionally method has no Async form in Java 8: its function runs only when the previous stage completes with an exception, on the thread that completes that stage or on the calling thread.
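How whenComplete and exceptionally behave on success and on failure can be sketched as follows. `WhenCompleteDemo`, `run`, and the "fallback" value are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class WhenCompleteDemo {
    // Runs a task that either succeeds or fails, and returns the final value of the chain.
    static String run(boolean fail, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "result";
        }).whenComplete((result, e) ->
            // Observes the outcome but does not change it: exactly one of result / e is non-null here.
            log.add("whenComplete saw result=" + result + ", failed=" + (e != null))
        ).exceptionally(e ->
            // Runs only on failure and replaces the exception with a normal value.
            "fallback"
        ).join();
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        System.out.println(run(false, log)); // prints result
        System.out.println(run(true, log));  // prints fallback
        log.forEach(System.out::println);
    }
}
```

whenComplete passes the original outcome through to the next stage, which is why exceptionally still sees the failure in the second call.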

handle

In addition to performing the completion action with whenComplete above, you can also use the handle method, which returns a new CompletableFuture whose result type may differ from the original one.

public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

Example:

// handle method example:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("handle");
    return 1;
});

In addition to using the handle method to convert the result type of a CompletableFuture, you can also use thenApply. The difference is that handle receives both the normal return value and the exception, so it can absorb an exception and stop it from being thrown further. thenApply, on the other hand, only handles normal return values; if the previous stage fails, the function is skipped and the exception is passed on.

public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Example:

// thenApply method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});
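The difference between handle and thenApply when the previous stage fails can be sketched like this. `HandleVsThenApplyDemo` and its methods are hypothetical names; the failing future is completed by hand to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class HandleVsThenApplyDemo {
    // A future that has already completed with an exception.
    static CompletableFuture<String> failing() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // handle receives the exception and can turn it into a normal value.
    static int withHandle() {
        return failing()
                .handle((r, e) -> e != null ? -1 : r.length())
                .join();
    }

    // thenApply's function is skipped; join() rethrows the failure as a CompletionException.
    static boolean thenApplyPropagates() {
        AtomicBoolean called = new AtomicBoolean(false);
        try {
            failing().thenApply(r -> {
                called.set(true);
                return r.length();
            }).join();
            return false;
        } catch (CompletionException ex) {
            return !called.get() && ex.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withHandle());          // prints -1
        System.out.println(thenApplyPropagates()); // prints true
    }
}
```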

thenAccept

Note that handle and thenApply both return a CompletableFuture of a new result type. If you only want to consume the result after the CompletableFuture completes, without producing a new value, you can use the thenAccept method.

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

Example:

// thenAccept method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // here r is of type Void (its value is null)
    System.out.println(r);
});

thenAcceptBoth

The handle, thenApply, and thenAccept methods above all perform some action on the result of the previous CompletableFuture. Is it possible to act on the results of two CompletableFutures at the same time? Yes, with the thenAcceptBoth method. thenAcceptBoth follows the same process as handle/thenApply/thenAccept, except that it takes another CompletableFuture object as an argument. Note that the other CompletableFuture does not wait for the previous CompletableFuture to finish before it starts executing.

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

Example:

// thenAcceptBoth method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

thenCombine

Note that the thenAcceptBoth method does not produce a value (it returns CompletableFuture<Void>). If you want something like thenAcceptBoth but with a CompletableFuture that carries a return value, use thenCombine.

public <U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

Example:

// thenCombine method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
    return r1 + "-" + r2;
});

acceptEither and applyToEither

thenAcceptBoth and runAfterBoth run their action once both CompletableFutures have completed, whereas the following methods run their action as soon as either CompletableFuture completes.

public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
 
public <U> CompletableFuture<U>  applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
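A minimal sketch of the "either" methods follows. To keep the outcome deterministic, one future is already complete and the other is never completed; `EitherDemo` and `firstOf` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    // Applies the function to whichever of the two futures completes first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, s -> "winner: " + s).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed in this sketch
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");

        System.out.println(firstOf(never, ready)); // prints winner: ready

        // acceptEither is the consuming counterpart: it returns CompletableFuture<Void>.
        never.acceptEither(ready, s -> System.out.println("consumed " + s)).join();
    }
}
```

When both futures are still running, which result is used depends on timing, so the function should be able to deal with either one.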

allOf and anyOf

The methods allOf and anyOf can be used when you want to perform an action after all of the CompletableFutures have completed, or after any one of them has completed.

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
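Because allOf returns CompletableFuture<Void>, the individual results have to be read back from the original futures. One common way to do this is sketched below; `AllOfAnyOfDemo`, `joinAll`, and `firstDone` are hypothetical helper names, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfAnyOfDemo {
    // Waits for every future, then collects the results in the original order.
    static List<String> joinAll(List<CompletableFuture<String>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(v ->
                // Every future is complete at this point, so join() does not block.
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
        ).join();
    }

    // Returns the result of whichever future completes first (typed as Object by anyOf).
    static Object firstDone(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> "a"));
        futures.add(CompletableFuture.supplyAsync(() -> "b"));
        System.out.println(joinAll(futures)); // prints [a, b]

        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        System.out.println(firstDone(pending, CompletableFuture.completedFuture("done"))); // prints done
    }
}
```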

thenRun

If you don’t want to use the result of the CompletableFuture when the task completes, you can use thenRun to execute a Runnable.

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)
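A short thenRun sketch, assuming a flag that stands in for some follow-up work such as cleanup (`ThenRunDemo` and `run` are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class ThenRunDemo {
    // Returns true if the Runnable ran after the task completed.
    static boolean run() {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "result")
                // The Runnable takes no argument: the result "result" is ignored.
                .thenRun(() -> cleanedUp.set(true));
        done.join();
        return cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true
    }
}
```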

thenCompose

In all of the methods above, the callback returns a plain value (or nothing). With thenCompose, the callback itself returns a CompletableFuture, so two asynchronous stages are composed into one.

public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

Example:

// thenCompose method example:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCompose(r -> {
    System.out.println(r);
    return CompletableFuture.supplyAsync(() -> {
        System.out.println(r + " result2");
        return r + " result2";
    });
});

// The code above has the same effect as the code below
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    System.out.println(r);
    return r;
}).thenApplyAsync(r -> {
    System.out.println(r + " result2");
    return r + " result2";
});
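What thenCompose adds over thenApply shows up in the types when the callback already returns a CompletableFuture. In the sketch below, `ThenComposeDemo` and `appendAsync` are hypothetical names; `appendAsync` stands in for any second asynchronous step.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeDemo {
    // Hypothetical second asynchronous step that depends on the first result.
    static CompletableFuture<String> appendAsync(String r) {
        return CompletableFuture.supplyAsync(() -> r + " result2");
    }

    // thenCompose flattens the inner future: the chain stays CompletableFuture<String>.
    static String composed() {
        CompletableFuture<String> flat = CompletableFuture
                .supplyAsync(() -> "result")
                .thenCompose(ThenComposeDemo::appendAsync);
        return flat.join();
    }

    // thenApply with the same function yields a nested future that must be unwrapped twice.
    static String nested() {
        CompletableFuture<CompletableFuture<String>> nested = CompletableFuture
                .supplyAsync(() -> "result")
                .thenApply(ThenComposeDemo::appendAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(composed()); // prints result result2
        System.out.println(nested());   // prints result result2
    }
}
```

Both variants produce the same value, but only the thenCompose chain can be extended with further stages that work directly on the String.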