When I first came into contact with this way of dealing with I/O I was amazed: writing Netty never quite feels like writing business plumbing — it feels like you're actually writing code! I have written plenty of Reactor-model code, but most of it was Echo-server versions, so I really wanted to know what Netty encapsulates and why so many high-performance frameworks (Spring WebFlux, Dubbo, Lettuce) choose it. So I got ready to dig into the source code.

In fact, this is not source code I was especially eager to read, but I could not find many good articles or resources online, and the official documentation is not great either. Apart from the book "Netty in Action" that I read before, there was nothing, so I decided to look at the implementation to understand it further.

Organization structure

Netty’s source organization structure is as follows:

(How complicated! :p)

Bootstrap

The bootstrap package is used to start services. For example, Bootstrap starts the client connection service, while ServerBootstrap is responsible for starting the server-side listening service. These map closely onto traditional Socket usage, so there is not much to discuss; the source code is relatively simple:
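For orientation, here is a minimal server-startup sketch; the inline handler is my own example (a trivial echo), not Netty source:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class EchoServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);   // accepts new connections
        EventLoopGroup worker = new NioEventLoopGroup();  // handles I/O for accepted channels
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
                             ctx.writeAndFlush(msg); // echo the bytes straight back
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind(8080).sync();        // start listening on port 8080
            f.channel().closeFuture().sync();             // block until the server channel closes
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}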

Buffer

Buffers are operations on data, including operations on direct memory, operations on cached data, and so on. Netty combines network I/O buffers with direct memory buffers, so we can use this abstraction to write convenient code when processing data.

Channel

The channel package defines some core components: Channel, which encapsulates a connection; ChannelPipeline, which composes the request/response handler chain; and EventLoop/EventLoopGroup, which manage the polling of Channels.

Handler

For our convenience, Netty provides some out-of-the-box handlers, such as HTTP codecs, loggers, and so on.

Resolver

Address resolvers.

Util

Common utility classes. One thing worth noting is the concurrent package, which wraps and extends java.util.concurrent so that Netty can conveniently use and manage threads (including the polling threads). One of the most important reasons Netty handles so many connections is how it uses threads: one thread manages multiple connections (even thousands, if each connection carries little traffic).

Core components

EventLoop

If you have written Reactor-model network code with Java NIO, you know that in NIO one thread drives one Selector, and one Selector manages N remote connections.

In NIO, to find out whether any of the N connections currently managed by the Selector has a ready event, we call the select() method, which is backed by the operating system's select/poll/epoll. When a ready event occurs, the method returns from its blocked state and we then loop over the ready set — hence the "Loop". Polling a set of events of interest is where EventLoop gets its name.
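For reference, the bare NIO Reactor loop described above looks roughly like this — a plain-JDK sketch of my own, nothing Netty-specific:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public final class NioLoop {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(8080));
        serverChannel.configureBlocking(false);

        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        for (;;) {
            selector.select();                           // blocks until at least one event is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {                       // the "loop": walk the ready set
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel ch = serverChannel.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // read from (SocketChannel) key.channel() here
                }
            }
        }
    }
}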

NioEventLoop

In Netty there are many classes implementing the EventLoop interface, which reflects the different underlying transports Netty encapsulates. Apart from the OS-specific ones, such as EpollEventLoop for Linux and KQueueEventLoop for macOS, we usually use the NIO one, because NIO hides system differences and automatically selects the best system-specific mechanism for the current platform.

In the inheritance diagram, the top three layers are provided by the JUC standard library and the bottom three are Netty's encapsulation. It is easy to see at a glance.

  • 🍔 EventExecutorGroup

First, we analyze EventExecutorGroup. It is an interface that extends JUC thread pool operations by doing three main things:

1️⃣ It adds methods to "gracefully shut down" the thread pool.

2️⃣ It implements the Iterable interface and adds a next() method that returns the next EventExecutor (i.e. the iteration function).

3️⃣ It overrides submit() and the schedule() methods so that they return Netty's own Future (which extends JUC's Future with Netty-specific functionality).

  • 🍟 EventExecutor

We just mentioned the EventExecutor interface, Netty's event-execution interface. It provides a method to determine whether the current thread is the EventLoop thread; its parent() method returns the EventExecutorGroup that manages it; and it overrides next() to return itself. The EventExecutorGroup is responsible for managing multiple EventExecutors.

  • 🍕 EventLoopGroup

At this point we discover that EventLoopGroup also inherits from EventExecutorGroup, and it overrides the next() method so that it returns an EventLoop it manages. In addition, it adds methods to register a Channel: when we register a Channel with an EventLoopGroup, the group selects one of the EventLoops it manages to poll that Channel.

  • 🥪 AbstractEventExecutor

So what does AbstractEventExecutor do? From the inheritance hierarchy we know it is an abstract implementation of EventExecutor, and a look at the source shows that only part of the functionality is implemented there.

For example, it implements the next() method, which returns an EventExecutor — itself. Note one thing we need to be clear about: an EventExecutor does not necessarily have a Group managing it, so its parent() (an EventExecutorGroup) can be null.

  • 🥙 AbstractScheduledEventExecutor

Provides the EventExecutor with the scheduled execution function.

But so far, we haven’t implemented threading.

  • 🌮 OrderedEventExecutor

Here we go! OrderedEventExecutor is simply a marker interface indicating that tasks will be executed in the order they are submitted.

  • 🥘 SingleThreadEventExecutor

SingleThreadEventExecutor does implement the thread pool's execute() method, but its behavior is quite different from what we would expect of a typical thread pool. The core is in the execute() method, so let's take a look:

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue. Running the queued tasks is left to the subclass:
    // the subclass's run() method drains the whole queue in one go.
    // (If the queue would otherwise be empty, a task with an empty body may be added purely
    //  as a placeholder to wake the thread up.)
    // A SingleThreadEventExecutor subclass's run() is essentially: poll I/O + run all queued tasks.
    addTask(task);
    if (!inEventLoop) {
        // On the first call, thread == null, so startThread() actually starts the thread
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    // If adding a task does not wake the thread by itself, wake it up explicitly
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

Then look at startThread():

Found that it calls the doStartThread() method:

Looking at the source code, you can see that this method simply runs the custom run() method and shuts things down after run() returns. Also notice thread = Thread.currentThread();, which is why the thread is actually started only once no matter how often execute() calls startThread()!

Note a small detail: SingleThreadEventExecutor.this.run() is wrapped in a Runnable, and that Runnable is submitted to the executor that was passed in as a constructor parameter. From the source we can see this executor comes from MultithreadEventExecutorGroup, and its execute(task) creates a new thread to run each task, so this Runnable runs on a new thread. The run() method therefore ends up running on a new thread, but since run() is called only once, each EventExecutor has exactly one new thread performing its logic. This is how "one thread per EventLoop" is implemented.

To sum it up:

  • The executor (java.util.concurrent.Executor) passed into the EventExecutor implementation class creates a new thread for every Runnable handed to its execute(Runnable) method.
  • To keep a single thread managing both a Selector and task execution, we cannot submit every task (Runnable) to that executor field directly (otherwise a new thread would be spawned each time), so tasks are put on a task queue instead.
  • But how do the queued tasks get executed? The answer is the subclass's run() method, which contains an infinite loop that repeats: poll network I/O + execute all tasks in the queue. That is the whole process.
  • Since the executor creates a new thread for whatever it is given, and run() keeps processing I/O plus the queue forever, we simply submit run() to the executor once: the result is a single thread that implements both the I/O polling and the queue logic. A minimal sketch of this idea follows.
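Here is a deliberately tiny sketch of that "queue + single run() loop" idea. It is not Netty code (the class names are made up, and it busy-spins where a real NioEventLoop would block on the Selector), but it shows why only one thread ever runs the tasks:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Thread-per-task executor: every Runnable it is given gets a brand-new thread
// (this is the role Netty's ThreadPerTaskExecutor plays).
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

// A toy imitation of SingleThreadEventExecutor: execute() only queues the task,
// and the run() loop -- submitted to the executor exactly once -- drains the queue.
class ToySingleThreadExecutor implements Executor {
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final Executor executor = new ThreadPerTaskExecutor();
    private final AtomicBoolean started = new AtomicBoolean();

    @Override
    public void execute(Runnable task) {
        taskQueue.offer(task);            // queue the task instead of spawning a thread
        if (started.compareAndSet(false, true)) {
            executor.execute(this::run);  // only the very first call starts the loop thread
        }
    }

    private void run() {
        for (;;) {
            // A real NioEventLoop would block on selector.select() here instead of spinning,
            // and would process ready I/O events before draining the task queue.
            Runnable task = taskQueue.poll();
            if (task != null) {
                task.run();
            }
        }
    }
}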

Pay attention! This run() is not the Runnable interface's run(); it is an abstract method declared by SingleThreadEventExecutor itself:

Subclasses, such as NioEventLoop, are responsible for implementing this abstract method. Debugging confirms that SingleThreadEventExecutor.this.run() is implemented by NioEventLoop, so the final call lands in NioEventLoop's run() method.

  • 🌯 EventLoop

Now let's look at EventLoop. An EventLoop is to an EventLoopGroup what an EventExecutor is to an EventExecutorGroup. EventLoop overrides EventExecutor's parent() to return the EventLoopGroup that manages it.

  • 🥗 SingleThreadEventLoop

SingleThreadEventLoop is the penultimate layer; it implements the register logic.
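The registration logic is short; in Netty 4.1 it is roughly:

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // The actual work is delegated to the channel's Unsafe, with this EventLoop passed along
    promise.channel().unsafe().register(this, promise);
    return promise;
}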

  • 🥫 NioEventLoop

Opening the Selector, performing the select() calls, and registering connections for the events of interest are all done inside NioEventLoop.

As mentioned earlier, NioEventLoop implements the run() method:

protected void run() {
    int selectCnt = 0;
    for (;;) {
        int strategy;
        strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
        switch (strategy) {
            case SelectStrategy.CONTINUE:
                continue;
            case SelectStrategy.BUSY_WAIT:
                // NIO does not support busy-wait, so fall through to SELECT
            case SelectStrategy.SELECT:
                long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                if (curDeadlineNanos == -1L) {
                    curDeadlineNanos = NONE; // no scheduled task
                }
                nextWakeupNanos.set(curDeadlineNanos);
                try {
                    if (!hasTasks()) {
                        // In Netty this is the private select(deadlineNanos) helper, inlined here
                        if (curDeadlineNanos == NONE) {
                            strategy = selector.select();
                        } else {
                            // The timeout will only be <= 0 if the deadline is within ~5 microseconds
                            long timeoutMillis = deadlineToDelayNanos(curDeadlineNanos + 995000L) / 1000000L;
                            strategy = timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
                        }
                    }
                } finally {
                    nextWakeupNanos.lazySet(AWAKE);
                }
                // fall through
            default:
        }
        selectCnt++;
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        boolean ranTasks;
        if (ioRatio == 100) {
            try {
                if (strategy > 0) {
                    processSelectedKeys();
                }
            } finally {
                // Ensure the tasks in the task queue always get a chance to run
                ranTasks = runAllTasks();
            }
        } else if (strategy > 0) {
            final long ioStartTime = System.nanoTime();
            try {
                // Fetch the set of ready events and process them
                processSelectedKeys();
            } finally {
                // Ensure the tasks in the task queue always get a chance to run
                final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }
        // (IOException handling and the tail of the loop are omitted in this excerpt)
    }
}

As you can see from the inheritance relationships, the final implementations of EventLoop (including the system-specific ones) all follow the same inheritance hierarchy as NioEventLoop.

To be clear, NioEventLoop uses a thread to perform all of the above operations, but it does not create that thread itself. Instead, NioEventLoopGroup creates the number of threads specified by its constructor argument and binds one to each NioEventLoop. See the source code below.

EventLoopGroup

NioEventLoopGroup

For the same reason above, we analyze NioEventLoopGroup.

Here it is easy to notice that, compared with NioEventLoop, AbstractEventExecutor has been replaced by AbstractEventExecutorGroup — the abstract implementation of the Group that manages EventExecutors.

  • 🍝 AbstractEventExecutorGroup

AbstractEventExecutorGroup's implementation is simple and crude: it implements submit() and execute() (both from JUC) plus graceful shutdown, and submit()/execute() are implemented simply by calling next() to get an EventExecutor and then executing on that EventExecutor.

By looking at the source code, we can easily see this:
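For example, the delegation is roughly just:

@Override
public Future<?> submit(Runnable task) {
    return next().submit(task);     // pick a managed EventExecutor and delegate
}

@Override
public void execute(Runnable command) {
    next().execute(command);
}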

  • 🍜 MultithreadEventExecutorGroup

From NioEventLoop we already know that the concrete abstract-class implementations follow the single-threaded EventExecutor model, so the Group version, which manages multiple EventExecutors, must be a collection of single-threaded executors, i.e. multiple threads. Hence the class MultithreadEventExecutorGroup.

MultithreadEventExecutorGroup defines a new method called newChild(). It returns an EventExecutor that can later be handed out by next(), and it is called once per thread. In other words, for a MultithreadEventExecutorGroup managing N threads, newChild() is called to produce N EventExecutors, each corresponding to one thread. Subclasses must override this method.

It also has a Chooser property, which selects one executor from the EventExecutor array (by a custom or default policy) as the return value of next(). In addition, its iterator() method returns an immutable iterator.

The constructor implementation for this class is interesting. An EventLoopGroup relies heavily on some data constructed by this constructor.

First take a look at its fields!

Then take a look at its constructor:

The elements of the EventExecutor array are populated by the newChild() method, which is consistent with our analysis above. Then the Chooser and the immutable set used for iteration are built.
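Stripped of error handling and the termination listeners, the essence of that constructor is roughly:

// inside MultithreadEventExecutorGroup(int nThreads, Executor executor,
//                                      EventExecutorChooserFactory chooserFactory, Object... args)
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
    children[i] = newChild(executor, args);        // one EventExecutor per thread
}
chooser = chooserFactory.newChooser(children);     // used by next()

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);  // what iterator() exposes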

  • 🍲 MultithreadEventLoopGroup

MultithreadEventLoopGroup implements the register logic, corresponding to SingleThreadEventLoop's registration logic: it calls next() to obtain an EventLoop and then registers the Channel with that EventLoop.
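In Netty 4.1 this is roughly just:

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);   // hand the channel to one of the managed EventLoops
}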

  • 🍛 NioEventLoopGroup

NioEventLoopGroup implements only the newChild() method, i.e. the part that creates an EventExecutor and binds it to a thread. Essentially, MultithreadEventExecutorGroup passes the parameters down, and the specialized EventLoopGroup receives them and produces the EventExecutor specific to a particular platform or transport.
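In Netty 4.1 newChild() looks roughly like this (the exact argument list varies slightly between 4.1.x versions):

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(),
            (RejectedExecutionHandler) args[2]);
}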

summary

To sum up EventLoop/EventLoopGroup:

  • The idea of managing multiple EventExecutors led to the EventExecutorGroup series; But Netty’s choice to implement this combination of operations by having EventExecutor inherit from EventExecutorGroup is a bit confusing at first.
  • So far this only encapsulates execution without implementing it. Since Netty is built for network operations, the events we care about are network I/O events, hence EventLoop and its manager EventLoopGroup.
  • Compared with EventExecutor, EventLoop adds multiplexed polling of I/O events, so that an EventExecutor can also be used to process network I/O. In other words, it extends EventExecutor.
  • Although an EventExecutor can execute a Runnable, its implementation queues the tasks and drains the whole queue inside XxxEventLoop's run() method, which runs on the new thread and is called only once. So an EventExecutor has a single thread that both polls events and executes all the tasks given to it.

ChannelInboundInvoker/ChannelOutboundInvoker

  • 🍣 ChannelInboundInvoker/ChannelOutboundInvoker

There is not much to say about these two interfaces, but they are important basic interfaces:

So let me be a little more specific. ChannelInboundInvoker defines a set of inbound operations, such as the fireXxx methods, which trigger the next step in the inbound direction (for example, data read or an exception that needs to be passed down the chain). ChannelOutboundInvoker defines a set of outbound operations, such as write, connection-related operations like connect/bind, and read(), which asks to read from the outside.

Channel

  • 🍱 Channel

Channel is an abstract interface, so it is difficult to analyze it directly. Let’s look at the inheritance hierarchy first.

The Channel keeps the read and flush operations for itself, and hands write and flush (the void-returning versions) off to the Unsafe implementation (Unsafe is its inner interface).

  • 🍫 AbstractChannel

AbstractChannel delegates most of the implementation to the ChannelPipeline, but it is the one that constructs the ChannelPipeline: it creates a DefaultChannelPipeline instance, passing itself (the Channel instance) in.

A common constructor looks like this:
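Roughly (AbstractChannel in Netty 4.1, lightly trimmed):

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();                 // provided by the concrete Channel subclass
    pipeline = newChannelPipeline();      // returns new DefaultChannelPipeline(this)
}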

Here parent refers to the ServerSocketChannel — that is, for a server-side SocketChannel, the ServerSocketChannel that accepted it. In Netty this is the wrapped NioServerSocketChannel.

  • 🍩 AbstractNioChannel

AbstractNioChannel further encapsulates NIO-related operations, including providing the NioUnsafe interface.

  • 🍪 AbstractNioByteChannel

AbstractNioByteChannel provides ByteBuf-based I/O, reading and writing data in byte form — a further specialization. So it implements the two most basic functions: reading and writing.

  • 🍯 NioSocketChannel

NioSocketChannel can be seen as the final interaction implementation. We can guess some usage by looking at the constructor:

The SocketChannel here falls back to Java: it is a java.nio.channels.SocketChannel. Much of this class deals with shutting down and closing the channel.

The fundamental thing the whole Channel hierarchy needs for interaction — the java.nio.channels.SocketChannel — is supplied here. Let's see who calls this constructor:

The accept() method is pretty familiar:

Channel.Unsafe

  • 🍿 Channel.Unsafe

Directly up to the inheritance level:

First, Unsafe itself provides methods that interact directly with external data. These methods should supposedly be called only from within Netty, not from user code.

  • 🥛 NioUnsafe

NioUnsafe extends Unsafe. The main way it extends Unsafe is by providing access to SelectableChannel:

  • 🍵 AbstractUnsafe

What about AbstractUnsafe? It fleshes out many of the main methods, such as flush, write, and register. However, the low-level part of each is delegated to the subclass: write simply adds the message to the outbound (flush) buffer, and register wraps the Channel to be registered and then calls the subclass's registration logic to complete it.

  • ☕️ AbstractNioUnsafe

AbstractNioUnsafe implements most of the logic, including the logic to return SelectableChannel (the implementation is simple, just return javaChannel).

  • 🍺 NioByteUnsafe

NioByteUnsafe, on the other hand, implements the important read() method, which reads the data made available by the read-ready event into a ByteBuf and fires it to the next Handler. Note that doBeginRead() is responsible for marking the current Channel as interested in read events, while read() is responsible for putting the data that was read into the ByteBuf for use.

summary

The Channel family, including Channel.Unsafe, is a complex one, because it encapsulates the JDK's SocketChannel and involves the actual writing of data. Netty wraps quite a few layers for the sake of uniformity and convenience. Each abstract class has its own responsibilities, and each final implementation class has the methods delegated to it by its parent, so it is best to analyze this family alongside the inheritance hierarchy.

ChannelPipeline

  • 🥟 ChannelPipeline

First look at the structure level:

ChannelPipeline is relatively simple to implement because it has only one implementation class DefaultChannelPipeline. And its methods fall into the following four categories:

  • 1️⃣ Add a ChannelHandler

  • 2️⃣ Remove a ChannelHandler

  • 3️⃣ Fire the fireXxx() methods down the ChannelHandler chain from the head

  • 4️⃣ Get the ChannelHandler/ChannelHandlerContext/Channel

  • 🦪 DefaultChannelPipeline

If we look directly at the source of DefaultChannelPipeline, we can see that the implementation is very simple:

  • 1️⃣ The fireXxx operations are implemented by calling the head node's invokeXxx method, so calling fireXxx on the pipeline triggers the operation from the very start of the chain. (Calling fireXxx on a ChannelHandlerContext, by contrast, triggers the next Handler.) See the snippet below.
  • 2️⃣ Operations such as read, write, bind and connect are implemented by calling the corresponding methods on the tail node. Tail's write is easy to understand: tail sits at the end, and if you want to write data it is natural to flush from the last node backwards. Tail's read is not channelRead(); it means the program wants to read, and the end result is a call to head's read method, which triggers registering the actual read interest.
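For instance, fireChannelRead in DefaultChannelPipeline (Netty 4.1) is just:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);  // start from the head node
    return this;
}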

ChannelHandlerContext

  • 🍤 ChannelHandlerContext

Let’s look directly at the inheritance hierarchy diagram for a better understanding:

ChannelHandlerContext gives a ChannelHandler the ability to interact with its own ChannelPipeline and with the other handlers belonging to the same Pipeline — in particular, the ability to interact with the next Handler in the direction of the data flow (essentially the family of fireXxx methods).

In fact, we can see that it provides five kinds of operations: obtaining the ChannelPipeline, the ChannelHandler, the Channel, and the EventExecutor, plus the overridden fireXxx methods; the rest are inherited operations. In short, ChannelHandlerContext is what lets a ChannelHandler interact with other ChannelHandlers, with the Channel, and with the ChannelPipeline.

Many of ChannelHandlerContext's key operations are implemented in AbstractChannelHandlerContext:

  • 🍙 AbstractChannelHandlerContext

It delegates the key implementations to the invokeXxx() methods, and the logic inside invokeXxx is basically always the same: get the next ChannelHandlerContext => check whether the next context's EventExecutor (really the ChannelHandler's EventExecutor — Netty lets you specify an EventExecutor per ChannelHandler) is the current polling thread => if it is, call the next handler's operation (read, for example) directly; if not, submit that operation to the next context's EventExecutor.
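Taking channelRead as the example, the dispatch in AbstractChannelHandlerContext (Netty 4.1) looks roughly like this:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // already on the right thread: call the next handler directly
        next.invokeChannelRead(m);
    } else {
        // otherwise hand the call over to the next context's EventExecutor
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}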

I want to talk about the customizable execution context of a particular ChannelHandler. If you specify an EventExecutor for a ChannelHandler, AbstractChannelHandlerContext performs the check above before invokeXxx. Because SingleThreadEventExecutor's thread field was set to the I/O polling thread when that thread was started (and this happens only once), executor.inEventLoop() returns false for the custom executor, and the operation is submitted to that executor instead — which looks fine at first glance. However, if you debug, you will find that all the handlers after this Handler also run on the new thread instead of switching back to the I/O thread. Why? I did not open a new execution context for the following handlers.

The answer lies in the executor's inEventLoop() method. The following Handler did not specify an executor, so inEventLoop() returns true and it runs on the "current" thread. Note ⚠️ that the current thread is no longer the I/O thread: execution has already switched to the new thread because of the previous Handler's custom executor, so the following handlers execute on that new thread as well.

Because the initial HeadContext runs on the I/O thread, all subsequent handlers will run on its thread unless someone has explicitly switched the execution context. So Netty’s fireXxx method causes subsequent handlers to run in the same execution context as the current Handler.

Other operations are basically the same.

  • 🍡 DefaultChannelHandlerContext

AbstractChannelHandlerContext does not deal with the concrete ChannelHandler; it leaves the ChannelHandler-related parts to its implementation class, and in Netty there is only one such class, DefaultChannelHandlerContext, so its implementation is quite simple:
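In recent Netty 4.1 the whole class is roughly:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;   // the only thing it adds: holding the actual handler
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}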

  • 🍧 HeadContext/TailContext

In addition, there are two special ChannelHandlerContexts. As mentioned earlier, Netty organizes ChannelHandlers in a linked-list structure, so there must be first and last nodes acting as boundaries. Hence the two special implementations, HeadContext and TailContext.

HeadContext implements both the ChannelInboundHandler and ChannelOutboundHandler interfaces and also extends AbstractChannelHandlerContext. As we might guess, as the head of the list it must be responsible for actual reading and writing, so it exists as a duplex handler: it must be able to actually read data from the Channel and fire it to the handlers that follow, and it must be able to actually write data to the Channel. These are the capabilities it needs as the head node.

HeadContext's methods fall into three kinds of implementation:

  • 1️⃣ Ignored outright, e.g. the handlerAdded/handlerRemoved notification methods.
  • 2️⃣ Delegated to Channel.Unsafe methods such as bind, connect, read, and write, which map onto the underlying Java channel.
  • 3️⃣ Forwarded directly to the next Handler. The head node acts as the boundary and as the bridge to the actual Channel for reading and writing data, so business logic such as exception handling does not belong to it and is fired straight to the next Handler — the channelXxx methods, for example.

TailContext is even simpler: it only acts as the tail delimiter, and all of its methods are no-ops (in fact some of them just log, which is roughly the same as doing nothing).

summary

  • ChannelHandlerContext provides the ability for a ChannelHandler to interact with a later ChannelHandler under the same ChannelPipeline.
  • Forwarding a request to the next ChannelHandler in the chain is done by determining the execution context and then calling the relevant method of the next Handler.

ChannelHandler

  • 🍦 ChannelHandler

This is the actual composition of a ChannelHandler chain. Since it involves logical composition, it can also be viewed in the following way:

Look directly at the interface definition:

In fact, there are only two callback methods: one for when the Handler is added, and one for when it is removed.

There is also an annotation, @Sharable, indicating whether this Handler can be shared, i.e. whether the same instance may be added to multiple pipelines.

  • 🥧 ChannelHandlerAdapter

ChannelHandlerAdapter mainly adds a method that parses this annotation to determine whether the Handler is sharable; the implementation is very simple, so I won't go into it.

We rarely use the bare ChannelHandler on its own; in actual use there are usually two directions of Handler, inbound and outbound:

ChannelInboundHandler

  • 🍰 ChannelInboundHandler

If ChannelHandler provides callbacks for adding/removing a ChannelHandler, then ChannelInboundHandler provides more fine-grained callbacks for Channel state changes. In other words, when a Channel is registered, becomes active, reads data, finishes reading, or encounters an exception, the corresponding callback is invoked.
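A minimal inbound-handler sketch of my own (not Netty source) that simply echoes whatever arrives back to the peer:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.writeAndFlush(msg);   // msg is a ByteBuf here; write it straight back
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}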

ChannelOutboundHandler

  • 🍭 ChannelOutboundHandler

ChannelOutboundHandler, on the other hand, provides callbacks for the outbound I/O operations (bind, connect, write, and so on).

ChannelDuplexHandler

  • 🍬 ChannelDuplexHandler

A two-way Handler with both In and Out functions.

ByteBuf

ByteBuf is Netty's core class for buffering, manipulating, reading, and writing data. I'll leave a placeholder here and fill in the ByteBuf source analysis later.
