Design and implementation of the thread pool

This section is based on: Implementation principle of Java thread pool and its practice in Meituan business – Meituan technical team

The overall design

The core thread pool implementation class in Java is ThreadPoolExecutor. Its inheritance hierarchy, from the top-level interface down, is Executor → ExecutorService → AbstractExecutorService → ThreadPoolExecutor.

The top-level interface implemented by ThreadPoolExecutor is Executor, which embodies the idea of decoupling task submission from task execution. Users do not need to care how a thread is created or how it is scheduled to run a task; they only provide a Runnable object and submit the task's execution logic to an Executor. The Executor framework takes care of thread allocation and task execution.
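A minimal sketch of that decoupling, modeled on the thread-per-task example in the Executor Javadoc: the submitting method runTasks only sees the Executor interface, so swapping the execution strategy does not touch it. ExecutorDecouplingDemo, ThreadPerTaskExecutor and runTasks are illustrative names, not JDK classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorDecouplingDemo {
    // Illustrative executor (not a JDK class): every task gets a brand-new thread.
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // Submitting code: it only knows the Executor interface, not how tasks are run.
    static int runTasks(Executor executor, int n) {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                counter.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(new ThreadPerTaskExecutor(), 5)); // 5, one thread per task
        System.out.println(runTasks(Runnable::run, 3));               // 3, run in the caller thread
    }
}
```

Both calls count every task; only the Executor passed in decides whether the tasks run on new threads or in the caller's thread.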

The ExecutorService interface adds two capabilities: (1) it extends task execution with methods that produce a Future for one asynchronous task or for a group of them; (2) it provides methods for managing the thread pool, such as shutting it down.
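Both capabilities can be seen with the stock JDK factory methods. This sketch assumes small pools created through Executors; ExecutorServiceDemo and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorServiceDemo {
    // Capability (1), one task: submit() returns a Future for the result.
    static int submitOne() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(); // blocks until the task has produced its result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // capability (2): stop accepting tasks and let the pool wind down
        }
    }

    // Capability (1), a group of tasks: invokeAll() returns one Future per task, in order.
    static List<Integer> squares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                tasks.add(() -> x * x);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitOne());  // 42
        System.out.println(squares(4));   // [1, 4, 9, 16]
    }
}
```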

AbstractExecutorService is an upper-level abstract class that strings together the steps of executing a task, so that the implementation beneath it only needs to focus on a single method for running tasks, execute().
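To see how little a subclass of AbstractExecutorService has to provide, here is a hypothetical DirectExecutorService (not a JDK class) that runs every task in the caller's thread. It implements execute() and the lifecycle methods only; submit() is inherited from AbstractExecutorService, which wraps the task with newTaskFor() and passes it to execute().

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: only execute() and the lifecycle methods are written here.
class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        command.run(); // run synchronously in the submitting thread
    }

    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    // Tasks never outlive execute(), so the executor is terminated as soon as it is shut down.
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

    static String submitViaInheritedMethod() {
        DirectExecutorService executor = new DirectExecutorService();
        Future<String> future = executor.submit(() -> "done"); // submit() is not defined in this class
        executor.shutdown();
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(submitViaInheritedMethod()); // done
    }
}
```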

ThreadPoolExecutor, the concrete class at the bottom of the hierarchy, implements the most complex part: it maintains its own life cycle while managing threads and tasks together so that tasks execute in parallel.

How does ThreadPoolExecutor maintain threads and execute tasks at the same time? Its operating mechanism is shown in the figure below:

Internally, the thread pool builds a producer-consumer model that decouples threads from tasks, so that tasks can be buffered and threads reused. Its operation divides into two parts: task management and thread management.

Task management acts as the producer. After a task is submitted, the thread pool decides what happens to it: (1) a thread is allocated to execute it directly; (2) it is buffered in the queue to wait for a thread; or (3) it is rejected.
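The three outcomes can be summarized as a pure decision function. This is a simplified sketch: it ignores the run state and all concurrency, and the names TaskRouting, Route and route are hypothetical. The real decision is made by ThreadPoolExecutor#execute(), whose source is analyzed later in this section.

```java
class TaskRouting {
    enum Route { NEW_CORE_THREAD, QUEUE, NEW_NON_CORE_THREAD, REJECT }

    // Simplified routing of one submitted task; run state and races are ignored.
    static Route route(int workerCount, int corePoolSize, int maximumPoolSize, boolean queueHasRoom) {
        if (workerCount < corePoolSize) {
            return Route.NEW_CORE_THREAD;     // (1) a new thread runs the task directly
        }
        if (queueHasRoom) {
            return Route.QUEUE;               // (2) buffered until a thread takes it
        }
        if (workerCount < maximumPoolSize) {
            return Route.NEW_NON_CORE_THREAD; // (1) again, with a thread above the core size
        }
        return Route.REJECT;                  // (3) handed to the rejection policy
    }

    public static void main(String[] args) {
        System.out.println(route(0, 2, 4, true));  // NEW_CORE_THREAD
        System.out.println(route(2, 2, 4, true));  // QUEUE
        System.out.println(route(2, 2, 4, false)); // NEW_NON_CORE_THREAD
        System.out.println(route(4, 2, 4, false)); // REJECT
    }
}
```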

Thread management acts as the consumer. Threads are maintained centrally in the thread pool and allocated according to task requests; when a thread finishes a task, it goes on to fetch and execute new ones.
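A toy version of this producer-consumer structure, for illustration only: ToyPool is a hypothetical class, not how ThreadPoolExecutor is written. submit() only enqueues (the producer), and a fixed set of threads take tasks from a BlockingQueue (the consumers), so the same few threads run many tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Toy pool for illustration only: a queue between producers and a fixed set of consumer threads.
class ToyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    ToyPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // consumer: block until a task is available, then run it
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupted this worker: leave the loop and let the thread end
                }
            });
            worker.start();
            workers.add(worker);
        }
    }

    void submit(Runnable task) {
        queue.add(task); // producer: buffer the task; no thread is created here
    }

    void shutdownNow() {
        workers.forEach(Thread::interrupt);
    }

    // Runs nTasks tasks and reports how many distinct threads executed them.
    static int distinctThreadsUsed(int nThreads, int nTasks) {
        ToyPool pool = new ToyPool(nThreads);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            pool.submit(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreadsUsed(2, 50)); // at most 2: threads are reused
    }
}
```

The toy leaves out everything that makes a real pool hard: core and maximum sizing, rejection, and the life cycle states described next.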

Life cycle management

The running state of the thread pool is not set explicitly by the user; it is maintained internally as the pool runs. A single variable holds two values, the run state (runState) and the number of threads (workerCount), so the two key parameters are maintained together, as the following code shows:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl is an AtomicInteger field that controls both the run state of the thread pool (runState) and the number of valid threads in the pool (workerCount). runState is stored in the high 3 bits and workerCount in the low 29 bits, so the two values do not interfere with each other. Keeping both in one variable avoids inconsistencies when a decision depends on both, without having to hold a lock to keep them consistent; reading the thread pool source, you will see that it frequently needs to check the run state and the number of threads together. The pool also provides several methods through which users can obtain the current run state and the number of threads. Extracting either value takes only a bitwise operation, which is very cheap.

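The helpers that extract the life cycle state and the number of threads from ctl, and combine the two into a ctl value, are as follows (CAPACITY is the mask of the low 29 bits; newer JDK versions call it COUNT_MASK, the name used in the addWorker listing later in this section):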

private static int runStateOf(int c)     { return c & ~CAPACITY; } // Extract the current run state (high 3 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }  // Extract the current worker count (low 29 bits)
private static int ctlOf(int rs, int wc) { return rs | wc; }       // Combine run state and worker count into a ctl value
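The packing can be reproduced outside the JDK to see the bit layout. This sketch copies the constant definitions from the JDK 8 source (COUNT_BITS, CAPACITY and the five run states); CtlPacking is an illustrative class name, and runStateAtLeast mirrors the JDK helper of the same name.

```java
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // mask of the low 29 bits

    // runState is stored in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean runStateAtLeast(int c, int s) { return c >= s; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);                  // true
        System.out.println(workerCountOf(c));                          // 5
        System.out.println(runStateAtLeast(ctlOf(STOP, 3), SHUTDOWN)); // true
        System.out.println(runStateAtLeast(c, SHUTDOWN));              // false
    }
}
```

Because RUNNING is the only negative value and the other states increase in order, comparisons such as runStateAtLeast(c, SHUTDOWN) work directly on the packed ctl value, whatever worker count sits in the low bits.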

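ThreadPoolExecutor can run in five states:

  • RUNNING: accepts new tasks and processes queued tasks.
  • SHUTDOWN: does not accept new tasks, but still processes queued tasks.
  • STOP: does not accept new tasks, does not process queued tasks, and interrupts tasks in progress.
  • TIDYING: all tasks have terminated and workerCount is zero; the thread that moves the pool into this state runs the terminated() hook method.
  • TERMINATED: terminated() has completed.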

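Its life cycle transitions are shown in the figure below. In short: shutdown() moves the pool from RUNNING to SHUTDOWN; shutdownNow() moves it from RUNNING or SHUTDOWN to STOP; SHUTDOWN moves to TIDYING once both the queue and the pool are empty, and STOP moves to TIDYING once the pool is empty; TIDYING moves to TERMINATED when the terminated() hook has completed.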
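The states can be observed through the public API. The sketch below (LifecycleDemo is an illustrative name) keeps the only worker busy, calls shutdown(), and checks that a new task is rejected while the already queued task still runs; overriding the protected terminated() hook shows that TIDYING was reached.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    static String observe() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedTaskRan = new AtomicBoolean();
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {            // called in TIDYING, just before TERMINATED
                hookRan.set(true);
            }
        };
        StringBuilder log = new StringBuilder();
        pool.execute(() -> {                         // occupies the only worker until released
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> queuedTaskRan.set(true)); // waits in the queue
        log.append(pool.isShutdown()).append(' ');   // RUNNING: false
        pool.shutdown();                             // RUNNING -> SHUTDOWN
        log.append(pool.isShutdown()).append(' ');   // true
        log.append(pool.isTerminated()).append(' '); // false: a task is still running
        try {
            pool.execute(() -> { });                 // SHUTDOWN rejects new tasks
            log.append("accepted ");
        } catch (RejectedExecutionException e) {
            log.append("rejected ");
        }
        release.countDown();                         // let the running task finish
        try {
            log.append(pool.awaitTermination(5, TimeUnit.SECONDS)).append(' ');
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        log.append(queuedTaskRan.get()).append(' '); // the queued task still ran after shutdown()
        log.append(hookRan.get());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // false true false rejected true true true
    }
}
```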

Worker

To track the state of its threads and maintain their life cycle, the thread pool defines an internal worker class, Worker. Let's look at part of its code:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;// The thread held by the Worker
    Runnable firstTask;// The initialized task can be null
}

Worker implements the Runnable interface and holds a thread, thread, and an initial task, firstTask. thread is created by the ThreadFactory when the constructor is called and is the thread that runs tasks. firstTask holds the first task passed in, which may be null. If it is non-null, the thread executes that task immediately after starting; this is what happens when execute() creates a thread for a newly submitted task, for example a core thread. If it is null, the thread is created only to fetch and execute tasks from workQueue, as with the addWorker(null, false) call in execute().

The task execution model of Worker is shown in the figure below:

The thread pool needs to manage the life cycle of its threads and reclaim threads that have been idle for a long time. It keeps references to its Workers in a hash table (the HashSet field workers) and controls a thread's life cycle by adding and removing these references. The key question is how to tell whether a thread is currently running a task.

Worker extends AQS and uses it to implement an exclusive lock. It deliberately does not use ReentrantLock: the lock is non-reentrant, and its state is used to reflect whether the thread is currently executing a task.

  1. The lock method acquires the exclusive lock, which indicates that the current thread is executing a task.
  2. A thread that is executing a task should not be interrupted.
  3. If the thread does not hold the exclusive lock, i.e. it is idle, it is not processing a task and can be interrupted.
  4. When the pool executes the shutdown method or the tryTerminate method, it calls the interruptIdleWorkers method to interrupt idle threads. interruptIdleWorkers uses the tryLock method to determine whether a thread in the pool is idle; if the thread is idle, it can be safely reclaimed.
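The locking scheme can be sketched with AQS directly. IdleAwareLock and looksIdle are hypothetical names; the acquire and release logic follows the same pattern as Worker (state 0 means unlocked, 1 means locked), and looksIdle mimics the tryLock check in interruptIdleWorkers. Note that tryLock fails even for the thread that already holds the lock, which is the point of making it non-reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class: a non-reentrant mutex built like ThreadPoolExecutor.Worker.
// AQS state 0 = unlocked (worker idle), 1 = locked (worker running a task).
class IdleAwareLock extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // already held, even if by the calling thread: not reentrant
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    // Mimics the check in interruptIdleWorkers: a worker whose lock can be taken is idle.
    static boolean looksIdle(IdleAwareLock workerLock) {
        if (workerLock.tryLock()) {
            workerLock.unlock();
            return true;
        }
        return false;
    }

    static String demo() {
        IdleAwareLock workerLock = new IdleAwareLock();
        boolean idleBefore = looksIdle(workerLock);    // nobody holds the lock
        workerLock.lock();                             // the worker starts a task
        boolean idleWhileBusy = looksIdle(workerLock); // fails even from the same thread
        workerLock.unlock();                           // the task is finished
        boolean idleAfter = looksIdle(workerLock);
        return idleBefore + " " + idleWhileBusy + " " + idleAfter;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false true
    }
}
```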

This property is used when idle threads are reclaimed, as shown in the following figure:

Submit vs. execute

This section is based on: Understand Java thread pools thoroughly

submit returns a Future object whose get method returns the result of the task. The implementation is simple: it wraps the Runnable in a FutureTask and, as you can see, ultimately calls execute:

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

The FutureTask code is not reproduced here; briefly, its principle is as follows (see here for details on FutureTask):

  • FutureTask implements the RunnableFuture interface, and RunnableFuture extends both Runnable and Future. When the task is executed, FutureTask's run method is called; run executes the real task code and, once it finishes, calls the set method to store the result.
  • If the task has completed, the get method returns the result directly; if not, get blocks and waits for the result.
  • After storing the result, set unblocks the threads waiting in get, which then return the result.
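The run/set/get interplay can be seen directly with java.util.concurrent.FutureTask. This is a small sketch; FutureTaskDemo and its methods are illustrative names. The second method uses the FutureTask(Runnable, V) constructor, which is the same wrapping that submit(Runnable task, T result) performs through newTaskFor.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Callable path: another thread calls run(), which computes the value and set()s it.
    static int runAndGet() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        new Thread(task).start();  // FutureTask is a Runnable, so any thread can run it
        try {
            return task.get();     // blocks until set() has stored the result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runnable path: the same wrapping as submit(Runnable task, T result).
    static String runnableWithResult() {
        FutureTask<String> task = new FutureTask<String>(() -> { }, "finished");
        task.run();                // run inline; afterwards isDone() is true
        try {
            return task.get();     // returns immediately with the fixed result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndGet());          // 42
        System.out.println(runnableWithResult()); // finished
    }
}
```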

The execute() method flow is described in the previous article.

Source code analysis of key methods

This section is based on: JUC thread pool ThreadPoolExecutor source code, by Throwable

execute method source code analysis

ThreadPoolExecutor#execute() submits a task to the thread pool for asynchronous execution. Its flow is as follows:

  1. If the current total number of worker threads is less than corePoolSize, a core thread is created directly to execute the task (the task instance is passed in and used directly to construct the worker thread instance).
  2. If the current number of worker threads is greater than or equal to corePoolSize, the pool checks whether it is running and tries to put the task into the task queue with the non-blocking offer(). After a successful offer, the run state is checked a second time: if the pool is no longer running, the task is removed from the queue and rejected; otherwise, if the current number of worker threads is 0, a non-core thread is created with a null task.
  3. If putting the task into the task queue fails (the task queue is full), the pool tries to create a non-core thread and passes it the task instance to execute.
  4. If creating the non-core thread fails, the task is rejected and the rejection policy is invoked to handle it.

One point may seem puzzling: why, after the task has been queued, must the pool re-check its run state, and why does it create a non-core thread with a null task when the worker count is zero? The API comments explain this:

If a task is successfully queued, we still need to double-check whether we should have added a worker thread (because existing workers may have died since the last check) or whether the pool shut down since entry into this method. So we recheck the state and, if necessary, remove the task from the queue if the pool stopped, or start a new worker thread if there are none.

The source code is as follows:

// Execute the command, where the command object is an instance of Runnable
public void execute(Runnable command) {
    // The command (task) object must not be null
    if (command == null)
        throw new NullPointerException();
    // Read the current value of ctl
    int c = ctl.get();
    // If the current number of worker threads is smaller than corePoolSize, create a new core thread to run the incoming task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            // The new core thread was created successfully, so return
            return;
        // Creating the core thread failed; refresh the local copy c of ctl
        c = ctl.get();
    }
    // Reaching here means the worker count is >= corePoolSize or creating a core thread failed
    // If the pool is running, try to add the task to the queue with the non-blocking offer() (returns false on failure)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The task is queued; re-check the pool state
        // If the pool is no longer RUNNING, remove the task from the queue
        if (!isRunning(recheck) && remove(command))
            // ...and hand it to the rejection policy
            reject(command);
        // The else-if branch below is reached when:
        // the task is in the queue and the pool is RUNNING, or
        // the pool is not RUNNING but remove() failed (a worker has already taken the task)
        // If the worker count is 0, create a non-core thread with a null task;
        // it does not receive a task directly but pulls queued tasks to execute
        // If the worker count is not 0, nothing more is needed: the task is queued and an existing worker will pick it up
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Reaching here means one of the following:
    // the pool is not RUNNING, or
    // the pool is RUNNING but the task queue is full (offer() failed)
    // Try to create a non-core thread to run the task
    else if (!addWorker(command, false))
        // Creating the non-core thread failed: call the rejection policy
        reject(command);
}
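The four branches of execute() can be observed from outside. This sketch assumes corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1, and submits four tasks that block until released, so no worker can drain the queue in the meantime; ExecuteFlowDemo is an illustrative name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns "poolSize/queueSize/rejectedCount" right after four submissions.
    static String submitFour() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler: AbortPolicy
        // task 1: workerCount 0 < corePoolSize     -> addWorker(command, true)
        // task 2: workerCount 1, queue has room    -> workQueue.offer(command) succeeds
        // task 3: queue full                       -> addWorker(command, false)
        // task 4: queue full, workerCount 2 == max -> reject(command)
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        String snapshot = pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        release.countDown(); // let the running and queued tasks finish
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(submitFour()); // 2/1/1
    }
}
```

Task 1 becomes a core thread's firstTask, task 2 waits in the queue, task 3 starts a non-core thread, and task 4 is rejected by the default AbortPolicy, so the snapshot reads 2/1/1.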

reject method source code analysis

The reject(Runnable command) method simply calls the configured rejection policy to reject the task:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

It calls back the RejectedExecutionHandler instance held by the thread pool, passing it the task instance and the current thread pool instance.
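Which handler reject() calls is set through the constructor. As a sketch, the JDK's built-in ThreadPoolExecutor.CallerRunsPolicy runs a rejected task in the submitting thread (as long as the pool is not shut down). With one busy thread and a SynchronousQueue, the second task is rejected and therefore runs in the caller. RejectPolicyDemo is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectPolicyDemo {
    static boolean rejectedTaskRanInCaller() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        pool.execute(() -> {                 // occupies the only thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        // No idle thread, no queue slot, maximumPoolSize reached -> reject(command),
        // and CallerRunsPolicy.rejectedExecution runs the task right here
        pool.execute(() -> ranOn.set(Thread.currentThread()));
        release.countDown();
        pool.shutdown();
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(rejectedTaskRanInCaller()); // true
    }
}
```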

addWorker method source code analysis

addWorker creates a Worker object, together with its thread, to execute tasks.

boolean addWorker(Runnable firstTask, boolean core): the first argument can be used to pass in a task instance directly, and the second indicates whether the worker thread to be created is a core thread. The source is as follows:

// Add a worker thread. Returns false if no new worker thread was created, true if one was created and started
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Note that this is an infinite loop: the outer loop
    for (int c = ctl.get();;) {
        // This condition is complicated, so let's split the && conditions:
        // 1. The pool state is at least SHUTDOWN, i.e. rs >= SHUTDOWN(0)
        // 2. The pool state is at least STOP, i.e. rs >= STOP(1), or the incoming firstTask is not null, or the task queue is empty
        // A pool that is at least SHUTDOWN accepts no new tasks. Under that premise, no thread needs to be added if the pool is at least STOP, the incoming task is non-null, or the queue is empty (no backlog to drain)
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // Note that this is also an infinite loop: the inner loop
        for (;;) {
            // Each iteration re-checks the worker count wc:
            // when core is true, return false if wc >= corePoolSize
            // when core is false, return false if wc >= maximumPoolSize
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // If the CAS increment of wc succeeds, break out of both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The CAS failed: re-read ctl, since the run state or the worker count may have changed
            c = ctl.get();  // Re-read ctl
            // If the state has moved to at least SHUTDOWN, go back to the outer loop and re-check it
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // Otherwise the pool is still RUNNING and the CAS failed because of a concurrent worker count update
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // Flags whether the worker thread was started successfully
    boolean workerStarted = false;
    // Flags whether the Worker was added to the worker set successfully
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a Worker with firstTask. The Worker constructor creates a new Thread through the ThreadFactory, exposed as w.thread
        // At this point the Worker exists but has not been added to the worker set, and its thread has not been started
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // A global lock is needed because counters and the non-thread-safe worker set are modified
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // The pool state may have changed (e.g. to SHUTDOWN) before the lock was acquired:
                // 1. If the pool is still RUNNING, check that the thread is startable, add the Worker to the set, and start it
                // 2. If the pool state is below STOP (RUNNING or SHUTDOWN) and firstTask is null, likewise add and start the Worker
                // For 2, in other words: if the pool is SHUTDOWN and firstTask is not null, no Worker is added or started
                // In that case the Worker was created but never started; addWorkerFailed() below removes it and rolls back the count
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the new Worker to the worker set
                    workers.add(w);
                    int s = workers.size();
                    // Update the historical peak number of worker threads
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the Worker as added; its thread is started after the lock is released
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the Worker was added, start its real thread by calling Thread#start() on t
            if (workerAdded) {
                t.start();
                // Flag that the thread started successfully
                workerStarted = true;
            }
        }
    } finally {
        // If the thread did not start, remove the Worker from the set and roll back the count
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// Roll back after a failed addWorker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove it from the worker set
        if (w != null)
            workers.remove(w);
        // Decrement the worker count wc by 1
        decrementWorkerCount();
        // Try to terminate the pool, depending on its state
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
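The inner loop of addWorker is a standard CAS retry loop. The sketch below isolates that pattern on a plain AtomicInteger: read, check the bound, try compareAndSet, and retry on contention. BoundedCounter and raceForSlots are hypothetical names, and the run state checks of the real method are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Reserve one slot unless the bound is reached (cf. the inner for (;;) loop of addWorker).
    boolean tryIncrement(int bound) {
        for (;;) {
            int c = count.get();
            if (c >= bound) {
                return false;                // like "wc >= corePoolSize / maximumPoolSize"
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                 // like compareAndIncrementWorkerCount(c)
            }
            // Another thread changed the count between get() and compareAndSet(): retry
        }
    }

    // Starts `threads` threads that each try to reserve one slot; returns how many succeeded.
    static int raceForSlots(int threads, int bound) {
        BoundedCounter counter = new BoundedCounter();
        AtomicInteger winners = new AtomicInteger();
        Thread[] racers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            racers[i] = new Thread(() -> {
                if (counter.tryIncrement(bound)) {
                    winners.incrementAndGet();
                }
            });
            racers[i].start();
        }
        for (Thread racer : racers) {
            try {
                racer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(raceForSlots(16, 5)); // 5: never more than the bound
    }
}
```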

Note in the logic above that when a Worker instance is created, its constructor creates a Java Thread instance through the ThreadFactory. The code then checks whether the Worker should be added to the worker set workers and whether the thread it holds should be started. Only once that thread is started does the Worker actually begin to run; otherwise it is just a useless temporary object. Worker itself implements the Runnable interface and can be regarded as a Runnable adapter.

Because Worker implements the Runnable interface, its constructor binds the Worker itself to its internal Thread:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); // Pass itself as the Runnable to the new Thread
}

Therefore, after t.start() is called, the Worker's thread starts and executes the Worker's run method, which is:

public void run() {
    runWorker(this);
}

So the logic for executing the task is in the runWorker method.
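Since the Worker passes itself to ThreadFactory#newThread, a custom factory is the natural hook for naming pool threads. In this sketch the factory method named() and the "order-pool-" prefix are hypothetical; the factory also records the class of the Runnable it receives, which is the pool's internal Worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class NamedThreadFactoryDemo {
    // Records the class of the Runnable the pool hands to the factory.
    static final AtomicReference<String> lastRunnableClass = new AtomicReference<>();

    // Illustrative factory: names threads "<prefix>1", "<prefix>2", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return runnable -> {
            lastRunnableClass.set(runnable.getClass().getName());
            return new Thread(runnable, prefix + seq.getAndIncrement());
        };
    }

    static String nameOfWorkerThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), named("order-pool-"));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorkerThread());    // order-pool-1
        System.out.println(lastRunnableClass.get()); // the Runnable the pool passed in
    }
}
```

nameOfWorkerThread() returns "order-pool-1" because the pool creates exactly one thread. On OpenJDK the recorded class name is java.util.concurrent.ThreadPoolExecutor$Worker, matching the constructor shown above.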

runWorker method source code analysis

final void runWorker(Worker w) {
    // Get the current thread, which is the same thread instance held by the Worker
    Thread wt = Thread.currentThread();
    // Take the initial task held by the Worker into the local variable task
    Runnable task = w.firstTask;
    // Clear the Worker's reference to its initial task
    w.firstTask = null;
    // The AQS state was set to -1 when the Worker was created; unlock() resets it to 0 so the thread can now be interrupted
    w.unlock(); // allow interrupts
    // Records whether the thread terminated because a task threw an exception; defaults to true
    boolean completedAbruptly = true;
    try {
        // Loop while the initial task is non-null or getTask() returns a task from the queue (stored in task)
        // getTask() blocks, or blocks with a timeout, on the queue; once it returns null, the loop ends and the thread terminates
        while (task != null || (task = getTask()) != null) {
            // Lock the Worker: AQS tries to CAS the state from 0 to 1
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the pool is stopping (moving from RUNNING or SHUTDOWN to STOP), make sure the current worker thread is interrupted
            // Otherwise, make sure the current thread is not interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method, before the task runs
                beforeExecute(wt, task);
                try {
                    // Run the task
                    task.run();
                    // Hook method, after the task runs (normal completion)
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // Hook method, after the task runs (exceptional completion)
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // Clear task; otherwise the while loop would run the same task forever
                task = null;
                // Count the tasks completed by this Worker
                w.completedTasks++;
                // Reset the AQS state to 0
                w.unlock();
            }
        }
        // getTask() returned null, so the thread exits normally
        completedAbruptly = false;
    } finally {
        // If completedAbruptly is true, the thread is exiting because a task threw an exception
        processWorkerExit(w, completedAbruptly);
    }
}

Let's dissect the code that decides the interrupt status of the current worker thread:

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
    wt.interrupt();

// Let's simplify the logic:
// Is the pool state at least STOP, i.e. rs >= STOP(1)?
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// Only if not: read and clear the current thread's interrupt status, then check again whether the state is at least STOP
boolean interruptedAndAtLeastStop = !atLeastStop && Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if ((atLeastStop || interruptedAndAtLeastStop) && !wt.isInterrupted()) {
    wt.interrupt();
}

Thread.interrupted() returns the current thread's interrupt status and clears it. The check is needed because shutdownNow() may be called while this if statement is running. shutdownNow() also interrupts all Worker threads, but because it only walks the current set of Workers, it cannot reach a task before that task has been handed to a Worker; so each Worker repeats the check itself, in this if block, just before running a task. In the STOP state, all newly submitted tasks are rejected, tasks in the queue are not executed, and all Worker threads are interrupted. This means that even a Runnable that runWorker() has already obtained may have its thread interrupted before Runnable#run() is called. When the pool is not stopping, clearing the flag ensures that a task does not start with an interrupt left over from earlier. If shutdownNow() runs just as Thread.interrupted() clears the flag, the second runStateAtLeast(ctl.get(), STOP) check sees STOP and the thread is interrupted again, so the interrupt from shutdownNow() is not lost. The !wt.isInterrupted() guard only skips a redundant interrupt() call when the flag is already set; interrupting a thread twice is harmless.
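The difference between the two interrupt queries used above is easy to check in isolation. In this small sketch (InterruptFlagDemo is an illustrative name), isInterrupted() only reads the flag, while the static Thread.interrupted() reads and clears it, so a second call returns false.

```java
class InterruptFlagDemo {
    static String demo() {
        Thread self = Thread.currentThread();
        self.interrupt();                      // set this thread's interrupt flag
        boolean peek = self.isInterrupted();   // true; the flag stays set
        boolean first = Thread.interrupted();  // true; the flag is now cleared
        boolean second = Thread.interrupted(); // false; nothing left to clear
        return peek + " " + first + " " + second;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true true false
    }
}
```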

To summarize the core flow of the runWorker() method above:

  1. The Worker first calls unlock() to leave its initial non-interruptible state.
  2. In a while loop, it calls getTask() to fetch tasks from the task queue (the first iteration may instead use the firstTask instance passed in from outside).
  3. If the pool has reached the STOP state, it makes sure the worker thread is interrupted; otherwise it makes sure the worker thread is not in the interrupted state.
  4. It executes the task instance's Runnable#run() method, calling the hook methods beforeExecute() before and afterExecute() after it, both when the task completes normally and when it throws.
  5. Leaving the while loop means runWorker() ends and the worker thread's life cycle ends (Worker#run() returns); processWorkerExit() is then called to clean up after the exiting worker thread.
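The hooks in step 4 are protected methods meant for subclasses. This sketch (HookedPool is an illustrative name) records each call; with execute(), an exception escaping run() reaches afterExecute as its Throwable argument before it terminates the worker thread. The thread factory installs a no-op uncaught exception handler only to keep the demo's output quiet.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    final List<String> events = new CopyOnWriteArrayList<>();
    private final CountDownLatch afterCalls;

    HookedPool(int expectedTasks) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), runnable -> {
            Thread t = new Thread(runnable);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // silence the demo's stack trace
            return t;
        });
        this.afterCalls = new CountDownLatch(expectedTasks);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        events.add(t == null ? "after:ok" : "after:" + t.getMessage());
        afterCalls.countDown();
    }

    static List<String> runOkThenFailing() {
        HookedPool pool = new HookedPool(2);
        pool.execute(() -> { });                                          // completes normally
        pool.execute(() -> { throw new IllegalStateException("boom"); }); // escapes run()
        try {
            pool.afterCalls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(runOkThenFailing()); // [before, after:ok, before, after:boom]
    }
}
```

Tasks passed through submit() behave differently: FutureTask catches the exception itself, so afterExecute receives null and the exception surfaces from Future#get() instead.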

Next, let's look at getTask(), which fetches tasks from the task queue, and processWorkerExit(), which handles the aftermath of a thread's exit.

getTask method source code analysis

getTask() is how a worker thread, inside its while loop, fetches a task object from the task queue:

private Runnable getTask() {
    // Records whether the previous poll of the queue timed out
    boolean timedOut = false; // Did the last poll() time out?
    // Note that this is an infinite loop
    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        // If the pool state is at least SHUTDOWN, i.e. rs >= SHUTDOWN(0), two cases (ORed) are checked:
        // 1. The pool state is at least STOP(1), i.e. the pool is stopping, usually because shutdownNow() was called
        // 2. The task queue is empty
        // If the pool is at least SHUTDOWN and either condition holds, decrement the worker count by 1 and return null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // Reaching here: the pool is RUNNING, or SHUTDOWN with tasks still queued
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed decides whether this worker pulls tasks with the timed poll() (and may time out) instead of the blocking take():
        // 1. allowCoreThreadTimeOut defaults to false; if set to true, core threads also use poll() with a timeout
        // 2. wc > corePoolSize means the pool has more threads than its core size; while that holds, every worker uses the timed poll()
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Second if:
        // 1. wc > maximumPoolSize: the worker count exceeds maximumPoolSize, i.e. the maximum was lowered with setMaximumPoolSize()
        // 2. timed && timedOut: the thread is subject to timeout and the previous poll() returned null
        // If either holds, and the worker count is greater than 1 or the queue is empty, CAS-decrement the worker count and return null
        // If the CAS fails, go to the next iteration and retry
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timed is true, use the timed poll(); it returns null if no task arrives within keepAliveTime
            // If timed is false, take() blocks until a task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Only a non-null task is returned; otherwise the loop continues
            if (r != null)
                return r;
            // workQueue.poll() timed out and returned null
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: not a timeout, so retry
            timedOut = false;
        }
    }
}

This method contains two large if statements. The first one decrements the worker count by 1 and returns null directly when:

  1. The pool state is SHUTDOWN (usually because shutdown() was called) and the task queue is empty.
  2. The pool state is STOP or beyond (usually because shutdownNow() was called).

The logic of the second if is more involved, so let's break it down:

// The worker count exceeds maximumPoolSize, i.e. the maximum was lowered with setMaximumPoolSize()
boolean b1 = wc > maximumPoolSize;
// The thread may time out and the previous poll() from the task queue returned null
boolean b2 = timed && timedOut;
// The worker count is greater than 1
boolean b3 = wc > 1;
// The task queue is empty
boolean b4 = workQueue.isEmpty();
boolean r = (b1 || b2) && (b3 || b4);
if (r) {
    if (compareAndDecrementWorkerCount(c)) {
        return null;
    } else {
        continue;
    }
}

This logic mainly targets non-core threads. In execute(), when the worker count is at least corePoolSize but below maximumPoolSize and the task queue is full, a non-core thread is added via addWorker(task, false). The logic here is essentially the reverse of addWorker(task, false): it reduces the number of non-core threads so that the total worker count moves back toward corePoolSize. For a non-core thread, if the previous poll() returned null, timed && timedOut easily becomes true, so getTask() returns null and the while loop in runWorker() ends. processWorkerExit() then does the cleanup, and the Worker of that non-core thread becomes an unreachable object waiting to be reclaimed by the JVM's garbage collector. When allowCoreThreadTimeOut is set to true, the same end-of-life logic applies to core threads as well. The meaning of keepAliveTime can therefore be summarized as follows:

  • When core threads are allowed to time out, i.e. allowCoreThreadTimeOut is set to true, keepAliveTime is the maximum idle time of any worker thread.
  • By default, core threads are not allowed to time out, and keepAliveTime is the maximum idle time of non-core threads only.

With a keepAliveTime that suits the workload, the pool can release threads that were created for a burst of tasks once they have been idle long enough, instead of keeping them alive indefinitely.
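The effect of keepAliveTime can be observed through the public ThreadPoolExecutor API. The sketch below is a rough, timing-based demonstration under assumed values (corePoolSize 1, maximumPoolSize 3, keepAliveTime 100 ms, a SynchronousQueue, and tasks that sleep for 200 ms); the class and method names are illustrative. Because a SynchronousQueue never buffers tasks, every task that cannot be handed to an idle thread immediately goes through addWorker(command, false).

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {

    // Sleep helper that restores the interrupt flag instead of throwing a checked exception
    static void sleepMillis(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {peak pool size, size after idling, size after allowing core threads to time out}. */
    static int[] observePoolSizes() {
        // core = 1, max = 3, keepAliveTime = 100 ms, no task buffering
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> sleepMillis(200)); // each task keeps its thread busy for 200 ms
            }
            int peak = pool.getPoolSize(); // 1 core thread + 2 non-core threads

            // Tasks finish after ~200 ms; threads above corePoolSize time out ~100 ms later
            sleepMillis(1000);
            int afterIdle = pool.getPoolSize();

            // Let the remaining core thread time out too (requires keepAliveTime > 0)
            pool.allowCoreThreadTimeOut(true);
            sleepMillis(1000);
            int afterCoreTimeout = pool.getPoolSize();

            return new int[] {peak, afterIdle, afterCoreTimeout};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = observePoolSizes();
        System.out.printf("peak=%d, afterIdle=%d, afterCoreTimeout=%d%n",
                sizes[0], sizes[1], sizes[2]);
    }
}
```

With these settings the pool should grow to 3 threads, shrink back to 1 once the extra threads have been idle longer than keepAliveTime, and drop to 0 after allowCoreThreadTimeOut(true). The one-second sleeps leave a wide margin over the roughly 300 ms the shrinking needs.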

processWorkerExit method source code analysis

The processWorkerExit() method performs cleanup and bookkeeping for a terminating Worker. Because processWorkerExit() is called from the finally block of runWorker(), a worker thread is not truly terminated until it has finished executing processWorkerExit().

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If the thread is terminating because its task threw a user exception, the worker count has not been adjusted yet, so decrement it here.
    // If no exception was thrown, getTask() returned null to make the thread leave runWorker()'s while loop normally, and the count was already decremented in getTask().
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add the terminating Worker's completed-task count to the pool-wide total
        completedTaskCount += w.completedTasks;
        // Remove the terminating Worker from the worker set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Decide, based on the current pool state, whether the pool should proceed to termination (analyzed in the next section)
    tryTerminate();

    int c = ctl.get();
    // If the pool state is below STOP, i.e. RUNNING or SHUTDOWN:
    // 1. If the thread exited normally, keep at least a minimum number of workers (0 or 1 when core threads may time out, otherwise corePoolSize)
    // 2. If the thread terminated because of a user exception, or the worker count is below that minimum, add a replacement worker
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // The minimum is 0 if core threads may time out, otherwise corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If the minimum is 0 but the task queue is not empty, raise it to 1 so queued tasks still get run
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // If the worker count already meets the minimum, return without adding a thread
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Add a replacement worker with no first task
        addWorker(null, false);
    }
}

The last part of the code checks the pool state. If the pool is RUNNING or SHUTDOWN and the current worker terminated because its task threw a user exception, a replacement worker is added via addWorker(null, false). If the worker terminated normally, without an exception, it is handled as follows:

  • allowCoreThreadTimeOut is true, i.e. core threads may time out: the minimum is 0, raised to 1 when the task queue is not empty. So if tasks are still queued and no worker is left, a replacement is created to keep at least one worker in the pool; if the queue is empty, no replacement is needed.
  • allowCoreThreadTimeOut is false: if the worker count is already at least corePoolSize, no replacement is created; otherwise a replacement worker is created, which keeps the number of worker threads close to corePoolSize.
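The replacement rule above can be condensed into a small decision function. This is a hypothetical model (needsReplacement() is not a JDK method): workerCount stands for workerCountOf(c) after the exiting worker has already been subtracted, and true only means that addWorker(null, false) is attempted, which can still fail, for example in SHUTDOWN state with an empty queue.

```java
/**
 * Hypothetical model of the "add a replacement worker?" decision at the end of
 * processWorkerExit(). needsReplacement() is an illustrative name, not JDK API.
 */
public class ReplacementModel {

    static boolean needsReplacement(boolean stateBelowStop, boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut, int corePoolSize,
                                    int workerCount, boolean queueEmpty) {
        if (!stateBelowStop) {
            return false; // STOP or later: never try to add workers
        }
        if (completedAbruptly) {
            return true; // the task threw, so always try to replace the dead worker
        }
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty) {
            min = 1; // someone must still drain the queued tasks
        }
        return workerCount < min;
    }

    public static void main(String[] args) {
        System.out.println(needsReplacement(true, true, false, 2, 5, true));   // true: abrupt exit
        System.out.println(needsReplacement(true, false, false, 2, 1, true));  // true: below corePoolSize
        System.out.println(needsReplacement(true, false, false, 2, 2, true));  // false: at corePoolSize
        System.out.println(needsReplacement(true, false, true, 2, 0, false));  // true: queue needs a worker
        System.out.println(needsReplacement(true, false, true, 2, 0, true));   // false: nothing to do
        System.out.println(needsReplacement(false, true, false, 2, 0, false)); // false: pool is stopping
    }
}
```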

When processWorkerExit() completes, it means that the life cycle of the worker thread has ended.

tryTerminate method source code analysis

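The tryTerminate() method is called every time a worker thread terminates (it is also called from shutdown(), shutdownNow() and other paths that may make the pool eligible to terminate):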

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Return immediately in any of the following three cases:
        // 1. The pool is RUNNING
        // 2. The pool is at least TIDYING, i.e. another thread is already completing termination
        // 3. The pool is SHUTDOWN (below STOP) and the task queue is not empty, so queued tasks must still run
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        // Eligible to terminate, but workers remain: try to interrupt one idle worker and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS the pool state to TIDYING; on success, run the terminated() hook
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // Finally set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up all threads waiting on the termination condition (awaitTermination() waits on it)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

// Interrupt idle worker threads. When onlyOne is true, only the first worker in the set is considered
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // Interrupt only if the thread is not already interrupted and its Worker lock can be acquired (i.e. it is idle)
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // Stop after the first worker in the set, whether or not it was interrupted
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
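From user code, the end of this state machine is visible through the terminated() hook and awaitTermination(). The sketch below overrides the protected terminated() method in an anonymous subclass; the class name, pool sizes and the 5-second timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TerminatedHookDemo {

    /** Returns {terminated() ran, awaitTermination() returned true, isTerminated()}. */
    static boolean[] runLifecycle() {
        AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Invoked from tryTerminate() after the CAS to TIDYING succeeds
                hookRan.set(true);
                super.terminated();
            }
        };
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { }); // trivial tasks so the pool has workers to retire
        }
        pool.shutdown(); // RUNNING -> SHUTDOWN; already queued tasks still run
        try {
            // Waits on the termination condition that tryTerminate() signals
            boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {hookRan.get(), finished, pool.isTerminated()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while awaiting termination", e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = runLifecycle();
        System.out.printf("terminated() ran=%b, awaitTermination=%b, isTerminated=%b%n",
                r[0], r[1], r[2]);
    }
}
```

Because tryTerminate() runs terminated() before it sets TERMINATED and calls termination.signalAll(), the flag set in the hook is already visible when awaitTermination() returns true.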

The puzzling part is the second if in tryTerminate(): if the worker count is not zero, it tries to interrupt one idle worker (the first worker in the worker set, and only if that worker is idle) and returns. The method's API comment explains this:

If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate.

In other words, when the conditions for terminating the pool are met but the worker count is not zero, an idle worker is interrupted so that the shutdown signal keeps propagating.

The shutdown() method, examined below, calls interruptIdleWorkers() to interrupt all idle threads. A thread that is busy running a task at that moment is skipped. If it is a core thread (timed is false, so it waits in take() rather than in a timed poll()), it may block in the task queue's take() method in the next round of its loop, for example when it saw a non-empty queue but another worker took the last task first. Without further intervention it could stay blocked in take() forever after the pool has been shut down. To avoid this, every exiting worker calls tryTerminate(), which tries to interrupt one idle worker in the worker set; that worker exits and calls tryTerminate() in turn, so the shutdown signal propagates until all idle threads have exited normally.

The interruptIdleWorkers() method calls tryLock() on each worker and interrupts its thread only if tryLock() succeeds. In runWorker(), every time a worker obtains a non-null task from the task queue, it first calls Worker#lock() and holds the lock while running the task. A worker that is running a task therefore cannot be locked by interruptIdleWorkers(), so it is not interrupted mid-task, and any worker that does get interrupted must be idle.
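The idle check relies only on the rule "a busy worker holds its own lock". Here is a minimal sketch of that idea outside the JDK, with two hypothetical MiniWorker threads: one holds its lock as if running a task, the other waits without it. It uses ReentrantLock for brevity, whereas the JDK's Worker extends AbstractQueuedSynchronizer to implement a non-reentrant lock, so that a task calling pool control methods such as setCorePoolSize() cannot reacquire its own worker's lock and interrupt itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class IdleInterruptSketch {

    // Hypothetical stand-in for ThreadPoolExecutor.Worker: a lock plus the thread it guards
    static final class MiniWorker {
        final ReentrantLock lock = new ReentrantLock();
        Thread thread;
    }

    // Mirrors the loop in interruptIdleWorkers(false): a successful tryLock() means "idle"
    static int interruptIdle(List<MiniWorker> workers) {
        int interrupted = 0;
        for (MiniWorker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.lock.tryLock()) {
                try {
                    t.interrupt();
                    interrupted++;
                } finally {
                    w.lock.unlock();
                }
            }
        }
        return interrupted;
    }

    /** Returns {workers interrupted, busy worker saw an interrupt (1/0), idle worker saw an interrupt (1/0)}. */
    static int[] demo() {
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean busySawInterrupt = new AtomicBoolean(false);
        AtomicBoolean idleSawInterrupt = new AtomicBoolean(false);

        MiniWorker busy = new MiniWorker();
        busy.thread = new Thread(() -> {
            busy.lock.lock(); // "running a task": hold the worker lock
            try {
                started.countDown();
                release.await();
            } catch (InterruptedException e) {
                busySawInterrupt.set(true);
            } finally {
                busy.lock.unlock();
            }
        });

        MiniWorker idle = new MiniWorker();
        idle.thread = new Thread(() -> {
            started.countDown();
            try {
                release.await(); // "waiting for a task" without holding the lock
            } catch (InterruptedException e) {
                idleSawInterrupt.set(true);
            }
        });

        List<MiniWorker> workers = new ArrayList<>();
        workers.add(busy);
        workers.add(idle);
        busy.thread.start();
        idle.thread.start();
        try {
            started.await(); // both threads are in place; the busy one holds its lock
            int count = interruptIdle(workers);
            release.countDown();
            busy.thread.join();
            idle.thread.join();
            return new int[] {count, busySawInterrupt.get() ? 1 : 0, idleSawInterrupt.get() ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.printf("interrupted=%d, busy=%d, idle=%d%n", r[0], r[1], r[2]);
    }
}
```

Only the idle worker is interrupted, so interruptIdle() reports one interruption and the busy worker finishes its simulated task undisturbed.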

References

  1. Thoroughly understand Java thread pools
  2. Implementation principle of Java Thread Pool and its practice in Meituan business – Meituan Technical team
  3. JUC thread pool ThreadPoolExecutor source code