Abstract: The source code of ThreadPoolExecutor class is used to analyze the overall process of executing tasks in a thread pool.

This article is from The huawei cloud community. The core process of executing tasks through ThreadPoolExecutor class source code is analyzed by The Author of The article.

The ThreadPoolExecutor class has a collection of workers threads. Users can add tasks to the pool that need to be executed. Workers can execute tasks directly or from a task queue. The ThreadPoolExecutor class provides methods for the entire process of creating, executing, and dying thread pools. In this article, we take a deep look at the overall flow of thread pool tasks with the source code of the ThreadPoolExecutor class.

In the ThreadPoolExecutor class, the thread pool logic is mainly reflected in the execute(Runnable) method, addWorker(Runnable, Boolean) method, addWorkerFailed(Worker) method, and rejection policy. Next, Let’s dive into these core approaches.

Execute (Runnable) method

Execute (Runnable) is used to submit tasks of type Runnable to a thread pool. Let’s look at the source code for the execute(Runnable) method, as shown below.

Public void execute(Runnable command) {// If (command == null) throw new NullPointerException(); Int c = ctl.get(); // Get the state of the pool and the number of threads in the pool. If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return; c = ctl.get(); If (isRunning(c) && workqueue.offer (command)) {// Get the status of the thread pool and the number of threads in the thread pool again, Int recheck = ctl.get(); // If the thread pool is not RUNNING, remove the task from the queue if (! IsRunning (recheck) && remove(command)) // Execute reject(command); // Add a thread to the pool if the pool is empty else if (workerCountOf(recheck) == 0) addWorker(null, false); } // if the queue is full, add a new worker thread, if the new thread fails, execute the reject policy else if (! addWorker(command, false)) reject(command); }Copy the code

The execution process of the whole task can be simplified as shown in the following figure.

Next, we disassemble the execute(Runnable) method and analyze the execution logic of the execute(Runnable) method.

(1) Whether the number of threads in the thread pool is smaller than the number of corePoolSize core threads. If it is smaller than the number of corePoolSize core threads, a core thread is added to the workers thread set to perform tasks. The code is shown below.

If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return; c = ctl.get(); }Copy the code

(2) If the number of threads in the thread pool is greater than the number of corePoolSize core threads, the current thread pool is judged to be in the RUNNING state. If it is in the RUNNING state, the task is added to the task queue to be executed. Note: When adding tasks to the task queue, it is necessary to check whether the thread pool is in the RUNNING state. Only when the thread pool is in the RUNNING state, new tasks can be added to the task queue. Otherwise, the rejection policy is executed. The code is shown below.

if (isRunning(c) && workQueue.offer(command)) 
Copy the code

(3) The task is successfully added to the task queue. Since other threads may modify the state of the thread pool, a second check is needed on the thread pool. If the current state of the thread pool is no longer in the RUNNING state, the added task needs to be removed from the task queue and the subsequent rejection policy is implemented. If the current thread pool is still RUNNING, the thread pool is determined to be empty. If there are no threads in the thread pool, a new thread is added to the thread pool, as shown below.

Int recheck = ctl.get(); int recheck = ctl.get(); // If the thread pool is not RUNNING, remove the task from the queue if (! IsRunning (recheck) && remove(command)) // Execute reject(command); // Add a thread to the pool if the pool is empty else if (workerCountOf(recheck) == 0) addWorker(null, false);Copy the code

(4) If adding a task to the task queue fails in Step 3, start a new thread to execute the task. At this point, if the number of threads in the thread pool is greater than the maximum number of threads in the thread pool maximumPoolSize, no new threads can be started. In this case, it indicates that the task queue in the thread pool is full, and the thread in the thread pool is full, and the reject policy needs to be executed, as shown in the following code.

// If the queue is full, add a new worker thread. If the new thread fails, reject the policy else if (! addWorker(command, false)) reject(command);Copy the code

Here, we disassemble the Execute (Runnable) method and use flowcharts to understand the execution flow of tasks in a thread pool. Execute (Runnable) is basically the execution logic of a common thread pool.

Note: about ScheduledThreadPoolExecutor classes and class ForkJoinPool execution, the logic of the thread pool, after a series of high concurrency project 】 【 this paper will elaborate on, understand the execution logic of these classes, is a comprehensive grasp the basic thread pool implementation process.

Execute (Runnable) calls addWorker(Runnable, Boolean) in multiple places. Let’s examine the logic of the addWorker(Runnable, Boolean) method.

AddWorker (Runnable, Boolean) method

In general, the addWorker(Runnable, Boolean) method can be divided into three parts. The first part is to safely addWorker threads to the thread pool using CAS. The second part is creating a new worker thread; The third part is to add the task to workers in a safe and concurrent way and start the worker thread to execute the task.

Next, take a look at the source code for the addWorker(Runnable, Boolean) method, as shown below.

Private Boolean addWorker(Runnable firstTask, Boolean core) {Retry: for (;); { int c = ctl.get(); int rs = runStateOf(c); If (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // This loop increases the number of threads through CAS for (;;) Int wc = workerCountOf(c); / / if the number of threads in thread pool beyond the limit, direct return false if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; / / to the new thread thread pool by CAS number if (compareAndIncrementWorkerCount (c)) / / CAS way that only one thread execution success, jump out of the outer loop break retry. C = ctl.get(); If (runStateOf(c)! = rs) continue retry; Boolean workerStarted = false; boolean workerAdded = false; Worker w = null; W = new worker (firstTask); w = new worker (firstTask); final Thread t = w.thread; if (t ! Final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Int rs = runStateOf(ctl.get()); int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // Add worker. add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // Set whether a new task was added to true workerAdded = true; }} finally {// Unlock the exclusive lock mainlock. unlock(); If (workerAdded) {t.start(); // Set whether the task has started to true workerStarted = true; }}} finally {// If the task is not started or fails to start, call the addWorkerFailed(Worker) method if (! workerStarted) addWorkerFailed(w); } // Return workerStarted; }Copy the code

At first glance, the addWorker(Runnable, Boolean) method looks pretty long. Here, we’ll unpack the addWorker(Runnable, Boolean) method.

(1) Check whether the task queue is empty under some specific conditions, as shown below.

If (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;Copy the code

(2) After passing the verification of step (1), it enters the inner for loop. In the inner FOR loop, CAS is used to increase the number of threads in the thread pool. If the CAS operation succeeds, it directly exits the double for loop. If the CAS operation fails, the state of the current thread pool is checked to see if the state of the thread pool has changed. If the state of the thread pool has changed, the continue keyword is used to re-validate the task queue through the outer for loop to verify that the CAS operation of the inner for loop has been executed again. If the state of the thread pool has not changed and the last CAS operation failed, continue the CAS operation. The code is shown below.

for (;;) Int wc = workerCountOf(c); / / if the number of threads in thread pool beyond the limit, direct return false if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; / / to the new thread thread pool by CAS number if (compareAndIncrementWorkerCount (c)) / / CAS way that only one thread execution success, jump out of the outer loop break retry. C = ctl.get(); If (runStateOf(c)! = rs) continue retry; }Copy the code

(3) After the CAS operation is successful, it indicates that the worker thread is successfully added to the thread pool. At this time, there is no thread to execute the task. The global exclusive lock mainLock is used to add the newly added Worker thread Worker object to workers safely.

The overall logic is: create a new Worker object and acquire the thread of execution in the Worker object. If the thread is not empty, the exclusive lock will be obtained. After the lock is successfully obtained, the state of the thread will be checked again, which prevents other threads from modifying the state of the thread pool or closing the thread pool before obtaining the exclusive lock. If the thread pool is closed, the lock needs to be released. Otherwise, the newly added thread is added to the working set, the lock is released, and the thread is started to execute the task. Set the thread start flag to true. Finally, determine whether the thread is started, and if not, call the addWorkerFailed(Worker) method. Finally returns an indication of whether the thread is sending up.

Boolean workerStarted = false; Boolean workerStarted = false; boolean workerAdded = false; Worker w = null; W = new worker (firstTask); w = new worker (firstTask); final Thread t = w.thread; if (t ! Final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Int rs = runStateOf(ctl.get()); int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // Add worker. add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // Set whether a new task was added to true workerAdded = true; }} finally {// Unlock the exclusive lock mainlock. unlock(); If (workerAdded) {t.start(); // Set whether the task has started to true workerStarted = true; }}} finally {// If the task is not started or fails to start, call the addWorkerFailed(Worker) method if (! workerStarted) addWorkerFailed(w); } // Return workerStarted;Copy the code

AddWorkerFailed (Worker) method

In the addWorker(Runnable, Boolean) method, the addWorkerFailed(Worker) method is called if the Worker thread fails to be added or started, Let’s look at the implementation of the addWorkerFailed(Worker) method, as shown below.

Private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// If Worker task is not empty if (w! Worker. remove(w); DecrementWorkerCount (); // decrementWorkerCount() via CAS; tryTerminate(); } finally {// unlock mainlock. unlock(); }}Copy the code

The logic of addWorkerFailed(Worker) method is relatively simple: obtain the exclusive lock, remove the task from workers, and reduce the number of tasks by 1 through CAS, and finally release the lock.

Rejection policies

When analyzing the execute(Runnable) method, the thread pool calls reject(Runnable) when appropriate to execute the corresponding rejection policy. Let’s look at the implementation of the Reject (Runnable) method, as shown below.

final void reject(Runnable command) {
	handler.rejectedExecution(command, this);
}
Copy the code

The rejectedExecution method is called by handler, as shown below.

private volatile RejectedExecutionHandler handler;
Copy the code

Take a look at the RejectedExecutionHandler type, as shown below.

package java.util.concurrent;

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code

Can find RejectedExecutionHandler a interface, defines a rejectedExecution (Runnable, ThreadPoolExecutor) method. Since RejectedExecutionHandler is an interface, let’s look at the classes that implement the RejectedExecutionHandler interface.

We see here that the implementation class of the RejectedExecutionHandler interface is the implementation class for the four rejection policies provided by the thread pool by default.

Which class’s reject policy is executed in the Reject (Runnable) method depends on the arguments passed when the thread pool is created. If no rejection policy is passed, the rejection policy of the AbortPolicy class is executed by default. Otherwise, a rejection policy for the passed class is enforced.

When creating a thread pool, you can pass custom reject policies in addition to those provided by the JDK by default. If you want to use a custom refused to strategy, then only need to implement RejectedExecutionHandler interface, and rewrite rejectedExecution (Runnable, ThreadPoolExecutor) method. For example, the following code.

public class CustomPolicy implements RejectedExecutionHandler { public CustomPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! E.isshutdown ()) {system.out.println (" use the caller's thread to execute the task ") r.run(); }}}Copy the code

Create a thread pool as follows.

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
		       new CustomPolicy());
Copy the code

At this point, the overall core logic analysis of thread pool tasks is complete.

Click to follow, the first time to learn about Huawei cloud fresh technology ~