Future

Future is an interface added in Java 5 to represent the result of an asynchronous computation. You can call isDone to check whether the computation has completed, or call get to block the calling thread until the computation completes and the result is returned. You can also try to stop the task with the cancel method. Here's an example:

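import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> f = es.submit(() -> {
            Thread.sleep(10000);
            // the result
            return 100;
        });

        // do something

        // Option 1: block until the result is available
        Integer result = f.get();
        System.out.println(result);

        // Option 2: poll until the task is done, then read the result
        // while (!f.isDone()) {
        //     Thread.sleep(100);
        // }
        // System.out.println(f.get());

        es.shutdown();
    }
}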

In this example, we submit a task to the thread pool, and submit immediately returns a Future object. We can then do other work and finally either call get to block until the result is available, or call isDone repeatedly to poll for completion. For more background, see "[Concurrent Programming] Future pattern and implementation in JDK".
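The cancel method and the timed variant of get are not shown in the demo. The following is a minimal sketch using only JDK classes; the class name FutureCancelSketch, the method name waitThenCancel, and the 200 ms timeout are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureCancelSketch {

    /** Waits briefly for a slow task, then cancels it and reports the Future's state. */
    static String waitThenCancel() throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> f = es.submit(() -> {
                Thread.sleep(10_000); // slow computation
                return 100;
            });
            try {
                // Bounded wait: give up after 200 ms instead of blocking indefinitely.
                return "result=" + f.get(200, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // true = interrupt the worker thread if the task is already running.
                f.cancel(true);
            }
            try {
                f.get();
                return "unexpected";
            } catch (CancellationException e) {
                // get() on a cancelled Future throws CancellationException.
                return "cancelled=" + f.isCancelled() + ", done=" + f.isDone();
            }
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(waitThenCancel());
    }
}
```

Note that isDone also returns true for a cancelled task, so a polling loop should check isCancelled before calling get.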

Although these methods provide the ability to execute tasks asynchronously, they are inconvenient for obtaining results, which can only be obtained by blocking or polling.

Blocking obviously defeats the purpose of asynchronous programming, while polling burns CPU cycles unnecessarily and does not deliver the result promptly. Why not use the observer pattern and notify listeners when the result is ready?

Many platforms, such as Node.js, use callbacks to implement asynchronous programming. Some Java frameworks, such as Netty, extend the Java Future interface themselves and add methods such as addListener. Google's Guava also provides general-purpose extensions: ListenableFuture, SettableFuture, and the Futures helper class for asynchronous programming. Java itself finally added a more powerful Future implementation in JDK 1.8: CompletableFuture. It greatly extends Future, reduces the complexity of asynchronous programming, and supports a functional style in which results are processed through callbacks. Let's look at each of these.

Netty-Future

First, add the Maven dependency:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
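</dependency>

The demo submits a task and registers a listener on the returned Future:

import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

public class NettyFutureDemo {

    // DateUtils.getNow() is the author's own timestamp helper (not shown here).
    public static void main(String[] args) throws InterruptedException {
        EventExecutorGroup group = new DefaultEventExecutorGroup(4);
        System.out.println("Start: " + DateUtils.getNow());

        Future<Integer> f = group.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Start time calculation: " + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("End time calculation: " + DateUtils.getNow());
                return 100;
            }
        });
        f.addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> objectFuture) throws Exception {
                System.out.println("Calculated results: " + objectFuture.get());
            }
        });
        System.out.println("End: " + DateUtils.getNow());
        // Keep the main thread waiting so the program does not exit
        new CountDownLatch(1).await();
    }
}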

Output results:

Start: 2019-05-16 08:25:40:779
End: 2019-05-16 08:25:40:788
Start time calculation: 2019-05-16 08:25:40:788
End time calculation: 2019-05-16 08:25:50:789
Calculated results: 100

The output shows that the listener's completion method is triggered automatically once the time-consuming calculation finishes, so the main thread does not have to block and wait. How does Netty do this? Let's look at the source code.

DefaultEventExecutorGroup implements the EventExecutorGroup interface, and EventExecutorGroup in turn extends the JDK's ScheduledExecutorService interface, so it has all the methods of a thread pool. However, every method that returned java.util.concurrent.Future is overridden to return io.netty.util.concurrent.Future, and every method that returned java.util.concurrent.ScheduledFuture is overridden to return io.netty.util.concurrent.ScheduledFuture.

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /** Return an EventExecutor */
    EventExecutor next();

    Iterator<EventExecutor> iterator();

    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

Because newTaskFor is overridden, the submit method returns Netty's own Future implementation class, PromiseTask.

@Override
public <T> Future<T> submit(Callable<T> task) {
    return (Future<T>) super.submit(task);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new PromiseTask<T>(this, callable);
}

PromiseTask is a simple implementation. It caches the Callable to be executed, and its run method invokes the task and notifies the listeners.

@Override
public void run() {
    try {
        if (setUncancellableInternal()) {
            V result = task.call();
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        setFailureInternal(e);
    }
}

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

notifyListeners is called whether the task succeeds or fails, so the callback should call isSuccess to check the outcome.
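The same technique (override newTaskFor so that submit hands back a Future that notifies a listener on success or failure) can be tried with JDK classes alone. The sketch below is not Netty's code: NotifyingPool and its BiConsumer listener are hypothetical names, and FutureTask's done hook stands in for notifyListeners.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class NewTaskForSketch {

    /** Hypothetical pool: every Callable passed to submit() is wrapped in a notifying task. */
    static class NotifyingPool extends ThreadPoolExecutor {
        private final BiConsumer<Object, Throwable> listener;

        NotifyingPool(int threads, BiConsumer<Object, Throwable> listener) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            this.listener = listener;
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            // submit(Callable) calls this hook, so this is where the Future type is swapped.
            return new FutureTask<T>(callable) {
                @Override
                protected void done() {
                    try {
                        listener.accept(get(), null);        // success
                    } catch (ExecutionException e) {
                        listener.accept(null, e.getCause()); // the task threw
                    } catch (Exception e) {
                        listener.accept(null, e);            // cancelled or interrupted
                    }
                }
            };
        }
    }

    /** Submits one succeeding and one failing task; returns what the listener saw. */
    static String[] submitTwo() throws InterruptedException {
        String[] seen = new String[2];
        CountDownLatch both = new CountDownLatch(2);
        NotifyingPool pool = new NotifyingPool(1, (value, error) -> {
            if (error == null) {
                seen[0] = "ok:" + value;
            } else {
                seen[1] = "failed:" + error.getMessage();
            }
            both.countDown();
        });
        Callable<Integer> ok = () -> 100;
        Callable<Integer> failing = () -> { throw new IllegalStateException("boom"); };
        pool.submit(ok);
        pool.submit(failing);
        both.await();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = submitTwo();
        System.out.println(seen[0] + ", " + seen[1]);
    }
}
```

The listener receives both outcomes through one method and has to inspect the error argument, which mirrors the need to check isSuccess in a Netty listener.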

If addListener is called after the task has already completed, will the notification be missed?

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

As you can see, the status is checked immediately after the listener is added, and the callback is invoked if the task has already completed, so there is nothing to worry about here. Next, let's look at the implementation of Guava-Future.
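To see why a late listener is not lost, here is a minimal JDK-only sketch of the idea. MiniPromise is a hypothetical class, not Netty's DefaultPromise, and it simplifies the approach by checking the completion flag under the same lock that guards the listener list instead of adding first and checking afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LateListenerSketch {

    /** Hypothetical single-assignment promise; only the listener bookkeeping is modelled. */
    static class MiniPromise<V> {
        private final List<Consumer<V>> listeners = new ArrayList<>();
        private boolean done;
        private V value;

        void setSuccess(V v) {
            List<Consumer<V>> toNotify;
            synchronized (this) {
                if (done) {
                    throw new IllegalStateException("complete already");
                }
                value = v;
                done = true;
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            toNotify.forEach(l -> l.accept(v)); // notify outside the lock
        }

        void addListener(Consumer<V> listener) {
            V completedValue;
            synchronized (this) {
                if (!done) {
                    listeners.add(listener); // the completing thread will notify it later
                    return;
                }
                completedValue = value;
            }
            listener.accept(completedValue); // already complete: notify right away
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniPromise<Integer> promise = new MiniPromise<>();
        promise.addListener(v -> log.add("early:" + v)); // registered before completion
        promise.setSuccess(100);
        promise.addListener(v -> log.add("late:" + v));  // registered after completion
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early:100, late:100]
    }
}
```

Either way, every listener is called exactly once regardless of whether it was registered before or after completion.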

Guava-Future

First, introduce Guava’s Maven dependencies:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
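</dependency>

The demo wraps a thread pool with listeningDecorator and adds a listener to the returned ListenableFuture:

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuavaFutureDemo {

    // DateUtils.getNow() is the author's own timestamp helper (not shown here).
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start: " + DateUtils.getNow());

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Start time calculation: " + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("End time calculation: " + DateUtils.getNow());
                return 100;
            }
        });
        future.addListener(new Runnable() {
            @Override
            public void run() {
                System.out.println("Call successful");
            }
        }, executorService);
        System.out.println("End: " + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}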

ListenableFuture lets you add a callback through the addListener method, which is typically used when the result of the execution does not matter. To obtain the result on success or the exception on failure, use the addCallback method of the Futures utility class:

Futures.addCallback(future, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(@Nullable Integer result) {
        System.out.println("Success, calculated results :" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("Failure");
    }
}, executorService);

As mentioned earlier, besides ListenableFuture there is also a SettableFuture class that supports callbacks. It implements ListenableFuture, so it has all the capabilities of ListenableFuture.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable; // JSR-305 annotation, assumed to be on the classpath

public class GuavaFutureDemo {

    // DateUtils.getNow() is the author's own timestamp helper (not shown here).
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start: " + DateUtils.getNow());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListenableFuture<Integer> future = submit(executorService);
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("Success, calculated results: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("Failure: " + t.getMessage());
            }
        }, executorService);
        Thread.sleep(1000);
        System.out.println("End: " + DateUtils.getNow());
        new CountDownLatch(1).await();
    }

    private static ListenableFuture<Integer> submit(Executor executor) {
        SettableFuture<Integer> future = SettableFuture.create();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Start time calculation: " + DateUtils.getNow());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("End time calculation: " + DateUtils.getNow());
                // Set the return value
                future.set(100);
                // Or set the exception information
                // future.setException(new RuntimeException("custom error!"));
            }
        });
        return future;
    }
}

Usage may not look very different, but there is an important point that is easily overlooked. If you call cancel on a SettableFuture used this way, the task in the thread pool keeps running, whereas calling cancel(true) on the ListenableFuture returned by the submit method interrupts the running task. Pay special attention to this. Here's the source code:

Like Netty, Guava defines its own ExecutorService sub-interface, ListeningExecutorService, and overrides the submit methods to return ListenableFuture.

public interface ListeningExecutorService extends ExecutorService {
  <T> ListenableFuture<T> submit(Callable<T> task);
  ListenableFuture<?> submit(Runnable task);
  <T> ListenableFuture<T> submit(Runnable task, T result);
}

Similarly, newTaskFor is overridden to return a custom Future class, TrustedListenableFutureTask:

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return TrustedListenableFutureTask.create(runnable, value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return TrustedListenableFutureTask.create(callable);
}

Running the task goes through the run method of TrustedFutureInterruptibleTask:

// TrustedListenableFutureTask.run
@Override
public void run() {
    TrustedFutureInterruptibleTask localTask = task;
    if (localTask != null) {
        localTask.run();
    }
}

// run method inherited by TrustedFutureInterruptibleTask
@Override
public final void run() {
    if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
        return; // someone else has run or is running.
    }
    try {
        // Abstract method, subclass override
        runInterruptibly();
    } finally {
        if (wasInterrupted()) {
            while (!doneInterrupting) {
                Thread.yield();
            }
        }
    }
}

This eventually calls the runInterruptibly method of TrustedFutureInterruptibleTask, which runs the task and then calls set to mark it complete.

@Override
void runInterruptibly() {
    if (!isDone()) {
        try {
            set(callable.call());
        } catch (Throwable t) {
            setException(t);
        }
    }
}

protected boolean set(@Nullable V value) {
    Object valueToSet = value == null ? NULL : value;
    // Set the value with CAS
    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        complete(this);
        return true;
    }
    return false;
}

At the end of the complete method, the listeners are retrieved and called back.

As mentioned above, cancel behaves differently on a SettableFuture and on the ListenableFuture returned by submit because one of them overrides the afterDone method and the other does not.

Here's the afterDone method of TrustedListenableFutureTask, the ListenableFuture implementation returned by submit:

@Override
protected void afterDone() {
    super.afterDone();

    if (wasInterrupted()) {
        TrustedFutureInterruptibleTask localTask = task;
        if (localTask != null) {
            localTask.interruptTask();
        }
    }

    this.task = null;
}

wasInterrupted determines whether cancel was called with a request to interrupt the task (the cancel method stores a Cancellation object in value).

protected final boolean wasInterrupted() {
    final Object localValue = value;
    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}

The interruptTask method actually interrupts the execution of a threaded task by using the thread’s interrupt method:

final void interruptTask() {
    Thread currentRunner = runner;
    if (currentRunner != null) {
        currentRunner.interrupt();
    }
    doneInterrupting = true;
}
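The same difference can be observed with JDK classes alone. In this sketch, which is an analogy and not Guava code, the Future returned by ExecutorService.submit plays the role of the ListenableFuture returned by submit, and a manually completed CompletableFuture plays the role of SettableFuture. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelInterruptSketch {

    /** Returns true if cancel(true) on a Future obtained from submit() interrupted the worker. */
    static boolean submittedTaskIsInterrupted() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Future<?> f = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });
        started.await();
        f.cancel(true); // the task object knows its runner thread and interrupts it
        boolean result = interrupted.await(1, TimeUnit.SECONDS);
        pool.shutdownNow();
        return result;
    }

    /** Returns true if cancelling a manually completed future interrupted the worker. */
    static boolean manuallyCompletedTaskIsInterrupted() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        CompletableFuture<Integer> f = new CompletableFuture<>();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(500);
                f.complete(100); // has no effect if the future was cancelled meanwhile
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });
        started.await();
        f.cancel(true); // only changes the future's state; it has no link to the worker
        boolean result = interrupted.await(1, TimeUnit.SECONDS);
        pool.shutdownNow();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("submit(): interrupted=" + submittedTaskIsInterrupted());
        System.out.println("manual:   interrupted=" + manuallyCompletedTaskIsInterrupted());
    }
}
```

The first method reports true and the second false: a future that is completed by hand holds no reference to the thread doing the work, so cancelling it cannot stop that work.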

Callback Hell gives rise to the Promise pattern

If you’re familiar with ES6, the Promise pattern won’t be new to you, and if you’re not familiar with the front end, let’s take a look at what Callback Hell is.

Callbacks are one of the preferred methods of asynchronous invocation, but there is also a problem, namely the nesting of callbacks. This is what happens when multiple asynchronous callbacks need to be written together (for example, in JS):

asyncFunc1(opt, (...args1) => {
  asyncFunc2(opt, (...args2) => {
    asyncFunc3(opt, (...args3) => {
      asyncFunc4(opt, (...args4) => {
        // some operation
      });
    });
  });
});

Although callbacks are rarely nested several levels deep in Java business code, deep nesting is always a problem: the code is hard to read and troublesome to modify. ES6 introduced the Promise pattern to solve the problem of callback hell. One might ask: does Java have a Promise pattern too? The answer is yes.

As mentioned earlier, the Netty and Guava extensions both provide methods such as addListener to handle callbacks, but JDK 1.8 already provides a more advanced callback mechanism: CompletableFuture. First, let's rewrite the callback example above with CompletableFuture.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class CompletableFutureTest {

    // DateUtils.getNow() is the author's own timestamp helper (not shown here).
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start: " + DateUtils.getNow());
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Start time calculation: " + DateUtils.getNow());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End time calculation: " + DateUtils.getNow());
            return 100;
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("Callback result: " + result);
        });
        System.out.println("End: " + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

With CompletableFuture, the time-consuming operation does not occupy the main thread, which gives us an asynchronous call. We don't need any third-party library; everything relies on java.util.concurrent.CompletableFuture. CompletableFuture provides more than 50 methods, which makes multithreaded operations and asynchronous calls in Java much more convenient to write.
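The whenComplete callback receives both a result and an exception, and the second argument matters as soon as the computation can fail. The following sketch shows one way to tell the two cases apart with handle and to supply a fallback with exceptionally. The compute, describe, rootMessage and withFallback names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteSketch {

    /** Hypothetical computation that fails on demand. */
    static CompletableFuture<Integer> compute(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return 100;
        });
    }

    /** handle() sees either the result or the exception and maps both to a value. */
    static String describe(boolean fail) {
        return compute(fail)
                .handle((result, e) -> e == null ? "ok:" + result : "failed:" + rootMessage(e))
                .join();
    }

    /** Exceptions thrown inside supplyAsync arrive wrapped in CompletionException. */
    static String rootMessage(Throwable e) {
        Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                ? e.getCause()
                : e;
        return cause.getMessage();
    }

    /** exceptionally() replaces a failure with a fallback value. */
    static int withFallback() {
        return compute(true).exceptionally(e -> -1).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(false));
        System.out.println(describe(true));
        System.out.println(withFallback());
    }
}
```

Unlike handle, whenComplete does not replace the outcome: the stage it returns still carries the original result or exception.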

Use CompletableFuture to solve the callback hell problem:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        long l = System.currentTimeMillis();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Perform time-consuming operations in a callback...");
            sleep(10000);
            return 100;
        });
        completableFuture = completableFuture.thenCompose(i -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("Perform time-consuming operations within the callback of a callback...");
                sleep(10000);
                return i + 100;
            });
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("Calculated results: " + result);
        });
        System.out.println("Main thread operation time: " + (System.currentTimeMillis() - l) + " ms");
        new CountDownLatch(1).await();
    }

    // A Supplier lambda cannot throw checked exceptions, so Thread.sleep is wrapped here.
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Output:

Perform time-consuming operations in a callback...
Main thread operation time: 58 ms
Perform time-consuming operations within the callback of a callback...
Calculated results: 200

Callbacks of callbacks can be implemented with methods such as thenCompose or thenComposeAsync, and the resulting code is easy to maintain.
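To make the contrast with the nested JavaScript callbacks concrete, here is a sketch of four dependent asynchronous steps written as one flat chain. asyncStep is a hypothetical stand-in for asyncFunc1 to asyncFunc4 and simply adds 100 to its input.

```java
import java.util.concurrent.CompletableFuture;

class FlatChainSketch {

    /** Hypothetical asynchronous step: adds 100 to its input on another thread. */
    static CompletableFuture<Integer> asyncStep(int input) {
        return CompletableFuture.supplyAsync(() -> input + 100);
    }

    static int runChain() {
        return asyncStep(0)
                .thenCompose(FlatChainSketch::asyncStep) // needs the result of step 1
                .thenCompose(FlatChainSketch::asyncStep) // needs the result of step 2
                .thenCompose(FlatChainSketch::asyncStep) // needs the result of step 3
                .thenApply(total -> total * 2)           // plain synchronous transformation
                .join();                                 // blocks only so the demo can print
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prints 800
    }
}
```

thenCompose is used when the next step itself returns a CompletableFuture, while thenApply is enough for a synchronous transformation. Either way, each additional step adds one line instead of one more level of nesting.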

In general, adding callbacks to the Future pattern removes the need to block while waiting for the result and avoids wasting CPU on polling the task status. Before JDK 8 you can use the classes provided by Netty or Guava; from JDK 8 on you can use the built-in CompletableFuture class. A Future offers two ways to obtain its result: fetching it actively and receiving it through a callback. The problem with callbacks is callback hell, and the Promise pattern was derived to solve it. This is how the Future and Promise patterns are related.