The last two posts described what the Netty reactor thread does in its first two steps. Let’s use this picture to review:

A quick summary of the reactor thread trilogy

  1. Polling for I/O events
  2. Processing I/O events
  3. Processing the task queue

Today we move on to the last part of the trilogy, processing the task queue, shown in purple in the picture above.

After reading this article, you will understand Netty’s asynchronous task mechanism and its scheduled-task processing logic. These details will help you write Netty applications.

Common usage scenarios of tasks in Netty

We analyze three typical task usage scenarios

1. User-defined common tasks

ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        // ...
    }
});

Let’s follow the execute method, focusing on the key points

@Override
public void execute(Runnable task) {
    // ...
    addTask(task);
    // ...
}

The execute method calls the addTask method

protected void addTask(Runnable task) {
    // ...
    if (!offerTask(task)) {
        reject(task);
    }
}

addTask then calls offerTask. If the offer fails, the reject method is called, which throws an exception directly through the default RejectedExecutionHandler

final boolean offerTask(Runnable task) {
    // ...
    return taskQueue.offer(task);
}

With the offerTask method, the task has basically reached its destination: Netty uses a taskQueue internally to store tasks. So what exactly is this taskQueue?

Let’s look at where taskQueue is defined and initialized

private final Queue<Runnable> taskQueue;


taskQueue = newTaskQueue(this.maxPendingTasks);

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return PlatformDependent.newMpscQueue(maxPendingTasks);
}


We find that in NioEventLoop the taskQueue is an MPSC queue by default, that is, a multi-producer/single-consumer queue. With it, Netty can easily gather tasks submitted by external threads and execute them serially on the single reactor thread. We can borrow this task execution model for applications such as multithreaded data reporting with periodic aggregation
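To make the multi-producer/single-consumer idea concrete, here is a minimal sketch that uses only the JDK. It is not Netty’s code: ConcurrentLinkedQueue stands in for Netty’s MPSC queue, and the names MpscAggregationSketch and runDemo are made up for illustration. Several producer threads enqueue tasks while a single consumer thread runs them one by one, so the state the tasks touch needs no lock.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: ConcurrentLinkedQueue is an assumption standing in for Netty's MPSC queue.
final class MpscAggregationSketch {

    /** Starts `producers` threads that each enqueue `perProducer` tasks; the caller is the single consumer. */
    static long runDemo(int producers, int perProducer) {
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        long[] counter = {0}; // only ever touched by the consumer thread, so no synchronization is needed

        for (int i = 0; i < producers; i++) {
            new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    taskQueue.offer(() -> counter[0]++); // many threads may offer concurrently
                }
            }).start();
        }

        // Single consumer: drain until every submitted task has been executed.
        long total = (long) producers * perProducer;
        long executed = 0;
        while (executed < total) {
            Runnable task = taskQueue.poll();
            if (task == null) {
                Thread.yield(); // nothing queued yet, let the producers make progress
                continue;
            }
            task.run();
            executed++;
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```

The point of the design is that contention is limited to the queue itself: the work done by each task stays single-threaded.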

In the first scenario, all code is executed within the reactor thread, so inEventLoop() returns true wherever it is called. Because there is only one producer here, the multi-producer capability of the MPSC queue is not really exercised. Where the MPSC queue comes into its own is the second scenario

2. Calling channel methods from a non-reactor thread

// non reactor thread
channel.write(...)

This situation is quite common in push systems. Typically, a business thread looks up the channel reference for a user’s identity and then calls a write-type method to push a message to that user, which lands us in this scenario

The call chain of the channel.write() family of methods will be dissected in a separate article. For now, all we need to know is that the write eventually reaches the following method

AbstractChannelHandlerContext.java

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

When an external thread calls write, executor.inEventLoop() returns false, so execution goes straight to the else branch, wraps the write in a WriteTask (this is write rather than writeAndFlush, so the flush argument is false), and then calls the safeExecute method

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    // ...
    executor.execute(runnable);
    // ...
}

From here on, the call chain is the same as in the first scenario, but with one obvious difference: the call chain of the first scenario is initiated by the reactor thread, while that of the second scenario is initiated by user threads, of which there may be many. Multiple threads writing to the taskQueue concurrently could obviously cause thread-synchronization problems, and this is exactly where Netty’s MPSC queue proves useful
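The inEventLoop() branching can be sketched with a tiny hand-rolled loop. Everything here is an assumption for illustration, not Netty’s implementation: the InEventLoopSketch class, its write method returning a string, and the use of LinkedBlockingQueue instead of an MPSC queue. It only shows the dispatch rule: run inline when already on the loop thread, otherwise enqueue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mini event loop; names and structure are illustrative only.
final class InEventLoopSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread loopThread = new Thread(this::loop, "sketch-reactor");

    private void loop() {
        while (running) {
            try {
                taskQueue.take().run(); // the loop thread is the only consumer
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    void start() { loopThread.start(); }

    boolean inEventLoop() { return Thread.currentThread() == loopThread; }

    void execute(Runnable task) { taskQueue.offer(task); }

    /** Runs the action inline on the loop thread, otherwise wraps it as a task. */
    String write(Runnable action) {
        if (inEventLoop()) {
            action.run();
            return "direct";
        }
        execute(action);
        return "queued";
    }

    static String[] demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        loop.start();
        String fromUserThread = loop.write(() -> { });                   // called from the caller's thread
        CompletableFuture<String> fromReactor = new CompletableFuture<>();
        loop.execute(() -> fromReactor.complete(loop.write(() -> { }))); // called on the loop thread
        String second = fromReactor.join();
        loop.execute(() -> loop.running = false);                        // let the loop thread exit
        return new String[] {fromUserThread, second};
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println(r[0] + ", " + r[1]); // prints "queued, direct"
    }
}
```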

3. User-defined scheduled tasks

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
    }
}, 60, TimeUnit.SECONDS);

The third scenario is scheduled-task logic. Its most common use is as shown above: run a task after a certain delay

Let’s follow the schedule method

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // ...
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

ScheduledFutureTask wraps the user-defined task as a Netty-internal task

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    // ...
    scheduledTaskQueue().add(task);
    // ...
    return task;
}

Netty uses an MPSC queue to hold ordinary tasks. Is there a similar queue for scheduled tasks? With that question in mind, let’s move on

Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
    }
    return scheduledTaskQueue;
}

Sure enough, the scheduledTaskQueue() method returns a priority queue, and add is then called on it to enqueue the scheduled task. But why can a plain priority queue be used here without worrying about multi-threaded concurrency?

Because in the scenario we are discussing the reactor thread is the initiator of the call chain, there is no multi-threaded concurrency problem

But what if a user schedules a task from outside the reactor thread? This type of scenario is rare, but Netty, as an extremely robust high-performance I/O framework, has to take it into account.

Netty handles this by checking whether the call comes from an external thread and, if so, wrapping the logic of adding the scheduled task into an ordinary task. What gets submitted is an [add scheduled task] task rather than the scheduled task itself, which brings us back to the second scenario. Access to the PriorityQueue thus becomes single-threaded: only the reactor thread touches it

The complete schedule method

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        // Go to scenario 2 to further wrap the task
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }
    return task;
}
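The same trick can be tried out in isolation. The following is a rough sketch under stated assumptions, not Netty’s code: ScheduleFromOutsideSketch, runPendingTasks and the use of plain Long deadlines in place of ScheduledFutureTask are all hypothetical, and the thread that calls demo() plays the role of the reactor thread. It shows that a call from another thread never touches the non-thread-safe PriorityQueue directly.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: deadlines stand in for scheduled tasks.
final class ScheduleFromOutsideSketch {
    private final Thread loopThread;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();      // safe for many producers
    private final PriorityQueue<Long> scheduledDeadlines = new PriorityQueue<>(); // NOT thread-safe

    ScheduleFromOutsideSketch(Thread loopThread) {
        this.loopThread = loopThread;
    }

    boolean inEventLoop() { return Thread.currentThread() == loopThread; }

    void schedule(long deadlineNanos) {
        if (inEventLoop()) {
            scheduledDeadlines.add(deadlineNanos);
        } else {
            // Submit an "add scheduled task" task instead of touching the PriorityQueue here.
            taskQueue.offer(() -> scheduledDeadlines.add(deadlineNanos));
        }
    }

    /** Must be called on the loop thread: runs every queued ordinary task. */
    void runPendingTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }

    /** Returns the scheduled-queue size before and after the loop thread drains its task queue. */
    static int[] demo() {
        ScheduleFromOutsideSketch loop = new ScheduleFromOutsideSketch(Thread.currentThread());
        Thread user = new Thread(() -> loop.schedule(60L)); // external thread
        user.start();
        try {
            user.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int before = loop.scheduledDeadlines.size();
        loop.runPendingTasks();
        int after = loop.scheduledDeadlines.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " -> " + r[1]); // prints "0 -> 1"
    }
}
```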

While reading source code, we should keep asking ourselves why. It also helps us stay awake while reading! Here, for example: why are scheduled tasks stored in a priority queue? Let’s step away from the source code for a moment and think about the nature of a priority queue

A priority queue keeps its elements in a certain order, so the elements must be comparable. If each element is a scheduled task, then scheduled tasks must be comparable.

Each task has a deadline for its next execution, and deadlines can be compared. For tasks with the same deadline, the order in which they were added can also be compared. Just like this, when reading source code, be sure to talk to yourself and keep asking why

With this guess in mind, let’s look at ScheduledFutureTask and extract the key parts

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
    private static final AtomicLong nextTaskId = new AtomicLong();
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }

    private final long id = nextTaskId.getAndIncrement();
    /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
    private final long periodNanos;

    @Override
    public int compareTo(Delayed o) {
        // ...
    }

    // Simplified code
    @Override
    public void run() {
    }
}

Here we spot the compareTo method at a glance. Pressing CMD + U jumps to the interface it implements, which turns out to be Comparable

public int compareTo(Delayed o) {
    if (this == o) {
        return 0;
    }

    ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
    long d = deadlineNanos() - that.deadlineNanos();
    if (d < 0) {
        return -1;
    } else if (d > 0) {
        return 1;
    } else if (id < that.id) {
        return -1;
    } else if (id == that.id) {
        throw new Error();
    } else {
        return 1;
    }
}

Stepping into the method body, we find that two scheduled tasks are compared as follows: the deadlines are compared first; if the deadlines are equal, the ids are compared, that is, the order in which the tasks were added; if the ids are also equal, an Error is thrown

In this way, when scheduled tasks are executed, the task with the earliest deadline is guaranteed to run first
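The ordering rule is easy to check with a plain java.util.PriorityQueue. This is a small sketch, not Netty’s class: the Task type, its fields and the sample values are invented for illustration, and Long.compare is used instead of the subtraction seen in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical Task type: ordered by deadline first, then by insertion id.
final class DeadlineOrderSketch {
    static final class Task implements Comparable<Task> {
        final String name;
        final long deadlineNanos;
        final long id;

        Task(String name, long deadlineNanos, long id) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
            this.id = id;
        }

        @Override
        public int compareTo(Task that) {
            int byDeadline = Long.compare(deadlineNanos, that.deadlineNanos);
            return byDeadline != 0 ? byDeadline : Long.compare(id, that.id);
        }
    }

    /** Adds tasks in scrambled order and returns the names in the order the queue hands them out. */
    static List<String> pollOrder() {
        PriorityQueue<Task> queue = new PriorityQueue<>();
        queue.add(new Task("c", 300, 0));
        queue.add(new Task("a", 100, 1));
        queue.add(new Task("b2", 200, 3)); // same deadline as b1, added later
        queue.add(new Task("b1", 200, 2));

        List<String> order = new ArrayList<>();
        Task task;
        while ((task = queue.poll()) != null) {
            order.add(task.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(pollOrder()); // prints [a, b1, b2, c]
    }
}
```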

Netty has three types of scheduled tasks

  1. Run once after a certain delay
  2. Run repeatedly at a fixed interval
  3. Run again a fixed delay after each execution finishes

Netty uses a single periodNanos field to distinguish the three cases, as the comment in Netty’s source shows

/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;

With that in mind, let’s see how Netty handles these three types of scheduled tasks

public void run() {
    if (periodNanos == 0) {
        V result = task.call();
        setSuccessInternal(result);
    } else {
        task.call();
        long p = periodNanos;
        if (p > 0) {
            deadlineNanos += p;
        } else {
            deadlineNanos = nanoTime() - p;
        }
        scheduledTaskQueue.add(this);
    }
}

periodNanos == 0 corresponds to a task that runs once after a certain delay. Once it has run, the task is finished.

Otherwise, the task is executed first, and then the two repeating types are distinguished. If periodNanos is greater than 0, the task runs at a fixed rate regardless of how long each execution takes, so the next deadline is set to the current deadline plus the interval periodNanos. If periodNanos is less than 0, the task runs again a fixed interval after each execution finishes, so the next deadline is the current time plus the interval; since p is negative, subtracting p adds a positive interval. Finally, the task object is added back to the queue, which is how repeated execution is implemented
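A quick worked example of the two formulas, written as a standalone helper. The PeriodSketch class and the nextDeadline method are hypothetical names, and the numbers are arbitrary: a task whose deadline was 100 finishes running at time 130, with an interval of 50.

```java
// Hypothetical helper mirroring the deadline arithmetic described above.
final class PeriodSketch {

    /**
     * @param currentDeadline deadline the task was just run for
     * @param now             time at which the task finished
     * @param periodNanos     >0 fixed rate, <0 fixed delay
     */
    static long nextDeadline(long currentDeadline, long now, long periodNanos) {
        if (periodNanos == 0) {
            throw new IllegalArgumentException("one-shot tasks are not rescheduled");
        }
        return periodNanos > 0
                ? currentDeadline + periodNanos // fixed rate: independent of how long the task took
                : now - periodNanos;            // fixed delay: counted from the end of this run
    }

    public static void main(String[] args) {
        System.out.println(nextDeadline(100, 130, 50));  // prints 150
        System.out.println(nextDeadline(100, 130, -50)); // prints 180
    }
}
```

With a fixed rate the 30 units spent running are absorbed into the period, whereas with a fixed delay they push the next run back.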

Now that we have a good understanding of how tasks are added inside Netty, let’s see how the third step of the reactor thread schedules them

Task scheduling in the reactor thread

First, let’s turn our attention to the outermost entry-point code

runAllTasks(long timeoutNanos);

As the name suggests, this line tries to run all tasks within a given amount of time; timeoutNanos is the maximum time the method may take. Why does Netty do this? If the reactor thread lingers here too long, I/O events pile up unprocessed (see the first two steps of the reactor thread), and a large number of client requests end up blocked. So by default Netty limits the execution time of the internal task queue

All right, let’s follow up

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    // ...

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);
        runTasks ++;
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

This code is the entire logic the reactor thread uses to execute tasks. It can be broken down into the following steps

  1. Transfer expired scheduled tasks from scheduledTaskQueue to taskQueue (the MPSC queue)
  2. Calculate the deadline for this round of the task loop
  3. Execute tasks in a loop
  4. Wrap up

Let’s go through them step by step

Transferring scheduled tasks from scheduledTaskQueue to taskQueue (MPSC queue)

First, the fetchFromScheduledTaskQueue() method is called to transfer the scheduled tasks that are due into the MPSC queue

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

As you can see, Netty is very careful when transferring tasks from scheduledTaskQueue to taskQueue: if the offer to taskQueue fails, the task just removed from scheduledTaskQueue is added back

The logic for pulling a scheduled task out of scheduledTaskQueue is shown below. The nanoTime parameter is the current time (more precisely, the current nanosecond value minus the nanosecond value at which the ScheduledFutureTask class was loaded)

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }

    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    return null;
}

As you can see, pollScheduledTask only removes and returns a task once that task’s deadline has passed
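The transfer-and-put-back behaviour can be reproduced with JDK collections alone. In this sketch, which is an illustration rather than Netty’s code, tasks are represented by their deadlines, a bounded ArrayBlockingQueue plays the role of a full taskQueue, and FetchScheduledSketch, fetch and demo are made-up names.

```java
import java.util.Arrays;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch: each Long is the deadline of a scheduled task.
final class FetchScheduledSketch {

    /** Moves every task whose deadline is <= now; returns false if the task queue filled up. */
    static boolean fetch(PriorityQueue<Long> scheduled, Queue<Long> taskQueue, long now) {
        for (;;) {
            Long head = scheduled.peek();
            if (head == null || head > now) {
                return true;           // nothing (more) has expired
            }
            scheduled.remove();
            if (!taskQueue.offer(head)) {
                scheduled.add(head);   // no room: put it back so it is picked up next time
                return false;
            }
        }
    }

    /** Returns {fetch succeeded ? 1 : 0, taskQueue size, scheduled size} for a given capacity. */
    static int[] demo(int capacity) {
        PriorityQueue<Long> scheduled = new PriorityQueue<>(Arrays.asList(30L, 10L, 90L, 20L));
        Queue<Long> taskQueue = new ArrayBlockingQueue<>(capacity);
        boolean ok = fetch(scheduled, taskQueue, 50L);
        return new int[] {ok ? 1 : 0, taskQueue.size(), scheduled.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo(5))); // prints [1, 3, 1]
        System.out.println(Arrays.toString(demo(2))); // prints [0, 2, 2]
    }
}
```

With capacity 2, the task with deadline 30 has expired but does not fit, so it goes back into the scheduled queue next to the not-yet-due task with deadline 90.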

Calculate the cutoff time for this task cycle

Runnable task = pollTask();
// ...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;

This step takes the first task, uses the timeoutNanos passed in by the reactor thread to calculate the deadline of this round of the task loop, and uses runTasks and lastExecutionTime to keep track of the loop’s state

Execute tasks in a loop

for (;;) {
    safeExecute(task);
    runTasks++;
    if ((runTasks & 0x3F) == 0) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        if (lastExecutionTime >= deadline) {
            break;
        }
    }

    task = pollTask();
    if (task == null) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
        break;
    }
}

This step is the core code that executes all tasks in Netty. safeExecute is called first to make sure the task runs safely: any exception it throws is caught and logged instead of propagating

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

Next, the task counter runTasks is incremented. Every 64 tasks (that is, whenever the low six bits selected by the 0x3F mask are all zero), Netty checks whether the current time has passed the deadline of this round of the task loop. If so, it breaks out; if not, it continues. This shows how thoughtful Netty’s performance optimization is: if the task queue holds a large number of small tasks, checking the deadline after every single task would be relatively inefficient
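One consequence of checking only every 64 tasks is that the loop can overshoot its budget slightly. The sketch below makes this visible with a fake clock. It is an assumption-laden toy, not Netty’s runAllTasks: TimeBudgetSketch, fakeClock and demo are invented names, and each task simply advances the clock by one unit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch with a fake clock so the result is deterministic.
final class TimeBudgetSketch {
    static long fakeClock = 0; // stands in for a nanoTime() call; every task advances it by 1

    /** Runs queued tasks until the queue is empty or the budget is found to be exhausted. */
    static long runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = fakeClock + timeoutNanos;
        long runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // Only look at the clock on every 64th task.
            if ((runTasks & 0x3F) == 0 && fakeClock >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    static long demo(int queuedTasks, long timeoutNanos) {
        fakeClock = 0;
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < queuedTasks; i++) {
            queue.add(() -> fakeClock++);
        }
        return runAllTasks(queue, timeoutNanos);
    }

    public static void main(String[] args) {
        System.out.println(demo(1000, 100)); // prints 128: budget passed at 100, noticed at the next check
        System.out.println(demo(50, 100));   // prints 50: the queue ran dry first
    }
}
```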

Wrapping up

afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;

To wrap things up, call the afterRunningAllTasks method

@Override
protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}

NioEventLoop can add wrap-up tasks to tailTasks through the executeAfterEventLoopIteration method of its superclass SingleThreadEventLoop. For instance, you can call this method if you want to measure how long each round of the task loop takes

public final void executeAfterEventLoopIteration(Runnable task) {
    // ...
    if (!tailTasks.offer(task)) {
        reject(task);
    }
    // ...
}

this.lastExecutionTime = lastExecutionTime; simply records the time of the last task execution. Searching for references to this field shows that it is never read, only assigned over and over again. I’ll raise an issue with the Netty project some other day...

That basically wraps up the third part of the reactor thread trilogy. If reading this felt comfortable, congratulations: you are now quite familiar with Netty’s task mechanism. Congratulations to me as well, for having explained it clearly. Let’s summarize one last time, tips style

  • When the reactor thread itself invokes an operation on its own eventLoop, the operation runs directly; when any other thread does so, the operation is wrapped as a task and added to the task queue for later execution
  • Netty tasks are divided into ordinary tasks and scheduled tasks, which land in the MpscQueue and the PriorityQueue respectively
  • Before each round of the task loop, Netty transfers expired scheduled tasks from the PriorityQueue to the MpscQueue
  • Netty checks once every 64 tasks whether it is time to exit the task loop

If you want to learn Netty systematically, my booklet “Netty Introduction and Practice: Building a WeChat-like IM Instant Messaging System” can help. If you want to study Netty’s internals systematically, don’t miss my Netty source code analysis video series: Coding.imooc.com/class/230.h…