
Introduction

Many business scenarios call for delayed or scheduled tasks. Network connections in particular almost always need some form of timeout control, and because a server holds a large number of connections, the number of timeout tasks is large as well. Managing timeouts for that many tasks efficiently is not easy.

This chapter introduces several data structures used to implement timeout tasks, and then examines the structure and code that Netty uses for them.


JDK native support for timeout tasks

java.util.Timer

JDK 1.3 introduced the Timer class to implement scheduled tasks. Its design is relatively simple, and it has two main internal fields:

  • TaskQueue: the queue that holds the scheduled tasks, which are instances of the abstract class TimerTask.
  • TimerThread: the thread that executes the scheduled tasks.

Timer also relies on an abstract class, TimerTask, which implements the Runnable interface. Business code overrides the run method of this abstract class to provide the concrete logic of the delayed task.

TaskQueue uses a min-heap internally, ordering tasks by trigger time so that the task due soonest sits at the head. TimerThread runs an infinite loop: it looks at the head of the TaskQueue, waits until that task's trigger time arrives, then removes the task from the queue and runs it.

Timer's data structure and algorithm are easy to understand. All timeout tasks first enter the delay queue, and the background thread repeatedly takes the head task and waits for its timeout to expire before executing it. The delay queue is ordered as a min-heap. A delayed-task workload involves three operations: adding a task, removing the head task, and peeking at the head task.

Peeking at the head task is O(1). Adding a task and removing the head task are both O(log n), so when the number of tasks is large, the overhead of adding and removing is noticeable. In addition, because Timer has only one processing thread, a delayed task that takes a long time to run delays every task behind it.
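As a minimal sketch of the API described above, the following schedules a single TimerTask and waits for it to fire. The class name TimerDemo, the helper runOnce, and the delays are illustrative assumptions, not part of the original discussion.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimerDemo {
    /** Schedules one task after delayMs and reports whether it ran within waitMs. */
    static boolean runOnce(long delayMs, long waitMs) {
        Timer timer = new Timer("demo-timer", true); // one daemon TimerThread
        CountDownLatch fired = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                fired.countDown(); // the delayed business logic goes here
            }
        }, delayMs);
        try {
            return fired.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stops the TimerThread and discards pending tasks
        }
    }

    public static void main(String[] args) {
        System.out.println("fired: " + runOnce(100, 2000));
    }
}
```

Because every task runs on the same TimerThread, a slow run method in one task would hold back all tasks queued behind it.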

ScheduledThreadPoolExecutor

Because Timer handles delayed tasks with a single thread, it cannot keep up when there are many tasks. JDK 1.5 introduced the thread pool interfaces, including the sub-interface ScheduledExecutorService for delayed tasks, which ScheduledThreadPoolExecutor implements. Internally it also uses a delay queue, ordered as a min-heap. Threads in the pool wait on this queue until a task is ready to be taken.

The ScheduledThreadPoolExecutor implementation has one special feature: only one thread at a time takes the head of the delay queue and waits for that task's timeout, while the other threads wait without a timeout until they are signalled. This keeps several threads from waking up for the same task, and it allows a waiting thread to be woken early when a newly added task becomes the head of the queue.

Because ScheduledThreadPoolExecutor can use multiple threads, it reduces the chance that one long-running task blocks the tasks behind it. However, its delay queue is also a min-heap, so adding and removing tasks is still O(log n), and performance suffers when the number of tasks is large.
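For comparison, here is a small sketch that schedules a delayed task on a ScheduledThreadPoolExecutor. The class name ScheduledPoolDemo, the helper runDelayed, and the returned string are assumptions made for illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduledPoolDemo {
    /** Schedules a task on a pool with the given number of threads and returns its result. */
    static String runDelayed(int threads, long delayMs) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(threads);
        try {
            // The task is put into the pool's internal delay queue (a min-heap).
            ScheduledFuture<String> future =
                    pool.schedule(() -> "timeout fired", delayMs, TimeUnit.MILLISECONDS);
            return future.get(delayMs + 5000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("delayed task did not complete", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDelayed(2, 100));
    }
}
```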

More efficient data structures

Although Timer and ScheduledThreadPoolExecutor both support delayed tasks, adding and removing a task costs O(log n) in each of them, so the overhead becomes significant once the number of tasks reaches tens of thousands or more.

Is there a data structure whose cost for adding and removing tasks is lower than O(log n)? There is. The paper Hashed and Hierarchical Timing Wheels describes a data structure called the timing wheel (referred to below as the time wheel), which reduces the time complexity of adding and removing delayed tasks to O(1).

Basic principle

The time wheel closely resembles a clock face with a moving hand, hence the name. It is a ring of time slots with a pointer that advances around the ring.

Each cell of the wheel is called a slot, and each slot holds a queue of delayed tasks. A slot represents one unit of time, for example one second. The number of slots multiplied by that unit is the maximum delay the time wheel can handle, and the unit of time is the wheel's precision: delays that differ by less than one unit cannot be distinguished.

All the tasks in one slot's queue have the same delay time. Each time a unit of time elapses, the pointer moves to the next slot, and all the tasks in the queue of the slot it now points to are triggered.

To insert a delayed task, first express the delay in units of time and take its remainder modulo the number of slots. Moving that many slots forward from the slot the pointer currently points to gives the slot into which the task is inserted.

For example, suppose the time wheel has eight slots, numbered 0 to 7, each representing one second, and the pointer currently points to slot 2. To add a task with a delay of 4 seconds, compute 4 % 8 = 4 and move 4 slots forward from slot 2, so the task is inserted into the queue of slot 6 ((2 + 4) % 8 = 6).

The slots can be implemented as a circular array: when the pointer passes the end of the array, it wraps around to the first index. In summary, the time wheel algorithm can be described as follows:

  • Delayed tasks are stored in queues, and the tasks in one queue share the same delay time. The queues are held in a circular array, with each element pointing to one delayed-task queue.
  • A current pointer refers to one slot of the array and moves to the next slot every unit of time. All delayed tasks in the queue of the slot it points to are triggered.
  • To add a delayed task, take the remainder of the delay (in units of time) divided by the number of slots and move that many slots forward from the current pointer. The slot reached is where the task is placed.

Based on this data structure, the time complexity of inserting a delayed task is reduced to O(1). When the pointer points to a slot, all the deferred tasks in the queue connected to the slot are triggered.

Triggering and executing a delayed task must not affect the timing accuracy of the pointer's advance. In general, therefore, the thread that moves the pointer is only responsible for triggering tasks and leaves their execution to other threads. For example, the delayed-task queue of a slot can be handed to a separate thread pool for execution, and a new, empty queue can be created in the slot to receive tasks added later.
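The algorithm summarized above can be sketched as a small, manually driven wheel. This is an illustration under stated assumptions rather than any library's implementation: the class SimpleTimingWheel and its methods are hypothetical, one call to tick() stands for one unit of time, and tick() processes the slot under the pointer before moving the pointer on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class SimpleTimingWheel {
    private final List<Queue<Runnable>> slots = new ArrayList<>();
    private int pointer; // slot that the next tick() will process

    SimpleTimingWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayDeque<>());
        }
    }

    /** O(1) insert. The delay must fit in one rotation: 0 <= delayTicks < slot count. */
    int add(Runnable task, int delayTicks) {
        if (delayTicks < 0 || delayTicks >= slots.size()) {
            throw new IllegalArgumentException("delay outside the wheel's range");
        }
        int slot = (pointer + delayTicks) % slots.size(); // wrap around the array
        slots.get(slot).add(task);
        return slot;
    }

    /** Called once per unit of time: drains the slot under the pointer, then moves on. */
    List<Runnable> tick() {
        Queue<Runnable> queue = slots.get(pointer);
        List<Runnable> due = new ArrayList<>(queue);
        queue.clear();
        pointer = (pointer + 1) % slots.size();
        return due; // the caller can hand these to a separate executor
    }
}
```

Returning the due tasks instead of running them inside tick() mirrors the advice to keep task execution off the thread that moves the pointer.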

Supporting delays beyond the range of the wheel

The basic principle above describes the core structure of the time wheel. However, it assumes that the delay of an inserted task never exceeds the range of the wheel, which is why all tasks in a slot's queue have the same delay time.

Under that assumption, supporting delayed tasks with a larger time span requires either more slots or a coarser precision, that is, a longer unit of time per slot. Precision is usually dictated by business requirements, so only the number of slots can grow. With a precision of 1 second, supporting a delay of one day needs 86400 slots (60 x 60 x 24), which consumes considerably more memory. Simply increasing the number of slots is clearly not a good solution.

The paper offers two extension schemes for supporting delayed tasks with a long time span.

Scheme 1: delayed tasks of different rounds coexist in the same queue

This scheme introduces the concept of rounds. The quotient of a task's delay divided by the length of the time wheel is its number of rounds, and the remainder of that division is the offset of the slot into which the task is inserted.

When inserting a delayed task, first compute the number of rounds and the slot offset, and use the offset to determine the slot. When the pointer points to a slot, that slot's queue is traversed: every task whose round count is 0 is triggered, and the round count of each remaining task is reduced by 1 while it waits for the next rotation.

With rounds, a limited number of slots can support delayed tasks over an unbounded time range. Inserting a task is still O(1), but triggering now requires traversing the whole queue of the slot to check whether each task's round count is 0, so the time complexity of triggering rises to O(n).
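The rounds scheme can be sketched as follows. The class RoundsWheel and its members are hypothetical names chosen for this example. The sketch assumes that the slot under the pointer has not yet been processed for the current tick, which is the convention under which the quotient and remainder rule above gives the right answer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class RoundsWheel {
    /** A queued task name plus the number of full rotations it still has to wait. */
    private static final class Entry {
        final String name;
        int remainingRounds;

        Entry(String name, int remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int pointer; // slot that the next tick() will process

    RoundsWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new LinkedList<>());
        }
    }

    /** O(1): the quotient gives the rounds, the remainder gives the slot offset. */
    void add(String name, int delayTicks) {
        if (delayTicks < 0) {
            throw new IllegalArgumentException("negative delay");
        }
        int n = slots.size();
        int rounds = delayTicks / n;
        int slot = (pointer + delayTicks % n) % n;
        slots.get(slot).add(new Entry(name, rounds));
    }

    /** O(k) for a slot holding k entries: every entry has to be inspected. */
    List<String> tick() {
        List<String> fired = new ArrayList<>();
        Iterator<Entry> it = slots.get(pointer).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                fired.add(entry.name);
                it.remove();
            } else {
                entry.remainingRounds--; // wait for the next rotation
            }
        }
        pointer = (pointer + 1) % slots.size();
        return fired;
    }
}
```

With 8 slots, a delay of 3 ticks and a delay of 11 ticks share slot 3, but the second task carries one round and fires one rotation later.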

One variation is to keep each slot's queue sorted by round, for example with a min-heap. When the pointer reaches a slot, it then only needs to check the round count of the task at the head of the queue, and it can stop as soon as that count is not 0. The time complexity of triggering drops back to O(1). In exchange, because the queue is sorted, inserting a task must find not only the slot but also the position inside the queue. Insertion therefore costs O(1) to locate the slot plus O(log n) to place the task in the queue, where n is the length of that slot's queue.

Scheme 2: multi-level time wheel

Consider the design of a watch with its second, minute, and hour hands. The second hand and the minute hand both move over 60 divisions, but each division represents a different length of time. Following this idea, we can define several time wheels at different levels, where the time span of one slot in a wheel equals the whole range of the wheel one level below it.

When the pointer of a lower-level wheel completes a full circle, the wheel one level above it advances by one slot. The tasks in the slot that the upper-level pointer now points to are then redistributed into the slots of the lower-level wheel according to their remaining delay. In this way, the tasks in each slot's queue of every wheel share the same delay at that wheel's precision.

Take a time wheel with a precision of 1 second and a range of 1 day as an example. It can be designed with three levels: the second-level wheel has 60 slots of 1 second each, the minute-level wheel has 60 slots of 60 seconds each, and the hour-level wheel has 24 slots of 60 minutes each. After the second-level wheel has run for 60 seconds, its pointer returns to the slot with index 0 and the pointer of the minute-level wheel advances by one slot. All the delayed tasks in that minute-level slot are removed, their remaining delay is recalculated, and they are placed into the second-level wheel.

A total of 60 + 60 + 24 = 144 slots are required for support. This saves a lot of memory compared to the 86,400 slots required for the single-stage time wheel mentioned above.
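To make the three-level layout concrete, the sketch below works out which wheel and slot a delay lands in. It is simplified on purpose: the class HierarchicalPlacement and the method place are hypothetical, and the calculation assumes that every pointer is at slot 0 when the task is inserted.

```java
class HierarchicalPlacement {
    // Level 0 = seconds, level 1 = minutes, level 2 = hours.
    static final int[] SLOTS = {60, 60, 24};
    static final long[] UNIT_SECONDS = {1, 60, 3600};

    /**
     * Returns {level, slot} for a delay in seconds, assuming every
     * pointer currently sits at slot 0 (a simplifying assumption).
     */
    static int[] place(long delaySeconds) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("negative delay");
        }
        for (int level = 0; level < SLOTS.length; level++) {
            long range = UNIT_SECONDS[level] * SLOTS[level];
            if (delaySeconds < range) {
                // The lowest wheel whose range covers the delay takes the task.
                return new int[] {level, (int) (delaySeconds / UNIT_SECONDS[level])};
            }
        }
        throw new IllegalArgumentException("delay exceeds one day");
    }

    public static void main(String[] args) {
        int[] first = place(150);        // minute wheel, slot 2
        int[] later = place(150 % 60);   // remaining 30 s go to the second wheel
        System.out.println(first[0] + "/" + first[1] + " then " + later[0] + "/" + later[1]);
    }
}
```

A delay of 150 seconds is first stored in slot 2 of the minute wheel. When that slot is reached after 120 seconds, the remaining 30 seconds place the task in slot 30 of the second wheel.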

There are two common approaches to hierarchical time wheels:

  • Fixed time range: the number of wheels and the number of slots at each level are passed to the constructor, so the time range that the whole structure supports is fixed at construction time.
  • Unfixed time range: only the number of slots per wheel and the slot time of the lowest-level wheel are defined. When the delay of an inserted task exceeds the range of the existing wheels, a higher-level wheel is created dynamically at run time. As a result, there is no upper limit on the overall time range that can be supported.

Netty's time wheel implementation

The core idea of the time wheel algorithm is to reduce the time complexity of adding a delayed task to O(1) by means of a circular array and a moving pointer. Implementations differ in the details, including how they handle delayed tasks with a larger time span. Here we take Netty's time wheel implementation as an example and analyze its code.

Interface definition

Netty defines the timer interface io.netty.util.Timer, together with the Timeout interface, as follows:

public interface Timer
{
    // Add a new delayed task. The parameters are the task itself (a TimerTask) and its delay.
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    // Stop the time wheel and return all delayed tasks that have not been triggered.
    Set<Timeout> stop();
}
public interface Timeout
{
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

The Timeout interface wraps a delayed task, and its methods show that the implementation has to maintain the state of the task internally. This becomes clearer later, when we analyze the implementation.

The Timer interface has a single implementation, HashedWheelTimer. Let's start with its constructor.

Building the circular array

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts)
{
    // Omitted: null checks and range checks on the parameters.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    // Omitted: validation of the slot duration (overflow check, minimum of 1 ms).
    workerThread = threadFactory.newThread(worker);
    // Omitted: resource leak tracking setup and the check on the number of timer instances.
}

The first step is the createWheel method, which creates the core data structure of the time wheel, the circular array. Its content is as follows:

private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
    // Omit the code to make sure ticksPerWheel is in the correct range
    // Normalize ticksPerWheel to a power of 2.
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for(int i = 0; i < wheel.length; i++)
    {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

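The array length is normalized to a power of 2 so that the quotient and remainder can be computed cheaply. In particular, the slot index can be obtained with a bit mask (tick & mask) instead of a modulo operation.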
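The following sketch shows why a power-of-two length helps. The helper names normalize and slotIndex are assumptions made for this example and are not Netty's own methods.

```java
class PowerOfTwoMask {
    /** Smallest power of two that is >= n, for 1 <= n <= 2^30. Hypothetical helper. */
    static int normalize(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n must be in [1, 2^30]");
        }
        int p = 1;
        while (p < n) {
            p <<= 1;
        }
        return p;
    }

    /** For a power-of-two length, tick & (length - 1) equals tick % length. */
    static int slotIndex(long tick, int wheelLength) {
        return (int) (tick & (wheelLength - 1));
    }

    public static void main(String[] args) {
        int length = normalize(500);                 // 512
        System.out.println(slotIndex(1030, length)); // same as 1030 % 512
    }
}
```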

HashedWheelBucket stores a doubly linked list of HashedWheelTimeout nodes and keeps references to the head and tail nodes of the list, which makes inserting and removing tasks convenient.

Adding a delayed task

The HashedWheelTimer#newTimeout method is used to add a delayed task. Here’s the code

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
{
    // omit code for parameter checking
    start();
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if(delay > 0 && deadline < 0)
    {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

As you can see, a new task is not added directly to the circular array. It is first placed into a queue, the timeouts field. This is an MPSC (multi-producer, single-consumer) queue: any thread may add tasks, but only the workerThread takes tasks out of it.

That thread is created in the constructor by the call workerThread = threadFactory.newThread(worker). It is not started immediately after creation, though. It starts when the first delayed task is added to the time wheel, which is what the start method called inside newTimeout does. Here is the code:

public void start()
{
    switch(WORKER_STATE_UPDATER.get(this))
    {
        case WORKER_STATE_INIT:
            if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
            {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    while(startTime == 0)
    {
        try
        {
            startTimeInitialized.await();
        }
        catch(InterruptedException ignore)
        {
            // Ignore - it will be ready very soon.
        }
    }
}

The method has two parts. The first is the switch block: a CAS operation on the state variable ensures that only one thread executes workerThread.start(), which avoids concurrency errors. The second is a blocking wait on startTimeInitialized, a CountDownLatch, until the workerThread has actually entered its working state.
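The two parts of start() can be reproduced in isolation. The sketch below is a simplified stand-in, not Netty's code: the class LazyWorker is hypothetical, and it uses an AtomicInteger for the state where the listing above uses a field updater.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LazyWorker {
    private static final int INIT = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch ready = new CountDownLatch(1);
    private final AtomicInteger runs = new AtomicInteger();
    private final Thread worker;

    LazyWorker() {
        worker = new Thread(() -> {
            runs.incrementAndGet();
            ready.countDown(); // tell callers of start() that the worker is running
        });
        worker.setDaemon(true);
    }

    /** Safe to call from many threads: the worker is started at most once. */
    void start() {
        // Part 1: only the thread that wins the CAS starts the worker.
        if (state.compareAndSet(INIT, STARTED)) {
            worker.start();
        }
        // Part 2: every caller waits until the worker has really begun.
        boolean interrupted = false;
        while (ready.getCount() > 0) {
            try {
                ready.await();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    int timesRun() {
        return runs.get();
    }
}
```

Calling start() repeatedly, or from several threads at once, leaves timesRun() at 1 because a losing CAS never reaches worker.start().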

As newTimeout shows, a delayed task is first inserted into a queue. The earlier analysis of the data structure said that tasks are triggered when the pointer reaches their slot, so something must move the tasks from the queue into the array of the time wheel. That work is done by the workerThread. Let's look at its code.

The worker thread

The worker thread runs an instance of the HashedWheelTimer.Worker class, which implements the Runnable interface. The implementation of its run method is shown below:

public void run()
{
    {// Code block ①
        startTime = System.nanoTime();
        if(startTime == 0)
        {
            // startTime == 0 means "not yet initialized", so a real value of 0 is replaced with 1
            startTime = 1;
        }
        // Notify the threads waiting in start() that the worker thread has been started
        startTimeInitialized.countDown();
    }
    {// Code block ②
        do
        {
            final long deadline = waitForNextTick();
            if(deadline > 0)
            {
                int idx = (int)(tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets();
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    }
    {// Code block ③
        for(HashedWheelBucket bucket: wheel)
        {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for(;;)
        {
            HashedWheelTimeout timeout = timeouts.poll();
            if(timeout == null)
            {
                break;
            }
            if(!timeout.isCancelled())
            {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
}

Thread startup and preparation

For ease of reading, the run method is divided into three code blocks. Let's start with code block ①. A call to System.nanoTime() sets the initial value of startTime, which serves as the baseline time of the time wheel for all later relative time calculations. After the assignment, the waiting external threads are notified through the startTimeInitialized variable.

Driving the pointer and triggering tasks

Next, look at code block ②. This is the main working part. The whole block is a do-while loop, which ensures that the worker thread keeps working only while the time wheel has not been stopped. First, consider the waitForNextTick method. In the time wheel, one movement of the pointer is called a tick, and this method waits until it is time for the pointer to move to the next tick:

private long waitForNextTick()
{
    long deadline = tickDuration * (tick + 1);
    for(;;)
    {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if(sleepTimeMs <= 0)
        {
            if(currentTime == Long.MIN_VALUE)
            {
                return -Long.MAX_VALUE;
            }
            else
            {
                return currentTime;
            }
        }
        if(PlatformDependent.isWindows())
        {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try
        {
            Thread.sleep(sleepTimeMs);
        }
        catch(InterruptedException ignored)
        {
            if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
            {
                return Long.MIN_VALUE;
            }
        }
    }
}

The idea is simple. As mentioned earlier, each movement of the pointer is one tick, so tick can be viewed as the number of times the pointer has moved. Because the time span of a slot is fixed, the time that should have elapsed when the pointer reaches the next slot is easy to compute: long deadline = tickDuration * (tick + 1). The time that has actually elapsed is long currentTime = System.nanoTime() - startTime. The difference between the two is how long the thread needs to sleep. Adding 999999 before dividing by 1000000 rounds the remaining nanoseconds up to whole milliseconds.
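The sleep calculation can be checked with a few numbers. In this sketch the class TickSleep and the method sleepMillis are hypothetical, and the 100 ms tick duration is chosen only for the example.

```java
class TickSleep {
    /** Milliseconds to sleep before the next tick, rounded up to a whole millisecond. */
    static long sleepMillis(long tickDurationNanos, long tick, long elapsedNanos) {
        long deadline = tickDurationNanos * (tick + 1);
        return (deadline - elapsedNanos + 999_999) / 1_000_000;
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L; // 100 ms expressed in nanoseconds
        // 40 ms elapsed: 60 ms remain until the first tick.
        System.out.println(sleepMillis(tickDuration, 0, 40_000_000L));
        // 99.5 ms elapsed: the remaining 0.5 ms is rounded up to 1 ms.
        System.out.println(sleepMillis(tickDuration, 0, 99_500_000L));
        // Just past the deadline: the result is 0, so no sleep is needed.
        System.out.println(sleepMillis(tickDuration, 0, 100_000_001L));
    }
}
```

Rounding up means that a remainder shorter than a millisecond still produces a 1 ms sleep rather than a result of 0, which would make the method return before the deadline.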

If the computed sleep time is less than or equal to 0, the actual elapsed time has already reached or passed the theoretical time, and the method returns immediately. Because the time wheel may be stopped while this method is running, a special return value, Long.MIN_VALUE, is used to express that event, which is why the method avoids returning that value for currentTime.

It is also worth noting that Thread.sleep relies on the timer interrupts provided by the operating system, which checks at each interrupt whether any thread should be woken up and given CPU time. The default interrupt interval on Linux is 1 millisecond, while on Windows it is 10 or 15 milliseconds, depending on the hardware.

On Windows, if the time passed to Thread.sleep is not a multiple of 10 milliseconds, the JVM calls timeBeginPeriod() and timeEndPeriod() to change the interrupt period to 1 millisecond, and restores the default once the sleep ends. The goal is to keep the sleep time accurate. However, frequently changing the interrupt period on Windows disturbs the system clock, and most of the time the symptom is that the clock runs fast. This can mean, for example, sleeping for only 9 seconds when 10 seconds were requested. The statement sleepTimeMs = sleepTimeMs / 10 * 10 therefore rounds sleepTimeMs down to a multiple of 10, which avoids this Windows bug.

When waitForNextTick returns a positive value, the sleep for the current tick has completed and delayed tasks can be triggered. The index of the slot to be triggered is determined by int idx = (int)(tick & mask).

Before the tasks are triggered, cancelled tasks are removed from the delayed-task queues of their slots. Every call to HashedWheelTimer#newTimeout returns a Timeout object, and the task can be cancelled through its cancel method. Cancelling does not remove the task from its slot's queue directly. Instead, the cancelled object is put into a cancellation queue, the HashedWheelTimer.cancelledTimeouts field. Before the slot's queue is traversed, the processCancelledTasks method goes through the cancellation queue and removes each cancelled task from the queue of its slot. The advantage of this approach is that only one thread ever deletes delayed tasks, which avoids interference between threads and makes the code easier to develop.

After the cancelled tasks have been handled, the transferTimeoutsToBuckets method is called to move the tasks in the new-task queue HashedWheelTimer.timeouts into the slots that match their delay. The code is simple: a loop keeps taking tasks out of timeouts and computes the quotient and remainder of the delay relative to the range of the time wheel, which give the number of rounds and the slot index respectively. The task is then added to the delayed-task queue of that slot.
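The arithmetic of this transfer step can be sketched on its own. The class BucketPlacement and the method place are hypothetical, and sending an already overdue deadline to the current slot is an assumption of this sketch rather than something stated above. The wheel length must be a power of two for the mask to work.

```java
class BucketPlacement {
    /**
     * Returns {remainingRounds, slotIndex} for a task whose deadline is given in
     * nanoseconds relative to the wheel's start time. tick is the number of ticks
     * already processed. wheelLength must be a power of two.
     */
    static long[] place(long deadlineNanos, long tickDurationNanos, long tick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos; // tick on which the task is due
        long rounds = (calculated - tick) / wheelLength;     // full rotations still to wait
        long target = Math.max(calculated, tick);            // assumption: overdue tasks use the current slot
        long slot = target & (wheelLength - 1);              // remainder via bit mask
        return new long[] {rounds, slot};
    }

    public static void main(String[] args) {
        // 100 ms ticks, 512 slots, 10 ticks processed, deadline 60 s after start.
        long[] placement = place(60_000_000_000L, 100_000_000L, 10, 512);
        System.out.println("rounds=" + placement[0] + ", slot=" + placement[1]);
    }
}
```

In this example the task is due on tick 600, which is 590 ticks away. That is one full rotation of 512 slots plus a remainder, so the task waits one round in slot 600 & 511 = 88.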

Here you can see how the Netty authors designed the time wheel for concurrency. Adding a new task means adding an element to an MPSC queue, while the delayed-task queue of a slot is only ever modified by the time wheel's own thread, which is in effect an SPSC design. The former improves performance through lock-free concurrency, and the latter reduces design difficulty by imposing a constraint.

The transferTimeoutsToBuckets method transfers at most 100000 delayed tasks into their slots per call. This prevents the worker thread from being starved when external threads keep adding tasks in a loop.

Once transferTimeoutsToBuckets has finished, it is time to trigger the delayed tasks in the slot, which is the job of HashedWheelBucket#expireTimeouts. Its logic is also simple. It traverses the queue: if a task's round count is not 0, the count is reduced by 1, and otherwise the task's execution method, HashedWheelTimeout#expire, is triggered. That method again updates the state through CAS, to avoid a race between triggering and cancellation.

As this method shows, Netty uses the rounds approach to support delays beyond the range of the time wheel. A multi-level time wheel is more complex to implement than rounds, and in network IO applications delays beyond the range of the wheel are rare, so using rounds to support longer delays is the simpler choice.

When all the delayed tasks due on this tick have been triggered, tick is incremented by 1 to indicate that the pointer has moved to the next slot.

Stopping the time wheel

An external thread stops the time wheel by calling HashedWheelTimer#stop, which simply changes the state field of the time wheel with a CAS call. In code block ②, the state is checked on every pass through the loop. Code block ③ is straightforward. It iterates over all slots and over the delayed-task queue of each slot, and it also drains the timeouts queue, putting every task that has neither reached its delay nor been cancelled into a set, which is finally returned. This set holds all the delayed tasks that were never executed.

Summary

The time wheel is a very efficient algorithm and data structure for handling a large number of delayed tasks. Netty's implementation adjusts some details in how tasks are added, expired, and removed. Different middleware projects contain their own time wheel implementations, each with its own differences, but all of them are built around the concepts of a circular array and slot expiration. Each variation in detail has scenarios and trade-offs that suit it.