Today, thread pools, from design ideas to source code parsing.

preface

Dear friends, the Spring Festival has come to an end. Here is a belated New Year’s greeting for the Spring Festival holiday. I hope readers and friends can have a harvest. Like, comment and bookmark!

Getting to know thread pools

As we know, thread creation and destruction need to be mapped to the operating system, so it can be expensive. Thread pools were created to avoid frequent thread creation and destruction and to facilitate thread management.

Thread pool advantage

  • “Reduced resource consumption” : Thread pools typically maintain a number of threads (corePoolSize) that are reused to perform different tasks and are not destroyed once the task is complete. When there are a lot of tasks to be processed, the reuse of thread resources avoids the frequent creation and destruction of threads, thus reducing the consumption of system resources.

  • “Improved response time” : Because the thread pool maintains a batch of threads that are alive, when a task arrives, the thread does not need to be created, but is directly used to execute the task, thus reducing the waiting time for the task.

  • “Improved thread manageability” : Threads can be uniformly allocated, tuned, and monitored using thread pools.

Thread pool design idea

There is a saying that art comes from life, and the same is true of programming languages. Many design ideas can be mapped to everyday life, such as object-oriented thinking, encapsulation, inheritance, and so on. Today we talk about thread pools, which can also be found in the real world equivalent entities – factories.

Imagine a factory’s production process:

Thread pool design idea

A factory has a fixed group of workers, called regular workers, who fulfill the orders received by the factory. When orders increase and regular workers are too busy, the factory will temporarily pile up production materials in the warehouse and deal with them when there are idle workers (because idle workers will not take the initiative to deal with the production tasks in the warehouse, so the dispatcher needs to make real-time scheduling). What if the orders are still increasing after the warehouse is full? Factories have to hire more workers temporarily to cope with peak production, and these workers have to leave when the peak is over, so they are called temporary workers. At that time, temporary workers were also hired (limited by the station limit, the number of temporary workers has an upper limit), the orders can only reluctantly refused.

Let’s do the following mapping:

  • Factory – Thread pool

  • Order — Task (Runnable)

  • Regular workers — core threads

  • Temps – Common threads

  • Repository – Task queues

  • Operator – getTask ()

GetTask () is a method that schedules tasks from a task queue to idle threads, as described in more detail in reading thread pools

After mapping, the flow chart of thread pool is as follows. Are they similar?

Thread pool flowchart

This way, the thread pool’s working principle or flow is easily understood, distilled into a simple diagram:

How thread pools work

Deep into the thread pool

So, the question is, how exactly does thread pooling work? From the Java ThreadPoolExecutor framework we can see that the real implementation class of thread pool is ThreadPoolExecutor, so we will focus on this class next.

How thread pools work

A constructor

To study a class, start with its constructor. ThreadPoolExecutor provides four parameter constructors:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Copy the code

Explain the parameters involved in the constructor:

  • CorePoolSize (required) : Number of core threads. The number of threads in the pool that remain alive at all times, even if they are idle. However, if the allowCoreThreadTimeOut parameter is set to true, the core thread will also be reclaimed if it is idle for more than a period of time.

  • MaximumPoolSize (required) : Maximum number of threads allowed in the pool. When the core threads are all busy and the task queue is full, threads are temporarily added to the pool until the total number of threads reaches maximumPoolSize.

  • KeepAliveTime (required) : specifies the idle timeout time of a thread. When a non-core thread is idle for more than that time, the thread is reclaimed. With the allowCoreThreadTimeOut parameter set to true, the core thread is also reclaimed.

  • Unit (required) : keepAliveTime Time unit of the keepAliveTime parameter. There are: Timeunit.days, timeunit.hours, timeUnit.minutes, timeUnit.seconds, timeUnit.milliseconds, and timeUnit.micr OSECONDS (microseconds), timeUnit.nanoseconds (NANOSECONDS)

  • WorkQueue (required) : A task queue, implemented using a blocking queue. When all the core threads are busy, subsequent runnables submitted by the execute method are stored in the task queue, waiting to be processed by the thread.

  • ThreadFactory (optional) : indicates the threadFactory. Specifies how a thread pool creates a thread.

  • Handler (Optional) : rejects the policy. When the number of threads in the thread pool reaches maximumPoolSize and the workQueue is full, subsequent submitted tasks will be rejected, and the handler can specify how to reject the task.

Let’s put it all together:

Factories and thread pools

Task queue

Using ThreadPoolExecutor requires specifying a task wait queue that implements the BlockingQueue interface. The API documentation for the ThreadPoolExecutor thread pool recommends three types of wait queues: SynchronousQueue, LinkedBlockingQueue, and ArrayBlockingQueue.

  1. SynchronousQueue: SynchronousQueue. This is a blocking queue with no internal capacity, and any insert element waits for a relative delete/read operation, otherwise the insert thread waits, and vice versa.

  2. “LinkedBlockingQueue” : an unbounded queue (not strictly unbounded; the upper limit is integer.max_value), based on a linked list structure. With unbounded queues, subsequent tasks can be queued indefinitely when the core threads are busy, so the number of threads in the thread pool cannot exceed the number of core threads. This queue can improve thread pool throughput at the expense of memory space or even memory overflow. In addition, you can specify the capacity when you use it, which makes it a bounded queue.

  3. “ArrayBlockingQueue” : a bounded queue, implemented based on arrays. When the thread pool is initialized, the size of the queue is specified and cannot be adjusted later. Such bounded queues are good for preventing resource exhaustion, but can be harder to tune and control.

In addition, Java provides four other types of queues:

  1. “PriorityBlockingQueue” : An unbounded blocking queue that supports priority sorting. Elements stored in PriorityBlockingQueue must implement the Comparable interface in order to be sorted by implementing the compareTo() method. The element with the highest priority will always be at the head of the queue; PriorityBlockingQueue does not guarantee the ordering of elements of the same priority, nor does it guarantee that all but the highest-priority element in the current queue will be in the correct order at all times.

  2. DelayQueue: indicates the DelayQueue. It is based on binary heap and has the characteristics of unbounded queue, blocking queue and priority queue. DelayQueue the object stored in the DelayQueue, which must be a class that implements the Delayed interface. The task is extracted from the queue by executing the delay, and the task cannot be extracted until the time is up. Interviewer: Talk about the principles and usage of DelayQueue in Java.

  3. “LinkedBlockingDeque” : a two-ended queue. Based on a linked list implementation, elements can be inserted/removed from the tail as well as inserted/removed from the head.

  4. LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure. If the queue is empty, a node (node element is null) will be generated and queued. Then the consumer thread will wait on this node. When the producer thread joins the queue, it finds that there is a node whose element is null. The producer thread does not join the queue, and directly fills the node with the element, and wakes up the waiting thread of the node. The awakened consumer thread takes the element.

Rejection policies

Thread pools have an important mechanism: rejection policies. When the thread pool workQueue is full and no new thread pool can be created, subsequent tasks are rejected. The rejection policies need to implement the RejectedExecutionHandler interface, but Executors framework has implemented 4 rejection policies for us:

  1. “AbortPolicy” (the default) : discard task and throw RejectedExecutionException anomalies.

  2. “CallerRunsPolicy” : Runs the run method of this task directly, but not by the thread in the thread pool, but by the calling thread of the task.

  3. DiscardPolicy: Discards the task without throwing any exception.

  4. DiscardOldestPolicy: Forcibly removes a waiting task currently in the head of the wait queue column and then attempts to commit the currently rejected task to the thread pool for execution.

Thread Factory specifies the way the thread is created. If so, the Executors class has thoughtfully provided a default thread factory:

/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! = null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);  if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() ! = Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }}Copy the code

Thread pool state

Thread pools have five states:

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

Copy the code

RunState represents the state of the current thread pool, which is a volatile variable used to ensure visibility between threads.

The following static final variables represent the possible values of runState, with the following states:

  • “RUNNING” : After a thread pool is created, it is in the RUNNING state initially.

  • “SHUTDOWN” : if the SHUTDOWN () method is called, the thread pool is SHUTDOWN and cannot accept new tasks. It waits for all tasks to complete.

  • “STOP” : If the shutdownNow() method is called, the thread pool is stopped. The thread pool cannot accept new tasks and will try to terminate the ongoing task.

  • TERMINATED: the thread pool is set to TERMINATED when it is SHUTDOWN or stopped, all worker threads are destroyed, the task cache queue is cleared, or execution is TERMINATED.

Initialization & Capacity adjustment & Close

1. Thread Initialization

By default, after a thread pool is created, there are no threads in the pool and the thread is created only after the task is submitted.

In practice, if you need to create a thread immediately after the thread pool is created, you can do this in two ways:

  • PrestartCoreThread () : Boolean prestartCoreThread(), initializes a core thread

  • “PrestartAllCoreThreads ()” : int prestartAllCoreThreads(), initializes all core threads and returns the number of initialized threads

public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } public int prestartAllCoreThreads() {int n = 0; AddIfUnderCorePoolSize (null) while addIfUnderCorePoolSize(null) return n; }Copy the code

“2. Thread pool closed”

ThreadPoolExecutor provides two methods for closing thread pools:

  • “Shutdown ()” : Does not terminate the thread pool immediately, but does not terminate until all tasks in the task cache queue have completed, but no new tasks are accepted

  • “ShutdownNow ()” : Immediately terminates the thread pool and attempts to interrupt tasks in progress, and empties the task cache queue to return tasks that have not yet been executed

“3. Thread pool capacity Adjustment”

ThreadPoolExecutor provides methods to dynamically adjust thread pool capacity:

  • SetCorePoolSize: Sets the core pool size

  • SetMaximumPoolSize: Sets the maximum number of threads that can be created in the thread pool

When the above parameter increases from small to large, ThreadPoolExecutor performs thread assignment and may immediately create a new thread to perform the task.

Using thread pools

ThreadPoolExecutor

Using a ThreadPoolExecutor constructor is the most direct way to use a thread pool. Here’s an example:

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; Public class MyTest {public static void main(String[] args) {// create threadPool ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5)); For (int I = 0; i < threadPool.getCorePoolSize(); i++) { threadPool.execute(new Runnable() { @Override public void run() { for (int x = 0; x < 2; x++) { System.out.println(Thread.currentThread().getName() + ":" + x); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }}}}); } // close the threadPool threadpool.shutdown (); Threadpool.shutdownnow (); // set the state of the threadPool to SHUTDOWN and then interrupt all threads that are not executing tasks. // Set the state of the thread pool to STOP, then try to STOP all the threads that are executing or suspending tasks, and return to the list of tasks that are waiting to be executed.Copy the code

Running results:

pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1

Copy the code

Executors Encapsulate a thread pool

Plus, Executors wrap up 4 common thread pools (again, very nice) :

1. FixedThreadPool

Fixed capacity thread pool. Its characteristic is that the maximum number of threads is the number of core threads, which means that the thread pool can only create core threads, keepAliveTime is 0, that is, the thread completes the task immediately recycle. If no capacity is specified for the task queue, the default value integer.max_value is used. Suitable for scenarios where concurrent threads need to be controlled.

ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }Copy the code

Example:

/ / 1. Create a thread pool object, set the core thread and the maximum number of threads to 5 ExecutorService fixedThreadPool = Executors. NewFixedThreadPool (5); Runnable task =new Runnable(){public void run() {public void run() { System.out.println(thread.currentThread ().getName() + "-- > run "); }}; // 3. Submit the task fixedThreadPool.execute(task) to the thread pool;Copy the code

“2. SingleThreadExecutor”

Single-threaded thread pool. The thread pool has only one thread (the core thread), and the thread returns immediately after completing the task, using a bounded blocking queue (the capacity is not specified, using the default value integer.max_value).

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // To save space, the source code for the custom thread factory mode is omittedCopy the code

Example:

/ / 1. Create a single thread thread pool ExecutorService singleThreadExecutor = Executors. NewSingleThreadExecutor (); Runnable task = new Runnable(){public void run() {public void run() { System.out.println(thread.currentThread ().getName() + "-- > run "); }}; / / 3. Submit a task to a thread pool singleThreadExecutor. Execute (task);Copy the code

3. ScheduledThreadPool

Timed thread pool. Specifies the number of core threads, the number of common threads is unlimited, the thread executes the task immediately recycle, the task queue is delayed blocking queue. This is a special thread pool for “performing timed or periodic tasks” **.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } / / inherited ThreadPoolExecutor public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {// Constructor, Omit the custom thread factory constructor public ScheduledThreadPoolExecutor (int corePoolSize) {super (corePoolSize, Integer. MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledFuture<? > schedule(Runnable command, long delay, TimeUnit unit) { ... } public ScheduledFuture<? > scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {... }}Copy the code

Example:

/ / 1. Create a thread pool regularly ExecutorService scheduledThreadPool = Executors. NewScheduledThreadPool (5); Runnable task = new Runnable(){public void run() {public void run() { System.out.println(thread.currentThread ().getName() + "-- > run "); }}; / / 3. Submit a task to a thread pool scheduledThreadPool. The schedule (task, 2, TimeUnit. SECONDS); . / / delay after 2 s mission scheduledThreadPool scheduleAtFixedRate (task, 50200, TimeUnit. MILLISECONDS); // perform tasks every 2000ms after a delay of 50msCopy the code

4. CachedThreadPool

Cache thread pools. There is no core thread, the number of common threads is integer. MAX_VALUE, which is reclaimed after 60 seconds. The task queue uses the SynchronousQueue, which has no capacity. This method is applicable to ** scenarios with a large amount of tasks but low time consumption.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Copy the code

Example:

/ / 1. Create a cached thread pool ExecutorService cachedThreadPool = Executors. NewCachedThreadPool (); Runnable task = new Runnable(){public void run() {public void run() { System.out.println(thread.currentThread ().getName() + "-- > run "); }}; // 3. Submit the task cachedThreadPool.execute(task) to the thread pool;Copy the code

Unscramble the thread pool

OK, I believe that the previous content is easy to read, so from here into the deep water, if you can understand the latter, then the thread pool knowledge is really mastered.

The execute() method of ThreadPoolExecutor is used to submit tasks to a Thread pool. However, the execution () method of ThreadPoolExecutor is quite complex, involving six methods of ThreadPoolExecutor, Worker, and Thread:

Submit tasks to the thread pool

execute()

In the ThreadPoolExecutor class, the entry point to the task submission method is the execute(Runnable Command) method (the submit() method also calls execute()), which tries to do only one thing: After various validations, the addWorker(Runnable Command, Boolean core) method is called to create a thread for the pool and execute the task. The result of execute() is two:

Parameter Description:

  1. Runnable Command: indicates the task to be executed

“Execution Process:”

Get () to get the current number of threads in the thread pool. If the number of threads is smaller than corePoolSize, call addWorker(Commond,true) to create a new thread to execute the task; otherwise, perform Step 2.

2. If Step 1 fails, it indicates that no new thread can be created. Therefore, put the task into a blocking queue and wait for the thread that has finished executing the task to process it. Based on this, determine whether the thread pool is in the Running state (only the thread pool in the Running state can accept new tasks). If the task is added to the task queue successfully, go to Step 3; if the task fails, go to Step 4.

3. At this step, it is necessary to indicate that the task has been added to the task queue. At this time, it is necessary to check the status of the thread pool twice, which will have the following situation:

  • The thread pool is no longer in the Running state, and the task needs to be removed from the task queue. If the task is removed successfully, the task is rejected

  • If the thread pool is in the Running state, check whether the worker thread of the thread pool is 0. If it is, call addWorker(Commond,true) to add a thread with no initial task (this thread will get the current task that has been added to the task queue and execute it), otherwise go to Step 4.

  • The thread pool is not in the Running state, but failed to remove the task from the task queue. , go to Step 4.

4. Expand the thread pool to maximumPoolSize and call addWorker(Commond,false) to create a new thread to execute the task.

“Flow chart:”

Create a new thread to execute the task

“Read the source code:”

/** * Performs the given task at some point in the future. Tasks can be executed in a new thread or in an existing pool thread. * If a task cannot be submitted for execution because this actuator is closed or has reached its capacity, the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute command */ public void execute(Runnable command) {if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer threads are running than corePoolSize, an attempt is made to start a new thread with the given command as the first task. * * 2. If a task can be queued successfully, we still need to double-check whether we should add a thread * (since some existing threads have died since the last check), and that the thread pool state has changed to non-running at this point. Therefore, we re-check the status. If the check fails, we remove the listed tasks. If the check passes and the number of threads in the thread pool is 0, we start a new thread. * * 3. If the task cannot be added to the task queue, expand the thread pool to its maximum capacity and try to create a new thread. If the task fails, reject the task. */ int c = ctl.get(); // Step 1: If (workerCountOf(c) < corePoolSize) {if (workerCountOf(c) < corePoolSize) {if (workerCountOf(c) < corePoolSize) { Otherwise use maximumPoolSize if (addWorker(command, true)) return; c = ctl.get(); } // Step 2: WorkerCountOf (c) < corePoolSize or addWorker fails If (isRunning(c) && workqueue.offer (command)) {int recheck =. If (isRunning(c) && workqueue.offer (command)) {int recheck = ctl.get(); If the thread pool is not Running and the task is successfully removed from the task queue, the task is rejected. isRunning(recheck) && remove(command)) reject(command); Else if (workerCountOf(recheck) == 0) // addWorker(null, false) if the thread pool is not in the Running state; } // step 3: if the thread pool is not in the Running state or the task fails to join the column, try to expand the maxPoolSize and addWorker again. Else if (! addWorker(command, false)) reject(command); }Copy the code

addWorker()

The addWorker(Runnable firstTask, Boolean core) method, as the name implies, adds a worker thread with a task to the thread pool.

Parameter Description:

  1. “Runnable firstTask” : The task that the newly created thread should run first (null if none exists).

  2. “Boolean core” : This parameter determines the constraint on the capacity of the thread pool, that is, the limit value of the current number of threads. If true, corePollSize is used as the constraint value, otherwise maximumPoolSize is used.

“Execution Process:”

1. The outer loop determines whether the state of the thread pool is ready to add worker threads. This layer of verification is based on the following two principles:

  • When the thread pool is in the Running state, it can either accept new tasks or process them

  • Workers that can only add empty tasks to the workQueue when the thread pool is closed cannot accept new tasks

2. The inner loop adds a worker thread to the thread pool and returns the result of successful addition.

  • Check whether the number of threads exceeds the limit. If yes, return false. Otherwise, go to the next step

  • Make the number of worker threads +1 through CAS. If yes, step 3 is entered. If no, the thread pool is checked again to see if it is running

3, the number of core threads +1 successful follow-up: add to the worker thread collection, and start the worker thread

  • After obtaining the lock, verify the thread pool state again (see code notes for specific verification rules). If the lock passes, the thread pool state will enter the next step; if the lock fails, the thread will fail to be added

  • After the thread pool status is verified, the system checks whether the thread is started. If yes, an exception is thrown; otherwise, the system tries to add the thread to the thread pool

  • Check whether the thread is started successfully. If the thread is started successfully, true is returned. If the thread fails, the addWorkerFailed method is entered

“Flow chart:”

Add a worker thread with a task to the thread pool

“Read the source code:”

Private Boolean addWorker(Runnable firstTask, Boolean core) {// Loop: Retry: for (;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1. The thread pool is not in the Running state (in the Running state, core threads can be added and tasks can be accepted) * 2. The thread is shutdown and firstTask is empty and the queue is not empty * 3. If (rs >= shutdown &&!) if (rs >= shutdown &&!) if (rs >= shutdown &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // Inner loop: the thread pool adds the core thread and returns for (;;) { int wc = workerCountOf(c); // Check whether the number of threads in the thread pool exceeds the limit: / / 1. The thread pool maximum limit CAPACITY / / 2. CorePoolSize or maximumPoolSize (depending on the core refs) if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; / / by CAS operation of worker threads number + 1, jump out of the outer loop if (compareAndIncrementWorkerCount (c)) break retry. CTL c = ctl.get(); // re-read CTL // If the thread pool state is no longer running, repeat the outer loop if (runStateOf(c)! = rs) continue retry; // Else CAS failed due to workerCount change; Retry inner loop}} /** ** Add to the worker thread set and start the worker thread */ Boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t ! = null) {// The thread pool primary lock mainlock. lock(); Int c = ctl.get(); int c = ctl.get(); int rs = runStateOf(c); / / to test whether the thread pool is again running status, or thread pool shutdown but is empty if thread task (rs < shutdown | | (rs = = shutdown && firstTask = = null)) {/ / thread has been started, Throw an illegal thread state exception // Why does this state exist? Did not solve the if (t.i sAlive ()) / / precheck that t is startable throw new IllegalThreadStateException (); workers.add(w); Int s = worker.size (); If (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); If (workerAdded) {t.start(); workerStarted = true; }} finally {addWorkerFailed if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

The Worker class

Worker class is an inner class, which implements Runnable and inherited AbstractQueuedSynchronizer (hereinafter referred to as the AQS), so it is not only the task of an executable, and can achieve the effect of the lock.

The Worker class mainly maintains the interrupt control state of the thread running the task, as well as other minor records. This class timely inherited AbstractQueuedSynchronizer class, in order to simplify the acquisition and release the lock (the lock function on each task execution code) process. This prevents you from interrupting a running task and only interrupts the thread waiting to get the task from the task queue.

Instead of using a reentrant lock, we implemented a simple non-reentrant mutex because we didn’t want the work task to be able to retrieve the lock when calling a pool control method like setCorePoolSize. Also, to disable interrupts until the thread actually starts running the task, we initialize the lock status to negative and clear it at startup (in runWorker).

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) {// Set AQS synchronization state // state: lock state, unlock state, 1 lock state setState(-1); FirstTask = firstTask; // Inhibit interrupts until runWorker // Thread factory creates a thread this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); // Lock methods // The value 0 represents The unlocked state // The Value 1 Represents the locked state. 1 Represents the state of the pole. = 0; } /* protected Boolean tryAcquire(int unused) {// Select * from tryAcquire();} /* protected Boolean tryAcquire(int unused) { Therefore, when state is -1, the lock cannot be obtained. // Every time 0->1, If (compareAndSetState(0, SetExclusiveOwnerThread (thread.currentThread ()); return true; } return false; } /* protected Boolean tryRelease(int unused) {setExclusiveOwnerThread(null); /* protected Boolean tryRelease(int unused) {setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * Interrupts (if run) * shutdownNow loops on worker threads and does not require the worker lock. */ void interruptIfStarted() {Thread t;  // if state>=0, t! If (getState() >= 0 && (t = thread)! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

runWorker()

It can be said that runWorker(Worker W) is the method that actually processes tasks in the thread pool, and the previous execute() and addWorker() are both preparing and paving the way for this method.

Parameter Description:

  1. “The Worker w”: Encapsulated Worker, carrying many elements of Worker thread, includingRunnable(Tasks to be handled),lock(lock),completedTasks(Record the number of completed tasks in the thread pool)

“Execution Process:”

1. Check whether the current task or tasks obtained from the task queue are not empty. If both tasks are empty, go to Step 2; otherwise, go to Step 3

2. If the task is empty, set completedAbruptly to false (that is, the thread is not suddenly terminated) and execute processWorkerExit(w,completedAbruptly) to enter the thread exit procedure

3, the task is not empty, then enter the loop, and lock

4. Determine whether to add an interrupt flag for the thread. If one of the following two conditions is met, add an interrupt flag:

  • Thread pool state >=STOP, which is STOP or TERMINATED

  • If thread.interrupted () is true, check whether the Thread pool state is >=STOP (in effect, the shutdown method is TERMINATED and the Thread pool is in STOP or TERMINATED).

Execute task.run() method beforeExecute(wt, task) (this method is empty and implemented by subclasses).

Log between tasks created by a subclass after an afterExecute(task, Thrown) method is executed. Log between tasks created by a subclass.

7. Judge the circular conditions again.

“Flow chart:”

Thread pool flowchart

“Read the source code:”

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Allow interrupts // New Worker() is state==-1, where the Worker class tryRelease() method is called, setting state to 0. InterruptIfStarted () allows only state>=0 to call interrupt w.nlock (); Boolean completedAbruptly = true; Try {// The current task and the task fetched from the task queue are empty, and the loop stops while (task! = null || (task = getTask()) ! = null) {// Locking prevents terminating the running worker at shutdown(), instead of dealing with concurrency w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while Clearing interrupt /** * Verify that wt is set to TERMINATED only if the thread is stopped and WT is not TERMINATED. If thread.interrupted () is true, check whether the Thread pool status is >=STOP. The shutdown method is TERMINATED and the Thread pool is in STOP or TERMINATED. * If both condition 1 and Condition 2 are satisfied and WT is not interrupted, then wt is interrupted, Otherwise enter the next * / if ((runStateAtLeast (CTL) get (), STOP) | | (Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP))) &&! wt.isInterrupted()) wt.interrupt(); // Call interrupt() interrupt try {// before execution (empty method, implemented by subclass overwriting) beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {// After execution (null method, implemented by subclass override) afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // Number of completed tasks +1 w.nlock (); // release lock}} // completedAbruptly = false; } finally {// Handle the worker's exit processWorkerExit(w, completedAbruptly); }}Copy the code

5. GetTask ()

The Runnable getTask() method serves the void runWorker(Worker w) method in the implementation of the ThreadPoolExecutor class, as shown in the function call diagram. It is used to get the task (Runnable) from the workQueue.

Parameter Description: No parameter

“Execution Process” :

  1. Set timedOut to false to enter an infinite loop

  2. If the thread pool is Shutdown and the task queue is empty, or if the thread pool is TERMINATED or TERMINATED, it does not need to fetch any more tasks. Number of current worker threads -1 and return null, otherwise go to Step 3

  3. If the number of thread pools exceeds the upper limit or the time exceeds the upper limit and the task queue is empty or the number of current threads is greater than 1, go to Step 4. Otherwise, go to Step 5.

  4. If the worker thread is removed, null is returned on success, and the next loop is entered on failure.

  5. Try to get the task with poll() or take() (depending on the value of timed), and return the task if it is not empty. If null, set timeOut to true to enter the next loop. If an exception occurs during the task acquisition process, set timeOut to false and enter the next loop.

“Flow chart” :

Thread pool flowchart

“Read the source code:”

Private Runnable getTask() {// Last poll timeout Boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * State 1: the thread pool is TERMINATED. Thread pool STOP, TERMINATED, or workQueue is empty. If condition 1 and condition 2 are true, workerCount-1 and null are returned. Condition 2 is that the thread pool with SHUTDOWN state will not accept the task, But will still be processing task * / if (rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {decrementWorkerCount (); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /** * If either of the following two conditions is met, the worker thread currently trying to acquire the task is set a blocking time limit (timeout will be destroyed? AllowCoreThreadTimeOut: Whether the current thread waits for the task with keepAliveTime * 2. The current number of threads has transcended the core number of threads * / Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize; // Condition 1: the number of worker threads is greater than maximumPoolSize, or (the worker thread is blocked for a limited time and the last time the task was pulled from the task queue timed out) // Condition 2: Wc > 1 or task queue is empty the if ((wc > maximumPoolSize | | (timed && timedOut)) && (wc > 1 | | workQueue. IsEmpty ())) {/ / remove the worker thread, Successful it returns null, no success, into the next round of cycle if (compareAndDecrementWorkerCount (c)) return null; continue; Poll () if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time, poll() if the worker thread blocks for a certain time. R = timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // if r is not empty, return the Runnable if (r! = null) return r; TimedOut = true; timedOut = true; timedOut = true; } catch (InterruptedException retry) {// Respond to an interruption and set the timeout status of the latest fetch task to false before the next loop. }}}Copy the code

processWorkerExit()

ProcessWorkerExit (Worker w, Boolean completedAbruptly) The method to execute thread exit

Parameter Description:

  1. “Worker W” : the Worker thread to terminate.

  2. Boolean completedAbruptly: indicates whether to complete suddenly (due to an exception). If the worker thread dies due to an abnormal user, the completedAbruptly parameter is true.

“Execution Process:”

1. If completedAbruptly is true, that is, the worker thread dies suddenly due to an exception, the worker thread-1 operation is performed.

2. After the main thread obtains the lock, the number of completed tasks in the thread pool will be appended to the number of completed tasks by W (the current worker thread), and the current worker will be removed from the set set of worker.

TryTerminate () is executed to terminate the thread pool.

4. Whether to add worker threads, if the thread pool has not been completely terminated, still need to keep a certain number of threads.

  • If the current thread terminates suddenly, addWorker() is called to create a worker thread

  • The current thread is not terminated suddenly, but the number of current worker threads is less than the number of threads required to be maintained by the thread pool. The number of threads that need to be maintained is either corePoolSize (depending on whether the member variable allowCoreThreadTimeOut is false) or 1.

  • Read the source code:

/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 1. Worker thread -1 operation * 1) If completedAbruptly is true, the worker thread has an exception and the number of working threads is -1 * 2) If completedAbruptly is false, the worker thread has no work to execute, Worker-1 executed by getTask() */ if (completedAbruptly) // if abrupt, then workerCount wasn't adjusted decrementWorkerCount(); Final ReentrantLock mainLock = this.mainLock; final ReentrantLock = this.mainLock; mainLock.lock(); Try {// Append the number of tasks completed by the worker to the number of tasks completed by the thread pool completedTaskCount += w.completedTasks; // remove the Worker Worker from HashSet<Worker>. Remove (w); } finally { mainLock.unlock(); } // 3. TryTerminate (); AddWorker () * If the current thread is not terminated suddenly, but the number of current threads is less than the number of threads to maintain, */ int c = ctl.get(); */ int c = ctl.get(); */ int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (! completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}Copy the code

Well, that’s all for Java thread pools. What did you get, persevering folks? Feel helpful points like it, I wish you a New Year new atmosphere, upgrade pay!