I. Background and usage

When we build server-side systems, we regularly need scheduled tasks: for example, sending a meeting reminder half an hour in advance, or running asynchronous jobs periodically. So how can such a scheduling system be implemented? The Timer class provided by the JDK is a handy tool that implements timed tasks through a few simple API calls.

Let’s take a look at how java.util.Timer implements this timing feature.

First, let’s look at a working demo

Timer timer = new Timer();
TimerTask task = new TimerTask() {
    @Override
    public void run() {
        System.out.println("executing now!");
    }
};
// Note: a TimerTask can be scheduled only once, so the three calls below are
// alternatives; use one of them (or create a new TimerTask for each call)
// Print once after a 1s delay
timer.schedule(task, 1000);
// After a 1s delay, print every 1s (fixed-delay)
timer.schedule(task, 1000, 1000);
// After a 1s delay, print every 1s at a fixed rate
timer.scheduleAtFixedRate(task, 1000, 1000);

Basic usage:

  1. Create a Timer object
  2. Create a TimerTask object where you implement the run method
  3. Pass the TimerTask object to the schedule method of the Timer object to schedule it for execution.
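
The three steps above can be put together into a small runnable program. This is only a minimal sketch: the class name TimerDemo, the helper runPeriodic, and the CountDownLatch used to wait for the task are illustrative assumptions, not part of the Timer API.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimerDemo {

    // Hypothetical helper: schedules a periodic task and reports whether it
    // fired `runs` times before the timeout expired.
    static boolean runPeriodic(int runs, long periodMillis, long timeoutMillis) {
        // Step 1: create a Timer (named, with a daemon worker thread)
        Timer timer = new Timer("demo-timer", true);
        CountDownLatch latch = new CountDownLatch(runs);

        // Step 2: create a TimerTask and implement run()
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                System.out.println("executing now!");
                latch.countDown();
            }
        };

        // Step 3: hand the task to the Timer (first run after one period, then repeat)
        timer.schedule(task, periodMillis, periodMillis);

        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stop the worker thread and discard pending tasks
        }
    }

    public static void main(String[] args) {
        System.out.println("fired 3 times: " + runPeriodic(3, 100, 5000));
    }
}
```

Calling timer.cancel() at the end matters in long-running programs: without it, a non-daemon Timer thread keeps the JVM alive.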

The APIs for adding tasks are as follows:

  • APIs for one-time tasks:
   // Run once after the specified delay
   // (The periodic overloads of schedule use fixed-delay mode: the next run is computed from the actual start time of the previous run)
   public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis()+delay, 0);
    }
    
    // Run at a specified point in time
    public void schedule(TimerTask task, Date time) {
        sched(task, time.getTime(), 0);
    }
  • APIs for periodic tasks:
    // Run with the specified delay, and then run with the specified period
    public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, -period);
    }

    // Run at the specified point in time, and then at the specified period
    public void schedule(TimerTask task, Date firstTime, long period) {
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, firstTime.getTime(), -period);
    }

    // Run after the specified delay, and then repeat with the specified period
    // Fixed-rate mode: the next run is computed from the scheduled start time of the previous run
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, period);
    }

    public void scheduleAtFixedRate(TimerTask task, Date firstTime,
                                    long period) {
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, firstTime.getTime(), period);
    }

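As you can see, every API method calls sched internally, passing in the time of the first execution, which it computes itself, and the period. A period of 0 indicates a one-time task; schedule passes a negative period (fixed-delay) and scheduleAtFixedRate passes a positive one (fixed-rate).

II. Internal principles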

So let’s look at how scheduling is implemented inside the Timer.

The internal structure

Take a look at the components of a Timer:

public class Timer {

    // Task queue
    private final TaskQueue queue = new TaskQueue();

    // The worker thread loops to fetch the task
    private final TimerThread thread = new TimerThread(queue);

    private final Object threadReaper = new Object() {
        protected void finalize() throws Throwable {
            synchronized(queue) {
                thread.newTasksMayBeScheduled = false;
                queue.notify(); // In case queue is empty.
            }
        }
    };

    // Serial number used to name each Timer's worker thread (static, so the worker threads of multiple Timers can be told apart)
    private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);
}


Timer has three important components: TimerTask, TaskQueue, and TimerThread.

  • TimerTask: a task to be executed
  • TaskQueue: the task queue, in which TimerTasks are automatically ordered by execution time
  • TimerThread: the worker thread, which actually loops over and executes the TimerTasks

So, how does the whole Timer work after the task is added? You can see the diagram below:

In the simplified logic shown in the figure, tasks added to the TaskQueue are ordered automatically, so the first task in the queue is always the one with the earliest execution time. The TimerThread runs a continuous loop that looks at the first task in the TaskQueue and checks whether the current time has reached that task's execution time. If so, it executes the task.
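
The "worker loop plus ordered queue" idea described above can be sketched in a few dozen lines. This is not the JDK implementation: MiniTimer, Job, and demo are hypothetical names, and java.util.PriorityQueue stands in for the JDK's own TaskQueue. It only supports one-time jobs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class MiniTimer {

    // A queued job: what to run and when (epoch milliseconds).
    private static final class Job implements Comparable<Job> {
        final long time;
        final Runnable action;
        Job(long time, Runnable action) { this.time = time; this.action = action; }
        @Override public int compareTo(Job o) { return Long.compare(time, o.time); }
    }

    private final PriorityQueue<Job> queue = new PriorityQueue<>();
    private boolean stopped = false; // guarded by the queue lock

    public MiniTimer() {
        Thread worker = new Thread(this::loop, "mini-timer");
        worker.setDaemon(true);
        worker.start();
    }

    public void schedule(Runnable action, long delayMillis) {
        synchronized (queue) {
            queue.add(new Job(System.currentTimeMillis() + delayMillis, action));
            queue.notify(); // the head may have changed; let the worker re-check
        }
    }

    public void stop() {
        synchronized (queue) { stopped = true; queue.notify(); }
    }

    private void loop() {
        while (true) {
            Job due;
            synchronized (queue) {
                try {
                    while (queue.isEmpty() && !stopped) queue.wait();
                    if (stopped) return; // pending jobs are simply dropped
                    long wait = queue.peek().time - System.currentTimeMillis();
                    if (wait > 0) { queue.wait(wait); continue; } // head not due yet
                    due = queue.poll();
                } catch (InterruptedException e) {
                    return;
                }
            }
            due.action.run(); // run outside the lock so schedule() is not blocked
        }
    }

    // Schedules three jobs out of order and returns the order in which they ran.
    static List<String> demo() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        MiniTimer t = new MiniTimer();
        t.schedule(() -> order.add("c"), 300);
        t.schedule(() -> order.add("a"), 100);
        t.schedule(() -> order.add("b"), 200);
        try { Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        t.stop();
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Although the jobs are added in the order c, a, b, the queue always exposes the earliest one first, so they run in order of their delays.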

The worker thread

  1. Create a task and call the schedule method
public void schedule(TimerTask task, Date firstTime, long period) { 
    if (period <= 0) 
        throw new IllegalArgumentException("Non-positive period."); 
    sched(task, firstTime.getTime(), -period); 
}
  2. The sched method is called internally
// The sched method takes the task, the execution time, and the execution cycle
private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Prevent overflow
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    // Lock the queue to prevent concurrent enqueueing
    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        // Lock the task to avoid concurrent modification
        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        // Enqueue the task
        queue.add(task);
        /* If the new task is now the first task in the queue, wake up the worker thread. After handling a task, the worker thread sleeps until the execution time of the next task; if a task with an earlier nextExecutionTime jumps to the head of the queue, the worker thread must be woken immediately so that the task is not delayed. */
        if (queue.getMin() == task)
            queue.notify();
    }
}

Locks are taken during this process to avoid concurrency problems when multiple TimerTasks are added at the same time. As you can see, the logic of the sched method is fairly simple: after the task's fields are assigned, the task is added to the queue, which automatically orders tasks by nextExecutionTime (ascending; how the ordering is implemented is covered later).
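
The two IllegalStateException checks in sched can be observed from the outside with the public API. A minimal sketch; the class and method names here are illustrative assumptions, and the 60-second delays are chosen only so that nothing actually runs during the demo.

```java
import java.util.Timer;
import java.util.TimerTask;

public class ScheduleTwiceDemo {

    // Returns true if scheduling the same TimerTask a second time is rejected.
    static boolean secondScheduleRejected() {
        Timer timer = new Timer(true);
        TimerTask task = new TimerTask() {
            @Override public void run() { /* nothing to do */ }
        };
        try {
            timer.schedule(task, 60_000); // VIRGIN -> SCHEDULED
            timer.schedule(task, 60_000); // state is no longer VIRGIN
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            timer.cancel();
        }
    }

    // Returns true if a cancelled Timer refuses new tasks.
    static boolean cancelledTimerRejected() {
        Timer timer = new Timer(true);
        timer.cancel(); // sets newTasksMayBeScheduled to false
        try {
            timer.schedule(new TimerTask() {
                @Override public void run() { }
            }, 1000);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondScheduleRejected()); // prints "true"
        System.out.println(cancelledTimerRejected()); // prints "true"
    }
}
```

In practice this means a TimerTask instance is single-use: to run the same logic on several schedules, create one TimerTask per schedule.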

  3. The mainLoop of the worker thread
public void run() {
    try {
        mainLoop();
    } finally {
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();
        }
    }
}

/**
 * Main logic of the worker thread: loop and process tasks
 */
private void mainLoop() {
    while (true) {
        try {
            TimerTask task;
            boolean taskFired; // flag whether the task should be executed
            synchronized(queue) {
                // If the queue is empty and newTasksMayBeScheduled is true, wait for a task to be added
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                
                // If the queue is empty and newTasksMayBeScheduled is false, the thread should exit
                if (queue.isEmpty())
                    break;

                // Queue is not empty, try to fetch task from queue (target task with earliest execution time)
                long currentTime, executionTime;
                task = queue.getMin();
                synchronized(task.lock) {
                    // Check the task status
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    
                    // Current time >= target execution time: the task is due, so set taskFired = true
                    if (taskFired = (executionTime<=currentTime)) {
                        if (task.period == 0) { // period == 0: one-time task, remove it from the queue first
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Periodic task: compute the next execution time from period and reposition the task in the queue
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // The task is not due yet: wait until its execution time
                    queue.wait(executionTime - currentTime);
            }
            if (taskFired)  // If the task needs to be executed, the task's run method is called
                task.run();
        } catch(InterruptedException e) {
        }
    }
}

As you can see from the source code of mainLoop, the basic flow is as follows:

  1. Check the queue state.
     • Queue is empty: check newTasksMayBeScheduled. If false, the thread exits; if true, wait.
     • Queue is not empty: check the state of the head task.
  2. Check the state of the head task.
     • CANCELLED: remove the task and start the next iteration of the loop.
     • Otherwise: check whether the task is due.
  3. Check whether the task is due.
     • false: the thread waits until the execution time.
     • true: check whether it is a periodic task.
  4. Check whether it is a periodic task.
     • false: remove the task from the queue, then execute it.
     • true: reset the task's next execution time, then execute it.

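When a periodic task is found to be due, the time of its next execution is computed. There are two ways to compute it, corresponding to the two kinds of API described earlier:

  • schedule: period is negative, and the next execution time is currentTime + |period|, that is, relative to when this execution actually started (fixed-delay)
  • scheduleAtFixedRate: period is positive, and the next execution time is executionTime + period, that is, relative to when this execution was scheduled to start (fixed-rate)
queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period);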
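
To make the difference concrete, the rescheduling expression can be pulled out into a pure function and fed a late execution. The class NextRun and the sample timestamps are hypothetical; only the ternary expression is taken from mainLoop.

```java
public class NextRun {

    // Same expression as in mainLoop: a negative period means fixed-delay,
    // a positive period means fixed-rate.
    static long next(long period, long currentTime, long executionTime) {
        return period < 0 ? currentTime - period : executionTime + period;
    }

    public static void main(String[] args) {
        long scheduled = 10_000; // when the task was supposed to run
        long actual = 10_300;    // the worker thread got to it 300 ms late

        // Fixed-delay (schedule): the 300 ms lag carries over to the next run
        System.out.println(next(-1000, actual, scheduled)); // prints 11300

        // Fixed-rate (scheduleAtFixedRate): the next run stays on the original grid
        System.out.println(next(1000, actual, scheduled));  // prints 11000
    }
}
```

With fixed-delay, delays accumulate and the interval between starts never drops below the period; with fixed-rate, a late task catches up, so executions can bunch together after a long pause.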

Priority queue

When a task is added to or removed from the queue, or its execution time is changed, the queue automatically restores its ordering, so the task with the earliest execution time is always at the head of the queue. So how does this work?

Take a look at the source code for TaskQueue

class TaskQueue {

    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

    int size() {
        return size;
    }
    
    void add(TimerTask task) {
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    TimerTask getMin() {
        return queue[1];
    }

    TimerTask get(int i) {
        return queue[i];
    }

    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;  // Drop extra reference to prevent memory leak
        fixDown(1);
    }

    void quickRemove(int i) {
        assert i <= size;

        queue[i] = queue[size];
        queue[size--] = null;  // Drop extra ref to prevent memory leak
    }

    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }

    boolean isEmpty() {
        return size==0;
    }

    void clear() {
        for (int i=1; i<=size; i++)
            queue[i] = null;

        size = 0;
    }

    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }
}

TaskQueue is implemented as a balanced binary heap (a min-heap) stored in an array, with the root at index 1 and the children of node k at indexes 2k and 2k+1. Elements are ordered by nextExecutionTime, and the earliest task is always at the top of the heap, so the task the worker thread inspects is always the next one due. The array has an initial length of 128 and doubles in length when it fills up.
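
The same heap mechanics can be tried in isolation. The following sketch stores plain long values (standing in for nextExecutionTime) instead of TimerTasks; LongMinHeap and sorted are hypothetical names, and the initial array length of 4 is chosen only to exercise the doubling step quickly.

```java
import java.util.Arrays;

public class LongMinHeap {
    private long[] heap = new long[4]; // slot 0 is unused, the root lives at index 1
    private int size = 0;

    void add(long v) {
        if (size + 1 == heap.length) // array is full: double it
            heap = Arrays.copyOf(heap, 2 * heap.length);
        heap[++size] = v;
        fixUp(size);
    }

    // Removes and returns the smallest value; the caller must ensure the heap is not empty.
    long removeMin() {
        long min = heap[1];
        heap[1] = heap[size--];
        fixDown(1);
        return min;
    }

    // Move the element at index k up until its parent is not larger.
    private void fixUp(int k) {
        while (k > 1) {
            int parent = k >> 1;
            if (heap[parent] <= heap[k]) break;
            swap(parent, k);
            k = parent;
        }
    }

    // Move the element at index k down until both children are not smaller.
    private void fixDown(int k) {
        int child;
        while ((child = k << 1) <= size) {
            if (child < size && heap[child] > heap[child + 1]) child++; // pick the smaller child
            if (heap[k] <= heap[child]) break;
            swap(k, child);
            k = child;
        }
    }

    private void swap(int i, int j) { long t = heap[i]; heap[i] = heap[j]; heap[j] = t; }

    // Pushes all values and pops them again, which yields ascending order.
    static long[] sorted(long... values) {
        LongMinHeap h = new LongMinHeap();
        for (long v : values) h.add(v);
        long[] out = new long[values.length];
        for (int i = 0; i < out.length; i++) out[i] = h.removeMin();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sorted(50, 10, 40, 20, 30))); // prints [10, 20, 30, 40, 50]
    }
}
```

Both add and removeMin touch only one root-to-leaf path, so each costs O(log n), while reading the minimum is O(1). That is why the worker thread can cheaply peek at the next due task on every loop iteration.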

Other methods

The TimerTask has four states:

  • VIRGIN: The task has just been created and has not been scheduled yet
  • SCHEDULED: The task has been scheduled and is waiting in the queue
  • EXECUTED: A one-time task has been executed or is being executed
  • CANCELLED: The task has been cancelled

Timer also provides the cancel and purge methods:

  • cancel: clears all tasks in the queue, and the worker thread exits.
  • purge: removes all tasks in the queue whose state has been set to CANCELLED.
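
The interplay of TimerTask.cancel, Timer.purge, and Timer.cancel can be sketched as follows. PurgeDemo, noop, and purgeCount are hypothetical names. The tasks are scheduled 60 seconds or more in the future so that none of them runs, and only tasks behind the head of the queue are cancelled, because the worker thread itself discards a cancelled head task when it inspects it.

```java
import java.util.Timer;
import java.util.TimerTask;

public class PurgeDemo {

    static TimerTask noop() {
        return new TimerTask() {
            @Override public void run() { }
        };
    }

    // Schedules `total` far-future tasks, cancels the last `cancelled` of them
    // (requires cancelled < total), and returns what Timer.purge() reports.
    static int purgeCount(int total, int cancelled) {
        Timer timer = new Timer(true);
        try {
            TimerTask[] tasks = new TimerTask[total];
            for (int i = 0; i < total; i++) {
                tasks[i] = noop();
                timer.schedule(tasks[i], 60_000 + i * 1_000L); // task 0 stays at the head
            }
            for (int i = total - cancelled; i < total; i++) {
                tasks[i].cancel(); // state becomes CANCELLED, but the task is still in the queue
            }
            return timer.purge(); // removes cancelled tasks and returns how many were removed
        } finally {
            timer.cancel(); // clears the queue and lets the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println(purgeCount(5, 2)); // prints 2
    }
}
```

Calling purge is rarely necessary; it mainly helps when an application cancels large numbers of tasks and wants their memory reclaimed before they would have come due.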

Common application

Java’s Timer is widely used to implement asynchronous task systems and appears in a number of open source projects, for example in the asynchronous logic behind delayed messages and consumption retries in the message queue RocketMQ:

public void start() {
    if (started.compareAndSet(false, true)) {
        super.load();
        // Create a new timer
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // ...
        }

        // Call the Timer's scheduleAtFixedRate method
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

The above code is the core logic of RocketMQ’s delayed message delivery task ScheduleMessageService, which is an asynchronous timed task implemented using a Timer.

III. Summary

Whether you are implementing simple asynchronous logic or building a complex task system, the Java Timer is a convenient and stable utility class. Its implementation also shows a basic design for a scheduling system: a thread loop plus a priority queue. This can serve as inspiration when designing similar systems.

Finally, comments are welcome, as are suggestions and corrections of any kind 👏

Original writing takes effort, so feel free to share this article. Your support is my motivation!