Please go to www.codercc.com


1. ScheduledThreadPoolExecutor overview

ScheduledThreadPoolExecutor runs asynchronous tasks after a given delay or runs them periodically. It is more powerful than Timer for task scheduling: a Timer uses only a single background thread, whereas ScheduledThreadPoolExecutor lets you specify the number of background threads through its constructor. The UML class diagram of ScheduledThreadPoolExecutor is as follows:

[Figure: UML class diagram of ScheduledThreadPoolExecutor]

  1. As the UML diagram shows, ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so it inherits the basic task submission methods execute() and submit(); for background on ThreadPoolExecutor, see the earlier article on that class. In addition, ScheduledThreadPoolExecutor implements ScheduledExecutorService, the interface that defines the methods for running tasks after a delay and for running them periodically;
  2. ScheduledThreadPoolExecutor has two important inner classes: DelayedWorkQueue and ScheduledFutureTask. DelayedWorkQueue implements the BlockingQueue interface, so it is a blocking queue; ScheduledFutureTask extends FutureTask, so it represents the result of an asynchronous task. Both classes are examined in detail below.

1.1 Construction method

ScheduledThreadPoolExecutor has the following constructors:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

Because ScheduledThreadPoolExecutor extends ThreadPoolExecutor, each constructor simply calls a ThreadPoolExecutor constructor. Once you understand the parameters of that constructor (covered in the earlier article on ThreadPoolExecutor), these are easy to read. The number of core threads is the specified corePoolSize. Submitted tasks are placed in the blocking queue DelayedWorkQueue, which is described below. Note that DelayedWorkQueue is unbounded: its backing array grows as needed. The maximum pool size passed to the superclass is Integer.MAX_VALUE, but because the queue never fills up, the pool never creates threads beyond corePoolSize. In practice it therefore behaves as a fixed-size pool with an unbounded queue, not as an unbounded thread pool.
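The effect of corePoolSize can be checked with a small experiment. The sketch below is only an illustration (the class and method names FixedSizePoolDemo and poolSizeUnderLoad are hypothetical): it submits more blocking tasks than there are core threads and then reads the pool size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;

class FixedSizePoolDemo {
    // Submits taskCount blocking tasks and returns the number of worker threads created.
    static int poolSizeUnderLoad(int corePoolSize, int taskCount) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra tasks wait in the queue; no thread beyond corePoolSize is created.
            return executor.getPoolSize();
        } finally {
            release.countDown(); // let the queued tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(2, 10));
    }
}
```

With two core threads and ten tasks, the pool size should stay at 2 even though the maximum pool size is Integer.MAX_VALUE.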

1.2 Unique methods

ScheduledThreadPoolExecutor implements the ScheduledExecutorService interface, which defines the methods for running asynchronous tasks after a delay and for running them periodically. The methods are:

// Runs the task once after the given delay. The task is a Runnable,
// so ScheduledFuture.get() returns null.
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);

// Runs the task once after the given delay. The task is a Callable,
// so ScheduledFuture.get() returns the computed result.
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);

// Runs the task first after initialDelay and then once every period.
// If an execution takes longer than period, the next execution starts
// late, as soon as the previous one completes; executions never overlap.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

// Runs the task first after initialDelay. After that, the interval between
// the end of one execution and the start of the next is delay.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);
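A short usage example may help tie the four methods together. This is a minimal sketch with hypothetical names (SchedulingMethodsDemo, runDemo); the delays and the number of periodic runs are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SchedulingMethodsDemo {
    // Exercises all four scheduling methods and returns "<runnable result>/<callable result>".
    static String runDemo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        try {
            // One-shot Runnable: get() yields null once the task has run.
            ScheduledFuture<?> oneShot =
                executor.schedule(() -> { }, 10, TimeUnit.MILLISECONDS);
            Object runnableResult = oneShot.get();

            // One-shot Callable: get() yields the value returned by the task.
            ScheduledFuture<String> delayed =
                executor.schedule(() -> "done", 10, TimeUnit.MILLISECONDS);
            String callableResult = delayed.get();

            // Periodic tasks: wait until each has run three times, then cancel.
            CountDownLatch rateTicks = new CountDownLatch(3);
            ScheduledFuture<?> fixedRate = executor.scheduleAtFixedRate(
                rateTicks::countDown, 0, 10, TimeUnit.MILLISECONDS);
            CountDownLatch delayTicks = new CountDownLatch(3);
            ScheduledFuture<?> fixedDelay = executor.scheduleWithFixedDelay(
                delayTicks::countDown, 0, 10, TimeUnit.MILLISECONDS);
            rateTicks.await();
            delayTicks.await();
            fixedRate.cancel(false);
            fixedDelay.cancel(false);

            return runnableResult + "/" + callableResult;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

A periodic task keeps running until it is cancelled or the executor is shut down, which is why the example cancels both periodic futures explicitly.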

2. ScheduledFutureTask: the task that can run periodically

The most distinctive feature of ScheduledThreadPoolExecutor is that it can run asynchronous tasks periodically. When schedule, scheduleAtFixedRate, or scheduleWithFixedDelay is called, the submitted task is wrapped in a ScheduledFutureTask, as the source code shows. Take the schedule method as an example:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

As you can see, the incoming Runnable is wrapped in a ScheduledFutureTask, which is then passed through decorateTask (a hook that subclasses can override; by default it returns the task unchanged). The main purpose of a thread pool is to decouple tasks from threads: threads are the executors of tasks, and the task here is a ScheduledFutureTask. A thread executes a task by calling its run() method. To support both delayed and periodic execution, ScheduledFutureTask overrides the run method:

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        // Not a periodic task: call FutureTask's run method directly
        ScheduledFutureTask.super.run();
    // Periodic task: run it, then reset the time of the next execution
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

The source makes the logic clear. The overridden run method first checks whether the task may run in the pool's current state and cancels it if not. It then checks if (!periodic): a task that is not periodic is run directly through FutureTask's run(). Otherwise the task is run with runAndReset(), and if that succeeds, setNextRunTime() resets the time of the next execution and reExecutePeriodic(outerTask) puts the task back into the DelayedWorkQueue.

In short, ScheduledFutureTask wraps an asynchronous task and handles it according to whether it is periodic. A non-periodic task (submitted through the schedule method) is executed directly through run(). A periodic task has its next execution time reset after each run and is then put back into the blocking queue.
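What "resetting the time of the next execution" means can be made concrete with a small model. The sketch below assumes the convention used in the JDK source, where a positive period marks a fixed-rate task and a negative period marks a fixed-delay task. The names NextRunTimeSketch and nextRunTime are hypothetical, and the overflow handling of the real implementation is omitted.

```java
class NextRunTimeSketch {
    /**
     * Simplified model of computing the next trigger time of a periodic task.
     *
     * @param previousTriggerTime the time at which the last run was due
     * @param period positive for fixed rate, negative for fixed delay (assumed convention)
     * @param now the time at which the last run finished
     */
    static long nextRunTime(long previousTriggerTime, long period, long now) {
        if (period > 0) {
            // Fixed rate: anchored to the original schedule, regardless of run duration.
            return previousTriggerTime + period;
        }
        // Fixed delay: anchored to the moment the previous run completed.
        return now + (-period);
    }

    public static void main(String[] args) {
        // A run was due at t=1000 and finished at t=1250.
        System.out.println(nextRunTime(1000, 100, 1250));  // fixed rate
        System.out.println(nextRunTime(1000, -100, 1250)); // fixed delay
    }
}
```

In this model a fixed-rate task that overruns its period gets a next trigger time that is already in the past, so it becomes due immediately, while a fixed-delay task always waits the full delay after finishing.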

3. DelayedWorkQueue

The other important class in ScheduledThreadPoolExecutor is DelayedWorkQueue, which provides the queueing behavior needed for delayed and periodic tasks. DelayedWorkQueue is a heap-based data structure similar to DelayQueue and PriorityQueue. Scheduled tasks have different execution times, so DelayedWorkQueue orders them by execution time in ascending order: the task whose execution time is nearest to the current time sits at the head of the queue.

Why use DelayedWorkQueue?

When running scheduled tasks, the pool must always take the task that is due soonest, that is, the task with the earliest execution time in the queue. A priority queue is therefore the natural choice.

DelayedWorkQueue is a priority queue that guarantees the task dequeued is always the one with the earliest execution time. Because it is heap-based, insertion and removal take O(log N) time in the worst case.

Data structure of DelayedWorkQueue

// Initial capacity of the backing array
private static final int INITIAL_CAPACITY = 16;
// DelayedWorkQueue is backed by an array with an initial length of 16.
// The array elements implement the RunnableScheduledFuture interface;
// in practice they are ScheduledFutureTask instances.
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

As the fields show, DelayedWorkQueue stores its tasks in an array, and the array is organized as a heap. Tasks are ordered by execution time, and the task that is due soonest sits at the head of the queue so that it is executed first.
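The array-as-heap idea can be sketched in a few lines. The class below is not the JDK implementation: it is a simplified, single-threaded model (hypothetical name DelayHeapSketch) that stores only trigger times in a binary min-heap and leaves out locking, blocking, and task objects.

```java
import java.util.Arrays;

class DelayHeapSketch {
    private long[] triggerTimes = new long[16]; // same initial capacity as the field above
    private int size = 0;

    // Inserts a trigger time and sifts it up until its parent is not later.
    void add(long time) {
        if (size == triggerTimes.length) {
            // Grow the backing array when full (growth factor chosen for illustration).
            triggerTimes = Arrays.copyOf(triggerTimes, size + (size >> 1));
        }
        int k = size++;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (triggerTimes[parent] <= time) break;
            triggerTimes[k] = triggerTimes[parent];
            k = parent;
        }
        triggerTimes[k] = time;
    }

    // Removes and returns the earliest trigger time, then restores heap order.
    long poll() {
        if (size == 0) throw new IllegalStateException("queue is empty");
        long first = triggerTimes[0];
        long last = triggerTimes[--size];
        int k = 0;
        int half = size >>> 1;
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < size && triggerTimes[right] < triggerTimes[child]) child = right;
            if (last <= triggerTimes[child]) break;
            triggerTimes[k] = triggerTimes[child];
            k = child;
        }
        if (size > 0) triggerTimes[k] = last;
        return first;
    }

    int size() {
        return size;
    }
}
```

Both add and poll walk a single path between the root and a leaf, which is where the O(log N) bound comes from. For example, after adding 50, 10, 30, and 20, successive poll() calls return 10, 20, 30, and 50.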

4. ScheduledThreadPoolExecutor execution process

We have now looked at the two inner classes of ScheduledThreadPoolExecutor, ScheduledFutureTask and DelayedWorkQueue. They correspond to the two most important elements in the operation of a thread pool: the task and the blocking queue. Next we look at the overall flow when a task is submitted to ScheduledThreadPoolExecutor. Take the schedule method as an example; its source is:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // Convert the submitted task to a ScheduledFutureTask
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // Schedule the ScheduledFutureTask
    delayedExecute(t);
    return t;
}

The method is easy to follow. To support delayed and periodic tasks, it first wraps the Runnable in a ScheduledFutureTask. It then calls delayedExecute to schedule the task, which is the key method:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        // Reject the task if the thread pool has been shut down
        reject(task);
    else {
        // Put the task in the blocking queue
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // Ensure that at least one thread is started, even if corePoolSize is 0
            ensurePrestart();
    }
}

After the task has been queued, the main logic of delayedExecute lies in the ensurePrestart() method:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

The logic of this method is simple; the key is the addWorker method it calls, whose main job is to create a new Worker and start its thread. The Worker's thread executes runWorker, which repeatedly calls getTask to retrieve tasks from the blocking queue; when getTask returns null, the thread terminates. addWorker is a method of ThreadPoolExecutor, whose source code is covered in detail in the earlier article on that class.
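The second branch of ensurePrestart (wc == 0) is what keeps a pool with corePoolSize 0 working. The following sketch, with the hypothetical names ZeroCorePoolDemo and scheduleWithZeroCoreThreads, schedules one task on such a pool and reports how many workers exist while the task is pending or running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCorePoolDemo {
    // Returns "<worker count>:<task result>" for a pool created with corePoolSize 0.
    static String scheduleWithZeroCoreThreads() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(0);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            ScheduledFuture<String> future = executor.schedule(() -> {
                gate.await(); // hold the task until the pool size has been read
                return "ran";
            }, 10, TimeUnit.MILLISECONDS);
            // schedule() has already gone through delayedExecute and ensurePrestart.
            int workers = executor.getPoolSize();
            gate.countDown();
            return workers + ":" + future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scheduleWithZeroCoreThreads());
    }
}
```

Even with no core threads, one worker is started so that the queued task is eventually executed.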

5. Summary

  1. ScheduledThreadPoolExecutor extends ThreadPoolExecutor, so the overall behavior is the same: the thread pool creates threads (the Worker class), and each thread keeps taking asynchronous tasks from the blocking queue and executing them. Compared with ThreadPoolExecutor, ScheduledThreadPoolExecutor adds delayed and periodic execution. To support this, it defines its own task class, ScheduledFutureTask, which overrides the run method to handle delayed and periodic tasks. In addition, its blocking queue, DelayedWorkQueue, is a priority queue backed by a heap, so the task whose execution time is nearest to the current time is always at the head of the queue, ready for a thread to take.

  2. In the design of a thread pool, whether ThreadPoolExecutor or ScheduledThreadPoolExecutor, there are three key elements: the task, the executor, and the task result. The design idea is to decouple these three elements completely.

    Executor

    Task execution is delegated entirely to the Worker class, which wraps a Thread. With the schedule method of ScheduledThreadPoolExecutor, a submitted task is first placed in the blocking queue, and addWorker then creates a new Worker if one is needed. With the execute and submit methods of ThreadPoolExecutor, the task is either handed directly to a new Worker (while fewer than corePoolSize threads exist) or placed in the blocking queue. In both cases the Worker's thread runs the runWorker method, which keeps taking asynchronous tasks from the blocking queue and executing them until no more tasks can be obtained from the queue.

    Task

    In both ThreadPoolExecutor and ScheduledThreadPoolExecutor, a task is an implementation of the Runnable or Callable interface. ThreadPoolExecutor wraps a task passed to submit in a FutureTask. ScheduledThreadPoolExecutor, in order to support delayed and periodic execution, wraps the task in a ScheduledFutureTask, which extends FutureTask and overrides the run method.

    Task result

    After a task is submitted, its result can be obtained through the Future interface. The concrete class behind that interface is FutureTask in ThreadPoolExecutor and ScheduledFutureTask in ScheduledThreadPoolExecutor.
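The relationship between the two result types can be observed at run time. ScheduledFutureTask is a private inner class, so the sketch below (hypothetical names FutureTypesDemo and inspect) checks for its public supertypes FutureTask and RunnableScheduledFuture instead.

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FutureTypesDemo {
    // Returns { plain is FutureTask, plain is scheduled future,
    //           scheduled is FutureTask, scheduled is scheduled future }.
    static boolean[] inspect() {
        ThreadPoolExecutor plain = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
        try {
            Future<Integer> fromPlain = plain.submit(() -> 1);
            Future<Integer> fromScheduled = scheduled.submit(() -> 2);
            return new boolean[] {
                fromPlain instanceof FutureTask,
                fromPlain instanceof RunnableScheduledFuture,
                fromScheduled instanceof FutureTask,
                fromScheduled instanceof RunnableScheduledFuture
            };
        } finally {
            plain.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(inspect()));
    }
}
```

Both futures are FutureTask instances, but only the one returned by ScheduledThreadPoolExecutor also implements RunnableScheduledFuture, which matches the inheritance described above.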