Preface

The article “SOFAJRaft source code analysis – How does JRaft’s timed task scheduler work?” already explained how the time wheel algorithm is applied in JRaft. However, I feel I did not explain it clearly, and readers may still be confused after reading it. So I am going to explain the time wheel algorithm completely once more.

The time wheel is not unique to JRaft. It is used in many other systems as well, including Netty, Akka, Quartz, ZooKeeper, and Kafka.

This article uses JRaft’s implementation of the time wheel as the example. Because JRaft’s code is modeled on Netty’s, you can also read the corresponding source code in Netty.

What problem does the time wheel solve?

Suppose a system has a large number of scheduled tasks and each task uses its own scheduler to manage its life cycle. That wastes CPU resources and is inefficient.

The time wheel is a scheduling model that uses thread resources efficiently for batch scheduling. A large number of scheduled tasks are bound to the same scheduler, which manages, triggers, and runs all of them. It can efficiently manage delayed tasks, periodic tasks, notification tasks, and so on.

However, the time accuracy of a time wheel is limited, so it may not suit tasks with high precision requirements. The accuracy of the algorithm depends on the minimum granularity of the time “pointer”. For example, if the pointer advances one cell per second, the time wheel cannot schedule tasks that need accuracy finer than one second.
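To make the granularity limit concrete, here is a minimal Java sketch that estimates when a task actually fires. It assumes the tick arithmetic analyzed later in this article: a task is placed in tick number deadline / tickDuration, and that tick is processed once tickDuration * (tick + 1) has elapsed since startTime. It ignores thread scheduling delays and sleep jitter. TickPrecision and firingTimeNanos are hypothetical names used only for illustration.

```java
// Sketch: estimate when a time wheel fires a task, assuming the worker is
// never late and the task reaches its bucket before its tick is processed.
final class TickPrecision {
    // deadlineNanos: task deadline relative to startTime
    // tickDurationNanos: length of one tick
    static long firingTimeNanos(long deadlineNanos, long tickDurationNanos) {
        // The task belongs to tick number deadline / tickDuration (integer division),
        // and that tick is processed once tickDuration * (tick + 1) has elapsed.
        long tick = deadlineNanos / tickDurationNanos;
        return tickDurationNanos * (tick + 1);
    }

    public static void main(String[] args) {
        long second = 1_000_000_000L;
        // Deadlines of 1.0 s, 1.3 s and 1.9 s all fire at the 2 s tick boundary.
        System.out.println(firingTimeNanos(1_000_000_000L, second)); // 2000000000
        System.out.println(firingTimeNanos(1_300_000_000L, second)); // 2000000000
        System.out.println(firingTimeNanos(1_900_000_000L, second)); // 2000000000
    }
}
```

With a one-second tick, every deadline from 1 s up to just under 2 s fires at the 2 s boundary. That is why sub-second precision is not available.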

Time wheel structure

As shown in the figure, the HashedWheelTimer in JRaft is a circular queue of scheduled tasks backed by an array. Each element of the array holds a HashedWheelBucket. A HashedWheelBucket is a doubly linked list with head and tail pointers. Each node in the list is a HashedWheelTimeout, which wraps a real timed task.
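The layout can be sketched in a few lines of Java. This is a simplified model, not the JRaft source. WheelLayout, Bucket, and Timeout are hypothetical stand-ins for HashedWheelTimer, HashedWheelBucket, and HashedWheelTimeout, and fields such as the task state and cancellation support are omitted.

```java
// Simplified sketch of the wheel's storage layout (not the JRaft source):
// an array of buckets, each bucket a doubly linked list of timeouts.
final class WheelLayout {
    static final class Timeout {
        final long deadline;      // nanos relative to startTime
        long remainingRounds;     // full rounds left before the task is due
        final Runnable task;      // the wrapped timed task
        Timeout prev, next;       // links inside one bucket

        Timeout(long deadline, Runnable task) {
            this.deadline = deadline;
            this.task = task;
        }
    }

    static final class Bucket {
        Timeout head, tail;

        // Append to the tail of the list in O(1).
        void addTimeout(Timeout t) {
            if (head == null) {
                head = tail = t;
            } else {
                tail.next = t;
                t.prev = tail;
                tail = t;
            }
        }

        int size() {
            int n = 0;
            for (Timeout t = head; t != null; t = t.next) n++;
            return n;
        }
    }

    // One bucket per time cell; the number of cells is fixed at construction.
    final Bucket[] wheel;

    WheelLayout(int cells) {
        wheel = new Bucket[cells];
        for (int i = 0; i < cells; i++) wheel[i] = new Bucket();
    }

    public static void main(String[] args) {
        WheelLayout layout = new WheelLayout(8);
        layout.wheel[3].addTimeout(new Timeout(3_000_000L, () -> {}));
        layout.wheel[3].addTimeout(new Timeout(11_000_000L, () -> {}));
        System.out.println(layout.wheel[3].size()); // 2
    }
}
```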

A time wheel consists of multiple time cells (slots). Each cell represents the basic time span of the wheel (tickDuration). The number of cells is fixed and is given by wheel.length.

The time wheel also keeps a counter, tick, which records how many times the pointer has ticked. tickDuration * (tick + 1) is the deadline of the next tick. Once that moment is reached, the expired tasks in the HashedWheelBucket of the corresponding cell are processed.
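Here is a small arithmetic sketch of the tick counter, assuming tickDuration = 1 ms (1,000,000 ns) and an 8-cell wheel. TickMath and its methods are hypothetical helpers. The cell is computed with % here; JRaft uses an equivalent bit mask, described in the constructor section.

```java
// Sketch: how the tick counter maps to the current cell and the next tick's deadline.
final class TickMath {
    // Time (nanos since startTime) at which the cell for this tick is processed.
    static long nextTickDeadline(long tickDurationNanos, long tick) {
        return tickDurationNanos * (tick + 1);
    }

    // Cell the pointer is on at tick number `tick`.
    static int currentCell(long tick, int wheelLength) {
        return (int) (tick % wheelLength);
    }

    public static void main(String[] args) {
        long tickDuration = 1_000_000L; // 1 ms in nanoseconds
        int wheelLength = 8;
        long tick = 10;
        System.out.println(nextTickDeadline(tickDuration, tick)); // 11000000
        System.out.println(currentCell(tick, wheelLength));       // 2
    }
}
```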

Time wheel running logic

When the time wheel starts, it records the current time as startTime. When a task is added, the time wheel first calculates its deadline relative to startTime. For example, if a task's delay is 24ms, its deadline is currentTime + 24ms - startTime. The task is then wrapped in a HashedWheelTimeout and added to the timeouts queue, which acts as a buffer.
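The deadline is stored relative to startTime rather than as an absolute System.nanoTime() value, whose origin is arbitrary. Below is a minimal sketch of the formula above, using a hypothetical DeadlineCalc helper and made-up timestamps.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the deadline formula: deadline = currentTime + delay - startTime (all in nanos).
final class DeadlineCalc {
    static long deadline(long nowNanos, long delay, TimeUnit unit, long startTimeNanos) {
        return nowNanos + unit.toNanos(delay) - startTimeNanos;
    }

    public static void main(String[] args) {
        long startTime = 5_000_000L; // wheel started at t = 5 ms (made-up value)
        long now = 7_000_000L;       // task added 2 ms after the start
        // A 24 ms delay is therefore due 26 ms after startTime.
        System.out.println(deadline(now, 24, TimeUnit.MILLISECONDS, startTime)); // 26000000
    }
}
```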

While running, on each tick the time wheel takes up to 100,000 HashedWheelTimeout tasks buffered in timeouts and calculates several values for each:

  1. Total ticks: deadline / tickDuration gives the total number of ticks after which each HashedWheelTimeout expires;
  2. Rounds: (total ticks - current tick) / number of cells (wheel.length) gives how many full rounds the pointer must make before the task runs. For example, if tickDuration is 1ms and there are 20 cells, one round takes 20ms. If a task with a delay of 24ms is added when the current tick is 0, its round count is (24 - 0) / 20 = 1. Each time the pointer passes the task's slot while the count is still positive, the count is reduced by one. So the count drops to 0 during the first round, and the task runs when the pointer reaches its slot in the second round;
  3. Slot: calculate the slot the task belongs in, then append the task to the end of that slot's list.
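The worked example above (24ms delay, 1ms tick, 20 cells) can be checked with a small hypothetical helper. Because 20 is not a power of two, the slot is computed with %. JRaft instead rounds the wheel length up to a power of two and uses a mask.

```java
// Sketch of the three per-task values for a wheel of arbitrary size.
final class RoundsAndSlot {
    static long totalTicks(long deadline, long tickDuration) {
        return deadline / tickDuration;
    }

    static long rounds(long totalTicks, long currentTick, int cells) {
        return (totalTicks - currentTick) / cells;
    }

    static int slot(long totalTicks, int cells) {
        return (int) (totalTicks % cells);
    }

    public static void main(String[] args) {
        long tickDuration = 1; // 1 ms; all times in this example are in ms
        int cells = 20;
        long total = totalTicks(24, tickDuration);
        System.out.println(total);                   // 24
        System.out.println(rounds(total, 0, cells)); // 1
        System.out.println(slot(total, cells));      // 4
    }
}
```

The task lands in slot 4. The pointer passes slot 4 at tick 4 (rounds 1 becomes 0) and again at tick 24, when the task runs.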

After moving the buffered tasks from timeouts into the wheel, the worker calculates the slot the pointer has moved to and takes out that slot's linked list. It then compares each task's deadline with the current time and runs the expired tasks.

Source code analysis

The constructor

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    //unit = MILLISECONDS
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // Create the HashedWheelBucket array, the basic data structure of the time wheel.
    // Its length is the smallest power of two that is >= ticksPerWheel.
    wheel = createWheel(ticksPerWheel);
    // The mask is used to quickly calculate which slot a task belongs to.
    // The slot could be computed as ticks % wheel.length, but % is relatively expensive,
    // and because wheel.length is a power of two the equivalent bit operation can be used:
    //     ticks & mask == ticks % wheel.length   (for non-negative ticks)
    // This is the same index trick that Java's HashMap uses.
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    // For example, a tickDuration of 1 with unit MILLISECONDS becomes 1000000.
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    // The tick interval must not be so long that tickDuration * wheel.length exceeds Long.MAX_VALUE
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
                                                                                        / wheel.length));
    }
    // Wrap worker as thread
    workerThread = threadFactory.newThread(worker);
    //maxPendingTimeouts = -1
    this.maxPendingTimeouts = maxPendingTimeouts;

    // If there are too many HashedWheelTimer instances, an error log is printed
    if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
        && warnedTooManyInstances.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

There are a few details to note in this constructor:

  1. The wheel array created by createWheel always has a power-of-two length. For example, if ticksPerWheel is 6, the wheel length becomes 8. This lets the slot be calculated with tick & mask.
  2. tickDuration is stored in nanoseconds.
  3. The time wheel is not started in the constructor; it starts only when the first task is added. The constructor only wraps the worker in a thread, workerThread.
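The power-of-two normalization and the mask trick from point 1 can be sketched as follows. The doubling loop follows the same idea as a normalizeTicksPerWheel helper in Netty-style timers. WheelSizing is a hypothetical class, and it rejects values above 2^30 so that the shift cannot overflow.

```java
// Sketch: round the wheel size up to a power of two and compare mask vs. modulo.
final class WheelSizing {
    static int normalizeTicksPerWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1; // double until we reach or pass ticksPerWheel
        }
        return normalized;
    }

    public static void main(String[] args) {
        int length = normalizeTicksPerWheel(6);
        int mask = length - 1;
        System.out.println(length); // 8
        // For non-negative tick values, tick & mask == tick % length.
        for (long tick = 0; tick < 1_000; tick++) {
            if ((tick & mask) != tick % length) throw new AssertionError("mismatch at " + tick);
        }
        System.out.println("mask matches modulo");
    }
}
```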

Put the task into the time wheel

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                                             + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
    // If the time wheel is not started, it is started
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    // When delay is positive, deadline should never be negative;
    // a negative value means the addition overflowed Long.MAX_VALUE
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // The task is not added to its slot directly; it is added to a queue first.
    // On each tick the worker moves up to 100000 tasks from the queue into their slots.
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // The Worker processes the data in the timeouts queue
    timeouts.add(timeout);
    return timeout;
}
  1. If the time wheel has not been started, call start to start it; the worker thread then sets startTime to the current time;
  2. Calculate the task's deadline;
  3. Wrap the task in a HashedWheelTimeout and add it to the timeouts queue as a buffer.
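The overflow guard in newTimeout relies on Java's wrapping long arithmetic: a positive delay can only produce a negative deadline if the sum overflowed. Below is a minimal sketch with a hypothetical DeadlineGuard helper; the delay is taken already converted to nanoseconds.

```java
// Sketch of the overflow guard in newTimeout (all values in nanoseconds).
final class DeadlineGuard {
    static long deadline(long now, long delayNanos, long startTime) {
        long d = now + delayNanos - startTime; // wraps around silently on overflow
        // A positive delay can only give a negative deadline if the sum overflowed,
        // so clamp it to "as late as possible".
        if (delayNanos > 0 && d < 0) {
            d = Long.MAX_VALUE;
        }
        return d;
    }

    public static void main(String[] args) {
        System.out.println(deadline(100, 50, 10));            // 140
        System.out.println(deadline(100, Long.MAX_VALUE, 1)); // 9223372036854775807
    }
}
```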

start

private final CountDownLatch startTimeInitialized = new CountDownLatch(1);

public void start() {
    //workerState starts with 0 (WORKER_STATE_INIT) before it is set to 1 (WORKER_STATE_STARTED)
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            // Use CAS to win the right to start the worker; only the thread that wins the race starts it
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                // The CAS succeeded, so start the workerThread
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            // The CountDownLatch ensures that the worker thread has started and set startTime
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

The start method starts the time wheel according to the current workerState. The startTimeInitialized latch makes the caller wait for the worker thread. Until the workerThread has started and initialized startTime, newTimeout blocks inside start. Without this wait, newTimeout could read startTime before it is set and compute a wrong deadline.
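The start/worker handshake is a standard CountDownLatch pattern. Below is a simplified, self-contained sketch. StartHandshake is hypothetical, an AtomicBoolean stands in for the workerState CAS, and the worker thread does nothing after publishing startTime. In this sketch startTime is declared volatile so that the waiting thread sees the worker's write.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the start()/Worker#run handshake around startTime.
final class StartHandshake {
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final AtomicBoolean started = new AtomicBoolean(false); // stands in for the workerState CAS
    private volatile long startTime; // 0 means "not initialized yet"

    private final Thread workerThread = new Thread(() -> {
        long t = System.nanoTime();
        startTime = (t == 0) ? 1 : t;      // never publish the sentinel value 0
        startTimeInitialized.countDown();  // release threads waiting in start()
        // ... the tick loop would run here ...
    });

    long start() {
        if (started.compareAndSet(false, true)) {
            workerThread.start(); // only the thread that wins the CAS starts the worker
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // ignored, as in the code above: the worker will be ready very soon
            }
        }
        return startTime;
    }

    public static void main(String[] args) {
        StartHandshake timer = new StartHandshake();
        long first = timer.start();
        long second = timer.start(); // already started: returns immediately
        System.out.println(first != 0 && first == second); // true
    }
}
```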

Start time wheel

The time wheel runs in Worker, an inner class of HashedWheelTimer. Calling workerThread.start() executes Worker’s run method, which starts the time wheel.

Let’s look at what the time wheel does. The analysis below does not cover task cancellation.

Worker#run

public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // Release HashedWheelTimer#start, which is blocked waiting for startTime
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        // Sleep until the next tick, then return System.nanoTime() - startTime
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // Calculate the slot of the time wheel
            int idx = (int) (tick & mask);
            // Remove the timeouts queued in cancelledTimeouts from their buckets
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Adds tasks from the newTimeout() method to the queue of scheduled tasks to the specified cell
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    // check if workerState is started, then loop continuously
    } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        // Collect timeouts that are still queued and not cancelled into unprocessedTimeouts
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    // Process cancelled tasks
    processCancelledTasks();
}
  1. When the time wheel starts running, it first records startTime, then calls startTimeInitialized.countDown() to release the threads waiting in start.
  2. It enters the do-while loop and calls waitForNextTick, which waits for the next tick and returns the current time minus startTime as the deadline.
  3. Since mask = wheel.length - 1 and wheel.length is a power of two, tick & mask gives the slot in wheel directly.
  4. processCancelledTasks takes tasks from the cancelledTimeouts queue and removes them from the time wheel.
  5. transferTimeoutsToBuckets moves the tasks buffered in the timeouts queue into the time wheel.
  6. Finally, expireTimeouts runs the due tasks in the bucket linked list of the slot the pointer currently points to, and tick is incremented.
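To see steps 2 to 6 working together without threads or real time, here is a single-threaded Java simulation. MiniWheel is hypothetical, not the JRaft class. Time advances by exactly one tick per runOneTick call, standing in for waitForNextTick. Cancellation and the 100,000-per-tick cap are left out, and java.util.LinkedList replaces the hand-written bucket list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Single-threaded simulation of the Worker#run loop (no sleeping, no cancellation).
final class MiniWheel {
    static final class Task {
        final String name;
        final long deadline;   // time units since startTime
        long remainingRounds;

        Task(String name, long deadline) {
            this.name = name;
            this.deadline = deadline;
        }
    }

    final long tickDuration;
    final int mask;
    final List<LinkedList<Task>> wheel = new ArrayList<>();
    final Queue<Task> timeouts = new ArrayDeque<>(); // buffer filled by newTimeout
    final List<String> fired = new ArrayList<>();
    long tick;

    MiniWheel(long tickDuration, int wheelLength) { // wheelLength must be a power of two
        this.tickDuration = tickDuration;
        this.mask = wheelLength - 1;
        for (int i = 0; i < wheelLength; i++) {
            wheel.add(new LinkedList<>());
        }
    }

    void newTimeout(String name, long deadline) {
        timeouts.add(new Task(name, deadline));
    }

    // One iteration of the do-while loop; `now` plays the role of waitForNextTick()'s result.
    void runOneTick() {
        long now = tickDuration * (tick + 1);
        int idx = (int) (tick & mask);
        transferTimeoutsToBuckets();          // buffer -> buckets first
        expireTimeouts(wheel.get(idx), now);  // then run the current bucket
        tick++;
    }

    private void transferTimeoutsToBuckets() {
        Task t;
        while ((t = timeouts.poll()) != null) {
            long calculated = t.deadline / tickDuration;
            t.remainingRounds = (calculated - tick) / wheel.size();
            long ticks = Math.max(calculated, tick); // never schedule into the past
            wheel.get((int) (ticks & mask)).add(t);
        }
    }

    private void expireTimeouts(LinkedList<Task> bucket, long now) {
        for (Iterator<Task> it = bucket.iterator(); it.hasNext(); ) {
            Task t = it.next();
            if (t.remainingRounds <= 0) {
                it.remove();
                if (t.deadline > now) {
                    throw new IllegalStateException("task placed in the wrong slot: " + t.name);
                }
                fired.add(t.name + "@" + now);
            } else {
                t.remainingRounds--; // one more full round has passed
            }
        }
    }

    public static void main(String[] args) {
        MiniWheel w = new MiniWheel(1, 8);
        w.newTimeout("a", 3);   // due in the first round
        w.newTimeout("b", 11);  // same slot (11 & 7 == 3), one round later
        for (int i = 0; i < 16; i++) {
            w.runOneTick();
        }
        System.out.println(w.fired); // [a@4, b@12]
    }
}
```

In each iteration the current bucket is expired after the transfer. That ordering is what lets an overdue task that was placed at the current tick run immediately.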

The time wheel pointer jumps

waitForNextTick

// Sleep until the next tick arrives, then return the time elapsed between that tick and startTime
private long waitForNextTick() {
    // tickDuration is in nanoseconds (1 ms = 1000000 ns)
    // tick is the number of ticks completed so far
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        // Adding 999999 before dividing by 1000000 rounds the remaining
        // nanoseconds up to whole milliseconds.
        // The thread sleeps until the next tick, so if the tick interval is set too small
        // the thread sleeps and wakes up very frequently;
        // a somewhat larger tick makes better use of system resources
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        // sleepTimeMs <= 0 means the time of the next tick has been reached
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (Platform.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

Think of a clock’s second hand waiting between seconds. waitForNextTick calculates the interval from the current time to the next tick, sleeps for that long, and then returns the time elapsed since the time wheel started.
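The sleep arithmetic can be checked in isolation. This hypothetical helper mirrors the two calculations in waitForNextTick. The remaining nanoseconds are rounded up to whole milliseconds, and on Windows the result is rounded down to a multiple of 10 ms.

```java
// Sketch of the sleep-time arithmetic in waitForNextTick (inputs in nanoseconds).
final class SleepCalc {
    static long sleepTimeMs(long deadline, long currentTime, boolean windows) {
        // Round the remaining nanoseconds up to whole milliseconds.
        long ms = (deadline - currentTime + 999_999) / 1_000_000;
        if (windows) {
            ms = ms / 10 * 10; // Windows workaround: round down to a multiple of 10 ms
        }
        return ms;
    }

    public static void main(String[] args) {
        System.out.println(sleepTimeMs(100_000_000L, 87_300_000L, false));  // 13
        System.out.println(sleepTimeMs(100_000_000L, 87_300_000L, true));   // 10
        System.out.println(sleepTimeMs(100_000_000L, 100_000_000L, false)); // 0, tick reached
    }
}
```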

Transfer the task to the time wheel

When a task is added to the time wheel, it is not placed into the wheel directly but buffered in the timeouts queue. So while running, the worker must transfer the tasks in the timeouts queue into the time wheel’s linked lists.

transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // At most 100000 tasks are moved per tick, so threads that keep adding tasks cannot hold up the worker thread
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        // Has been cancelled;
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        // calculated: the total number of ticks after which the task expires
        long calculated = timeout.deadline / tickDuration;
        // Count the remaining rounds. The task expires only when the pointer has completed enough rounds and reached the task's slot
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // If the task waited in the timeouts queue so long that its time has already passed, use the current tick,
        // i.e. put it in the current bucket, so it is run in this tick
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // Calculate the slot to insert into: slotIndex = ticks & mask, where mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // Append the timeout to the bucket's linked list
        bucket.addTimeout(timeout);
    }
}

In this transfer method, the loop is bounded so that at most 100,000 tasks are moved per tick.

It then uses the HashedWheelTimeout’s deadline to calculate after how many ticks the task should run. If the task’s delay is longer than one full revolution of the time wheel, it also calculates how many rounds (remainingRounds) must pass before the task can run.

Finally, it calculates the task’s slot in the time wheel and appends the task to that slot’s linked list.
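The placement arithmetic can be sketched as below, including the Math.max(calculated, tick) case for a task that waited in the queue past its deadline. place is a hypothetical helper that returns {remainingRounds, slot}. Times are in arbitrary units with tickDuration = 1, and the wheel has 32 cells.

```java
import java.util.Arrays;

// Sketch of the placement arithmetic in transferTimeoutsToBuckets.
final class TransferMath {
    // Returns {remainingRounds, slot}; wheelLength must be a power of two.
    static long[] place(long deadline, long tickDuration, long tick, int wheelLength) {
        long mask = wheelLength - 1;
        long calculated = deadline / tickDuration;          // total ticks until the task is due
        long remainingRounds = (calculated - tick) / wheelLength;
        long ticks = Math.max(calculated, tick);            // never schedule into the past
        return new long[] {remainingRounds, ticks & mask};
    }

    public static void main(String[] args) {
        int length = 32;
        System.out.println(Arrays.toString(place(24, 1, 0, length))); // [0, 24]
        System.out.println(Arrays.toString(place(40, 1, 0, length))); // [1, 8]
        // Overdue: due at tick 5 but transferred at tick 9 -> current slot, no rounds left.
        System.out.println(Arrays.toString(place(5, 1, 9, length)));  // [0, 9]
    }
}
```

In the overdue case, (calculated - tick) is negative, and Java's integer division truncates toward zero, so remainingRounds is never positive. The task goes into the current slot and is expired in the same tick.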

Run tasks in the time wheel

When the pointer reaches a slot, the HashedWheelBucket in that slot is taken out and its linked list is traversed, running the tasks that are due.

expireTimeouts

// Expires and runs the due tasks in this slot. The worker thread calls this method when the pointer ticks to the slot.
// Whether a task is due is determined by its deadline and remainingRounds
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    // Iterate over all scheduled tasks in the grid
    while (timeout != null) {
        // Save next first, because next will be set to null after removal
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            // Remove the current timeout from the bucket list and return the next timeout in the list
            next = remove(timeout);
            // If the task's deadline is not later than the current tick's deadline, call expire to run it
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // Rounds are already 0 but the task's deadline is later than this tick's deadline.
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // The pointer has passed this slot once more, so one round is used up: decrement the remaining rounds
            timeout.remainingRounds--;
        }
        // Move on to the next timeout
        timeout = next;
    }
}

HashedWheelBucket is a linked list, so it is traversed from the head node, and the traversal continues until the end of the list.

If remainingRounds is greater than 0, the task is due in a later round, so remainingRounds is decremented by one.

If remainingRounds is less than or equal to zero, the node is removed from the bucket list and its deadline is compared with the deadline of the current tick. If the task’s deadline has been reached, timeout.expire() is invoked to run the task.
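The traversal removes nodes from the doubly linked bucket while iterating, which is why next is saved before remove is called. Below is a simplified sketch with hypothetical BucketExpire and Node classes. It has no cancellation, and it records names instead of running tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of HashedWheelBucket#expireTimeouts on a doubly linked list.
final class BucketExpire {
    static final class Node {
        final String name;
        final long deadline;
        long remainingRounds;
        Node prev, next;

        Node(String name, long deadline, long remainingRounds) {
            this.name = name;
            this.deadline = deadline;
            this.remainingRounds = remainingRounds;
        }
    }

    Node head, tail;
    final List<String> expired = new ArrayList<>();

    void add(Node n) {
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // Unlink n and return the node that followed it.
    Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) n.prev.next = next;
        if (next != null) next.prev = n.prev;
        if (n == head) head = next;
        if (n == tail) tail = n.prev;
        n.prev = null;
        n.next = null;
        return next;
    }

    void expireTimeouts(long deadline) {
        Node timeout = head;
        while (timeout != null) {
            Node next = timeout.next; // saved up front; remove() clears the links
            if (timeout.remainingRounds <= 0) {
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    expired.add(timeout.name); // stands in for timeout.expire()
                } else {
                    throw new IllegalStateException("timeout placed in the wrong slot");
                }
            } else {
                timeout.remainingRounds--; // one more full round has passed
            }
            timeout = next;
        }
    }

    public static void main(String[] args) {
        BucketExpire bucket = new BucketExpire();
        bucket.add(new Node("a", 3, 0));
        bucket.add(new Node("b", 11, 1));
        bucket.add(new Node("c", 3, 0));
        bucket.expireTimeouts(4);
        System.out.println(bucket.expired);      // [a, c]
        bucket.expireTimeouts(12);
        System.out.println(bucket.expired);      // [a, c, b]
        System.out.println(bucket.head == null); // true
    }
}
```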