Interpreting NioEventLoop
After the NioEventLoop thread is created, how does the thread work?
From the previous section, we know that the thread is created the first time a task is submitted to the NioEventLoop, in doStartThread():
private void doStartThread() {
    assert thread == null;
    // executor is a ThreadPerTaskExecutor.
    // It contains a thread factory (of type DefaultThreadFactory) used to create threads.
    executor.execute(new Runnable() {
        // Submitting this task to the executor makes it create a new thread to run it
        @Override
        public void run() {
            // Bind the newly created thread to this EventLoop: from now on it is the EventLoop's thread
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                // Key method: NioEventLoop.run(), which shows how the EventLoop thread works
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }
                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }
                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
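The lazy start above can be modelled in plain Java. This is a simplified sketch under stated assumptions, not Netty's code: the class LazyEventLoop, its fields and the runLoop() method are hypothetical. It only shows the idea that the loop's single thread is created by a thread-per-task executor the first time a task is submitted, guarded by a CAS on a "started" flag, and that this one thread then keeps draining the task queue (there is no Selector here).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the lazy start in SingleThreadEventExecutor (not Netty code).
class LazyEventLoop {
    // Counts how many threads the executor has created.
    final AtomicInteger threadsCreated = new AtomicInteger();

    // Stand-in for ThreadPerTaskExecutor: every execute() starts a brand-new thread.
    private final Executor threadPerTaskExecutor = command -> {
        threadsCreated.incrementAndGet();
        Thread t = new Thread(command, "lazy-event-loop");
        t.setDaemon(true); // do not keep the JVM alive in this demo
        t.start();
    };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    volatile Thread thread; // the loop's own thread, null until the first task arrives

    void execute(Runnable task) {
        taskQueue.add(task);
        // Only the first submission wins the CAS and starts the thread (cf. doStartThread()).
        if (started.compareAndSet(false, true)) {
            threadPerTaskExecutor.execute(() -> {
                thread = Thread.currentThread(); // bind the new thread to this loop
                runLoop();
            });
        }
    }

    // Stand-in for NioEventLoop.run(): this sketch only drains the task queue.
    private void runLoop() {
        for (;;) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return; // exit when the thread is interrupted
            }
        }
    }
}
```

Every task submitted after the first one simply lands in the queue; no further threads are created, which is what makes the loop single-threaded.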
As you can see, the call to SingleThreadEventExecutor.this.run() is the key step: it is where the EventLoop thread does its actual work.
run()
// NioEventLoop's working logic
@Override
protected void run() {
    // Counter used to detect the epoll bug
    int selectCnt = 0;
    // Infinite loop
    for (;;) {
        try {
            // >= 0: the return value of the selector, i.e. the number of ready channels registered with it
            // < 0: one of the constant states CONTINUE, BUSY_WAIT, SELECT
            int strategy;
            try {
                // selectStrategy -> DefaultSelectStrategy
                // Depending on whether NioEventLoop has local tasks, decide what to do:
                // 1. There are local tasks: call selectNow() on the multiplexer, which returns the number of ready channels
                // 2. There are no local tasks: return -1, and the constants below decide what happens next
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                // So we only need to consider >= 0 and -1
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                case SelectStrategy.SELECT: // NioEventLoop has no tasks, -1 was returned
                    // Deadline of the next scheduled task
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    // true: there is no scheduled task in this eventLoop
                    if (curDeadlineNanos == -1L) {
                        // Use NONE (the maximum long value) as the deadline
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // true: there is still no normal local task to run
                        if (!hasTasks()) {
                            // curDeadlineNanos is either NONE (no scheduled tasks) or
                            // the deadline by which the next scheduled task must run.
                            // The return value is the number of ready Channels on the selector.
                            // Note: because of the epoll bug on Linux, an abruptly broken socket can wake the
                            // selector with no Channel ready, after which select() stops blocking;
                            // in that case strategy == 0
                            strategy = select(curDeadlineNanos); // decides which select method to call
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            // Here, strategy is the number of ready Channels on the selector
            selectCnt++; // Key variable: number of consecutive loop iterations, i.e. of possibly useless select() wakeups
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // Percentage of loop time given to I/O processing; 50 by default
            final int ioRatio = this.ioRatio;
            // Whether this iteration ran any local tasks
            boolean ranTasks;
            // true: handle I/O events first, then run all local tasks without a time limit
            if (ioRatio == 100) {
                try {
                    // Are there I/O events to process?
                    if (strategy > 0) {
                        // Entry point for processing I/O events
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    // Run the tasks in the local task queue
                    ranTasks = runAllTasks();
                }
            }
            // The selector of this eventLoop has ready Channels
            else if (strategy > 0) {
                // Start time of I/O event processing
                final long ioStartTime = System.nanoTime();
                try {
                    // Process the I/O events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // Time spent processing I/O events
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // ioTime * (100 - ioRatio) / ioRatio: the maximum time local tasks may use,
                    // derived from the I/O ratio and the time just spent on I/O.
                    // So the time spent on local tasks is limited,
                    // while I/O event processing has no time limit
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
            // No ready Channel on the selector, so only local tasks can be processed
            else {
                // Run a minimal batch of local tasks
                // timeoutNanos == 0: at most 64 tasks are run
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                // The NioEventLoop thread woke up for a real reason (I/O events or tasks),
                // so reset selectCnt to 0
                selectCnt = 0;
            }
            // unexpectedSelectorWakeup() is Netty's workaround for the selector bug
            // Reached only when ranTasks == false && strategy == 0
            // selectCnt: number of consecutive loop iterations, i.e. of useless select() wakeups
            else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                // Reset selectCnt to 0: the selector has been rebuilt
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
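The time budget for local tasks in the strategy > 0 branch is plain arithmetic: if I/O processing took ioTime, local tasks may use ioTime * (100 - ioRatio) / ioRatio. The helper below is a hypothetical extraction of that one expression (IoRatioBudget and taskBudgetNanos are not Netty names). It rejects ioRatio == 100 because run() handles that case separately, by calling runAllTasks() with no time limit.

```java
// Hypothetical helper mirroring the expression passed to runAllTasks(...) in run().
final class IoRatioBudget {
    private IoRatioBudget() {}

    // Maximum nanoseconds local tasks may run after ioTimeNanos were spent on I/O.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 here: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With 10 ms of I/O, the default ratio of 50 gives tasks another 10 ms; a ratio of 80 gives them only 2.5 ms, while a ratio of 20 gives them 40 ms. A higher ioRatio therefore favours I/O by shrinking the task budget.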
As the code shows, NioEventLoop does two kinds of work: processing I/O events and running the tasks in its task queues. From NioEventLoop's inheritance hierarchy we know that it implements the JDK's ScheduledExecutorService, so there are two kinds of task queues: one for normal tasks and one for tasks that need to be scheduled (delayed and periodic tasks).
All of NioEventLoop's work is driven by tasks, and what it cares about most is I/O events. When it has no local task to run, it blocks on the Selector. Because there may be scheduled tasks, it first looks up the deadline of the earliest scheduled task (nextScheduledTaskDeadlineNanos()) and uses it to bound how long it may block, then calls a select() with that timeout.
If there are local tasks, it instead calls the non-blocking selectNow() to get the number of ready Channels on the Selector, a value >= 0, so that the pending tasks are not delayed.
This decision is made here:
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
The strategy itself is computed in DefaultSelectStrategy:
/**
 * private final IntSupplier selectNowSupplier = new IntSupplier() {
 *     @Override
 *     public int get() throws Exception {
 *         return selectNow(); // Calls the multiplexer's selectNow(), which returns without blocking
 *     }
 * };
 *
 * @param selectSupplier The supplier with the result of a select result.
 * @param hasTasks true if tasks are waiting to be processed.
 * @return
 * @throws Exception
 */
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // Local tasks in NioEventLoop? Then Selector.selectNow(), otherwise -1 (SelectStrategy.SELECT)
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
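The "selectNow() if there are tasks, otherwise block until the next deadline" decision can be tried out against a real java.nio Selector. This is a simplified sketch, not Netty's select(long) method: the class SelectStep, its methods, and the rounding of the delay up to whole milliseconds are assumptions made for illustration. SELECT = -1 and NONE = Long.MAX_VALUE follow the values described in the comments above.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of one "decide, then select" step; not Netty code.
final class SelectStep {
    static final int SELECT = -1;            // same value as SelectStrategy.SELECT
    static final long NONE = Long.MAX_VALUE; // "nothing on the calendar"

    private SelectStep() {}

    // Mirrors DefaultSelectStrategy: with pending tasks, poll the selector without blocking.
    static int calculateStrategy(Selector selector, boolean hasTasks) throws IOException {
        return hasTasks ? selector.selectNow() : SELECT;
    }

    // Blocks until a channel is ready or the next scheduled deadline is reached.
    static int select(Selector selector, long deadlineNanos, long nowNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select(); // no scheduled task: block until a channel is ready or wakeup()
        }
        long delayNanos = Math.max(0L, deadlineNanos - nowNanos);
        // Round up to whole milliseconds so the wait does not end just before the deadline.
        long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(delayNanos + 999_999L);
        // select(0) would block forever, so an expired deadline must use selectNow().
        return timeoutMillis == 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }
}
```

The special case for a zero timeout matters: for Selector.select(long), a timeout of 0 means "block indefinitely", which is the opposite of what an already-expired deadline needs.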
Next, how does NioEventLoop proceed when this step yields strategy == 0, i.e. there are no I/O events to process?
That brings us back to this branch:
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
It is implemented in SingleThreadEventExecutor:
// timeoutNanos: the maximum time available for running tasks
protected boolean runAllTasks(long timeoutNanos) {
    // Move the scheduled tasks that are due into the normal queue
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask(); // Take the task at the head of the normal queue
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // The deadline for running tasks, as an absolute point in time (0 if timeoutNanos <= 0)
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    // Number of tasks run so far
    long runTasks = 0;
    // Timestamp taken after the last task ran
    long lastExecutionTime;
    for (;;) {
        // Run the current task
        safeExecute(task);
        runTasks ++; // Count the tasks run
        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // 0x3F == 0b111111, so (runTasks & 0x3F) == 0 exactly when runTasks is a multiple of 64
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            // Has the deadline passed after this batch of 64 tasks?
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // Take the next task
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break; // No more tasks, leave the loop
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
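The first call in runAllTasks(), fetchFromScheduledTaskQueue(), moves scheduled tasks that are due into the normal queue. The model below is a hypothetical simplification (the class TaskQueues and its methods are not Netty code): scheduled tasks sit in a priority queue ordered by deadline, due ones are moved to a FIFO queue, and nextDeadlineNanos() returns -1 when nothing is scheduled, like the nextScheduledTaskDeadlineNanos() check in run().

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical model of the two task queues; not Netty code.
final class TaskQueues {
    static final class ScheduledTask {
        final long deadlineNanos;
        final Runnable task;

        ScheduledTask(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    // Normal tasks, run in FIFO order.
    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    // Scheduled tasks, earliest deadline first.
    final PriorityQueue<ScheduledTask> scheduledTaskQueue =
            new PriorityQueue<ScheduledTask>((a, b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));

    // Moves every scheduled task whose deadline has passed into the normal queue; returns how many moved.
    int fetchDueTasks(long nowNanos) {
        int moved = 0;
        ScheduledTask head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            scheduledTaskQueue.poll();
            taskQueue.add(head.task);
            moved++;
        }
        return moved;
    }

    // Deadline of the earliest scheduled task, or -1 if nothing is scheduled.
    long nextDeadlineNanos() {
        ScheduledTask head = scheduledTaskQueue.peek();
        return head == null ? -1L : head.deadlineNanos;
    }
}
```

Because the priority queue is ordered by deadline, only its head ever needs to be inspected, both for moving due tasks and for computing how long the selector may block.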
In runAllTasks(), timeoutNanos == 0 yields deadline == 0, so the first deadline check (after 64 tasks) always finds the deadline passed and at most 64 tasks are run, as the comment "This will run the minimum number of tasks" says.
In general, 64 tasks are the unit of work: after every 64 tasks the loop checks whether the deadline has passed, and if time remains it runs up to another 64.
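The 64-task batching can be checked with a small model. This is a hypothetical simplification of runAllTasks(long), not Netty's code: TaskBatcher is an invented name, and the clock is passed in as a LongSupplier so the test can control time. The clock is assumed to start at zero (Netty reads ScheduledFutureTask.nanoTime() here), because the deadline == 0 trick only works when timestamps are never negative.

```java
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of runAllTasks(long timeoutNanos); not Netty code.
final class TaskBatcher {
    private TaskBatcher() {}

    // Runs tasks until the queue is empty or, at a 64-task boundary, the deadline has passed.
    // Returns the number of tasks run. The clock must return non-negative timestamps.
    static long runAllTasks(Queue<Runnable> queue, long timeoutNanos, LongSupplier clock) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        // timeoutNanos <= 0 gives deadline 0, so the first check always stops the loop.
        final long deadline = timeoutNanos > 0 ? clock.getAsLong() + timeoutNanos : 0;
        long runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // (runTasks & 0x3F) == 0 exactly when runTasks is a multiple of 64.
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }
}
```

For example, with 200 queued tasks that each advance a fake clock by 1,000 ns, a call with timeout 0 runs exactly 64 tasks; a following call with a 100,000 ns budget runs two batches (128 tasks), because the deadline is only noticed at the second 64-task boundary.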
Epoll bug?
JDK NIO has a well-known bug in its epoll-based Selector that can drive CPU usage to 100%, and Netty provides a workaround for it. Before looking at the workaround, let's look at the cause and consequences of the bug.
Bug ID: JDK-6670302 (se) NIO selector wakes up with 0 selected keys infinitely [lnx 2.4] (java.com)
As the bug report describes, the problem lies in epoll polling: on Linux 2.4 kernels, a socket that is abruptly interrupted during poll/epoll wakes the Selector unexpectedly, with 0 ready Channel events. Selector.select(), which should block, keeps returning immediately, so the loop around it never blocks, has no events to process and keeps spinning, which drives the CPU usage up.
Unfortunately, JDK NIO does not fix this, so anyone implementing an NIO framework usually has to work around it; Netty does so for you.
Netty's solution
In run(), selectCnt is incremented on every iteration of the loop. When the epoll bug strikes, its signature is an iteration that does nothing at all: strategy == 0 and ranTasks == false.
With this in mind, come back to this branch:
if (ranTasks || strategy > 0) {
    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                selectCnt - 1, selector);
    }
    // The NioEventLoop thread woke up for a real reason (I/O events or tasks),
    // so reset selectCnt to 0
    selectCnt = 0;
}
// unexpectedSelectorWakeup() is Netty's workaround for the selector bug
// Reached only when ranTasks == false && strategy == 0
// selectCnt: number of consecutive loop iterations, i.e. of useless select() wakeups
else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
    // Reset selectCnt to 0: the selector has been rebuilt
    selectCnt = 0;
}
If this iteration ran tasks or strategy is greater than zero, the Selector is working normally, so selectCnt is reset to 0.
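The counting logic can be isolated in a small model. This is a hypothetical sketch (WakeupGuard and afterIteration are invented names, and the interrupt check of unexpectedSelectorWakeup() is left out): every iteration bumps the counter, any useful work resets it, and only an unbroken run of empty iterations reaching the threshold triggers a rebuild, which here is just counted.

```java
// Hypothetical model of the selectCnt bookkeeping in run(); rebuildSelector() is stubbed as a counter.
final class WakeupGuard {
    private final int rebuildThreshold; // Netty's default threshold is 512
    private int selectCnt;
    int rebuilds;

    WakeupGuard(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Called once per loop iteration with what that iteration achieved.
    void afterIteration(int strategy, boolean ranTasks) {
        selectCnt++;
        if (ranTasks || strategy > 0) {
            selectCnt = 0; // the selector is doing useful work
        } else if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;    // stand-in for rebuildSelector()
            selectCnt = 0;
        }
    }

    int selectCnt() {
        return selectCnt;
    }
}
```

With a threshold of 512, 511 empty iterations leave the selector alone; a single productive iteration resets the count, and only 512 empty iterations in a row trigger a rebuild.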
Otherwise (ranTasks == false and strategy == 0), the decision is left to unexpectedSelectorWakeup():
// returns true if selectCnt should be reset
private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
        // As this is most likely a bug in the handler of the user or it's client library we will
        // also log it.
        //
        // See https://github.com/netty/netty/issues/2426
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // true: the number of consecutive useless wakeups has reached the threshold (512 by default), so rebuild the selector
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);
        // This is the fix
        rebuildSelector();
        return true;
    }
    return false;
}
The method first checks whether the thread was interrupted, then whether the Selector has woken up prematurely too many times in a row, i.e. whether selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD (512 by default).
When that condition holds, it repairs the Selector:
rebuildSelector()
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}
@Override
public int registeredChannels() {
    return selector.keys().size() - cancelledKeys;
}
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    if (oldSelector == null) {
        return;
    }
    try {
        // This is how the Selector is fixed: create a new Selector and initialize it
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}
As you can see, repairing a Selector really means creating and initializing a new Selector, re-registering every valid Channel on it with the same interest ops and attachment (and updating each AbstractNioChannel's selectionKey), and finally closing the old, buggy Selector.
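The core of the migration can be reproduced with plain java.nio. This is a simplified sketch, not Netty's rebuildSelector0(): the class SelectorMigration is hypothetical, and it leaves out the per-channel error handling and the SelectorTuple/unwrapped-selector details. It shows the essential steps: open a new Selector, re-register each valid channel with the same interest set and attachment, then close the old Selector.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Simplified sketch of the idea behind rebuildSelector0(); not Netty code.
final class SelectorMigration {
    private SelectorMigration() {}

    // Opens a new Selector, moves every valid registration to it and closes the old one.
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // cancelled key, or channel already migrated
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            // Same channel, same interest set, same attachment, new selector.
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close(); // everything valid now lives on the new selector
        return newSelector;
    }
}
```

Keeping the attachment is what lets Netty find its AbstractNioChannel again from the new SelectionKey; in this sketch a channel registered on the old selector becomes selectable through the returned one.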
After that, selectCnt is reset to 0 and the loop resumes normal work.
If the threshold has not been reached yet, the loop simply keeps spinning, incrementing selectCnt until it reaches 512.
That is how Netty removes the NIO pain point of CPU spikes caused by the epoll bug: it acts as a safety net for epoll.
As the source comment puts it: "The selector returned prematurely many times in a row. Rebuild the selector to work around the problem."
That’s it for NioEventLoop!