Interpret NioEventLoop

After the NioEventLoop thread is created, how does the thread work?

As covered earlier, the thread is created lazily: it is started the first time a task is submitted to the NioEventLoop, in doStartThread():

    private void doStartThread() {
        assert thread == null;
        // executor is a ThreadPerTaskExecutor.
        // It contains a thread factory (type DefaultThreadFactory) for thread creation.
        executor.execute(new Runnable() {
            // Submitting this task makes the executor create a new thread to run it
            @Override
            public void run() {
                // Bind the newly created thread to this EventLoop's thread field
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Key call: enters NioEventLoop.run(), which holds the working logic of the EventLoop thread
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

As you can see, the key call is SingleThreadEventExecutor.this.run(). For a NioEventLoop it resolves to NioEventLoop.run(), which holds the working logic of the EventLoop thread.
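
The lazy start described above can be illustrated in isolation. The following is a minimal sketch, not Netty's code: the class LazyLoopSketch, its fields, and the thread name "lazy-loop" are all hypothetical, and the loop body is reduced to taking tasks from a queue. It only shows the idea that no thread exists until the first task is submitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical miniature of a single-thread executor that starts its thread lazily. */
final class LazyLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Queues the task and starts the worker thread on the first submission only. */
    void execute(Runnable task) {
        tasks.add(task);
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "lazy-loop");
            t.setDaemon(true); // do not keep the JVM alive in this demo
            t.start();
        }
    }

    boolean isStarted() {
        return started.get();
    }

    /** Stand-in for the event loop body: take tasks and run them one by one. */
    private void run() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly when interrupted
        }
    }

    /** Submits one task and reports the name of the thread that ran it. */
    static String runOneTask(LazyLoopSketch loop) {
        CountDownLatch done = new CountDownLatch(1);
        String[] name = new String[1];
        loop.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        LazyLoopSketch loop = new LazyLoopSketch();
        System.out.println("started before first task: " + loop.isStarted());
        System.out.println("task ran on: " + runOneTask(loop));
        System.out.println("started after first task: " + loop.isStarted());
    }
}
```

The compareAndSet guard plays the role of the state check in front of doStartThread(): however many tasks are submitted, only the first one creates the thread.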

run()

    // NioEventLoop's working logic
    @Override
    protected void run() {
        // Counter used to detect the epoll empty-poll bug
        int selectCnt = 0;
        // Infinite loop
        for (;;) {
            try {
                // strategy takes one of two kinds of values:
                // 1. >= 0: the selector's return value, i.e. the number of ready channels registered with it
                // 2. < 0: one of the constants CONTINUE, BUSY_WAIT or SELECT
                int strategy;
                try {
                    // selectStrategy -> DefaultSelectStrategy
                    // What happens depends on whether the NioEventLoop has local tasks:
                    // 1. Local tasks exist: call selectNow() on the multiplexer, which returns the number of ready channels
                    // 2. No local tasks: return -1 (SELECT), which the switch below handles
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

                    // So only two cases need to be considered: >= 0 and -1

                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT: // The NioEventLoop has no local tasks, so -1 was returned
                        // Deadline of the next scheduled task
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        // true - the EventLoop has no scheduled tasks waiting to run
                        if (curDeadlineNanos == -1L) {
                            // Set it to the maximum value of long
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // true - there are still no ordinary local tasks to run
                            if (!hasTasks()) {
                                // curDeadlineNanos is either the maximum value of long (no scheduled tasks)
                                // or the deadline by which the next scheduled task must run.
                                // select() returns the number of ready Channel events on the selector.
                                // Note: because of the Linux epoll bug, an abruptly broken socket can wake the
                                // selector with no Channel event ready, after which select() no longer blocks.
                                // In that case strategy == 0.
                                strategy = select(curDeadlineNanos); // Decides which select method to call
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                // Here, strategy is the number of ready channel events on selector


                selectCnt++; // Key variable: counts loop iterations in a row, i.e. possible premature selector returns
                cancelledKeys = 0;
                needsToSelectAgain = false;

                // Share of the loop's time spent on I/O events; 50 (percent) by default
                final int ioRatio = this.ioRatio;
                // Whether this iteration ran any local tasks
                boolean ranTasks;

                // ioRatio == 100: process I/O events first, then run all local tasks without a time limit
                if (ioRatio == 100) {
                    try {
                        // Are there I/O events to process?
                        if (strategy > 0) {
                            // Entry point for processing I/O events
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        // Run the tasks in the local task queue
                        ranTasks = runAllTasks();
                    }
                }
                // The selector of this EventLoop has ready Channel events
                else if (strategy > 0) {
                    // Indicates the start time of I/O event processing
                    final long ioStartTime = System.nanoTime();
                    try {
                        // Start processing I/O events
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // Calculate the I/O event processing duration
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // ioTime * (100 - ioRatio) / ioRatio is the time budget for local tasks, derived
                        // from the I/O ratio and the time just spent processing I/O events.
                        // So the execution of local tasks is time-limited,
                        // while the processing of I/O events is not.
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
                // No ready Channel event on the selector of this EventLoop, so only local tasks can be processed
                else {
                    // Run a minimal batch of local tasks
                    // timeoutNanos == 0 - at most 64 tasks will run
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    // Normal case: the NioEventLoop thread woke up from the selector because there was work to do,
                    // so selectCnt is reset to 0
                    selectCnt = 0;
                }
                // unexpectedSelectorWakeup() is Netty's workaround for the Selector bug.
                // This branch is reached when ranTasks == false && strategy == 0.
                // selectCnt counts the loop iterations in a row in which the selector returned with nothing to do
                else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    // Reset selectCnt to 0; the selector has been rebuilt or the thread was interrupted
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }
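
The time budget that the ioRatio branch hands to runAllTasks() is easier to judge with concrete numbers. The sketch below only evaluates the expression from the loop, ioTime * (100 - ioRatio) / ioRatio; the class and method names are hypothetical and the 2 ms I/O time is an assumed input.

```java
/** Hypothetical helper that evaluates the task time budget expression used in run(). */
final class IoRatioBudgetSketch {
    /** Time budget for local tasks, given the time just spent on I/O and the ioRatio (1..99). */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 2_000_000L; // assume 2 ms were spent on I/O in this iteration
        System.out.println(taskBudgetNanos(ioTime, 50)); // 2000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 500000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 8000000: tasks get four times the I/O time
    }
}
```

With the default ioRatio of 50 the two kinds of work get equal time; a higher ioRatio shrinks the task budget relative to the I/O time.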

As the code above shows, NioEventLoop mainly does two things: it processes I/O events and it runs the tasks in its task queues. From its inheritance hierarchy we know that NioEventLoop implements the JDK's ScheduledExecutorService, so it has two kinds of task queues: one for ordinary tasks and one for tasks that need scheduling (delayed and periodic tasks).
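
The interplay of the two queues can be sketched as follows. This is an illustration under assumptions, not Netty's data structures: TwoQueuesSketch, Scheduled and fetchDueTasks are hypothetical names, and a plain PriorityQueue ordered by deadline stands in for the scheduled task queue. The idea is the one behind fetchFromScheduledTaskQueue(): tasks whose deadline has passed are moved into the ordinary queue before tasks are run.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

/** Hypothetical model of an ordinary task queue plus a deadline-ordered scheduled queue. */
final class TwoQueuesSketch {
    /** A task that becomes runnable once deadlineNanos has been reached. */
    static final class Scheduled implements Comparable<Scheduled> {
        final long deadlineNanos;
        final Runnable task;

        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }

        @Override
        public int compareTo(Scheduled other) {
            return Long.compare(deadlineNanos, other.deadlineNanos); // earliest deadline first
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final PriorityQueue<Scheduled> scheduledQueue = new PriorityQueue<>();

    /** Moves every scheduled task whose deadline has passed into the ordinary queue. */
    int fetchDueTasks(long nowNanos) {
        int moved = 0;
        Scheduled head;
        while ((head = scheduledQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            scheduledQueue.poll();
            taskQueue.add(head.task);
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        TwoQueuesSketch queues = new TwoQueuesSketch();
        queues.scheduledQueue.add(new Scheduled(300, () -> System.out.println("due at 300")));
        queues.scheduledQueue.add(new Scheduled(100, () -> System.out.println("due at 100")));
        queues.scheduledQueue.add(new Scheduled(200, () -> System.out.println("due at 200")));

        int moved = queues.fetchDueTasks(250); // pretend the clock reads 250
        System.out.println("moved: " + moved);
        Runnable task;
        while ((task = queues.taskQueue.poll()) != null) {
            task.run(); // runs the tasks due at 100 and 200, in that order
        }
    }
}
```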

The NioEventLoop's work is itself carried out by a task, the Runnable submitted in doStartThread(). What it cares about most is the work triggered by I/O events. When it has no local task to execute, it calls select() on the Selector. Because there may be scheduled tasks, it first looks at the scheduled task queue, takes the deadline of the highest-priority (earliest) task, derives from it the maximum time it may block, and performs a select() with a timeout.

If there are local tasks to begin with, it instead calls the non-blocking selectNow() to get the number of ready Channel events on the Selector, a value >= 0.

This step can be seen here:

    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

The call resolves to DefaultSelectStrategy.calculateStrategy():

    /**
     * The supplier that NioEventLoop passes in:
     *
     *     private final IntSupplier selectNowSupplier = new IntSupplier() {
     *         @Override
     *         public int get() throws Exception {
     *             return selectNow(); // Calls the multiplexer's selectNow(), which returns without blocking
     *         }
     *     };
     *
     * @param selectSupplier The supplier with the result of a select result.
     * @param hasTasks true if tasks are waiting to be processed.
     * @return
     * @throws Exception
     */
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        // Local tasks pending in the NioEventLoop? selector.selectNow() : SelectStrategy.SELECT (-1)
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
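
Put together, the choice between selectNow(), a timed select() and an unbounded select() can be sketched against the plain JDK Selector API. This is a simplified illustration, not Netty's select(long) method: the class SelectDecisionSketch and its methods are hypothetical, the deadline is compared directly with System.nanoTime(), and the timeout rounding is an assumption made for brevity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Hypothetical sketch of choosing between selectNow(), timed select() and blocking select(). */
final class SelectDecisionSketch {
    static final long NONE = Long.MAX_VALUE; // "no scheduled task", mirroring the constant in run()

    /** Returns the number of ready keys; deadlineNanos is on the System.nanoTime() time line. */
    static int poll(Selector selector, boolean hasTasks, long deadlineNanos) {
        try {
            if (hasTasks) {
                return selector.selectNow();          // tasks are waiting: never block
            }
            if (deadlineNanos == NONE) {
                return selector.select();             // nothing scheduled: block until woken up
            }
            long timeoutMillis = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            return timeoutMillis <= 0
                    ? selector.selectNow()            // deadline already (almost) reached
                    : selector.select(timeoutMillis); // block at most until the next deadline
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the non-blocking and the timed branch on a selector without any channels. */
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            int withTasks = poll(selector, true, NONE);
            int timed = poll(selector, false, System.nanoTime() + 20_000_000L); // 20 ms ahead
            return new int[] {withTasks, timed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("selectNow with tasks: " + result[0] + ", timed select: " + result[1]);
    }
}
```

The demo never takes the NONE branch because, with no channel registered and nobody calling wakeup(), an unbounded select() would block forever.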

Next, how does NioEventLoop proceed when this step yields strategy == 0, i.e. there are no I/O events to process?

With an ioRatio other than 100, that brings us back to this branch:

    ranTasks = runAllTasks(0); // This will run the minimum number of tasks

The method is defined in SingleThreadEventExecutor:

    // timeoutNanos Specifies the maximum available time for executing a task
    protected boolean runAllTasks(long timeoutNanos) {
        // Move the tasks that need to be scheduled to the normal queue
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask(); // Get the normal queue head task
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        // Calculate the deadline for performing the task. This is an exact point in time
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        // Tasks that have been executed
        long runTasks = 0;
        // The execution time stamp of the last task
        long lastExecutionTime;
        for (;;) {
            // Execute the current task
            safeExecute(task);

            runTasks ++; // Cumulative tasks executed

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            // 0x3F == 0b111111, so (runTasks & 0x3F) == 0 only when runTasks is a multiple of 64,
            // e.g. 64 == 0b1000000 and 0b1000000 & 0b0111111 == 0
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                // Check whether the deadline has passed once this batch of 64 has run
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            // Get the next task
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break; // No more tasks, exit spin
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

As you can see, with timeoutNanos == 0 the computed deadline is 0, so the first deadline check, made after 64 tasks, always ends the loop. At most 64 tasks are executed, as the comment says: This will run the minimum number of tasks.

In general, 64 tasks form one unit of work. After every 64 tasks the method checks whether the deadline has passed; if there is time left, it runs up to 64 more.
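
The batching behaviour can be reproduced outside Netty with a small sketch. BatchedRunSketch and its methods are hypothetical names; the logic follows the structure of runAllTasks(long) shown above, with a plain queue and a clock that measures nanoseconds since class initialization, so it is never negative.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical reduction of the "check the deadline every 64 tasks" loop. */
final class BatchedRunSketch {
    private static final long START = System.nanoTime();

    /** Nanoseconds since class initialization; never negative. */
    static long nanoTime() {
        return System.nanoTime() - START;
    }

    /** Runs queued tasks and checks the deadline only after every 64th task. Returns how many ran. */
    static long runTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        // A timeout of 0 gives deadline 0, which any later clock reading has already reached.
        final long deadline = timeoutNanos > 0 ? nanoTime() + timeoutNanos : 0;
        long ran = 0;
        for (;;) {
            task.run();
            ran++;
            if ((ran & 0x3F) == 0 && nanoTime() >= deadline) {
                break; // a batch of 64 is complete and the time budget is used up
            }
            task = queue.poll();
            if (task == null) {
                break; // queue drained
            }
        }
        return ran;
    }

    /** Builds a queue of n no-op tasks. */
    static Queue<Runnable> noOps(int n) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            queue.add(() -> { });
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(noOps(200), 0));               // 64: the minimum batch
        System.out.println(runTasks(noOps(10), 0));                // 10: queue drained before the first check
        System.out.println(runTasks(noOps(200), 10_000_000_000L)); // 200: a 10 s budget is ample for no-ops
    }
}
```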

Epoll bug?

We know that JDK NIO on Linux has a well-known bug in which epoll polling makes the CPU spike, and Netty provides a workaround for it. Before looking at that workaround, let's look at the causes and consequences of the bug.

Bug ID: JDK-6670302, (se) NIO selector wakes up with 0 selected keys infinitely [lnx 2.4] (java.com)

The bug report describes it as follows. The root cause lies in epoll polling: on Linux 2.4 kernels, an abruptly interrupted socket causes poll and epoll to wake the Selector unexpectedly while the number of ready Channel events is 0. From then on, Selector.select() keeps returning immediately when it should block, so the loop spins with no events to process and the CPU usage spikes.

Unfortunately, JDK NIO does not provide a fix for this, so anyone implementing an NIO framework has to account for it themselves. Netty takes care of the epoll bug for us.

Netty's solution

As the run() method shows, selectCnt is incremented on every loop iteration. When the epoll bug occurs, its signature is that an iteration does nothing at all: strategy == 0 and ranTasks == false.

With this in mind, come back to this branch:

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    // The normal NioEventLoop thread wakes up from selector because of an I/O event
                    // This will set selectCnt to 0
                    selectCnt = 0;
                }
                // unexpectedSelectorWakeup() is Netty's workaround for the Selector bug.
                // This branch is reached when ranTasks == false && strategy == 0.
                // selectCnt counts the loop iterations in a row in which the selector returned with nothing to do
                else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    // Set selectCnt to 0, and the selector has been fixed
                    selectCnt = 0;
                }

You can see that if this iteration ran tasks or strategy is greater than zero, the Selector is still working normally and selectCnt is reset.

Otherwise, control reaches unexpectedSelectorWakeup():

    // returns true if selectCnt should be reset
    private boolean unexpectedSelectorWakeup(int selectCnt) {
        if (Thread.interrupted()) {
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            return true;
        }
        // true - the selector returned prematurely 512 times in a row (the default threshold), so it is rebuilt
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            // This is the fix
            rebuildSelector();
            return true;
        }
        return false;
    }

Inside the method, after the interrupt check, Netty asks whether the Selector has been woken unexpectedly too many times in a row: has the iteration count reached the default threshold of 512, i.e. selectCnt >= 512?

When this condition is met, it rebuilds the Selector:
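
The counting rule can be modelled without any real selector. In the sketch below, PrematureReturnCounterSketch and its members are hypothetical, the threshold is fixed at the 512 quoted above, and a counter named rebuilds stands in for the call to rebuildSelector().

```java
/** Hypothetical model of the selectCnt bookkeeping around premature selector returns. */
final class PrematureReturnCounterSketch {
    static final int REBUILD_THRESHOLD = 512; // the threshold quoted in the text

    private int selectCnt;
    private int rebuilds;

    /** Feeds one loop iteration: the number of ready keys and whether any task ran. */
    void onIteration(int readyKeys, boolean ranTasks) {
        selectCnt++;
        if (ranTasks || readyKeys > 0) {
            selectCnt = 0;  // useful work happened, the selector is considered healthy
        } else if (selectCnt >= REBUILD_THRESHOLD) {
            rebuilds++;     // stand-in for rebuildSelector()
            selectCnt = 0;
        }
    }

    int selectCnt() {
        return selectCnt;
    }

    int rebuilds() {
        return rebuilds;
    }

    public static void main(String[] args) {
        PrematureReturnCounterSketch counter = new PrematureReturnCounterSketch();
        for (int i = 0; i < 511; i++) {
            counter.onIteration(0, false); // 511 empty iterations in a row
        }
        System.out.println(counter.selectCnt() + " empty iterations, rebuilds: " + counter.rebuilds());
        counter.onIteration(0, false);     // the 512th one triggers the rebuild
        System.out.println(counter.selectCnt() + " empty iterations, rebuilds: " + counter.rebuilds());
    }
}
```

A single productive iteration anywhere in the sequence resets the count, which is why only an uninterrupted run of empty wakeups leads to a rebuild.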

rebuildSelector()

    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }

    @Override
    public int registeredChannels() {
        return selector.keys().size() - cancelledKeys;
    }

    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            // The fix itself: create and initialize a brand-new Selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }

As you can see, repairing the Selector really means creating and initializing a new Selector, re-registering every Channel on it with its previous interest set and attachment, and finally closing the old Selector that exhibits the bug.
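
The same migration can be tried with nothing but the JDK. The sketch below is a reduced illustration of the approach, not Netty's rebuildSelector0(): the class SelectorRebuildSketch is hypothetical, there is no SelectorTuple or error handling per channel, and a Pipe source channel with the attachment "my-attachment" is an assumed stand-in for a registered Netty channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Hypothetical JDK-only sketch of moving registrations to a fresh selector. */
final class SelectorRebuildSketch {
    /** Opens a new selector, moves every valid registration over, then closes the old one. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // already cancelled, or already registered with the new selector
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // detach from the old selector
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    /** Registers one pipe end with a selector, rebuilds it and summarizes the outcome. */
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            Selector old = Selector.open();
            pipe.source().register(old, SelectionKey.OP_READ, "my-attachment");

            Selector fresh = rebuild(old);
            try {
                SelectionKey moved = pipe.source().keyFor(fresh);
                return fresh.keys().size() + " key(s), attachment=" + moved.attachment()
                        + ", oldOpen=" + old.isOpen();
            } finally {
                fresh.close();
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1 key(s), attachment=my-attachment, oldOpen=false
    }
}
```

Cancelling a key only marks it for removal at the old selector's next selection operation, so iterating over oldSelector.keys() while cancelling is safe here.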

After that, selectCnt is reset to 0 and normal work resumes.

If the threshold has not been reached yet, the loop simply keeps spinning and incrementing selectCnt until it reaches 512.

This addresses the NIO pain point of CPU spikes caused by the epoll bug: Netty provides the safety net for using epoll.

The comment in the source sums it up: "The selector returned prematurely many times in a row. Rebuild the selector to work around the problem."

That’s it for NioEventLoop!