With a JDK Future, the result of an asynchronous task can only be obtained by blocking on the get method. Is it possible to avoid blocking the calling thread (the main thread) and instead have the result called back to the code that needs it once the called thread (the business thread) finishes? In addition, a JDK Future reports a failed task only by throwing an ExecutionException from get. The sections below analyze these points in turn.

Submitting a task to a JDK thread pool is equivalent to wrapping the task in a FutureTask. When the task is handed to the thread pool, it is actually the FutureTask's run method that gets executed. Once run has executed the task, it stores the task's result in the FutureTask. The FutureTask object thus acts as a communication channel between the main thread and the business thread, through which the main thread obtains the result of the business thread's work. The drawback is obvious: the main thread has to block while it waits for the task to complete in the business thread.
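The blocking behaviour described above can be sketched with plain JDK classes. This is a minimal illustration, not code from the article; the class name `FutureTaskDemo` and the helper `runAndWait` are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: shows the caller blocking on FutureTask.get().
class FutureTaskDemo {

    /** Runs the task on a pool thread and blocks the caller until it finishes. */
    static <T> T runAndWait(Callable<T> task) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Wrap the task; the pool thread will invoke FutureTask.run().
            FutureTask<T> futureTask = new FutureTask<>(task);
            pool.execute(futureTask);
            // The calling thread blocks here until run() has stored the outcome.
            return futureTask.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndWait(() -> "ok"));
        try {
            runAndWait(() -> { throw new IllegalStateException("boom"); });
        } catch (ExecutionException e) {
            // The task's own exception is only reachable as the cause.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

Note that a failure surfaces only when get is called, wrapped in an ExecutionException; there is no way to be told about it without asking.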

Netty's Future works differently. Netty does have a PromiseTask, but it is not a public class and is only for internal use. A Netty Future can deliver the result or exception of a task entirely through listeners. So how does Netty set the result? The JDK uses FutureTask to pass results between threads. What does Netty use? Netty sets the result through a Promise, which then calls back the listeners.

One, Netty's Future

1. Future

Netty’s Future interface directly inherits from JDK’s Future and adds some functionality:

/**
 * The result of an asynchronous operation.
 */
// Inherits from the JDK Future
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {
    // Some simple methods are omitted

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *
     * @return the cause of the failure.
     *         {@code null} if succeeded or this future is not
     *         completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listener is notified immediately.
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an {@link InterruptedException} and
     * discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with an {@link CancellationException}.
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

Some methods are omitted and only the ones discussed in detail are listed.

Netty's Future adds a listener mechanism for callbacks. If the task failed, the cause method returns the exception that caused the failure. The main thread can still block with sync() or await() to wait for the asynchronous task to finish.
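To make the listener idea concrete, here is a minimal sketch in plain Java. It is not Netty's implementation: `ListenableResult` and its methods are hypothetical names, and unlike Netty this sketch runs listeners on whichever thread completes the result rather than on an EventExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a listener-capable future; not Netty's code.
class ListenableResult<V> {
    private final List<Consumer<ListenableResult<V>>> listeners = new ArrayList<>();
    private boolean done;
    private V value;
    private Throwable cause;

    /** Registers a callback; runs it immediately if the result is already available. */
    void addListener(Consumer<ListenableResult<V>> listener) {
        synchronized (this) {
            if (!done) {
                listeners.add(listener);
                return;
            }
        }
        listener.accept(this); // already completed: notify right away
    }

    /** Completes once with a value or a cause, then notifies the registered listeners. */
    void complete(V value, Throwable cause) {
        List<Consumer<ListenableResult<V>>> toNotify;
        synchronized (this) {
            if (done) {
                return; // later completions are ignored in this sketch
            }
            this.value = value;
            this.cause = cause;
            this.done = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        // Callbacks run outside the lock, on the completing thread.
        for (Consumer<ListenableResult<V>> listener : toNotify) {
            listener.accept(this);
        }
    }

    synchronized boolean isSuccess() { return done && cause == null; }
    synchronized Throwable cause() { return cause; }
    synchronized V getNow() { return value; }

    public static void main(String[] args) {
        ListenableResult<String> result = new ListenableResult<>();
        result.addListener(r -> System.out.println("notified, success=" + r.isSuccess()));
        new Thread(() -> result.complete("ok", null), "biz").start();
    }
}
```

The caller never blocks: it registers interest and returns, and the outcome (value or cause) is pushed to it later.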

2. ChannelFuture

Netty is a network framework, so its futures naturally need to be associated with a network Channel. ChannelFuture does just that.

/**
 * The result of an asynchronous {@link Channel} I/O operation.
 * <p>
 * All I/O operations in Netty are asynchronous.  It means any I/O calls will
 * return immediately with no guarantee that the requested I/O operation has
 * been completed at the end of the call.  Instead, you will be returned with
 * a {@link ChannelFuture} instance which gives you the information about the
 * result or status of the I/O operation.
 * <p>
 * A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
 * When an I/O operation begins, a new future object is created.  The new future
 * is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.  If the I/O operation is
 * finished either successfully, with failure, or by cancellation, the future is
 * marked as completed with more specific information, such as the cause of the
 * failure.  Please note that even failure and cancellation belong to the
 * completed state.
 * <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>
 *
 * Various methods are provided to let you check if the I/O operation has been
 * completed, wait for the completion, and retrieve the result of the I/O
 * operation. It also allows you to add {@link ChannelFutureListener}s so you
 * can get notified when the I/O operation is completed.
 *
 * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
 *
 * It is recommended to prefer {@link #addListener(GenericFutureListener)} to
 * {@link #await()} wherever possible to get notified when an I/O operation is
 * done and to do any follow-up tasks.
 * <p>
 * {@link #addListener(GenericFutureListener)} is non-blocking.  It simply adds
 * the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
 * I/O thread will notify the listeners when the I/O operation associated with
 * the future is done.  {@link ChannelFutureListener} yields the best
 * performance and resource utilization because it does not block at all, but
 * it could be tricky to implement a sequential logic if you are not used to
 * event-driven programming.
 * <p>
 * By contrast, {@link #await()} is a blocking operation.  Once called, the
 * caller thread blocks until the operation is done.  It is easier to implement
 * a sequential logic with {@link #await()}, but the caller thread blocks
 * unnecessarily until the I/O operation is done and there's relatively
 * expensive cost of inter-thread notification.  Moreover, there's a chance of
 * dead lock in a particular circumstance, which is described below.
 *
 * <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
 * <p>
 * The event handler methods in {@link ChannelHandler} are usually called by
 * an I/O thread.  If {@link #await()} is called by an event handler
 * method, which is called by the I/O thread, the I/O operation it is waiting
 * for might never complete because {@link #await()} can block the I/O
 * operation it is waiting for, which is a dead lock.
 * <pre>
 * // BAD - NEVER DO THIS
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.awaitUninterruptibly();
 *     // Perform post-closure operation
 *     // ...
 * }
 *
 * // GOOD
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.addListener(new {@link ChannelFutureListener}() {
 *         public void operationComplete({@link ChannelFuture} future) {
 *             // Perform post-closure operation
 *             // ...
 *         }
 *     });
 * }
 * </pre>
 * <p>
 * In spite of the disadvantages mentioned above, there are certainly the cases
 * where it is more convenient to call {@link #await()}. In such a case, please
 * make sure you do not call {@link #await()} in an I/O thread.  Otherwise,
 * {@link BlockingOperationException} will be raised to prevent a dead lock.
 *
 * <h3>Do not confuse I/O timeout and await timeout</h3>
 *
 * The timeout value you specify with {@link #await(long)},
 * {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
 * {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
 * timeout at all.  If an I/O operation times out, the future will be marked as
 * 'completed with failure,' as depicted in the diagram above.  For example,
 * connect timeout should be configured via a transport-specific option:
 * <pre>
 * // BAD - NEVER DO THIS
 * {@link Bootstrap} b = ...;
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     // You might get a NullPointerException here because the future
 *     // might not be completed yet.
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 *
 * // GOOD
 * {@link Bootstrap} b = ...;
 * // Configure the connect timeout option.
 * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly();
 *
 * // Now we are sure the future is completed.
 * assert f.isDone();
 *
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 * </pre>
 */
public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();
    // Some APIs are omitted
}

Netty's code comments are about as good as the JDK's; both are excellent.

All network I/O in Netty is asynchronous. That is, every network I/O call returns immediately, without waiting for the I/O operation to complete. So how do you get the result or state of the I/O operation? Through the ChannelFuture instance that the I/O call returns.

A ChannelFuture is either completed or uncompleted, depending on whether isDone() returns true or false. When an I/O operation starts, a new ChannelFuture instance is created. The future starts out uncompleted: it is neither succeeded, failed, nor cancelled. Once the I/O operation succeeds, fails, or is cancelled, the future enters the completed state.

Netty recommends listeners over waiting. The listener approach is completely asynchronous, whereas await blocks the calling thread until the I/O operation completes. Interestingly, every time a Netty Future is created, an EventExecutor is passed in, and that is the thread on which the listeners are called. In other words, the I/O operation does not have to run on the EventExecutor thread, but the final callback must (with a thread switch if necessary).

There are also two examples in the comments:

  • Do not call await in an I/O thread. This can cause a deadlock: the I/O operation is performed by the I/O thread, while that same I/O thread is blocked waiting for the operation to complete.

  • Do not confuse I/O operation timeouts with await timeouts. The await may have timed out while the I/O operation has not finished yet.
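Both warnings can be reproduced with a plain JDK single-thread executor standing in for an I/O thread. This is an analogy under that assumption, not Netty code; `SelfWaitDemo` and `waitInsideWorker` are hypothetical names. A timeout is used so the demo returns instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: a single worker thread waiting for work queued behind itself.
class SelfWaitDemo {

    /**
     * Submits an outer task that waits, on the executor's only thread, for an
     * inner task queued on the same executor. Returns true if the wait timed
     * out while the inner task was still pending.
     */
    static boolean waitInsideWorker(long timeoutMs) throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = single.submit(() -> {
                Future<String> inner = single.submit(() -> "inner done");
                try {
                    inner.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    // The wait timed out, yet the inner task has not even started:
                    // it cannot run until this outer task releases the thread.
                    return !inner.isDone();
                }
            });
            return outer.get();
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("timed out with inner task still pending: " + waitInsideWorker(200));
    }
}
```

Without the timeout, the outer task would wait forever, which is the deadlock from the first point. With the timeout, the wait ends but the awaited operation is still unfinished, which is the confusion the second point warns about.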

Two, Netty's Promise

1. Promise

The result of an I/O operation can be read through a Future, but how is that result set in the first place? Through a Promise.

/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);
}

As you can see, Promise inherits from Netty's Future.
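The "writable, but only once" contract can be sketched with a compare-and-set on a single field. This is a simplified illustration, not Netty's DefaultPromise: `WriteOnceResult` and its nested `Failure` class are hypothetical, and listeners and cancellation are left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical write-once result holder; a sketch of the contract only.
class WriteOnceResult<V> {
    /** Marker stored when the operation succeeds with a null value. */
    private static final Object SUCCESS = new Object();

    /** Wrapper that distinguishes a failure from a normal value. */
    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    // null means "not completed yet"; anything else is the final outcome.
    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Returns true only for the first caller that completes this holder. */
    boolean trySuccess(V value) {
        return result.compareAndSet(null, value == null ? SUCCESS : value);
    }

    boolean tryFailure(Throwable cause) {
        return result.compareAndSet(null, new Failure(cause));
    }

    /** Like trySuccess, but a second completion is treated as a programming error. */
    WriteOnceResult<V> setSuccess(V value) {
        if (trySuccess(value)) {
            return this;
        }
        throw new IllegalStateException("complete already");
    }

    boolean isDone() {
        return result.get() != null;
    }

    Throwable cause() {
        Object r = result.get();
        return (r instanceof Failure) ? ((Failure) r).cause : null;
    }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result.get();
        return (r == null || r == SUCCESS || r instanceof Failure) ? null : (V) r;
    }

    public static void main(String[] args) {
        WriteOnceResult<String> promise = new WriteOnceResult<>();
        promise.setSuccess("ok");
        System.out.println("second attempt accepted: " + promise.trySuccess("again"));
    }
}
```

Because the outcome lives in one atomically updated field, two threads racing to complete the holder cannot both win; the loser either gets false (the try variants) or an IllegalStateException (the set variant).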

2. ChannelPromise

A ChannelPromise is a Promise that is bound to a Channel.

/**
 * Special {@link ChannelFuture} which is writable.
 */
public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();
}

Three, Netty Promise implementation: DefaultPromise

1. Construction method

/**
 * Creates a new instance.
 *
 * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
 *
 * @param executor
 *        the {@link EventExecutor} which is used to notify the promise once it is complete.
 *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
 *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
 *        depth exceeds a threshold.
 *
 */
public DefaultPromise(EventExecutor executor) {
    this.executor = checkNotNull(executor, "executor");
}

As the comment notes, the EventExecutor passed to the constructor is used to call back the listeners. The notification path also guards against StackOverflowError by tracking how deeply listener notifications are nested on the current thread. If the nesting is too deep, the listeners are not invoked inline; the notification is submitted to the EventExecutor as a task instead. This only matters when the promise is completed on the EventExecutor thread itself. If it is completed on any other thread, stack depth is not a concern, because the callback always switches to the EventExecutor thread.

2. setSuccess

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    // Already completed: setSuccess throws
    throw new IllegalStateException("complete already: " + this);
}

@Override
public boolean trySuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return true;
    }
    // Already completed: trySuccess only reports false
    return false;
}

private boolean setSuccess0(V result) {
    // private static final Object SUCCESS = new Object();
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // If there are waiters, they are notified here. That is, threads
        // blocked in sync or await are woken up.
        // Note that the listeners have not been called back yet.
        checkNotifyWaiters();
        return true;
    }
    return false;
}

private void notifyListeners() {
    EventExecutor executor = executor();
    // Check whether the current thread (the one executing setSuccess()) is the
    // EventExecutor passed to the constructor.
    // Listeners must be called back on the EventExecutor thread.
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // To prevent stack overflow, notify inline only while the nesting depth
        // is below the limit; otherwise fall through and submit a task.
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}
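The stack-depth guard in notifyListeners can be illustrated in isolation. The sketch below is a single-threaded approximation, not Netty's code: `DepthGuardSketch`, the threshold of 8, and the in-memory `deferred` queue (standing in for the executor's task queue) are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of "run inline while shallow, defer when too deep".
class DepthGuardSketch {
    static final int MAX_DEPTH = 8; // assumed threshold for this sketch

    private final ThreadLocal<Integer> depth = ThreadLocal.withInitial(() -> 0);
    // Stands in for the executor's task queue.
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    int maxObservedDepth;

    void notifyListener(Runnable listener) {
        int current = depth.get();
        if (current < MAX_DEPTH) {
            depth.set(current + 1);
            maxObservedDepth = Math.max(maxObservedDepth, current + 1);
            try {
                listener.run(); // shallow enough: call back inline
            } finally {
                depth.set(current);
            }
            return;
        }
        deferred.add(listener); // too deep: run later from a fresh stack
    }

    /** Runs a chain in which every listener triggers the next notification. */
    int runChain(int length) {
        int[] completed = {0};
        Runnable[] step = new Runnable[1];
        step[0] = () -> {
            if (++completed[0] < length) {
                notifyListener(step[0]);
            }
        };
        notifyListener(step[0]);
        // Drain deferred notifications, each starting again at depth 0.
        Runnable next;
        while ((next = deferred.poll()) != null) {
            notifyListener(next);
        }
        return completed[0];
    }

    public static void main(String[] args) {
        DepthGuardSketch sketch = new DepthGuardSketch();
        System.out.println("listeners run: " + sketch.runChain(100));
        System.out.println("deepest inline nesting: " + sketch.maxObservedDepth);
    }
}
```

Every listener in the chain still runs; the guard only bounds how many of them can be nested on one call stack before the rest are rescheduled.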

3. get

The get method is usually called inside a listener, where the future has already completed and the call returns without blocking.

@Override
public V get() throws InterruptedException, ExecutionException {
    // Wait for completion
    await();

    // If there is no failure cause, return the result
    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}

@Override
public boolean isDone() {
    return isDone0(result);
}

private static boolean isDone0(Object result) {
    return result != null && result != UNCANCELLABLE;
}

@Override
public Throwable cause() {
    Object result = this.result;
    return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
}

@SuppressWarnings("unchecked")
@Override
public V getNow() {
    Object result = this.result;
    if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
        return null;
    }
    return (V) result;
}

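So the result is obtained through get, which first waits for completion. There is also a getNow method, which returns the current result immediately without waiting for the task to finish. As its Javadoc points out, a null return value is ambiguous, so check isDone() as well.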

4. setFailure

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

@Override
public boolean tryFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return true;
    }
    return false;
}

private boolean setFailure0(Throwable cause) {
    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

After setSuccess, setFailure is easy to follow: the only difference is that the cause is wrapped in a CauseHolder. The cause() method is equally simple.

5. sync

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

@Override
public Promise<V> syncUninterruptibly() {
    // Wait without responding to interruption
    awaitUninterruptibly();
    // Throw the exception if the future failed
    rethrowIfFailed();
    return this;
}

private void rethrowIfFailed() {
    Throwable cause = cause();
    if (cause == null) {
        return;
    }

    PlatformDependent.throwException(cause);
}

As you can see, sync calls await and then rethrows the failure cause if there is one.

6. await

@Override
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }

    // If the thread was interrupted, throw InterruptedException
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

@Override
public Promise<V> awaitUninterruptibly() {
    if (isDone()) {
        return this;
    }

    checkDeadLock();

    boolean interrupted = false;
    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } catch (InterruptedException e) {
                // Interrupted while waiting.
                interrupted = true;
            } finally {
                decWaiters();
            }
        }
    }

    if (interrupted) {
        Thread.currentThread().interrupt();
    }

    return this;
}
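The wait/notify handshake between await and the completing thread can be reduced to a few lines. The following is a stripped-down sketch, not Netty's code: `WaitableFlag` is a hypothetical class that keeps only the done flag and the waiter counter, with no deadlock check or interrupt handling beyond what wait() itself provides.

```java
// Hypothetical sketch of the monitor-based await/complete handshake.
class WaitableFlag {
    private boolean done;
    private int waiters;

    /** Blocks the caller until complete() has been called. */
    void await() throws InterruptedException {
        synchronized (this) {
            while (!done) { // the loop guards against spurious wakeups
                waiters++;
                try {
                    wait();
                } finally {
                    waiters--;
                }
            }
        }
    }

    /** Marks the flag as done and wakes up any blocked threads. */
    void complete() {
        synchronized (this) {
            done = true;
            if (waiters > 0) {
                notifyAll(); // only notify when someone is actually waiting
            }
        }
    }

    synchronized boolean isDone() {
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        WaitableFlag flag = new WaitableFlag();
        Thread biz = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            flag.complete();
        }, "biz");
        biz.start();
        flag.await();
        System.out.println("woken up, done=" + flag.isDone());
    }
}
```

The waiter counter is what lets the completing side skip notifyAll when nobody is blocked, which is the same role incWaiters and decWaiters play in the code above.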

Test code

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NettyFutureTest {

    public static void main(String[] args) {
        System.out.println("current thread: " + Thread.currentThread().isDaemon());
        // Listeners are called back on this executor's (daemon) thread.
        EventExecutor eventExecutor = new DefaultEventExecutor(new DefaultThreadFactory("promise", true));
        Promise<String> promise = eventExecutor.newPromise();
        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                System.out.println("complete....");
                if (future.isSuccess()) {
                    System.out.println("future is success, result: " + future.get());
                } else {
                    System.out.println("future is failed, result: " + future.cause());
                }
            }
        });

        // Complete the promise from a separate business thread.
        Executors.newSingleThreadExecutor(new DefaultThreadFactory("biz", true)).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("task is running...");
                sleep(800);
                if (promise.isCancelled()) {
                    System.out.println("promise has been canceled...");
                } else {
                    promise.setSuccess("ok");
                }
                System.out.println("biz execute over...");
            }
        });

        // Variant: complete the promise on the EventExecutor thread itself.
//        eventExecutor.execute(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("task is running...");
//                sleep(800);
//                if (promise.isCancelled()) {
//                    System.out.println("promise has been canceled...");
//                } else {
//                    promise.setSuccess("ok");
//                }
//                System.out.println("biz execute over...");
//            }
//        });

        // Variant: cancel the promise before the task completes it.
//        sleep(100);
//        boolean canceled = promise.cancel(true);
//        System.out.println("promise cancel result: " + canceled);

        // Variant: interrupt the main thread before it calls sync().
//        Thread.currentThread().interrupt();
//        System.out.println("main thread isInterrupted: " + Thread.currentThread().isInterrupted());
        try {
            promise.sync();
        } catch (InterruptedException e) {
            System.out.println("here...");
            e.printStackTrace();
        }
        System.out.println("main over...");
    }

    private static void sleep(long timeMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The result of executing the above code:

// The main thread is not a daemon thread.
current thread: false
// After setSuccess, the main thread is woken up and exits.
main over...
biz execute over...
// The listener is called back in the EventExecutor. The EventExecutor thread is a daemon thread, so it may not be scheduled.
complete....
future is success, result: ok

Again, DefaultEventExecutor’s run method is an infinite loop.

The above test program has a lot of commented code. You can try executing them separately.
