This series of Netty source code analysis articles is based on version 4.1.56.Final.

Recap of previous articles

In the previous series of articles, starting from how the kernel sends and receives network data and taking the C10K problem as the main thread, we elaborated in detail on the evolution of network IO models from the kernel's perspective. Based on that, we derived Netty's network IO model, as shown in the figure below:

See the evolution of IO models from a Kernel Perspective for more details.

We then focused on Netty's master-slave Reactor IO thread model. In the article Realization of the Reactor Model in Netty, we elaborated on how Netty's master-slave Reactor model is built and introduced the key components of the Reactor model. The core skeleton of Netty is shown in the figure below:

After the core skeleton was built, we described the whole Reactor startup process in the article detailed illustrated Reactor startup process. A very important core component, NioServerSocketChannel, made its debut there. It undertakes the most important task of a network framework: receiving network connections efficiently. We introduced the whole process of creating the NioServerSocketChannel, initializing it, registering it with the Main Reactor, and listening for OP_ACCEPT events. With that in place, Netty was ready for a flood of client connections.

Then, in the article how Netty receives network connections efficiently, we introduced in detail the whole process by which Netty accepts client connections. There, NioServerSocketChannel took the stage: inside it we create the client connection NioSocketChannel, and we covered the initialization of NioSocketChannel in detail. The ChannelRead event is then fired in the NioServerSocketChannel's pipeline, and finally the ServerBootstrapAcceptor registers the client connection NioSocketChannel with a Sub Reactor to listen for OP_READ events on the connection, ready to receive the network data sent by the client, which is the subject of this article.

At this point, all of Netty's core components are ready and started, and it is ready to take off ~~~

The protagonists of the previous article were the Main Reactor in Netty's Main Reactor Group and the NioServerSocketChannel registered with it. Starting from this article, the protagonists switch to a Sub Reactor and the NioSocketChannel registered with it.

Let’s dive into today’s topic and see how Netty handles OP_READ events and receives network data efficiently.

1. The Sub Reactor processes OP_READ events

After the client makes a system IO call and sends data to the server, the network data reaches the server's NIC, is processed by the kernel protocol stack, and finally arrives in the Socket's receive buffer. At that point the OP_READ event on the NioSocketChannel that the Sub Reactor is polling becomes ready. The Sub Reactor thread then returns from the blocking Selector.select(timeoutMillis) call on the JDK Selector and turns to handling the OP_READ event on the NioSocketChannel.

Note that the Reactor here is a Sub Reactor, the connection it handles is of type NioSocketChannel, and the event being processed is OP_READ.
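For orientation, here is a highly simplified sketch of the general pattern a Reactor event loop follows (my own illustration, not Netty's actual NioEventLoop implementation):

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Simplified sketch of the general Reactor loop pattern (illustrative only, not Netty's NioEventLoop code).
public final class ReactorLoopSketch {
    public static void run(Selector selector, long timeoutMillis) throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            selector.select(timeoutMillis);                    // block until a registered channel becomes IO ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isReadable()) {
                    // OP_READ is ready: read from the channel (this is what unsafe.read() does in Netty)
                }
            }
            // afterwards, the Reactor thread runs the asynchronous tasks queued on it
        }
    }
}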

As I have emphasized many times in previous articles, the entry function for a Reactor to process IO events on a Channel is NioEventLoop#processSelectedKey.

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ......omitted......
        try {
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ......handle the OP_CONNECT event...... }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) { ......handle the OP_WRITE event...... }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // This article focuses on the OP_READ event
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}

Note that the current executing thread is now a Sub Reactor thread; the client NioSocketChannel is registered with the Sub Reactor, which handles the read and write events on the connection.

So the entry function argument AbstractNioChannel ch is the IO-ready client connection NioSocketChannel.

The NioUnsafe operation class obtained here via ch.unsafe() is the operation class of the NioSocketChannel for the underlying JDK NIO SocketChannel. Its implementation type is NioByteUnsafe, defined in the parent class AbstractNioByteChannel, with the inheritance structure shown below.

Let's now look at how the OP_READ event is handled in NioByteUnsafe#read.

2. Netty receives network data

Following our usual approach, let's first extract the overall framework of the OP_READ event processing logic, get a picture of the whole flow, and then break down each core point one by one.

Some steps in the flow belong to Netty's logic for handling connection closure, which is unrelated to the main topic of this article; we will ignore them for now. A separate article will cover connection closure in detail.

From the overall flow chart of Netty receiving network data above, we can see that the framework for NioSocketChannel receiving network data is the same as the framework for NioServerSocketChannel receiving client connections introduced in the previous article, "How Netty receives network connections efficiently".

NioSocketChannel receives network data through a do{....}while(...) loop, called the read loop, which repeatedly reads data from the NioSocketChannel.

The read loop that reads connection data is also limited by a maximum number of reads: by default, no more than 16 reads are performed per OP_READ event, regardless of whether the NioSocketChannel still has data left to read.

The maximum number of reads for the read loop can be set in the bootstrap configuration class ServerBootstrap using the ChannelOption.MAX_MESSAGES_PER_READ option. The default is 16.

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.MAX_MESSAGES_PER_READ, customValue)  // customValue: the desired maximum number of reads
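As a side note (an illustrative sketch of my own, not from the original text): the snippet above applies the option to the server channel; if we wanted the same limit applied to each accepted client NioSocketChannel, whose read loop this article discusses, childOption would be the place to set it. bossGroup and workerGroup are assumed to exist as in the usual server template.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Sketch only: assumes bossGroup/workerGroup are created elsewhere, as in the usual server template.
public final class MaxReadsConfigSketch {
    public static ServerBootstrap configure(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         // applies to the NioServerSocketChannel (the accept loop in the Main Reactor)
         .option(ChannelOption.MAX_MESSAGES_PER_READ, 16)
         // applies to every accepted NioSocketChannel (the read loop discussed in this article)
         .childOption(ChannelOption.MAX_MESSAGES_PER_READ, 16);
        return b;
    }
}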

Why does Netty limit the number of reads in the read loop? Why not just read all the data in one go?

This is where the overall picture matters. As mentioned in the previous article, Netty's IO model is the master-slave Reactor thread group model: the Sub Reactor Group contains several Sub Reactors dedicated to listening for and handling IO events on client connections.

In order to process the read and write events on a massive number of client connections efficiently and in an orderly way, Netty distributes all the client connections carried by the server across multiple Sub Reactors for processing, which also guarantees the thread safety of IO processing on each Channel.

A Channel can only be assigned to one fixed Reactor, while a Reactor is responsible for processing IO-ready events on multiple Channels. The correspondence between Reactors and Channels is shown in the figure below:

Since a Sub Reactor has multiple NioSocketChannels registered with it, Netty cannot process data on one NioSocketChannel without limit; it has to distribute reading opportunities evenly among the other NioSocketChannels, so the maximum number of reads on each NioSocketChannel must be limited.

In addition to listening for IO-ready events on all registered NioSocketChannels, the Sub Reactor also needs to set aside time to handle asynchronous tasks submitted by user threads, so Netty cannot let itself get stuck in the IO handling of a single NioSocketChannel. That is another reason why the maximum number of reads per read loop must be limited. For example, a user thread can hand work to the Sub Reactor as shown in the sketch below.
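This sketch is my own illustration (not from the article's source): a business thread submits a task to the EventLoop (Sub Reactor) bound to a Channel; the task runs on the same thread that performs the channel's IO, which is exactly why IO handling must stay bounded.

import io.netty.channel.Channel;

// Illustrative sketch: a business thread hands work to the Sub Reactor (EventLoop) bound to the channel.
public final class SubmitToSubReactor {
    public static void submit(Channel channel, Object response) {
        channel.eventLoop().execute(() -> {
            // executed on the Sub Reactor thread that owns this channel, so no extra synchronization is needed
            channel.writeAndFlush(response);
        });
    }
}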

For the general framework of the Reactor, those who are interested in the details can refer back to my article on the Framework of the Netty Core Reactor.

For this reason, each time the read loop reads data from the NioSocketChannel via the doReadBytes method (the return value will be greater than 0 and is recorded in allocHandle.lastBytesRead), it must also count the number of reads via allocHandle.incMessagesRead(1). When the count reaches 16 at the end of an iteration, the read loop must exit, regardless of whether the NioSocketChannel still has data to read, so that the Sub Reactor can move on to its asynchronous tasks and to the IO-ready events on its other NioSocketChannels. Opportunities are distributed evenly among all channels.

public abstract class MaxMessageHandle implements ExtendedHandle {

        // Total number of reads in this read loop
        private int totalMessages;

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }
}

The amount of data read in each iteration of the read loop is recorded via allocHandle.lastBytesRead.

public abstract class MaxMessageHandle implements ExtendedHandle {

        // The number of bytes read in this iteration of the read loop
        private int lastBytesRead;
        // The total number of bytes read in the entire read loop
        private int totalBytesRead;

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }
}
  • lastBytesRead < 0: the client has initiated connection closure, and Netty starts the connection-closing process. This is unrelated to the main topic of this article; a separate article will explain the closing process.

  • lastBytesRead = 0: all the data currently available in the NioSocketChannel has been read. This OP_READ event has been handled, and we can happily exit the read loop.

  • lastBytesRead > 0: data was read in this iteration of the read loop. The ChannelRead event is fired in the NioSocketChannel's pipeline, and the ChannelHandler responsible for IO processing in the pipeline then handles the network request.

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ......handle the network request, e.g. decoding, deserialization, etc......
    }
}

At the end of each iteration of the read loop, allocHandle.continueReading() is called to decide whether to terminate the loop. The termination conditions here are quite a bit more involved than those for NioServerSocketChannel accepting connections; a detailed analysis is given later in this article. For now, just grasp the overall core flow without worrying about the details.

In general, the read loop that reads network data from NioSocketChannel exits when one of the following conditions is met:

  • All the data currently in the NioSocketChannel has been read; the loop exits.

  • No data was read in this iteration of the read loop; the loop exits.

  • The read loop has already read 16 times; the loop exits.

When an exit condition of the read loop is met, the Sub Reactor thread exits the loop and calls allocHandle.readComplete(), which decides, based on the total number of bytes read in this read loop, whether the ByteBuffer used to receive data for the next OP_READ event should be expanded or shrunk.

Finally, the ChannelReadComplete event is fired in the NioSocketChannel pipeline, notifying the ChannelHandler that the OP_READ event has been handled.


public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ......handle the network request, e.g. decoding, deserialization, etc......
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ......the OP_READ event has been processed......
        ......decide whether to send the processing results back to the client......
    }
}

2.1 Differences between ChannelRead and ChannelReadComplete events

Some readers may not have a clear picture of when certain events are propagated in Netty or how the events differ from each other; the concepts are easily confused. In a later article, I will go through all the asynchronous events defined in Netty from the source code's point of view, along with their differences, connections, trigger timing, and propagation mechanisms.

Here we focus on the two events covered in this article’s topic: the ChannelRead event and the ChannelReadComplete event.

Although their names look similar, the ChannelRead event and the ChannelReadComplete event are not the same thing, as you can see from the overall flow of how Netty receives network data.

Let’s look at the differences between the two events:

The Netty server processes an OP_READ event by reading the network data sent by the client NioSocketChannel multiple times in a do{}while() read loop. Each read reads at most the capacity of the ByteBuffer we allocated, whose initial size is 2048.

  • ChannelRead event: fired once for every read. Each read can read at most the capacity of the DirectByteBuffer allocated at the start of that iteration of the read loop. This capacity is adjusted dynamically, as described in detail later in this article.

  • ChannelReadComplete event: fired when the read loop exits because no data was read or one of the continueReading conditions is no longer satisfied. It indicates that this OP_READ event has been processed.

It is important to note that firing the ChannelReadComplete event does not mean that the NioSocketChannel has no more data to read; it only means that the OP_READ event has been processed. It is possible that the client sent too much data and Netty did not finish reading it, in which case the rest is read when the next OP_READ event arrives.
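A common way to exploit this difference is the pattern used by the classic echo handler: queue a write for every ChannelRead and flush only once in ChannelReadComplete, so at most one flush (and its system call) happens per OP_READ event. A minimal sketch modeled on that example:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Sketch modeled on the classic echo handler: write per ChannelRead, flush once per ChannelReadComplete.
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // called once per read in the read loop; just queue the outbound data
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // called once when the whole OP_READ event has been processed; flush everything in one go
        ctx.flush();
    }
}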


This is the core logic by which Netty receives the network data sent by clients. So far we have not touched the core source code for this part; I wanted to explain the core logic clearly first, so that reading the source code afterwards is clearer and more thorough.

Having introduced the core logic of receiving network data, let's bring up the flow chart again; you can use it to recall the main points.

Next, I will walk through the core source code framework of this part along the same flow chart, so that the core logic and the main source code correspond one to one. As always, we want to grasp the overall processing flow at the skeleton level rather than read every line of code; the core points involved will be broken down one by one afterwards.

3. Source code core framework overview

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ......half-close handling code omitted......

            // get the NioSocketChannel pipeline
            final ChannelPipeline pipeline = pipeline();
            // PooledByteBufAllocator: the allocator that actually allocates the ByteBuf
            final ByteBufAllocator allocator = config.getAllocator();
            // Adaptive ByteBuf allocator AdaptiveRecvByteBufAllocator, used to dynamically adjust the ByteBuf capacity
            // It needs to work together with a concrete ByteBuf allocator, here PooledByteBufAllocator
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            // allocHandle counts the amount of data read each time so that a ByteBuf of a suitable size can be allocated next time
            // reset clears the statistics of the previous read
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // Allocate a ByteBuf of a suitable size with PooledByteBufAllocator; the initial capacity is 2048
                    byteBuf = allocHandle.allocate(allocator);
                    // Record the number of bytes read this time
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    // If no bytes were read this time, exit the loop and wait for the next round of event polling
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            ......the client has initiated connection closure......
                        }
                        break;
                    }

                    // read loop: increment the read count by 1
                    allocHandle.incMessagesRead(1);
                    // Fire the ChannelRead event in the pipeline of the client NioSocketChannel
                    pipeline.fireChannelRead(byteBuf);
                    // Clear the ByteBuf reference so a new one is allocated in the next iteration of the read loop
                    byteBuf = null;
                } while (allocHandle.continueReading()); // Decide whether the read loop should continue

                // Decide, based on the total number of bytes read in this read loop, whether to expand or shrink the capacity next time
                allocHandle.readComplete();
                // Fire the ChannelReadComplete event in the NioSocketChannel pipeline, indicating that one OP_READ event has been processed
                // This does not mean all data sent by the client has been read; if there is too much data, only 16 reads are done
                // and the rest is handled when the next OP_READ event arrives
                pipeline.fireChannelReadComplete();

                ......connection-closing flow omitted......
            } catch (Throwable t) {
                ......omitted......
            } finally {
                ......omitted......
            }
        }

Once again, the current executing thread is the Sub Reactor thread, and the connection reading logic is handled in the NioSocketChannel.

The Channel configuration class NioSocketChannelConfig for client NioSocketChannel is first obtained through config().

The NioSocketChannel's pipeline is obtained via pipeline(). In the Netty server template example we discussed in the Netty Reactor startup process, the NioSocketChannel's pipeline contains only one EchoServerHandler.
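For reference, here is a minimal sketch of that server template, showing how EchoServerHandler ends up in each accepted NioSocketChannel's pipeline (the port number and group sizes are my own assumptions, not from the article):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Minimal sketch of the server template: EchoServerHandler is added to every accepted NioSocketChannel's pipeline.
public class EchoServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // Main Reactor Group
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // Sub Reactor Group
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // every accepted NioSocketChannel gets this handler added to its pipeline
                     ch.pipeline().addLast(new EchoServerHandler());
                 }
             });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}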

3.1 Allocate DirectByteBuffer to receive network data

When the Sub Reactor starts receiving IO data from the NioSocketChannel, it first allocates a ByteBuffer to hold the received IO data.

You might find it odd that there are two ByteBuffer allocators involved in receiving data on the NioSocketChannel: one is ByteBufAllocator and the other is RecvByteBufAllocator.

    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

What are the differences and connections between these two allocators?

Let's first look at RecvByteBufAllocator.

As mentioned in the previous article, in NioServerSocketChannelConfig the RecvByteBufAllocator type is ServerChannelRecvByteBufAllocator.

Remember that the ServerChannelRecvByteBufAllocator type was introduced in version 4.1.69.Final precisely to fix the bug I mentioned in the previous article. Before 4.1.69.Final, the RecvByteBufAllocator type in NioServerSocketChannelConfig was AdaptiveRecvByteBufAllocator.

In this article, the RecvByteBufAllocator type held by NioSocketChannelConfig is AdaptiveRecvByteBufAllocator.

So the RecvByteBufAllocator behind the recvBufAllocHandle() call here is AdaptiveRecvByteBufAllocator. As the name implies, this type of RecvByteBufAllocator dynamically adjusts the ByteBuffer capacity according to the size of the IO data received on the NioSocketChannel each time.

For the client NioSocketChannel, the IO data it carries is the network data sent by the client, so a ByteBuffer whose capacity can be adjusted dynamically according to the size of each batch of IO data is needed to receive it.

If we think of the ByteBuffer used to receive data as a bucket, then using a big bucket for a little data or a small bucket for a lot of data is clearly inappropriate, so the bucket size needs to be adjusted dynamically according to how much data is received. AdaptiveRecvByteBufAllocator is exactly the component that dynamically adjusts the ByteBuffer capacity based on the size of each batch of received data.

Now that RecvByteBufAllocator has been explained, let's move on to ByteBufAllocator.

Note that AdaptiveRecvByteBufAllocator does not actually allocate the ByteBuffer; it is only responsible for dynamically computing the capacity the ByteBuffer should be allocated with.

The ByteBufAllocator here is of type PooledByteBufAllocator. It actually applies for memory and allocates the ByteBuffer, using the capacity dynamically computed by AdaptiveRecvByteBufAllocator.

PooledByteBufAllocator is a memory pool in Netty that manages the out-of-heap DirectByteBuffer.

We introduced the allocHandle of AdaptiveRecvByteBufAllocator in the previous article; its actual type is MaxMessageHandle.

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {
        ......omitted......
    }
}

The MaxMessageHandle contains statistics for dynamically adjusting the size of ByteBuffer.

   public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;

        // Controls the maximum number of reads in a read loop; the default is 16
        // It can be set in the bootstrap configuration class ServerBootstrap via the ChannelOption.MAX_MESSAGES_PER_READ option
        private int maxMessagePerRead;

        // For NioServerSocketChannel this counts the total number of connections accepted in the read loop;
        // for NioSocketChannel it counts the number of reads
        // allocHandle.incMessagesRead is called after each iteration of the read loop to increment this count
        private int totalMessages;

        // Counts the total amount of data received on the client connection in this read loop
        private int totalBytesRead;

        // The number of bytes this read attempts to read, i.e. the remaining writable bytes of the ByteBuffer
        private int attemptedBytesRead;

        // The number of bytes read in this iteration of the read loop
        private int lastBytesRead;

        // The expected capacity of the next buffer allocation, initially 2048
        private int nextReceiveBufferSize;

        ......omitted......
    }

Before each read loop starts, allocHandle.reset(config) is called to reset and clear the statistics of the previous read loop.

        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            // By default, a maximum of 16 entries are read each time
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

Before each read from the NioSocketChannel, memory for the ByteBuffer is allocated from the memory pool via PooledByteBufAllocator. The default initial capacity is 2048, determined by the guess() method.

        byteBuf = allocHandle.allocate(allocator);
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

        @Override
        public int guess() {
            // The expected size of the next buffer allocation, starting with 2048
            return nextReceiveBufferSize;
        }

Each time data is read from the NioSocketChannel via doReadBytes, allocHandle.lastBytesRead(doReadBytes(byteBuf)) is called to record how many bytes were read and to accumulate the total number of bytes read in the current read loop.

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

After each iteration of the loop reads data from the NioSocketChannel, allocHandle.incMessagesRead(1) is called to count the current number of reads. Once the maximum of 16 reads is exceeded, the read loop must exit so the Sub Reactor can handle IO events on its other NioSocketChannels.

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

At the end of each iteration of the read loop, allocHandle.continueReading() is called to decide whether to continue reading data from the NioSocketChannel.

        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                // Check whether the ByteBuffer was filled by this read
                return attemptedBytesRead == lastBytesRead;
            }
        };

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }
  • attemptedBytesRead: the number of bytes this read attempts to read, i.e. the remaining writable capacity of the ByteBuffer.
  • lastBytesRead: the number of bytes actually read in this iteration of the read loop.

defaultMaybeMoreSupplier checks whether the ByteBuffer was filled by this read. If attemptedBytesRead == lastBytesRead, the buffer is full, so there may still be unread data in the NioSocketChannel; if the buffer is not full, there is no more data to read in the NioSocketChannel.

To continue the read loop, the following conditions must all be satisfied:

  • totalMessages < maxMessagePerRead: has the current number of reads exceeded 16? If so, exit the do{...}while() loop and wait for the next OP_READ event. Each Sub Reactor manages multiple NioSocketChannels, so it cannot spend too much time on a single NioSocketChannel; reading opportunities must be distributed evenly among all the NioSocketChannels managed by the Sub Reactor.

  • totalBytesRead > 0: has this OP_READ event read any data at all? If there is no data to read, exit the read loop directly.

  • !respectMaybeMoreData || maybeMoreDataSupplier.get(): this condition looks a bit complicated, but all it does is use respectMaybeMoreData to control how to treat the situation where the NioSocketChannel may still have readable data left.

    • maybeMoreDataSupplier.get(): true means this read filled the ByteBuffer, so the NioSocketChannel may still have unread data; false means the ByteBuffer was not filled, which suggests there is no more data to read in the NioSocketChannel.

    • respectMaybeMoreData = true: take the "there may be more data" hint seriously. If this read filled the ByteBuffer, there may indeed be more data behind it, so keep reading; if the ByteBuffer was not filled, there is no more data to read and the loop exits.

    • respectMaybeMoreData = false: ignore the hint. Regardless of whether the ByteBuffer was filled in this read, keep reading until no more data can be read, then exit the loop. This is the "just keep reading" mode.

When all three conditions hold, the read loop continues, reading data from the NioSocketChannel until one of the conditions no longer holds or there is nothing left to read.

3.2 Reading data from NioSocketChannel

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());    
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }
}

This directly calls the underlying JDK NIO SocketChannel#read method to read the data into the DirectByteBuffer. The amount read is bounded by the writable capacity of the allocated DirectByteBuffer, which is 2048 initially.
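To relate this to plain JDK NIO, here is a rough equivalent sketch (illustrative only, not Netty's code); the return value of SocketChannel#read has the same meaning as lastBytesRead above: -1 when the peer has closed the connection, 0 when there is nothing to read, and a positive number of bytes otherwise.

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Rough plain-JDK equivalent of what doReadBytes boils down to (illustrative only).
public final class RawNioRead {
    public static int readOnce(SocketChannel channel) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocateDirect(2048); // same initial capacity as Netty's default
        return channel.read(buffer); // -1: peer closed, 0: nothing to read, >0: bytes read
    }
}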

4. ByteBuffer dynamic adaptive capacity expansion and shrinkage mechanism

Since we do not know how much network data the client will send at the beginning, PooledByteBufAllocator is used to allocate a DirectByteBuffer with an initial capacity of 2048 to receive data.

  byteBuf = allocHandle.allocate(allocator);

It is like queuing up to fetch water with a bucket. The first time, we have no idea how much water will be handed out, so a bucket that is too big or too small is equally inappropriate. We first estimate a rough bucket size; if a lot of water is handed out, we bring a bigger bucket next time, and if only a little is handed out, we bring a smaller one.

In this scenario, we need ByteBuffer to automatically adjust its capacity dynamically according to the size of each network data.

The dynamic adaptive expansion and shrinking of the ByteBuffer is implemented by the AdaptiveRecvByteBufAllocator class. Let's start from where AdaptiveRecvByteBufAllocator is created~~

4.1 AdaptiveRecvByteBufAllocator creation

As mentioned in the previous article how Netty Receives Network connections efficiently, when the Main Reactor detects that the OP_ACCEPT event is active, it accepts the client connection that has completed the three-way handshake in the NioServerSocketChannel, creating the NioSocketChannel along with its configuration class NioSocketChannelConfig.

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

The AdaptiveRecvByteBufAllocator is ultimately created in the constructor of DefaultChannelConfig, the superclass of NioSocketChannelConfig, and is saved in the RecvByteBufAllocator rcvBufAllocator field.

public class DefaultChannelConfig implements ChannelConfig {

    // The allocator of the buffers used by the Channel to receive data: AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;

    public DefaultChannelConfig(Channel channel) {
        this(channel, new AdaptiveRecvByteBufAllocator());
    }
}

When an AdaptiveRecvByteBufAllocator instance is created via new AdaptiveRecvByteBufAllocator(), the class initialization of AdaptiveRecvByteBufAllocator is also triggered.

Let's see what the class initialization of AdaptiveRecvByteBufAllocator does:

4.2 AdaptiveRecvByteBufAllocator class initialization

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    // Expand the step
    private static final int INDEX_INCREMENT = 4;
    // Shrink step size
    private static final int INDEX_DECREMENT = 1;

    //RecvBuf Capacity allocation table (Capacity expansion index table) Expands and shrinks the capacity based on the capacity recorded in the table
    private static final int[] SIZE_TABLE;

   static {
        // Initialize the RecvBuf capacity allocation table
        List<Integer> sizeTable = new ArrayList<Integer>();
        // If the allocated capacity is less than 512, the capacity expansion unit is 16 increments
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        // If the allocated capacity is greater than 512, the capacity expansion unit is doubled
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        // Initialize the RecbBuf index table
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }
}

The main role of AdaptiveRecvByteBufAllocator is to expand and shrink the ByteBuffer used to receive data. So when is the capacity expanded, and by how much? When is it shrunk, and by how much?

This section answers these four questions~~~

Netty defines an int array SIZE_TABLE that holds the candidate capacities, forming the capacity index table used for expansion and shrinking. Every expansion or shrink picks its new capacity from this table.

During the class initialization of AdaptiveRecvByteBufAllocator, the SIZE_TABLE capacity index table is initialized in the static {} block.

Initialization of SIZE_TABLE is divided into two parts:

  • When the capacity is less than 512, the capacities recorded in SIZE_TABLE start at 16 and grow in increments of 16.

  • When the capacity reaches 512 and beyond, each capacity recorded in SIZE_TABLE is double the previous one (see the sketch below for the resulting layout).
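To make the resulting layout concrete, the following sketch (illustrative only) rebuilds the table with the same rules as the static block above and prints the entries referenced later in this article:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: rebuilds SIZE_TABLE with the same rules as the static block quoted above.
public class SizeTablePreview {
    public static void main(String[] args) {
        List<Integer> sizeTable = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);                    // indices 0..30 -> 16, 32, ..., 496
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);                    // indices 31.. -> 512, 1024, 2048, ... (doubling)
        }
        System.out.println(sizeTable.get(3));    // 64    (default minimum capacity)
        System.out.println(sizeTable.get(33));   // 2048  (default initial capacity)
        System.out.println(sizeTable.get(37));   // 32768 (capacity after one expansion from index 33)
        System.out.println(sizeTable.get(38));   // 65536 (default maximum capacity)
    }
}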

4.3 Expansion and shrinking logic

Now that SIZE_TABLE has been initialized, how do we determine how much to expand or shrink ByteBuffer according to SIZE_TABLE?

This is where the expansion step INDEX_INCREMENT = 4 and the shrink step INDEX_DECREMENT = 1 defined in AdaptiveRecvByteBufAllocator come in.

Let's use the two groups of capacities in SIZE_TABLE above as an example to illustrate the expansion and shrinking logic. Assume the current capacity index of the ByteBuffer is 33, corresponding to a capacity of 2048.

Expansion

When the 2048-capacity ByteBuffer is expanded, the new capacity index is computed from the current index 33 plus the expansion step INDEX_INCREMENT = 4, giving 37. SIZE_TABLE[37] = 32768 is then the capacity of the ByteBuffer after expansion.

4.3.1 Shrinking

Similarly, when shrinking a ByteBuffer with capacity 2048, we subtract the shrink step INDEX_DECREMENT = 1 from the current index 33 to get the new index 32. SIZE_TABLE[32] = 1024 is then the capacity of the ByteBuffer after shrinking.

4.4 Capacity Expansion and contraction Timing

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            ......omitted......
            try {
                do {
                    ......omitted......
                } while (allocHandle.continueReading());

                // Decide, based on the total number of bytes read in this read loop, whether to expand or shrink the capacity next time
                allocHandle.readComplete();

                ......omitted......
            } catch (Throwable t) {
                ......omitted......
            } finally {
                ......omitted......
            }
        }
}

At the end of each read loop, allocHandle.readComplete() is called; based on the total number of bytes recorded by allocHandle for this read loop, it decides whether the DirectByteBuffer should be expanded or shrunk for the next read loop.

public abstract class MaxMessageHandle implements ExtendedHandle {

       @Override
       public void readComplete() {
            // Decide whether to expand or shrink the recvbuf
            record(totalBytesRead());
       }

       private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
       }
}

We take the current ByteBuffer capacity of 2048 and capacity index of index = 33 as an example to illustrate the allocHandle capacity expansion and reduction rules.

INDEX_INCREMENT = 4 and INDEX_DECREMENT = 1.

4.4.1 Shrinking

  • If the total number of bytes actually read by this OP_READ event, actualReadBytes, falls between SIZE_TABLE[index - INDEX_DECREMENT] and SIZE_TABLE[index], i.e. within [1024, 2048] after this read loop ends, the currently allocated ByteBuffer capacity is just right and there is no need to expand or shrink it.

For example, if actualReadBytes = 2000, which is between 1024 and 2048, then the capacity of 2048 is just right.

  • If actualReadBytes is less than or equal to SIZE_TABLE[index - INDEX_DECREMENT], i.e. the number of bytes read in this read loop is less than or equal to 1024, the data read is smaller than the next lower capacity level, indicating that the current ByteBuffer capacity is allocated too large. The shrink flag decreaseNow = true is set. Only if the shrink condition is met again when the next OP_READ event is processed does the actual shrink happen; the shrunk capacity is SIZE_TABLE[index - INDEX_DECREMENT], but it cannot be smaller than SIZE_TABLE[minIndex].

Note: shrinking only happens after the shrink condition has been met twice in a row, and the shrink step is 1.

4.4.2 Expansion

If the total number of bytes read by the OP_READ event actualReadBytes is greater than or equal to the current ByteBuffer capacity (nextReceiveBufferSize), the allocated ByteBuffer capacity is too small and needs to be expanded. The capacity after expansion is SIZE_TABLE[index + INDEX_INCREMENT] but cannot exceed SIZE_TABLE[maxIndex].

Expansion happens as soon as the expansion condition is met once, and the expansion step is 4. The sketch below walks through these rules.
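The following self-contained sketch is my own re-implementation for illustration; it mirrors the record() logic quoted above and traces the rules with a starting index of 33 (capacity 2048):

import java.util.ArrayList;
import java.util.List;

// Illustration only: re-implements the expansion/shrink rules described above (not Netty's actual class).
public class RecvBufSizeSimulation {

    static final int[] SIZE_TABLE;
    static {
        List<Integer> sizes = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) sizes.add(i);
        for (int i = 512; i > 0; i <<= 1) sizes.add(i);
        SIZE_TABLE = sizes.stream().mapToInt(Integer::intValue).toArray();
    }

    static final int INDEX_INCREMENT = 4;
    static final int INDEX_DECREMENT = 1;

    int index = 33;                                   // current capacity index, 2048
    int nextReceiveBufferSize = SIZE_TABLE[index];
    boolean decreaseNow;
    final int minIndex = 3;                           // 64
    final int maxIndex = 38;                          // 65536

    void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {                        // shrink only after the condition holds twice in a row
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {   // expand as soon as the condition is met once
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    public static void main(String[] args) {
        RecvBufSizeSimulation sim = new RecvBufSizeSimulation();
        sim.record(800);   // 800 <= 1024: first hit, only mark decreaseNow, capacity stays 2048
        sim.record(900);   // second hit in a row: shrink to SIZE_TABLE[32] = 1024
        sim.record(5000);  // 5000 >= 1024: expand to SIZE_TABLE[36] = 16384
        System.out.println(sim.nextReceiveBufferSize); // 16384
    }
}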

4.5 Instantiation of AdaptiveRecvByteBufAllocator

Instantiating AdaptiveRecvByteBufAllocator mainly determines the initial ByteBuffer capacity, as well as the indices of the minimum and maximum capacities in the capacity index table SIZE_TABLE: minIndex and maxIndex.

AdaptiveRecvByteBufAllocator defines three fields about ByteBuffer capacity:

  • DEFAULT_MINIMUM: Specifies the minimum size of the ByteBuffer. The default value is 64.

  • DEFAULT_INITIAL: indicates the initial capacity of ByteBuffer. The default is 2048.

  • DEFAULT_MAXIMUM: indicates the maximum capacity of the ByteBuffer. The default value is 65536.

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 2048;
    static final int DEFAULT_MAXIMUM = 65536;

    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        ......argument checking omitted......

        // minIndex maxIndex
        // Find the index of the minimum capacity in SIZE_TABLE: 3
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        // Binary search in SIZE_TABLE for the index of the largest capacity <= maximum: 38
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }
}

What remains is to determine the index minIndex of DEFAULT_MINIMUM in SIZE_TABLE and the index maxIndex of DEFAULT_MAXIMUM in SIZE_TABLE.

From the class initialization of AdaptiveRecvByteBufAllocator we can see that the capacities stored in SIZE_TABLE form an ordered sequence.

We can use binary lookup to find the first capacity index minIndex in SIZE_TABLE whose capacity is greater than or equal to DEFAULT_MINIMUM.

Similarly, binary lookup is used to find the last maxIndex in SIZE_TABLE whose capacity is less than or equal to DEFAULT_MAXIMUM.

Based on the screenshot from the previous section on the size distribution of the capacity data in SIZE_TABLE, we can see that minIndex = 3 and maxIndex = 38

4.5.1 Binary Search for capacity Index subscripts

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;// Unsigned right shift, the high level always complement 0
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

Those of you who are familiar with LeetCode will immediately recognize this as a binary search template.

The purpose of this method is to find the index in SIZE_TABLE of the smallest capacity that is greater than or equal to the given size.

4.6 RecvByteBufAllocator.Handle

As mentioned earlier, the ByteBuffer capacity is ultimately adjusted dynamically by the Handle of AdaptiveRecvByteBufAllocator. Let's look at how this allocHandle is created.

protected abstract class AbstractUnsafe implements Unsafe {

        private RecvByteBufAllocator.Handle recvHandle;

        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }
}

From the way allocHandle is obtained, we can see that it is created by the AdaptiveRecvByteBufAllocator#newHandle method.

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    private final class HandleImpl extends MaxMessageHandle {
        // Index of the minimum capacity in the scaling index table
        private final int minIndex;
        // The maximum capacity is index in the expansion index table
        private final int maxIndex;
        // The current capacity is in the index table. Index Initial 33 corresponds to capacity 2048
        private int index;
        // The expected size of the next buffer allocation, initial: 2048
        private int nextReceiveBufferSize;
        // Whether to shrink the capacity
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            // Find the capacity greater than or equal to initial in the scaling index table
            index = getSizeTableIndex(initial);
            // 2048
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        ......omitted......
    }
}

Here we see that the allocHandle used in Netty to dynamically adjust the size of the ByteBuffer is actually of type MaxMessageHandle.

Let’s look at the core fields in HandleImpl, which are related to the size of the ByteBuffer:

  • minIndex: the index of the minimum capacity in SIZE_TABLE. The default is 3.

  • maxIndex: the index of the maximum capacity in SIZE_TABLE. The default is 38.

  • index: the index of the current capacity in SIZE_TABLE. The initial value is 33.

  • nextReceiveBufferSize: the expected capacity of the next buffer allocation. The initial value is 2048. Each time a ByteBuffer is allocated, its capacity is nextReceiveBufferSize.

  • decreaseNow: a flag indicating whether to shrink the capacity.

5. Allocate memory for ByteBuffer using out-of-heap memory

The AdaptiveRecvByteBufAllocator class is only responsible for dynamically adjusting the ByteBuffer capacity; the actual memory for the ByteBuffer is applied for by PooledByteBufAllocator.

5.1 Origin of the class Name prefix Pooled

In day-to-day Java development, when we need memory for an object we allocate it on the JVM heap. This is very convenient for Java developers: we just use the memory without worrying much about how it is reclaimed, because the JVM heap is fully managed by the Java virtual machine, which collects memory that is no longer used for us.

But the JVM’s stop the world during garbage collection can have an impact on our application’s performance.

When data arrives at the NIC, the NIC copies it into kernel memory via DMA; this is the first copy. When a user thread makes an IO system call in user space, the CPU copies the data from kernel space into user space again; this is the second copy.

The difference when we do IO inside the JVM, for example reading data from the Socket receive buffer into JVM heap memory, is that there is one additional memory copy: in the second copy, when the CPU copies data from kernel space into user space, that user-space memory is, from the JVM's perspective, off-heap memory, so the data must then be copied from off-heap memory into the JVM heap. This is the third memory copy.

Similarly, when we make an IO call in the JVM and write data to the Socket buffer, the JVM copies the IO data to out-of-heap memory before making the system IO call.

So why can't the operating system just use the JVM heap memory directly for IO operations?

Because the JVM's memory layout is different from the memory the operating system works with, the operating system cannot read and write data according to the JVM's rules, so a third copy is needed to move the data from off-heap memory into the JVM heap.
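The extra copy can be illustrated with a small sketch (my own assumption-laden illustration using plain JDK NIO, not Netty code): reading into a heap buffer goes through a temporary direct buffer inside the JDK NIO implementation, while reading into a direct buffer does not.

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Illustrative sketch of the extra copy described above (not Netty code).
public final class HeapVsDirectRead {
    public static void read(SocketChannel channel) throws Exception {
        ByteBuffer heapBuf = ByteBuffer.allocate(2048);          // backed by a byte[] inside the JVM heap
        ByteBuffer directBuf = ByteBuffer.allocateDirect(2048);  // off-heap memory

        channel.read(heapBuf);   // kernel -> temporary off-heap buffer -> heap byte[] (extra copy)
        channel.read(directBuf); // kernel -> off-heap buffer (no extra copy into the JVM heap)
    }
}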


So based on the above, there are two performance implications when using JVM heap memory:

  1. Stop the world occurs when the JVM garbage collects memory in the heap, causing the application to stall.

  2. When I/O operations are performed, one more copy is made from out-of-heap memory to in-heap memory.

Given the performance impact of using JVM heap memory, Netty, with its strong focus on performance, allocates the memory for ByteBuffer from off-heap memory, i.e. DirectBuffer.

The benefits of using out-of-heap memory to allocate memory for ByteBuffer are:

  • Out-of-heap memory is managed directly by the operating system and not by the JVM, so JVM garbage collection has no impact on application performance.

  • When the network data arrives, it is received directly in out-of-heap memory, and when the process reads the network data, it is read directly in out-of-heap memory, so a third memory copy is avoided.

Therefore, Netty uses off-heap memory for all IO operations, avoiding the extra copy of data between the JVM heap and off-heap memory. However, since off-heap memory is not managed by the JVM, extra care is needed when allocating and releasing it, as careless use can cause memory leaks; for this reason Netty introduced a memory pool to manage off-heap memory uniformly.

The prefix Pooled in the class name PooledByteBufAllocator means memory pool: this class uses Netty's memory pool to allocate off-heap memory for ByteBuffers.
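To make this concrete, here is a minimal usage sketch (my own illustration, not code from the article): allocating a pooled direct ByteBuf and releasing it explicitly, since off-heap memory is not reclaimed by the JVM garbage collector.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Minimal usage sketch: a pooled, off-heap ByteBuf must be released explicitly.
public class PooledDirectBufferExample {
    public static void main(String[] args) {
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(2048); // off-heap, from the memory pool
        try {
            buf.writeBytes("hello".getBytes());
            // ... use the buffer ...
        } finally {
            buf.release(); // must be released explicitly, otherwise the pooled memory leaks
        }
    }
}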

5.2 Creating a PooledByteBufAllocator

Creation timing

The creation of PooledByteBufAllocator is triggered when the server-side NioServerSocketChannel configuration class NioServerSocketChannelConfig and the client-side NioSocketChannel configuration class NioSocketChannelConfig are instantiated.

public class DefaultChannelConfig implements ChannelConfig {
    //PooledByteBufAllocator
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

    ......omitted......
}

The PooledByteBufAllocator instance is saved in the ByteBufAllocator allocator field of the DefaultChannelConfig class.

The creation process

public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    ......omitted......
}
public final class ByteBufUtil {

    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; . Omit... }}Copy the code

As you can see from the initialization of the ByteBufUtil class, whether the memory pool is used when allocating memory for ByteBuffer is configurable in Netty.

  • The system property -Dio.netty.allocator.type configures whether the memory pool is used to allocate memory for ByteBuffer. The memory pool is used by default, except on Android, where it is not used by default.

  • The pooled ByteBuffer allocator is obtained via PooledByteBufAllocator.DEFAULT.

   public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

Since the main line of this article is the complete processing of an OP_READ event by the Sub Reactor, only the content related to that main line is covered here; I have only briefly explained why PooledByteBufAllocator is used to allocate memory for ByteBuffer when receiving data. The memory pool architecture is fairly complex, so I will write a separate article on Netty memory management.


Conclusion

This article introduced the whole process by which the Sub Reactor thread processes an OP_READ event, and further analyzed how the AdaptiveRecvByteBufAllocator class dynamically adjusts the ByteBuffer capacity.

It also explains why Netty uses off-heap memory to allocate memory for ByteBuffer, which leads to Netty’s PooledByteBufAllocator.

While introducing how AdaptiveRecvByteBufAllocator and PooledByteBufAllocator work together to dynamically allocate a ByteBuffer of the right capacity, I could not help thinking of Item 16, "Favor composition over inheritance", in Effective Java, which I read many years ago.

Netty follows the same design philosophy here: each of the two classes above is designed around a single responsibility.

  • The AdaptiveRecvByteBufAllocator class is only responsible for dynamically adjusting the ByteBuffer capacity and does not care about the actual memory allocation.

  • The PooledByteBufAllocator class is responsible for specific memory allocation, in the form of memory pools.

This makes the design flexible: the actual memory allocation is delegated to a concrete ByteBufAllocator, which can be pooled (PooledByteBufAllocator) or unpooled (UnpooledByteBufAllocator), and the memory itself can be either heap memory (HeapBuffer) or off-heap memory (DirectBuffer).

AdaptiveRecvByteBufAllocator, in turn, only needs to focus on its own job of adjusting the capacity and does not need to care about the specific memory allocation strategy.

Finally, the io.netty.channel.RecvByteBufAllocator.Handle#allocate method combines the two, binding the capacity decision to a concrete memory allocation approach. This is also an application of the decorator pattern.

byteBuf = allocHandle.allocate(allocator);

Well, that's all for today. See you in the next article~~~~