Definition

The notion of a thread pool should be familiar: it manages thread resources and keeps statistics about them, and it avoids the performance cost of frequently creating and destroying threads.

Inheritance hierarchy of ThreadPoolExecutor

ThreadPoolExecutor extends AbstractExecutorService

AbstractExecutorService implements ExecutorService

ExecutorService extends Executor
  1. Executor is the top-level abstraction. It defines a single execute(Runnable command) method, which hands over a task to be executed.
  2. ExecutorService extends Executor, adding life-cycle methods (shutdown, shutdownNow, etc.) and task submission methods such as submit(Runnable task).
  3. AbstractExecutorService is an abstract class that provides a basic implementation of ExecutorService, including a common wrapper for submit() task submission and an invokeAny() implementation.
  4. ThreadPoolExecutor is the concrete thread pool implementation.

ThreadPoolExecutor constructor parameters

ThreadPoolExecutor provides several overloaded constructors. Let's look at the one with the most parameters; the other constructors all delegate to it through this(…).

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    ...
}

The constructor body only performs some argument validation and simple field assignments. What matters here is the meaning of each parameter:

  1. corePoolSize: the number of core threads. When a task is submitted and the pool has fewer than corePoolSize threads, a new core thread is created and the task becomes that thread's first task. (Core threads are not reclaimed by default. If allowCoreThreadTimeOut is set, a core thread is reclaimed once it has been idle for keepAliveTime.)
  2. maximumPoolSize: the maximum number of threads. When the thread count has reached corePoolSize, the task queue is full, and the thread count is still below maximumPoolSize, a new thread is created to execute the task. (One detail: after a task is added to the queue, the pool state and thread count are checked a second time to decide whether a thread must be added.) A non-core thread is reclaimed once it has been idle for keepAliveTime.
  3. keepAliveTime: how long an idle thread is kept alive before it is reclaimed.
  4. unit: the time unit of keepAliveTime. The constructor uses unit to convert keepAliveTime to nanoseconds.
  5. workQueue: the task queue. Once the thread count has reached corePoolSize, newly submitted tasks are added to this queue.
  6. threadFactory: the factory used to create threads. In practice it is mainly used to give threads meaningful names.
  7. handler: the rejection handler. When the task queue is full and the thread count has reached maximumPoolSize, the handler is called to deal with the task. (The rejection policy is also applied when the thread pool is no longer in the RUNNING state.)
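
To make the seven parameters concrete, here is a minimal sketch of a pool built with the full constructor. The class name PoolConfigDemo, the helper namedFactory, the "order-worker" prefix and all sizes are hypothetical choices for illustration, not recommended values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConfigDemo {
    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.incrementAndGet());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),          // bounded workQueue
                namedFactory("order-worker"),           // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

A bounded queue is used here so that maximumPoolSize and the handler can actually come into play; with an unbounded queue the pool would never grow past corePoolSize.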

The state of ThreadPoolExecutor

The concurrency library was written by the veteran Doug Lea, so naturally the state of ThreadPoolExecutor is not kept in the kind of plain int field we routinely use in everyday development. Let's take a look at the state-related source code:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

In one sentence: ctl is a single atomic int whose high 3 bits hold the run state of the thread pool and whose low 29 bits hold the number of threads in the pool.

Now let’s look at the states in detail.

Thread pool state

  1. RUNNING: the running state, in which new tasks are accepted and queued tasks are taken and executed.
private static final int RUNNING    = -1 << COUNT_BITS;

The value of RUNNING is -1 (11111111111111111111111111111111) shifted left by COUNT_BITS (29) bits. In the result the high 3 bits are 1 and the low 29 bits are 0: 11100000000000000000000000000000 (decimal -536870912).

  2. SHUTDOWN: new tasks cannot be submitted, but tasks that were already submitted continue to execute.
private static final int SHUTDOWN   =  0 << COUNT_BITS;

The value of SHUTDOWN is 0 shifted left by COUNT_BITS (29) bits, which is still 0.

  3. STOP: new tasks cannot be submitted, executing tasks are interrupted, and waiting tasks are removed from the queue.
private static final int STOP       =  1 << COUNT_BITS;

The value of STOP is 1 shifted left by COUNT_BITS (29) bits. The high 3 bits are 001 and the low 29 bits are 0: 00100000000000000000000000000000 (decimal 536870912).

  4. TIDYING: all tasks have completed and the worker thread count is 0.
private static final int TIDYING    =  2 << COUNT_BITS;

The value of TIDYING is 2 shifted left by COUNT_BITS (29) bits. The high 3 bits are 010 and the low 29 bits are 0: 01000000000000000000000000000000 (decimal 1073741824).

  5. TERMINATED: the thread pool has terminated.
private static final int TERMINATED =  3 << COUNT_BITS;

The value of TERMINATED is 3 shifted left by COUNT_BITS (29) bits. The high 3 bits are 011 and the low 29 bits are 0: 01100000000000000000000000000000 (decimal 1610612736).
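
The five values above can be checked with a few lines of code. This is a small sketch that copies the constants out of ThreadPoolExecutor (where they are private) into a hypothetical class CtlStateValues; the helper bits() is also an assumption made for display purposes.

```java
public class CtlStateValues {
    static final int COUNT_BITS = Integer.SIZE - 3; // 29

    // Same expressions as in ThreadPoolExecutor, copied here for inspection.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Renders an int as a zero-padded 32-bit binary string.
    static String bits(int v) {
        return String.format("%32s", Integer.toBinaryString(v)).replace(' ', '0');
    }

    public static void main(String[] args) {
        String[] names  = { "RUNNING", "SHUTDOWN", "STOP", "TIDYING", "TERMINATED" };
        int[]    values = { RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED };
        for (int i = 0; i < names.length; i++) {
            System.out.println(bits(values[i]) + "  " + values[i] + "  " + names[i]);
        }
    }
}
```

Because RUNNING is the only negative value, a signed comparison is enough to order the states, which is why the source can test conditions such as rs >= SHUTDOWN.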

Computing the run state from ctl

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c)     { return c & ~CAPACITY; }
  1. First compute CAPACITY: 1 shifted left by 29 bits, minus 1, which is 00011111111111111111111111111111 (536870911). This is also the maximum number of threads.
  2. The run state is c & ~CAPACITY. From step 1, CAPACITY has its high 3 bits set to 0 and its low 29 bits set to 1; inverting it gives high 3 bits of 1 and low 29 bits of 0 (11100000000000000000000000000000). ANDing this mask with the current ctl value keeps only the high 3 state bits and clears the low 29 bits.
Summary of the thread pool run state
  1. The state value is held only in the high 3 bits of ctl.
  2. The state values are ordered as follows: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
  3. Computing the current run state is neat: CAPACITY is inverted to form a mask, and ANDing that mask with ctl leaves only the high 3 bits as the state value.

Number of threads

Once the run state calculation above is clear, the thread count is simple.

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int workerCountOf(int c)  { return c & CAPACITY; }
  1. CAPACITY is computed as before; the maximum thread count is 00011111111111111111111111111111 (536870911).
  2. The current ctl value is ANDed with CAPACITY, which clears the high 3 bits and leaves the low 29 bits as the thread count.
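
The packing and unpacking can be exercised as a round trip. The sketch below copies the three helper methods from the source into a hypothetical class CtlPacking (the originals are private to ThreadPoolExecutor) and shows that incrementing the packed value changes only the thread count.

```java
public class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both fields

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // state survives the round trip
        System.out.println(workerCountOf(c));         // 5

        // Adding 1 to the packed value bumps the count and leaves the state alone,
        // which is what lets the pool adjust the count with a single CAS on ctl.
        int next = c + 1;
        System.out.println(runStateOf(next) == RUNNING);
        System.out.println(workerCountOf(next));      // 6
    }
}
```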

Task Submission Process

Worker

A Worker is the object the thread pool uses to represent a worker thread; it executes the tasks we submit. Worker is defined as an inner class of ThreadPoolExecutor. Here is its source:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** The actual thread object of this worker */
    final Thread thread;
    /** Initial task: the first task submitted to this worker */
    Runnable firstTask;
    /** The number of tasks completed by this worker */
    volatile long completedTasks;

    /**
     * Constructor: stores firstTask and creates a new thread
     * through the thread factory.
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // Call the outer ThreadPoolExecutor's runWorker method to execute tasks
        runWorker(this);
    }

    // State 0 means unlocked.
    // State 1 means locked.
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

To summarize:

  1. A Worker wraps a thread that executes tasks; the firstTask passed to the constructor is the first task it runs. The thread is created through the thread factory (the factory object we passed to the thread pool).
  2. Worker extends AQS and implements the relevant methods to form a non-reentrant mutex. The lock is held while a task runs, so code that interrupts idle workers (which must first win tryLock()) never interrupts a worker in the middle of a task.
  3. To prevent the thread from being interrupted before it actually starts running, the constructor calls setState(-1) on the AQS parent class.
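
To see what "non-reentrant" means in point 2, the locking part of Worker can be isolated. The sketch below is a stripped-down mutex on top of AbstractQueuedSynchronizer using the same tryAcquire/tryRelease logic; the class name NonReentrantMutex is hypothetical and nothing here is part of the JDK's Worker beyond those two methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class NonReentrantMutex extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // Only a 0 -> 1 transition succeeds, even for the thread that already owns the lock.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return getState() != 0; }

    public static void main(String[] args) {
        NonReentrantMutex m = new NonReentrantMutex();
        System.out.println(m.tryLock()); // true: first acquisition
        System.out.println(m.tryLock()); // false: same thread, no re-entry
        m.unlock();
        System.out.println(m.tryLock()); // true again after release
    }
}
```

A ReentrantLock would return true for the second tryLock() on the same thread. With the non-reentrant version, "lock is held" reliably means "a task is running", which is the signal interruptIdleWorkers() relies on.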

execute(): submitting a task for execution

Source code first; after all, nothing stays secret once you read the source.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

The first two lines are a null check. Below, the execution flow is broken down and analyzed step by step.

// Read the ctl value for the first time.
int c = ctl.get();
// The outer if uses workerCountOf(c) to check whether the current
// thread count is below corePoolSize. If so, try to add a core
// thread with command as its first task, and return on success.
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    // addWorker failed: re-read ctl
    c = ctl.get();
}
// If the condition is not met, continue with the steps below

One detail to note here:

The execute() method itself does no synchronization. The outer if reads the thread count at that moment and compares it, but other threads may be running the same logic and calling addWorker() concurrently, so judging by this check alone the thread count could exceed corePoolSize.

That is why addWorker() re-checks the pool state and the thread count internally and increments the count atomically. When the conditions are not met it returns false, and execute() proceeds with the rest of the flow.

Next, let's analyze the rest of the flow.

// 1. If the pool is RUNNING, try to add the task to the queue
if (isRunning(c) && workQueue.offer(command)) {
    // 1.1 Re-read the ctl value
    int recheck = ctl.get();
    // 1.2 Check the state again. If it is not RUNNING, remove the task and reject it
    if (!isRunning(recheck) && remove(command))
        reject(command);
    // Otherwise, if there are no worker threads, add one without a first task
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 2. Try to add a non-core thread to execute the task; reject the task on failure
else if (!addWorker(command, false))
    reject(command);

To summarize: the outer if first checks the state and then adds the task to the queue. If that condition is not met, it tries to add a new thread to handle the task.

Here are some details:

  1. Comment 1.1 re-reads the ctl value and comment 1.2 checks the thread pool state again; if the pool is no longer RUNNING, the task is removed and the rejection policy is executed.
  2. If the condition at comment 1.2 is not met, the else-if branch checks whether the worker count is 0. This covers the case where all threads have died since the last check. To make sure the task can still be executed, another thread is added (the first argument is null because the task is already in the queue).
  3. Reaching the else-if at comment 2 usually means the queue is full, so a new non-core thread is added; if that fails, the rejection policy is executed. (The other possibility is that the pool is not RUNNING. Then addWorker() fails and the rejection policy is executed, which guarantees that new tasks are rejected after shutdown() or shutdownNow() has been called.)
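
The order "core thread, then queue, then non-core thread, then reject" can be observed from the outside. The following sketch uses a deliberately tiny, hypothetical configuration (1 core thread, at most 2 threads, queue capacity 1) and four tasks that block on a latch, so each submission takes a different branch of execute().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    /** Returns {pool size, queue size, rejected count} after four submissions. */
    static int[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();                     // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) { // default AbortPolicy
                rejected++;
            }
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size(), rejected };

        release.countDown();                         // let the tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

Task 1 starts the core thread, task 2 goes into the queue, task 3 finds the queue full and starts the second thread, and task 4 finds both the queue and the pool full and is rejected, so the result is poolSize=2, queued=1, rejected=1.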

addWorker(): adding a worker thread

Post the source code first.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1. Check the pool state and whether the queue is empty
        //    (in which case no thread is created)
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // 2. Inner loop: increment the thread count with CAS
        for (;;) {
            int wc = workerCountOf(c);
            // 2.1 Depending on the core flag, check whether the thread count
            //     has reached the core size or the maximum size
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 2.2 Try to increase the thread count by 1 with CAS
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // The pool state changed: go back to the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 3. Create a new worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 3.1 Lock using mainLock
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 3.1.1 Make sure the pool state is still acceptable
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 3.1.1.1 Add the worker to the workers set
                    workers.add(w);
                    int s = workers.size();
                    // Record the largest number of threads the pool has had
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 3.2 Start the thread if the worker was added
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Start failed: roll back
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker() does two things:

  1. It checks the thread pool state, then checks and increments the thread count.
  2. It creates a Worker, adds it to the workers set while holding mainLock, releases the lock, and finally starts the thread.

Here are some details to note:

  1. new Worker(firstTask) takes firstTask as the worker's first task. Worker extends AQS, and its constructor creates a thread through the threadFactory.
  2. addWorkerFailed() is called to roll back when the worker's thread could not be created (for example, the thread factory misbehaves or an OOM occurs) or the thread could not be started.
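
The "check the limit, then CAS the count, retry on conflict" pattern from the first half of addWorker() can be shown in isolation. This is a simplified sketch with a plain AtomicInteger instead of the packed ctl field; the class BoundedCounter and its methods are hypothetical names, not JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) { this.limit = limit; }

    /** Reserves one slot; returns false once the limit has been reached. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // like wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                   // like compareAndIncrementWorkerCount(c)
            // CAS lost a race with another thread: re-read and retry
        }
    }

    /** Gives a slot back, analogous to the rollback in addWorkerFailed(). */
    void decrement() { count.decrementAndGet(); }

    int get() { return count.get(); }

    /** Lets many threads compete for the slots and returns the final count. */
    static int contend(int limit, int threads) throws InterruptedException {
        BoundedCounter bc = new BoundedCounter(limit);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> bc.tryIncrement());
            ts[i].start();
        }
        for (Thread t : ts)
            t.join();
        return bc.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(contend(3, 16));    // never more than the limit
    }
}
```

Because the limit check and the increment are tied together by the CAS on the value that was checked, the count cannot overshoot the limit no matter how many threads race, which is the property execute() relies on when it calls addWorker() without any lock.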

addWorkerFailed() source code analysis

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove the worker
        if (w != null)
            workers.remove(w);
        // Decrement the thread count in a CAS loop
        decrementWorkerCount();
        // Try to terminate the thread pool
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

Task Execution Process

runWorker(): executing tasks

Once the Worker's thread is started, it calls back into the Worker's run() method, and run() simply delegates the work to ThreadPoolExecutor.runWorker(). Here is the source code:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 1. Take the worker's first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 2. Unlock
    w.unlock(); // allow interrupts
    // 3. Flag: did the worker exit abnormally?
    boolean completedAbruptly = true;
    try {
        // 4. Loop while there is a task: the first task, or one from getTask()
        while (task != null || (task = getTask()) != null) {
            // 4.1 Lock the worker
            w.lock();
            // 4.2 If the pool state is >= STOP, make sure the thread is
            //     interrupted; otherwise make sure it is not interrupted.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 4.3 Callback before the task executes
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 4.4 Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 4.5 Callback after the task executes
                    afterExecute(task, thrown);
                }
            } finally {
                // 4.6 Record the completed task count and release the lock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 5. The worker finished normally: clear the flag
        completedAbruptly = false;
    } finally {
        // 6. Run the worker exit logic
        processWorkerExit(w, completedAbruptly);
    }
}

Here we describe the implementation process in detail:

  1. Comment 1 sets task to firstTask. If firstTask != null, the first task executed is firstTask. At comment 4, if task != null is false (there was no first task, or it has already been executed), getTask() is called to take a task from the queue.
  2. The w.unlock() call at comment 2 can be confusing. It is related to Worker's AQS implementation and to pool shutdown: a new worker's state is -1 so that it cannot be interrupted, and calling unlock() here sets the state to 0, which allows interrupts from this point on.
  3. The loop at comment 4 does not end until getTask() returns null or an exception is thrown.
  4. Comment 4.1 locks the worker before a task executes, so that a task in progress is not hit by an interrupt aimed at idle workers.
  5. Comment 4.3 is the callback before task execution, comment 4.4 actually runs the task, and comment 4.5 is the callback after task execution (a typical template method pattern).
  6. Comment 6 runs the worker exit logic.

To summarize:

  1. The analysis shows that if the thread pool state is at most SHUTDOWN (that is, below STOP), the executing task is not interrupted.
  2. The callbacks around task execution can be overridden. An exception thrown by a callback or by the task itself can be observed in afterExecute(), but it is eventually rethrown, which causes the worker to exit.
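
The two hooks are protected methods, so using them means subclassing. Below is a minimal sketch of a subclass that counts started tasks in beforeExecute() and captures a task's exception in afterExecute(); the class HookedPool, its fields and the demo() helper are hypothetical names chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    final CountDownLatch afterCalled = new CountDownLatch(1);

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();       // runs in the worker thread, before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null)
            lastFailure.set(t);          // the exception thrown by the task, if any
        afterCalled.countDown();
    }

    /** Runs one failing task and returns what afterExecute() observed. */
    static Throwable demo() throws InterruptedException {
        HookedPool pool = new HookedPool();
        // The exception is rethrown after the hook, so the worker thread dies and
        // the default uncaught-exception handler prints a stack trace to stderr.
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.afterCalled.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return pool.lastFailure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("afterExecute saw: " + demo());
    }
}
```

Note that this applies to tasks passed to execute(). For tasks passed to submit(), the exception is captured inside the returned Future, so afterExecute() receives null and the failure has to be read from the Future instead.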

processWorkerExit(): cleanup on worker exit

show code…

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 1. On abnormal exit, the thread count has not been adjusted yet
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. Under mainLock, record the completed task count and remove the worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 3. Try to terminate the thread pool
    tryTerminate();

    int c = ctl.get();
    // 4. If the state is below STOP, decide whether a replacement thread is needed
    if (runStateLessThan(c, STOP)) {
        // 4.1 The worker completed normally
        if (!completedAbruptly) {
            // 4.1.1 Compute the minimum number of threads
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 4.2 Add a worker
        addWorker(null, false);
    }
}

To summarize:

  1. Comments 1 to 3 are cleanup: adjusting the thread count, recording the number of completed tasks, and removing the worker.
  2. The logic at comment 4.1 decides, based on a series of conditions, whether a thread needs to be added back to the pool.
  3. At comment 4.2, addWorker() adds a non-core thread in the following cases: 1) the worker exited abnormally; 2) the current thread count is below the core thread count; 3) the queue is not empty and there are no worker threads.
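
Case 1 above (a worker that exits abnormally is replaced) can be observed with a single-thread pool: after a task throws, the next task runs on a different thread. This is a small sketch; the class name WorkerReplacementDemo is hypothetical, and the custom thread factory exists only to silence the stack trace that the dying thread would otherwise print.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerReplacementDemo {
    /** Returns true if the second task ran on a different thread than the failing first task. */
    static boolean replaced() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                r -> {
                    Thread t = new Thread(r);
                    t.setUncaughtExceptionHandler((th, e) -> { /* keep stderr quiet */ });
                    return t;
                });
        BlockingQueue<Thread> seen = new LinkedBlockingQueue<>();

        // First task records its thread and then kills the worker with an exception.
        pool.execute(() -> {
            seen.add(Thread.currentThread());
            throw new IllegalStateException("boom");
        });
        // Second task only records the thread it runs on.
        pool.execute(() -> seen.add(Thread.currentThread()));

        Thread first = seen.poll(5, TimeUnit.SECONDS);
        Thread second = seen.poll(5, TimeUnit.SECONDS);
        pool.shutdown();
        return first != null && second != null && first != second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker replaced: " + replaced());
    }
}
```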

Rejection policies for tasks

As the previous analysis shows, when the task queue is full and the thread count has reached maximumPoolSize, the rejection policy is invoked to handle the task. ThreadPoolExecutor provides several built-in policies:

  1. CallerRunsPolicy: the thread that submitted the task runs it.
  2. AbortPolicy: throws a RejectedExecutionException directly (this is the default).
  3. DiscardPolicy: silently discards the task.
  4. DiscardOldestPolicy: discards the oldest task in the task queue.

In real projects these built-in policies may not be enough; a common approach is to log the rejection and raise an alert. If a rejection policy is triggered at all, either the thread pool configuration needs tuning or the system is short of resources and overloaded.
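
A "log and alert" policy of the kind described above could look like the sketch below. The class LoggingRejectionHandler, its counter and the log format are hypothetical; a real project would call its own logging or alerting system instead of System.err. After logging, the handler throws like AbortPolicy so the submitter still learns about the rejection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LoggingRejectionHandler implements RejectedExecutionHandler {
    final AtomicLong rejectedCount = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        long n = rejectedCount.incrementAndGet();
        // Record enough pool state to tell "pool too small" from "pool shut down".
        System.err.printf("rejected #%d: poolSize=%d active=%d queued=%d shutdown=%b%n",
                n, e.getPoolSize(), e.getActiveCount(), e.getQueue().size(), e.isShutdown());
        throw new RejectedExecutionException("Task " + r + " rejected from " + e);
    }

    /** Saturates a one-thread pool and returns how many rejections were recorded. */
    static long demo() throws InterruptedException {
        LoggingRejectionHandler handler = new LoggingRejectionHandler();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), handler);
        pool.execute(() -> {
            try {
                release.await();                 // occupy the only thread
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            pool.execute(() -> { });             // no free thread, no queue slot
        } catch (RejectedExecutionException expected) {
            // the handler has already logged and counted it
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return handler.rejectedCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejections: " + demo());
    }
}
```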

Shutting down ThreadPoolExecutor

There are two main methods for shutting down the thread pool: shutdown() and shutdownNow(). Let's analyze them in detail.

shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 1. Check shutdown permissions
        checkShutdownAccess();
        // 2. Change the run state of the thread pool to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 3. Interrupt idle workers
        interruptIdleWorkers();
        // 4. Callback for custom implementations
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate
    tryTerminate();
}
  1. Comment 2 changes the state in a CAS loop.
  2. Comment 3 performs the actual interruption, interrupting only idle workers. Here is the source code:
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Iterate over the workers
        for (Worker w : workers) {
            Thread t = w.thread;
            // If the thread is not yet interrupted and the worker's lock can be acquired
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // Interrupt only one worker
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

The logic is clear: check whether the worker's thread has already been interrupted; if not, try to acquire the worker's lock (again ensuring that a worker executing a task is not interrupted), and interrupt the thread once the lock has been acquired.

shutdownNow()

shutdownNow() is more forceful than shutdown(). Here's the source code.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // Interrupt all workers
        interruptWorkers();
        // Remove the tasks waiting in the queue and return them
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the thread pool
    tryTerminate();
    return tasks;
}

There are two differences compared with shutdown():

  1. All workers are interrupted. As long as a worker's state is not -1, its thread's interrupt() method is called.
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
  2. Tasks waiting in the queue are removed and returned.
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 1. Move the queued tasks into taskList in one call
    q.drainTo(taskList);
    // 2. If the queue is still not empty, iterate over it,
    //    remove the tasks one by one and add them to taskList
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

Comment 2 may be a little unclear. When the queue is a DelayedWorkQueue or similar, drainTo() does not remove tasks whose delay has not yet expired, so remove() has to be called on them one by one. For the details, see java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue#drainTo(java.util.Collection<? super java.lang.Runnable>).

shutdown() & shutdownNow()

Here’s a summary of the differences:

  1. shutdown() changes the state to SHUTDOWN and interrupts only idle workers.
  2. shutdownNow() changes the state to STOP, interrupts all workers, and removes and returns the list of tasks waiting in the queue.
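
Both behaviours can be checked side by side. The sketch below runs the same scenario twice on a hypothetical single-thread pool: one running task plus two queued tasks, first stopped with shutdownNow() and then with shutdown(). The class ShutdownDemo and its two helper methods are names made up for this example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownDemo {
    static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    /** Returns {tasks handed back by shutdownNow(), 1 if the running task was interrupted}. */
    static int[] shutdownNowDemo() throws InterruptedException {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.set(true);           // shutdownNow() interrupts busy workers too
            }
        });
        pool.execute(() -> { });                 // stays in the queue
        pool.execute(() -> { });                 // stays in the queue
        started.await();
        List<Runnable> pending = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { pending.size(), interrupted.get() ? 1 : 0 };
    }

    /** Returns {completed task count, 1 if the running task was interrupted} after shutdown(). */
    static long[] shutdownDemo() throws InterruptedException {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });
        pool.execute(() -> { });
        started.await();
        pool.shutdown();                         // busy worker holds its lock, so no interrupt
        release.countDown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new long[] { pool.getCompletedTaskCount(), interrupted.get() ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] now = shutdownNowDemo();
        long[] graceful = shutdownDemo();
        System.out.println("shutdownNow: returned=" + now[0] + " interrupted=" + now[1]);
        System.out.println("shutdown:    completed=" + graceful[0] + " interrupted=" + graceful[1]);
    }
}
```

With shutdownNow() the two queued tasks come back unexecuted and the running task sees an interrupt; with shutdown() all three tasks complete and the running task is left alone.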

tryTerminate(), which shows up everywhere

Let’s look at the source code

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Check the state and the queue to decide whether termination can proceed
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        // If the thread count is not 0, interrupt one idle worker and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Change the state to TIDYING with CAS
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Call the terminated() hook; the default implementation is empty
                    terminated();
                } finally {
                    // Change the state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up threads waiting on the termination condition,
                    // mainly callers of awaitTermination()
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // CAS failed: repeat the loop
    }
}

To summarize:

  1. The state can move to TERMINATED in two cases: 1) the state is SHUTDOWN, the thread count is 0, and the queue is empty; 2) the state is STOP and the thread count is 0.
  2. Once the state has been set to TERMINATED, the threads blocked in awaitTermination() are woken up.
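
From the caller's side, the end of tryTerminate() shows up as the terminated() hook being invoked and awaitTermination() returning true. The following sketch overrides the hook in an anonymous subclass; the class TerminationDemo and the demo() helper are hypothetical names for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TerminationDemo {
    /** Returns {terminated before shutdown, awaitTermination result, isTerminated, hook called}. */
    static boolean[] demo() throws InterruptedException {
        AtomicBoolean hookCalled = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true);    // invoked while the pool is in the TIDYING state
            }
        };
        pool.execute(() -> { });
        boolean beforeShutdown = pool.isTerminated();

        pool.shutdown();
        // Blocks on the termination condition until tryTerminate() signals it.
        boolean awaited = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { beforeShutdown, awaited, pool.isTerminated(), hookCalled.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("terminated before shutdown: " + r[0]);
        System.out.println("awaitTermination returned:  " + r[1]);
        System.out.println("isTerminated afterwards:    " + r[2]);
        System.out.println("terminated() hook called:   " + r[3]);
    }
}
```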

Conclusion

  1. This article analyzed the life cycle of a thread pool. Most of us simply create a thread pool according to the spec and submit tasks to it, but knowing the internal details makes problems much easier to handle when they occur.

  2. The design of the thread pool shows how deep its author's programming skill runs. It is worth admiring and learning from.