This series of Netty source code analysis articles is based on version 4.1.56.Final.

In this article, I will introduce the runtime architecture of the Reactor, Netty’s core engine, so that you get a comprehensive picture of how the Reactor drives the entire Netty framework. It also lays the groundwork for the later articles on the full lifecycle of a network request in Netty, which should make them easier to follow.

Before getting to the main content, let me first review what the previous articles covered about how the Netty framework is set up. If you have not read them, that is fine: it will not affect your reading of this article, and you can go back to them later for the details.

Review of previous articles

In an earlier article, we described the process of creating the Reactor thread group, the core engine of the Netty server, along with the key attributes of its core components. Along the way, we also covered several of Netty’s optimizations, such as the optimization of the JDK NIO native Selector, which show Netty’s pursuit of extreme performance. We eventually created a Reactor with the following structure.

In the last article, “The Netty Reactor Startup Process”, we introduced the entire Netty server startup process, together with the ServerBootstrap properties and configuration methods involved. We covered the creation and initialization of the server-side NioServerSocketChannel used to accept connections, and its class inheritance structure. We paid particular attention to how the NioServerSocketChannel is registered with the Reactor, when the Reactor thread is started, and when the pipeline is initialized. Finally, we walked through how the NioServerSocketChannel binds to its port address. In the process, we learned how these core components of Netty are strung together.

When Netty is started, we have the following framework:

The primary Reactor thread group manages the NioServerSocketChannel, which accepts client connections. Accepted connections are initialized in the ServerBootstrapAcceptor in the NioServerSocketChannel’s pipeline, and each initialized client connection is then registered with the sub-Reactor thread group.

The sub-Reactor thread group is primarily responsible for listening for IO ready events on all client connections registered with it.

A Channel is assigned to exactly one Reactor, and that assignment is fixed. One Reactor handles IO ready events on multiple Channels. This spreads all client connections held by the server across multiple Reactors, and it also guarantees the thread safety of IO processing on each Channel. The relationship between Reactors and Channels is shown in the figure below:

The above is a review of the previous articles. It helps if you can recall them, but it does not matter if you cannot, because this article can be understood on its own. If you are interested in the details, you can go back and read them afterwards.

We’ll begin this article by explaining how these core components work together to drive the Netty Reactor framework.


Once the Netty Reactor framework is up and running, the first and most important question is how to receive client connections efficiently.

Before we can talk about how a Netty server receives connections, we need to understand how the Reactor thread works: how it listens for and processes IO ready events on a Channel.

This article is a prelude to the ones that describe how the Reactor thread handles Accept, Read, and Write events. It focuses on the overall operating framework of the Reactor thread. Understanding it will go a long way toward understanding how the Reactor thread handles IO events.

We mentioned the Reactor thread countless times while covering the creation and startup phases of the Netty framework. In the run phase covered by this article, the Reactor thread takes center stage.

From the previous article, we learned that the Reactor thread in Netty does three things:

  • Polls all channels registered with the Reactor for IO ready events of interest.

  • Handles IO ready events on channels.

  • Executes asynchronous tasks in Netty.

These three parts make up the operating framework of the Reactor. Now let’s see how that framework works.

The entire operating framework for the Reactor thread

In my earlier post on the IO model from a kernel perspective, I mentioned that the IO model evolved around one theme: how to manage as many connections as possible with as few threads as possible.

Netty’s IO model is built on the JDK NIO Selector, which implements IO multiplexing, while Netty’s IO thread model is the Reactor thread model.

Put simply, Netty uses a user-space Reactor thread that, through the Selector, keeps polling the kernel for IO ready events on its Channels.

The Reactor thread runs an infinite loop in which it keeps polling the Selector for IO ready events. If an IO ready event occurs, the thread returns from the Selector system call and processes it. If no IO ready event occurs, the thread stays blocked in the Selector system call until one of the Selector wake-up conditions is met.

The Reactor thread is awakened from Selector if any of the following three criteria are met:

  • The Selector detects an IO ready event.

  • A scheduled task that the Reactor thread must run reaches its deadline.

  • An asynchronous task is submitted to the Reactor. The Reactor thread must then wake up from the Selector so that it can execute the task in time.

In other words, even when there are no IO ready events to process, the Reactor thread must still wake up promptly to run the asynchronous and scheduled tasks submitted to it. The Reactor thread is a tireless worker that runs all the time.
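The wake-up conditions above correspond to plain `java.nio.channels.Selector` behaviour, which can be tried without Netty. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, and no Channel is registered, so every `select` call reports zero ready keys.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of Netty.
class SelectorWakeupDemo {

    // A wakeup() issued before (or during) a blocking select() makes it return at once.
    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();        // stands in for "another thread submitted a task"
            return selector.select(); // returns immediately; no channel is registered, so 0 keys
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A timed select() stands in for "block until the next scheduled task is due".
    static long timedSelectMillis(long timeoutMillis) {
        try (Selector selector = Selector.open()) {
            long start = System.nanoTime();
            selector.select(timeoutMillis);
            return (System.nanoTime() - start) / 1_000_000L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys after wakeup: " + selectAfterWakeup());
        System.out.println("timed select blocked for about " + timedSelectMillis(50) + " ms");
    }
}
```

The third condition, an actual IO ready event, needs a registered Channel and is left out to keep the sketch short.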

Now that we know the Reactor thread framework, let’s go to the source code to see how the core framework is implemented.

Because this part of the source code is large and intricate, I have first extracted its operating framework so that the overall flow is easier to follow.

The diagram above shows Reactor’s overall working system, which is divided into the following important working modules:

  1. The Reactor thread blocks on the Selector waiting for IO ready events. Before blocking, it first checks whether any asynchronous task is waiting to be executed. If there is one, the thread must not block on the Selector, whether or not an IO ready event exists. Instead, it polls the Selector once without blocking, so that any IO ready events can be handled together with the asynchronous tasks. IO ready events are processed first, and asynchronous tasks are executed afterwards.

  2. If no asynchronous task needs to be executed, the Reactor thread then checks whether any scheduled task is pending. If so, it blocks on the Selector until the scheduled task’s deadline arrives, or until another wake-up condition is met. If there is no scheduled task either, the Reactor thread blocks on the Selector until a wake-up condition is met.

  3. When a wake-up condition is met and the Reactor thread is awakened, it first determines whether it was awakened by an IO ready event, by an asynchronous task, or by both. It then handles the IO ready events and executes the asynchronous tasks.

  4. Finally, the Reactor thread returns to the beginning of the loop and repeats the three steps above.
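The decision made in steps 1 and 2 above can be condensed into a small pure function. This is only an illustrative sketch: `SelectPlanner`, `Plan`, and `NO_SCHEDULED_TASK` are hypothetical names that do not exist in Netty.

```java
// Hypothetical names; a sketch of the "how should I select?" decision, not Netty code.
class SelectPlanner {

    enum Plan {
        SELECT_NOW,            // tasks are waiting: poll once without blocking
        SELECT_UNTIL_DEADLINE, // no tasks, but a scheduled task is pending: block with a timeout
        SELECT_BLOCKING        // nothing pending: block until woken up
    }

    // Marker meaning "the scheduled task queue is empty" (an assumption of this sketch).
    static final long NO_SCHEDULED_TASK = -1L;

    static Plan plan(boolean hasTasks, long nextDeadlineNanos) {
        if (hasTasks) {
            return Plan.SELECT_NOW;
        }
        if (nextDeadlineNanos != NO_SCHEDULED_TASK) {
            return Plan.SELECT_UNTIL_DEADLINE;
        }
        return Plan.SELECT_BLOCKING;
    }

    public static void main(String[] args) {
        System.out.println(plan(true, NO_SCHEDULED_TASK));   // SELECT_NOW
        System.out.println(plan(false, 1_000_000L));         // SELECT_UNTIL_DEADLINE
        System.out.println(plan(false, NO_SCHEDULED_TASK));  // SELECT_BLOCKING
    }
}
```

Note that pending asynchronous tasks always win: even if a scheduled task is also pending, the loop must not block.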

That is the entire core logic of the Reactor thread. Below is the overall code skeleton I extracted according to this logic. Read it alongside the Reactor flow chart above to get a feel for the source code structure. The important thing is to map each of the Reactor’s core processing steps to the corresponding module in the code. You do not need to understand every line; understand it module by module. I will go through each of these modules in detail later.

    @Override
    protected void run() {
        // Counts the number of polls; used to work around the JDK epoll empty-polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                // Polling result
                int strategy;
                try {
                    // Compute the polling strategy. hasTasks() checks whether asynchronous tasks
                    // are waiting in the ordinary task queue or the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        // Core logic: if there is a task to execute, the Reactor thread executes it
                        // immediately; if there is no asynchronous task, the Reactor thread polls for IO events

                    default:
                    }
                } catch (IOException e) {
                    // ................ Omit ................
                }

                // The Reactor thread has been awakened from the Selector and now
                // processes IO ready events and executes asynchronous tasks
                /**
                 * The Reactor thread needs to execute asynchronous tasks in a timely manner.
                 * Once an asynchronous task is submitted, it needs to exit polling.
                 * If there are IO events, they are processed first, then asynchronous tasks
                 */
                selectCnt++;
                // Mainly used when removing invalid SelectionKeys from the IO-ready selectedKeys set
                needsToSelectAgain = false;
                // Ratio of CPU time the Reactor thread spends on IO events versus asynchronous tasks.
                // The default value is 50, which means a 1:1 split
                final int ioRatio = this.ioRatio;

                // Core logic: IO ready events are processed first, then asynchronous tasks.
                // The time allowed for asynchronous tasks is derived from the ratio of CPU time
                // spent on IO events to CPU time spent on asynchronous tasks

                // Check whether the JDK epoll empty-polling bug has been triggered
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    // The Reactor thread woke up abnormally from the Selector,
                    // which means the JDK epoll empty-polling bug was triggered
                    // Rebuild the Selector and reset selectCnt to zero
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // ................ Omit ................
            } catch (Error e) {
                // ................ Omit ................
            } catch (Throwable t) {
                // ................ Omit ................
            } finally {
                // ................ Omit ................
            }
        }
    }

From the above source code implementation framework, we can see that the Reactor thread mainly does the following things:

  1. Through the JDK NIO Selector, it polls for the IO events that all Channels registered with the Reactor are interested in. The NioServerSocketChannel accepts client connections, so it listens for the OP_ACCEPT event. A client NioSocketChannel handles reads and writes on a connection, so it listens for OP_READ and OP_WRITE events.

Note that Netty only registers the OP_READ event automatically. The OP_WRITE event is registered only when the Socket write buffer is too full to accept more data.

  2. If asynchronous tasks need to be executed, it stops polling immediately and switches to executing them. There are two cases:

    • IO ready events occur and asynchronous tasks also need to be executed. IO ready events are processed first, and asynchronous tasks are then executed for a time derived from ioRatio. The Reactor thread has to limit the time spent on asynchronous tasks, because its core job is to process IO ready events, and that job must not be delayed by asynchronous tasks.

    • No IO ready event occurs, but an asynchronous task or scheduled task needs to be executed. Only asynchronous tasks are executed, to make full use of the Reactor thread: it must not sit idle just because there is no IO ready event.

    In the second case, at most 64 asynchronous tasks are executed. The purpose is to avoid spending too long on asynchronous tasks and delaying the polling for IO events, which matters most.

  3. Finally, Netty determines whether the Reactor thread woke up because of the JDK epoll empty-polling bug. If so, it rebuilds the Selector to work around the JDK bug.
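The ioRatio rule from the first case of item 2 can be made concrete with a little arithmetic. The sketch below uses a hypothetical helper class; the formula `ioTime * (100 - ioRatio) / ioRatio` is the one Netty 4.1.x uses for ioRatio values below 100, while an ioRatio of exactly 100 is handled by a separate branch and is outside this sketch.

```java
// Hypothetical helper; illustrates how an ioRatio splits time between IO and tasks.
class IoRatioBudget {

    // ioTimeNanos: time the loop just spent processing IO ready events.
    // ioRatio: percentage of loop time reserved for IO, expected in the range 1..99 here.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("this sketch expects 0 < ioRatio < 100");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // ioRatio = 50: tasks may run as long as IO processing took (1:1).
        System.out.println(taskBudgetNanos(1_000, 50)); // 1000
        // ioRatio = 80: tasks get a quarter of the IO time (80:20).
        System.out.println(taskBudgetNanos(8_000, 80)); // 2000
        // ioRatio = 25: tasks get three times the IO time (25:75).
        System.out.println(taskBudgetNanos(1_000, 25)); // 3000
    }
}
```

The budget is relative to the measured IO time of the current loop iteration, so a quiet iteration also leaves little time for tasks.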

Normally, the Reactor thread wakes up from a Selector in two ways:

  • An IO ready event was detected during polling.
  • Asynchronous tasks or scheduled tasks need to be executed.

The JDK epoll empty-polling bug causes the Reactor thread to wake up from the Selector unexpectedly when neither of these conditions holds, so the loop spins and wastes CPU.

The JDK epoll empty polling bug: bugs.java.com/bugdatabase…
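The detection idea behind the workaround is a counter of consecutive unproductive wake-ups. The sketch below is a simplification under stated assumptions: `PrematureReturnDetector` is a hypothetical class, the threshold is a constructor parameter chosen by the caller, and the real Netty logic (`unexpectedSelectorWakeup`) covers further cases that are not modelled here.

```java
// Hypothetical class; models only the counting idea, not Netty's full logic.
class PrematureReturnDetector {
    private final int rebuildThreshold;
    private int selectCnt;

    PrematureReturnDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Call once per loop iteration. Returns true when the selector should be rebuilt.
    boolean onLoopIteration(int readyKeys, boolean ranTasks) {
        selectCnt++;
        if (ranTasks || readyKeys > 0) {
            selectCnt = 0; // the wake-up was productive, so reset the counter
            return false;
        }
        if (selectCnt >= rebuildThreshold) {
            selectCnt = 0; // too many empty wake-ups in a row: ask for a rebuild
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PrematureReturnDetector detector = new PrematureReturnDetector(3); // 3 is an arbitrary demo value
        System.out.println(detector.onLoopIteration(0, false)); // false
        System.out.println(detector.onLoopIteration(0, false)); // false
        System.out.println(detector.onLoopIteration(0, false)); // true
    }
}
```

Any productive iteration resets the counter, so only an uninterrupted run of empty wake-ups triggers a rebuild.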

Now that we understand the Reactor thread’s overall operating framework, let’s go through each of these core processing modules in turn.

1. The Reactor thread polls for I/O ready events

SelectStrategyFactory is a constructor parameter used in the Reactor thread group NioEventLoopGroup.

   public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

  public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

One of the most important jobs of the Reactor thread is polling for IO ready events. SelectStrategyFactory specifies the polling strategy, and the default implementation is DefaultSelectStrategyFactory.INSTANCE.

When the Reactor thread starts a polling round, it uses selectStrategy to compute a polling strategy value, strategy, and then takes a different code path depending on that value.

    @Override
    protected void run() {
        // Counts the number of polls; used to work around the JDK epoll empty-polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                // Polling result
                int strategy;
                try {
                    // Compute the polling strategy. hasTasks() checks whether asynchronous tasks
                    // are waiting in the ordinary task queue or the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        // Core logic: if there is a task to execute, the Reactor thread executes it
                        // immediately; if there is no asynchronous task, the Reactor thread polls for IO events

                    default:
                    }
                } catch (IOException e) {
                    // ................ Omit ................
                }

                // ................ Omit ................
        }
    }

What is the computational logic of this polling strategy?

1.1 Polling Strategy

public interface SelectStrategy {

    /** Indicates a blocking select should follow. */
    int SELECT = -1;
    /** Indicates the IO loop should be retried, no blocking select to follow directly. */
    int CONTINUE = -2;
    /** Indicates the IO loop to poll for new events without blocking. */
    int BUSY_WAIT = -3;

    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

Let’s start by looking at the three polling strategies defined in Netty:

  • SelectStrategy.SELECT: There are no asynchronous tasks to execute, so the Reactor thread can safely block on the Selector and wait for an IO ready event.

  • SelectStrategy.CONTINUE: Restart the IO polling round.

  • SelectStrategy.BUSY_WAIT: The Reactor thread polls by spinning. NIO does not support spinning, so this case falls through to the SelectStrategy.SELECT branch.

Now let’s look at calculateStrategy, the method that computes the polling strategy:

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        /**
         * The Reactor thread needs to execute asynchronous tasks in a timely manner
         * 1: if an asynchronous task is waiting, do one non-blocking poll for IO ready events
         * 2: if no asynchronous task is waiting, go to the SELECT branch of the switch
         */
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

  • Before the Reactor thread starts polling, it needs to determine whether any asynchronous task is waiting to be executed. It does so by checking whether the Reactor’s asynchronous task queue taskQueue, or the tail queue tailTasks used for statistics tasks, contains any task.

    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }

    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }

  • If the Reactor has asynchronous tasks to execute, the Reactor thread must execute them right away and must not block on the Selector. Before returning, it also calls selectNow() once, without blocking, to check whether any IO ready event is currently pending. If so, the events are handled together with the asynchronous tasks; if not, the asynchronous tasks are executed promptly.

The Reactor thread’s first priority is processing IO ready events; its second is executing asynchronous tasks in time. If there is no IO ready event but an asynchronous task is waiting, the Reactor thread must execute the task promptly rather than keep blocking on the Selector waiting for an IO ready event.
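The default strategy just described fits in one expression, and it can be exercised on its own. The sketch below is a standalone approximation: it uses the JDK’s `java.util.function.IntSupplier` (Netty has its own `IntSupplier` whose `get()` may throw), and `SimpleSelectStrategy` is a hypothetical name.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the default strategy; not Netty's class.
class SimpleSelectStrategy {
    static final int SELECT = -1;

    // With tasks pending: return the result of a non-blocking poll (0 or a positive key count).
    // Without tasks: return SELECT, which tells the loop that blocking is safe.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    public static void main(String[] args) {
        IntSupplier twoReadyKeys = () -> 2; // pretend selectNow() found two ready channels
        System.out.println(calculateStrategy(twoReadyKeys, true));  // 2
        System.out.println(calculateStrategy(twoReadyKeys, false)); // -1
        System.out.println(calculateStrategy(() -> 0, true));       // 0
    }
}
```

Because the supplier is only invoked when tasks are pending, the non-blocking poll costs nothing on the path that is about to block anyway.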

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    int selectNow() throws IOException {
        // non-blocking
        return selector.selectNow();
    }

  • If the Reactor thread currently has no asynchronous task to execute, the calculateStrategy method directly returns SelectStrategy.SELECT, that is, the constant -1 defined in the SelectStrategy interface. When calculateStrategy returns a positive value obtained from selectNow(), it means that some Channels are IO ready, and the value is the number of IO-ready Channels.

    @Override
    protected void run() {
        // Counts the number of polls; used to work around the JDK epoll empty-polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                // Polling result
                int strategy;
                try {
                    // Compute the polling strategy. hasTasks() checks whether asynchronous tasks
                    // are waiting in the ordinary task queue or the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        // Core logic: if there is a task to execute, the Reactor thread executes it
                        // immediately; if there is no asynchronous task, the Reactor thread polls for IO events

                    default:
                    }
                } catch (IOException e) {
                    // ................ Omit ................
                }

                // ............... Handle IO ready events and perform asynchronous tasks ...............
        }
    }

As we can see, with the default polling strategy, selectStrategy.calculateStrategy returns only one of the following three kinds of value:

  • Returns -1: the switch enters the SelectStrategy.SELECT branch. There is no asynchronous task to execute in the Reactor, so the Reactor thread can safely block on the Selector and wait for an IO ready event.

  • Returns 0: the switch enters the default branch. There is no IO ready event in the Reactor, but asynchronous tasks need to be executed, so the flow goes through the default branch straight to the logic that executes asynchronous tasks.

  • Returns a value > 0: the switch enters the default branch. The Reactor has both IO ready events and asynchronous tasks to handle, so the flow goes through the default branch straight to the logic that processes IO ready events and executes asynchronous tasks.
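The three return values and the fall-through from BUSY_WAIT to SELECT can be traced with a small switch. This is a hypothetical sketch: `StrategyRouter` only labels the branch that is taken, whereas the real loop performs the blocking select and then continues into the default branch.

```java
// Hypothetical class; it names the branch taken for a strategy value.
class StrategyRouter {
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    static String route(int strategy) {
        switch (strategy) {
        case CONTINUE:
            return "retry loop";
        case BUSY_WAIT:
            // not supported with NIO: fall through to SELECT
        case SELECT:
            return "blocking select";
        default:
            // 0: only tasks are pending; > 0: that many channels are IO ready as well
            return strategy > 0 ? "process IO, then run tasks" : "run tasks only";
        }
    }

    public static void main(String[] args) {
        System.out.println(route(SELECT));    // blocking select
        System.out.println(route(BUSY_WAIT)); // blocking select
        System.out.println(route(0));         // run tasks only
        System.out.println(route(3));         // process IO, then run tasks
    }
}
```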

Now that the Reactor’s control flow is clear, let’s focus on the polling logic in the SelectStrategy.SELECT branch. This is the core of how the Reactor listens for IO ready events.

1.2 Polling logic

                    case SelectStrategy.SELECT:
                        // There is no asynchronous task currently running, so the Reactor thread can safely block and wait for IO ready events

                        // Retrieves the deadline of the scheduled task from the scheduled task queue
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // -1 indicates that no scheduled task exists in the scheduled task queue
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }

                        // The earliest deadline of the scheduled task is used as the deadline of the select, meaning that the execution time of the scheduled task has reached
                        // Regardless of the IO ready event, the selector must be awakened so that the reactor thread performs a timed task
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                // Check again whether there are asynchronous tasks in the normal task queue
                                // If not, start a blocking select to poll for IO ready events
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // The Reactor has been awakened from the Selector
                            // Set the Reactor's status to AWAKE
                            // lazySet is a cheaper write than a volatile set: the new value is not guaranteed to be
                            // immediately visible to other threads, which is acceptable for this update
                            nextWakeupNanos.lazySet(AWAKE);
                        }

When the flow reaches this point, the Reactor has nothing else to do and can safely block on the Selector, waiting for an IO ready event to arrive.

So how long should the Reactor thread block on the Selector?

Before answering this question, recall that besides polling Channels for IO ready events and processing them, the Reactor thread has another job: executing asynchronous tasks in the Netty framework.

Asynchronous tasks in the Netty framework fall into three categories:

  • Ordinary asynchronous tasks, stored in the ordinary task queue taskQueue.

  • Tail tasks, stored in the tail queue tailTasks and used for wrap-up work such as statistics.

  • Scheduled tasks, which are the ones we are concerned with here. They are stored in the Reactor’s scheduledTaskQueue.

As the inheritance structure of the Reactor class NioEventLoop shows, the Reactor is able to execute scheduled tasks.

Since the Reactor needs to perform timed tasks, it can’t keep blocking on a Selector indefinitely waiting for an I/O ready event.

So let’s go back to the question raised at the beginning of this section. To make sure the Reactor can execute scheduled tasks on time, the Reactor thread must be awakened when the deadline of the first scheduled task to run arrives.

Therefore, before the Reactor thread starts polling for IO ready events, we first need to calculate how long it may block on the Selector.

1.2.1 Poll timeout of the Reactor

First we fetch the deadline of the scheduled task that is due soonest from the Reactor’s scheduledTaskQueue, and use it as the timeout for the Reactor thread’s poll on the Selector. This ensures that the Reactor wakes up from the Selector in time when a scheduled task is about to run.

    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    // AWAKE when EL is awake
    // NONE when EL is waiting with no wakeup scheduled
    // other value T when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
      if (curDeadlineNanos == -1L) {
            // -1 indicates that no scheduled task exists in the scheduled task queue
            curDeadlineNanos = NONE; // nothing on the calendar
      }

      nextWakeupNanos.set(curDeadlineNanos);

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

    protected final long nextScheduledTaskDeadlineNanos() {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
    }

    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
    }
}

The nextScheduledTaskDeadlineNanos method returns the deadline of the scheduled task in the Reactor’s scheduled task queue that is due soonest. If the queue contains no scheduled task, it returns -1.

The nextWakeupNanos variable in NioEventLoop stores the time at which the Reactor should wake up from the Selector. It is set to the deadline of the scheduled task that is due soonest. If there is no scheduled task to execute, it is set to Long.MAX_VALUE, and the Reactor blocks until an IO ready event arrives or an asynchronous task needs to be executed.
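The mapping from "earliest scheduled deadline" to "select deadline" can be reproduced with a plain priority queue. In the sketch below, deadlines are modelled as `Long` values instead of `ScheduledFutureTask` objects, and `NextWakeup` is a hypothetical class; only the -1 and `Long.MAX_VALUE` conventions are taken from the code above.

```java
import java.util.PriorityQueue;

// Hypothetical class; deadlines are plain nanosecond values rather than scheduled tasks.
class NextWakeup {
    static final long NONE = Long.MAX_VALUE;

    // Earliest deadline in the queue, or -1 if the queue is empty.
    static long nextScheduledDeadline(PriorityQueue<Long> deadlines) {
        Long head = deadlines.peek(); // the smallest element, i.e. the task due soonest
        return head != null ? head : -1L;
    }

    // Deadline to hand to select(): NONE means "block with no timeout".
    static long selectDeadline(PriorityQueue<Long> deadlines) {
        long deadline = nextScheduledDeadline(deadlines);
        return deadline == -1L ? NONE : deadline;
    }

    public static void main(String[] args) {
        PriorityQueue<Long> deadlines = new PriorityQueue<>();
        System.out.println(selectDeadline(deadlines) == NONE); // true
        deadlines.add(300L);
        deadlines.add(100L);
        deadlines.add(200L);
        System.out.println(selectDeadline(deadlines)); // 100
    }
}
```

Only the head of the queue matters: once that task has run, the next loop iteration peeks again and picks up the following deadline.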

1.2.2 Reactor Starts polling for I/O readiness events

     if (!hasTasks()) {
            // Check again whether there are asynchronous tasks in the normal task queue;
            // if not, start a blocking select to poll for IO ready events
            strategy = select(curDeadlineNanos);
     }

The Reactor thread also needs to check again to see if any asynchronous tasks need to be executed before it starts blocking polling IO ready events.

If asynchronous tasks happen to be submitted, you need to stop polling for I/O ready events and switch to asynchronous tasks. If there are no asynchronous tasks, polling for IO ready events starts.

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            // No scheduled task and no ordinary task to execute: poll for IO ready events and block until a wake-up condition is met
            return selector.select();
        }

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

If deadlineNanos == NONE, we know from the previous section that the Reactor currently has no scheduled task, so it can safely block on the Selector and wait for an IO ready event.

The selector.select() call is a blocking call: if there is no IO ready event, the Reactor thread stays blocked there until one arrives. The JDK NIO epoll empty-polling bug mentioned earlier is not considered here.

At this point, the Reactor thread is blocked in the selector.select() call, waiting for an IO ready event. Suppose an asynchronous task is now submitted to the Reactor while no IO ready event occurs. The Reactor thread would stay blocked because no IO ready event arrives, so how does the asynchronous task get executed?

Since the asynchronous task is expected to be executed as soon as it is submitted, wake up the Reactor thread when the asynchronous task is submitted.
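The "enqueue, then wake up" pattern can be demonstrated with the JDK alone. The following is a minimal sketch with a hypothetical `WakeOnSubmitDemo` class: a loop thread blocks in `selector.select()` with no Channel registered, and a task submitted from another thread is executed because `submit` calls `selector.wakeup()`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical mini event loop, not Netty code.
class WakeOnSubmitDemo {
    private final Selector selector = openSelector();
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    private static Selector openSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Body of the loop thread: block on the selector, then drain the task queue.
    void loop() {
        try {
            while (running) {
                if (taskQueue.isEmpty()) {
                    selector.select(); // no channel is registered, so wakeup() is what ends this call
                }
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called from any thread: enqueue first, then wake the loop thread.
    void submit(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    // Returns true if a task submitted by the calling thread was run by the loop thread.
    static boolean runOnce() {
        WakeOnSubmitDemo demo = new WakeOnSubmitDemo();
        Thread loopThread = new Thread(demo::loop, "mini-reactor");
        loopThread.start();
        CountDownLatch executed = new CountDownLatch(1);
        demo.submit(executed::countDown);
        try {
            boolean ran = executed.await(5, TimeUnit.SECONDS);
            demo.shutdown();
            loopThread.join(5_000);
            demo.selector.close();
            return ran;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("task executed: " + runOnce());
    }
}
```

The order in `submit` matters: because a `wakeup()` issued before a selection makes the next `select()` return immediately, a task enqueued just before the loop thread blocks is still not missed.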

    // addTaskWakesUp = true means that the Reactor thread is awakened if and only if the addTask method is called
    // addTaskWakesUp = false means that addTask is not the only way to wake up the Reactor; there are other ways. The default is false
    private final boolean addTaskWakesUp;

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // If the current thread is not the Reactor thread, start the Reactor thread
            // The Reactor thread is started by adding an asynchronous task to the NioEventLoop
            startThread();
            // ...... Omit ......
        }

        if (!addTaskWakesUp && immediate) {
            // io.netty.channel.nio.NioEventLoop.wakeup
            wakeup(inEventLoop);
        }
    }

You should already be familiar with the execute method. We introduced it in the last article, “The Netty Reactor Startup Process”, when we covered the startup of the Reactor thread.

The Register and Bind operations in the startup process are encapsulated as asynchronous tasks and submitted to the Reactor for execution through this method.

Here we focus on the wake-up logic in the second half of the execute method.

boolean immediate and boolean addTaskWakesUp are the two values involved in the wake-up logic.

  • immediate: indicates whether the submitted task needs to be executed immediately. In Netty, any submitted task that is not of type LazyRunnable needs to be executed immediately, in which case immediate = true.

  • addTaskWakesUp: true means that the Reactor thread is awakened if and only if the addTask method is called. Calling any other method does not wake up the Reactor thread.

When the NioEventLoop is initialized, addTaskWakesUp is set to false, meaning that other methods, such as execute, can also wake up the Reactor thread.

The wake-up condition in the execute method, !addTaskWakesUp && immediate, reads as follows in Netty. When immediate is true, the asynchronous task must be executed immediately. addTaskWakesUp defaults to false, meaning that addTask is not the only thing that can wake up the Reactor: other methods, such as execute, can too. If it is set to true, the semantics become that only addTask can wake up the Reactor, so even a call to execute with immediate = true does not wake it, because execute is not addTask.

    private static final long AWAKE = -1L;
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            // Wake the Reactor thread up from the Selector
            selector.wakeup();
        }
    }

When nextWakeupNanos == AWAKE, the Reactor is already awake. In that case there is no need to call selector.wakeup() again, which saves the cost of a system call.

In the source code framework introduced at the beginning of Section 1.2 Polling Logic, nextWakeupNanos is set to AWAKE in the finally {...} block that runs after the Reactor returns from the Selector.

                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // The Reactor has been awakened from the Selector
                            // Set the Reactor's status to AWAKE
                            // lazySet is cheaper than a volatile set: it skips the full memory barrier and does not guarantee that other threads see the write immediately, which is acceptable here
                            nextWakeupNanos.lazySet(AWAKE);
                        }

Netty uses a single AtomicLong variable, nextWakeupNanos, to represent both the current state of the Reactor thread and the deadline until which the Reactor thread blocks. This is a technique we can borrow in daily development.
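To make the dual role of nextWakeupNanos concrete, here is a minimal sketch of the same idea outside Netty. The class WakeupStateSketch, its method names and the selectorWakeups counter are hypothetical; the counter merely stands in for the real selector.wakeup() call, and the demo runs on a single thread.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the Reactor's wakeup bookkeeping; this is not Netty code.
final class WakeupStateSketch {
    static final long AWAKE = -1L;

    // Holds either AWAKE or the deadline (in nanoseconds) until which the thread blocks.
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    // Counts how often the (simulated) expensive selector.wakeup() call is issued.
    private int selectorWakeups;

    // Called by the Reactor thread just before it blocks on the Selector.
    void aboutToBlockUntil(long deadlineNanos) {
        nextWakeupNanos.set(deadlineNanos);
    }

    // Called by the Reactor thread after it has returned from the Selector.
    void markAwake() {
        nextWakeupNanos.lazySet(AWAKE);
    }

    // Called by a thread that has just submitted a task.
    void wakeup(boolean inEventLoop) {
        // getAndSet flips the state to AWAKE and reports the previous state in one atomic step,
        // so only the first caller after the Reactor blocked pays for the wakeup.
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selectorWakeups++; // stands in for selector.wakeup()
        }
    }

    int selectorWakeups() {
        return selectorWakeups;
    }

    // Single-threaded walk through the interesting cases.
    static int demo() {
        WakeupStateSketch s = new WakeupStateSketch();
        s.wakeup(false);                 // Reactor already AWAKE: skipped
        s.aboutToBlockUntil(5_000_000L); // Reactor is about to block
        s.wakeup(false);                 // Reactor blocked: one wakeup is issued
        s.wakeup(false);                 // state is AWAKE again: skipped
        s.wakeup(true);                  // caller is the Reactor thread itself: skipped
        return s.selectorWakeups();
    }
}
```

In this walk-through only one of the four wakeup calls reaches the simulated selector, which is the saving the AWAKE check is meant to provide.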


Let’s go back to the Reactor thread polling for IO ready events.

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            // No scheduled tasks and no ordinary tasks to run: poll for IO ready events and block until a wakeup condition is met
            return selector.select();
        }

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

When deadlineNanos is not NONE, the Reactor has a scheduled task to execute, and the Reactor thread blocks on the Selector until the deadline of the nearest scheduled task arrives.

deadlineNanos is the deadline of the nearest scheduled task in the Reactor, in nanoseconds. It is an absolute time.

What we need to calculate is timeoutMillis, the time the Reactor thread may block on the Selector, in milliseconds. It is a relative time.

So before the Reactor thread starts blocking on the Selector, the absolute time deadlineNanos in nanoseconds has to be converted into the relative time timeoutMillis in milliseconds.


You might be wondering why 0.995 milliseconds (995000 nanoseconds) is added to deadlineNanos before the deadlineToDelayNanos method is used to calculate timeoutMillis.

Imagine a scenario where the nearest scheduled task is due in 5 microseconds. Converting the remaining nanoseconds to milliseconds with integer division alone would give timeoutMillis = 0.

In Netty, timeoutMillis = 0 means that the scheduled task has reached its deadline and needs to be executed.

In reality 5 microseconds still remain before the deadline, so Netty adds 0.995 milliseconds to deadlineNanos, which rounds the result up to 1 millisecond instead of 0.

So whenever at least 5 microseconds remain before the deadline, the Reactor blocks for at least 1 millisecond. timeoutMillis is 0 only when the deadline is less than 5 microseconds away or has already passed.
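The rounding effect of the 0.995 ms padding can be checked with a few numbers. The sketch below mirrors the arithmetic of the select method under one simplifying assumption: it takes the time remaining until the deadline directly as a parameter instead of reading a clock. The class and method names are hypothetical.

```java
// Hypothetical helper that reproduces only the rounding arithmetic, not Netty's clock handling.
final class TimeoutRoundingSketch {
    // remainingNanos: time left until the nearest scheduled task is due (may be negative if overdue).
    // Returns the timeout, in milliseconds, that would be passed to the Selector.
    static long timeoutMillis(long remainingNanos) {
        // Pad by 0.995 ms, clamp at zero, then truncate to whole milliseconds.
        long paddedNanos = Math.max(0L, remainingNanos + 995_000L);
        return paddedNanos / 1_000_000L;
    }
}
```

With these definitions, timeoutMillis(5_000) is 1 (5 microseconds left rounds up to 1 ms), timeoutMillis(4_999) is 0 (less than 5 microseconds left counts as due), timeoutMillis(1_005_000) is 2, and an overdue task such as timeoutMillis(-2_000_000) gives 0.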

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    protected static long deadlineToDelayNanos(long deadlineNanos) {
        return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
    }
}

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {

    static long deadlineToDelayNanos(long deadlineNanos) {
        return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
    }

    // Start time
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }

    static long deadlineNanos(long delay) {
        // Calculate the execution deadline of the scheduled task
        long deadlineNanos = nanoTime() + delay;
        // Guard against overflow
        return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
    }
}

Note that when a scheduled task is created, the deadlineNanos method is used to calculate its execution deadline. The deadline is calculated as current time + task delay - START_TIME, where START_TIME is the moment the ScheduledFutureTask class was initialized. The deadline is therefore relative to START_TIME.

For the same reason, the delay (timeout) is calculated from the deadline as deadlineNanos - nanoTime(), where nanoTime() also subtracts START_TIME, so that both values are measured on the same clock.
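As a worked example of the two conversions, the sketch below follows the same formulas but passes the current relative time in as a parameter, so the numbers are reproducible. The class name and the nowNanos parameter are assumptions made for illustration; Netty reads the clock itself.

```java
// Hypothetical variant of the deadline arithmetic with an explicit "now" for reproducible numbers.
final class RelativeDeadlineSketch {
    // Reference point of the relative clock, captured when the class is initialized.
    private static final long START_TIME = System.nanoTime();

    // Nanoseconds elapsed since START_TIME; never negative because System.nanoTime() is monotonic.
    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }

    // Deadline = now + delay, both on the relative clock; overflow is clamped to Long.MAX_VALUE.
    static long deadlineNanos(long nowNanos, long delayNanos) {
        long deadline = nowNanos + delayNanos;
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    // Remaining delay = deadline - now, never negative.
    static long deadlineToDelayNanos(long nowNanos, long deadlineNanos) {
        return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nowNanos);
    }
}
```

For a task scheduled at relative time 1000 with a delay of 500, the deadline is 1500. At relative time 1200 the remaining delay is 300, and at relative time 2000 it is clamped to 0 because the task is overdue.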

When the timeoutMillis calculated through deadlineToDelayNanos is <= 0, the Reactor has a scheduled task that is due, so it must return immediately instead of blocking on the Selector and delaying the scheduled task. Before returning to execute the scheduled task, it still polls the Channels once in a non-blocking way with selector.selectNow() to check whether any IO ready events have arrived, so that the processing of IO events is not delayed either. Netty really does take care of every detail here.

When timeoutMillis > 0, the Reactor thread can safely block on the Selector and wait for IO events to arrive until the timeoutMillis timeout is reached.

timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)

When an IO event arrives on a Channel registered with the Reactor, the Reactor thread wakes up from the selector.select(timeoutMillis) call and processes the IO ready event immediately.

In the extreme case, if the deadline of the nearest scheduled task is far in the future, timeoutMillis becomes very long. Wouldn’t the Reactor then stay blocked on the Selector and make Netty unresponsive?

I think you already have the answer in mind. As we explained at the beginning of 1.2.2 Reactor starts polling for I/O ready events, when the Reactor is blocked on the Selector and a user thread submits an asynchronous task to the Reactor, the Reactor thread is woken up through the execute method.


This is the core logic of a Reactor: polling for IO ready events on Channels.

When the Reactor has polled IO ready events, or there are asynchronous tasks that need to be executed, it wakes up from the Selector. Next we look at how the Reactor handles IO ready events and how it executes asynchronous tasks.

Because Netty is a network framework, it gives priority to the I/O events on Channels, and it does not allow asynchronous tasks to run without limit at the expense of I/O throughput.

Netty uses the ioRatio variable to divide the CPU time of the Reactor thread between processing I/O events and executing asynchronous tasks.

Let’s take a look at how this execution time ratio is allocated.

2. Time ratio for processing I/O events and asynchronous tasks

Whenever there is an I/O ready event, the Reactor needs to make sure that the I/O event is processed promptly and completely. ioRatio mainly limits the time spent on executing asynchronous tasks, so that the Reactor thread does not spend too long on asynchronous tasks and leave I/O events unprocessed.

                // Ratio of the CPU time the Reactor thread spends on I/O events versus asynchronous tasks.
                // The default value is 50, which means I/O events and asynchronous tasks share the CPU time 1:1.
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    // Run all asynchronous tasks in one go (no time limit)
                    try {
                        if (strategy > 0) {
                            // Handle IO ready events if there are any
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        // Handle all asynchronous tasks
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // Asynchronous tasks may only run for ioTime * (100 - ioRatio) / ioRatio
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // Limiting asynchronous tasks to a timeout prevents the Reactor thread from spending too long on them and delaying I/O events
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // No I/O ready events to handle: only asynchronous tasks are executed, at most 64 of them,
                    // so that the Reactor thread does not spend too long on asynchronous tasks and delay I/O events
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

  • When ioRatio = 100, there is no execution time limit. If there are IO ready events (strategy > 0), the Reactor thread handles them first, and once the IO events have been handled it executes all asynchronous tasks, including ordinary tasks, tail tasks and scheduled tasks, with no time limit.

The value of strategy is the number of Channels that are IO ready. It is the return value of the io.netty.channel.nio.NioEventLoop#select method introduced earlier.

  • When ioRatio is not 100 (the default value is 50), the Reactor thread first measures ioTime, the time spent handling the IO events, and then calculates the time limit for the asynchronous tasks that follow as ioTime * (100 - ioRatio) / ioRatio. In other words, the Reactor thread may only execute asynchronous tasks within this limited time, so that I/O events are not delayed because the Reactor thread spends too long on asynchronous tasks.

By default, the ratio of IO event execution time to asynchronous task execution time is 1:1.

The higher the ioRatio, the less time the Reactor thread spends on asynchronous tasks.

To calculate the time limit for asynchronous tasks, the Reactor thread must first know ioTime, the time spent handling the I/O events, and then derive the limit from ioRatio.
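The formula is easy to verify with concrete numbers. The following sketch isolates the budget calculation; the class and method names are hypothetical, and the range check only reflects that ioRatio = 100 is handled by the separate no-limit branch.

```java
// Hypothetical helper that isolates the ioRatio time-budget formula.
final class IoRatioBudgetSketch {
    // ioTimeNanos: time just spent on I/O events; ioRatio: percentage of time reserved for I/O (1..99).
    // Returns the time budget, in nanoseconds, for the asynchronous tasks that follow.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for a bounded budget: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With the default ioRatio of 50, 2 ms of I/O work yields a 2 ms task budget (1:1). With ioRatio = 80, 8 ms of I/O work also yields only 2 ms for tasks, and with ioRatio = 20, 1 ms of I/O work yields 4 ms for tasks.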

What if there are no IO ready events for the Reactor thread to handle? In that case there is no ioTime to measure, so how is the time limit for asynchronous tasks enforced?

In this particular case, Netty only allows the Reactor thread to execute at most 64 asynchronous tasks before it stops and goes back to polling for I/O ready events. The core goal is the same: to keep the Reactor thread from delaying I/O events because it spends too long on asynchronous tasks.

By default, when the Reactor has asynchronous tasks to process but no IO ready events, Netty only allows the Reactor thread to execute up to 64 asynchronous tasks.
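One way to arrive at such a limit is to check the deadline only after every 64th task rather than after each task, so that a timeout of 0 still lets one batch of 64 through. The sketch below illustrates that idea; it assumes this is the mechanism behind runAllTasks(0), and the class, method and parameter names are hypothetical rather than Netty's.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of "check the clock only every 64 tasks"; this is not Netty's runAllTasks.
final class TaskBatchSketch {
    // Runs queued tasks until the queue is empty or the timeout is found to be exceeded.
    // The elapsed time is only compared with the timeout after every 64th task.
    static int runTasks(Queue<Runnable> queue, long timeoutNanos) {
        final long start = System.nanoTime();
        int ran = 0;
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
            ran++;
            // 0x3F == 63, so this condition holds for ran = 64, 128, 192, ...
            if ((ran & 0x3F) == 0 && System.nanoTime() - start >= timeoutNanos) {
                break;
            }
        }
        return ran;
    }

    // Fills a queue with no-op tasks and reports how many of them were executed.
    static int demo(int taskCount, long timeoutNanos) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int n = 0; n < taskCount; n++) {
            queue.add(() -> { });
        }
        return runTasks(queue, timeoutNanos);
    }
}
```

With 200 queued no-op tasks and a timeout of 0, exactly 64 tasks run before the loop stops. With fewer than 64 queued tasks, all of them run regardless of the timeout.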


Now that we know the Reactor thread processes both I/O events and asynchronous tasks, let’s look at the logic it uses for each of them.

3. The Reactor thread processes I/O ready events

    // A reference to the Selector's selectedKeys; when IO events are ready, the keys are read directly from here
    private SelectedSelectionKeySet selectedKeys;

    private void processSelectedKeys() {
        // Whether Netty's optimized selectedKeys collection type is used is determined by the DISABLE_KEY_SET_OPTIMIZATION variable, which defaults to false
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

Does this code look familiar?

You may remember that when we introduced the Reactor class NioEventLoop and the process of creating its Selector, Netty replaced the HashSet implementation of selectedKeys in the JDK NIO Selector with the array-backed SelectedSelectionKeySet, because it was concerned about the insertion and traversal performance of the selectedKeys set.

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // IO-ready SelectionKeys (each wrapping a channel)
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    // All SelectionKeys registered on the Selector (each wrapping a channel)
    protected HashSet<SelectionKey> keys;

    ...... Omit ......
}

In Netty, the DISABLE_KEY_SET_OPTIMIZATION switch controls whether to optimize JDK NIO Selector. The default is to optimize.

With optimization on, Netty stores the created SelectedSelectionKeySet in the NioEventLoop’s private SelectedSelectionKeySet selectedKeys field. The Reactor thread gets the IO ready SelectionKey directly from here.

With optimization off, Netty uses the JDK’s default implementation of NIO Selector. The NioEventLoop selectedKeys field is null.

If you have forgotten this part, you can review the process of creating a Reactor in the article “Talking about the Netty Things Reactor implemented in Netty”.

After this review, we can see that the Reactor’s logic for handling IO ready events is also divided into two paths: one optimized by Netty and one using the JDK native implementation.

Let’s start by looking at how the JDK’s native Selector is handled. Understanding this approach will make it easier to look at Netty’s optimized approach.

3.1 processSelectedKeysPlain

When an IO ready event occurs on a Channel registered with a Selector, the Selector inserts the IO-ready SelectionKey into the Set<SelectionKey> selectedKeys set.

When the Reactor thread returns from the java.nio.channels.Selector#select(long) call, it calls java.nio.channels.Selector#selectedKeys to obtain the collection of IO-ready SelectionKeys.

So before calling processSelectedKeysPlain to handle the IO ready events, the Reactor thread needs to call selector.selectedKeys() to get all the IO-ready SelectionKeys.

processSelectedKeysPlain(selector.selectedKeys())

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            // Note the Iterator.remove() call in each iteration. The Selector does not remove SelectionKey instances from the selected key set by itself.
            // The key must be removed once its channel has been handled. The next time the channel becomes ready, the Selector puts the key into the selected key set again.
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            // Select again so that invalid SelectionKeys are removed (a SocketChannel may have been removed from the selector)
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

3.1.1 Obtaining an IO ready Channel

Here the Set<SelectionKey> selectedKeys set is the JDK's HashSet, because we are introducing the JDK NIO native implementation first.

The Reactor thread gets the iterator of the HashSet and starts processing the IO-ready Channels one by one.

Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();

Do you remember what the attachment property in the SelectionKey is?

In the last article, “The Netty Reactor Startup Process”, when we described how NioServerSocketChannel registers with the Main Reactor, we saw that it registers itself with the Selector as the attachment property of the SelectionKey by passing the this pointer. This step binds Netty’s custom Channel to the JDK NIO Channel.

public abstract class AbstractNioChannel extends AbstractChannel {

    // The SelectionKey obtained when the channel registers with the Selector
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...... Omit ......
            }
        }
    }
}

We also mentioned that a SelectionKey is the representation of a Channel in a Selector. When there is an IO ready event on a Channel, the Selector returns that Channel's SelectionKey to the Reactor thread, and we can obtain the corresponding Netty Channel through the attachment property of the returned SelectionKey.

When a client connection event (OP_ACCEPT) is active, the Channel type here is NioServerSocketChannel. When a client Read or Write event is active, the Channel type here is NioSocketChannel.

After obtaining Netty’s custom Channel through k.attachment(), we need to remove that Channel's SelectionKey from the Selector's ready set, Set<SelectionKey> selectedKeys. The Selector does not remove SelectionKeys that have already been processed, so the caller has to remove them. The next time the Channel is IO ready, the Selector puts the Channel's SelectionKey into the ready set Set<SelectionKey> selectedKeys again.

i.remove();
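The attach, select, read attachment and remove cycle can be tried with plain JDK NIO, without Netty. The sketch below is only an illustration: it uses a java.nio.channels.Pipe instead of a socket so that it needs no network, a String instead of a Netty Channel as the attachment, and the class name SelectedKeysSketch is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical JDK-only demo of the attachment and Iterator.remove() pattern.
final class SelectedKeysSketch {
    // Returns "<attachments seen>:<number of keys left in the selected set>".
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // The third argument becomes the attachment of the returned SelectionKey.
                pipe.source().register(selector, SelectionKey.OP_READ, "my-channel");
                // Writing one byte makes the source end of the pipe readable.
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                selector.select(1000L);

                StringBuilder seen = new StringBuilder();
                Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                while (i.hasNext()) {
                    SelectionKey k = i.next();
                    seen.append(k.attachment());
                    // The Selector never clears the selected set itself; the caller removes handled keys.
                    i.remove();
                }
                return seen + ":" + selector.selectedKeys().size();
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the i.remove() call were left out, the key would stay in the selected set after it has been handled, which is exactly what the removal in processSelectedKeysPlain prevents.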

3.1.2 Processing I/O Events on a Channel

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

From this we can see that Netty attaches two types of objects to the attachment property of a SelectionKey:

  • One is the familiar Channel. Both the server-side NioServerSocketChannel and the client-side NioSocketChannel are AbstractNioChannels. IO events on a Channel are handled by the Netty framework, which is what we focus on in this section.

  • The other is NioTask, which Netty provides so that users can customize how IO ready events on a Channel are handled.


public interface NioTask<C extends SelectableChannel> {
    /**
     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
     */
    void channelReady(C ch, SelectionKey key) throws Exception;

    /**
     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
     * this {@link NioTask} will not be notified anymore.
     *
     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
     *              the event loop has been shut down.
     */
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

NioTask and Channel are essentially the same in that both handle IO ready events on a Channel, but one is implemented by the user and the other by the Netty framework. Here we focus on the IO processing logic of Channel.


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // Get the Channel's underlying operation class, Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            ...... If the SelectionKey is no longer valid, close the corresponding Channel ......
        }

        try {
            // Get the I/O ready events
            int readyOps = k.readyOps();
            // Handle the Connect event
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                // Stop listening for Connect events, otherwise the Selector keeps reporting them
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // Trigger the channelActive event to process the Connect event
                unsafe.finishConnect();
            }

            // Handle the Write event
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            // Handle Read or Accept events
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

  • First we need to get Unsafe, the underlying operation class of the IO-ready Channel, which performs the actual handling of IO ready events.

As you can see, Netty’s handling of IO ready events is all encapsulated in the Unsafe class. For example, the processing logic for the OP_ACCEPT event is encapsulated in the Unsafe class of NioServerSocketChannel, and the handling of OP_READ and OP_WRITE events is encapsulated in the Unsafe class of NioSocketChannel.

  • Then we get the concrete IO ready events, readyOps, from the SelectionKey.

There are two sets of I/O events in a SelectionKey. One is interestOps, which records the I/O events a Channel is interested in. After a Channel registers with a Selector, these are added through the ChannelActive event callback of the HeadContext node in the pipeline. The following code runs as part of that ChannelActive callback; it is where a Channel registers the I/O events it is interested in with the Selector.

    public abstract class AbstractNioChannel extends AbstractChannel {

        @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }

            readPending = true;

            final int interestOps = selectionKey.interestOps();
            /**
             * 1: When a ServerSocketChannel is initialized, readInterestOp is set to OP_ACCEPT
             * 2: When a SocketChannel is initialized, readInterestOp is set to OP_READ
             */
            if ((interestOps & readInterestOp) == 0) {
                // Register interest in OP_ACCEPT or OP_READ events
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    }

The other is readyOps, seen here, which records which of the IO events the Channel is interested in are currently ready.

Both event sets are stored as bit flags in a single int variable.

  • Use the & operation to determine whether an event is in the event set: (readyOps & SelectionKey.OP_CONNECT) != 0 checks whether the Connect event is ready on the Channel.

  • Use the | operation to add an event to the event set: interestOps | readInterestOp

  • To remove an event from the event set, invert the event to be removed and then apply the & operation to the set: ops &= ~SelectionKey.OP_CONNECT

This economical use of space is an idea worth borrowing in our daily development.
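The three bit operations can be exercised directly with the JDK's SelectionKey constants. The helper class InterestOpsSketch and its method names are hypothetical and only wrap the expressions shown above.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helpers wrapping the three bit operations on an event set stored in an int.
final class InterestOpsSketch {
    // Membership test: is the event's bit set in ops?
    static boolean contains(int ops, int event) {
        return (ops & event) != 0;
    }

    // Add an event by switching its bit on.
    static int add(int ops, int event) {
        return ops | event;
    }

    // Remove an event by masking its bit off.
    static int remove(int ops, int event) {
        return ops & ~event;
    }

    // Builds {OP_READ, OP_CONNECT}, then removes OP_CONNECT again.
    static int demo() {
        int ops = 0;
        ops = add(ops, SelectionKey.OP_READ);
        ops = add(ops, SelectionKey.OP_CONNECT);
        ops = remove(ops, SelectionKey.OP_CONNECT);
        return ops;
    }
}
```

After demo() only OP_READ remains in the set, so contains(demo(), SelectionKey.OP_READ) is true and contains(demo(), SelectionKey.OP_CONNECT) is false.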


We now know which channels are IO ready and which specific types of IO events are ready.

Now it’s time to handle the different IO ready events on a Channel.

3.1.2.1 Handling Connect Events

When the Netty client initiates a connection to the server, it registers a Connect event with the client's Reactor. When the connection is successfully established, a Connect ready event occurs on the client's NioSocketChannel, and through the Reactor framework described above, execution eventually arrives at this point.

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // Trigger the channelActive event
                unsafe.finishConnect();
            }

If the IO ready event is a Connect event, the finishConnect method in the Unsafe operation class of the corresponding client NioSocketChannel is called to handle it. The ChannelActive event is then propagated in the pipeline of the client NioSocketChannel.

Note that before finishConnect is called, the OP_CONNECT event is removed from interestOps, the set of events the client NioSocketChannel is interested in. Otherwise the Selector would keep reporting the Connect event as ready.

3.1.2.2 Handling Write Events

The process of the Reactor thread handling Write events in Netty will be introduced in a follow-up article. In this article we focus on the entire operating framework of the Reactor thread.

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

The only thing to remember here is that the OP_WRITE event is registered on demand rather than up front: it is registered with the Reactor when the Socket send buffer is full and the remaining data cannot be written. Once the send buffer becomes writable again, the Reactor receives an OP_WRITE notification and calls the forceFlush method of the client NioSocketChannel here to send the remaining data.

3.1.2.3 Handle Read or Accept events

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

In Netty, both Read and Accept events are handled by the read method in the Unsafe operation class of the corresponding Channel.

The read method of the server-side NioServerSocketChannel handles Accept events, and the read method of the client-side NioSocketChannel handles Read events.

You just need to remember the entry functions of each IO event in the corresponding Channel. We will analyze these entry functions in detail in future articles.

3.1.3 Removing an invalid SelectionKey from the Selector

            // Used to remove invalid SelectionKeys from selectedKeys in time, for example when the user has removed SocketChannels from the selector
            private boolean needsToSelectAgain;

            // Select again so that invalid SelectionKeys are removed (a SocketChannel may have been removed from the selector by the user)
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }

When we introduced the Reactor framework earlier, we saw that needsToSelectAgain is set to false every time the Reactor thread finishes polling and is about to handle IO ready events and asynchronous tasks.

So what exactly is needsToSelectAgain, and why would we need to "select again"?

Let’s first look at when needsToSelectAgain is set to true and see whether that gives us any clues.

We know that a Channel can register itself with a Selector, so of course it can also deregister itself from a Selector.

We spent a lot of time on the registration process in the last article, so now let’s take a look at Channel deregistration.

public abstract class AbstractNioChannel extends AbstractChannel {

    // The SelectionKey obtained when the channel registers with the Selector
    volatile SelectionKey selectionKey;

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
}

To deregister a Channel, the doDeregister method of AbstractNioChannel is called. The Reactor bound to the Channel removes the Channel from the Selector and stops listening for IO events on it.

public final class NioEventLoop extends SingleThreadEventLoop {

    // Counts the SocketChannels removed from the Selector; every 256 removals, invalid SelectionKeys are cleared from selectedKeys
    private int cancelledKeys;

    private static final int CLEANUP_INTERVAL = 256;

    /**
     * Remove the socketChannel from the selector to stop listening for IO events
     */
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        // When the number of SocketChannels removed from the selector reaches 256, set needsToSelectAgain to true.
        // io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain then selects again to remove the invalid SelectionKeys,
        // keeping the selected key set valid
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
}

  • Calling cancel, the JDK NIO SelectionKey API method, deregisters the Channel from the Selector. After SelectionKey#cancel has been called, SelectionKey#isValid returns false. The Selector also adds the cancelled SelectionKey to its cancelledKeys set.

public abstract class AbstractSelector extends Selector {

    private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    void cancel(SelectionKey k) {
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
}

  • When the Channel’s SelectionKey is cancelled, the Channel cancellation counter cancelledKeys is incremented by 1. When cancelledKeys reaches 256, needsToSelectAgain is set to true.

  • During the next selection operation, the Selector removes the SelectionKeys in its cancelledKeys set from all of its key sets. These key sets include selectedKeys, which holds the ready SelectionKeys, and keys, which holds the SelectionKeys of all registered Channels.

public abstract class SelectorImpl extends AbstractSelector {

    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();

    ...... Omit ......
}

We can see that the Reactor thread checks needsToSelectAgain inside the loop body in which processSelectedKeysPlain processes the IO-ready SelectionKeys.

I mention the location of this check specifically to point out that at this moment the Reactor is still handling the IO ready events of the current poll.

As we just saw, after SelectionKey#cancel is called, the Selector does not remove the cancelled SelectionKey from its key sets, including the ready set selectedKeys, until the next selection operation.

If a large number of Channels are removed from the Selector during the current poll, the ready set selectedKeys still holds the SelectionKeys of those Channels until the next poll, which of course affects the validity of selectedKeys.

Therefore, to keep all key sets in the Selector valid, selectNow is triggered once when the number of Channel cancellations reaches 256. Its purpose is to remove the invalid SelectionKeys.

    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }
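The deferred removal of cancelled keys can be observed with plain JDK NIO. The following is a minimal sketch, not Netty code: the class name CancelledKeyDemo and its method are made up for illustration, and a Pipe is used only because it provides a selectable channel without any network setup. A cancelled key is expected to remain in the Selector's key set until the next selection operation, which is the step selectAgain forces with selectNow.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of Netty or the JDK.
final class CancelledKeyDemo {

    /** Returns {in keys() right after cancel, in keys() after selectNow}. */
    static boolean[] cancelledKeyVisibility() {
        try {
            Selector selector = Selector.open();
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

                key.cancel();
                // The key is invalid now, but the selector has not dropped it yet.
                boolean beforeSelect = selector.keys().contains(key);

                // A selection operation processes the cancelled-key set.
                selector.selectNow();
                boolean afterSelect = selector.keys().contains(key);

                return new boolean[] {beforeSelect, afterSelect};
            } finally {
                pipe.source().close();
                pipe.sink().close();
                selector.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] seen = cancelledKeyVisibility();
        System.out.println("in keys() after cancel:    " + seen[0]);
        System.out.println("in keys() after selectNow: " + seen[1]);
    }
}
```

Running selectNow here plays the same role as in selectAgain: it costs one non-blocking selection, and in exchange the key sets no longer contain keys of deregistered Channels.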

That completes processSelectedKeysPlain, which handles IO ready events with the JDK's native Selector. The handling logic for IO ready events is the same in both variants, so once processSelectedKeysPlain is understood, processSelectedKeysOptimized is easy to follow.

3.2 processSelectedKeysOptimized

By default, Netty uses an optimized Selector for handling IO ready events. But the processing logic is pretty much the same. Let’s focus on the differences between the two approaches.

    private void processSelectedKeysOptimized() {
        // When the Selector is created in openSelector, the selectedKeys and publicSelectedKeys
        // fields of the JDK Selector implementation are replaced: the original HashSet is
        // swapped for Netty's array-based SelectedSelectionKeySet
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // Corresponds to Iterator#remove in the plain version: the Selector never removes
            // entries from selectedKeys by itself, so null out the slot here
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {

                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
  • The JDK NIO native Selector stores the IO-ready SelectionKeys in a HashSet, selectedKeys. To make traversing selectedKeys more efficient, Netty substitutes its own SelectedSelectionKeySet type, so that iterating with a HashSet iterator is replaced by a plain array traversal.

  • Every time the Selector polls IO ready events, it inserts the SelectionKeys of the IO-ready Channels into selectedKeys. The Selector only ever adds to this set: after a SelectionKey has been processed, the Selector does not remove it automatically. Netty therefore has to remove each IO-ready SelectionKey itself as it traverses the set.

    • In processSelectedKeysPlain, the key is removed directly through the iterator.
    • In processSelectedKeysOptimized, its slot in the array is set to null, which also helps garbage collection.
  • When needsToSelectAgain is set in processSelectedKeysPlain, which uses the JDK NIO native Selector, it is enough to call selectAgain; the Selector then removes the invalid keys itself.

In processSelectedKeysOptimized, however, the selected key set is Netty's own optimized type, so Netty must first clear the remaining SelectionKeys out of the SelectedSelectionKeySet with selectedKeys.reset(i + 1) and only then call selectAgain.
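To make the array-based approach more concrete, here is a simplified sketch of such a set. It is an illustration under assumptions, not Netty's SelectedSelectionKeySet: the class name ArrayKeySet, the generic element type and the initial capacity of 4 are invented for the example. It only shows the two ideas discussed above, appending into an array and clearing the unprocessed tail with reset(start).

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for an array-backed selected-key set.
final class ArrayKeySet<T> {
    Object[] keys = new Object[4]; // small initial capacity, chosen for the example
    int size;

    /** Appends a key, doubling the backing array when it is full. */
    boolean add(T key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        keys[size++] = key;
        return true;
    }

    /** Nulls out the slots in [start, size) and marks the set as empty. */
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    public static void main(String[] args) {
        ArrayKeySet<String> set = new ArrayKeySet<>();
        for (String k : new String[] {"a", "b", "c", "d", "e"}) {
            set.add(k);
        }
        // Pretend the first two keys were processed and nulled during traversal.
        set.keys[0] = null;
        set.keys[1] = null;
        // A re-select is needed: drop the unprocessed tail before polling again.
        set.reset(2);
        System.out.println("size after reset: " + set.size);
    }
}
```

Because processed slots are already null, reset only has to clear from the current position onward; after that the set can be refilled from index 0 by the next selection.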


Now that we have covered the process of how the Reactor thread handles IO ready events, it is time to introduce how the Reactor thread handles asynchronous tasks in the Netty framework.

4. The Reactor thread processes asynchronous tasks

Netty has two methods for handling asynchronous tasks:

  • One is the runAllTasks() method, which has no timeout. When ioRatio is set to 100, the Reactor thread first processes all I/O ready events and then runs all asynchronous tasks, with no time limit.

  • The other is the runAllTasks(long timeoutNanos) method, which has a timeout. When ioRatio != 100, the time the Reactor thread may spend on asynchronous tasks is limited. The Reactor thread first processes all I/O ready events and measures how long this took, ioTime. It then computes the timeout for asynchronous tasks as ioTime * (100 - ioRatio) / ioRatio and runs only as many asynchronous tasks as fit within that limit.
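A short worked example may help with the formula. The sketch below is not Netty code; the class name IoRatioBudget and the sample numbers are assumptions chosen for illustration. It simply evaluates ioTime * (100 - ioRatio) / ioRatio for a few ioRatio values.

```java
// Hypothetical helper that evaluates the task time budget formula from the text.
final class IoRatioBudget {

    /** Time budget for asynchronous tasks, given the measured I/O time and an ioRatio in 1..99. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for a timed run: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume 8 ms were spent on I/O ready events
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With 8 ms of I/O time, an ioRatio of 50 gives tasks the same 8 ms, an ioRatio of 80 shrinks the budget to 2 ms, and an ioRatio of 20 grows it to 32 ms. The higher the ioRatio, the smaller the share of each loop iteration that goes to asynchronous tasks.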

Let's take a look at the processing logic of each of these two methods:

4.1 runAllTasks()

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            // Move the scheduled tasks that are due into taskQueue; the Reactor thread then takes them from taskQueue and runs them
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        // Run the tasks in the tail queue
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

The core logic for the Reactor thread to perform asynchronous tasks is:

  • The scheduled tasks that are due are taken out of the scheduled task queue scheduledTaskQueue and transferred to the common task queue taskQueue.

  • The Reactor thread takes tasks from the common task queue taskQueue and executes them.

  • After the Reactor thread has executed the scheduled tasks and the common tasks, it executes the tail tasks stored in the tail task queue tailTasks.

Let’s take a look at the implementation of these core steps:

4.1.1 fetchFromScheduledTaskQueue

    /**
     * Move the scheduled tasks that are due into taskQueue, from which the Reactor thread takes and runs them
     */
    private boolean fetchFromScheduledTaskQueue() {
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
            return true;
        }
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            // Take a scheduled task with deadline <= nanoTime from the scheduled task queue
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                return true;
            }
            if (!taskQueue.offer(scheduledTask)) {
                // taskQueue has no space left: put the task back into the scheduled task queue and pick it up next time
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
        }
    }
  1. Get nanoTime, the current point in time at which asynchronous tasks are being executed.
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }
}
  2. Look in the scheduled task queue for asynchronous tasks with deadline <= nanoTime, that is, for scheduled tasks that are due.
    protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();

        // Take the scheduled task with deadline <= nanoTime from the scheduled task queue
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
            return null;
        }
        // The task is due: remove it from the scheduled task queue
        scheduledTaskQueue.remove();
        scheduledTask.setConsumed();
        return scheduledTask;
    }
  3. Insert the due scheduled task into the common task queue taskQueue. If taskQueue has no room for new tasks, put the scheduled task back into the scheduled task queue to wait for the next pull.
            if (!taskQueue.offer(scheduledTask)) {
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
  4. A return value of true from fetchFromScheduledTaskQueue means that all due scheduled tasks have been pulled out and transferred to the common task queue.

A return value of false means that only some of the due scheduled tasks were pulled out because the common task queue is full; another pull is needed once the common tasks have been executed.

Once all due scheduled tasks have been pulled from the scheduled task queue, or the common task queue is full, pulling stops and the Reactor thread starts executing the asynchronous tasks in the common task queue.
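The transfer logic above can be reproduced with JDK collections alone. The following is a rough sketch, not Netty's implementation: the class ScheduledTransferSketch, the Timed task type and the use of PriorityQueue and ArrayBlockingQueue are assumptions made to keep the example self-contained. It shows the same three outcomes: a due task is moved, a task that does not fit is put back and false is returned, and true is returned once no due task remains.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of moving due tasks from a deadline-ordered queue into a bounded task queue.
final class ScheduledTransferSketch {

    /** Made-up scheduled task: only a name and a deadline. */
    static final class Timed implements Comparable<Timed> {
        final String name;
        final long deadlineNanos;

        Timed(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Timed other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    /**
     * Moves every task with deadline <= now into taskQueue.
     * Returns false if taskQueue filled up before all due tasks were moved.
     */
    static boolean fetchDue(PriorityQueue<Timed> scheduled, Queue<Timed> taskQueue, long now) {
        for (;;) {
            Timed head = scheduled.peek();
            if (head == null || head.deadlineNanos - now > 0) {
                return true; // nothing (more) is due
            }
            scheduled.remove();
            if (!taskQueue.offer(head)) {
                scheduled.add(head); // no room: put it back for the next round
                return false;
            }
        }
    }

    public static void main(String[] args) {
        PriorityQueue<Timed> scheduled = new PriorityQueue<>();
        scheduled.add(new Timed("a", 10));
        scheduled.add(new Timed("b", 20));
        scheduled.add(new Timed("c", 30));
        scheduled.add(new Timed("d", 100));
        Queue<Timed> taskQueue = new ArrayBlockingQueue<>(2);

        System.out.println("first fetch complete:  " + fetchDue(scheduled, taskQueue, 50));
        taskQueue.clear(); // stands in for running the common tasks
        System.out.println("second fetch complete: " + fetchDue(scheduled, taskQueue, 50));
    }
}
```

This is also why runAllTasks() loops until the fetch reports true: each false result means the common task queue has to be drained before the remaining due tasks can be transferred.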

4.1.2 runAllTasksFrom

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }
  • First, the return value of the runAllTasksFrom method indicates whether at least one asynchronous task has been executed. This will then be assigned to the ranAtLeastOne variable, and the return value will be used later.

  • Pulls asynchronous tasks from the normal task queue.

    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }
  • The Reactor thread executes the asynchronous task.
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
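The combination of runAllTasksFrom and safeExecute means that one failing task does not stop the tasks queued behind it. The sketch below illustrates that behaviour with plain JDK types; the class name RunAllSketch and the use of System.err instead of a logger are assumptions for the example, and the WAKEUP_TASK filtering is left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: drain a queue, isolating failures of individual tasks.
final class RunAllSketch {

    /** Runs every queued task; returns true if at least one task was run. */
    static boolean runAllFrom(Queue<Runnable> queue) {
        Runnable task = queue.poll();
        if (task == null) {
            return false;
        }
        do {
            try {
                task.run();
            } catch (Throwable t) {
                // Report the failure and continue with the next task.
                System.err.println("A task raised an exception: " + t);
            }
            task = queue.poll();
        } while (task != null);
        return true;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        queue.add(() -> System.out.println("first"));
        queue.add(() -> { throw new IllegalStateException("boom"); });
        queue.add(() -> System.out.println("third"));
        System.out.println("ran at least one: " + runAllFrom(queue));
        System.out.println("ran at least one: " + runAllFrom(queue));
    }
}
```

The boolean result mirrors ranAtLeastOne: the first call reports that work was done, while a call on an empty queue reports that nothing ran.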

4.1.3 afterRunningAllTasks

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        // Perform a tail-queue task
        afterRunningAllTasks();
        return ranAtLeastOne;

If the Reactor thread executed at least one asynchronous task, it updates lastExecutionTime. It then returns the ranAtLeastOne flag, which holds the return value of the runAllTasksFrom method.

Before returning, it executes the tail tasks stored in the tail task queue.

    @Override
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }

4.2 runAllTasks(long timeoutNanos)

The core logic for handling asynchronous tasks is the same as before, but with more control over timeouts.

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            // No tasks in the common task queue: run the tasks in the tail queue
            afterRunningAllTasks();
            return false;
        }

        // Deadline by which the execution of asynchronous tasks must stop
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);
            runTasks ++;
            // Check whether the deadline has been reached once every 64 asynchronous tasks
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    // The deadline has been reached: stop executing asynchronous tasks
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
  • As before, fetchFromScheduledTaskQueue first pulls the due scheduled tasks from the Reactor's scheduled task queue and transfers them to the common task queue. Pulling stops when the common task queue is full or all due scheduled tasks have been pulled.

  • ScheduledFutureTask.nanoTime() + timeoutNanos is used as the deadline, the point in time at which the Reactor thread must stop executing asynchronous tasks.

  • Because calling System.nanoTime() has some overhead, the deadline is checked only once every 64 asynchronous tasks. If the deadline has been reached, the Reactor thread stops executing asynchronous tasks; otherwise it keeps taking tasks from the common task queue and executing them in a loop.

This detail shows how carefully Netty weighs performance.
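The batching trick can be isolated in a few lines. The sketch below is an illustration, not Netty code: the class BatchedDeadlineSketch and the injectable LongSupplier clock are assumptions that make the behaviour easy to observe. The expression (runTasks & 0x3F) == 0 is true exactly when runTasks is a multiple of 64, so the clock is read once per 64 tasks.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical sketch: run tasks until a deadline, reading the clock only every 64 tasks.
final class BatchedDeadlineSketch {

    /** Returns the number of tasks that were run before the queue emptied or the deadline hit. */
    static long runWithDeadline(Queue<Runnable> tasks, long deadline, LongSupplier clock) {
        long runTasks = 0;
        for (Runnable task; (task = tasks.poll()) != null; ) {
            task.run();
            runTasks++;
            // 0x3F is 63: the low six bits are all zero only for multiples of 64.
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
        }
        return runTasks;
    }

    public static void main(String[] args) {
        Queue<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            tasks.add(() -> { });
        }
        long[] clockReads = {0};
        // Fake clock that never reaches the deadline, so all 200 tasks run.
        long ran = runWithDeadline(tasks, 1L, () -> { clockReads[0]++; return 0L; });
        System.out.println("ran " + ran + " tasks with " + clockReads[0] + " clock reads");
    }
}
```

The trade-off is precision: since the clock is consulted only at multiples of 64, a run can overshoot its deadline by up to 63 tasks, which is accepted in exchange for far fewer clock reads.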


At this point, we have analyzed the Reactor's overall operating framework and the implementation logic of polling IO ready events, processing IO ready events, and executing asynchronous tasks.

Next, let's see how Netty deals with the empty polling bug in JDK NIO Epoll.

5. Solve the JDK Epoll BUG

As mentioned earlier, the empty polling bug in JDK NIO Epoll can wake the Reactor thread up unexpectedly when there is nothing for it to do, leaving the CPU spinning idly.

In fact, Netty did not fundamentally solve the JDK BUG, but chose to cleverly bypass the BUG.

Here’s how Netty did it.

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    // The Reactor thread was woken up abnormally from the Selector, which may have triggered the JDK Epoll empty polling bug
                    // The Selector has been rebuilt; reset selectCnt to zero
                    selectCnt = 0;
                }

After the Reactor thread has processed the I/O ready events and the asynchronous tasks, it checks whether it actually executed any asynchronous task and whether there was any I/O-ready Channel in this round.

  • This is where the boolean ranTasks comes in. ranTasks is the return value of the runAllTasks method and indicates whether at least one asynchronous task was executed.

  • The int strategy is the return value of the select method of the JDK NIO Selector and represents the number of IO-ready Channels.

If ranTasks = false and strategy = 0, the Reactor thread had neither an asynchronous task to execute nor an IO-ready Channel to handle in this round; it was woken up unexpectedly and simply spun without doing anything.

In this case Netty suspects that the empty polling bug in JDK NIO Epoll may have been triggered.

    int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);

    private boolean unexpectedSelectorWakeup(int selectCnt) {

        // ... omitted ...

        /**
         * The Reactor thread was woken up abnormally from the Selector although there was neither
         * an IO ready event nor an asynchronous task to run. This may be the JDK Epoll empty
         * polling bug, so the Selector is rebuilt
         */
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            return true;
        }
        return false;
    }
  • If selectCnt, the number of unexpected wakeups of the Reactor, reaches the configured threshold SELECTOR_AUTO_REBUILD_THRESHOLD, Netty concludes that the JDK NIO Epoll empty polling bug may have been triggered. It rebuilds the Selector (re-registers all previously registered Channels with a new Selector and closes the old one) and resets selectCnt to 0.

SELECTOR_AUTO_REBUILD_THRESHOLD defaults to 512; a custom value can be set with the system property -Dio.netty.selectorAutoRebuildThreshold.

  • If selectCnt is less than SELECTOR_AUTO_REBUILD_THRESHOLD, the method returns without doing anything and selectCnt keeps counting.

In short, Netty counts how many times in a row the Reactor is woken up unexpectedly. When selectCnt reaches 512, it works around the JDK NIO Epoll empty polling bug by rebuilding the Selector.
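The counting scheme can be modelled independently of any Selector. The sketch below is a simplified illustration, not Netty's event loop: the class PrematureWakeupCounter and its methods are invented for the example, and the actual rebuild is replaced by a counter. It captures the rule described above: consecutive wakeups without work increase selectCnt, any real work resets it, and reaching the threshold triggers a rebuild.

```java
// Hypothetical model of the "count consecutive empty wakeups, then rebuild" rule.
final class PrematureWakeupCounter {
    private final int rebuildThreshold;
    private int selectCnt;
    private int rebuilds;

    PrematureWakeupCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Records one loop iteration; returns true if this iteration triggered a rebuild. */
    boolean onIteration(boolean ranTasks, int readyKeys) {
        selectCnt++;
        if (ranTasks || readyKeys > 0) {
            selectCnt = 0; // real work was done, so the wakeup was legitimate
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;    // a real event loop would rebuild its Selector here
            selectCnt = 0;
            return true;
        }
        return false;
    }

    int selectCnt() {
        return selectCnt;
    }

    int rebuilds() {
        return rebuilds;
    }

    public static void main(String[] args) {
        PrematureWakeupCounter counter = new PrematureWakeupCounter(512);
        for (int i = 1; i <= 512; i++) {
            if (counter.onIteration(false, 0)) {
                System.out.println("rebuild triggered at iteration " + i);
            }
        }
    }
}
```

Because any iteration that does real work resets the counter, occasional spurious wakeups never add up to a rebuild; only a long uninterrupted run of empty wakeups does.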

We can borrow this way of thinking from Netty in day-to-day development. When we find that a problem cannot be solved completely, or that the return on the effort needed to solve it is too low, we should consider whether a different approach would let us get around the problem and achieve the same effect. Sometimes the best way to solve a problem is not to solve it head-on, but to find a clever way around it!


Conclusion

This article has spent a good deal of space on the overall operating framework of the Reactor and has looked in depth at the concrete implementation logic of the Reactor's core modules.

We now know how the Reactor polls all Channels registered on it for the IO events they are interested in, how the Reactor handles IO ready events, and how it executes the asynchronous and scheduled tasks submitted within the Netty framework.

Finally, we saw how Netty cleverly works around the JDK NIO Epoll empty polling bug instead of fixing it at the root.

From that we distilled a way of thinking about problems: sometimes the best way to solve a problem is not to solve it head-on, but to find a clever way around it!

Well, that's all for this article. See you in the next one!