Introduction to the REACTOR model

The Reactor multi-threaded model has the following characteristics:

  • A special thread, the Acceptor thread, listens for TCP connection requests from clients.
  • Client connection IO operations are handled by a specific NIO thread pool. Each client connection is bound to a specific NIO thread, so all IO operations in this client connection are done in the same thread.
  • There are many client connections, but the number of NIO threads is small, so a NIO thread can be bound to multiple client connections at the same time.

The introduction here is excerpted from the yongshun biggieNo secrets under the source code – do the best Netty source analysis tutorialIf you haven’t read it, I highly recommend reading it several times. It’s very well written.

What are acceptor threads and NIO thread pools in netty source code? How is the Accept event passed to the NIO thread pool?

NioEventLoopGroup corresponds to the Reactor thread model

Let’s take a look at the server creation example:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...
Copy the code

thenbossGroupandworkerGroupWhat are they? Take a look through the Debug breakpointbossGroupandworkerGroup:

You can seebossGroupContains aNioEventLoop.workerGroupContains 8 CPU cores x 2NioEventLoopAnd debug it againbossGrouptheNioEventLoop:One can be clearly seen in the picture aboveNioEventLoopCorresponds to one Thread.

The creation time and process of Thread are ignored here. Here we can briefly summarize: bossGroup is a collection of one Thread, and workerGroup is a collection of eight threads.

Let’s seeNioEventLoopWhat does the corresponding Thread do? Find the Runnable for Thread and see what the Runnable does:By the ideaJump To Type SourceWe can jump to the implementation of Runable for thread, which we find is actuallyNioEventLoop->run method, I have simplified the run method, roughly the process code is as follows:

   @Override
    protected void run() {
        for (;;) {
            try {
                long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                strategy = select(curDeadlineNanos);
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } catch (Exception e) {
            }
        }
    }
Copy the code

I found a picture to make it more intuitive:

Pictures fromNetty source code analysis unveils reactor thread

To summarize, the NioEventLoop thread does three main things:

  • Polling an I/O event. Procedure
  • Processing I/O Events
  • Processing task queue

Continue to see processSelectedKeys () call again processSelectedKeysOptimized (), the code is as follows:

private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
}
 private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();
            processSelectedKey(k, (AbstractNioChannel) a);
        }
 }
Copy the code

Focus on processSelectedKeysOptimized () this method, selectedKeys is the attribute of the Selector, at the time of initialization NioEventLoop, obtained through reflection, call the Selector. The select (TIMEOUT), SelectedKeys contain ready IO events.

The debug command displays the following information:

  • The current selectionKey. readyOps is 16, or ready IO events areSelectionKey.OP_ACCEPTThe accept events
  • InterestOps is also 16, indicating that the registration is currentSelectorThe events ofSelectionKey.OP_ACCEPTThe accept events
  • The attachment object A from K isNioServerSocketChannel

From this we can confirm that the current NioEventLoop is the Acceptor thread we are looking for!

Let’s take a look at the presentNioEventLoopIt is to belong tobossGrouporworkerGroup:

The current NioEventLoop belongs to a one-thread bossGroup, which is the reactor model for acceptors.

The Channel with the ChannelPipeline

Before analyzing event passing, let’s take a look at channels and Channel pipelines:

In Netty, each Channel has only one Channel pipeline corresponding to it, and their composition relationship is as follows:

As you can see from the figure above, a Channel contains a ChannelPipeline, which maintains a two-way linked list of ChannelHandlerContext. The head of the list is a HeadContext, the tail is a TailContext, and each ChannelHandlerContext is associated with a ChannelHandler. The above diagram gives us a visual understanding of ChannelPipeline.

Accept event passing

Let’s examine how Acceptor threads pass events to the NIO thread pool.

Then the above processSelectedKeys () method, will call NioServerSocketChannel. DoReadMessages () :

@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = serverSocketChannel.accept(); try { if (ch ! = null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { } return 0; }Copy the code

Here we see the Java client SocketChannel wrapped up as netty’sNioSocketChannel. And then this oneNioSocketChannelWill be inNioServerSocketChannelTo the ChannelPipeline.

ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor: ServerBootstrapAcceptor

public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (! future.isSuccess()) { forceClose(child, future.cause()); }}}); } catch (Throwable t) { forceClose(child, t); }}Copy the code

This MSG is the NioSocketChannel created above. What is childHandler? Here is the ChannelInitializer anonymous class

ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); }});Copy the code

See adding childHandler to NioSocketChannel’s pipeline, configuring the channel, and registering the channel with childGroup.

public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { 1. register0(promise); } else { 2. try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); }}); } catch (Throwable t) { } } }Copy the code

The registration process here is to take a NioEventLoop from childGroup, also known as workerGroup (NioEventLoop thread will not start immediately in the new version), and submit the task to NioEventLoop by going 2, Submitting the task checks if the current NioEventLoop thread is started, and if not, it is started, along with the task execution, the register0 function. Let’s look at the register0 function again:

private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); // javaChannel().register(Selector, 0, this) neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { } }Copy the code

You can see that the channel is actually bound to the selector, so the IO event for the SocketChannel on the client is selected by a thread in the workerGroup. The channel is bound to a thread. The process is repeated with new client connections.

To summarize, there is a thread loop in bossGroup that receives accept eventsNioServerSocketChannelPipeline, inServerBootstrapAcceptor.channelReadThe middle will be initializedNioSocketChannelAs well asNioSocketChannelThen submit the task to the queue of the thread selected by the workerGroup. The workerGroup thread detects a task in the queue and executes itNioSocketChannelRegister to currentNioEventLoopOn selector in,NioSocketChannelSubsequent IO events are also the responsibility of the current thread.