If you are not familiar with the Netty reactor thread, I recommend reading the previous article, "About the Source Code for the Netty Reactor Thread (1)", first. As a reminder, the reactor thread performs three steps: polling for IO events, processing the IO events, and running tasks

We already know that the first step of the Netty reactor thread is to poll for IO events on the channels registered with the selector. The next step is to process the selected keys. In this article we will explore Netty's handling of IO events in detail

In the reactor thread's run method, the code that handles IO events is the following call

processSelectedKeys();

Stepping into it

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Netty handles IO events in one of two ways: with the optimized selectedKeys, or with the plain JDK selected-key set

Let's first look more closely at the optimized selectedKeys and see what Netty optimized. We start with where selectedKeys is referenced

private SelectedSelectionKeySet selectedKeys;

// NioEventLoop.java
private Selector openSelector() {
    // ...
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // selectorImplClass -> sun.nio.ch.SelectorImpl
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    selectedKeysField.setAccessible(true);
    publicSelectedKeysField.setAccessible(true);
    selectedKeysField.set(selector, selectedKeySet);
    publicSelectedKeysField.set(selector, selectedKeySet);
    // ...
    selectedKeys = selectedKeySet;
}

First, selectedKeys is a SelectedSelectionKeySet object created in the openSelector method of NioEventLoop. It is then bound by reflection to two fields of sun.nio.ch.SelectorImpl: selectedKeys and publicSelectedKeys
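The field replacement described above can be tried out on a toy class. This is only a sketch: ToySelector, RecordingSet and FieldSwapDemo are hypothetical names invented for illustration, and the example deliberately targets its own class rather than sun.nio.ch.SelectorImpl, because newer JDKs restrict reflective access to JDK internals.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the JDK selector: it owns a private set and only ever adds to it.
class ToySelector {
    private Set<String> selectedKeys = new HashSet<>();

    void fireEvent(String key) {
        selectedKeys.add(key);
    }
}

// Hypothetical replacement set that keeps additions in a list, in insertion order.
class RecordingSet extends AbstractSet<String> {
    final List<String> added = new ArrayList<>();

    @Override
    public boolean add(String s) {
        added.add(s);
        return true;
    }

    @Override
    public Iterator<String> iterator() {
        return added.iterator();
    }

    @Override
    public int size() {
        return added.size();
    }
}

class FieldSwapDemo {
    // Replace the private field by reflection, the same technique openSelector uses.
    static RecordingSet install(ToySelector selector) {
        try {
            RecordingSet replacement = new RecordingSet();
            Field field = ToySelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        RecordingSet mine = install(selector);
        selector.fireEvent("OP_READ");
        selector.fireEvent("OP_WRITE");
        System.out.println(mine.added); // [OP_READ, OP_WRITE]
    }
}
```

After the swap, every add performed by the owner of the field lands in the replacement object, which is what lets the caller control how insertion is implemented.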

In sun.nio.ch.SelectorImpl we can see that both fields are backed by a HashSet

// Public views of the key sets
private Set<SelectionKey> publicKeys;             // Immutable
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = new HashSet<SelectionKey>();
    selectedKeys = new HashSet<SelectionKey>();
    if (Util.atBugLevel("1.4")) {
        publicKeys = keys;
        publicSelectedKeys = selectedKeys;
    } else {
        publicKeys = Collections.unmodifiableSet(keys);
        publicSelectedKeys = Util.ungrowableSet(selectedKeys);
    }
}

When the selector runs one of the select() family of methods and an IO event has occurred, it adds the corresponding SelectionKey to the selected-key set behind these fields, which amounts to adding an element to a HashSet. Since Netty replaces both fields in the JDK by reflection, we can guess that Netty's custom SelectedSelectionKeySet optimizes the add method.

With that in mind, let’s dive into the SelectedSelectionKeySet class

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }

    private void doubleCapacityA() {
        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
        keysA = newKeysA;
    }

    private void doubleCapacityB() {
        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
        keysB = newKeysB;
    }

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        if (isA) {
            return keysASize;
        } else {
            return keysBSize;
        }
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }
}

The class extends AbstractSet, so it can be used as a Set, but underneath it uses two arrays alternately. The add method determines which array is currently in use and then goes through the following three steps:

1. Insert the SelectionKey at the end of the array
2. Increase the logical length of the array by 1
3. If the logical length of the array equals its physical length, double the capacity of the array

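As you can see, once the program has run for a while and the array is long enough, each time a NIO event is polled Netty only needs a single O(1) array write to insert the SelectionKey into the set. The HashSet used by the JDK is also O(1) on average, but every insertion has to compute a hash, locate a bucket and allocate a node, and it can degrade to O(log n) when many keys collide in one bucket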

I thought for a long time about why two arrays are used. I looked up all the places where SelectedSelectionKeySet is used, and concluded that a single array would be enough, with no need to decide which array to use on every call. So I submitted an issue to the Netty project, and the maintainers replied that they would follow up. Issue link: github.com/netty/netty… In a later version, Netty changed SelectedSelectionKeySet.java to use a single array underneath
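To make the single-array idea concrete, here is a minimal sketch of an append-only collection backed by one array. It is not Netty's actual class: the name SingleArrayKeys, the generic parameter, the tiny initial capacity and the reset method are all assumptions chosen for illustration.

```java
import java.util.Arrays;

// Hypothetical single-array, append-only collection; not Netty's actual implementation.
final class SingleArrayKeys<T> {
    private Object[] keys = new Object[4]; // small initial capacity, for demonstration only
    private int size;

    // Amortized O(1): grow by doubling when full, then write at the end.
    boolean add(T key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        keys[size++] = key;
        return true;
    }

    @SuppressWarnings("unchecked")
    T get(int index) {
        if (index < 0 || index >= size) {
            throw new IndexOutOfBoundsException("index: " + index + ", size: " + size);
        }
        return (T) keys[index];
    }

    int size() {
        return size;
    }

    int capacity() {
        return keys.length;
    }

    // Null out the used slots so the entries can be garbage collected, then reuse the array.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```

With one array there is no isA flag to test on every add; the consumer reads the first size() entries and calls reset() before the next poll.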

That is all we will say for now about Netty's optimization of the selected-key set. Let's continue with Netty's handling of IO events in processSelectedKeysOptimized

 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
     for (int i = 0;; i ++) {
         // 1. Fetch the I/O event and its corresponding channel
         final SelectionKey k = selectedKeys[i];
         if (k == null) {
             break;
         }
         selectedKeys[i] = null;
         final Object a = k.attachment();
         // 2. Process the channel
         if (a instanceof AbstractNioChannel) {
             processSelectedKey(k, (AbstractNioChannel) a);
         } else {
             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
             processSelectedKey(k, task);
         }
         // 3. Determine whether polling should be performed again
         if (needsToSelectAgain) {
             for (;;) {
                 i++;
                 if (selectedKeys[i] == null) {
                     break;
                 }
                 selectedKeys[i] = null;
             }
             selectAgain();
             selectedKeys = this.selectedKeys.flip();
             i = -1;
         }
     }
 }

We can divide this process into three steps

1. Obtain the I/O events and the corresponding Netty Channel class

With the optimized SelectedSelectionKeySet, the keys are read by iterating over an array, which is more efficient than iterating over the JDK's native HashSet

After the current SelectionKey is fetched, selectedKeys[i] is set to null. To see why, imagine a NioEventLoop that polls N IO events on average and 3N events at peak. The physical length of the selectedKeys array must then be at least 3N. If selectedKeys[i] were not set to null after each key is processed, then once the peak has passed, the keys stored toward the end of the array would never be released. A SelectionKey itself may not be a large object, but it carries an attachment (discussed below), and the attachment can be very large. Those attachments would stay reachable from a GC root, so the garbage collector could not reclaim them, which can easily lead to a memory leak

This bug was fixed in 4.0.19.Final, and projects using Netty are advised to upgrade to the latest version ^^
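The effect of clearing each slot can be counted with a small simulation. This is a sketch under stated assumptions: StaleSlotDemo is a hypothetical class, plain Objects stand in for SelectionKeys and their attachments, and N is arbitrarily set to 100.

```java
final class StaleSlotDemo {
    static final int AVERAGE = 100;      // N: events per poll on average (illustrative value)
    static final int PEAK = 3 * AVERAGE; // 3N: events in the peak poll

    // Fill the first `events` slots, then "process" them, optionally nulling each slot.
    static void pollAndProcess(Object[] slots, int events, boolean clearSlots) {
        for (int i = 0; i < events; i++) {
            slots[i] = new Object(); // stands in for a SelectionKey and its attachment
        }
        for (int i = 0; i < events; i++) {
            // ... the event in slots[i] would be handled here ...
            if (clearSlots) {
                slots[i] = null;
            }
        }
    }

    // Count the references left in the tail of the array after one peak poll and one average poll.
    static int staleReferences(boolean clearSlots) {
        Object[] slots = new Object[PEAK];
        pollAndProcess(slots, PEAK, clearSlots);
        pollAndProcess(slots, AVERAGE, clearSlots);
        int stale = 0;
        for (int i = AVERAGE; i < slots.length; i++) {
            if (slots[i] != null) {
                stale++;
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        System.out.println(staleReferences(false)); // 200
        System.out.println(staleReferences(true));  // 0
    }
}
```

Without clearing, the 2N slots past the average size keep pointing at objects from the peak poll for as long as the array lives.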

2. Process the channel

After getting the corresponding attachment, Netty performs the following check

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} 

So we need to understand why the attachment might be an AbstractNioChannel object.

The approach is to start from the underlying selector and look at what object is passed along when a channel is registered with it through the register method. Using IntelliJ's global search for references, we end up at the following method in AbstractNioChannel

protected void doRegister() throws Exception {
    // ...
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    // ...
}

javaChannel() returns the underlying JDK channel object that the Netty channel wraps

protected SelectableChannel javaChannel() {
    return ch;
}

Looking at SelectableChannel's register method together with Netty's doRegister() method, we can easily deduce the following: the JDK SelectableChannel held by AbstractNioChannel is registered with the JDK Selector, and the AbstractNioChannel itself is passed along as the attachment. When the JDK polls an IO event for that SelectableChannel, the AbstractNioChannel can be pulled out of the key directly for further processing

Here is the register method in the JDK

    /**
     * @param  sel
     *         The selector with which this channel is to be registered
     *
     * @param  ops
     *         The interest set for the resulting key
     *
     * @param  att
     *         The attachment for the resulting key; may be null
     */
    public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;
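The attachment round trip can be reproduced with plain JDK NIO. The following is a minimal sketch: AttachmentDemo and ToyChannel are hypothetical names, with ToyChannel playing the role that AbstractNioChannel plays in Netty. An unbound ServerSocketChannel is used only because it is the simplest selectable channel to create.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class AttachmentDemo {
    // Hypothetical wrapper around a JDK channel, standing in for AbstractNioChannel.
    static final class ToyChannel {
        final ServerSocketChannel ch;

        ToyChannel(ServerSocketChannel ch) {
            this.ch = ch;
        }
    }

    // Register the JDK channel with the wrapper as attachment, then read it back from the key.
    static boolean attachmentRoundTrips() {
        try (Selector selector = Selector.open();
             ServerSocketChannel jdkChannel = ServerSocketChannel.open()) {
            jdkChannel.configureBlocking(false); // register requires non-blocking mode
            ToyChannel wrapper = new ToyChannel(jdkChannel);
            // Interest ops 0, as in doRegister; the third argument is the attachment.
            SelectionKey key = jdkChannel.register(selector, 0, wrapper);
            return key.attachment() == wrapper;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(attachmentRoundTrips()); // true
    }
}
```

Whatever object is passed as the third argument of register comes back unchanged from key.attachment(), so the event loop never needs a separate lookup table from JDK channels to its own channel objects.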

We will not go through processSelectedKey(SelectionKey k, AbstractNioChannel ch) in detail here; briefly, it behaves as follows. For the boss NioEventLoop, what gets polled is basically connection events, and the next step is to hand the connection to a worker NioEventLoop through its pipeline. For a worker NioEventLoop, what gets polled is basically IO read and write events, and the next step is to pass the bytes that were read through its pipeline to each ChannelHandler for processing

The attachment check above also has an else branch. The else part of the code is as follows

NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);

So there is another type of attachment that can be registered with the selector: NioTask. A NioTask lets an arbitrary SelectableChannel be registered with the selector, with a task that is executed when that channel becomes ready

The definition of NioTask

public interface NioTask<C extends SelectableChannel> {
    void channelReady(C ch, SelectionKey key) throws Exception;
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

Since NioTask is not used anywhere inside Netty itself, we will not go further into it here

3. Determine whether to perform polling again

if (needsToSelectAgain) {
    for (;;) {
        i++;
        if (selectedKeys[i] == null) {
            break;
        }
        selectedKeys[i] = null;
    }
    selectAgain();
    selectedKeys = this.selectedKeys.flip();
    i = -1;
}

Recall the first two steps of the Netty reactor thread: polling for IO events and processing them. Each time IO events are polled, needsToSelectAgain is reset to false. So when is needsToSelectAgain set to true?

As before, we use IntelliJ to find where needsToSelectAgain is used. In the NioEventLoop class, the only place where it is set to true is the following

NioEventLoop.java

void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

Next, let's see where the cancel function is called

AbstractNioChannel.java

@Override
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

As you can see, when a channel is removed from the selector, cancel is called to cancel its key, and once the number of cancelled keys reaches CLEANUP_INTERVAL, needsToSelectAgain is set to true. CLEANUP_INTERVAL is 256

private static final int CLEANUP_INTERVAL = 256;
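The counting logic is easy to isolate. The sketch below copies the arithmetic of cancel and selectAgain into a hypothetical CancelCounter class with no selector attached, just to show when the flag flips.

```java
// Hypothetical class isolating the counter logic; it does not touch any real SelectionKey.
final class CancelCounter {
    private static final int CLEANUP_INTERVAL = 256;

    private int cancelledKeys;
    private boolean needsToSelectAgain;

    // Mirrors the bookkeeping in cancel(SelectionKey), minus the key.cancel() call.
    void cancel() {
        cancelledKeys++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

    // Mirrors the flag reset in selectAgain(), minus the selector.selectNow() call.
    void selectAgain() {
        needsToSelectAgain = false;
    }

    boolean needsToSelectAgain() {
        return needsToSelectAgain;
    }
}
```

The flag stays false for the first 255 cancellations, turns true on the 256th, and the counter starts again from zero.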

That is, for each NioEventLoop, needsToSelectAgain is set to true each time 256 channels have been removed from the selector. Now let's jump back to the code above

if (needsToSelectAgain) {
    for (;;) {
        i++;
        if (selectedKeys[i] == null) {
            break;
        }
        selectedKeys[i] = null;
    }
    selectAgain();
    selectedKeys = this.selectedKeys.flip();
    i = -1;
}

Every 256 cancellations, the if block is entered. First, the remaining entries of the selectedKeys array are set to null so that they can be garbage collected. Then selectAgain is called to fill the selected keys again

private void selectAgain() {
    needsToSelectAgain = false;
    try {
        selector.selectNow();
    } catch (Throwable t) {
        logger.warn("Failed to update SelectionKeys.", t);
    }
}

I believe the purpose is to clean up the selection keys after every 256 channel disconnections, so that the keys still held are all valid and up to date
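The JDK behaviour that makes selectNow() a clean-up step can be observed directly: according to the Selector documentation, a cancelled key is only removed from the selector's key sets during the next selection operation. The sketch below uses a hypothetical CancelledKeyDemo class and an unbound ServerSocketChannel purely as a convenient selectable channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class CancelledKeyDemo {
    // Returns the size of selector.keys() right after cancel() and again after selectNow().
    static int[] keyCountsAroundSelectNow() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0);
            key.cancel();                        // the key is now invalid but still registered
            int before = selector.keys().size();
            selector.selectNow();                // the selection operation flushes cancelled keys
            int after = selector.keys().size();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = keyCountsAroundSelectNow();
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```

Until a selection operation runs, cancelled keys linger in the selector, which is consistent with Netty forcing a selectNow() once enough cancellations have accumulated.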

That covers the reactor's second step as far as a first reading of the source code goes. To sum up: the second thing Netty's reactor thread does is process IO events. Netty replaces the JDK's native HashSet with arrays to make the processing of IO events efficient. An AbstractNioChannel, a Netty class, is bound to each SelectionKey as its attachment, so it can be found when each SelectionKey is processed. The processing is then passed through the pipeline to the ChannelHandlers, which call back into user code

In the next article, we'll take a look at the last step of the reactor thread, runTasks, where you'll learn more about how asynchronous tasks work in Netty

If you want to learn Netty systematically, my booklet "Netty Introduction and Practice: Building a WeChat-like IM Instant Messaging System" can help. If you want to learn Netty's internals systematically, don't miss my Netty source code analysis video series: Coding.imooc.com/class/230.h…