Netty source code parsing (a) : start

Netty source code parsing (2) : Netty Channel

Netty’s Future and Promise

Netty source code parsing (four) : Netty’s ChannelPipeline

Netty source code parsing (five) : Netty thread pool analysis

Netty source code parsing (six) : Channel register operation

NioEventLoop (NioEventLoop

Return to Channel register operation

Netty source code parsing (nine) : Connect process and bind process analysis


Today! Mr. Lighthouse told us:



Netty’s Future and Promise


Asynchronous programming in Netty: Future and Promise

There are a lot of asynchronous calls in Netty, so before we talk more about NIO, let’s take a look at how its asynchronous interface is used.
Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo



Try to make the underlined lines clear after reading this section.

The Future interface is familiar to most of you, most often when using Java’s thread pool, ThreadPoolExecutor. in
submitWhen a task is added to the thread pool, it returns one
FutureInstance, which is used to obtain the execution status and final execution result of the submitted task
isDone()
get()Methods.
Below is the Future interface in JDK Java. Util. Concurrent. The Future:
Public interface Future<V> {// Cancel the task Boolean Cancel (Boolean mayInterruptIfRunning); // Whether the task has been cancelled Boolean isCancelled(); Boolean isDone(); V get() throws InterruptedException, ExecutionException; V GET (Long timeout, TimeUnit Unit) throws InterruptedException, ExecutionException, TimeoutException; }Copy the code



The Future interface in Netty (with the same name) inherits the JDK’s Future interface and adds some methods:


// io.netty.util.concurrent.Future

Public interface Future < V > extends Java. Util. Concurrent. The Future < V > {/ / success of Boolean isSuccess (); Boolean isCancellable(); // If the task fails, this method returns an exception message Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> sync() throws InterruptedException; // Block and wait for the task to end. If the task fails, throw the "exception causing the failure" again. // Sync () does not respond to interrupts. Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; // Block to wait for the task to complete, the same as sync(), except that if the task fails, it does not throw an exception during execution. Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // Get the execution result without blocking. We all know Java. Util. Concurrent. The Future of the get () is blocking V getNow (); CancellationException CancellationException (isSuccess()==))false, and the cause() method above returns an instance of CancellationException. // mayInterruptIfRunning says: Whether to interrupt the thread executing the task (so that the task can be stopped), // It seems that the various implementation classes of the Future interface in Netty, @override Boolean cancel(Boolean mayInterruptIfRunning); }Copy the code

So there are really two paradigms of use
BTW, the difference between sync() and await() : sync() internally calls the await() method first, and then checks after the await() method returns
Whether the task failedIf it fails, the exception that caused the failure is thrown again. That is, if you use await(), the task throws an exception and the await() method returns but does not throw an exception, whereas sync() returns with an exception.
We can also see that the Future interface is not associated with the IO operation and is still compared

pure

The interface.

Next, let’s look at ChannelFuture, the subinterface of the Future interface, which is the most commonly used interface. It is associated with channels in IO operations and is used to asynchronously handle events in channels.

Public interface ChannelFuture extends Future<Void> {// ChannelFuture extends Future (); Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); // To mark the future as void, // so that addListener(...) is not allowed. , sync(), await() and several overloaded methods of them Boolean isVoid(); }Copy the code

This is a little bit of a skip, but let’s introduce the Promise interface, which has nothing to do with ChannelFuture, and everything to do with the Future interface, which is very important.
The Promise interface, like ChannelFuture, inherits Netty’s Future interface and adds some Promise content:

Public Interface Promise<V> extends Future<V> {// Mark the success of the Future and set the result of its execution, and notify all listeners. // If the operation fails, an exception will be thrown (failure means the future already has a result, a successful result, or a failed result) Promise<V>setSuccess(V result); / / andsetThe Success method is the same, except that it does not throw an exception and returns on failurefalseboolean trySuccess(V result); // Mark the future failure and why it failed. // If it fails, an exception will be thrown (failure means there is already a result). Promise<V>setFailure(Throwable cause); // Mark the future failure and why it failed. // If there is already a result, returnfalse, do not throw an exception Boolean tryFailure(Throwable cause); // Flag that the future cannot be cancelled BooleansetUncancellable(); Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }Copy the code

I think it’s only important to understand that a Promise instance is internally a task, and the execution of the task tends to be asynchronous, usually with a thread pool to handle the task. The setSuccess(V result) or setFailure(Throwable t) provided by a Promise will be called by a thread executing the task after it completes, The thread calls setSuccess(result) or setFailure(t) and calls the listeners’ callback functions. You can also submit the callback task to a thread pool for execution. And, once setSuccess(…) Or setFailure (…). After that, the threads with await() or sync() are returned from the wait.
So there are two ways of programming. One is to await(), wait for the await() method to return, get the promise execution result, and then process it. The other way is to provide an instance of a Listener. We don’t care much about when the task will finish executing, as long as it executes the process in the Listener.
Now, let’s take a look
ChannelPromise, which inherits the ChannelFuture and Promise interfaces introduced earlier.

The ChannelPromise interface is widely used in Netty because it combines ChannelFuture and Promise:

/**
 * Special {@link ChannelFuture} whichis writable. */ public interface ChannelPromise extends ChannelFuture, Override channel channel(); // Override channel channel(); // The following methods Override the interface in a Promise in order to return a value of type ChannelPromise @override ChannelPromisesetSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause); Override ChannelPromise addListener(GenericFutureListener<? Override ChannelPromise addListener <? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise}if {@link #isVoid()} returns {@code true} otherwise itself.*/ // Let's ignore this method. ChannelPromise unvoid(); }Copy the code

To summarize, we introduced several interfaces above, Future and its subinterfaces ChannelFuture and Promise, and then the ChannelPromise interface inherits both ChannelFuture and Promise.
Let me list the main methods of these interfaces together so that you can see them clearly:

Next, we need an implementation class so that we can see visually how they are used, because these are all interface definitions, depending on how the implementation class works.
Now, let’s introduce it
DefaultPromiseThis implementation class, this class is very common, its source code is not short, we first introduce a few key content, and then introduce an example use.
First, let’s take a look at what properties it has:
Public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {private volatile Object result; // The thread pool in which the task is executed. The promise holds a reference to the executor, which is a bit strange because the task does not need to know where it is executed. // Listeners, callback functions, private Object listeners executed after task completion (normal or abnormal); // The number of threads waiting for this promise (the number of threads waiting by calling sync()/await())) private short waiters; Private Boolean notifyingListeners; // Indicates whether the waiting thread is being woken up to prevent repeat listeners from being woken up. . }Copy the code

As you can see, this class implements Promise, but does not implement ChannelFuture, so it is not associated with Channel.
Don’t worry, we’ll see the use of another class, DefaultChannelPromise, which is a combination of ChannelFuture and Promise, but its implementation is largely inherited from the DefaultPromise class here.
Now that I’ve done the properties above, you can take a look
setSuccess(V result)
trySuccess(V result)
setFailure(Throwable cause)
tryFailure(Throwable cause)Here are a few methods:

See the difference between setSuccess(result) and trySuccess(result)?
The above methods are very simple: set the values and then execute the listener callback methods. The notifyListeners can also check it out, but there are also notifyListeners on Netty thread pools that we haven’t covered yet. The above code wakes up threads blocking sync() or await() in either setSuccess0 or setFailure0 methods
Also, take a look at the difference between sync() and await(), the rest I think I’ll just look around.

@Override public Promise<V> sync() throws InterruptedException { await(); // If the task is a failure, re-throw the corresponding exception rethrowIfFailed();return this;
}Copy the code

Next, let’s write an example code:

Public static void main(String[] args) {EventExecutor Executor = new DefaultEventExecutor(); // Create DefaultPromise instance. Promise Promise = new DefaultPromise(executor); // Add two listener promises.addListener (new GenericFutureListener<Future<Integer>>() {@override public void operationComplete(Future future) throws Exception {if (future.isSuccess()) {
                    System.out.println("Mission ended, result:" + future.get());
                } else {
                    System.out.println("Task failed, exception:"+ future.cause()); } } }).addListener(new GenericFutureListener<Future<Integer>>() { @Override public void operationComplete(Future future)  throws Exception { System.out.println("Mission over, Balabala..."); }}); Submit (new); // Submit the task to the thread pool and execute after five secondsRunnable() {
            @Override
            public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) {} // The result of setting promise // promise.setFailure(new RuntimeException()); promise.setSuccess(123456); }}); // The main thread blocks waiting for the result. Try {promise.sync(); } catch (InterruptedException e) { } }Copy the code

Run the code and two listeners will output after 5 seconds:

Mission ended, result: 123456 Mission ended, Balabala...Copy the code
Readers can try the difference between sync() and await() here by calling promise.setFailure(new RuntimeException()) in the task.
In the code above, you might be a little confused about the relationship between thread pool executors and promises. The reader should also be aware that specific tasks do not have to be performed in this Executor. When the task is over, you need to call promise.setsuccess (result) as a notification.


In general, the future that promises represent does not need to be associated with a thread pool. The future only cares about the completion of the task and the result of its execution. The future does not really care about which thread or thread pool performs the task.
After all, Netty is not trying to create a generic thread pool implementation, but rather is tied to the IO it handles, so we just need to understand it.


So that’s it, let’s go back and look at this picture again and see if you can make sense of this:

Let’s take a look at the left-hand side of the figure above, and while we don’t know exactly what bind() does, we should be able to guess.
Of course, calling b.bind(port) from the main thread returns a ChannelFuture. Bind () is an asynchronous method, and when one of the executing threads has done the actual binding, The thread of execution must mark the future as successful (we assume bind will succeed), and the sync() method here (the main thread) will return.
If bind(port) fails, we know that sync() throws the exception and then executes to the finally block.
Once the bind port has been completed, the f.channel() method returns the channel associated with the future on the next line.
Channel.closefuture () also returns a ChannelFuture and calls sync(), which returns the following condition:
Another thread closed NioServerSocketChannelThe sync() method usually returns because the service needs to be stopped and then the thread sets the future state (setSuccess(result) or setFailure(cause)).
This article is here, I hope you have some understanding of Netty asynchronous programming, when the subsequent encounter source code can know how to use it.