@ThreadPoolExecutor

Java concurrency, based on JDK 1.8


  • @ThreadPoolExecutor
    • 1. Start with the topic
      • 1.1 How to Create more Threads
      • 1.2 Overview of thread pools
    • 2. Thread pool composition
      • 2.1 the class definition
      • 2.2 the constructor
      • 2.3 Important Variables
    • 3. Thread pool usage
      • 3.1 Creating a thread pool
      • 3.2 Submitting and Executing a Task
      • 3.3 Shutting down a thread pool
      • 3.4 Configuring a Thread Pool properly
    • 4. Implementation principle of thread pool
      • 4.1 the flow chart
      • 4.2 Execution Status
      • 4.3 Handling Timeout
      • 4.4 Thread Pool Status
        • 4.4.1 Status Controller
        • 4.4.2 Thread Pool Status
      • 4.5 the Worker
        • 4.5.1 composition
        • 4.5.2 Executing a Task
        • 4.5.3 lock method
      • 4.6 Dynamic Control
    • 5. Task submission and execution
      • 5.1 execute() – Submitting a task
      • 5.2 addWorker() – Adding a worker thread
      • 5.3 runWorker() – Executing tasks
      • 5.4 getTask() – Getting a task
    • 6. Close the thread pool
      • 6.1 shutdown() – Orderly shutdown
      • 6.2 shutdownNow() – Shut down immediately
      • 6.3 awaitTermination() – Waiting for the thread pool to terminate
    • 7. Interrupt and terminate processing
      • 7.1 interruptIdleWorkers() – Interrupts idle threads
      • 7.2 interruptWorkers() – Interrupts all threads
      • 7.3 tryTerminate() – Attempts to terminate the thread pool
    • 8. Thread failure and recycle processing
      • 8.1 addWorkerFailed() – Failed to add a thread
      • 8.2 processWorkerExit() – Thread reclamation process
    • 9. Task queues and queuing policies
      • 9.1 Bounded Queues
        • 9.1.1 ArrayBlockingQueue
        • 9.1.2 LinkedBlockingQueue
        • 9.1.3 LinkedBlockingDeque
      • 9.2 Unbounded Queue
        • 9.2.1 SynchronousQueue
        • 9.2.2 PriorityBlockingQueue
        • 9.2.3 DelayQueue
        • 9.2.4 LinkedTransferQueue
    • 10. Thread pool monitoring
      • 10.1 Native Monitoring
      • 10.2 Expanding monitoring
    • 11. Saturation rejection strategy
      • 11.1 CallerRunsPolicy
      • 11.2 AbortPolicy
      • 11.3 DiscardPolicy
      • 11.4 DiscardOldestPolicy
    • 12. Thread pool exception handling
      • 12.1 submit() exception handling
      • 12.2 execute() exception handling
  • Note: since the JDK source contains a great many English comments, I have extracted only the most important ones; the rest are presented as my own annotations
  • Recommendation: readers are advised to read this together with the @synchronized article (version 1.8)
  • Recommendation: readers are also advised to read this together with the @AbstractQueuedSynchronizer article for a thorough understanding
  • Thank you for your support: kiraSally’s personal blog on Juejin (the Nuggets)

Note: I will write a follow-up @Future article covering AbstractExecutorService and thread pool operations that return a value. In that article I will also introduce more varied thread pool configurations as well as Tomcat’s thread pool configuration, so stay tuned

1. Start with the topic

1.1 How to Create more Threads

In Java, you can adjust the stack size of each thread with the -Xss JVM option (the default is 1024 KB on 64-bit systems). Lowering this value lets you create more threads, but JVM resources are finite and threads cannot be created without limit!

Thread pools let you control the number of threads. Much like connection pools, they reduce the overhead of frequently creating and destroying threads by reusing a limited set of threads. A thread pool is also a natural fit for the producer-consumer pattern: submitting a task is production, executing a task is consumption
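To make thread reuse concrete, here is a minimal sketch of my own (not from the original article): ten tasks are served by only two threads, and the repeated thread names in the output show that threads are reused rather than created per task.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        // Two worker threads serve ten tasks
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.execute(() ->
                    System.out.println(Thread.currentThread().getName() + " ran task " + taskId));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}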

1.2 Overview of thread pools

In practice, the thread pool is probably the most frequently used component of the concurrency framework: almost any requirement for concurrent tasks or asynchronous execution ends up using one. Through thread reuse, a thread pool internally provides task execution, thread scheduling, pool management and related services. Using thread pools properly brings three benefits:

1. Reduce resource consumption: Reduce thread creation and destruction costs by reusing created threads

2. Improved response time: Tasks can be executed immediately upon arrival without waiting for a thread to be created

3. Improved thread manageability: Thread pools can be managed, allocated, tuned, and monitored uniformly

2. Thread pool composition

2.1 the class definition

public class ThreadPoolExecutor extends AbstractExecutorService

2.2 the constructor

/** Uses the default thread factory and the default rejection handler (AbortPolicy) */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

/** Uses a custom thread factory and the default rejection handler */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

/** Uses the default thread factory and a custom rejection handler */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

/** The most complete constructor: every parameter is customizable */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2.3 Important Variables

// Thread pool state controller: packs (runState, workerCount) into one AtomicInteger
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Task queue: holds tasks waiting to be executed
private final BlockingQueue<Runnable> workQueue;
// Global lock guarding the workers set and pool statistics
private final ReentrantLock mainLock = new ReentrantLock();
// Set of all worker threads; accessed only while holding mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();
// Condition used by awaitTermination() to wait for termination
private final Condition termination = mainLock.newCondition();
// Largest pool size ever reached; accessed only while holding mainLock
private int largestPoolSize;
// Total number of completed tasks; updated only when workers exit
private long completedTaskCount;
// Factory used to create new worker threads
private volatile ThreadFactory threadFactory;
// Handler invoked when the pool is saturated or shut down
private volatile RejectedExecutionHandler handler;
// Idle timeout for worker threads, in nanoseconds
private volatile long keepAliveTime;
/**
 * Whether core worker threads respond to the idle timeout
 * false: core worker threads survive even after an idle timeout
 * true : core worker threads are reclaimed after an idle timeout
 */
private volatile boolean allowCoreThreadTimeOut;
// Number of core worker threads
private volatile int corePoolSize;
// Maximum number of worker threads
private volatile int maximumPoolSize;
// Default saturation policy: AbortPolicy -> throw an exception
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

3. Thread pool usage

3.1 Creating a thread pool

Creating a thread pool is actually instantiating a thread pool object, and here we use the most complete constructor to describe the most complete creation process:

1. corePoolSize (number of core worker threads): the minimum number of worker threads the pool keeps (maintains) even when it has no tasks. When a task is submitted, the pool creates a new worker thread to execute it (even if idle core workers exist) until the actual number of worker threads >= corePoolSize. Calling prestartAllCoreThreads() creates and starts all core worker threads ahead of time

2. workQueue (task queue): a blocking queue used to store tasks waiting to be executed. When (actual worker threads >= corePoolSize) && (the queue is not yet full), the task is enqueued via offer() and waits there. For details see Task queues and queuing policies below

3. maximumPoolSize (maximum number of worker threads): the maximum number of worker threads the pool is allowed to create. When (the queue is full) && (actual worker threads < maximumPoolSize), the pool creates new worker threads (even if idle workers exist) to execute tasks, up to maximumPoolSize. This parameter has no effect when an unbounded queue is used

4. keepAliveTime (maximum idle time of a worker thread): stored internally in nanoseconds. Idle worker threads that exceed the timeout are reclaimed: by default only non-core workers are reclaimed and core workers survive; when allowCoreThreadTimeOut=true, timed-out core workers are reclaimed as well. If core-thread timeout is not enabled, core threads live forever. In workloads that mix short and long tasks, tuning this value can improve thread utilization

5. unit: the time unit of keepAliveTime. Options are NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, and DAYS

6. threadFactory: as the name suggests, a factory for creating threads. A custom factory lets you initialize threads with, for example, a name, daemon status, or an uncaught-exception handler

7. handler (saturation policy): when both the pool and the queue are full, the pool can no longer accept tasks. The default is Abort (throw a rejection exception); the other built-in policies are Discard (silently drop the new task), DiscardOldest (drop the oldest queued task and retry), and CallerRuns (run the task in the caller's thread); you can also supply your own implementation

Note: the comparisons above use >= rather than == because thread pool configuration can be changed dynamically, as described below

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Numeric arguments must satisfy the boundary conditions,
    // otherwise initialization throws an IllegalArgumentException
    if (corePoolSize < 0 || maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    // The task queue, thread factory and saturation handler must not be null,
    // otherwise initialization throws a NullPointerException
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
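To make the seven parameters concrete, here is a minimal sketch of my own showing the full constructor in use; the pool sizes, queue capacity, thread-name prefix and rejection policy are arbitrary illustrative choices.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCreationDemo {
    public static void main(String[] args) {
        // A simple custom thread factory that names its threads
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "biz-pool-" + seq.getAndIncrement());
                t.setDaemon(false);                       // keep workers as user threads
                return t;
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                        // corePoolSize
                8,                                        // maximumPoolSize
                60L, TimeUnit.SECONDS,                    // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(100),    // bounded workQueue
                namedFactory,                             // custom threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy() // saturation policy
        );

        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        pool.shutdown();
    }
}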

3.2 Submitting and Executing a Task

You can choose between the two submission methods depending on whether you need a return value:
1. execute(): for tasks that do not need a return value

– This method gives you no way to know whether the task was executed successfully by the pool

2. submit(): for tasks that need a return value

– The returned Future object tells you whether the task completed successfully

– The get() method blocks the current thread until the task completes, so beware of blocking forever!!

– The get(long timeout, TimeUnit unit) overload blocks the current thread until the task completes or the timeout elapses
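As a hedged illustration (not from the original article), the following sketch contrasts execute() with submit() and uses the timed get() to avoid blocking forever; the timeout value is arbitrary.

import java.util.concurrent.*;

public class SubmitVsExecuteDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // execute(): fire-and-forget, no handle on the result
        pool.execute(() -> System.out.println("no return value"));

        // submit(): the Future carries the result (or the exception)
        Future<Integer> future = pool.submit(() -> 1 + 1);
        try {
            Integer result = future.get(500, TimeUnit.MILLISECONDS); // bounded wait
            System.out.println("result = " + result);
        } catch (TimeoutException e) {
            future.cancel(true);          // give up on a task that takes too long
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}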

3.3 Shutting down a thread pool

You can shut down the thread pool in either of the following two ways; I recommend the first, safer one:

1. shutdown(): shuts down the pool in an orderly manner. Tasks that were already submitted (both the ones currently running and the ones waiting in the queue) are still executed, but new tasks are rejected

2. shutdownNow(): immediately (attempts to) stop all tasks (both running and queued) and returns the list of tasks that were awaiting execution

Note: after calling either method you can call awaitTermination() to wait for the tasks to finish and the pool to terminate

3.4 Configuring a Thread Pool properly

Recommended reading: How to configure a thread pool properly. I will share my own practical experience in a future article if I get the chance

It is recommended that the thread pool size be determined through load testing against your specific business, or estimated using Little's law

Little's law (also known as Little's result, lemma, formula or theorem): in a stable system, the long-term average number of customers in the system, L, equals the long-term effective arrival rate, λ, multiplied by the average time a customer spends in the system, W; that is, L = λW. (From Baidu Baike)
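As a quick, made-up worked example: if tasks arrive at λ = 100 tasks per second and each task spends on average W = 0.05 seconds in the system (queueing plus execution), then L = λW = 100 × 0.05 = 5, so roughly 5 tasks are in flight at any moment, and a pool of about 5 worker threads is a reasonable starting point to refine with load testing.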

Of course, reality is always harsher: network failures, I/O timeouts and the like make task latency unstable. All sorts of strange problems can also appear when a thread pool is used in production, so you also need to consider task timeout handling, failure handling and isolation, especially when errors occur in bulk, such as exception storms or sudden bursts of high concurrency

Recommended reading: ExecutorService – 10 Tips and Tricks


4. Implementation principle of thread pool

4.1 the flow chart

4.2 Execution Status

As the number of tasks grows, thread pool execution falls into the following four cases, which correspond to the branches of the processing flow:

1. If the actual number of worker threads (workers) < corePoolSize, a new worker thread is created to execute the newly submitted task (execute(Runnable))

2. If the actual number of workers >= corePoolSize (all core workers are busy) and the workQueue is not full, the task is added to the workQueue

3. If the workQueue is full but the actual number of workers < maximumPoolSize, a new (non-core) worker thread is created to execute the task

4. If the actual number of workers >= maximumPoolSize (all workers are busy) and the queue is full, i.e. the pool is saturated, the task is handed to the RejectedExecutionHandler saturation policy

The overall design of the thread pool tries to avoid acquiring the global lock as much as possible, for performance:

1. Steps 1 and 3 must lock, because creating a new worker thread requires the global lock

2. To avoid acquiring the global lock repeatedly (which would become a scalability bottleneck), most submissions go through step 2 (enqueueing) once the actual number of worker threads >= corePoolSize

Note: don't be misled by the word "reject". It only means the pool has no spare worker thread to run the task and no spare queue space to hold it; depending on the saturation policy, the task may still be handled (for example, run in the caller's thread)

4.3 Handling Timeout

If you need timed-out core worker threads to be reclaimed, choose the second option; otherwise choose the first:

1. By default, when the number of workers > corePoolSize, idle non-core worker threads whose idle time exceeds keepAliveTime are reclaimed (the worker count shrinks until it is <= corePoolSize)

2. If allowCoreThreadTimeOut is true, idle core worker threads that exceed keepAliveTime are also reclaimed

4.4 Thread Pool Status

4.4.1 Status Controller

// The thread pool state controller packs the pool state and the worker count
// into a single AtomicInteger
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE = 32 -> COUNT_BITS = 29: the low 29 bits hold the worker count
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum worker count: 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// Extract the run state (the high 3 bits)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Extract the worker count (the low 29 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Pack a run state (rs) and a worker count (wc) into one int
private static int ctlOf(int rs, int wc) { return rs | wc; }
Here’s a bit of binary operator basics for those who have forgotten:

& : AND operator, a bit is 1 only if both input bits are 1, otherwise 0

| : OR operator, a bit is 1 if either input bit is 1, otherwise 0

~ : NOT operator, flips every bit: 0 becomes 1 and 1 becomes 0

^ : XOR operator, a bit is 0 if the two input bits are the same, 1 if they differ
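To see the bit arithmetic in action, here is a small standalone sketch of my own that re-declares the same constants outside the JDK class and packs and unpacks a sample value:

public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;          // high 3 bits are 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;          // high 3 bits are 000

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                      // a RUNNING pool with 5 workers
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c));           // 5
    }
}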

4.4.2 Thread Pool Status

The thread pool states transition in the following order, from the smallest value to the largest:

RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

Aside: the ever-increasing values feel a bit like aging; each step brings the pool closer to its end

// runState is stored in the high-order bits

// RUNNING: accepts new tasks and processes queued tasks
// (high 3 bits are 111, low 29 bits are 0)
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: does not accept new tasks, but still processes queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: does not accept new tasks, does not process queued tasks,
// and interrupts in-progress tasks
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING: all tasks have terminated and the worker count is 0;
// the thread transitioning to TIDYING will run the terminated() hook
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: the terminated() hook has completed
private static final int TERMINATED =  3 << COUNT_BITS;

4.5 the Worker

Worker is an inner class of the thread pool that encapsulates the worker thread and its task, and also manages the worker thread's interrupt state, among other duties

4.5.1 composition

The Worker class combines three things (a lock + a thread + a task), making it a jack of all trades:

1. It extends AQS: implementing a simple non-reentrant mutual-exclusion lock, which provides convenient lock operations whose purpose is to control interrupts

2. It implements Runnable: an "opportunistic" design that simply reuses the standard Runnable contract, so there is no need to define yet another single-method interface

3. Worker thread: each Worker is bound one-to-one, via its thread field, to the thread that actually executes tasks; the thread is created by the thread factory during initialization and repeatedly fetches and executes tasks

4. Task: each Worker stores its newly assigned task in the firstTask field, and the worker thread processes that task first (the value may be null at initialization, which has a special meaning, as detailed below)

Note: since a worker thread and a Worker instance are one-to-one, it is natural to treat "worker thread" and "Worker" as the same thing, especially when counting them; for example, creating a worker thread really means creating a Worker. For convenience I will gloss over this distinction in the rest of the article

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    /** The thread that actually runs tasks; null if the factory fails to create one */
    final Thread thread;
    /** The initial task to execute; may be null */
    Runnable firstTask;
    /** Number of tasks completed by this worker -- on the importance of the KPI */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // Set the lock state to -1 so the worker cannot be interrupted before runWorker() starts
        setState(-1);
        /**
         * The task comes from two places:
         * 1. the first task passed in when addWorker() creates a new thread
         * 2. later, runWorker() repeatedly calls getTask() -- this is where thread reuse happens
         */
        this.firstTask = firstTask;
        /**
         * Create a new thread -> this is the real worker thread.
         * Note that Worker is itself a Runnable,
         * so the `this` in newThread(this) is that Runnable
         */
        this.thread = getThreadFactory().newThread(this);
    }
}

4.5.2 Executing a Task

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
// Worker delegates its run() to the outer runWorker()
public void run() {
    runWorker(this);
}
(Important) Here is a brief overview of how a worker thread executes tasks in the pool:

1. Before running a task, the worker thread locks its Worker, and it unlocks after the task completes

2. The beforeExecute() and afterExecute() hooks run before and after each task (see the sketch after this list)

3. If an exception is thrown during execution, whether the thread dies depends on how you handle the exception

4. After each task finishes, the worker's completed-task count is incremented, and getTask() is called repeatedly to fetch and execute tasks from the queue; when no task is available, the thread blocks inside that method

5. When a worker thread exits the loop for whatever reason, processWorkerExit() is executed to reclaim it (the core step is removing the Worker from the workers set; note that the worker has already left the task loop, so it no longer works, and removing it from the set makes it easy to GC)
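Returning to point 2 above, a common use of the beforeExecute()/afterExecute() hooks is lightweight monitoring. The sketch below is my own illustration (not from the original article or the JDK): it times each task by overriding the two hooks; the class name and log format are arbitrary.

import java.util.concurrent.*;

public class TimedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public TimedThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                           BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startNanos.set(System.nanoTime());       // runs in the worker thread, before the task
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsedMicros = (System.nanoTime() - startNanos.get()) / 1_000;
        System.out.printf("%s took %d us, error=%s%n", r, elapsedMicros, t);
        super.afterExecute(r, t);
    }
}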

4.5.3 lock method

// Lock methods
// state == 0 means unlocked, state == 1 means locked
// (state == -1 means the worker has not started running yet)

protected boolean isHeldExclusively() {
    return getState() != 0;
}

protected boolean tryAcquire(int unused) {
    // Non-reentrant: only a 0 -> 1 transition succeeds
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    // Return the state to unlocked
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
    Thread t;
    // state >= 0 means the worker has started, so it may be interrupted
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

Q: Why isn't the submitted command executed directly? Why wrap it in a Worker at all? Hint: this is related to the Worker's role

Answer: Mainly to control the interruption


Q: How are interrupts controlled? Hint: Worker extends AQS and is therefore itself a lock

A: The Worker follows four rules for interrupt handling:

1. Do not interrupt a worker thread before it actually starts executing

2. When a worker thread is executing a task, it cannot be interrupted

3. The worker thread is interrupted only when it is waiting to get the task getTask() from the task queue

4. interruptIdleWorkers() must successfully acquire the Worker's lock before it interrupts an idle thread


Q: Why isn't the Worker designed as a reentrant lock? Hint: the lock must not be re-acquirable while dynamic control is interrupting threads

Answer: because threads can be interrupted by the dynamic-control methods, for example interruptIdleWorkers(), which calls worker.tryLock() before interrupt(). If the lock were reentrant, a worker thread could be interrupted unexpectedly while it is still running a task, which would violate the rule that a worker thread must not be interrupted while executing a task


4.6 Dynamic Control

The thread pool provides several methods for dynamically adjusting its configuration:

/**
 * Sets the number of core worker threads
 * 1. If the new value < the current value, interruptIdleWorkers() is called
 *    to reclaim the excess idle threads
 * 2. If the new value > the current value, new worker threads may be started
 *    to run queued tasks
 */
public void setCorePoolSize(int corePoolSize)

/**
 * Sets whether core worker threads respond to the idle timeout
 * 1. When false, core worker threads are never terminated for lack of tasks
 * 2. When true, core worker threads are treated like non-core ones on timeout
 * Note: to prevent continual thread replacement, keepAliveTime must be > 0
 *       when setting this to true
 * Note: this method should normally be called before the pool is actively used
 */
public void allowCoreThreadTimeOut(boolean value)

/**
 * Sets the maximum number of worker threads
 * 1. If the new value < the current value, interruptIdleWorkers() is called
 *    to reclaim the excess idle threads
 */
public void setMaximumPoolSize(int maximumPoolSize)

/**
 * Sets the idle timeout for worker threads
 */
public void setKeepAliveTime(long time, TimeUnit unit)
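As a hedged usage sketch (my own, with arbitrary sizes), this is how the dynamic-control methods might be called at runtime; note the ordering, which keeps maximumPoolSize >= corePoolSize at every step:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DynamicControlDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));

        // Scale up during a traffic spike: raise the maximum first so that
        // maximumPoolSize never drops below corePoolSize
        pool.setMaximumPoolSize(16);
        pool.setCorePoolSize(8);

        // Scale back down afterwards: shrink the core first for the same reason;
        // excess idle workers are interrupted and reclaimed
        pool.setCorePoolSize(2);
        pool.setMaximumPoolSize(4);
        pool.allowCoreThreadTimeOut(true);   // let even core workers time out when idle

        pool.shutdown();
    }
}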

5. Task submission and execution

5.1 execute() – Submitting a task

/**
 * If the task cannot be submitted for execution, either because the executor
 * has been shut down or because its capacity has been reached,
 * the task is handled by the RejectedExecutionHandler
 */
public void execute(Runnable command) {
    // A new task must not be null, otherwise throw an NPE
    if (command == null)
        throw new NullPointerException();
    /**
     * The method proceeds in three steps:
     *
     * 1. If the actual number of worker threads < corePoolSize, try to create a
     *    new worker thread to run this task, i.e. command becomes its firstTask
     *
     * 2. If the task is enqueued successfully, a double check is still needed,
     *    for two reasons:
     *    - first, to decide whether a new worker thread must be created, because
     *      existing workers may have died since the last check
     *    - second, the pool may have been closed after entering this method
     *      (e.g. shutdown() was called), so the state must be re-checked;
     *      the two cases are handled separately:
     *      - if no worker thread is left in the pool, create a new one
     *      - if the pool has been closed, roll the task back out of the queue
     *        (if necessary) and reject it
     *
     * 3. If the task cannot be enqueued (e.g. the queue is full), try to create a
     *    new worker thread; if that fails, the pool is saturated or closed and
     *    the task is rejected
     */
    int c = ctl.get();
    /**
     * Case 1: the actual worker count < the core worker count
     * Plan: create a new core worker thread to run the task
     */
    if (workerCountOf(c) < corePoolSize) {
        // Create a new worker; true means the boundary is corePoolSize
        if (addWorker(command, true))
            return;
        /**
         * addWorker() failed, for one of two reasons:
         * - 1. the pool has been closed; a pool that is not RUNNING accepts no new tasks
         * - 2. a concurrent submission pushed the worker count to >= corePoolSize
         * Re-read ctl before continuing
         */
        c = ctl.get();
    }
    /**
     * Case 2: the actual worker count >= the core worker count,
     *         so the newly submitted task should be placed in the queue
     * Plan: enqueue the task, then double-check the pool state
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // double check
        int recheck = ctl.get();
        /**
         * The recheck guards against the pool state mutating (being closed):
         * once the pool is no longer RUNNING, besides removing the task from the
         * queue (rollback), the rejection policy must handle the new task
         */
        if (!isRunning(recheck) && remove(command))
            // Execute the task rejection policy
            reject(command);
        /**
         * If the pool is still RUNNING, or removal from the queue failed
         * (perhaps a worker already took the task), make sure at least one
         * worker thread is still working
         */
        else if (workerCountOf(recheck) == 0)
            /**
             * All worker threads have died (timeout, exception, ...), so a new
             * worker thread must be created.
             * The first argument null means an empty firstTask is passed in;
             * the new worker will pull the queued task via getTask()
             */
            addWorker(null, false);
    }
    /**
     * Case 3: the pool is closed, or the task could not be enqueued (queue full)
     * Plan: try to create a new worker thread, allowing growth up to maximumPoolSize
     * Note: if creation fails (e.g. maximumPoolSize would be exceeded),
     *       the rejection policy handles the task
     */
    else if (!addWorker(command, false))
        // Execute the task rejection policy
        reject(command);
}

5.2 addWorker() – Adding a worker thread

/**
 * A new worker thread may be created only if the pool state and the
 * count boundaries allow it
 *
 * @param core if true the boundary is corePoolSize, otherwise maximumPoolSize
 * @return true if a new worker thread was created and started, false otherwise
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // Label for the outer spin
    retry:
    /**
     * Outer spin -> decides whether a new worker thread may be added.
     * Two conditions must both hold:
     * 1. the pool state condition is satisfied     -> condition one
     * 2. the worker-count boundary is satisfied    -> condition two
     * Returning false means the new worker thread was not added
     */
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * Condition one: check the pool state
         * 1. a new worker is allowed in only two cases:
         *    1.1 pool state == RUNNING
         *    1.2 pool state == SHUTDOWN, firstTask is null and the queue is not empty
         * 2. a pool whose state is >= SHUTDOWN accepts no new tasks, so these fail:
         *    2.1 state == SHUTDOWN and firstTask is not null
         *    2.2 state == SHUTDOWN, firstTask is null but the queue is empty
         *    2.3 state >= STOP
         */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        /**
         * Inner spin -> condition two: check the worker-count boundary and try a
         * CAS increment of workerCount.
         * - if the CAS succeeds, exit both loops
         * - if the CAS fails, re-read ctl:
         *   1. if the pool state changed meanwhile (it was closed), re-check both
         *      the state condition and the count boundary
         *   2. if the state is unchanged, only the count boundary needs re-checking
         */
        for (;;) {
            int wc = workerCountOf(c);
            /**
             * Adding a worker fails in two cases:
             * 1. actual worker count >= CAPACITY
             * 2. actual worker count >= the comparison boundary:
             *    - core == true  -> boundary is corePoolSize
             *    - core == false -> boundary is maximumPoolSize
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**
             * Try to increment the worker count.
             * A CAS failure means concurrent contention, for one of three reasons:
             * 2.1 another worker thread was just added   -> count increased
             * 2.2 a worker just died (error or timeout)  -> count decreased
             * 2.3 the pool was just closed               -> count decreased and
             *     workers are being reclaimed
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Re-read ctl -> the pool may have been closed in the meantime
            c = ctl.get();
            // If the pool state mutated, re-check both conditions from the top
            if (runStateOf(c) != rs)
                continue retry;
            // else the CAS failed because workerCount changed; retry the inner loop
        }
    }

    /**
     * Dividing line of addWorker():
     * - the first half decides whether the pool accepts a new worker thread
     * - the second half actually creates the Worker, registers it and starts it
     */
    // Whether the new worker thread has been started
    boolean workerStarted = false;
    // Whether the new worker has been added to the workers set
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Wrap the first task into a Worker; the thread factory creates its thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // If the factory failed to produce a thread, fall through to addWorkerFailed()
        if (t != null) {
            // The global lock protects the workers set and largestPoolSize
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                /**
                 * Holding the global lock, re-check the pool state to rule out
                 * two illegal cases:
                 * 1. the pool was closed while the lock was being acquired
                 * 2. the thread factory returned a thread that is already alive
                 */
                int rs = runStateOf(ctl.get());
                /**
                 * Only two cases allow the worker into the workers set
                 * (and only members of the set really run tasks):
                 * 1. pool state is RUNNING (rs < SHUTDOWN)
                 * 2. pool state is SHUTDOWN and an empty task was passed in
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The thread must not have been started already by a
                    // (misbehaving) custom thread factory
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Register the worker
                    workers.add(w);
                    int s = workers.size();
                    // Track the largest pool size ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // Don't forget to unlock
                mainLock.unlock();
            }
            /**
             * Once the worker is in the set it is ready to work.
             * Note that the lock is released before the thread is started: as soon
             * as workerAdded is true, the lock has served its purpose.
             * t.start() eventually causes Worker.run() -> runWorker() to execute
             */
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the new worker thread was not started, roll back the bookkeeping
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // The result indicates whether the new worker thread was started
    return workerStarted;
}

Q: What is the significance of cases 1.2, 2.1 and 2.2 in the quick check of the thread pool state? Hint: ask yourself, when is it meaningful to add a worker? And what is the purpose of passing in an empty task?

Answer: before answering, let's be clear about two points:

1. The purpose of adding a worker is to process tasks, which are either the initial task (firstTask) or the remaining tasks in the queue

2. A thread pool that is not in the RUNNING state is not allowed to accept new tasks

For 2.1 -> pool state == SHUTDOWN but firstTask != null: no new Worker is allowed
  • A SHUTDOWN pool must not accept new tasks, so firstTask != null means the request is rejected outright
For 2.2 -> pool state == SHUTDOWN and firstTask == null, but the queue is empty: no new Worker is allowed
  • When firstTask is null, addWorker() was not called to handle a new task
  • Its purpose must therefore be to handle the remaining queued tasks; once the queue is empty there is no point in adding a worker
For 1.2 -> when the pool state == SHUTDOWN, a worker may be added only if firstTask is null and the queue is not empty
  • Once the pool is SHUTDOWN (shutdown() was called), it no longer accepts new tasks, so firstTask must be null
  • But the remaining queued tasks still need to be processed, so the queue must be non-empty; otherwise the new worker thread would have nothing to do, which would be pointless

Conclusion: The purpose of passing in an empty task is to create a new worker thread to process the remaining tasks in the task queue


Q: How does a worker thread actually start working, i.e. when does runWorker() start executing? Hint: look at the Thread and Worker constructors together

A: The code takes a somewhat "opportunistic" shortcut in the execution path; let's analyze the Worker class

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {              // Step 1: Worker is a Runnable

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        // Step 2: because Worker is a Runnable, it can be handed to the thread factory
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * Step 3: run() finally delegates to runWorker()
     * - addWorker() starts the thread via worker.thread.start()
     * - as soon as the thread starts, its run() is invoked, so the start-up
     *   call chain looks like this:
     *   worker = new Worker(runnable) -> thread = newThread(worker)
     *   -> thread.start() -> worker.run() -> ThreadPoolExecutor.runWorker(worker)
     */
    public void run() {
        runWorker(this);
    }
}
In summary, the start-up call chain goes through the following steps:

(1) worker = new Worker(Runnable) –> (2) thread = newThread(worker) –> (3) thread.start() –> (4) Thread.run() [invoked automatically by the JVM] –> (5) worker.run() –> (6) ThreadPoolExecutor.runWorker(worker)


5.3 runWorker() – Executing tasks

final void runWorker(Worker w) {
    // The current thread, i.e. the worker thread started by addWorker()
    Thread wt = Thread.currentThread();
    // The initial task; may be null
    Runnable task = w.firstTask;
    w.firstTask = null;
    /**
     * Remember that Worker is itself a non-reentrant mutex!
     * Worker is initialized with state = -1; the point of unlocking here is to
     * move state from -1 to 0, because interrupts are allowed only when state >= 0
     */
    w.unlock(); // allow interrupts
    // Whether the worker exits the loop because of a user exception
    boolean completedAbruptly = true;
    try {
        /**
         * Fetch and run tasks. Tasks come from two places:
         * 1. the initial task: the firstTask given to the Worker at creation
         * 2. queued tasks: after firstTask completes, the thread is not reclaimed;
         *    it keeps spinning and pulling tasks from the queue via getTask()
         *    -- this is where thread reuse shows up
         */
        while (task != null || (task = getTask()) != null) {
            /**
             * The worker lock is held while running a task so that shutdown()
             * does not immediately interrupt a running worker: interrupting via
             * shutdown() requires this lock first. shutdownNow() terminates
             * workers without holding the lock; more on closing the pool below
             */
            w.lock();
            /**
             * If the pool is stopping (state >= STOP), make sure the worker
             * thread is interrupted; if not, make sure its interrupt flag is
             * clear. The state is re-read to handle shutdownNow() racing with
             * Thread.interrupted() clearing the flag
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                /**
                 * The "pre" hook runs before each task; it may throw, in which
                 * case the loop exits with completedAbruptly = true and the task
                 * is never executed (and is effectively discarded)
                 */
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    /**
                     * The "post" hook runs after each task; it may also throw and
                     * kill the thread, but note that the task has already run
                     */
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task reference to help GC
                task = null;
                // Per-worker completed-task counter
                w.completedTasks++;
                // Release the worker lock
                w.unlock();
            }
        }
        // Normal exit: getTask() returned null
        completedAbruptly = false;
    } finally {
        /**
         * A worker thread leaves the loop for one of two reasons:
         * 1. getTask() returned null -> there are several causes, covered later
         * 2. a user exception escaped -> completedAbruptly stays true
         * Either way, processWorkerExit() reclaims the thread
         */
        processWorkerExit(w, completedAbruptly);
    }
}

Q: Why isn't a new task simply put into the task queue instead of being handed directly to a new thread? Hint: the main purpose is to avoid unnecessary overhead and thus improve performance

Answer: new tasks are not always enqueued directly in order to reduce enqueue and dequeue operations on the task queue; the queue is a blocking queue, so enqueueing and dequeueing involve locking and contention handling


5.4 getTask() – Getting a task

The getTask() method returns null for five reasons:

1. The pool state is a closed state (STOP || TIDYING || TERMINATED)

2. The thread pool is SHUTDOWN and the task queue is empty

3. The actual number of working threads exceeds the upper limit

4. The worker thread hits the idle timeout and either of the following holds:

4.1 At least one other worker thread is available in the thread pool

4.2 No other worker thread is available in the thread pool but the task queue is empty

private Runnable getTask() {
    // Did the last poll() on the task queue time out? Defaults to false
    boolean timedOut = false;

    // Spin
    for (;;) {
        /**
         * The method returns null (and lets the worker be reclaimed) in five cases:
         * 1. the pool state is a closed state (STOP || TIDYING || TERMINATED)
         * 2. the pool state is SHUTDOWN and the task queue is empty
         * 3. the actual worker count exceeds the maximum worker count
         * 4. the worker hit its idle timeout and either:
         *    4.1 at least one other worker is still available, or
         *    4.2 no other worker is available but the queue is empty
         */
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * Return null in two situations:
         * 1. pool state > SHUTDOWN (STOP || TIDYING || TERMINATED): no tasks may
         *    run at all -- at STOP and above, new tasks are refused, running tasks
         *    are interrupted, and queued tasks are not executed either
         * 2. pool state == SHUTDOWN and the queue is empty: during SHUTDOWN the
         *    remaining queued tasks must still be executed, so the worker exits
         *    only once there is nothing left to do
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            /**
             * Decrement the worker count.
             * Note that the actual reclamation happens in processWorkerExit().
             * decrementWorkerCount() loops on CAS internally, so it always
             * eventually succeeds; the decrement caused by closing the pool may
             * race with the CAS increment in addWorker()
             */
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        /**
         * Should this worker be subject to the idle timeout?
         * 1. allowCoreThreadTimeOut == true -> even idle core workers time out
         * 2. wc > corePoolSize             -> idle non-core workers time out
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * Decrement the worker count by one and return null when:
         * 1. the actual worker count exceeds maximumPoolSize, or
         * 2. the worker satisfies the idle-timeout condition and either:
         *    2.1 at least one other worker remains available, or
         *    2.2 no other worker remains but the queue is empty
         *
         * Combining 2.1 and 2.2:
         * 1. while the queue is non-empty, the pool keeps at least one available
         *    worker alive, so even a timed-out worker keeps fetching tasks
         * 2. when the worker count exceeds the limit, or fetching times out with
         *    no new tasks, the pool gradually shrinks down to corePoolSize
         *    (or further if allowCoreThreadTimeOut is true)
         *
         * Hint: if wc > maximumPoolSize then wc > 1, so the case
         * (wc > maximumPoolSize && workQueue.isEmpty()) needs no extra comparison
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            /**
             * A CAS failure again means concurrent contention (see above):
             * the worker count has changed, so the count and `timed` must be
             * re-evaluated -> hence the continue
             */
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /**
             * Fetch a task from the queue, using a different call depending on
             * whether a timeout applies:
             * timed == true  -> poll(keepAliveTime): wait at most keepAliveTime
             * timed == false -> take(): block until a task arrives
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // A non-null task is returned and executed
            if (r != null)
                return r;
            // poll() timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            /**
             * Both take() and poll(long, TimeUnit) may throw InterruptedException
             * (LockSupport.park() itself does not throw, but ConditionObject's
             * await() reports the interrupt via reportInterruptAfterWait()).
             * Once the worker is interrupted while waiting for a task, the timeout
             * flag is cleared: if the interrupt is otherwise ignored and the
             * thread should keep running, the earlier timeout must not count
             */
            timedOut = false;
        }
    }
}

Q: Why can poll() time out and return null when the queue is empty, while take() just blocks? Hint: think of the BlockingQueue operation matrix

Answer: just look at the BlockingQueue operation summary below: take() is the blocking form, while poll(timeout) is the timed form

Aside: I remember them roughly as "A/R throw", "O/P special value", "P/T block"... use whatever mnemonic works for you
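For reference, this is the operation summary from the BlockingQueue Javadoc (very likely what the missing figure showed); the "Blocks" column is why take() waits forever while the timed poll() can return null.

           Throws exception    Special value    Blocks     Times out
Insert     add(e)              offer(e)         put(e)     offer(e, time, unit)
Remove     remove()            poll()           take()     poll(time, unit)
Examine    element()           peek()           n/a        n/a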


6. Close the thread pool

There are two main ways to shut down a thread pool. The differences are:

shutdown(): the remaining tasks in the queue are still executed

shutdownNow(): the remaining tasks in the queue are abandoned, but they are returned to the caller

What the two have in common:

1. Tasks that are already running are not forcibly terminated (shutdownNow() only interrupts them; whether they actually stop depends on how they respond to the interrupt)

2. Newly submitted tasks are rejected

6.1 shutdown() – Orderly shutdown

Shutting down the pool with shutdown() performs five steps:

1. Obtain the global lock

2. CAS-spin the pool state to SHUTDOWN

3. Interrupt all idle worker threads (set their interrupt flag) -> note: only the idle ones

4. Release the global lock

5. Try to terminate the thread pool

/**
 * Initiates an orderly shutdown of the thread pool.
 * Previously submitted tasks are executed (both running and queued),
 * but newly submitted tasks are rejected.
 * Calling shutdown() on an already-closed pool has no additional effect
 */
public void shutdown() {
    // 1. Acquire the global lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 2. CAS-spin the pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 3. Interrupt all idle worker threads
        interruptIdleWorkers();
        // Hook method provided specially for ScheduledThreadPoolExecutor
        onShutdown();
    } finally {
        // 4. Release the global lock
        mainLock.unlock();
    }
    /**
     * 5. Try to terminate the pool; termination requires two things:
     *    1. the task queue is empty
     *    2. all worker threads (including idle ones) have been reclaimed
     */
    tryTerminate();
}

6.2 shutdownNow() – Shut down immediately

Closing the thread pool using shutdownNow() performs six operations:

1. Obtain the global lock

2. CAS-spin the pool state to STOP

3. Interrupt all worker threads (set interrupt flag)

4. Add the remaining tasks to a list and clear the task queue

5. Release the global lock

6. Try to terminate the thread pool

/**
 * Attempts to stop all tasks, interrupts all worker threads, and returns the
 * list of tasks that were awaiting execution (drained from the task queue)
 * 1. To wait for running tasks to finish instead, use awaitTermination()
 * 2. Task cancellation is implemented with Thread#interrupt, so tasks that do
 *    not respond to interrupts may never terminate (use with caution!!)
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    // 1. Acquire the global lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 2. CAS-spin the pool state to STOP
        advanceRunState(STOP);
        // 3. Interrupt all worker threads
        interruptWorkers();
        // 4. Drain the remaining tasks into a list and empty the queue
        tasks = drainQueue();
    } finally {
        // 5. Release the global lock
        mainLock.unlock();
    }
    /**
     * 6. Try to terminate the pool; termination requires:
     *    1. the pool state is STOP
     *    2. the worker count has dropped to 0
     * Note: not all worker threads have necessarily exited yet;
     *       see 7.3 tryTerminate() for details
     */
    tryTerminate();
    // 7. Return the list of tasks that never ran
    return tasks;
}

6.3 awaitTermination() – Waiting for the thread pool to terminate

When closing the thread pool, awaitTermination() blocks until one of the following happens:

1. All tasks complete: after tryTerminate() succeeds in moving the state to TERMINATED, it calls termination.signalAll(); the awakened thread re-checks the state, sees TERMINATED, and returns true

2. The timeout expires: termination.awaitNanos() returns the remaining wait time (0 once the timeout is reached); the loop then sees nanos <= 0 and returns false, meaning the wait failed

3. The current thread is interrupted: if the calling thread (often the main thread) is interrupted while waiting, InterruptedException is thrown and the wait ends

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    // 1. Acquire the global lock
    mainLock.lock();
    try {
        for (;;) {
            // 2. Return true as soon as the pool is TERMINATED
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            // 3. Return false once the timeout has been used up
            if (nanos <= 0)
                return false;
            // Wait on the termination condition; awaitNanos returns the time left
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        // 4. Release the global lock
        mainLock.unlock();
    }
}

You can tell whether the thread pool has actually terminated like this:

// Close the thread pool
threadPoolExecutor.shutdown();
try {
    // Loop until the pool has terminated
    while (!threadPoolExecutor.awaitTermination(300, TimeUnit.MILLISECONDS)) {
        logger.info("task executing...");
    }
    logger.info("shutdown completed!");
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can see the interrupt
    Thread.currentThread().interrupt();
}

7. Interrupt and terminate processing

7.1 interruptIdleWorkers() – Interrupts idle threads

The Worker follows four rules for interrupt handling (a quick review of what we covered earlier):

1. Do not interrupt a worker thread before it actually starts executing

2. When a worker thread is executing a task, it cannot be interrupted

3. The worker thread is interrupted only when it is waiting to get the task getTask() from the task queue

4. interruptIdleWorkers() must successfully acquire the Worker's lock before it interrupts an idle thread

/** Interrupts all idle worker threads */
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

/**
 * Interrupts threads that are not holding their lock, i.e. idle threads waiting
 * for a task. The interrupt lets them notice that the pool is terminating or
 * that its configuration has been changed dynamically
 *
 * @param onlyOne if true, interrupt at most one idle worker; if false, all of them
 */
private void interruptIdleWorkers(boolean onlyOne) {
    // Acquire the global lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Iterate over the worker set
        for (Worker w : workers) {
            Thread t = w.thread;
            /**
             * Only a worker whose lock can be acquired is interrupted:
             * 1. a worker that is already interrupted need not be marked again
             * 2. w.tryLock() is the core of Worker-as-a-lock: a worker that is
             *    still running a task cannot be interrupted here
             * 3. see runWorker(), which calls lock() before running a task
             * 4. since this method is (indirectly) called by shutdown(), it means
             *    shutdown() only interrupts idle workers, i.e. workers that are
             *    inside getTask() and therefore not holding their lock
             */
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    // Interrupt the worker thread
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    // Release the worker lock acquired by tryLock()
                    w.unlock();
                }
            }
            // If onlyOne is true, interrupt a single idle worker
            // (the Set is unordered, so which one is arbitrary)
            if (onlyOne)
                break;
        }
    } finally {
        // Release the global lock
        mainLock.unlock();
    }
}

7.2 interruptWorkers() – Interrupts all threads

/**
 * Interrupts all worker threads, even if they are running tasks.
 * This method exists only for shutdownNow()
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Iterate over the worker set
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

/** Defined inside Worker */
void interruptIfStarted() {
    Thread t;
    /**
     * state >= 0 means the worker has started running (state was initialized to
     * -1), so it may be interrupted.
     * Because this method is used only by interruptWorkers(), which in turn is
     * used only by shutdownNow(), the pool state at this point should be STOP
     */
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            // Set the interrupt flag
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

7.3 tryTerminate() – Attempts to terminate the thread pool

Before dissecting tryTerminate(), let's settle a few important questions


Q: Why can't a worker thread that is executing a task be interrupted? Hint: the worker thread must acquire its Worker lock before executing a task, and the lock is not reentrant

Answer: interruptIdleWorkers() interrupts a worker thread only after (1) tryLock() has successfully acquired its Worker lock, and (2) because the Worker lock is not reentrant, a worker that has already locked itself to run a task cannot be acquired again; together this means a worker thread that is executing a task cannot be interrupted by this path


Q: What counts as an idle worker thread? Hint: the Worker lock is acquired in two places: by interruptIdleWorkers() during shutdown(), and by the worker itself just before it runs a task

A: A worker thread must hold its Worker lock while executing a task; the only time it does not need the lock is while it is fetching a task from the queue in getTask(). Worker threads therefore fall into two states:

1. The worker thread executing the task: the worker thread executing task.run() after acquiring the worker lock

2. Idle worker thread: the worker thread that is fetching a task from the task queue (including the worker thread that just got the task and blocked when there is no task)


Q: How does a thread interrupt lead to thread reclamation? Hint: the key is that when getTask() returns null, the worker exits runWorker() and executes processWorkerExit()

A: Recall the cases in which getTask() returns null:

The getTask() method returns null for five reasons:

1. The pool state is a closed state (STOP || TIDYING || TERMINATED)

2. The thread pool is SHUTDOWN and the task queue is empty

3. The actual number of working threads exceeds the upper limit

4. The worker thread hits the idle timeout and either of the following holds:

4.1 At least one other worker thread is available in the thread pool

4.2 No other worker thread is available in the thread pool but the task queue is empty

getTask() can return null when the pool is being closed, a worker times out, or the pool is dynamically reconfigured (pool size, timeout, and so on). So how does an interrupt affect reclamation?

Let’s just use closing the thread pool as an example (the rest of the case is just a conditional distinction) to describe the logic that happens after an interrupt:

1. If the worker thread blocking on getTask() is interrupted, InterruptedException is thrown and the task is unblocked

When the thread pool is closed, for example, when shutdown() is called, the thread pool state changes to shutdown, and because the task queue is empty, getTask() directly returns null. If shutdownNow() is called and the thread pool state changes to STOP, null is returned

3. In the runWorker() method, when getTask() returns null, it exits the loop and calls the processWorkerExit() method to reclaim the thread

It is worth mentioning that Java's interrupt mechanism only sets an interrupt flag. Calling Thread.currentThread().interrupt() inside your own task therefore does not stop the worker thread from continuing to run tasks, nor does it get the thread reclaimed; you also cannot catch an InterruptedException for it inside the task, because the interrupt is consumed (caught) later inside getTask()
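To illustrate, here is a small sketch of my own (not from the original article): a long-running task must check the interrupt flag itself if it wants to stop early when shutdownNow() interrupts its worker thread.

import java.util.concurrent.*;

public class CooperativeCancelDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            // A cooperative task: it polls the interrupt flag and exits early
            while (!Thread.currentThread().isInterrupted()) {
                // ... do a small slice of work ...
            }
            System.out.println("task observed the interrupt and stopped");
        });
        Thread.sleep(100);
        pool.shutdownNow();                      // interrupts the running worker
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}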


Q: Why is tryTerminate() called whenever the pool state changes or an interrupted thread is reclaimed? Hint: shutdown() calls interruptIdleWorkers(), which only interrupts idle worker threads, so what happens to the worker threads that are executing tasks?

Short answer: after shutdown() is called, the worker threads that are executing tasks are not interrupted. When they finish their current task, assuming the queue is not empty, they keep executing the remaining tasks until they block. As the number of tasks decreases, the actual number of worker threads also decreases, down to the minimum the pool must maintain. If nothing else happened, once the queue became empty those minimum workers would be permanently blocked on workQueue.take() and would never terminate, even though the closed pool no longer accepts newly submitted tasks

For the problem that a worker thread busy executing a task cannot be interrupted at shutdown time, Doug Lea’s solution is:

– Call tryTerminate() everywhere the pool might become terminatable. The method checks whether the pool has entered the termination process and, if worker threads still remain, interrupts one more idle worker so that the shutdown signal keeps propagating

Termination process: the thread pool state is SHUTDOWN and the task queue is empty, or the thread pool state is STOP


/**
 * The thread pool transitions to TERMINATED only when both conditions hold:
 * 1. the pool state is (SHUTDOWN with an empty task queue) or STOP
 * 2. the actual worker count is 0
 */
final void tryTerminate() {
    // Spin
    for (;;) {
        // Get the thread pool control state
        int c = ctl.get();
        /**
         * There are four situations in which the TERMINATED transition cannot proceed yet.
         * The first three are handled here:
         * 1. The pool is still RUNNING: it must be at least SHUTDOWN or STOP
         * 2. The pool is already TIDYING or TERMINATED: another thread is already
         *    performing the termination, so there is no need to do it again
         * 3. The pool is SHUTDOWN but the task queue is not empty: the pool has been
         *    asked to close, but the remaining queued tasks must be processed first
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        /**
         * At this point the pool is (SHUTDOWN with an empty queue) or STOP.
         * 4. If the worker count is not zero, some worker may still be running or
         *    waiting for a task (this is the answer to "why call tryTerminate()
         *    everywhere"). Interrupt exactly one idle worker so the shutdown
         *    signal propagates.
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate
            /**
             * Only one idle worker is interrupted at a time.
             * Q: What if interruptIdleWorkers(false) were used instead?
             * A: Each exiting worker calls processWorkerExit(), which calls
             *    tryTerminate() again, so the signal propagates anyway.
             *    interruptIdleWorkers() must worker.tryLock() every worker, so
             *    interrupting all of them here would only add unnecessary lock
             *    contention. shutdown() can pass false (interrupt all) because the
             *    idle workers it targets do not hold their worker locks, so there
             *    is no such contention cost there.
             */
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // The termination condition is met and no live workers remain: terminate the pool
        // 1. Acquire the global lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 2. CAS the state to TIDYING with 0 workers; CAS is needed because
            //    another thread may be terminating the pool concurrently
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 3. Run the terminated() hook
                    terminated();
                } finally {
                    // 4. Set the final state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 5. Wake up all threads blocked in awaitTermination()
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 6. Release the global lock
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

8. Thread failure and recycle processing

8.1 addWorkerFailed() – Failed to add a thread

When adding a new worker thread fails, the following rollback operations are performed:

1. Obtain the global lock

2. Remove the worker from the Workers set

3.CAS spin reduces the actual worker count

4. Try to terminate the thread pool

5. Release the global lock

private void addWorkerFailed(Worker w) {
    // 1. Acquire the global lock protecting the workers set
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 2. Remove the worker from the workers set
        if (w != null)
            workers.remove(w);
        /**
         * 3. CAS-spin to decrement the actual worker count (it eventually succeeds)
         * Q: Why is the workers set protected by a lock while the count uses CAS?
         * A: 1. workers is a non-thread-safe HashSet, so it can only be protected
         *       externally by the global lock (mainLock)
         *    2. ctl is an AtomicInteger, so it can be maintained directly with CAS
         */
        decrementWorkerCount();
        /**
         * 4. Try to terminate the thread pool
         * Q: Why try to terminate here?
         * A: The failure may have happened because the pool is being shut down;
         *    see addWorker() for details
         */
        tryTerminate();
    } finally {
        // 5. Release the global lock
        mainLock.unlock();
    }
}

8.2 processWorkerExit() – Thread reclamation process

Thread recycling is divided into two parts:

1. Reclaim the worker thread

2. Add a worker thread as required

I. There are 6 steps to reclaim the worker thread:

1. If the worker died abruptly due to an uncaught exception, decrement the actual worker count by 1

2. Obtain the global lock

3. Count the total number of completed tasks in the thread pool

4. Safely remove the worker from the workers set

5. Release the global lock

6. Try to terminate the thread pool

II. If the thread pool is in the RUNNING or SHUTDOWN state, a worker thread needs to be added in two cases:

1. The thread died unexpectedly due to an uncaught exception

2. On a normal exit, ensure that at least the minimum number of available worker threads stays alive

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 1. If the worker died abruptly because of an uncaught exception,
    //    the worker count has not been adjusted yet, so decrement it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. Acquire the global lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 3. Accumulate the total number of tasks completed by the pool
        completedTaskCount += w.completedTasks;
        // 4. Safely remove the worker from the workers set
        workers.remove(w);
    } finally {
        // 5. Release the global lock
        mainLock.unlock();
    }

    /**
     * 6. Try to terminate the thread pool
     * Q: Why try to terminate here?
     * A: processWorkerExit() is only called from runWorker(), at one of two moments:
     *    1. the worker exits because of an uncaught exception
     *    2. getTask() returned null
     *    The first case does not meet the termination condition of tryTerminate(),
     *    but the second may well be the result of the pool entering the
     *    termination process, so the attempt is needed.
     */
    tryTerminate();

    int c = ctl.get();
    /**
     * When the pool is RUNNING or SHUTDOWN, a replacement worker is added in two cases:
     * 1. The worker died abruptly because of an uncaught exception
     *    -> fill the gap left by that worker
     * 2. On a normal exit, keep at least the minimum number of workers alive
     *    -> keep the pool running normally, or able to finish the remaining
     *       queued tasks when it is SHUTDOWN
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            /**
             * The minimum number of live workers is decided by allowCoreThreadTimeOut
             * together with the queue length:
             * 1. allowCoreThreadTimeOut == true: the minimum is 0, but if the queue
             *    is not empty at least one worker must stay alive
             * 2. allowCoreThreadTimeOut == false: the minimum is corePoolSize
             */
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // Keep at least one worker alive when the queue is not empty
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // Enough workers remain, no replacement is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        /**
         * A worker is added for one of two reasons:
         * 1. abrupt death due to an exception: fill the gap
         * 2. normal exit while the queue is not empty: process the remaining tasks
         * Either way the new worker starts with firstTask == null, so it simply
         * pulls tasks from the queue.
         */
        addWorker(null, false);
    }
}

9. Task queues and queuing policies

Thanks to: Talk About Concurrency (7) – Blocking Queues in Java

A task queue is a blocking queue used to store tasks waiting to be executed; concretely it is an implementation class of the BlockingQueue interface, whose purpose is data caching and sharing between threads. The java.util.concurrent package provides 7 blocking queue implementations, which can be divided into two groups according to boundedness:

– Bounded queue: a bounded queue has a limited capacity and cannot be expanded indefinitely. Its maximum capacity can be as large as Integer.MAX_VALUE

  • Strictly bounded queues: an initial capacity must be given, e.g. ArrayBlockingQueue

  • Optionally bounded queues: when no initial capacity is given the default maximum capacity is Integer.MAX_VALUE, e.g. LinkedBlockingQueue and LinkedBlockingDeque

– Unbounded queue: a queue without a fixed bound, whose capacity is either 0 or unlimited

  • Zero capacity: stores no elements; every insert must wait for a corresponding remove, such as SynchronousQueue

  • Unlimited capacity: the queue may grow indefinitely until an OutOfMemoryError is thrown; enqueueing never blocks, only dequeueing can block, e.g. DelayQueue, LinkedTransferQueue and PriorityBlockingQueue

Note: Blocking queues follow the FIFO first-in, first-out rule unless otherwise specified

9.1 Bounded Queues

A bounded queue is a blocking queue with a limited capacity that does not allow unlimited expansion. Used together with a finite maximumPoolSize, it helps prevent resource exhaustion, but it is harder to tune and control -> bounded queues require a trade-off between queue size and maximum number of threads:

– Large queue + small pool: minimizes CPU usage, operating-system resources and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example, if they are I/O bound), the system may be able to schedule time for more threads than you otherwise allow

– Small queue + large pool: keeps the CPU busier, but may run into unacceptable scheduling overhead, which also decreases throughput

9.1.1 ArrayBlockingQueue

Features:

– A bounded blocking queue composed of array structures

– Besides the fixed-length array it keeps two int variables that mark the positions of the head and tail within the array

– No additional objects are created or reclaimed when enqueueing and dequeueing

– Supports fair and unfair modes and defaults to unfair locks

– Internally synchronized with one lock + two conditions (notEmpty / notFull), so producers and consumers are not truly concurrent (a construction sketch follows)
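As a construction sketch, a pool backed by an ArrayBlockingQueue might look like the following; the pool sizes, the capacity of 4, the fairness flag and the rejection policy are illustrative assumptions, not recommendations:

    import java.util.concurrent.*;

    // Bounded queue of capacity 4; the second argument enables fair mode (the default is unfair)
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(4, false);

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4,                       // corePoolSize, maximumPoolSize
            60L, TimeUnit.SECONDS,      // keepAliveTime for threads above the core size
            queue,
            new ThreadPoolExecutor.CallerRunsPolicy()); // applied once the queue and pool are full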

9.1.2 LinkedBlockingQueue

Features:

– A bounded blocking queue consisting of a linked list structure

– The default and maximum length of this queue is Integer.MAX_VALUE

– An additional Node object is created/discarded on every enqueue/dequeue to maintain the linked-list structure

– Its throughput is generally (in theory) higher than that of ArrayBlockingQueue; for the reason, google the difference between ArrayList and LinkedList

– Internally synchronized with two locks (takeLock / putLock) + two conditions, so enqueueing and dequeueing can run truly concurrently

– This is the blocking queue used by Executors.newFixedThreadPool()

Pitfall:

With the default capacity, if tasks are produced much faster than they are consumed, memory may be exhausted before the queue ever becomes full enough to block

Advice:

– Most producer-consumer requirements can usually be met with LinkedBlockingQueue or ArrayBlockingQueue alone (see the sketch below)
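A minimal sketch that sidesteps the pitfall above by giving the queue an explicit capacity; the capacity of 1000, the pool sizes and the rejection policy are illustrative assumptions:

    import java.util.concurrent.*;

    // Explicit capacity instead of the default Integer.MAX_VALUE,
    // so rejection kicks in before memory is exhausted
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4, 8, 60L, TimeUnit.SECONDS, queue,
            new ThreadPoolExecutor.AbortPolicy()); // fail fast once queue and pool are full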

9.1.3 LinkedBlockingDeque

Features:

– A double-ended blocking queue consisting of a linked list structure

– Being double-ended, it allows insertion and removal at both ends and therefore offers many extra xxFirst and xxLast methods

– When no initial capacity is given, the default maximum capacity of this queue is Integer.MAX_VALUE

– Like ArrayBlockingQueue, it is synchronized internally with one lock + two conditions, so it is not truly concurrent

9.2 Unbounded Queue

An unbounded queue refers to a blocking queue with infinite capacity or 0 capacity. Note the following when using the queue:

1. When the capacity is 0, set maxPoolSize carefully to avoid rejecting new tasks

2. When the capacity is unlimited, maximumPoolSize effectively becomes irrelevant: the queue never fills up, so the number of threads created will never exceed corePoolSize

Applicable scenarios:
Unbounded queues are best suited when each task is independent and has no effect on each other

9.2.1 SynchronousQueue

Features:

– A blocking queue that does not store elements; it is the task queue of the default cached thread pool

– Since the queue stores no tasks, it can only hand elements directly from one thread to another -> i.e. the direct handoff policy

– Supports fair and unfair modes; the default is unfair (see ReentrantLock for what fairness means)

– This is the blocking queue used by Executors.newCachedThreadPool()

Scenario:

Direct handoff generally avoids lockups when handling sets of requests that may have internal dependencies

Pitfall:

– If no thread is immediately available to run a task, the attempt to queue it fails and a new thread is created; once the thread count reaches maximumPoolSize, new tasks are rejected

– In the unfair mode, if production greatly outpaces consumption, starvation is easy: some tasks may wait for a very long time or never be executed

Advice:

– Direct handoff usually requires an unbounded maximumPoolSize to avoid rejecting newly submitted tasks

– This in turn admits the possibility of unbounded thread growth when tasks continue to arrive on average faster than they can be processed (see the sketch below)
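A minimal sketch of a cached-style pool built on SynchronousQueue; capping maximumPoolSize at 64 is an illustrative assumption to bound the thread growth described above:

    import java.util.concurrent.*;

    // Direct handoff: each submitted task is either handed to an idle thread
    // immediately or triggers the creation of a new thread (up to 64 here)
    ThreadPoolExecutor cachedLike = new ThreadPoolExecutor(
            0, 64,
            60L, TimeUnit.SECONDS,        // idle threads are reclaimed after 60 seconds
            new SynchronousQueue<>(),     // unfair mode by default
            new ThreadPoolExecutor.CallerRunsPolicy());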

9.2.2 PriorityBlockingQueue

Features:

– An unbounded blocking queue with priority ordering, backed by an array (a binary heap); the default initial capacity is 11

– Elements are ordered by natural ordering by default; a custom order can be provided by implementing the Comparable interface (or supplying a Comparator)

– Ordering is maintained with a binary heap; internal synchronization uses a single ReentrantLock

– Internally synchronized with one lock + one condition: since the queue is unbounded, only a notEmpty condition is needed

– Note that only the head of the heap is guaranteed to be the highest-priority element; the rest of the queue is not kept fully sorted

Scenario:
When you need tasks to be taken from the queue in priority order
Pitfall:

Because the queue is unbounded, if the consumption rate stays far below the production rate for a long time, the backlog of tasks (plus the heap bookkeeping) is likely to exhaust memory

Advice:
If money is no object you can simply keep adding memory; otherwise make sure tasks do not back up too much (a priority-task sketch follows below)
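A minimal sketch of prioritized tasks; the PriorityTask wrapper, its priority field and the single-threaded pool are illustrative assumptions, not part of the JDK:

    import java.util.concurrent.*;

    public class PriorityPoolDemo {
        // A wrapper that makes Runnables comparable; lower value = higher priority (an arbitrary choice)
        static class PriorityTask implements Runnable, Comparable<PriorityTask> {
            final int priority;
            final Runnable body;

            PriorityTask(int priority, Runnable body) {
                this.priority = priority;
                this.body = body;
            }

            @Override public void run() { body.run(); }

            @Override public int compareTo(PriorityTask other) {
                return Integer.compare(this.priority, other.priority);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            // Only queued (not already running) tasks are reordered by priority.
            // execute() is used instead of submit(): submit() wraps tasks in a
            // FutureTask, which is not Comparable and would break the ordering.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());

            pool.execute(new PriorityTask(0, () -> {
                // Occupies the single worker so the next two tasks queue up
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            }));
            pool.execute(new PriorityTask(5, () -> System.out.println("low priority")));
            pool.execute(new PriorityTask(1, () -> System.out.println("high priority")));

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

The expected output prints "high priority" before "low priority", because the queued tasks are reordered by the heap.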

9.2.3 DelayQueue

Features:

– An unbounded blocking queue with delayed, ordered elements, implemented on top of a priority queue

– Elements must implement the Delayed interface and specify their delay on creation; an element can only be taken from the queue once its delay has expired, and null elements are not allowed

– Internally synchronized with one lock + one condition + a priority queue; because of the delay semantics only a single "available" condition is needed to signal that the head element has become ready

Scenario:

– Implementing retry mechanisms: delayed re-execution with an upper bound on the number of retries

– ScheduledThreadPoolExecutor's DelayedWorkQueue is an optimized variant of this idea, used for scheduled and periodic execution

– Building caches with expiry, although NoSQL solutions are usually preferable

– It is also the underlying data structure of TimerQueue (see the sketch below)
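A minimal sketch of a Delayed element used with a standalone DelayQueue; the DelayedTask class and the delays are illustrative assumptions:

    import java.util.concurrent.*;

    public class DelayQueueDemo {
        // An element that becomes available only after its delay has expired
        static class DelayedTask implements Delayed {
            final String name;
            final long readyAt; // absolute deadline in nanoseconds

            DelayedTask(String name, long delayMillis) {
                this.name = name;
                this.readyAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
            }

            @Override public long getDelay(TimeUnit unit) {
                return unit.convert(readyAt - System.nanoTime(), TimeUnit.NANOSECONDS);
            }

            @Override public int compareTo(Delayed other) {
                return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                    other.getDelay(TimeUnit.NANOSECONDS));
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<DelayedTask> queue = new DelayQueue<>();
            queue.put(new DelayedTask("later", 200));
            queue.put(new DelayedTask("sooner", 50));

            // take() blocks until the head element's delay has expired,
            // so "sooner" is returned before "later"
            System.out.println(queue.take().name);
            System.out.println(queue.take().name);
        }
    }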

9.2.4 LinkedTransferQueue

Features:

– Unbounded blocking queues composed of linked list structures

– TransferQueue is a superset of ConcurrentLinkedQueue, SynchronousQueue (in fair mode), unbounded LinkedBlockingQueue, and so on

– Compared with other blocking queues, LinkedTransferQueue additionally provides the tryTransfer() and transfer() methods

– When no consumer is waiting, transfer() appends the element to the tail of the queue and blocks until a consumer has taken it; otherwise the element is handed directly to a waiting consumer without being enqueued

– Unlike transfer(), tryTransfer() returns success or failure immediately, whether or not a consumer is waiting; it neither enqueues the element nor blocks

– This class uses a sophisticated dual data structure whose operations are implemented in two steps:

  • Reserve: when a consumer tries to take an element and finds the queue empty, it enqueues a node with a null data field and blocks (spin-waits) until that field becomes non-null

  • Fulfill: when a producer is about to enqueue an element and finds that the head node's data field is null, it assigns its data directly to that node, completing the transfer of the data (see the sketch below)
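A minimal sketch of transfer() versus tryTransfer() on a standalone LinkedTransferQueue; the strings and the 100 ms delay are illustrative:

    import java.util.concurrent.*;

    public class TransferQueueDemo {
        public static void main(String[] args) throws Exception {
            LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

            // No consumer is waiting, so tryTransfer() fails immediately without enqueueing
            System.out.println("tryTransfer: " + queue.tryTransfer("hello"));

            // A consumer that starts taking after a short delay
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                    System.out.println("took: " + queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            // transfer() blocks until the consumer has actually received the element
            queue.transfer("hello");
            System.out.println("transfer completed");
            consumer.join();
        }
    }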

10. Thread pool monitoring

10.1 Native Monitoring

Monitoring the health of a thread pool is important, especially when locating problems. Fortunately, the thread pool natively exposes several monitoring properties:

1. taskCount: the total number of tasks the pool has been asked to execute (approximate value)

2. completedTaskCount: the number of tasks the pool has completed so far, which is always less than or equal to taskCount

3. largestPoolSize: the largest number of threads that have ever existed in the pool; if it equals maximumPoolSize, the pool has been full at some point

4. poolSize: the current number of threads in the pool, idle workers included; note that core threads are not reclaimed while the pool is running (unless allowCoreThreadTimeOut is enabled), so this value does not drop below corePoolSize

5. activeCount: the number of worker threads actively executing tasks (approximate value)

It is worth mentioning that the getter methods for these properties take the global lock internally, but because the pool state and thread count can be adjusted dynamically at runtime (allowCoreThreadTimeOut(), setMaximumPoolSize(), setCorePoolSize(), shutdown(), and so on), some of the values can only be approximate. A monitoring sketch follows below.
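A minimal monitoring sketch that polls these getters periodically; the pool variable, the output format and the 5-second interval are illustrative assumptions:

    import java.util.concurrent.*;

    // 'pool' is assumed to be an existing ThreadPoolExecutor instance
    ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
    monitor.scheduleAtFixedRate(() -> System.out.printf(
            "tasks=%d completed=%d largest=%d poolSize=%d active=%d queued=%d%n",
            pool.getTaskCount(),
            pool.getCompletedTaskCount(),
            pool.getLargestPoolSize(),
            pool.getPoolSize(),
            pool.getActiveCount(),
            pool.getQueue().size()),
            0, 5, TimeUnit.SECONDS);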

10.2 Expanding monitoring

Thread pools provide three hook methods that can be used to extend functionality, such as monitoring the average execution time, maximum execution time, and minimum execution time of a task:
– beforeExecute(): called inside runWorker(), before the task's run() method executes

– afterExecute(): called inside runWorker(), after the task's run() method executes

– terminated(): called inside tryTerminate(), after the pool state has been CAS-ed to TIDYING

Note: since these methods are protected and empty by default in the thread pool, they can only be customized by subclassing the pool (including via an anonymous subclass at construction time). A timing sketch follows below.
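A minimal timing sketch using the three hooks; the TimedThreadPoolExecutor subclass name, the ThreadLocal bookkeeping and the log format are illustrative assumptions:

    import java.util.concurrent.*;

    public class TimedThreadPoolExecutor extends ThreadPoolExecutor {
        private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

        public TimedThreadPoolExecutor(int core, int max, long keepAlive, TimeUnit unit,
                                       BlockingQueue<Runnable> queue) {
            super(core, max, keepAlive, unit, queue);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            // Record the start time on the worker thread that is about to run the task
            startNanos.set(System.nanoTime());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long elapsedMicros = (System.nanoTime() - startNanos.get()) / 1_000;
            startNanos.remove();
            System.out.printf("task %s took %d us%n", r, elapsedMicros);
            super.afterExecute(r, t);
        }

        @Override
        protected void terminated() {
            System.out.println("pool terminated");
            super.terminated();
        }
    }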

11. Saturation rejection strategy

The thread pool's saturation (rejection) policy is used to reject tasks when the pool is saturated (which does not necessarily mean the task will never be executed). The thread pool natively provides four rejection policies, which cover the common saturation-handling scenarios:

AbortPolicy: The default policy that directly throws an exception

CallerRunsPolicy: the task is executed by the calling thread itself

DiscardPolicy: Discards tasks directly

DiscardOldestPolicy: discards the oldest task at the head of the queue, then tries to submit the new task to the pool again

All rejection policies implement the same rejection handler interface:

public interface RejectedExecutionHandler {
    /**
     * Called by the thread pool when execute() cannot accept a task, typically because:
     * 1. no more worker threads are available
     * 2. the task queue is full
     * 3. the thread pool has been shut down
     *
     * When there is no other way to handle the task, the handler may throw a
     * RejectedExecutionException, which propagates up to the caller of execute().
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

11.1 CallerRunsPolicy

Processing rule: The newly submitted task is executed directly by the caller thread

Recommendation: Reject policy CallerRunsPolicy is recommended because it does not discard the task or throw an exception, but instead pushes the task back to the caller’s thread for execution

/**
 * Rather than discarding the task, run it directly on the thread that called execute().
 * Note: all four policy classes are public static inner classes of ThreadPoolExecutor,
 * so they can be used directly from outside.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Once the thread pool has been shut down, the task is silently discarded
        if (!e.isShutdown()) {
            // Note: the task is not executed by a pool thread but directly
            // by the calling thread via r.run()
            r.run();
        }
    }
}
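A minimal usage sketch showing how a rejection handler is passed to the constructor; the pool sizes and queue capacity are illustrative assumptions:

    import java.util.concurrent.*;

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(8),
            // When the queue and the pool are both full, the submitting thread runs the
            // task itself, which also slows the producer down (a form of back-pressure)
            new ThreadPoolExecutor.CallerRunsPolicy());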

11.2 AbortPolicy

Processing rule: directly throws a RejectedExecutionException

/**
 * Simple and crude: directly throws a RejectedExecutionException
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    /**
     * Throws the exception directly, but r.toString() tells you which task was
     * rejected, and the more helpful e.toString() also includes the pool state,
     * worker count, queue size and completed-task count.
     * If you do not handle the exception, it is recommended to at least log this
     * information.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

11.3 DiscardPolicy

Processing rule: the newly submitted task is silently discarded (i.e. the newest task is dropped, a LIFO-style discard)

/**
 * Drops the task outright: no record, no exception.
 * Use it only when losing tasks is acceptable.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    // Does nothing, which effectively discards the rejected task
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

11.4 DiscardOldestPolicy

Processing rule: discards the oldest task at the head of the queue (an LRU-style discard), then tries to submit the newly submitted task again

/**
 * Discards the oldest unhandled task in the queue instead of the newly submitted one,
 * then retries execute() with the new task.
 * The new task is only discarded when the thread pool has been shut down.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Once the pool is shut down, simply discard the new task
        if (!e.isShutdown()) {
            // The queue is FIFO, so poll() removes the head, i.e. the oldest task
            e.getQueue().poll();
            // Resubmit the current task
            e.execute(r);
        }
    }
}

Thread pool exception handling

12.1 Submit () Exception handling

There are four things to be aware of when handling exceptions with submit():

1. An exception thrown by the task is wrapped in an ExecutionException inside the Future and can be caught with a try-catch around get(); if N tasks throw, N ExecutionExceptions are delivered, and the worker threads are not terminated

2. Setting an UncaughtExceptionHandler alone has no effect for submit(), because the FutureTask swallows the exception before it can reach the handler; combine it with approach 3 or 4 instead

3. You can try-catch inside the submitted task body itself; this handles the exception without terminating the current worker thread

4. If you want to handle exceptions internally, you can override afterExecute(), for example:

static ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(2, 3, 3, TimeUnit.SECONDS, new SynchronousQueue<>()) {
    // Override afterExecute() to surface exceptions swallowed by submit()
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        printException(r, t);
    }
};

private static void printException(Runnable r, Throwable t) {
    // With submit(), t is null and the exception is wrapped inside the FutureTask
    if (t == null && r instanceof Future<?>) {
        try {
            Future<?> future = (Future<?>) r;
            if (future.isDone())
                future.get();
        } catch (ExecutionException e) {
            t = e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    if (t != null) {
        System.out.println(t);
    }
}
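For completeness, a minimal sketch of the first approach, catching the wrapped exception through Future.get(); the failing task body is illustrative:

    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Integer> future = pool.submit(() -> 1 / 0);   // the task throws ArithmeticException

    try {
        future.get();
    } catch (ExecutionException e) {
        // The original exception is available as the cause
        System.out.println("task failed: " + e.getCause());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        pool.shutdown();
    }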

12.2 Execute () Exception Handling

There are four things to be aware of when handling exceptions with execute():

1. By default, an uncaught exception propagates out of the task and kills the current worker thread; the thread pool itself is not affected, and a replacement worker is created to handle subsequent tasks (the failed task is not re-executed)

2. You can try-catch inside the task body; the advantage is that the current worker thread is not terminated and no replacement thread needs to be created

3. Override the afterExecute() method

4. You can also set UncaughtExceptionHandler, for example:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        // A custom thread factory that sets an UncaughtExceptionHandler on every thread it creates
        new ThreadFactory() {
            final AtomicInteger threadNumber = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                Thread thread = new Thread(Thread.currentThread().getThreadGroup(), r,
                        "thread-" + threadNumber.getAndIncrement());
                thread.setUncaughtExceptionHandler((t, e) -> System.out.println(e));
                return thread;
            }
        });