I recently spent more time reading Netty. I had planned to work through a book, but gave up. “Netty in Action” is actually a very well-written book; the main problem is that the translation is probably only a little better than what I could manage myself. Once you have read the source code and understand how Netty starts a service, establishes connections and reads data, the book is still a good read. The Authoritative Guide is also highly recommended. In addition, the Flash blog recommended on the Netty official website is really well written, and this post draws on it in many places.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) {

        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        NioEventLoopGroup childGroup = new NioEventLoopGroup(3);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // ServerHandler is the business handler for accepted connections (not shown here)
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(8081).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelRead");
            // If the event is not passed on, the new NioSocketChannel is never registered,
            // and in the end none of the child handlers on the server will work.
            ctx.fireChannelRead(msg);
        }
    }
}

ServerBootstrap is the bootstrap helper class on the server side: it collects a set of parameters and then binds a port to start the service.

We need two kinds of people to do the work: a boss and workers. The boss takes in work from outside and hands it over to the workers. Here parentGroup plays the boss, whose job is to keep accepting new connections. Each new connection is then handed to childGroup, the workers, for processing.

.channel(NioServerSocketChannel.class) says that the server starts an NIO-based channel. Channel is a core concept in Netty; a channel can be understood as a connection, or as a server-side bind. More on that later.

.handler(new SimpleServerHandler()) sets the handler that takes part in the server's own startup process. SimpleServerHandler ultimately implements the ChannelHandler interface, which is another core concept of Netty: it represents a processor that the data stream passes through, and can be thought of as a checkpoint along the pipeline.

.childHandler(new ChannelInitializer...) defines what is done with a new connection once the boss has accepted it.

ChannelFuture channelFuture = bootstrap.bind(8081).sync(); is where the actual startup happens: it binds port 8081 and waits for the server to finish starting before moving on to the next line.

channelFuture.channel().closeFuture().sync(); waits for the server socket to be closed.

Source code analysis

We enter from the entry method bind, click all the way in, and stop at doBind

AbstractBootstrap

public ChannelFuture bind(int inetPort) {
	return bind(new InetSocketAddress(inetPort));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    
     // Ignore some code
    doBind0(regFuture, channel, localAddress, promise);
}

Let’s go into initAndRegister. The name tells us that this method initializes and registers, so how does it do that?

AbstractBootstrap

final ChannelFuture initAndRegister() {
    // (1) Create a channel through reflection
    Channel channel = channelFactory.newChannel();

    // (2) Initialize the channel
    init(channel);

    // (3) Register
    ChannelFuture regFuture = config().group().register(channel);

    return regFuture;
}

(1) Create a channel through reflection. First, this is where the channelFactory comes from:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
   // Ignore validation

    this.channelFactory = channelFactory;
    return (B) this;
}

Clearly the channelFactory is created by our user code: calling .channel(NioServerSocketChannel.class) sets channelFactory = new ReflectiveChannelFactory(channelClass).

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        // Exception handling omitted
        return clazz.newInstance();
    }
}

So we know that newChannel gives us a NioServerSocketChannel object. It is important to note that a class instantiated this way must have a no-argument constructor. Let’s dig into the NioServerSocketChannel class to see what happens when it is created.
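As a side note, the no-argument constructor requirement is easy to see with plain JDK reflection. The sketch below is a simplified stand-in, not Netty's ReflectiveChannelFactory: the class name ReflectiveFactorySketch and its methods are made up for illustration, and unlike the excerpt above it looks the constructor up eagerly, so a class without a no-arg constructor is rejected when the factory is built.

```java
import java.lang.reflect.Constructor;

// Illustrative stand-in only; this is not Netty's ReflectiveChannelFactory.
final class ReflectiveFactorySketch<T> {

    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look up the no-arg constructor; this fails if the class has none.
            this.constructor = clazz.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz.getName() + " has no no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create an instance", e);
        }
    }

    public static void main(String[] args) {
        // StringBuilder has a public no-arg constructor, so this works.
        ReflectiveFactorySketch<StringBuilder> ok = new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(ok.newInstance().length());

        // Integer only has constructors that take an argument, so it is rejected.
        try {
            new ReflectiveFactorySketch<>(Integer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Each call to newInstance() produces a fresh object, which matches the way a new channel object is wanted for every newChannel() call.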

NioServerSocketChannel

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

// Apart from this constructor, the rest of the API used here is provided by NIO
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

// Create a ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

Keep going down

public NioServerSocketChannel(ServerSocketChannel channel) {
    // Call the parent constructor, specifying the connection event SelectionKey.OP_ACCEPT as the event of interest
    super(null, channel, SelectionKey.OP_ACCEPT);
    // Create Netty's configuration parameter class
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

AbstractNioMessageChannel

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

AbstractNioChannel

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

AbstractNioChannel holds an internal channel ch, which is the NIO ServerSocketChannel created in NioServerSocketChannel. parent is null, this.readInterestOp = SelectionKey.OP_ACCEPT, and the ServerSocketChannel is set to non-blocking. Next, look at the parent class constructor.

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

unsafe and pipeline are both quite important: they run through essentially all of service startup and client connection handling, so let’s take a quick look at each.

AbstractNioMessageChannel

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        do {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
    }
}

AbstractChannel

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline

 protected DefaultChannelPipeline(Channel channel) {
     this.channel = ObjectUtil.checkNotNull(channel, "channel");
     // ..

     tail = new TailContext(this);
     head = new HeadContext(this);

     head.next = tail;
     tail.prev = head;
 }

As the constructor shows, a pipeline is bound to a channel when it is created, and it builds a doubly linked list. Handlers added later with pipeline.addLast(handler) go to the end of the list, that is, to the node just before tail.
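The head/tail structure can be sketched with a minimal doubly linked list. This is only an illustration under simplifying assumptions: PipelineListSketch and Node are hypothetical names, nodes carry a plain name instead of a handler context, and there is no synchronization.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the head <-> ... <-> tail list; not Netty's DefaultChannelPipeline.
final class PipelineListSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    PipelineListSketch() {
        // An empty pipeline is just the two sentinel nodes linked to each other.
        head.next = tail;
        tail.prev = head;
    }

    // Inserts the new node directly in front of the tail sentinel.
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineListSketch pipeline = new PipelineListSketch();
        pipeline.addLast("initializer");
        pipeline.addLast("acceptor");
        System.out.println(pipeline.names()); // prints [head, initializer, acceptor, tail]
    }
}
```

Because of the two sentinel nodes, addLast never has to special-case an empty list.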

A quick summary of what happens when a NioServerSocketChannel is created via the reflective factory:

  1. Inheritance: NioServerSocketChannel -> AbstractNioMessageChannel -> AbstractNioChannel -> AbstractChannel -> Channel
  2. NioServerSocketChannel internally holds: the NIO channel ch = ServerSocketChannel (returned by javaChannel()), set to non-blocking; readInterestOp = SelectionKey.OP_ACCEPT; pipeline; unsafe; config = NioServerSocketChannelConfig

Note that the Netty configuration class mentioned above, NioServerSocketChannelConfig, inherits from DefaultServerSocketChannelConfig. The two can be regarded as almost the same; the class holds many Netty-related configuration parameters.

DefaultServerSocketChannelConfig

public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    protected final ServerSocket javaSocket;
    private volatile int backlog = NetUtil.SOMAXCONN;
    
    /** * Creates a new instance. */
    public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
        
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;
    }
    
    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            // The parent class has similar Settings, but has more properties, such as setting the maximum timeout
            return super.setOption(option, value);
        }

        return true;
    }
    
    @Override
    public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
        try {
            javaSocket.setReceiveBufferSize(receiveBufferSize);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }
    
    @Override
    public ServerSocketChannelConfig setBacklog(int backlog) {
        if (backlog < 0) {
            throw new IllegalArgumentException("backlog: " + backlog);
        }
        this.backlog = backlog;
        return this;
    }
}

(2) Initialize a channel

ServerBootstrap

@Override
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();

    // Set the channel's options and attrs
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        // Iterate over options and set each one on the config
        setChannelOptions(channel, options, logger);
    }

    // Set up the options for the I/O channels
    // (omitted)

    // This section is quite important: it is called during registration, as described below
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // The handler set via .handler(new SimpleServerHandler())
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // Add it to the pipeline
                pipeline.addLast(handler);
            }

            // Add a task to the task queue
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // ServerBootstrapAcceptor is used to set up the I/O channels, as described later
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

static void setChannelOptions(
        Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
    for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
        setChannelOption(channel, e.getKey(), e.getValue(), logger);
    }
}

private static void setChannelOption(
        Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    try {
        if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
            logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
        }
    } catch (Throwable t) {
        logger.warn(
                "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    }
}

init: it collects the child-related parameters, such as childGroup, childHandler, options and attrs, sets them on the ServerBootstrapAcceptor handler, and adds that handler to the pipeline. Before that, of course, the handler configured for the boss is added to the pipeline. The initChannel method is then invoked during registration, and that is when the two handlers are actually added to the pipeline.

Put more simply, init installs an initialization handler on the channel’s pipeline. Its initChannel method runs at registration time and adds two handlers to the pipeline, one of which handles data reads.

Finally, (3) register

AbstractBootstrap

ChannelFuture regFuture = group().register(channel);

The group() method returns the boss NioEventLoopGroup (parentGroup in the example). When that group was constructed, it created its NioEventLoop children. All children share the same thread pool (executor), which will be discussed later.

MultithreadEventLoopGroup

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

// Inherited from MultithreadEventExecutorGroup
public EventExecutor next() {
    return chooser.next();
}

// Inner class of MultithreadEventExecutorGroup; next() returns one NioEventLoop from the children array, chosen in round-robin order
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[childIndex.getAndIncrement() & children.length - 1];
    }
}

So the channel is registered with a NioEventLoop, because that is what next() returns. NioEventLoop inherits its register method from SingleThreadEventLoop.
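The round-robin selection in next() relies on a bit mask, which only works when the number of children is a power of two. A minimal sketch of the same trick, using strings in place of event loops (PowerOfTwoChooserSketch is a hypothetical name, not a Netty class):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin chooser for a power-of-two number of children.
final class PowerOfTwoChooserSketch {

    private final AtomicInteger index = new AtomicInteger();
    private final String[] children;

    PowerOfTwoChooserSketch(String... children) {
        int n = children.length;
        // A power of two has exactly one bit set, so n & (n - 1) is zero.
        if (n == 0 || (n & (n - 1)) != 0) {
            throw new IllegalArgumentException("length must be a power of two: " + n);
        }
        this.children = children.clone();
    }

    String next() {
        // '-' binds tighter than '&', so this is index & (length - 1).
        // The mask keeps the result in [0, length) even after the counter overflows.
        return children[index.getAndIncrement() & children.length - 1];
    }

    public static void main(String[] args) {
        PowerOfTwoChooserSketch chooser =
                new PowerOfTwoChooserSketch("loop-0", "loop-1", "loop-2", "loop-3");
        for (int i = 0; i < 6; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With four children the sequence wraps around after the fourth call, so the fifth call returns the first child again.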

SingleThreadEventLoop

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    channel.unsafe().register(this, promise);
    return promise;
}

Get the channel’s unsafe and register through it. Its actual type here is NioMessageUnsafe.

Unsafe: Unsafe is an internal interface of Channel. It is normally not meant to be called from outside code; it is used by Netty’s own internals.

AbstractChannel.AbstractUnsafe

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // Add a task to the task queue; the first time, this also binds the event loop to its thread
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                // The actual registration, so it only happens when this task from the task queue is executed
                register0(promise);
            }
        });
    }
}

eventLoop.inEventLoop() checks whether the current thread (in this case main) is the thread held by the NioEventLoop. The first time through, it is necessarily false.

Next, let’s look at eventLoop.execute

SingleThreadEventExecutor

public void execute(Runnable task) {
  	// Check whether the current thread is a thread held by EventLoop
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        // (1) Start thread
        startThread();
        // (2) Add tasks to the task queue
        addTask(task);
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

Let’s start with (1) what the starting thread does

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();

            boolean success = false;
            SingleThreadEventExecutor.this.run();
            success = true;
        }
    });
}

Let’s look at executor.execute first

@Override
public void execute(Runnable command) {
	threadFactory.newThread(command).start();
}

This is a bit different from version 4.0, where doStartThread simply calls thread.start(): the thread is held internally and is built by the thread factory when the EventLoop is constructed. In version 4.1 the thread is created at this point, through threadFactory.newThread(command). The idea is the same; version 4.0 just builds the same runnable logic into the thread up front.

newThread returns a FastThreadLocalThread, which I’ll leave aside for now. Calling the thread’s start() method then executes command.run(), which in turn calls SingleThreadEventExecutor.this.run(). Since the current object is a NioEventLoop, it is NioEventLoop’s run method that gets executed, so let’s go in and take a look.
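The lazy thread start described above (check inEventLoop, start the thread at most once with a compare-and-set, then queue the task) can be modelled in a few lines of plain Java. This is a sketch under simplifying assumptions, not Netty's SingleThreadEventExecutor: LazyEventLoopSketch and its members are hypothetical names, an AtomicBoolean stands in for the STATE_UPDATER field, and a blocking take() on the queue replaces the select/processSelectedKeys/runAllTasks cycle.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative single-threaded executor whose thread is started on the first execute().
final class LazyEventLoopSketch {

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    // True only when called from the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful start
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The compare-and-set guarantees that only one caller starts the thread.
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "event-loop-sketch");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();
        for (;;) {
            try {
                taskQueue.take().run(); // blocks until a task is available
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Submits one task from the calling thread and reports whether it ran on the loop thread.
    static boolean taskRunsOnLoopThread() {
        LazyEventLoopSketch loop = new LazyEventLoopSketch();
        boolean callerInLoop = loop.inEventLoop();
        boolean[] ranInLoop = new boolean[1];
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            ranInLoop[0] = loop.inEventLoop();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !callerInLoop && ranInLoop[0];
    }

    public static void main(String[] args) {
        System.out.println(taskRunsOnLoopThread());
    }
}
```

The calling thread sees inEventLoop() as false, which is why the first execute() starts the thread; the task itself then observes inEventLoop() as true.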

NioEventLoop

protected void run() {
    for (;;) {
        // Event polling: loops internally, checking whether there is a task, a scheduled task, or a ready event
        select(wakenUp.getAndSet(false));

        // Process the selected keys
        processSelectedKeys();

        // Execute the tasks in the task queue
        runAllTasks();
    }
}

When the service started, we added tasks to the task queue, such as the registration task register0(promise) from earlier. No connection events are being listened for yet, so select breaks out of its loop as soon as it sees that the task queue is not empty. processSelectedKeys() then returns right away because there are no keys, and runAllTasks() executes register0(promise). So let’s take a look at how registration works.

PS: processSelectedKeys is very important. It will be covered later, when a client connects and execution enters that method.

AbstractChannel

private void register0(ChannelPromise promise) {

    doRegister();

    // Executes SimpleServerHandler.handlerAdded(ctx)
    pipeline.invokeHandlerAddedIfNeeded();

    // Executes SimpleServerHandler.channelRegistered(ctx)
    pipeline.fireChannelRegistered();

}

doRegister performs the actual registration

AbstractNioChannel

protected void doRegister() throws Exception {
    for (;;) {
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        return;
    }
}

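javaChannel() returns the NIO ServerSocketChannel, so this is a plain NIO channel registering itself with a selector, with interest ops 0 and the Netty channel as the attachment. Two further tasks are also involved: one added in init at the start, and one added in doBind0() at the end.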
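What a registration with interest ops 0 means can be checked with the JDK alone. The sketch below is a hedged illustration, not Netty code: NioRegisterSketch is a hypothetical name, a plain Object stands in for the Netty channel used as the attachment, and the final step that switches on OP_ACCEPT is added here only to show the contrast with the initial state.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Illustrative JDK-only registration with interest ops 0 and an attachment.
final class NioRegisterSketch {

    static boolean registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Only non-blocking channels may be registered with a selector.
            ch.configureBlocking(false);

            // Stand-in for the Netty channel that is passed as the attachment.
            Object attachment = new Object();

            // Interest ops 0: the key exists, but no event is being listened for yet.
            SelectionKey key = ch.register(selector, 0, attachment);
            boolean idle = key.interestOps() == 0 && key.attachment() == attachment;

            // Accept events are only reported once OP_ACCEPT is added to the interest set.
            key.interestOps(SelectionKey.OP_ACCEPT);
            return idle && key.interestOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerThenEnableAccept());
    }
}
```

The attachment is what later lets code that only has the SelectionKey find its way back to the owning object.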

Let’s go back to how all the tasks get executed

SingleThreadEventExecutor

// Keep fetching tasks from the task queue and executing them
protected boolean runAllTasks(long timeoutNanos) {
    // Get the first task
    Runnable task = pollTask();

    long lastExecutionTime;
    for (;;) {
        // Execute the task
        safeExecute(task);

        // Fetch the next task
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

AbstractEventExecutor

// The task initially executed is register0(promise)
protected static void safeExecute(Runnable task) {
    task.run();
}

Here’s a longer summary:

  1. Registration first goes through channel.unsafe(), which needs an eventLoop passed in; that eventLoop is stored in a member variable of the channel.
  2. Then the eventLoop’s execute method, which comes from the Executor framework, is called with the registration task, a Runnable wrapping register0(promise).
  3. The first thing eventLoop.execute does is check whether the current thread, the main thread, is the thread that the eventLoop created through its thread factory. The first time through it certainly is not, so the thread has to be started, which is what startThread() does.
  4. startThread() actually starts the thread. The work it runs is polling, processing the selected keys, and processing the task queue. The registration task is added to the task queue right after startThread() returns, so it is executed once the thread is up and polling.
  5. So what does register0(promise) basically do?
    • First comes the NIO registration: javaChannel().register(eventLoop().selector, 0, this); the Netty channel is passed as the attachment, and javaChannel() is the ServerSocketChannel.
    • Then registered is set to true, and pipeline.invokeHandlerAddedIfNeeded() and pipeline.fireChannelRegistered() are executed. These two methods are very important: part of the code set up during initialization at the very beginning is executed here.

Analysis of invokeHandlerAddedIfNeeded():

It calls the handlerAdded method of each handler in the pipeline. Note that ChannelInitializer already implements handlerAdded as a template method, leaving initChannel to be implemented. The init method earlier added exactly such a ChannelInitializer to the pipeline, so invokeHandlerAddedIfNeeded() ends up executing it. Here is that part of init again:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        // Add to the task queue
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

All the way down

DefaultChannelPipeline

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        callHandlerAddedForAllHandlers();
    }
}

As the name and comments suggest, once a channel has been registered with an EventLoop, the handlerAdded method of every handler is called

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered; // The pipeline and the channel each track registration separately

        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }

    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

You may ask what this.pendingHandlerCallbackHead is and when it was assigned. That all happens in pipeline.addLast(handler), so let’s look at addLast first

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {

        // Create a DefaultChannelHandlerContext that wraps the handler
        newCtx = newContext(group, filterName(name, handler), handler);

        // Add CTX to the end of the pipeline
        addLast0(newCtx);
		
        // If the registered is false it means that the channel was not registered on an eventloop yet.
       // In this case we add the context to the pipeline and add a task that will call
       // ChannelHandler.handlerAdded(...) once the channel is registered.
        // This branch is taken only while the pipeline is not yet registered
        if (!registered) { // The callback is appended to a one-way linked list
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        // Ignore some code
    }
    // Call the handler that implements the handlerAdded method
    callHandlerAdded0(newCtx);
    return this;
}

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

// The ctx is wrapped in a PendingHandlerCallback and appended to a one-way linked list
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {

    // Either an add task or a remove task
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) { // First addLast: this task becomes the head
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

To summarize addLast: it wraps the handler in a context and appends it to the end of the pipeline’s list. If the pipeline is not yet registered, it also builds a PendingHandlerAddedTask around the ctx and appends it to a one-way linked list. Note that this list is only built while the pipeline is unregistered, so the ChannelInitializer added in init earlier is clearly on it, and callHandlerAddedForAllHandlers will execute it later.

Put simply, while the channel is unregistered, that is, during initialization, the handler (mainly the ChannelInitializer) is wrapped in a ctx and added to the pipeline. At the same time it is wrapped in a PendingHandlerCallback on a one-way linked list, and once registration completes, every node of that list is called.
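The deferral described above can be reduced to a small model: before registration, addLast only queues a pending callback; registration then drains the queue in order; after registration, addLast fires the callback immediately. This is a sketch with hypothetical names (DeferredAddSketch, PendingTask), it records event strings instead of invoking real handlers, and it leaves out the synchronization and the doubly linked context list.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of deferring handlerAdded callbacks until registration.
final class DeferredAddSketch {

    // One node of the singly linked list of pending callbacks.
    private static final class PendingTask {
        final String handlerName;
        PendingTask next;

        PendingTask(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private final List<String> events = new ArrayList<>();
    private PendingTask pendingHead;
    private boolean registered;

    void addLast(String handlerName) {
        if (!registered) {
            // Not registered yet: remember the callback instead of firing it.
            PendingTask task = new PendingTask(handlerName);
            if (pendingHead == null) {
                pendingHead = task;
            } else {
                PendingTask pending = pendingHead;
                while (pending.next != null) {
                    pending = pending.next;
                }
                pending.next = task;
            }
            return;
        }
        events.add("handlerAdded:" + handlerName);
    }

    // Marks the pipeline as registered and runs every pending callback in insertion order.
    void register() {
        registered = true;
        PendingTask task = pendingHead;
        pendingHead = null;
        while (task != null) {
            events.add("handlerAdded:" + task.handlerName);
            task = task.next;
        }
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        DeferredAddSketch pipeline = new DeferredAddSketch();
        pipeline.addLast("initializer");       // queued, nothing fires yet
        pipeline.register();                   // fires handlerAdded:initializer
        pipeline.addLast("simpleHandler");     // fires immediately
        System.out.println(pipeline.events()); // prints [handlerAdded:initializer, handlerAdded:simpleHandler]
    }
}
```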

Let’s see DefaultChannelHandlerContext

DefaultChannelHandlerContext

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

It inherits from AbstractChannelHandlerContext and holds the handler internally. It also provides two methods that determine whether the handler is an inbound handler or an outbound handler

AbstractChannelHandlerContext

private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

@Override
public Channel channel() {
    return pipeline.channel();
}

@Override
public ChannelPipeline pipeline() {
    return pipeline;
}

@Override
public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

The important point is that every context holds the pipeline, and through it the channel. Since the channel was bound to an EventLoop at registration time, the EventLoop can be reached from the context as well

We go back to callHandlerAddedForAllHandlers method

PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
    task.execute();
    task = task.next;
}

Each PendingHandlerCallback is a node of the list, and execute is called on each node in turn. At this point the list contains only one node: the one added in init.

Let’s look at the PendingHandlerCallback implementation class PendingHandlerAddedTask

DefaultChannelPipeline

private abstract static class PendingHandlerCallback implements Runnable {
    final AbstractChannelHandlerContext ctx;
    PendingHandlerCallback next;

    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    abstract void execute();
}

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            // ...
        }
    }
}

It holds a ctx internally, plus a pointer to the next node. Note that ctx.executor() returns the NioEventLoop: every ctx holds the pipeline, the channel is obtained from the pipeline, and the EventLoop bound earlier is obtained from the channel.

You can see it as you go down

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
}

The resulting handler is ChannelInitializer, which is added to the pipeline from the init method

ChannelInitializer

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            
        } finally {
            remove(ctx); // The ChannelInitializer will be removed from the pipeline's two-way list.
        }
        return true;
    }
    return false;
}

protected abstract void initChannel(C ch) throws Exception;
Copy the code

The abstract initChannel method is implemented by the anonymous ChannelInitializer that our init method added to the pipeline:
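The putIfAbsent guard and the remove in the finally block are worth a closer look. Below is a minimal sketch in plain Java (no Netty). OneShotInitializerSketch and its fields are hypothetical names, and the "pipeline" is just a list of strings. It only illustrates the pattern: run the initialization at most once per context, then take the initializer itself out of the pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy model (not Netty code) of a one-shot initializer that removes itself after running. */
class OneShotInitializerSketch {
    private final ConcurrentMap<Object, Boolean> initMap = new ConcurrentHashMap<>();
    final List<String> pipeline = new ArrayList<>();
    int initCalls;

    OneShotInitializerSketch() {
        pipeline.add("initializer"); // the initializer starts out as a pipeline entry
    }

    /** Returns true only for the first call with a given ctx; later calls are no-ops. */
    boolean handlerAdded(Object ctx) {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // guard against re-entrance
            try {
                initCalls++;
                pipeline.add("SimpleServerHandler"); // stands in for the user's initChannel
            } finally {
                pipeline.remove("initializer");      // the initializer removes itself
            }
            return true;
        }
        return false;
    }

    static String demo() {
        OneShotInitializerSketch sketch = new OneShotInitializerSketch();
        Object ctx = new Object();
        boolean first = sketch.handlerAdded(ctx);
        boolean second = sketch.handlerAdded(ctx);
        return first + "," + second + "," + sketch.initCalls + "," + sketch.pipeline;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,false,1,[SimpleServerHandler]"
    }
}
```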

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

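Inside initChannel, the handler configured on ServerBootstrap through handler(...) is added to the pipeline. A task is then created and submitted to the event loop’s task queue.

Note that pipeline.addLast(handler) here results in a call to the handlerAdded method of our SimpleServerHandler. The channel is already registered at this point, so the callback is invoked directly instead of being parked in the pending list.

The execute call then puts the task that adds ServerBootstrapAcceptor into the queue, and the event loop picks it up as it continues its select polling.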

Let’s go back to where we started, the call doBind0(regFuture, channel, localAddress, promise) in AbstractBootstrap.

AbstractBootstrap

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

AbstractChannel

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

DefaultChannelPipeline

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

The call finally ends up in

AbstractChannelHandlerContext

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise){
	
    // DefaultChannelPipeline$HeadContext
    final AbstractChannelHandlerContext next = findContextOutbound();
    // NioEventLoop
    EventExecutor executor = next.executor();
    next.invokeBind(localAddress, promise);
    return promise;
}

At this point there are two intermediate nodes between head and tail: the context of our custom SimpleServerHandler and the ServerBootstrapAcceptor added in the init method. Both are inbound handlers, so the search below skips them and lands on HeadContext.

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
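The backward search can be tried out on a toy list. The sketch below is plain Java, not Netty code. Ctx and OutboundSearchSketch are hypothetical names, and the inbound/outbound flags on each node are assumptions chosen to match the pipeline described in this post (head is outbound, the two user-visible handlers and tail are inbound only).

```java
/** Toy model (not Netty code) of walking a doubly linked pipeline from tail towards head. */
class OutboundSearchSketch {

    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    /** Links the nodes in order and returns the last one (the tail). */
    static Ctx link(Ctx... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i - 1].next = nodes[i];
            nodes[i].prev = nodes[i - 1];
        }
        return nodes[nodes.length - 1];
    }

    /** Same loop shape as findContextOutbound: step backwards until an outbound node is hit. */
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    static String demo() {
        Ctx tail = link(
                new Ctx("head", true),                      // assumed outbound
                new Ctx("SimpleServerHandler", false),      // inbound only
                new Ctx("ServerBootstrapAcceptor", false),  // inbound only
                new Ctx("tail", false));                    // assumed inbound only
        return findContextOutbound(tail).name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "head"
    }
}
```

The loop has no null check, so it relies on the head node always being outbound. That holds in the toy list above by construction.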

Stepping into invokeBind, we arrive at the bind method of HeadContext.

HeadContext

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    // unsafe = pipeline.channel().unsafe(), assigned when HeadContext is constructed
    unsafe.bind(localAddress, promise);
}

Go further down to NioServerSocketChannel

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        // Bind and set the backlog (the maximum length of the pending-connection queue)
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

javaChannel().bind(localAddress, config.getBacklog()) is the address binding of the JDK’s own NIO ServerSocketChannel.
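For reference, the same bind call can be made directly against the JDK, without Netty. This is a minimal sketch: NioBindSketch and bindEphemeral are hypothetical names, and port 0 on the loopback address is used only so that the operating system picks a free port. The value 128 mirrors the SO_BACKLOG option from the server example at the top of this post.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

/** Plain JDK NIO: open a server channel, bind it with a backlog, report the chosen port. */
class NioBindSketch {

    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port; backlog is a hint for the
            // maximum number of pending connections.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindEphemeral(128));
    }
}
```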

To summarize: as we said, the event loop thread starts polling, and whenever it finds tasks in the queue it executes them. Note that eventLoop.execute adds the task to the queue, and on the first call it also starts the thread if it is not running yet.

The task queue starts with the registration task, which does three things. It registers the channel with the selector. It runs the execute method of the nodes in the singly linked list headed by pendingHandlerCallbackHead. That in turn calls the handlerAdded method of the handler held by each node’s ctx. At this time the list contains only one ctx, the one for the ChannelInitializer that the init method added by calling pipeline.addLast. Note that pipeline.addLast only adds a node to this singly linked list when the channel is not yet registered.

The handlerAdded method of ChannelInitializer calls initChannel, which is abstract and is implemented in init. That implementation adds the SimpleServerHandler configured on ServerBootstrap to the pipeline, then calls eventLoop.execute to queue a task that adds ServerBootstrapAcceptor to the pipeline.

Later, doBind0 creates another task and adds it to the queue, so the queue now holds two tasks. The first adds ServerBootstrapAcceptor to the pipeline; remember that the pipeline then contains two handlers between head and tail. The second performs the address binding.
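The ordering in this summary can be reproduced with a tiny single-threaded task queue. The sketch below is plain Java and only a rough model, not Netty’s event loop: TaskQueueSketch is a hypothetical name, the log strings stand in for the real work, and the way the bind task gets queued from inside the registration task is a simplification of the flow described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (not Netty code) of a FIFO task queue drained by a single loop. */
class TaskQueueSketch {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    /** Like eventLoop.execute in this post: only enqueues, never runs the task inline. */
    void execute(Runnable task) {
        tasks.add(task);
    }

    /** Drains the queue in FIFO order; tasks queued while draining run after the current one. */
    void runAllTasks() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        TaskQueueSketch loop = new TaskQueueSketch();
        loop.execute(() -> {
            loop.log.add("register");
            // Queued by initChannel while the registration task is still running.
            loop.execute(() -> loop.log.add("add ServerBootstrapAcceptor"));
            // Queued afterwards by doBind0 (simplified here).
            loop.execute(() -> loop.log.add("bind"));
        });
        loop.runAllTasks();
        return loop.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[register, add ServerBootstrapAcceptor, bind]"
    }
}
```

Because the queue is FIFO and is drained by a single thread, the acceptor is in the pipeline before the bind task runs.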