
Time wheel principle analysis

Technology often borrows from everyday life: queuing to buy a ticket maps to a queue, a company’s organizational chart maps to a tree, and the design of the time wheel algorithm comes from the clock. As shown in the figure below, a time wheel is a circular structure divided into multiple slots, like a clock face. Each slot represents a time range and can hold multiple tasks; a linked list stores all tasks that expire within that range. A hand advances around the wheel one slot at a time and executes all due tasks in the slot it reaches.

How are tasks added to the time wheel? Take each task’s expiration time modulo the number of slots, and place the task in the resulting slot. As shown in the figure above, the time wheel is divided into eight slots, each representing 1s, and the hand currently points at slot 2. A task scheduled to run after 3s goes into slot 2+3=5. A task scheduled to run after 12s goes into slot (2+12)%8=6 and runs only after the hand has completed one full revolution.
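The slot arithmetic above can be sketched in a few lines. This is only an illustration, not Netty code; the class and method names (`SlotMath`, `slotFor`) are made up for the example, and the delay is assumed to be expressed in ticks.

```java
// Hypothetical helper, not part of Netty: maps a delay to a slot index.
final class SlotMath {
    private SlotMath() {}

    /**
     * @param current    index of the slot the hand currently points at
     * @param delayTicks delay measured in ticks (one tick moves the hand one slot)
     * @param slots      total number of slots in the wheel
     * @return index of the slot the task should be stored in
     */
    static int slotFor(int current, int delayTicks, int slots) {
        return (current + delayTicks) % slots;
    }
}
```

With the numbers from the text, `SlotMath.slotFor(2, 3, 8)` gives 5 and `SlotMath.slotFor(2, 12, 8)` gives 6.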

When the hand reaches slot 6, how does it tell whether a task should run immediately, wait for the next revolution, or wait even longer? The answer is to store round information in each task. Suppose the linked list of slot 6 contains three tasks. The first task has round=0 and is executed immediately. The second task has round=1 and must wait another 1 × 8 = 8s; the third has round=2 and must wait another 2 × 8 = 16s. So when the hand moves to a slot, only the tasks whose round is 0 are executed. The round of every other task in the slot is reduced by 1, and those tasks wait for a later revolution.
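The round bookkeeping can be sketched as follows. This is a simplified model rather than Netty’s implementation: the names `RoundSlot`, `Entry` and `onTick` are invented for the example, tasks are represented by plain strings, and an `ArrayList` stands in for the linked list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not Netty code: a single slot of a time wheel.
final class RoundSlot {
    /** A task name plus the number of full revolutions it still has to wait. */
    static final class Entry {
        final String name;
        int round;

        Entry(String name, int round) {
            this.name = name;
            this.round = round;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void add(String name, int round) {
        entries.add(new Entry(name, round));
    }

    /** Called when the hand reaches this slot; returns the names of the tasks that are due. */
    List<String> onTick() {
        List<String> due = new ArrayList<>();
        for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.round == 0) {
                due.add(e.name); // due in this revolution: hand it out and drop it
                it.remove();
            } else {
                e.round--;       // not due yet: wait one more revolution
            }
        }
        return due;
    }
}
```

Adding three tasks with rounds 0, 1 and 2 and calling `onTick()` three times returns them one per call, in that order.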

That is the basic theory of the time wheel algorithm. The time wheel resembles a HashMap: when multiple tasks map to the same slot, the collision is resolved by chaining them in a linked list (separate chaining). When there are many tasks, increasing the number of slots reduces the number of tasks that have to be traversed on each tick.

The biggest advantage of time wheel timers is that tasks can be added and cancelled in O(1) time, and a single thread is enough to drive the wheel. HashedWheelTimer is Netty’s implementation of the time wheel algorithm. Below, I analyze how the algorithm is implemented in detail, based on the source code of HashedWheelTimer.

Netty HashedWheelTimer source code analysis

Before starting to learn the source code of HashedWheelTimer, it is necessary to understand the HashedWheelTimer interface definition and related components in order to better use HashedWheelTimer.

Interface definition

HashedWheelTimer implements the io.netty.util.Timer interface, which makes Timer a good entry point for studying HashedWheelTimer. Let’s look at the definition of the Timer interface:

public interface Timer {
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    Set<Timeout> stop();
}

The Timer interface provides two methods: newTimeout() creates a task, and stop() stops the timer and returns all tasks that have not been executed. From these definitions, Timer can be regarded as the top-level scheduler. newTimeout() submits a TimerTask and returns a Timeout. TimerTask and Timeout are both interfaces. What do they do? Let’s look at their definitions:

public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}
public interface Timeout {
    Timer timer();
    TimerTask task();
    boolean isExpired();
    boolean isCancelled();
    boolean cancel();
}

A Timeout holds references to its Timer and TimerTask, and a task is cancelled through the Timeout interface. The relationship between Timer, Timeout and TimerTask is shown in the following figure:

With the definition of HashedWheelTimer’s interface and the concepts of its related components clear, it’s time to start using it.

Quick start

Here’s a simple example of how HashedWheelTimer is used.

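import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class HashedWheelTimerTest {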
    public static void main(String[] args) {
        Timer timer = new HashedWheelTimer();
        Timeout timeout1 = timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) {
                System.out.println("timeout1: " + new Date());
            }
        }, 10, TimeUnit.SECONDS);
        if (!timeout1.isExpired()) {
            timeout1.cancel();
        }
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws InterruptedException {
                System.out.println("timeout2: " + new Date());
                Thread.sleep(5000);
            }
        }, 1, TimeUnit.SECONDS);
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) {
                System.out.println("timeout3: " + new Date());
            }
        }, 3, TimeUnit.SECONDS);
    }
}

The code results are as follows:

timeout2: Mon Nov 09 19:57:04 CST 2020
timeout3: Mon Nov 09 19:57:09 CST 2020

These few lines cover most of what HashedWheelTimer is used for. The example starts three TimerTasks with newTimeout(); timeout1 is cancelled, so it never runs. timeout2 and timeout3 are scheduled to run after 1s and 3s respectively, yet their output is 5s apart, because timeout2 blocks for 5s. This shows that tasks in the time wheel are executed serially. If one task takes too long, the scheduling and execution of subsequent tasks are delayed and tasks are likely to pile up.

That covers the basic usage of HashedWheelTimer. Next, we look at how it is implemented.

The internal structure

Starting from the constructor of HashedWheelTimer, let’s map out its internal structure against the time wheel algorithm introduced above.

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration,
        TimeUnit unit,
        int ticksPerWheel,
        boolean leakDetection,
        long maxPendingTimeouts) {
    wheel = createWheel(ticksPerWheel); // create the array structure of the time wheel
    mask = wheel.length - 1;
    long duration = unit.toNanos(tickDuration);
    workerThread = threadFactory.newThread(worker); // create the worker thread
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // enable memory leak detection
    this.maxPendingTimeouts = maxPendingTimeouts;
    // If the number of HashedWheelTimer instances exceeds 64, report it
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

The HashedWheelTimer constructor clearly lists several core attributes:

  • threadFactory: the thread factory; only one thread is created from it.

  • tickDuration: the duration of one tick, that is, the interval at which the hand moves to the next slot.

  • unit: the time unit of tickDuration.

  • ticksPerWheel: the number of slots in the time wheel. The default is 512. The more slots allocated, the more memory is used.

  • leakDetection: whether to enable memory leak detection.

  • maxPendingTimeouts: the maximum number of tasks allowed to wait.

To see how the time wheel itself is created, we go straight to the createWheel() method:

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    // omit other code
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}
private static final class HashedWheelBucket {
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;
    // omit other code
}

Creating the time wheel means creating an array of HashedWheelBucket objects, each of which represents one slot of the wheel. The definition of HashedWheelBucket shows that it is a doubly linked list in which every node is a HashedWheelTimeout object, and a HashedWheelTimeout represents one scheduled task. Each HashedWheelBucket keeps two HashedWheelTimeout references, head and tail, so the list can be traversed in either direction. The roles of HashedWheelBucket and HashedWheelTimeout are described further below.
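To make the head/tail structure concrete, here is a minimal sketch of a bucket as a doubly linked list with O(1) append and O(1) unlink. It is written for illustration under the assumption that each node knows its neighbours; the names `Bucket`, `Node`, `add` and `remove` are not taken from Netty.

```java
// Hypothetical sketch of a bucket as a doubly linked list; names are illustrative, not Netty's.
final class Bucket {
    static final class Node {
        final String task;
        Node prev;
        Node next;

        Node(String task) {
            this.task = task;
        }
    }

    Node head;
    Node tail;

    /** Appends a node at the tail in O(1). */
    void add(Node n) {
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    /** Unlinks a node in O(1) and returns its successor so a traversal can continue. */
    Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) {
            n.prev.next = next;
        } else {
            head = next;      // n was the head
        }
        if (next != null) {
            next.prev = n.prev;
        } else {
            tail = n.prev;    // n was the tail
        }
        n.prev = null;
        n.next = null;
        return next;
    }
}
```

Because a node can be unlinked without searching the list, cancelling a task that already sits in a bucket does not depend on how many other tasks share the slot.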

The length of the array must be a power of 2 because the time wheel uses a bitwise AND (&) in place of the modulo operation. normalizeTicksPerWheel() finds the smallest power of 2 that is not less than ticksPerWheel. The loop-based implementation is not the most efficient; it could be optimized along the lines of tableSizeFor() in the JDK’s HashMap, as shown below. Of course, normalizeTicksPerWheel() is only called during initialization, so the difference hardly matters.

static final int MAXIMUM_CAPACITY = 1 << 30;
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int n = ticksPerWheel - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

The main work of HashedWheelTimer initialization has been described, and its internal structure is similar to the time wheel algorithm introduced above.

Next, we will analyze how HashedWheelTimer can add, execute and cancel tasks by focusing on the three basic operations of timers.

Add tasks

After HashedWheelTimer initialization is complete, how do I add tasks to HashedWheelTimer? We naturally think of the newTimeout() method provided by HashedWheelTimer.

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    // omit other code
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    start(); // 1. Start the worker thread if it is not running yet
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // compute the task's deadline
    if (delay > 0 && deadline < 0) { // guard against overflow
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 2. Create the scheduled task
    timeouts.add(timeout); // 3. Add the task to the Mpsc Queue
    return timeout;
}

private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();

The newTimeout() method does three things: it starts the worker thread, creates a scheduled task, and adds the task to the Mpsc Queue. HashedWheelTimer starts its worker thread lazily, so the user never has to start it explicitly. The advantage is that no worker thread spins idly while the time wheel has no tasks. The start() method looks like this:

public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
        }
    }
}

start() first reads the state of the worker thread. If the worker is already started, nothing needs to be done. If it has not been started, a CAS operation changes the state from INIT to STARTED, and only the caller that wins the CAS starts the worker thread by calling Thread#start(). If the timer has already been shut down, an IllegalStateException is thrown. Finally, start() blocks on startTimeInitialized until the worker thread has set startTime. We leave aside what the worker thread actually does for the moment.
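The lazy-start pattern can be reproduced with JDK classes alone. The sketch below is an assumption-laden simplification, not Netty’s code: `LazyWorker` and `threadsStarted` are invented names, an `AtomicInteger` replaces the field updater, and the worker thread does nothing except publish its start time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a lazily started worker; not Netty's implementation.
final class LazyWorker {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile long startTime;
    private final Thread thread = new Thread(() -> {
        threadsStarted.incrementAndGet();
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now; // 0 is reserved for "not initialized yet"
        startTimeInitialized.countDown();
    });

    void start() {
        int s = state.get();
        if (s == SHUTDOWN) {
            throw new IllegalStateException("cannot be started once stopped");
        }
        // Only the caller that wins the CAS actually starts the thread.
        if (s == INIT && state.compareAndSet(INIT, STARTED)) {
            thread.start();
        }
        // Every caller waits until the worker has published its start time.
        boolean interrupted = false;
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    long startTime() {
        return startTime;
    }

    int threadsStarted() {
        return threadsStarted.get();
    }
}
```

Calling `start()` any number of times starts the thread once; later calls see the STARTED state and return as soon as `startTime` is visible.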

Back in the main flow of newTimeout(), the logic is simple. The task’s deadline is computed from the delay passed in by the user, a HashedWheelTimeout object is created for the scheduled task, and finally the HashedWheelTimeout is added to the Mpsc Queue. At this point you may wonder: why not add the HashedWheelTimeout directly to the time wheel instead of going through the Mpsc Queue first? The Mpsc Queue can be understood as a thread-safe queue with multiple producers and a single consumer. We will analyze the Mpsc Queue in detail in the next lesson and do not expand on it here. As you can guess, HashedWheelTimer uses the Mpsc Queue to make it thread-safe for multiple threads to add tasks to the time wheel.
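The hand-off idea can be illustrated without Netty. In the sketch below, `ConcurrentLinkedQueue` is a JDK stand-in for the MPSC queue (it is a different data structure with the same usage pattern here), tasks are reduced to their deadline tick, and the names `Handoff`, `submit` and `drainIntoWheel` are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: many producer threads enqueue, only the worker touches the wheel.
// ConcurrentLinkedQueue is a JDK stand-in for Netty's MPSC queue, not the same data structure.
final class Handoff {
    private final Queue<Integer> pending = new ConcurrentLinkedQueue<>();
    // Plain, non-thread-safe lists: safe only because a single thread (the worker) uses them.
    private final List<List<Integer>> wheel = new ArrayList<>();

    Handoff(int slots) {
        for (int i = 0; i < slots; i++) {
            wheel.add(new ArrayList<>());
        }
    }

    /** May be called from any thread; the argument is a non-negative deadline tick. */
    void submit(int deadlineTick) {
        pending.add(deadlineTick);
    }

    /** Must be called from the worker thread only; returns how many tasks were moved. */
    int drainIntoWheel() {
        int moved = 0;
        Integer t;
        while ((t = pending.poll()) != null) {
            wheel.get(t % wheel.size()).add(t);
            moved++;
        }
        return moved;
    }

    int slotSize(int slot) {
        return wheel.get(slot).size();
    }
}
```

The design choice this shows is that only the queue needs to be thread-safe. The wheel itself can use plain, unsynchronized structures because a single consumer owns it.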

When will the task be added to the time wheel and executed? There is not much information at this point, so we can only look for the answer to the question in the Worker thread.

Worker

Worker is the core engine of the time wheel. As the hand advances, the Worker processes the tasks that have come due. Let’s look at the Worker’s run() method to find out how.

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); // list of unprocessed tasks
    private long tick;
    @Override
    public void run() {
        startTime = System.nanoTime();
        if (startTime == 0) {
            startTime = 1;
        }
        startTimeInitialized.countDown();
        do {
            final long deadline = waitForNextTick(); // 1. Compute the time of the next tick, then sleep until the next tick
            if (deadline > 0) { // skip this tick when deadline <= 0
                int idx = (int) (tick & mask); // 2. Get the index of the current tick in the HashedWheelBucket array
                processCancelledTasks(); // 3. Remove cancelled tasks
                HashedWheelBucket bucket = wheel[idx];
                transferTimeoutsToBuckets(); // 4. Add the tasks from the Mpsc Queue to the corresponding slots
                bucket.expireTimeouts(deadline); // 5. Execute the expired tasks
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
        // After the wheel exits, take the tasks that were neither executed nor cancelled out of the slots
        // and add them to the list of unprocessed tasks, so that stop() can return them
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        // Take the tasks that have not yet been added to a slot; those that are not cancelled
        // are added to the list of unprocessed tasks, so that stop() can return them
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
}

The core of the Worker thread is the do-while loop in the code above. The loop keeps running as long as the Worker is in the STARTED state. We split each iteration into the following steps and analyze them one by one.

1. Calculate the time until the next tick with waitForNextTick(), then sleep until the next tick.

2. Get the index of the current tick in the HashedWheelBucket array with a bitwise operation.

3. Remove cancelled tasks.

4. Take tasks from the Mpsc Queue and add them to the corresponding HashedWheelBucket.

5. Execute the expired tasks in the current HashedWheelBucket.

First, let’s see how the waitForNextTick() method calculates the wait time.

private long waitForNextTick() {
    long deadline = tickDuration * (tick + 1);
    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

The deadline of the next tick is computed from tickDuration, and the time to sleep is obtained by subtracting the current time from that deadline. The smaller tickDuration is, the more precise the timing and the busier the Worker. The remaining time is rounded up to whole milliseconds, so even when tickDuration is set very small the Worker sleeps for at least 1ms, which keeps the system from sleeping and waking up too frequently.
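The rounding step is easy to check in isolation. The helper below mirrors the arithmetic of waitForNextTick() as a pure function; the class name `TickSleep` and the parameter names are assumptions made for the example.

```java
// Hypothetical helper mirroring the rounding in waitForNextTick(); names are illustrative.
final class TickSleep {
    private TickSleep() {}

    /**
     * @param tickDurationNanos length of one tick in nanoseconds
     * @param tick              number of ticks already processed
     * @param elapsedNanos      time since the wheel started, in nanoseconds
     * @return milliseconds to sleep, rounded up; a value <= 0 means the tick is already due
     */
    static long sleepMillis(long tickDurationNanos, long tick, long elapsedNanos) {
        long deadline = tickDurationNanos * (tick + 1);
        // Adding 999999 before dividing rounds any positive remainder up to the next millisecond.
        return (deadline - elapsedNanos + 999_999) / 1_000_000;
    }
}
```

With a 100ms tick and 40ms elapsed, the result is 60. With a tick of only 1000ns and nothing elapsed, the result is still 1, which is the 1ms floor described above.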

After the Worker wakes up, it performs the second step and computes the index of the current tick in the HashedWheelBucket array with a bitwise AND. A bitwise AND is faster than an ordinary modulo operation. It only works because the length of the array is a power of 2 and the mask is that length minus 1, which makes the AND produce the same result as taking the modulo.
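A short check makes the equivalence, and its precondition, visible. The class `MaskIndex` and its two methods are written for this illustration only.

```java
// Hypothetical comparison of the two ways to map a tick to an array index.
final class MaskIndex {
    private MaskIndex() {}

    /** Index via bitwise AND; matches byModulo only when length is a power of 2. */
    static int byMask(long tick, int length) {
        return (int) (tick & (length - 1));
    }

    /** Index via the ordinary modulo operation. */
    static int byModulo(long tick, int length) {
        return (int) (tick % length);
    }
}
```

For a length of 512 both methods agree for every non-negative tick. For a length that is not a power of 2 they diverge: with tick 6 and length 6, `byMask` returns 4 while `byModulo` returns 0.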

The Worker then calls processCancelledTasks() to handle cancelled tasks. Every cancelled task is added to the cancelledTimeouts queue; the Worker takes the tasks from that queue and removes each one from its HashedWheelBucket, which is an ordinary linked-list removal. The source code of processCancelledTasks() is relatively simple, so we do not expand on it here.
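The deferred-cancellation idea can be sketched as follows. This is a simplified model, not Netty’s code: `DeferredCancel`, `Task` and `schedule` are invented names, a `LinkedHashSet` stands in for the bucket’s linked list, and `schedule()` is assumed to run on the worker thread.

```java
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of deferred cancellation; class and field names are illustrative, not Netty's.
final class DeferredCancel {
    final class Task {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        /** Safe to call from any thread; only the first call succeeds. */
        boolean cancel() {
            if (!cancelled.compareAndSet(false, true)) {
                return false;
            }
            cancelledTasks.add(this); // the worker unlinks it from the bucket later
            return true;
        }
    }

    private final Queue<Task> cancelledTasks = new ConcurrentLinkedQueue<>();
    // Stands in for a bucket; touched by the worker thread only.
    private final Set<Task> bucket = new LinkedHashSet<>();

    /** Assumed to be called on the worker thread in this simplified model. */
    Task schedule() {
        Task t = new Task();
        bucket.add(t);
        return t;
    }

    /** Worker thread only: drains the queue and removes cancelled tasks from the bucket. */
    void processCancelledTasks() {
        Task t;
        while ((t = cancelledTasks.poll()) != null) {
            bucket.remove(t);
        }
    }

    int bucketSize() {
        return bucket.size();
    }
}
```

The point of the indirection is that the cancelling thread never modifies the bucket itself; it only flips a flag and enqueues the task, so the bucket needs no locking.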

We left a question open earlier: when do the tasks in the Mpsc Queue join the time wheel? The answer lies in the transferTimeoutsToBuckets() method.

private void transferTimeoutsToBuckets() {
    // Process at most 100000 tasks per tick to avoid blocking the Worker thread
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }
        long calculated = timeout.deadline / tickDuration; // number of ticks the task has to wait for
        timeout.remainingRounds = (calculated - tick) / wheel.length; // number of rounds the task has to wait (remainingRounds)
        final long ticks = Math.max(calculated, tick); // a task already overdue in the timeouts queue goes into the current HashedWheelBucket
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

The main job of transferTimeoutsToBuckets() is to take tasks from the Mpsc Queue and add them to the corresponding HashedWheelBucket of the time wheel. At most 100000 tasks are transferred per tick. The cap keeps the transfer itself from taking too long and prevents the Worker thread from being blocked when too many tasks are waiting.

From the deadline set by the user, we can calculate how many ticks must pass before the task runs (calculated) and how many full rounds of the wheel the task must wait (remainingRounds). remainingRounds is recorded in the HashedWheelTimeout and is used later when tasks are executed. Tasks in the time wheel are not guaranteed to run on time: if an earlier task takes a very long time, a task may already have passed its execution time while still sitting in the timeouts queue. That is not a problem. The Worker adds such tasks directly to the current HashedWheelBucket, so expired tasks are never missed.
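The placement arithmetic can be tried out as a pure function. The sketch below copies the three expressions from transferTimeoutsToBuckets() into a standalone class; `Placement` and its parameter names are assumptions for the example, and `wheelLength` is assumed to be a power of 2.

```java
// Hypothetical helper reproducing the arithmetic of transferTimeoutsToBuckets(); not Netty code.
final class Placement {
    final long remainingRounds;
    final int stopIndex;

    private Placement(long remainingRounds, int stopIndex) {
        this.remainingRounds = remainingRounds;
        this.stopIndex = stopIndex;
    }

    /**
     * @param deadlineNanos     task deadline relative to the wheel's start time
     * @param tickDurationNanos length of one tick
     * @param currentTick       tick the worker is about to process
     * @param wheelLength       number of slots, assumed to be a power of 2
     */
    static Placement of(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;
        long rounds = (calculated - currentTick) / wheelLength;
        long ticks = Math.max(calculated, currentTick); // overdue tasks land in the current slot
        int index = (int) (ticks & (wheelLength - 1));
        return new Placement(rounds, index);
    }
}
```

With a 100ms tick, 8 slots and the worker at tick 2, a deadline of 1.45s yields remainingRounds 1 and slot 6. A deadline of 50ms, which is already overdue, yields remainingRounds 0 and slot 2, the current slot.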

Once the tasks have been added to the time wheel, we return to the main flow of Worker#run(). The next step executes the expired tasks in the current HashedWheelBucket, so let’s follow the HashedWheelBucket#expireTimeouts() method.

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                timeout.expire(); // execute the task
            } else {
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            timeout.remainingRounds--; // not due yet: remainingRounds minus 1
        }
        timeout = next;
    }
}

Executing tasks is simple: the doubly linked list in the HashedWheelBucket is traversed from the head. If remainingRounds <= 0, the task is removed from the list and expire() is called to execute it; inside, timeout.expire() calls the run() method of the TimerTask. If the task has already been cancelled, it is simply removed from the linked list. Otherwise remainingRounds is reduced by 1 and the task waits for the next round.

At this point, we have covered the core logic of the Worker’s do-while loop. When the time wheel exits, the Worker also does some cleanup. It collects the tasks that were neither executed nor cancelled from every HashedWheelBucket, together with the tasks that had not yet been added to a HashedWheelBucket, and adds them to the list of unprocessed tasks, which the stop() method then returns.

Stopping the time wheel

Back to the two methods of the Timer interface, newTimeout() has been analyzed above, so let’s start with the stop() method to see what the time wheel stop does.

@Override
public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
            HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " +
                TimerTask.class.getSimpleName());
    }
    // Try to update the state of the worker thread to SHUTDOWN via CAS
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }
    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt(); // interrupt the Worker thread
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts(); // return the list of unprocessed tasks
}

If the current thread is the Worker thread, it is not allowed to stop the time wheel; this prevents a scheduled task from shutting down the timer that is running it. Stopping the time wheel mainly does three things: it tries to update the state of the Worker thread to SHUTDOWN with a CAS operation, then interrupts the Worker thread, and finally returns the list of unprocessed tasks to the caller.
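The shutdown handshake (CAS the state, then interrupt and join until the thread is gone) can be sketched with JDK classes only. This is a simplified stand-in rather than Netty’s implementation: `StoppableWorker` is an invented name, the worker’s loop just sleeps, and no unprocessed tasks are collected.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the shutdown handshake; not Netty's implementation.
final class StoppableWorker {
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(STARTED);
    private final Thread thread = new Thread(() -> {
        while (state.get() == STARTED) {
            try {
                Thread.sleep(1_000); // stands in for waiting until the next tick
            } catch (InterruptedException e) {
                // woken up by stop(); the loop condition decides whether to exit
            }
        }
    });

    void start() {
        thread.start();
    }

    /** Returns true if this call performed the shutdown, false if it was already stopped. */
    boolean stop() {
        if (Thread.currentThread() == thread) {
            throw new IllegalStateException("stop() cannot be called from the worker thread");
        }
        if (!state.compareAndSet(STARTED, SHUTDOWN)) {
            return false;
        }
        boolean interrupted = false;
        while (thread.isAlive()) {
            thread.interrupt(); // wake the worker out of its sleep
            try {
                thread.join(100);
            } catch (InterruptedException e) {
                interrupted = true; // finish stopping first, restore the flag below
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    boolean isAlive() {
        return thread.isAlive();
    }
}
```

Interrupting inside the loop, rather than once, covers the case where the worker was not sleeping at the moment of the first interrupt and goes back to sleep afterwards.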

So far, we have analyzed the implementation principle of HashedWheelTimer. Take a look back at some of the core members of HashedWheelTimer.

  • HashedWheelTimeout, the class that wraps a task; its attributes include the task’s deadline and remainingRounds, the number of rounds the task still has to wait.

  • HashedWheelBucket, which is equivalent to each slot of a time wheel, internally stores a list of HashedWheelTimeout that needs to be executed using a bidirectional linked list.

  • Worker, the core work engine of HashedWheelTimer, is responsible for handling timed tasks.