1. Introduction

1.1 What is a thread pool?

A thread pool is a thread management technique built on the idea of pooling. Its main purposes are to reuse threads, to manage threads and tasks conveniently, and to decouple thread creation from task execution. Because tasks run on threads that already exist, a pool avoids the resource cost of frequently creating and destroying threads. Java creates thread pools with the ThreadPoolExecutor class; the Executors factory class can also create them, but its use is not recommended.

Advantages of thread pooling:

1) Reduced resource consumption: threads that have already been created are reused, which cuts the cost of thread creation and destruction.
2) Improved response speed: when a task arrives, it can run immediately without waiting for a thread to be created.
3) Improved manageability: threads are allocated, tuned, and monitored uniformly through the pool.

1.2 Why use thread pools?

Let’s start by recalling the model used before thread pools existed, in which a new thread is created to perform each task.

This model has some clear disadvantages:

1) Uncontrolled risk. The threads are not managed in any unified way, and once a thread has been created, we no longer know what becomes of it.
2) High cost. Every task requires a new thread to be created, which is very expensive for the system.
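
The difference between the two models can be made visible by counting how many distinct threads end up running the same tasks. The following is only a minimal sketch: the class name ThreadReuseDemo, its method names, and the task and pool sizes are assumptions chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ThreadReuseDemo {

    /** Runs each task on a brand-new thread and returns how many distinct threads were used. */
    static int threadsUsedWithoutPool(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> seen.add(Thread.currentThread()));
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    /** Runs the same tasks on a fixed-size pool and returns how many distinct threads were used. */
    static int threadsUsedWithPool(int tasks, int poolSize) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(tasks));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> seen.add(Thread.currentThread()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("threads without a pool: " + threadsUsedWithoutPool(10));
        System.out.println("threads with a pool:    " + threadsUsedWithPool(10, 2));
    }
}
```

With ten tasks, the first method creates ten threads, while the pool runs all ten tasks on its two threads.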

2. Overview of Java thread pools

The core Java thread pool implementation class is ThreadPoolExecutor, which can be used to construct a thread pool. Let’s look at the inheritance hierarchy of ThreadPoolExecutor.

The Executor interface provides the ability to execute tasks and decouples task submission from the details of how each task is run, including thread creation. The ExecutorService interface inherits from Executor and adds methods for managing the thread pool itself, such as viewing the status of tasks, stopping or terminating the pool, and getting the pool's status.
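
To make the split between the two interfaces concrete, here is a small sketch. The class and method names (ExecutorInterfacesDemo and so on) and the pool parameters are assumptions for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK.
class ExecutorInterfacesDemo {

    /** Executor only knows how to run a task; this one simply runs it on the calling thread. */
    static String runThroughExecutor() {
        StringBuilder out = new StringBuilder();
        Executor direct = Runnable::run;
        direct.execute(() -> out.append("ran"));
        return out.toString();
    }

    /** ExecutorService adds task tracking (Future) and lifecycle management (shutdown, awaitTermination). */
    static String runThroughExecutorService() {
        ExecutorService service = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(8));
        try {
            Future<Integer> result = service.submit(() -> 6 * 7);
            int value = result.get(10, TimeUnit.SECONDS);
            service.shutdown();
            boolean terminated = service.awaitTermination(10, TimeUnit.SECONDS);
            return "value=" + value + ", shutdown=" + service.isShutdown()
                    + ", terminated=" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdownNow(); // harmless if the pool has already terminated
        }
    }

    public static void main(String[] args) {
        System.out.println(runThroughExecutor());
        System.out.println(runThroughExecutorService());
    }
}
```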

2.1 Structure of ThreadPoolExecutor

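The constructor takes seven parameters:

corePoolSize: the number of core threads kept in the thread pool
maximumPoolSize: the maximum number of threads allowed to be created in the thread pool
keepAliveTime: the length of time an idle thread may stay alive before it is reclaimed
unit: the time unit of keepAliveTime
workQueue: the blocking queue that holds tasks waiting to be executed
threadFactory: the factory used to create the threads that execute tasks
handler: the rejection policy used to reject task submissions when the thread pool is saturated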

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
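
As a sketch of how the seven parameters fit together, the following builds one pool with every argument spelled out. The concrete values, the thread name prefix "demo-pool-", and the class name PoolConstructionDemo are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class PoolConstructionDemo {

    /** Builds a pool using all seven constructor parameters; the values are illustrative only. */
    static ThreadPoolExecutor newDemoPool() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory threadFactory =
                r -> new Thread(r, "demo-pool-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                60L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(100),          // workQueue (bounded)
                threadFactory,                          // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    /** Reads the configuration back through the pool's public getters. */
    static String describeDemoPool() {
        ThreadPoolExecutor pool = newDemoPool();
        try {
            return "core=" + pool.getCorePoolSize()
                    + ", max=" + pool.getMaximumPoolSize()
                    + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                    + ", queueCapacity=" + pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeDemoPool());
    }
}
```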

2.2 Thread pool lifecycle

2.2.1 Five states of the thread pool
State        Description
RUNNING      Accepts new tasks and processes tasks in the blocking queue
SHUTDOWN     Accepts no new tasks, but still processes tasks in the blocking queue
STOP         Accepts no new tasks, does not process tasks in the blocking queue, and interrupts tasks that are in progress
TIDYING      All tasks have terminated and the number of valid threads is 0; the thread that moves the pool to TIDYING runs the terminated() hook method
TERMINATED   The terminated() method has finished executing

2.2.2 Thread pool life cycle flow
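
A pool moves from RUNNING to SHUTDOWN when shutdown() is called (or to STOP when shutdownNow() is called), then to TIDYING once the queue and the pool are both empty, and finally to TERMINATED after the terminated() hook has run. The state constants themselves are private, so the sketch below only observes the flow through public methods and the protected terminated() hook. The class name LifecycleDemo and the pool parameters are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class LifecycleDemo {

    static List<String> observeLifecycle() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4)) {
            @Override
            protected void terminated() {
                hookRan.set(true); // invoked while the pool is in the TIDYING state
                super.terminated();
            }
        };
        List<String> observed = new ArrayList<>();
        try {
            pool.execute(() -> { });
            observed.add("before shutdown: isShutdown=" + pool.isShutdown());
            pool.shutdown(); // RUNNING -> SHUTDOWN
            observed.add("after shutdown: isShutdown=" + pool.isShutdown());
            pool.awaitTermination(10, TimeUnit.SECONDS); // waits for TERMINATED
            observed.add("after termination: hookRan=" + hookRan.get()
                    + ", isTerminated=" + pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // harmless if the pool has already terminated
        }
        return observed;
    }

    public static void main(String[] args) {
        observeLifecycle().forEach(System.out::println);
    }
}
```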

2.2.3 How ThreadPoolExecutor represents the thread pool state

ThreadPoolExecutor uses a single AtomicInteger field, ctl, to describe both the running state of the pool and the number of threads in it. The upper 3 bits of ctl encode the five states of the pool, and the lower 29 bits hold the number of worker threads. Packing both values into one variable lets them be read and updated together atomically, which reduces lock contention and improves concurrency efficiency.

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of low-order bits that hold the worker count (Integer.SIZE - 3 = 29)
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum number of worker threads the pool can track: 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;


// Get the thread pool running state
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Get the number of valid worker threads
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Pack the run state and the worker count into a single ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }
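
The packing arithmetic is easier to follow when it can be run in isolation. The sketch below copies the constants and the three helper methods shown above into a standalone class; the class name CtlPackingDemo is an assumption, and the copy exists only so the bit operations can be tried out, since the originals are private to ThreadPoolExecutor.

```java
// Hypothetical standalone copy of the ctl helpers, for experimentation only.
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // mask for the low 29 bits

    // The run state lives in the top 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep only the top 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine state and count

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println("ctl bits:     " + Integer.toBinaryString(ctl));
        System.out.println("worker count: " + workerCountOf(ctl));
        System.out.println("is RUNNING:   " + (runStateOf(ctl) == RUNNING));
        // The states are ordered numerically, which is why the JDK code can
        // test for "not running" with comparisons such as rs >= SHUTDOWN.
        System.out.println("ordered:      "
                + (RUNNING < SHUTDOWN && SHUTDOWN < STOP && STOP < TIDYING && TIDYING < TERMINATED));
    }
}
```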

2.3 Thread pool execution process

1) If workerCount < corePoolSize ==> create a core thread to execute the submitted task

2) If workerCount >= corePoolSize && the blocking queue is not full ==> add the task to the blocking queue, where it waits for a thread to become free and execute it

3) If workerCount >= corePoolSize && workerCount < maximumPoolSize && the blocking queue is full ==> create a non-core thread to execute the submitted task

4) If workerCount >= maximumPoolSize && the blocking queue is full ==> execute the rejection policy
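
The four steps above can be watched in action with a deliberately tiny pool. This is a sketch under assumed values: corePoolSize 1, maximumPoolSize 2, a queue with a single slot, and tasks that block until released, so that each submission hits the next step. The class name ExecutionFlowDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecutionFlowDemo {

    static List<String> submitFourTasks() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        submitFourTasks().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected.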

2.3.1 execute: submitting a task
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
	// Number of worker threads < corePoolSize => Create a thread directly to execute the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Number of worker threads >= corePoolSize && the thread pool is running => add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        /*
         * Why check the thread pool state a second time? The pool may have
         * been shut down, or its last worker may have exited, between the
         * first check and the moment the task entered the queue. If the
         * pool is no longer running, remove the task and reject it; if no
         * worker is left, start one so the queued task is not stranded.
         */
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full (or the pool is not running) => try to add a non-core thread, and reject the task if that fails
    else if (!addWorker(command, false))
        reject(command);
}

2.3.2 addWorker: adding a thread to the thread pool
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // Refuse to add a worker once the pool has left RUNNING, except in
        // SHUTDOWN when a task-less worker is needed to drain a non-empty queue.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // Check the worker count against the applicable bound. If another
        // worker is allowed, CAS-increment the count and leave both loops
        // to run the thread creation logic below.
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker, which obtains its thread from the ThreadFactory
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // A thread that is already alive cannot be started again, so fail early
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Register the worker and update largestPoolSize, which tracks the largest size the pool has reached
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread once the worker has been registered
            if (workerAdded) {
                // Starting the thread invokes Worker.run(), which calls runWorker() to execute tasks
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
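
addWorker is private, but its effects can be observed through public methods. The sketch below relies on prestartAllCoreThreads(), which in the JDK sources starts idle core workers by calling addWorker(null, true) until it returns false, and on getLargestPoolSize(), which reads the largestPoolSize field updated above. The class name AddWorkerEffectsDemo and the pool parameters are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class AddWorkerEffectsDemo {

    static String prestartAndInspect() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            int before = pool.getPoolSize();             // no workers exist yet
            int started = pool.prestartAllCoreThreads(); // adds workers up to corePoolSize
            return "before=" + before
                    + ", started=" + started
                    + ", after=" + pool.getPoolSize()
                    + ", largest=" + pool.getLargestPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(prestartAndInspect());
    }
}
```

Workers are only added up to corePoolSize here because the core flag is true; the maximumPoolSize bound applies when execute falls through to addWorker(command, false).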

2.3.3 runWorker: executing a task
final void runWorker(Worker w) {
    // The thread running this method is the worker's own thread
    Thread wt = Thread.currentThread();
    // Take the worker's initial task, if it has one
    Runnable task = w.firstTask;
    // Clear the reference to the initial task in the worker
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep running tasks on this thread until getTask() returns null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook invoked before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the current task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook invoked after the task finishes
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Clean up and reclaim the worker
        processWorkerExit(w, completedAbruptly);
    }
}
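
The beforeExecute and afterExecute hooks called in runWorker are protected methods, so a subclass can override them for logging or monitoring. Below is a minimal sketch; the class name HooksDemo, the event strings, and the quiet thread factory are assumptions. The no-op uncaught exception handler is there only to keep the demo output clean, because a task that throws from execute() also terminates its worker thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class HooksDemo {

    static List<String> runWithHooks() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // Swallow the uncaught exception of the failing task instead of printing it.
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4), quiet) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add(thrown == null ? "after: ok" : "after: " + thrown.getMessage());
                super.afterExecute(r, thrown);
            }
        };
        pool.execute(() -> events.add("task"));
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runWithHooks());
    }
}
```

Note that afterExecute only sees the exception for tasks passed to execute(); tasks passed to submit() are wrapped in a FutureTask, which captures the exception in the returned Future instead.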

2.4 Thread Pool Rejection Policy

When the thread pool has reached its maximum number of threads and the blocking queue is full, the pool is saturated and must apply a rejection policy to the tasks being submitted. ThreadPoolExecutor provides four rejection policies; AbortPolicy is the default.

AbortPolicy: throws an exception so that the caller can make a decision based on the specific task

// AbortPolicy rejects the submitted task by throwing RejectedExecutionException
public static class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

DiscardPolicy: does nothing and silently drops the task

public static class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Intentionally empty: the rejected task is discarded
    }
}

DiscardOldestPolicy: discards the oldest task, at the head of the blocking queue, and then resubmits the current task

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

CallerRunsPolicy: runs the task on the thread that submitted it

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
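
To compare the four policies side by side, the sketch below saturates a pool with one thread and a one-slot queue, then submits one more task under each policy. Task A blocks the only thread, task B fills the queue, and task C is the one that triggers the policy. The class name RejectionPolicyDemo, the task labels, and the pool sizes are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class RejectionPolicyDemo {

    /** Saturates a tiny pool, submits one more task, and reports what happened. */
    static String tryPolicy(RejectedExecutionHandler handler) {
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        String outcome = "accepted";
        try {
            pool.execute(() -> {              // A: occupies the only thread until released
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B")); // B: fills the one-slot queue
            pool.execute(() ->                // C: arrives while the pool is saturated
                    ran.add(Thread.currentThread() == caller ? "C(caller)" : "C(pool)"));
        } catch (RejectedExecutionException e) {
            outcome = "exception";
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome + ", ran=" + ran;
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:         " + tryPolicy(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy:       " + tryPolicy(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + tryPolicy(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("CallerRunsPolicy:    " + tryPolicy(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

Only AbortPolicy tells the caller that a task was lost. DiscardPolicy drops C silently, DiscardOldestPolicy drops B in favour of C, and CallerRunsPolicy slows the submitter down by making it run C itself.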

3. Set thread pool parameters properly

  • CPU-intensive tasks (N+1)

This type of task mainly consumes CPU resources. You can set the number of threads to N + 1, where N is the number of CPU cores. The one extra thread guards against occasional page faults, or against a task being paused for some other reason: when a thread is paused the CPU would otherwise sit idle, and the extra thread can make use of that idle CPU time.

  • I/O-intensive tasks (2N)

With this type of task, the system spends most of its time handling I/O, and a thread does not occupy the CPU while it waits for I/O, so the CPU can be handed to another thread. Applications dominated by I/O-intensive tasks can therefore be configured with more threads; the usual rule of thumb is 2N.

How do you determine whether a task is CPU-intensive or I/O-intensive?

Simply put, a CPU-intensive task is one that relies on the CPU's computing power, such as sorting a large amount of data in memory. Tasks that involve network reads or file reads are I/O-intensive. The characteristic of this type of task is that the CPU computation time is very small compared to the time spent waiting for the I/O operation to complete.
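
The two rules of thumb translate directly into code. In this sketch the class and method names (PoolSizing, cpuBoundThreads, ioBoundThreads) are assumptions; the core count comes from Runtime.availableProcessors(). The results are starting points to be tuned under real load, not fixed answers.

```java
// Hypothetical helper class, not part of the JDK.
class PoolSizing {

    /** CPU-intensive tasks: N + 1 threads, where N is the number of CPU cores. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /** I/O-intensive tasks: 2N threads, where N is the number of CPU cores. */
    static int ioBoundThreads(int cores) {
        return 2 * cores;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores:             " + cores);
        System.out.println("CPU-bound threads: " + cpuBoundThreads(cores));
        System.out.println("I/O-bound threads: " + ioBoundThreads(cores));
    }
}
```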

4. Some suggestions for using thread pools

[Mandatory] Do not create thread pools with Executors. Use ThreadPoolExecutor instead, which makes the operating rules of the thread pool explicit and avoids the risk of resource exhaustion.

1) Executors.newFixedThreadPool(n) and Executors.newSingleThreadExecutor(): the work queue allows up to Integer.MAX_VALUE entries, so a large number of requests may accumulate and cause an OOM.
2) Executors.newCachedThreadPool(): the number of threads allowed is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.
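
These limits can be checked directly. The sketch below casts the factory results to ThreadPoolExecutor in order to inspect them; that cast relies on what the current JDK factory methods happen to return and is an assumption made for this demo only, not something application code should depend on. The class name ExecutorsLimitsDemo and the bounded pool's values are likewise illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecutorsLimitsDemo {

    /** Remaining queue capacity of a fresh newFixedThreadPool: effectively unbounded. */
    static int fixedPoolQueueCapacity() {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return fixed.getQueue().remainingCapacity();
        } finally {
            fixed.shutdown();
        }
    }

    /** Maximum thread count of newCachedThreadPool: effectively unbounded. */
    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return cached.getMaximumPoolSize();
        } finally {
            cached.shutdown();
        }
    }

    /** An explicitly bounded alternative; the numbers are placeholders to tune per workload. */
    static ThreadPoolExecutor boundedPool() {
        return new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),               // at most 200 waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy());  // push back on the submitter when full
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        System.out.println("cached pool max threads:   " + cachedPoolMaxThreads());
        ThreadPoolExecutor pool = boundedPool();
        System.out.println("bounded pool queue slots:  " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```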

[Suggestion] When creating a thread pool, give its threads a name prefix that identifies the specific service, to make faults easier to locate

// 1. Use Guava's ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();


// 2. Implement the ThreadFactory interface yourself

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread factory that sets the thread name, which helps us locate problems.
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * Creates a thread factory that names the threads it produces.
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + threadNum.incrementAndGet());
        return t;
    }
}

[Suggestion] Try to use different thread pools for different types of business tasks

5. Other questions

  • How does a thread pool reuse threads that have already been created?

A Worker object in the thread pool can be thought of as a thread. Once a Worker has been created, its thread is reused: in runWorker it loops, repeatedly fetching a task from the blocking queue with getTask() and then executing it. As long as the task retrieved from the blocking queue is not null, the loop continues, so the same thread goes on to execute one task after another.
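
The reuse loop can be reduced to a few lines. The following is a deliberately simplified model, not the JDK implementation: one hand-written worker thread takes tasks from a BlockingQueue until it sees a sentinel. The class name MiniWorkerDemo, the STOP_SIGNAL sentinel, and the thread name are all assumptions for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the Worker loop; not the JDK implementation.
class MiniWorkerDemo {

    // Sentinel task that tells the worker to exit its loop.
    private static final Runnable STOP_SIGNAL = () -> { };

    static String runTasksOnOneWorker(int tasks) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        AtomicInteger executed = new AtomicInteger();

        // The worker mirrors runWorker: fetch a task, run it, fetch the next one.
        Thread worker = new Thread(() -> {
            try {
                Runnable task;
                while ((task = queue.take()) != STOP_SIGNAL) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "mini-worker");
        worker.start();

        for (int i = 0; i < tasks; i++) {
            queue.add(() -> {
                threadNames.add(Thread.currentThread().getName());
                executed.incrementAndGet();
            });
        }
        queue.add(STOP_SIGNAL);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "tasks=" + executed.get() + ", threads=" + threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(runTasksOnOneWorker(5));
    }
}
```

The real getTask() differs mainly in that it also checks the pool state and may time out, returning null so that the worker can exit.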

  • Will the core worker thread be reclaimed?

By default, no: core threads stay alive even when they are idle. Calling allowCoreThreadTimeOut(true) changes this, so that core threads that stay idle for longer than keepAliveTime are reclaimed as well.
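
The effect of the flag can be observed by starting the core threads, leaving them idle, and reading the pool size afterwards. This is a timing-based sketch: the 50 ms keep-alive, the 500 ms wait, and the class name CoreTimeoutDemo are assumptions chosen to keep the demo short, and on a heavily loaded machine the wait may need to be longer.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class CoreTimeoutDemo {

    /** Starts two idle core threads, waits well past keepAliveTime, and returns the pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
        // Passing true requires keepAliveTime > 0, otherwise the call throws IllegalArgumentException.
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.prestartAllCoreThreads(); // both core threads now wait for work
            Thread.sleep(500);             // ten times the keep-alive time
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default:                      " + poolSizeAfterIdle(false));
        System.out.println("allowCoreThreadTimeOut(true): " + poolSizeAfterIdle(true));
    }
}
```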