This series of Netty source code analysis articles is based on version 4.1.56.Final.

Does your head spin the first time you see this flowchart?

The purpose of this article is to make this flowchart clear to you, so that you understand the entire startup process of the Netty Reactor in depth, including the code design details involved.

In the previous article, we covered the creation of the Reactor thread group (NioEventLoopGroup) and of the Reactor itself (NioEventLoop). That left us with the following operating skeleton of the Netty Reactor model:

Now that the skeleton of the Netty server program is in place, this article builds on it to analyze the Netty server startup process in depth.

After creating the primary/secondary Reactor thread groups bossGroup and workerGroup, we configure the Netty server startup helper class ServerBootstrap.

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        // Create a primary/secondary Reactor thread group
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // Configure the primary/secondary Reactor thread groups
             .channel(NioServerSocketChannel.class) // Configure the channel type used by the primary Reactor
             .option(ChannelOption.SO_BACKLOG, 100) // Set options on the channel in the primary Reactor
             .handler(new LoggingHandler(LogLevel.INFO)) // Add a handler to the pipeline of the primary Reactor's channel
             .childHandler(new ChannelInitializer<SocketChannel>() { // Set up the pipeline of channels registered with the secondary Reactor
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server: bind the port and begin listening for accept events.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

In the previous article, we briefly introduced some of the ServerBootstrap configuration methods used in this code template. If you have forgotten them, you can go back and review that article.

The ServerBootstrap class has no special logic. It mainly manages some core information needed during Netty startup. For example:

  • The core engine of Netty, the primary/secondary Reactor thread groups bossGroup and workerGroup, configured through the ServerBootstrap#group method.

  • The Channel type used by the Netty server, NioServerSocketChannel, configured through the ServerBootstrap#channel method.

  • The SocketOptions applied to NioServerSocketChannel, which set options on the underlying JDK NIO socket. They are configured through the ServerBootstrap#option method.

The Main Reactor in the primary Reactor group manages a Channel of type NioServerSocketChannel. As shown in the figure, it listens on the port, accepts client connections, and initializes a NioSocketChannel for each client. It then picks a Sub Reactor from the secondary Reactor group in round-robin fashion and binds the client NioSocketChannel to it.

The Sub Reactors in the secondary Reactor group manage Channels of type NioSocketChannel, Netty's model of a client connection; there is one NioSocketChannel per connection. As shown in the figure, each Sub Reactor listens for and handles the IO events on all NioSocketChannels bound to it.

  • The ChannelHandlers specified for the pipelines of the server NioServerSocketChannel and the client NioSocketChannel. They are used to initialize the pipeline of a Channel once that Channel has been registered with its Reactor.

Each Channel instance has its own pipeline, whether it is the server NioServerSocketChannel or a client NioSocketChannel. A pipeline holds multiple ChannelHandlers, which orchestrate the handling of the IO events of interest on that Channel.
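
The round-robin selection of a Sub Reactor mentioned above can be illustrated with a few lines of plain Java. This is only a sketch of the idea; RoundRobinChooser is a hypothetical name and not the class Netty uses internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for picking a Sub Reactor; not Netty's own chooser class.
final class RoundRobinChooser<T> {
    private final List<T> reactors;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> reactors) {
        if (reactors.isEmpty()) {
            throw new IllegalArgumentException("at least one reactor is required");
        }
        this.reactors = new ArrayList<>(reactors);
    }

    // Hands out the reactors in turn; floorMod keeps the index valid even after int overflow.
    T next() {
        return reactors.get(Math.floorMod(counter.getAndIncrement(), reactors.size()));
    }
}
```

Each call to next() returns the following element and wraps around at the end, so new client connections are spread evenly over the Sub Reactors.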

The ServerBootstrap structure holds all the configuration needed to start a Netty server program. Before we walk through the startup process, let's take a look at the source structure of ServerBootstrap:

ServerBootstrap

ServerBootstrap has a simple inheritance structure and a clear division of responsibilities at the inheritance level.

ServerBootstrap manages the configuration of the primary and secondary Reactor thread groups. Its methods prefixed with child manage the configuration of the secondary Reactor thread group, and the configuration of the client NioSocketChannels managed by the Sub Reactors in that group is stored in the ServerBootstrap structure.

The parent class AbstractBootstrap is mainly responsible for the configuration management related to the Main Reactor thread group and the configuration management related to the ServerSocketChannel processed by the Main Reactor thread group.

1. Configure the primary/secondary Reactor thread group

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup); // Configure the primary/secondary Reactor thread groups

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    // Main Reactor thread group (declared in the parent class AbstractBootstrap)
    volatile EventLoopGroup group;
    // Sub Reactor thread group
    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // The parent class manages the primary Reactor thread group
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }
}

2. Configure ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // ReflectiveChannelFactory used to create the ServerSocketChannel
    private volatile ChannelFactory<? extends C> channelFactory;

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")));
    }

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
}

When the channel method configures the server-side ServerSocketChannel on ServerBootstrap, it actually creates a ReflectiveChannelFactory, an implementation of ChannelFactory. During Netty server startup, the corresponding Channel is created through this ChannelFactory instance.

The ServerSocketChannel can be configured with implementations for different IO models:

BIO                             NIO                     AIO
OioServerSocketChannel          NioServerSocketChannel  AioServerSocketChannel

The EventLoopGroup (Reactor thread group) likewise has implementations for different IO models:

BIO                             NIO                     AIO
ThreadPerChannelEventLoopGroup  NioEventLoopGroup       AioEventLoopGroup

To switch the IO model in Netty, we only need to change the prefix of the implementation classes of these core interfaces to the one for the desired IO model.

2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    // Constructor of NioServerSocketChannel
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            // Reflection gets the constructor for NioServerSocketChannel
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            // Create the NioServerSocketChannel instance
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

From the class signature, we can see that the factory class creates the corresponding Channel instance by generics and reflection.

  • The generic parameter T extends Channel denotes the Channel type the factory creates; here it is NioServerSocketChannel.
  • The constructor of ReflectiveChannelFactory obtains the constructor of NioServerSocketChannel through reflection.
  • The newChannel method creates the NioServerSocketChannel instance by invoking that constructor reflectively.

Note that this is only the configuration phase and NioServerSocketChannel is not created at this time. It is created at startup time.
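
To make the split between the configuration phase and the startup phase concrete, here is a minimal JDK-only sketch of the same generics + reflection + factory idea. SimpleFactory and ReflectiveFactory are hypothetical names used for illustration; they are not Netty classes.

```java
import java.lang.reflect.Constructor;

// Hypothetical names throughout; this mirrors the idea, not Netty's classes.
interface SimpleFactory<T> {
    T newInstance();
}

final class ReflectiveFactory<T> implements SimpleFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Configuration phase: only look up the public no-arg constructor.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    @Override
    public T newInstance() {
        try {
            // Startup phase: the instance is created only when this method is called.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create " + constructor.getDeclaringClass(), e);
        }
    }
}
```

For example, new ReflectiveFactory<CharSequence>(StringBuilder.class) creates nothing by itself; a StringBuilder only comes into existence on each newInstance() call, just as NioServerSocketChannel is only created once the server starts.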

3. Configure ChannelOption for the server NioServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
// Set socket options for the server NioServerSocketChannel managed by the Main Reactor
b.option(ChannelOption.SO_BACKLOG, 100);

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    // ChannelOptions to set on the ServerSocketChannel
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> B option(ChannelOption<T> option, T value) {
        ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return self();
    }
}

For both the server NioServerSocketChannel and the client NioSocketChannel, the ChannelOption configuration of the underlying socket is stored in a Map.

Because the client NioSocketChannel is handled by a Sub Reactor in the secondary Reactor thread group, all methods and configuration related to the client NioSocketChannel start with the child prefix.

ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE);

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    // ChannelOptions to set on the client SocketChannel
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
}

The ChannelOption class defines the available options, both Netty-specific ones and socket options. In future articles in this series, I will describe these parameters in detail.

public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {

    // ... omitted ...

    public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
    public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

    // ... omitted ...
}
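
The option and childOption methods shown earlier share one storage pattern: a synchronized, insertion-ordered map in which a null value removes an entry. The following JDK-only sketch isolates that pattern. OptionStore is a hypothetical class, and plain strings stand in for ChannelOption keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder that mimics the option/childOption storage pattern.
final class OptionStore {
    private final Map<String, Object> options = new LinkedHashMap<>();

    // A null value removes the option; any other value adds or overwrites it.
    OptionStore option(String name, Object value) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(name);
            } else {
                options.put(name, value);
            }
        }
        return this; // returning this allows chained configuration calls
    }

    // Copy of the current options in insertion order, e.g. to apply them at startup.
    Map<String, Object> snapshot() {
        synchronized (options) {
            return new LinkedHashMap<>(options);
        }
    }
}
```

Nothing is applied to a socket at configuration time in this sketch; the map is only read later, which matches the point that the configuration phase merely records settings.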

4. Configure ChannelHandler for the Pipeline in the server NioServerSocketChannel

    // Handler for the pipeline of the server NioServerSocketChannel
    private volatile ChannelHandler handler;

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

There are two ways to add a ChannelHandler to a Pipeline in NioServerSocketChannel:

  • Explicit addition: the user adds the handler in the main thread through ServerBootstrap#handler. If multiple ChannelHandlers are needed, they can be added to the pipeline through a ChannelInitializer.

ChannelInitializer is a special ChannelHandler used to initialize a pipeline. It suits scenarios where multiple ChannelHandlers are added to a pipeline.

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // Configure the primary/secondary Reactor thread groups
             .channel(NioServerSocketChannel.class) // Configure the channel type used by the primary Reactor
             .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(channelHandler1)
                      .addLast(channelHandler2)
                      // ...
                      .addLast(channelHandler3);
                 }
             });
  • Implicit addition: this refers to the acceptor, the core component of the primary Reactor group. In Netty it is implemented by ServerBootstrapAcceptor, which is essentially a ChannelHandler. After a client connection is established, it initializes the client NioSocketChannel, selects a Sub Reactor from the secondary Reactor thread group, and registers the client NioSocketChannel with the Selector of that Sub Reactor.

The implicitly added ServerBootstrapAcceptor is installed by the Netty framework at startup; users do not need to care about it.

In this example, the pipeline of NioServerSocketChannel holds only two ChannelHandlers: the LoggingHandler explicitly added by the user and the ServerBootstrapAcceptor implicitly added by the Netty framework.

In real projects we generally do not add extra ChannelHandlers to the server NioServerSocketChannel, which only needs to concentrate on its most important job, accepting client connections. The extra LoggingHandler is added here only to show how ServerBootstrap is configured.

5. Configure a ChannelHandler for the Pipeline in the NioSocketChannel client

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // Set up the pipeline of channels registered with the secondary Reactor
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();

                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

    // Handler for the pipeline of the client SocketChannel
    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

Which ChannelHandlers are added to the pipeline of the client NioSocketChannel is entirely up to the user, and there is no limit on their number.

In Netty's IO thread model, a single Sub Reactor thread executes the pipeline of a client NioSocketChannel, and the same Sub Reactor thread handles the IO events of many NioSocketChannels. If a pipeline holds too many ChannelHandlers, the Sub Reactor thread is delayed in executing the pipelines of its other NioSocketChannels, which lowers IO processing efficiency and throughput.

Therefore, do not add too many ChannelHandlers to a pipeline, and do not run time-consuming business logic inside a ChannelHandler.
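
The effect of keeping slow work off the Sub Reactor thread can be sketched with plain JDK executors. This is an illustration under simplifying assumptions, not Netty code: a single-thread executor stands in for one Sub Reactor, a second executor stands in for a business thread pool, and OffloadSketch is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class OffloadSketch {
    // Returns the order in which the steps completed.
    static List<String> run() {
        ExecutorService reactor = Executors.newSingleThreadExecutor();  // one "Sub Reactor" thread
        ExecutorService business = Executors.newSingleThreadExecutor(); // separate pool for slow work
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch slowMayFinish = new CountDownLatch(1);
        CountDownLatch slowDone = new CountDownLatch(1);
        try {
            // IO event on channel A: hand the slow part to the business pool and return at once.
            reactor.execute(() -> {
                log.add("A:read");
                business.execute(() -> {
                    awaitOrFail(slowMayFinish); // simulates long-running business logic
                    log.add("A:business-done");
                    slowDone.countDown();
                });
            });
            // IO event on channel B, queued behind A on the same reactor thread.
            reactor.execute(() -> {
                log.add("B:read");
                slowMayFinish.countDown(); // B is served while A's slow work is still pending
            });
            awaitOrFail(slowDone);
            return log;
        } finally {
            reactor.shutdown();
            business.shutdown();
        }
    }

    private static void awaitOrFail(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the slow task runs elsewhere, the event on channel B is handled before the business logic of channel A finishes. In Netty itself, a comparable effect can be achieved by registering a handler together with a separate EventExecutorGroup through ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...).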

When we configure the Netty server with ServerBootstrap, whether we add ChannelHandlers to the pipeline of the server NioServerSocketChannel or to the pipeline of the client NioSocketChannel, we use a ChannelInitializer whenever several ChannelHandlers are involved. So what is this ChannelInitializer, and why do we use it? Let's move on.

ChannelInitializer

First, ChannelInitializer inherits from ChannelHandler and is therefore a ChannelHandler itself, so it can be passed to childHandler.

We can ignore its other parent classes here; the author will cover them in detail in later articles.

So why use a ChannelInitializer instead of adding the ChannelHandlers directly?

There are two main reasons:

  • As mentioned earlier, the client NioSocketChannel is created by the server NioServerSocketChannel after the server accepts a connection. Right now, however, we are still configuring ServerBootstrap: the server has not started, no client has connected, and no client NioSocketChannel exists yet. So there is no way to add a ChannelHandler to the pipeline of a client NioSocketChannel directly.

  • Any number of ChannelHandlers can be added to the pipeline of the client NioSocketChannel, but the Netty framework cannot know in advance how many the user needs. So it provides the callback method ChannelInitializer#initChannel, which lets users define how ChannelHandlers are added.

When the client NioSocketChannel is registered with its Sub Reactor, the pipeline of the NioSocketChannel is initialized, and the Netty framework calls back ChannelInitializer#initChannel to run the user-defined add logic.

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // When the channelRegistered event fires, initChannel is called to initialize the pipeline
        if (initChannel(ctx)) {
            // ... omitted ...
        } else {
            // ... omitted ...
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                // At this point the client NioSocketChannel has been created and initialized
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // ... omitted ...
            } finally {
                // ... omitted ...
            }
            return true;
        }
        return false;
    }

    protected abstract void initChannel(C ch) throws Exception;

    // ... omitted ...
}

The ChannelInitializer#initChannel method that the Netty framework calls back here is where our custom add logic lives.

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // Set up the pipeline of channels registered with the secondary Reactor
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();

                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
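
The "configure now, run on registration" idea behind ChannelInitializer can be reduced to a small JDK-only sketch. MiniChannel and MiniInitializer are hypothetical types invented for this illustration; strings stand in for handlers, and the Set plays the role of the re-entrance guard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical miniature channel whose "pipeline" is just a list of handler names.
final class MiniChannel {
    final List<String> pipeline = new ArrayList<>();
}

final class MiniInitializer {
    private final Consumer<MiniChannel> initChannel; // user-supplied callback, stored at configuration time
    private final Set<MiniChannel> initialized = ConcurrentHashMap.newKeySet();

    MiniInitializer(Consumer<MiniChannel> initChannel) {
        this.initChannel = initChannel;
    }

    // Called by the "framework" once the channel exists and has been registered.
    boolean channelRegistered(MiniChannel ch) {
        if (initialized.add(ch)) { // guard against running the callback twice
            initChannel.accept(ch);
            return true;
        }
        return false;
    }
}
```

The callback is written while no channel exists yet and only runs when a concrete channel is handed in, which is why the number and kind of handlers can stay entirely in user code.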

At this point, all the configuration needed to start the Netty server is stored in the ServerBootstrap helper class.

The next thing to do is start the server.

// Start the server: bind the port and begin listening for accept events.
ChannelFuture f = serverBootstrap.bind(PORT).sync();

Start the Netty server

After all the groundwork above, we finally arrive at the core of this article: the Netty server startup process.

As the code template shows, Netty server startup is encapsulated in the io.netty.bootstrap.AbstractBootstrap#bind(int) method.

What does the Netty server do during startup?

Do not panic when you see this startup flowchart. The following sections break it down step by step, and by the end of the article you should understand it.

We start today's tour with the entry method of Netty server startup:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        // Verify that Netty core components are properly configured
        validate();
        // The server starts up, binds the port address, and receives client connections
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

   private ChannelFuture doBind(final SocketAddress localAddress) {
        // Asynchronously create, initialize, register ServerSocketChannel to main Reactor
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // ... The ServerSocketChannel binds the port after it has successfully registered with the Main Reactor ...
        } else {
            // If registration has not completed yet, add an operationComplete listener to regFuture; it is called back once registration completes.
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // ... The ServerSocketChannel binds the port after it has successfully registered with the Main Reactor ...
                }
            });
            return promise;
        }
    }

The startup process of the Netty server is as follows:

  • Create the server NioServerSocketChannel and initialize it.

  • Register the server NioServerSocketChannel with the primary Reactor thread group.

  • After registration succeeds, initialize the pipeline of the NioServerSocketChannel, then fire the channelRegistered event in the pipeline.

  • The NioServerSocketChannel then binds the port address.

  • After the port address has been bound, propagate the channelActive event through the pipeline of the NioServerSocketChannel. In the channelActive callback, register the OP_ACCEPT event with the Main Reactor and start waiting for client connections. The server has now started successfully.
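
The "bind only after registration has completed" ordering in doBind can be sketched with the JDK's CompletableFuture. This is a simplified illustration: CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise, and BindAfterRegisterSketch and its method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: CompletableFuture plays the role of ChannelFuture/ChannelPromise.
final class BindAfterRegisterSketch {
    final List<String> steps = new CopyOnWriteArrayList<>();

    // regFuture plays the role of the future returned by initAndRegister().
    CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Registration already finished: bind right away.
            bindAfterRegistration(regFuture, port, promise);
        } else {
            // Registration still pending: bind from a callback once it completes.
            regFuture.whenComplete((ignored, cause) -> bindAfterRegistration(regFuture, port, promise));
        }
        return promise;
    }

    private void bindAfterRegistration(CompletableFuture<Void> regFuture, int port,
                                       CompletableFuture<Void> promise) {
        if (regFuture.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
            return;
        }
        steps.add("bind:" + port);  // bind only after successful registration
        steps.add("channelActive"); // then announce that the channel is active
        promise.complete(null);
    }
}
```

The caller gets the promise back immediately in both branches, and the bind step runs either inline or later from the callback, never before registration has finished.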

Once the Netty server has started, we end up with the following structure: the Reactor is running and ready to accept client connections.

Next, let's see how the Netty source code implements the steps above.

1. initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the NioServerSocketChannel.
            // ReflectiveChannelFactory combines generics, reflection, and the factory pattern to create different channel types flexibly
            channel = channelFactory.newChannel();
            // Initialize the NioServerSocketChannel
            init(channel);
        } catch (Throwable t) {
            // ... omitted ...
        }

        // Register the ServerSocketChannel with the Main Reactor
        ChannelFuture regFuture = config().group().register(channel);

        // ... omitted ...

        return regFuture;
    }

As the name of the method suggests, it first creates the NioServerSocketChannel, then initializes it, and finally registers it with the Main Reactor.

1.1 Create NioServerSocketChannel

Remember the factory class ReflectiveChannelFactory mentioned when we configured the server-side ServerSocketChannel type on the ServerBootstrap helper class?

While ServerBootstrap is being configured, the server has not started yet, so the configuration phase is not the time to create a concrete ServerSocketChannel.

Netty therefore uses the factory pattern: the type of ServerSocketChannel to create (specified through generics) and the creation logic (wrapped in the newChannel method) are first encapsulated in the factory class ReflectiveChannelFactory.

ReflectiveChannelFactory creates Channels of different types through a combination of generics, reflection, and the factory pattern.

When the time to create the Channel comes, we simply call the channelFactory stored in ServerBootstrap.

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

Let's look at how NioServerSocketChannel is constructed:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    // SelectorProvider (used to create Selectors and selectable Channels)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    // Create the JDK NIO ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    // ServerSocketChannel configuration
    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // The parent class AbstractNioChannel stores the JDK NIO native ServerSocketChannel and the event to listen for, OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        // DefaultChannelConfig sets the allocator for the channel's receive buffer -> AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
  • First, newSocket is called to create the JDK NIO native ServerSocketChannel through SelectorProvider#openServerSocketChannel. SelectorProvider is the class used to create Selectors and selectable Channels. Do you remember it?

  • The parent class constructor records the IO event NioServerSocketChannel is interested in, here SelectionKey.OP_ACCEPT, and stores the JDK NIO native ServerSocketChannel.

  • Create the Channel configuration class NioServerSocketChannelConfig, which encapsulates low-level configuration behavior of the Channel as well as the JDK ServerSocket, and create AdaptiveRecvByteBufAllocator, the Buffer allocator NioServerSocketChannel uses for receiving data.

NioServerSocketChannelConfig contains nothing critical, so we do not dig into it here; it simply manages the configuration of NioServerSocketChannel. The only thing to note is AdaptiveRecvByteBufAllocator, the Buffer allocator the Channel uses for receiving data, which we will come back to when we cover how Netty accepts connections.

That covers the overall construction of NioServerSocketChannel. Now let's walk back up its inheritance hierarchy and see what each layer creates and encapsulates. This is the core information of a Channel, so it is important to know.

During the creation of NioServerSocketChannel, we focus on the three classes highlighted in the red box of the inheritance diagram and leave the rest aside for now.

AbstractNioMessageChannel, the parent class of NioServerSocketChannel, mainly encapsulates and defines the low-level read and write behavior, such as accepting client connections. We will discuss it later and not expand on it here.

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
    // JDK NIO native selectable Channel
    private final SelectableChannel ch;
    // The IO event the Channel listens for
    protected final int readInterestOp;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // Set the Channel to non-blocking mode, for use with IO multiplexing
            ch.configureBlocking(false);
        } catch (IOException e) {
            // ... omitted ...
        }
    }
}
  • Stores the JDK NIO native ServerSocketChannel created by SelectorProvider.

  • Stores the IO event of interest specified when the Channel is created. For NioServerSocketChannel, that event is OP_ACCEPT.

  • Sets the JDK NIO native ServerSocketChannel to non-blocking mode, for use with the IO multiplexing model.
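
The JDK NIO calls that these constructors wrap can be tried out on their own. The sketch below uses only JDK classes; JdkNioServerSketch is a hypothetical name, port 0 (any free port on the loopback address) is chosen just for the demo, and declaring interest in OP_ACCEPT only after binding is an assumption that follows the startup steps listed earlier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class JdkNioServerSketch {
    // Returns {blocking flag (0 or 1), interest ops after registration, interest ops after activation}.
    static int[] run() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a Selector.
            ch.configureBlocking(false);
            // Register without any interest first.
            SelectionKey key = ch.register(selector, 0);
            int opsAfterRegister = key.interestOps();
            // Port 0 lets the OS pick a free port (demo only).
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // Only now declare interest in accept events.
            key.interestOps(SelectionKey.OP_ACCEPT);
            return new int[] {ch.isBlocking() ? 1 : 0, opsAfterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The three returned values show the channel in non-blocking mode, registered with no interest at first, and listening for OP_ACCEPT after the bind.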

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    // Channels are created hierarchically; for example, ServerSocketChannel is the parent of SocketChannel
    private final Channel parent;
    // Globally unique channel id: machineId + processId + sequence + timestamp + random
    private final ChannelId id;
    // Unsafe encapsulates the operations on the underlying socket
    private final Unsafe unsafe;
    // Each channel gets its own pipeline for orchestrating IO events
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // Globally unique channel id: machineId + processId + sequence + timestamp + random
        id = newId();
        // Unsafe defines and implements the low-level operations of the Channel
        unsafe = newUnsafe();
        // Each channel gets its own pipeline for orchestrating IO events
        pipeline = newChannelPipeline();
    }
}
  • Channels are created hierarchically. NioServerSocketChannel is a top-level Channel, so its parent is null. The client NioSocketChannel is created by NioServerSocketChannel, so its parent is the NioServerSocketChannel.

  • Assign the Channel a globally unique ChannelId. A ChannelId is composed of the machine id (machineId), the process id (processId), a sequence number (sequence), a timestamp, and a random number (random).

    private DefaultChannelId() {
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }
  • Create the Unsafe, the low-level operation class of NioServerSocketChannel. The type created here is io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.

Unsafe is an inner interface of the Channel interface that is used to define and implement the low-level operations of a Channel. The operations it defines may only be invoked by the Reactor threads of the Netty framework, not by user threads.

interface Unsafe {

        // Handle used to allocate the buffer that receives data
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        // Local address the channel is bound to
        SocketAddress localAddress();
        // Remote address
        SocketAddress remoteAddress();
        // Register the channel with a Reactor
        void register(EventLoop eventLoop, ChannelPromise promise);

        // Bind the server to a local address
        void bind(SocketAddress localAddress, ChannelPromise promise);
        // Connect the client to the server
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        // Close the channel
        void close(ChannelPromise promise);
        // Read data
        void beginRead();
        // Write data
        void write(Object msg, ChannelPromise promise);

    }
  • Assign a dedicated pipeline to the NioServerSocketChannel for I/O event orchestration. The pipeline is actually a doubly linked list of ChannelHandlerContext nodes. The head node is HeadContext and the tail node is TailContext. Each ChannelHandlerContext wraps a ChannelHandler.

ChannelHandlerContext holds the context information of a ChannelHandler and is used for event propagation. I will cover it in a separate article; here we stay focused on the main startup path.

The goal for now is only a rough picture of the pipeline structure; the details are left for that dedicated article.

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
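To make the head/tail wiring concrete, here is a minimal sketch of a doubly linked list with two sentinel nodes. MiniPipeline and Node are hypothetical names used only for illustration; they are not Netty classes, and the real ChannelHandlerContext carries far more state than a name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for DefaultChannelPipeline; not Netty's code.
final class MiniPipeline {
    // Each node plays the role of a ChannelHandlerContext wrapping one handler.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    MiniPipeline() {
        // Same wiring as the constructor above: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // New nodes are linked in just before the tail sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail and collect the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Calling new MiniPipeline().addLast("logging").addLast("acceptor").names() walks the list as head, logging, acceptor, tail. The two sentinels mean addLast never has to special-case an empty list.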


Now that the NioServerSocketChannel is created, let’s review what core information it contains.

1.2 Initializing NioServerSocketChannel

   void init(Channel channel) {
        // Apply the ServerSocketChannel options to the NioServerSocketChannelConfig
        setChannelOptions(channel, newOptionsArray(), logger);
        // Set the attributes on Netty's custom NioServerSocketChannel
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();

        // Get the secondary Reactor thread group
        final EventLoopGroup currentChildGroup = childGroup;
        // Get the ChannelInitializer used to initialize the client NioSocketChannel
        final ChannelHandler currentChildHandler = childHandler;
        // Get the ChannelOptions and attributes configured by the user for the client SocketChannel
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        // Add the logic that initializes the ChannelHandlers to the pipeline of the NioServerSocketChannel
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // The ChannelHandler specified by the user on ServerBootstrap
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // LoggingHandler
                    pipeline.addLast(handler);
                }
                // Add the acceptor that receives client connections
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
  • Set the ServerSocketChannelOption values on the NioServerSocketChannelConfig.

  • Set the ChannelAttributes on Netty’s custom NioServerSocketChannel.

Netty’s Channel types implement the AttributeMap interface and extend the DefaultAttributeMap class, which together provide ChannelAttributes. They are used to attach user-defined information to a Channel.

This ChannelAttributes feature is very useful, and many features of Netty are based on this ChannelAttributes feature. What can you do with ChannelAttributes?

  • Get the secondary Reactor thread group childGroup, together with the ChannelInitializer, ChannelOption, and ChannelAttributes used to initialize the client NioSocketChannel. All of this is the client NioSocketChannel configuration that the user added to ServerBootstrap at startup. It is used here to initialize the ServerBootstrapAcceptor, which later uses it when client connections are accepted and NioSocketChannels are created.

  • Add the ChannelInitializer used to initialize the pipeline to the pipeline in NioServerSocketChannel.

So the question is: why not add the ChannelHandler to the pipeline directly, instead of going through a ChannelInitializer?

There are two reasons:

  • The pipeline must be initialized by the Reactor thread to keep the initialization thread-safe, but the current thread is the Main thread of the user program, not the Reactor thread, so the pipeline cannot be initialized right now.

  • The pipeline of a Channel can only be initialized after the Channel has been registered with its Reactor. At this point the NioServerSocketChannel has been created but has not yet been registered with the Main Reactor.

The pipeline of the NioServerSocketChannel is initialized after the NioServerSocketChannel has been registered with the Main Reactor and before the port address is bound.

Recall that a ChannelInitializer was also used for the childHandler of ServerBootstrap.

Here is another question. Look at the ChannelInitializer#initChannel method: it adds the LoggingHandler to the pipeline directly. Why is the acceptor not added to the pipeline directly as well, instead of being wrapped in an asynchronous task?

I will leave this as an open question for now and answer it later in the process.

The pipeline structure in NioServerSocketChannel is as follows:

1.3 Registering NioServerSocketChannel with the Main Reactor

Obtain the primary Reactor thread group NioEventLoopGroup from ServerBootstrap and register NioServerSocketChannel with NioEventLoopGroup.

ChannelFuture regFuture = config().group().register(channel);

Let’s take a look at the specific registration process:

1.3.1 Selecting a Reactor from the Main Reactor thread group for registration

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    // Get the binding policy
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    // Select a Reactor by round-robin
    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

Netty uses the next() method to select a Reactor from the Reactor group according to the Channel-to-Reactor binding strategy described in the previous article, and registers the Channel with it. From then on, all I/O events in the life cycle of the Channel, such as accept, connect, read, and write, are handled by this Reactor.

A channel can be bound to only one Reactor, and a Reactor listens to multiple channels.

Since the NioServerSocketChannel is registered with and bound to the Main Reactor, the Main Reactor handles the OP_ACCEPT events.
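The round-robin selection can be sketched on plain array indexes. RoundRobinSketch and Chooser are hypothetical names for illustration only. The generic branch uses the same expression as the next() method shown above; the power-of-two branch uses a bit mask, which is my assumption about what the power-of-two chooser does, since its body is not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: it returns array indexes instead of EventExecutor instances.
final class RoundRobinSketch {
    interface Chooser {
        int nextIndex();
    }

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    static Chooser newChooser(final int length) {
        if (isPowerOfTwo(length)) {
            final AtomicInteger idx = new AtomicInteger();
            // Assumed fast path: for a power-of-two length, masking with (length - 1)
            // always yields an index in [0, length).
            return () -> idx.getAndIncrement() & (length - 1);
        }
        final AtomicLong idx = new AtomicLong();
        // Same expression as the generic next() method shown above.
        return () -> (int) Math.abs(idx.getAndIncrement() % length);
    }
}
```

With a length of 4 the indexes cycle through 0, 1, 2, 3 and wrap back to 0; with a length of 3 they cycle through 0, 1, 2. Either way, successive Channels are spread evenly over the Reactors in the group.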

1.3.2 Register with the binding Main Reactor

The behavior of registering with the Reactor is defined in NioEventLoop’s parent class, SingleThreadEventLoop. For those of you who are confused, please refer back to the NioEventLoop inheritance structure section in the previous article.

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        // Register a channel to the binding Reactor
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // Unsafe is responsible for the underlying operations of channel
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

The underlying specific registration action is performed through the Unsafe class in NioServerSocketChannel.

protected abstract class AbstractUnsafe implements Unsafe {

        /**
         * Register the Channel with the Reactor it is bound to
         */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            // The EventLoop type must match the Channel type, e.g. Nio with Nio, Oio with Oio
            if (!isCompatible(eventLoop)) {
                promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // Record the bound Reactor on the channel
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * The channel registration must be performed by the Reactor thread.
             * 1: If the current thread is the Reactor thread, call register0 directly.
             * 2: If the current thread is an external thread, wrap register0 in an asynchronous task for the Reactor thread to execute.
             */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // ... omitted ...
                }
            }
        }
    }
  • First check whether the NioServerSocketChannel has already been registered. If it has, the ChannelPromise that represents the result of the registration is set to failure.

  • The isCompatible method verifies that the EventLoop type of the Reactor matches the type of the Channel. NioEventLoop corresponds to NioServerSocketChannel.

In the last article we saw that Netty supports more than one IO model, such as Oio and Nio, and that users can switch between them by changing the prefix of the Netty core classes. The purpose of the isCompatible method is to ensure that the Reactor and the Channel use the same IO model.

  • Save the bound Reactor instance in the Channel.

  • The registration of a Channel with its Reactor must be performed on the Reactor thread.

    • If the current thread is the Reactor thread, the registration action register0 is executed directly.
    • If the current thread is not the Reactor thread, the registration action register0 is wrapped in an asynchronous task and placed in the taskQueue of the Reactor, where it waits for the Reactor thread to execute it.

The thread currently executing is not the Reactor thread but the Main thread that starts the user program.

1.3.3 Starting the Reactor thread

In the previous article we saw that the NioEventLoopGroup is created with a constructor parameter executor of type ThreadPerTaskExecutor, which is used to start the Reactor thread.

At that point I left a question open: “When is the Reactor thread started?”

Now it is time to give the answer.

The Reactor thread is started when the first asynchronous task is submitted to the Reactor.

In Netty, the Main Reactor (a NioEventLoop in the Main Reactor thread group NioEventLoopGroup) is started when the Main thread of the user program submits to it the asynchronous task that registers the NioServerSocketChannel.

   eventLoop.execute(new Runnable() {
       @Override
       public void run() {
           register0(promise);
       }
   });

Let’s look at the Execute method of NioEventLoop

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        // Whether the current thread is the Reactor thread
        boolean inEventLoop = inEventLoop();
        // addTaskWakesUp = true means that addTask itself wakes up the Reactor thread to execute the task
        addTask(task);
        if (!inEventLoop) {
            // The current thread is not the Reactor thread, so start the Reactor thread.
            // This is where adding an asynchronous task to the NioEventLoop starts the Reactor thread.
            startThread();
            // ... omitted ...
        }
        // ... omitted ...
    }
}
  • First add the asynchronous task task to the taskQueue in the Reactor.

  • Determine whether the current thread is a Reactor thread, and the current executing thread is the user program starting thread, so call startThread to start the Reactor thread.
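The two steps above, enqueue first and then start the thread if the caller is an outside thread, can be sketched with plain JDK classes. LazyLoop is a hypothetical miniature written for this illustration; it uses an AtomicBoolean instead of Netty's state field and leaves out selectors, wake-ups, and shutdown entirely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of a Reactor; names are illustrative, not Netty's.
final class LazyLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        // 1. Always enqueue the task first.
        taskQueue.add(task);
        // 2. The first submission from an outside thread starts the loop thread.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-reactor");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                // This sketch only drains tasks; it does no I/O polling.
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the loop was idle before the first task and the task ran on the loop thread.
    static boolean demo() {
        final LazyLoop loop = new LazyLoop();
        boolean startedBefore = loop.isStarted();
        final CountDownLatch ran = new CountDownLatch(1);
        final AtomicBoolean ranOnLoop = new AtomicBoolean();
        loop.execute(() -> {
            ranOnLoop.set(loop.inEventLoop());
            ran.countDown();
        });
        try {
            return !startedBefore && ran.await(5, TimeUnit.SECONDS) && ranOnLoop.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

No thread exists until execute is called for the first time, which mirrors why the Main thread submitting register0 is what brings the Reactor thread to life.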

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // Define the Reactor thread status
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

     //Reactor thread status was initially not started
    private volatile int state = ST_NOT_STARTED;

    //Reactor thread status field state atomic updates
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

}
  • The Reactor thread state starts as ST_NOT_STARTED. A CAS first updates it to ST_STARTED.

  • doStartThread then starts the Reactor thread.

  • If startup fails, the Reactor thread state is changed back to ST_NOT_STARTED.
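The start-once-with-rollback pattern in these three steps can be tried in isolation. StartOnce is a hypothetical class for illustration; it swaps the AtomicIntegerFieldUpdater used above for a plain AtomicInteger, which behaves the same for this purpose but costs one extra object per instance.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the CAS-guarded start with rollback on failure.
final class StartOnce {
    static final int ST_NOT_STARTED = 1;
    static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);

    int state() {
        return state.get();
    }

    // Returns true if this call performed the launch, false if it was already started.
    boolean start(Runnable launcher) {
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                launcher.run();
                success = true;
                return true;
            } finally {
                if (!success) {
                    // The launch threw: roll back so that a later call can retry.
                    state.compareAndSet(ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
        return false;
    }
}
```

Only the thread that wins the CAS runs the launcher, so concurrent submitters cannot start two Reactor threads. If the launcher throws, the state falls back to ST_NOT_STARTED and the next call may try again.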

    // The ThreadPerTaskExecutor used to start the Reactor thread
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // The Reactor thread starts working
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
                // ... omitted ...
    }

This is where executors of the ThreadPerTaskExecutor type come in.

  • The core work of the Reactor thread is to poll for IO ready events on all Channels registered with it, process the I/O events on the corresponding Channels, and execute asynchronous tasks. Netty encapsulates these core tasks in the io.netty.channel.nio.NioEventLoop#run method.

  • NioEventLoop#run is wrapped in an asynchronous task and submitted to the executor for execution. The Reactor thread starts working at this point.

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        // Start the Reactor thread
        threadFactory.newThread(command).start();
    }
}

At this point, the Reactor thread is started, and the rest of the work is done by the Reactor thread.

After submitting the NioServerSocketChannel registration task register0 to the Reactor, the user startup thread unwinds the call stack step by step and returns to the original startup entry ChannelFuture f = b.bind(PORT).sync().

At this time, there is only one task register0 in the task queue of the Reactor. After the Reactor thread starts, the task will be taken out from the task queue for execution.

At this point, the registration of NioServerSocketChannel is officially started

1.3.5 register0

        // true if the channel has never been registered, false otherwise
        private boolean neverRegistered = true;

        private void register0(ChannelPromise promise) {
            try {
                // Check whether the registration has been cancelled or the channel has been closed
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // Perform the actual registration
                doRegister();
                // Change the registration status
                neverRegistered = false;
                registered = true;
                // Call back the handlerAdded method of the ChannelInitializer added to the pipeline; this is where the ChannelPipeline is initialized
                pipeline.invokeHandlerAddedIfNeeded();
                // Set regFuture to success and trigger the operationComplete callback, which places the bind operation in the Reactor's task queue to wait for the Reactor thread
                safeSetSuccess(promise);
                // Trigger the channelRegistered event
                pipeline.fireChannelRegistered();
                // For the server-side ServerSocketChannel, the channel becomes active only after the port address has been bound successfully.
                // The bind is still waiting in the Reactor's task queue as an asynchronous task, so isActive() is false here
                if (isActive()) {
                    if (firstRegistration) {
                        // Trigger the channelActive event
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // ... omitted ...
            }
        }

register0 is the key method that drives the whole Channel registration and binding process. Let’s take a look at its core logic:

  • First check whether the registration of the Channel has been cancelled from outside the Reactor thread, via !promise.setUncancellable(), and whether the Channel to be registered has been closed, via !ensureOpen(promise). If the Channel is closed or the registration has been cancelled, return immediately and stop the registration process.

  • Call the doRegister() method to perform the actual registration. It is implemented in AbstractNioChannel, a subclass of AbstractChannel. We will come back to it shortly; for now we focus on the overall process.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }
}
  • After the Channel has been registered with the Reactor, pipeline.invokeHandlerAddedIfNeeded() is called. It triggers the handlerAdded callback of the ChannelInitializer that was added to the pipeline, and the ChannelInitializer mentioned earlier initializes the ChannelPipeline.

The pipeline of a Channel is initialized by the ChannelInitializer in the handlerAdded event callback, after the Channel has been registered with the Reactor.

  • Set regFuture to success and call back the ChannelFutureListener#operationComplete method registered on regFuture. In the operationComplete callback, the bind operation is wrapped in an asynchronous task and submitted to the taskQueue of the Reactor, where it waits for the Reactor to execute it.

Do you remember where regFuture came from, where it was created, and where the ChannelFutureListener was added to it? It does not matter if you cannot recall; I will come back to it later.

  • Trigger the channelRegistered event in the pipeline through pipeline.fireChannelRegistered().

The channelRegistered method of each ChannelHandler in the pipeline is called back.

  • For the Netty server NioServerSocketChannel, the Channel becomes active only after the port address has been bound successfully. At this point, the bind operation has only been submitted to the task queue of the Reactor as an asynchronous task by the ChannelFutureListener#operationComplete callback registered on regFuture; the Reactor thread has not started executing the bind task yet. So isActive() is false here.

The bind task is executed only after the Reactor thread finishes executing the register0 method.
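This ordering is easy to reproduce with a JDK single-thread executor standing in for the Reactor. OrderingSketch and its log strings are hypothetical and exist only for illustration; the point is that a task submitted from inside a running task can only execute after the current task returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor plays the role of the Reactor.
final class OrderingSketch {
    static List<String> run() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final ExecutorService reactor = Executors.newSingleThreadExecutor();
        final CountDownLatch bound = new CountDownLatch(1);

        reactor.execute(() -> {
            log.add("register0: doRegister");
            // Stand-in for the listener fired by safeSetSuccess: it only enqueues the bind.
            reactor.execute(() -> {
                log.add("bind");
                bound.countDown();
            });
            // The rest of register0 still runs before the queued bind task.
            log.add("register0: fireChannelRegistered");
        });

        try {
            bound.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        reactor.shutdown();
        return new ArrayList<>(log);
    }
}
```

OrderingSketch.run() records the two register0 steps first and the bind last, even though the bind was submitted between them.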

Let’s look at the implementation of these core steps in the register0 method:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {

    // The Selector key obtained by a channel registered with a Selector
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                // ... omitted ...
            }
        }
    }
}

This calls the underlying JDK NIO Channel method java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object), which registers the JDK NIO ServerSocketChannel wrapped by Netty’s NioServerSocketChannel with the JDK NIO Selector in the Reactor.

SelectableChannel#register

  • Selector: the Selector that the JDK NIO Channel will be registered with.

  • int ops: the I/O events of interest on the Channel. When a corresponding I/O event is ready, the Selector returns the SelectionKey of that Channel.

A SelectionKey can be understood as the representation of a Channel inside the Selector. It encapsulates the interestOps set of IO events that the Channel is interested in and the readyOps set of IO events that are ready, along with the corresponding JDK NIO Channel and the Selector it is registered with. Finally, it has an important attachment property that lets us attach a custom object to the SelectionKey.

  • Object attachment: a user-defined object to attach to the SelectionKey.

Here, the NioServerSocketChannel registers with the Selector in the Reactor with an interest set of 0. The main purpose of this operation is to obtain the SelectionKey of the Channel in the Selector and complete the registration. After the bind operation is complete, the IO event of interest, OP_ACCEPT, is added to the SelectionKey.

Through the SelectableChannel#register method, Netty’s custom NioServerSocketChannel is attached to the attachment property of the SelectionKey. This binds the Netty Channel to the JDK NIO Channel, so that each time the Selector reports an IO ready event, Netty can retrieve its own Channel object from the SelectionKey returned by the JDK NIO Selector.
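The same register-with-0-then-add-OP_ACCEPT sequence can be run against the JDK NIO API directly. RegisterSketch is a hypothetical class for illustration, and a plain Object stands in for Netty's NioServerSocketChannel as the attachment; the channel is never bound, so no port is opened.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch using only JDK NIO; no Netty classes involved.
final class RegisterSketch {
    // Returns {interestOps after register, interestOps after enabling accept, 1 if the attachment matches}.
    static int[] demo() {
        Object nettyChannel = new Object(); // stand-in for the Netty channel wrapper
        try (Selector selector = Selector.open();
             ServerSocketChannel jdkChannel = ServerSocketChannel.open()) {
            // A channel must be in non-blocking mode before it can be registered.
            jdkChannel.configureBlocking(false);
            // Register with an empty interest set, attaching the wrapper object.
            SelectionKey key = jdkChannel.register(selector, 0, nettyChannel);
            int before = key.interestOps();
            boolean attached = key.attachment() == nettyChannel;
            // Later (after bind in Netty's flow) the accept interest is added to the key.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after, attached ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering with 0 gives back a SelectionKey without asking the Selector to watch for anything yet, which is why the interest set can be filled in later once the channel is actually ready to accept.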

1.3.7 Initializing the ChannelPipeline in the handlerAdded event callback

After the NioServerSocketChannel has been registered with the Selector on the Main Reactor, Netty calls pipeline.invokeHandlerAddedIfNeeded() to call back the handlerAdded method of the ChannelHandlers in the pipeline of the NioServerSocketChannel.

The pipeline structure of NioServerSocketChannel is as follows:

The ChannelInitializer that was added when the NioServerSocketChannel was initialized is the only one in the pipeline.

Let’s take a look at what the handlerAdded callback method in ChannelInitializer does

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                // Initialization is done; clean up the state kept for this context
                removeState(ctx);
            }
        }
    }

    // ChannelInitializer instances are shared by all channels and are used to initialize their ChannelPipelines.
    // The Set records the contexts that have already been initialized, so the same ChannelPipeline is not initialized twice
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                     // After initialization, remove itself from pipeline
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    // Anonymous class implementation, where specific initialization logic is specified
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        // Delete ChannelInitializer from initMap
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
}

The initialization logic in ChannelInitializer is straightforward:

  • First of all, it must be judged that the current Channel has completed registration before pipeline initialization. ctx.channel().isRegistered()

  • Calling the initChannel specified by the anonymous class of ChannelInitializer performs custom initialization logic.

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // The ChannelHandler specified by the user on ServerBootstrap
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

Remember how, when the NioServerSocketChannel was initialized, the io.netty.bootstrap.ServerBootstrap#init method added a ChannelInitializer to the pipeline?

  • After the initChannel method has run, the ChannelPipeline is initialized. There is no longer any reason for the ChannelInitializer to stay in the pipeline, so it is removed from the pipeline with pipeline.remove(this).
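The initialize-once-then-remove-itself idea can be reduced to a few lines. OneShotInit is a hypothetical class for illustration: a list of names stands in for the pipeline, and only the re-entrance guard built on Collections.newSetFromMap mirrors the initMap shown above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a one-shot initializer; not Netty's ChannelInitializer.
final class OneShotInit {
    // Concurrent set of contexts that have already been initialized.
    private final Set<Object> initMap =
            Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
    // Stand-in for the pipeline: it starts with only the initializer in it.
    final List<String> handlers = new ArrayList<>();

    OneShotInit() {
        handlers.add("initializer");
    }

    // Returns true if this call performed the initialization.
    boolean init(Object ctx) {
        if (initMap.add(ctx)) {          // add returns false on a repeated call
            try {
                handlers.add("logging"); // the user-supplied initialization logic
            } finally {
                // The initializer has done its job and removes itself.
                handlers.remove("initializer");
            }
            return true;
        }
        return false;
    }
}
```

The first init call for a context leaves only the real handler in the list; a second call for the same context is a no-op because Set.add reports that the context is already present.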

When the pipeline is initialized, the structure of the pipeline changes again:

At this point, the structure of the taskQueue in the Main Reactor changes as follows:

The task that adds the ServerBootstrapAcceptor was submitted to the Main Reactor while the pipeline of the NioServerSocketChannel was being initialized. Remember?

1.3.8 Callback ChannelFutureListener for regFuture

At the beginning of this article, in the section on Netty server startup, we introduced the server startup entry function io.netty.bootstrap.AbstractBootstrap#doBind. It starts by calling the initAndRegister() method to create and initialize the NioServerSocketChannel, which is then registered with the Main Reactor.

The operation of registration is an asynchronous process, so a ChannelFuture regFuture representing the result of registration is returned after the initAndRegister() method call.

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Asynchronously create, initialize, and register the ServerSocketChannel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // Registration has already completed, so perform the bind operation
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // Add the registration completion callback
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // ... omitted ...
                    // The Reactor thread calls back here after registration completes
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }
}

A callback to run after registration, a ChannelFutureListener, is then added to the ChannelFuture regFuture. The port address binding process is initiated in its operationComplete callback.

So when is this callback invoked, and from where?

Let’s go back to the flow of the register0 method, the topic of this section:

After the NioServerSocketChannel has been registered with the Main Reactor by calling doRegister(), the pipeline.invokeHandlerAddedIfNeeded() method is invoked to trigger the ChannelInitializer#handlerAdded callback, which initializes the pipeline.

Finally, in the safeSetSuccess method, start calling the ChannelFutureListener registered on regFuture.

   protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
            logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
        }
    }

    @Override
    public boolean trySuccess() {
        return trySuccess(null);
    }

    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }

   private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                // Notify the listeners registered on the promise
                notifyListeners();
            }
            return true;
        }
        return false;
    }

The logic of safeSetSuccess is simple, first setting the regFuture result to SUCCESS and calling the ChannelFutureListener registered on regFuture.

It should be noted that the execution of the safeSetSuccess method and subsequent callback to ChannelFutureListener on regFuture are performed by the Reactor thread.
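The set-result-then-notify pattern can be sketched in a few lines. MiniPromise is a hypothetical class for illustration, not Netty's DefaultPromise: it uses an AtomicReference instead of a field updater, has no UNCANCELLABLE state, and always runs listeners on the calling thread, which matches the case described here because the caller is the Reactor thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of a promise: one CAS decides the winner, who notifies listeners.
final class MiniPromise {
    private static final Object SUCCESS = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>();
    private final List<Runnable> listeners = new ArrayList<>();

    boolean isDone() {
        return result.get() != null;
    }

    void addListener(Runnable listener) {
        synchronized (this) {
            listeners.add(listener);
        }
        // A listener added after completion is notified right away.
        if (isDone()) {
            notifyListeners();
        }
    }

    // Returns false if the promise was already completed.
    boolean trySuccess() {
        if (result.compareAndSet(null, SUCCESS)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    private void notifyListeners() {
        List<Runnable> toRun;
        synchronized (this) {
            // Take the pending listeners out so each one runs at most once.
            toRun = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Runnable listener : toRun) {
            listener.run();
        }
    }
}
```

A second trySuccess call returns false and notifies nobody, which corresponds to the warning branch in safeSetSuccess when the promise is already done.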

I’ll write a special article about the Promise model in Netty later, but you just need to understand the general process. Don’t worry about too much detail.

Let’s switch to the ChannelFutureListener callback on regFuture and see what Netty does after a Channel is registered.

2. doBind0

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
}

Netty encapsulates the port address binding as an asynchronous task and submits it to Reactor.

But there is a question here: the thread executing the doBind0 method at this point is already the Reactor thread. Why not perform the bind operation right here, instead of wrapping it as an asynchronous task and submitting it to the taskQueue in the Reactor?

Either way it ends up being executed by the Reactor thread, so what is the difference?

As we know from the last section, the call to doBind0 originates in io.netty.channel.AbstractChannel.AbstractUnsafe#register0: after NioServerSocketChannel has been registered with the Main Reactor and its pipeline has been initialized, the safeSetSuccess method triggers the callback that invokes doBind0.

This whole process is performed by the Reactor thread, but at that moment the register0 method has not finished yet, and the logic after safeSetSuccess still needs to run.

The binding logic must be executed after the registration logic has completed, so in doBind0 the Reactor thread wraps the bind operation as an asynchronous task and submits it to the taskQueue. This lets the Reactor thread return from safeSetSuccess immediately and continue with the rest of the register0 logic.

        private void register0(ChannelPromise promise) {
            try {
                // ... omitted ...
                doRegister();
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                // Trigger the channelRegistered event
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    // ... omitted ...
                }
            } catch (Throwable t) {
                // ... omitted ...
            }
        }

When the Reactor thread completes executing the register0 method, the asynchronous task is pulled from the taskQueue for execution.
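The ordering argument can be reproduced with a few lines of plain Java. The sketch below is a hypothetical single-threaded "reactor" that just drains a FIFO queue. The method names mirror the ones discussed in the text, but the bodies are stand-ins that only record a trace. It assumes, as described above, that the listener fired from safeSetSuccess enqueues the bind instead of running it inline.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded reactor: one thread, one FIFO task queue.
final class DeferredBindDemo {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    private final List<String> trace = new ArrayList<>();

    // Stand-in for register0: the listener callback runs in the middle of it.
    private void register0() {
        trace.add("doRegister");
        trace.add("invokeHandlerAddedIfNeeded");
        safeSetSuccess();                   // calls the listener, which calls doBind0
        trace.add("fireChannelRegistered"); // must still happen before the bind
    }

    private void safeSetSuccess() {
        doBind0();
    }

    // Stand-in for doBind0: enqueue the bind instead of running it inline.
    private void doBind0() {
        taskQueue.add(() -> trace.add("bind"));
    }

    static List<String> run() {
        DeferredBindDemo reactor = new DeferredBindDemo();
        reactor.taskQueue.add(reactor::register0);
        Runnable task;
        while ((task = reactor.taskQueue.poll()) != null) {
            task.run(); // tasks run strictly one after another
        }
        return reactor.trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If doBind0 recorded "bind" directly instead of enqueueing it, the trace would show the bind before fireChannelRegistered, which is exactly the reordering that the asynchronous task avoids.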

The structure of the taskQueue in the Reactor thread is as follows:

  • The Reactor thread first takes the task at the head of the taskQueue and executes it. Here that is the asynchronous task that adds ServerBootstrapAcceptor to the pipeline of NioServerSocketChannel.

The structure of pipeline in NioServerSocketChannel is as follows:

  • The Reactor thread then executes the bind task.

3. Binding the port address

The outbound operations on a Channel, bind among them, are defined in the ChannelOutboundInvoker interface.

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

The bind method is implemented by the abstract class AbstractChannel.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
}

A call to pipeline.bind(localAddress, promise) propagates a bind event through the pipeline, triggering the bind callbacks of the ChannelOutboundHandlers in the pipeline.

Event propagation in pipeline is directional:

  • Inbound events propagate forward: they start at HeadContext and are passed from one handler to the next until they reach TailContext.
  • Outbound events propagate in reverse: they start at TailContext and are passed toward the front until they reach HeadContext.

Inbound events can only be handled by the ChannelInboundHandlers in the pipeline.

Outbound events can only be handled by the ChannelOutboundHandlers in the pipeline.

The bind event here is defined as an outbound event in Netty, so it propagates in reverse through the pipeline, from TailContext toward HeadContext.

The core logic of bind, however, is implemented in HeadContext.
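The two propagation directions can be illustrated with a tiny doubly linked list. The following is a sketch under simplifying assumptions: MiniPipelineDemo and Ctx are hypothetical names, every handler always forwards the event, and the inbound/outbound flags are plain booleans chosen by hand for this example rather than derived from real handler types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a pipeline; names and flags are illustrative only.
final class MiniPipelineDemo {
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("head", true, true);  // handles both directions
    private final Ctx tail = new Ctx("tail", true, false); // inbound only

    MiniPipelineDemo() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
    }

    // Inbound event: walk head -> tail, visiting inbound-capable contexts.
    List<String> fireChannelActive() {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = head; ctx != null; ctx = ctx.next) {
            if (ctx.inbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }

    // Outbound event: start before tail and walk back to head,
    // visiting outbound-capable contexts only.
    List<String> bind() {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = tail.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.outbound) {
                visited.add(ctx.name);
            }
        }
        return visited;
    }

    // Assumed layout for this sketch: head, a duplex logging handler,
    // an inbound-only acceptor, tail.
    static MiniPipelineDemo serverPipeline() {
        MiniPipelineDemo p = new MiniPipelineDemo();
        p.addLast("LoggingHandler", true, true);
        p.addLast("ServerBootstrapAcceptor", true, false);
        return p;
    }

    public static void main(String[] args) {
        MiniPipelineDemo p = serverPipeline();
        System.out.println("inbound  channelActive: " + p.fireChannelActive());
        System.out.println("outbound bind:          " + p.bind());
    }
}
```

In this sketch the bind walk ends at head, which matches the point made above: whatever the handlers in between do, the last stop for an outbound event is HeadContext.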

3.1 HeadContext

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            // Triggers AbstractChannel->bind, which uses the JDK NIO SelectableChannel to perform the underlying binding
            unsafe.bind(localAddress, promise);
        }
    }

In the HeadContext#bind callback, the unsafe operation class in Channel is called to perform the actual binding.

protected abstract class AbstractUnsafe implements Unsafe {

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            // ... omitted ...

            // At this point wasActive = false
            boolean wasActive = isActive();
            try {
                // io.netty.channel.socket.nio.NioServerSocketChannel.doBind
                // Invoke the concrete channel implementation class
                doBind(localAddress);
            } catch (Throwable t) {
                // ... omitted ...
                return;
            }

            // After a successful bind the channel becomes active, which triggers channelActive event propagation
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // Fire the channelActive event in the pipeline
                        pipeline.fireChannelActive();
                    }
                });
            }

            // Call back the ChannelFutureListener registered on the promise
            safeSetSuccess(promise);
        }

        protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
  • First, doBind as implemented by the subclass NioServerSocketChannel is executed; it performs the underlying bind operation through the JDK NIO native ServerSocketChannel.
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // Call the JDK NIO native ServerSocketChannel to bind
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
  • Then it checks whether this is the first bind (the channel was not active before and is active now). If so, firing the channelActive event in the pipeline is wrapped as an asynchronous task and put into the taskQueue of the Reactor.

  • Finally, safeSetSuccess(promise) is executed, calling back the ChannelFutureListener registered on the promise.
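The underlying JDK call in the first step can be tried on its own. The sketch below uses only java.nio; JdkBindDemo is a hypothetical name, the loopback address with port 0 (let the OS pick a free port) is an assumption made so the example does not collide with a real service, and the backlog of 100 simply mirrors the SO_BACKLOG value from the EchoServer example. The "active" check is modelled here as "open and bound", which is an assumption about how activity is determined for a server channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo of the raw JDK bind that the doBind step delegates to.
final class JdkBindDemo {
    // Returns {wasActive, isActive} around a single bind call.
    static boolean[] bindOnce() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // "Active" is modelled as: channel is open and its socket is bound.
            boolean wasActive = ch.isOpen() && ch.socket().isBound();

            // Port 0 asks the OS for any free port; 100 is the accept backlog.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 100);

            boolean isActive = ch.isOpen() && ch.socket().isBound();
            return new boolean[] {wasActive, isActive};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = bindOnce();
        System.out.println("wasActive=" + state[0] + ", isActive=" + state[1]);
    }
}
```

The transition from not active to active is the condition under which the second step schedules the channelActive event.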

The same question arises here: the current thread is already the Reactor thread, so why not fire the channelActive event in the pipeline directly, instead of wrapping it as an asynchronous task?

Because if the channelActive event were fired directly here, the Reactor thread would go on to execute the channelActive callbacks of the ChannelHandlers in the pipeline.

That would hold up the execution of safeSetSuccess(promise) and delay the callback of the ChannelFutureListener registered on the promise.

At this point, the Netty server has bound the port address, and the NioServerSocketChannel status is now Active.

One last important thing to do is look at channelActive event handling in pipeline.

3.2 channelActive Event processing

The channelActive event is defined as an inbound event in Netty, so it propagates forward through the pipeline, from HeadContext to TailContext.

In the channelActive event callback, registration of the OP_ACCEPT event is triggered, which tells the Selector which I/O event to listen for.

This piece of logic is implemented primarily in HeadContext.

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // The channelActive event continues propagating forward in the pipeline
            ctx.fireChannelActive();
            // If autoRead is enabled, read event propagation is triggered automatically;
            // OP_ACCEPT registration is triggered in the read callback
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                // If autoRead is enabled, read event propagation is triggered
                channel.read();
            }
        }

        // AbstractChannel
        public Channel read() {
            // Trigger the read event
            pipeline.read();
            return this;
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            // Register the OP_ACCEPT or OP_READ event
            unsafe.beginRead();
        }
    }
  • The channelActive callback in HeadContext triggers a read event in the pipeline.
  • When the read event propagates back to HeadContext, the HeadContext#read callback is triggered. In the read callback, the beginRead method of the channel's low-level operation class unsafe is called to register interest in the OP_ACCEPT event with the selector.

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {

        @Override
        public final void beginRead() {
            assertEventLoop();
            // Channel must be Active
            if (!isActive()) {
                return;
            }

            try {
                // Register the I/O events the channel is interested in with the Selector
                doBeginRead();
            } catch (final Exception e) {
                // ... omitted ...
            }
        }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // Implemented by subclasses
    protected abstract void doBeginRead() throws Exception;

}
  • The assertion checks that the thread executing this method is the Reactor thread.

  • At this point, NioServerSocketChannel has already bound the port address, so isActive() returns true.

  • The doBeginRead implementation registers the OP_ACCEPT event with the Selector as the event to listen for.

public abstract class AbstractNioChannel extends AbstractChannel {

    // The SelectionKey obtained when the channel was registered with the Selector
    volatile SelectionKey selectionKey;
    // The I/O event the channel listens for
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1: When the ServerSocketChannel was initialized, readInterestOp was set to the OP_ACCEPT event
         */
        if ((interestOps & readInterestOp) == 0) {
            // Add the OP_ACCEPT event to the interestOps set
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
  • As mentioned above, NioServerSocketChannel gets a SelectionKey after registering with the Selector in the Main Reactor. So the first step here is to fetch this SelectionKey.

  • interestOps, the set of I/O events NioServerSocketChannel is interested in, is read from the SelectionKey. interestOps was set to 0 at registration time.

  • readInterestOp, which was set to OP_ACCEPT when NioServerSocketChannel was initialized, is added to the interestOps set in the SelectionKey. From then on, the Selector in the Reactor listens for the I/O events contained in the interestOps set.

The Main Reactor mainly listens for OP_ACCEPT events.
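The "register with 0 first, add OP_ACCEPT later" sequence can be reproduced with the JDK alone. In this sketch InterestOpsDemo is a hypothetical name, and the local variable readInterestOp merely imitates the field of the same name shown above. The channel is deliberately left unbound, since the point is only the interest-set manipulation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo of the two-step interest registration using plain JDK NIO.
final class InterestOpsDemo {
    // Returns {interestOps right after registration, interestOps after beginRead-like step}.
    static int[] registerThenListen() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a Selector

            // Step 1 (registration): attach to the selector with no interest at all.
            SelectionKey key = ch.register(selector, 0);
            int before = key.interestOps();

            // Step 2 (later, once active): add the accept interest if it is missing.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenListen();
        System.out.println("after register: " + ops[0] + ", after beginRead step: " + ops[1]);
    }
}
```

Splitting registration from interest selection is what lets the channel be attached to the Selector early while accepting connections only starts after the bind has succeeded.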

At this point, the Netty server is truly up and running, and the next step is to wait for client connections. If you go back and look at the startup flowchart now, is it not a lot clearer?

At this time, the Reactor model structure of Netty is as follows:


Conclusion

In this article we walked through the entire Netty server startup process using diagrams and source code, covering the ServerBootstrap properties and configuration methods involved, as well as the creation and initialization of NioServerSocketChannel and its class inheritance structure.

We focused on how NioServerSocketChannel is registered with the Reactor, when the Reactor thread is started, and when the pipeline is initialized.

Finally, the whole process of NioServerSocketChannel binding port address is introduced.

All of these processes are asynchronous, and the callbacks are nested in layers and jump back and forth. That is the nature of reading asynchronous code: you need to sort out the relationships between the callbacks and keep reminding yourself which thread is currently executing.

OK, now that the Netty server is up and running, it is time to receive client connections. See you in the next article.