Netty source code analysis series

  • Netty source code parsing series – server start process parsing
  • Netty source code parsing series – client connection access and read I/O parsing
  • 5 minutes to understand the pipeline model – Netty source code parsing

Preface

The previous article, "Netty source code parsing series – server start process parsing", walked through server startup. Once the server is up, how and where does client access begin, and where do read I/O events start? And how does Netty's boss thread register the connection with the worker thread pool after it receives a client's TCP connection request? With these questions in mind, we start the analysis of client connection access and read/write I/O.

1. Starting point: NioEventLoop.run()

    processSelectedKeys();

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Depending on whether selectedKeys is null, Netty decides whether to use the optimized selectedKeys. We follow the optimized path into processSelectedKeysOptimized().

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}

Look at k.attachment(). Where was the object attached? In the registration step covered in the previous article, "Netty source code parsing series – server start process parsing". The attached object is in fact the NioServerSocketChannel itself.

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        ...
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        ...
    }
}

Back to k.attachment(): once the attached object is taken out, the code checks whether it is an AbstractNioChannel. If it is not an AbstractNioChannel, it is a NioTask. Here we only look at the AbstractNioChannel case and go into processSelectedKey().
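
The attachment round trip described above can be reproduced with plain Java NIO. The following is a minimal sketch, not Netty's code: `AttachmentSketch` and `ChannelWrapper` are hypothetical names, with `ChannelWrapper` standing in for the Netty channel that passes itself as the attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class AttachmentSketch {
    /** Hypothetical stand-in for a Netty channel that wraps a Java NIO channel. */
    static final class ChannelWrapper {
        final ServerSocketChannel javaChannel;

        ChannelWrapper(ServerSocketChannel javaChannel) {
            this.javaChannel = javaChannel;
        }
    }

    /** Registers with interest ops 0 and an attachment, then reads the attachment back. */
    static boolean attachmentRoundTrips() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // registration requires non-blocking mode
            ChannelWrapper wrapper = new ChannelWrapper(server);
            SelectionKey key = server.register(selector, 0, wrapper);
            Object a = key.attachment();
            // The same instanceof dispatch the event loop performs on the attachment
            return a instanceof ChannelWrapper && a == wrapper;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("attachment round trip: " + attachmentRoundTrips());
    }
}
```

Because the wrapper travels with the SelectionKey, the loop that processes ready keys can get from a key to its channel object without any lookup table.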

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    ...
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            return;
        }
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }
    ...
}
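
The bitmask checks in processSelectedKey() can be tried in isolation. The sketch below only mirrors the three conditions shown above and returns labels instead of calling into a channel. `ReadyOpsSketch` and `actionsFor` are hypothetical names; the `SelectionKey` constants are the standard Java NIO ones.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    /** Returns labels for the branches that would run for the given ready-ops bitmask. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // Read and accept share one branch; readyOps == 0 is also routed to read()
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```

The point to notice is that OP_ACCEPT and OP_READ take the same branch, so the difference between accepting a connection and reading bytes has to come from which unsafe object the channel holds.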

When the ready operation is read or accept, the code calls unsafe.read(). Two classes implement this method: NioByteUnsafe, an inner class of AbstractNioByteChannel, and NioMessageUnsafe, an inner class of AbstractNioMessageChannel. Both are NioUnsafe implementations that belong to subclasses of AbstractNioChannel. Which one applies here? Let's see whether NioServerSocketChannel is created with NioByteUnsafe or NioMessageUnsafe.

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}

public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
    }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
}

NioServerSocketChannel is a subclass of AbstractNioMessageChannel, which is a subclass of AbstractNioChannel. newUnsafe() is an abstract method of AbstractChannel, and AbstractNioMessageChannel implements it. So here we choose read() of NioMessageUnsafe, the inner class of AbstractNioMessageChannel.

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        ...
        for (;;) {
            int localRead = doReadMessages(readBuf);
            ...
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        pipeline.fireChannelReadComplete();
        ...
    }
}

This is divided into two parts: one handles the message, the other handles the event.

1. Process messages

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ...
    buf.add(new NioSocketChannel(this, ch));
    return 1;
    ...
}
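
The accept step can be exercised with plain Java NIO over the loopback interface. This is a sketch under stated assumptions rather than Netty's implementation: `AcceptSketch` and `demo` are hypothetical names, the raw `SocketChannel` goes into the list where Netty would wrap it in a NioSocketChannel, and the 5 second select timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptSketch {
    /** Accepts at most one pending connection into buf and returns how many were added. */
    static int doReadMessages(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch == null) {
            return 0;
        }
        buf.add(ch);
        return 1;
    }

    /** Returns {accepted before any client connected, accepted after one client connected}. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
            server.register(selector, SelectionKey.OP_ACCEPT);

            List<SocketChannel> buf = new ArrayList<>();
            int before = doReadMessages(server, buf);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait until the accept event is ready
                int after = doReadMessages(server, buf);
                for (SocketChannel accepted : buf) {
                    accepted.close();
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("before=" + result[0] + ", after=" + result[1]);
    }
}
```

Returning a count rather than the channel itself lets the caller keep looping until a call adds nothing, which is the role localRead plays in NioMessageUnsafe.read().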

This accepts a client SocketChannel, wraps it in a NioSocketChannel, and adds it to the list. Let's look at new NioSocketChannel().

public class NioSocketChannel extends AbstractNioByteChannel
                       implements io.netty.channel.socket.SocketChannel {
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioByteUnsafe();
    }

    protected class NioByteUnsafe extends AbstractNioUnsafe {
        @Override
        public final void read() {
            ...
        }
    }
}
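
The way each channel type ends up with its own read() can be modeled in a few lines. The sketch below is a simplified illustration with hypothetical names (`UnsafeSketch`, `BaseChannel`, `MessageChannel`, `ByteChannel`): the base constructor calls an abstract factory method, and each subclass decides which implementation it returns.

```java
final class UnsafeSketch {
    /** Hypothetical stand-in for the NioUnsafe interface; read() returns a label here. */
    interface Unsafe {
        String read();
    }

    /** Plays the role of AbstractChannel: the constructor asks the subclass for its Unsafe. */
    abstract static class BaseChannel {
        final Unsafe unsafe;

        BaseChannel() {
            unsafe = newUnsafe();
        }

        protected abstract Unsafe newUnsafe();
    }

    /** Plays the role of a server channel whose read() accepts connections. */
    static final class MessageChannel extends BaseChannel {
        @Override
        protected Unsafe newUnsafe() {
            return () -> "accept connection";
        }
    }

    /** Plays the role of a client channel whose read() reads bytes. */
    static final class ByteChannel extends BaseChannel {
        @Override
        protected Unsafe newUnsafe() {
            return () -> "read bytes";
        }
    }

    public static void main(String[] args) {
        System.out.println(new MessageChannel().unsafe.read());
        System.out.println(new ByteChannel().unsafe.read());
    }
}
```

The caller only ever writes unsafe.read(); which behavior runs was fixed when the channel object was constructed.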

AbstractNioByteChannel also extends AbstractNioChannel and implements newUnsafe(). From this we can infer that when a client first connects, the call goes to read() of NioMessageUnsafe, the inner class of AbstractNioMessageChannel, and when the client sends data, it goes to read() of NioByteUnsafe, the inner class of AbstractNioByteChannel.

2. Handle events

for (int i = 0; i < size; i ++) {
    pipeline.fireChannelRead(readBuf.get(i));
}

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

    

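What is next here? Debugging shows that its handler is ServerBootstrapAcceptor, so the event goes into ServerBootstrapAcceptor's channelRead(). Recall from "Netty source code parsing series – server start process parsing" that init() adds it with pipeline.addLast(new ServerBootstrapAcceptor()). Why is the handler not the one added by p.addLast(new ChannelInitializer())? Because ChannelInitializer.channelRegistered() removes the initializer from the pipeline once initChannel has run: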

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

Let's move on to ServerBootstrapAcceptor's channelRead() method.

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

There are three steps: (1) Add childHandler to the child channel's pipeline. Where does childHandler come from? It is the one set at the beginning with serverBootstrap.childHandler(new IOChannelInitialize()). (2) Set some options and attributes. (3) Register the client channel with the work thread pool.

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

@Override
public EventExecutor next() {
    return chooser.next();
}

private final class GenericEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
}

A thread is selected from the work thread pool to execute register.
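
The selection expression in GenericEventExecutorChooser is a round-robin over an array. A minimal standalone sketch of the same expression, where `RoundRobinChooser` is a hypothetical generic class and strings stand in for the event loops:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger childIndex = new AtomicInteger();

    RoundRobinChooser(T[] children) {
        this.children = children;
    }

    /** Returns the children in turn and wraps around at the end of the array. */
    T next() {
        // Math.abs keeps the index non-negative once the counter overflows to negative values
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"worker-0", "worker-1", "worker-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Taking the remainder before Math.abs matters: the remainder always lies strictly between -length and length, so its absolute value is a valid index even after the counter wraps past Integer.MAX_VALUE.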

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    ...
    channel.unsafe().register(this, promise);
    return promise;
}

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

@Override
protected void doRegister() throws Exception {
    ...
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    ...
}
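
The inEventLoop() check in register() decides whether register0() runs inline or is handed to the loop's thread. The sketch below models that hand-off with java.util.concurrent only. `LoopSketch`, the thread name "worker-loop", and the use of CompletableFuture in place of ChannelPromise are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LoopSketch {
    private volatile Thread loopThread;

    // Single-threaded executor that remembers its own thread, like an event loop would
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "worker-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Completes the returned future with the name of the thread that ran the registration. */
    CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        Runnable register0 = () -> promise.complete(Thread.currentThread().getName());
        if (inEventLoop()) {
            register0.run();             // already on the loop thread: run inline
        } else {
            executor.execute(register0); // called from another thread: queue it as a task
        }
        return promise;
    }

    void shutdown() {
        executor.shutdown();
    }

    /** Calls register() from the calling thread and reports which thread did the work. */
    static String demo() {
        LoopSketch loop = new LoopSketch();
        try {
            return loop.register().join();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("registered on: " + demo());
    }
}
```

Called from any thread other than the loop's own, the registration is queued and runs on the loop thread, which matches the case where the boss thread hands a new client channel to a work thread.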

The rest of the process is the same registration process as in the previous article, "Netty source code parsing series – server start process parsing". The difference is that registration at server startup runs in the task queue of the boss thread pool, while register0() for a newly accepted client runs in the task queue of the work thread pool. The client channel is registered through Java NIO with the selector of a thread from the work thread pool, which answers the first few questions: how does a client get access, and how does Netty's boss thread register the connection with the worker thread pool after receiving the client's TCP connection request? That leaves one question: how and where do read/write I/O events start? Let's go back to the code at the beginning of the article.

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}

The boss thread pool has completed the client connection, registered the connection to the work thread pool's task queue, and added interest in the read event. Now the work thread loops to check whether selectedKeys holds any pending events, and when it does, processSelectedKey() is executed.

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    ...
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        ...
    }
    ...
}

Here unsafe.read() resolves to read() of NioByteUnsafe, the inner class of AbstractNioByteChannel.

@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
       this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
       int totalReadAmount = 0;
       boolean readPendingReset = false;
       do {
          byteBuf = allocHandle.allocate(allocator);
          int writable = byteBuf.writableBytes();
          int localReadAmount = doReadBytes(byteBuf);
          if (localReadAmount <= 0) {
           // not was read release the buffer
              byteBuf.release();
              byteBuf = null;
              close = localReadAmount < 0;
              break;
           }
          if (!readPendingReset) {
               readPendingReset = true;
               setReadPending(false);
          }
          pipeline.fireChannelRead(byteBuf);
          byteBuf = null;

          if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
               totalReadAmount = Integer.MAX_VALUE;
               break;
          }
          totalReadAmount += localReadAmount;

          if (!config.isAutoRead()) {
              break;
          }

          if (localReadAmount < writable) {
              break;
          }
       } while (++ messages < maxMessagesPerRead);
         pipeline.fireChannelReadComplete();
         allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
     } catch (Throwable t) {
          handleReadException(pipeline, byteBuf, t, close);
     } finally {
         if (!config.isAutoRead() && !isReadPending()) {
             removeReadOp();
         }
     }
}

Let's break this large chunk of code into parts:

1. Get the channel config. maxMessagesPerRead defaults to 16, so one pass reads at most 16 times.
2. Get the handle that manages the receive buffer: config.getRecvByteBufAllocator().newHandle().
3. Call allocHandle.allocate(allocator) to allocate buffer space.
4. Read data from the socket into byteBuf.
5. Pass the read event to the next handler.

Here we only follow the read event; the other steps are left for a more detailed explanation later.
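
The loop structure behind these steps can be sketched with plain Java NIO: allocate a buffer, read into it, hand the chunk on, and stop when a read comes back short or the per-pass limit is reached. This is a simplified model, not Netty's code. `ReadLoopSketch` and `readChunks` are hypothetical names, an in-memory channel replaces the socket, and recording the chunk size stands in for pipeline.fireChannelRead(byteBuf).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopSketch {
    /** Reads data in chunks of at most bufferSize bytes and returns the size of each chunk. */
    static List<Integer> readChunks(byte[] data, int bufferSize, int maxMessagesPerRead) {
        List<Integer> chunkSizes = new ArrayList<>();
        try (ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(data))) {
            int messages = 0;
            do {
                ByteBuffer buf = ByteBuffer.allocate(bufferSize); // allocate buffer space
                int writable = buf.remaining();
                int localReadAmount = ch.read(buf);               // read from the channel
                if (localReadAmount <= 0) {
                    break;                                        // nothing read, or end of stream
                }
                chunkSizes.add(localReadAmount);                  // stands in for fireChannelRead
                if (localReadAmount < writable) {
                    break;                                        // short read: source is drained
                }
            } while (++messages < maxMessagesPerRead);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunkSizes;
    }

    public static void main(String[] args) {
        System.out.println(readChunks(new byte[10], 4, 16));
        System.out.println(readChunks(new byte[10], 4, 2));
    }
}
```

The limit on messages per pass keeps one busy connection from monopolizing the loop thread; data left unread is picked up the next time the selector reports the channel as readable.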

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

The handler chain in this example is:
HeadContextHandler –> IdleStateHandler –> IOHandler –> TailContext

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

The event enters IdleStateHandler:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}

This sets the reading flag to true in preparation for the later idle-state checks, then continues passing the read event, this time to IOHandler.

public class IOHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        System.out.println(msg.toString());
    }
    ...
}
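
The hand-over from one handler to the next can be modeled as a linked list of contexts. The sketch below is a simplified illustration with hypothetical names (`PipelineSketch`, `Context`, `InboundHandler`). It ignores executors and exception handling and only records the order in which the handlers see a message.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineSketch {
    /** Hypothetical stand-in for an inbound handler. */
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** One node of the chain: a named handler plus a link to the next node. */
    static final class Context {
        final String name;
        final InboundHandler handler;
        Context next;

        Context(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Passes the message to the next context's handler, if there is one. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    /** Builds head -> idle -> io -> tail and returns the names in the order they were visited. */
    static List<String> run(Object msg) {
        List<String> visited = new ArrayList<>();
        InboundHandler passThrough = (ctx, m) -> {
            visited.add(ctx.name);
            ctx.fireChannelRead(m); // a handler must forward explicitly, or the event stops here
        };
        Context head = new Context("head", passThrough);
        Context idle = new Context("idle", passThrough);
        Context io = new Context("io", passThrough);
        Context tail = new Context("tail", (ctx, m) -> visited.add(ctx.name));
        head.next = idle;
        idle.next = io;
        io.next = tail;

        head.handler.channelRead(head, msg);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run("hello"));
    }
}
```

A handler that never calls fireChannelRead() ends the chain at that point, which is why the handlers shown above forward the event with ctx.fireChannelRead(msg) or super.channelRead(ctx, msg).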

This completes the walk through how a read I/O event starts and how it reaches a user-defined handler.

Summary:

1. The boss thread handles the accept event of the NioServerSocketChannel and adds the client channel to the work thread's task queue, which executes register0() and registers the read event with the Selector of the work thread.
2. The work thread polls selectedKeys and, when an event arrives, passes the data it has read to the user handler.