(On a mobile phone, the source code is easier to read in landscape view.)

Note: The Java source analysis section is based on the Java 8 version unless otherwise noted.

Note: this article is based on the ScheduledThreadPoolExecutor scheduled thread pool class.

Introduction

Previously, we studied how ordinary tasks and future tasks are executed. Today we will look at a new kind of task: the scheduled task.

Scheduled tasks are something we use often. A scheduled task is one that runs at some point in the future, or that repeats in the future according to a certain rule.

Questions

(1) How is a task guaranteed to run at some point in the future?

(2) How is a task guaranteed to repeat according to a certain rule?

An example

Create a scheduled thread pool and use it to run four different kinds of scheduled tasks.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class ThreadPoolTest03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create a scheduled thread pool
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);

        System.out.println("start: " + System.currentTimeMillis());

        // A task with no return value, executed once after 5 seconds
        scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("spring: " + System.currentTimeMillis());
        }, 5, TimeUnit.SECONDS);

        // A task with a return value, executed once after 5 seconds
        ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("inner summer: " + System.currentTimeMillis());
            return "outer summer: ";
        }, 5, TimeUnit.SECONDS);
        // Get the return value (blocks until the task has finished)
        System.out.println(future.get() + System.currentTimeMillis());

        // A fixed-rate task: first run after 1 second, then every 2 seconds,
        // measured from the START of the previous run
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            System.out.println("autumn: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);

        // A fixed-delay task: first run after 1 second, then 2 seconds
        // after the END of the previous run
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            System.out.println("winter: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);
    }
}

There are four types of scheduled tasks:

(1) a task with no return value that runs once in the future;

(2) a task with a return value that runs once in the future;

(3) a task that repeats at a fixed rate in the future;

(4) a task that repeats with a fixed delay in the future.

This article uses the third type as the example for source code analysis.

scheduleAtFixedRate() method

Submits a task to be executed at a fixed rate.

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    // Parameter checks
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // Wrap the ordinary task in a ScheduledFutureTask
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    // Hook method that lets subclasses replace the task; by default t == sft
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // Delayed execution
    delayedExecute(t);
    return t;
}

As you can see, the handling here is similar to that of a future task: the task is wrapped in another task and then executed. The difference is that it is executed through the delayedExecute() method. What does this method do?

delayedExecute() method

Delayed execution.

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // If the thread pool is shut down, apply the rejection policy
    if (isShutdown())
        reject(task);
    else {
        // First put the task into the queue
        super.getQueue().add(task);
        // Check the thread pool state again
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // Make sure there are enough threads to execute the task
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // Create a worker thread.
    // Note that no firstTask is passed in, because the task was put on the queue first.
    // Also, the maxPoolSize parameter is not used.
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

And that’s it?!

In fact, this code only decides whether the task may be executed. The actual execution happens in the task’s run() method.

Remember that the task above was wrapped as an instance of the ScheduledFutureTask class? So let’s look at the run() method of ScheduledFutureTask.

The ScheduledFutureTask class run() method

This is where a scheduled task is actually executed.

public void run() {
    // Whether the task repeats
    boolean periodic = isPeriodic();
    // Check the thread pool state
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // A one-shot task calls the run() method of the parent class,
    // which is FutureTask
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // A repeating task calls the runAndReset() method of the parent class,
    // which is also FutureTask. This is the part analyzed in this article.
    else if (ScheduledFutureTask.super.runAndReset()) {
        // Set the next execution time
        setNextRunTime();
        // Execute again
        reExecutePeriodic(outerTask);
    }
}

As you can see, for repetitive tasks, the runAndReset() method of FutureTask is called, the next execution time is set, and finally the reExecutePeriodic() method is called.
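The difference between the fixed-rate and fixed-delay tasks from the example comes down to how the next execution time is computed. The sketch below is a simplified model rather than the JDK source: the class NextRunTimeSketch and its method are hypothetical names, overflow handling is left out, and it assumes the Java 8 convention of storing a positive period for fixed-rate tasks and a negative one for fixed-delay tasks.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the JDK: a simplified model of how the
// next trigger time of a repeating task can be derived.
class NextRunTimeSketch {

    /**
     * @param scheduledTime time (in nanos) at which the run that just finished was due
     * @param period        > 0 for fixed rate, < 0 for fixed delay (assumed convention)
     * @param now           current time (in nanos), i.e. when the run finished
     */
    static long nextRunTime(long scheduledTime, long period, long now) {
        if (period > 0) {
            // Fixed rate: measured from the previous scheduled start
            return scheduledTime + period;
        }
        // Fixed delay: measured from the moment the previous run finished
        return now + (-period);
    }

    public static void main(String[] args) {
        long twoSeconds = TimeUnit.SECONDS.toNanos(2);
        long oneSecond = TimeUnit.SECONDS.toNanos(1);
        // A run that was due at t = 0 and took 1 second to finish
        long fixedRate = nextRunTime(0, twoSeconds, oneSecond);
        long fixedDelay = nextRunTime(0, -twoSeconds, oneSecond);
        // prints 2: the next start is 2 seconds after the previous start
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(fixedRate));
        // prints 3: the next start is 2 seconds after the previous end
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(fixedDelay));
    }
}
```

This matches the "autumn" and "winter" tasks in the example: both sleep for 1 second, so "autumn" starts every 2 seconds while "winter" starts every 3 seconds.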

FutureTask’s runAndReset() method is similar to run(), except that it does not change the state to NORMAL after the task is completed.
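To see why this matters for repetition, here is a small sketch. ResettableTask and RunAndResetDemo are hypothetical names made up for the illustration; the subclass exists only because runAndReset() is protected in FutureTask.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: widens access to FutureTask's protected runAndReset()
class ResettableTask extends FutureTask<Void> {
    ResettableTask(Runnable body) {
        super(body, null);
    }

    // Runs the body once and leaves the task in its initial state
    boolean runOnceAndReset() {
        return runAndReset();
    }
}

class RunAndResetDemo {
    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();

        // runAndReset() leaves the state untouched, so the body can run again
        ResettableTask repeating = new ResettableTask(runs::incrementAndGet);
        repeating.runOnceAndReset();
        repeating.runOnceAndReset();
        System.out.println(runs.get());          // prints 2
        System.out.println(repeating.isDone());  // prints false

        // run() completes the task, so a second run() does nothing
        AtomicInteger oneShotRuns = new AtomicInteger();
        FutureTask<Void> oneShot = new FutureTask<>(oneShotRuns::incrementAndGet, null);
        oneShot.run();
        oneShot.run();
        System.out.println(oneShotRuns.get());   // prints 1
        System.out.println(oneShot.isDone());    // prints true
    }
}
```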

Look at the reExecutePeriodic() method.

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // Check the thread pool state
    if (canRunInCurrentRunState(true)) {
        // Put the task back into the task queue
        super.getQueue().add(task);
        // Check the thread pool state again
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // Make sure there are enough worker threads
            ensurePrestart();
    }
}

As you can see, once the scheduled thread pool has finished one run of a repeating task, it puts the task back into the task queue.

That solves repetition. So how does the pool make sure a task runs at the specified time?

This is where the delay queue comes in.
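The "run, update the time, put it back" cycle can be sketched outside the thread pool. The code below is only an illustration under assumptions: PeriodicTask and RequeueSketch are hypothetical classes, the JDK's java.util.concurrent.DelayQueue stands in for the pool's internal queue, and a single calling thread plays the role of the worker.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: not the JDK's ScheduledFutureTask
class PeriodicTask implements Delayed {
    private final Runnable body;
    private final long periodNanos;
    private long triggerTime; // absolute time based on System.nanoTime()

    PeriodicTask(Runnable body, long initialDelayNanos, long periodNanos) {
        this.body = body;
        this.periodNanos = periodNanos;
        this.triggerTime = System.nanoTime() + initialDelayNanos;
    }

    // Run once, move the trigger time forward, then re-queue the same task
    void runAndRequeue(DelayQueue<PeriodicTask> queue) {
        body.run();
        triggerTime += periodNanos; // fixed-rate style
        queue.add(this);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class RequeueSketch {
    // Runs one repeating task `times` times on the calling thread
    static int runTimes(int times, long periodMillis) throws InterruptedException {
        DelayQueue<PeriodicTask> queue = new DelayQueue<>();
        int[] runs = {0};
        queue.add(new PeriodicTask(() -> runs[0]++, 0,
                                   TimeUnit.MILLISECONDS.toNanos(periodMillis)));
        while (runs[0] < times) {
            PeriodicTask task = queue.take(); // blocks until the task is due
            task.runAndRequeue(queue);
        }
        return runs[0];
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        int runs = runTimes(3, 20);
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(runs + " runs in about " + elapsed + " ms");
    }
}
```

Only one task object ever exists here; repetition is nothing more than the same object re-entering the queue with a later trigger time.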

DelayedWorkQueue inner class

We know that a worker thread has to take a task out of the task queue before it can execute it. With an ordinary task queue, any task in the queue can be taken out immediately. With a delay queue, a task in the queue cannot be taken out until its time has come. This is why, in the analysis above, the task is put into the queue first and the Worker is created without a firstTask.
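A small experiment makes this visible. EnsurePrestartDemo is a hypothetical class written for this illustration; it schedules tasks that are not due for an hour and records the pool size after each submission.

```java
import java.util.Arrays;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: worker threads are started at submission time,
// long before any task is due
class EnsurePrestartDemo {

    // Records the pool size before scheduling and after each of three submissions
    static int[] poolSizesWhileScheduling() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        int[] sizes = new int[4];
        try {
            sizes[0] = pool.getPoolSize();
            for (int i = 1; i <= 3; i++) {
                // The task sits in the delay queue; no worker receives it as firstTask
                pool.schedule(() -> { }, 1, TimeUnit.HOURS);
                sizes[i] = pool.getPoolSize();
            }
        } finally {
            // Discard the pending tasks so the JVM can exit
            pool.shutdownNow();
        }
        return sizes;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 2]: one worker per submission, capped at corePoolSize
        System.out.println(Arrays.toString(poolSizesWhileScheduling()));
    }
}
```

The workers exist but stay blocked in the queue's take() method until a task becomes due.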

Having said all that, how does it actually work?

We have already analyzed DelayQueue in detail. For the full source analysis, see the earlier article “dead drop Java collection DelayQueue source analysis”.

Internally, the delay queue is implemented with a heap. Interested readers can refer to the earlier article “Please, interview don’t ask me heap (sort)!”.

Here we only look at the take() method.

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Acquire the lock
    lock.lockInterruptibly();
    try {
        for (;;) {
            // The task at the top of the heap
            RunnableScheduledFuture<?> first = queue[0];
            // If the queue is empty, wait
            if (first == null)
                available.await();
            else {
                // How long until the task is due
                long delay = first.getDelay(NANOSECONDS);
                // If it is already due, dequeue it and return
                if (delay <= 0)
                    return finishPoll(first);
                // Not due yet; drop the reference while waiting
                first = null;
                // If another thread is already waiting as leader, wait indefinitely
                if (leader != null)
                    available.await();
                else {
                    // Otherwise the current thread becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait for the delay calculated above, then wake up automatically
                        available.awaitNanos(delay);
                    } finally {
                        // After waking up, clear the leader and loop again
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            // Wake up the next waiting thread
            available.signal();
        // Release the lock
        lock.unlock();
    }
}

The general idea is to use the heap to find the task that is due soonest, namely the task at the top of the heap:

(1) If the task at the top of the heap is due, it is removed from the queue and returned;

(2) If the task at the top of the heap is not yet due, compute how long remains, wait on the condition for that long, and then go back to check (1).

This solves the problem of executing a task only after the specified time has passed.
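The two steps above can be sketched in a much smaller queue. TinyDelayQueue is a hypothetical class, not the JDK's DelayedWorkQueue: it uses java.util.PriorityQueue as the heap and leaves out the leader thread optimization, so every waiting thread uses a timed wait.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified delay queue: heap + lock + condition
class TinyDelayQueue {

    static final class Entry implements Comparable<Entry> {
        final String name;
        final long triggerTime; // absolute time based on System.nanoTime()

        Entry(String name, long delayMillis) {
            this.name = name;
            this.triggerTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public int compareTo(Entry other) {
            // The earliest trigger time ends up at the top of the heap
            return Long.signum(triggerTime - other.triggerTime);
        }
    }

    private final PriorityQueue<Entry> heap = new PriorityQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();

    void put(Entry entry) {
        lock.lock();
        try {
            heap.add(entry);
            // The top of the heap may have changed, so waiters must re-check
            available.signalAll();
        } finally {
            lock.unlock();
        }
    }

    Entry take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Entry first = heap.peek();
                if (first == null) {
                    available.await();            // empty: wait for a put()
                } else {
                    long delay = first.triggerTime - System.nanoTime();
                    if (delay <= 0)
                        return heap.poll();       // step (1): due, dequeue it
                    available.awaitNanos(delay);  // step (2): wait, then re-check
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TinyDelayQueue queue = new TinyDelayQueue();
        queue.put(new Entry("slow", 60));
        queue.put(new Entry("fast", 20));
        System.out.println(queue.take().name); // prints fast
        System.out.println(queue.take().name); // prints slow
    }
}
```

Insertion order does not matter: "fast" comes out first because the heap orders entries by trigger time.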

Other

ScheduledThreadPoolExecutor can also accept tasks through execute() or submit(), but they are treated as one-shot tasks with zero delay.

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}
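One visible consequence can be checked from the outside. In the sketch below, ZeroDelaySubmitDemo is a hypothetical class name; it submits a Callable through submit() and inspects what comes back.

```java
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: submit() on a scheduled pool goes through schedule()
class ZeroDelaySubmitDemo {

    static String submitAndDescribe() throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Future<String> future = pool.submit(() -> "done");
            // Zero delay, so the result is available almost immediately
            String result = future.get(1, TimeUnit.SECONDS);
            // The returned Future is in fact a ScheduledFuture
            boolean scheduled = future instanceof ScheduledFuture;
            return result + ", ScheduledFuture=" + scheduled;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitAndDescribe()); // prints done, ScheduledFuture=true
    }
}
```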

Conclusion

Implementing scheduled tasks requires solving two problems: executing a task at a specified time in the future, and executing a task repeatedly.

(1) Executing a task at a specified time is solved by the properties of the delay queue;

(2) Repeated execution is solved by putting the task back into the queue after each run.

Easter egg

This concludes the source code analysis of the common thread pools. This thread pool is a classic implementation, but overall it is not very efficient: all worker threads share one queue, and every time a worker fetches a task from that queue it has to acquire and release a lock.

So, could each worker thread have its own task queue, with each submitted task assigned to a specific worker thread, so that frequent locking and unlocking is no longer needed?

The answer is yes. In the next chapter we will look at ForkJoinPool, a thread pool based on the work-stealing approach.

You are welcome to follow my public account “Tong Elder brother read source code” for more articles in the source code series.