The premise

JUC thread pool ThreadPoolExecutor (ThreadPoolExecutor) {ThreadPoolExecutor (ThreadPoolExecutor); Concurrency master Doug Lea designed the top-level interface of ThreadPoolExecutor’s submitted tasks to have only one stateless execution method:

public interface Executor {

    void execute(Runnable command);
}
Copy the code

The ExecutorService provides a number of extensibility methods, and the underlying extensibility is based on the Executor#execute() method. This article focuses on the implementation of ThreadpoolOr #execute(), which will be analyzed in detail from the implementation principle, source code implementation and simplified examples. The source code for ThreadPoolExecutor is essentially unchanged from JDK8 to JDK11, which was used when this article was written.

The principle of ThreadPoolExecutor

AbstractQueuedSynchronizer ThreadPoolExecutor inside to use JUC synchronizer framework (commonly known as AQS), a large number of operations, the CAS operation. ThreadPoolExecutor provides fixed active threads (core threads), additional threads (additional threads created in the thread pool capacity to the number of core threads, referred to as non-core threads), task queues, and rejection policies.

JUC synchronizer framework

ThreadPoolExecutor uses the JUC synchronizer framework for four main purposes:

  • Global lockmainLockThe member property is a reentrant lockReentrantLockType, used primarily to access worker threadsWorkerSet and lock operations when recording data statistics.
  • Condition variables,termination.ConditionType used primarily by threads to wait for finalizationawaitTermination()Method with a deadline block.
  • Task queueworkQueue.BlockingQueueType, task queue, used to store tasks to be executed.
  • Worker thread, inner classWorkerType, which is the actual worker thread object in the thread pool.

About AQS before I wrote an article on the relevant source code analysis, JUC synchronizer framework AbstractQueuedSynchronizer source graphic analysis.

Core thread

ThreadPoolExecutor: ThreadPoolExecutor: ThreadPoolExecutor: ThreadPoolExecutor: ThreadPoolExecutor

  • The processing of abnormal task execution is not considered for the time being.
  • The task queue is unbounded.
  • The thread pool capacity is fixed to the number of core threads.
  • Ignore the rejection strategy for now.
public class CoreThreadPool implements Executor { private BlockingQueue<Runnable> workQueue; private static final AtomicInteger COUNTER = new AtomicInteger(); private int coreSize; private int threadCount = 0; public CoreThreadPool(int coreSize) { this.coreSize = coreSize; this.workQueue = new LinkedBlockingQueue<>(); } @Override public void execute(Runnable command) { if (++threadCount <= coreSize) { new Worker(command).start(); } else { try { workQueue.put(command); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } private class Worker extends Thread { private Runnable firstTask; public Worker(Runnable runnable) { super(String.format("Worker-%d", COUNTER.getAndIncrement())); this.firstTask = runnable; } @Override public void run() { Runnable task = this.firstTask; while (null ! = task || null ! = (task = getTask())) { try { task.run(); } finally { task = null; } } } } private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } public static void main(String[] args) throws Exception { CoreThreadPool pool = new CoreThreadPool(5); IntStream.range(0, 10) .forEach(i -> pool.execute(() -> System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i)))); Thread.sleep(Integer.MAX_VALUE); }}Copy the code

The result is as follows:

Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9
Copy the code

This thread pool is designed so that the core thread is created lazily and blocks the take() method on the task queue if the thread is idle. ThreadPoolExecutor is implemented similarly. However, if keepAliveTime is used and core threads are allowed to timeout (allowCoreThreadTimeOut is set to true), polling is done using BlockingQueue#poll(keepAliveTime) instead of permanent blocking.

Other additional features

When building an instance of ThreadPoolExecutor, you need to define maximumPoolSize (the maximum number of threads in the thread pool) and corePoolSize (the number of core threads). When the task queue is a bounded blocking queue, the core thread is full, and the task queue is full, an additional maximumPoolSize -corePoolsize thread is attempted to execute the newly submitted task. The two main additional functions implemented here when ThreadPoolExecutor are:

  • Non-core threads are created to perform tasks under certain conditions, and the recycle cycle of non-core threads (the end of the thread’s life cycle) iskeepAliveTime, the thread life cycle terminates if the next time a task is fetched from the task queue and its lifetime exceedskeepAliveTime.
  • Provides a reject policy that triggers a reject policy when the core thread is full, the task queue is full, and the non-core thread is full.

Source code analysis

We’ll examine the key properties of thread pools, then their state control, and finally the ThreadPoolExecutor#execute() method.

Key attributes

Public Class ThreadPoolExecutor extends AbstractExecutorService {Private Final AtomicInteger CTL = new  AtomicInteger(ctlOf(RUNNING, 0)); Private final BlockingQueue<Runnable> workQueue; Private final HashSet<Worker> workers = new HashSet<>(); private final HashSet<Worker> workers = new HashSet<>() Private final ReentrantLock mainLock = new ReentrantLock(); Private final Condition termination = mainlock. newCondition(); // awaitTermination = mainLock.newCondition(); Private int largestPoolSize; private int largestPoolSize; Private Long completedTaskCount; // Record the number of successfully executed tasks. // ThreadFactory, used to create new thread instances private volatile ThreadFactory ThreadFactory; Private Volatile RejectedExecutionHandler Handler; private volatile RejectedExecutionHandler handler; Private volatile Long keepAliveTime; private volatile long keepAliveTime; // Whether to allow core threads to timeout. If true, keepAliveTime is valid for core threads. Private volatile int corePoolSize; // Thread pool size private volatile int maximumPoolSize; // omit other code}Copy the code

Let’s look at the constructor with the longest argument list:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

You can customize the number of core threads, the capacity of the thread pool (maximum number of threads), the waiting period of idle threads, the task queue, the thread factory, and the rejection policy. The following is a brief analysis of the meanings and functions of each parameter:

  • CorePoolSize: int number of core threads.

  • MaximumPoolSize: int specifies the maximum number of threads, or the capacity of the thread pool.

  • KeepAliveTime: a type of long. It is also related to the life cycle of a worker thread.

  • Unit: type of TimeUnit, keepAliveTime TimeUnit of the keepAliveTime parameter, actually keepAliveTime is eventually converted to nanoseconds.

  • WorkQueue: BlockingQueue type, waiting queue or task queue.

  • ThreadFactory: ThreadFactory type, thread factory, used to create a worker thread (including core thread and non-core thread), use the default Executors. DefaultThreadFactory () as a built-in thread factory as an example, the general custom thread factory to better track the worker thread.

  • Handler:

  • RejectedExecutionHandler
    Copy the code

    More commonly known as a rejection policy, a rejection policy is executed when the blocking queue is full, there are no free threads (both core and non-core) and the task continues to be submitted. Four built-in rejection policy implementations are provided:

  • AbortPolicy: direct refused to strategy, which is not a mission, throw RejectedExecutionException directly, this is the default refusal strategies.

  • DiscardPolicy: DiscardPolicy, i.e. simply ignore the submitted task (in layman’s terms, empty implementation).

  • DiscardOldestPolicy: Discarding the oldest task policy. DiscardOldestPolicy: Discarding the oldest task policy by polling () and executing the currently committed task.

  • CallerRunsPolicy: the caller execution strategy, which is that the current Executor#execute() thread directly calls the task Runnable#run(). This strategy is usually used to avoid task loss, but from a practical point of view, the original asynchronous invocation intention will be reduced to synchronous invocation.

State control

State control mainly revolves around atomic integer member variable CTL:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; Private static int runStateOf(int c) {return c & ~COUNT_MASK; } private static int workerCountOf(int c) {return c & COUNT_MASK; } / / by running the state and the value of the worker thread count CTL or operation private static int ctlOf (int the rs, int wc) {return rs | wc. } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } / / CAS operation thread by the increase in the number one private Boolean compareAndIncrementWorkerCount (int) expect {return ctl.com pareAndSet (expect. expect + 1); } / / CAS number of threads to reduce operation 1 private Boolean compareAndDecrementWorkerCount (int) expect {return ctl.com pareAndSet (expect. expect - 1); Private void decrementWorkerCount() {ctl.addandGet (-1); }Copy the code

The state variable of the thread pool is COUNT_BITS, which has a value of integer.size-3 (29) :

The size of an instance of the Integer wrapper type Integer is 4 bytes, which is a total of 32 bits, or a total of 32 bits used to hold 0s or 1s. In the ThreadPoolExecutor implementation, a 32-bit integer wrapper type is used to store the number of worker threads and thread pool state. The lower 29 bits are used to store the number of worker threads and the higher 3 bits are used to store the state of the thread pool, so there can be no more than 2^3 states of the thread pool. The maximum number of worker threads is 2^ 29-1, more than 500 million, this number will not be considered for a short time.

COUNT_MASK: (1 < COUNT_BITS) -l: (1 < COUNT_BITS) -l: (1 < COUNT_BITS) -l: (1 < COUNT_BITS) -l: (1 < COUNT_BITS) -l

The thread pool state constants are RUNNING state constants.

/ / - 1 complement for: 111-11111111/111111111111111111111 / left after 29:111-00000000/000000000000000000000 / decimal value is: Private static final int RUNNING = -1 << COUNT_BITS; private static final int RUNNING = -1 << COUNT_BITS;Copy the code

The composition of the control variable CTL is obtained by the operation or of the thread pool running state RS and the number of worker threads WC:

/ / rs = RUNNING value is: 111-00000000 / / wc has a value of 000000000000000000000 0:00 | 0-00000000000000000000000000000 / / rs wc results as follows: 111-00000000000000000000000000000 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; }Copy the code

So how do we get the thread pool state up to three bits from the CTL? The runStateOf() method provided in the source code above extracts the running state:

/ / to take the COUNT_MASK (~ COUNT_MASK), get: 000000000000000000000 / / CTL 111-00000000 bitmap features are: XXX - do a yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy / / two and arithmetic can get high three XXX private static int runStateOf (int) c {return c & ~ COUNT_MASK; }Copy the code

Similarly, remove the low number of worker threads 29 only need to put the CTL and COUNT_MASK 000-11111111 (111111111111111111111) to do with the operation at a time.

If the number of worker threads is 0, summarize the running state constant of the thread pool:

Here’s a special trick. Since the run-state values are stored three bits higher, you can compare and determine the state of a thread pool directly from a decimal value (you can even ignore the lower 29 bits and compare directly with a CTL, or use a CTL to compare with a thread pool state constant) :

RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

Here are three ways to use this technique:

Private static Boolean runStateLessThan(int c, int s) {return c < s; Private static Boolean runStateAtLeast(int c, int s) {return c >= s; private static Boolean runStateAtLeast(int c, int s) {return c >= s; } private static Boolean isRunning(int c) {return c < SHUTDOWN; }Copy the code

Finally, the transition diagram for thread pool state:

PS: There are many intermediate variables in the thread pool source code with simple single letter, for example, C is CTL, WC is worker count, rs is running status.

Execute method source code analysis

ThreadPoolExecutor#execute() ¶

// Execute the command, Public void execute(Runnable command) {// If (command == null) throw new NullPointerException(); Int c = ctl.get(); If the number of working threads is smaller than the number of core threads, If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))) return if (addWorker(command, true)); C c = ctl.get(); } // The thread pool is running, and the thread pool is running. If (isRunning(c) &&workqueue.offer (command)) {int recheck = ctl.get(); // If the status of the thread pool is not in the running state, the current task is removed from the task queue. If (! IsRunning (recheck) && remove(command)) // Call reject policy processing - reject(command); // Go to the following else if branch, which has the following premises: // The incoming task may fail to be removed from the task queue (the only possibility of removal is that the task has already been executed) // If the current number of worker threads is 0, // If the number of previous worker threads is not 0, it should be the last else branch, but it can do nothing. Else if (workerCountOf(recheck) == 0) addWorker(null, false); } // The following preconditions exist: The thread pool may not be in the RUNNING state and the task queue may be full If the task fails to be added to the task queue, an attempt will be made to create a non-core thread. Else if (! AddWorker (command, false)) // Call reject policy processing task - return reject(command); }Copy the code

Here’s a quick look at the process:

  1. If the total number of current worker threads is less thancorePoolSize, the core thread is created directly to execute the task (the task instance is passed in to be used directly to construct the worker thread instance).
  2. If the number of current worker threads is greater than or equal tocorePoolSizeIf the current number of worker threads is 0, a non-core thread will be created and the task object passed in is NULL.
  3. If placing a task to the task queue fails (the task queue is full), an attempt is made to create a non-core thread to pass in the task instance to execute.
  4. If a non-core thread fails to be created, reject the task and invoke the reject policy to process the task.

Here’s a bit of a puzzle: why do I need to double-check the running status of the thread pool when the current worker thread count is zero and try to create a non-core thread with a null task object passed in? This can be seen in the API comments:

If a task is successfully added to the task queue, we still need to double-check whether we need to add another worker thread (since all alive worker threads may have terminated after the last check) or whether the thread pool was shutdown when the current method was executed. So we need to double check the state of the thread pool, remove the task from the queue if necessary, or create a new worker thread if no worker thread is available.

The task submission process from the caller’s point of view is as follows:

AddWorker method source code analysis

Boolean addWorker(Runnable firstTask, Boolean core) The first argument to the method can be used to pass in the task instance directly. The second argument identifies whether the worker thread to be created is a core thread. The method source code is as follows:

// Add a worker thread. If false is returned, no new worker thread is created. Private Boolean addWorker(Runnable firstTask, Boolean core) {Retry: For (int c = ctl.get();) {// This is a very complicated condition, here we split several and (&&) condition: // 1. The thread pool state is at least SHUTDOWN, i.e. Rs >= SHUTDOWN(0) // 2. If the thread pool is at least in the STOP state (rs >= STOP(1)), or if the firstTask instance passed is not null, or if the task queue is empty, the thread pool will not accept new tasks. If (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask ! = null || workQueue.isEmpty())) return false; // Note that this is also an infinite loop - layer loop for (;;) {// the number of worker threads is reacquired for each loop. If wc >= corePoolSize, return false if corePoolSize failed to create the core thread // 1. If wc >= maximumPoolSize, if wc >= maximumPoolSize, If (workerCountOf(c) >= ((core? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; / / success by CAS update working threads wc, break to the outermost loop if (compareAndIncrementWorkerCount (c)) break retry. C = ctl.get(); c = RUNNING; c = SHUTDOWN; // re-read CTL // If the thread pool status has changed from RUNNING to SHUTDOWN, jump out of the outer loop again and continue to run if (runStateAtLeast(c, SHUTDOWN)) continue retry; Else CAS failed due to workerCount change; otherwise CAS failed due to workerCount change; Retry inner loop}} Boolean workerStarted = false; Boolean workerAdded = false; Worker w = null; Thread t = w.tuhread; // Create a Worker instance by passing in the task instance firstTask. The new Thread object is created through the Thread factory. Thread instance w = new Worker(firstTask); final Thread t = w.thread; if (t ! = null) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c  = ctl.get(); // The thread pool state is changed to SHUTDOWN after the thread pool is locked. If the thread pool state is still RUNNING, it only needs to determine whether the thread instance is alive and needs to be added to the Worker thread set and start a new Worker // 2. If the thread pool state is less than STOP (i.e., RUNNING or SHUTDOWN) and the incoming task instance firstTask is null, it needs to add to the Worker thread set and start a new Worker // for 2, in other words, if the thread pool is in SHUTDOWN state, If firstTask is not null, the new Worker will not be added to the Worker thread collection and the new Worker will not be started. This Worker is likely to succeed the next round of GC object is garbage collected the if (set (c) | | (runStateLessThan (c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException();  // Add the created worker thread instance to the worker thread collection worker.add (w); int s = workers.size(); If (s > largestPoolSize) largestPoolSize = s; Thread#start() is used to start the actual thread instance workerAdded = true; } } finally { mainLock.unlock(); If (workerAdded) {t.start(); if (workerAdded) {t.start(); if (workerAdded); WorkerStarted = true; }}} finally {// thread start failure, need to remove the corresponding Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// Remove if (w! = null) workers.remove(w); DecrementWorkerCount (); // Try to terminate the thread pool based on state judgment tryTerminate(); } finally { mainLock.unlock(); }}Copy the code

Doug Lea is a big fan of complex conditional judgments, and he doesn’t like curly braces for single-line complex judgments. Code like the following is common in many of his libraries:

if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask ! = null || workQueue.isEmpty())) return false; / /... Boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >= SHUTDOWN(0) boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask ! = null || workQueue.isEmpty(); if (atLeastShutdown && atLeastStop){ return false; }Copy the code

In the above analysis logic, it should be noted that when the Worker instance is created, a Java Thread instance will be created in its constructor through ThreadFactory. After that, it will check whether the Worker instance needs to be added to the Worker Thread set workers and whether the Thread instance held by the Worker needs to be started. Only when the Thread instance instance is started, the Worker really starts to operate; otherwise, it is just a useless temporary object. The Worker itself also implements the Runnable interface, which can be regarded as a Runnable adapter.

Worker thread internal class Worker source code analysis

Each concrete work threads in a thread pool Worker packaged for inner class instance, Worker inheritance in AbstractQueuedSynchronizer (AQS), implements the Runnable interface:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; If ThreadFactory fails to create a Thread, null final Thread is used. // Save the incoming Runnable task instance Runnable firstTask; // Record the total number of tasks completed by each thread. Volatile Long completedTasks; Null Worker(Runnable firstTask) {disable thread interruption until setState(-1) is executed by the runWorker() method; // inhibit interrupts until runWorker this.firstTask = firstTask; ThreadFactory(); ThreadFactory(); ThreadFactory(); ThreadFactory(); ThreadFactory(); } // Delegate to the external runWorker() method, note that the runWorker() method is a thread pool method, not the Worker method public void run() {runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // Protected Boolean isHeldExclusively() {return getState()! = 0; } // in exclusive mode, try to obtain the resource. Protected Boolean tryAcquire(int unused) {if (compareAndSetState(0, 0) 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Set state to 0 protected Boolean tryRelease(int unused) {setExclusiveOwnerThread(null); setState(0); return true; } public void lock() {acquire(1); } public Boolean tryLock() {return tryAcquire(1); } // unlock public void unlock() {release(1); } public Boolean isLocked() {return isHeldExclusively(); } void interruptIfStarted() {Thread t; if (getState() >= 0 && (t = thread) ! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

The logic in Worker’s constructor is very important. Thread instances created by ThreadFactory are passed to Worker instances at the same time. Since the Worker itself implements Runnable, it can be submitted to the Thread for execution as a task. Whenever the Worker’s thread instance W calls Thread#start(), it can execute Worker#run() at the appropriate time. To simplify the logic:

Worker = createWorker(); // Pass ThreadFactory ThreadFactory = getThreadFactory(); Thread Thread = threadfactory. newThread(Worker); // start thread.start() in addWorker();Copy the code

Worker inherits from AQS, which uses the exclusive mode of AQS. A technique is to set the resource (state) of AQS to -1 by setState(-1) when constructing Worker. This is because the default value of state in AQS is 0 when the Worker instance is just created, and the thread has not been started yet. Thread interrupts cannot occur at this time, as described in the Worker#interruptIfStarted() method. In Worker, two AQS covering methods tryAcquire() and tryRelease() do not judge external variables. The former directly CAS(0,1), and the latter directly setState(0). Moving on to the core method threadpoolor# runWorker() :

Final void runWorker(Worker w) {Thread wt = thread.currentThread (); Runnable task = w.firstTask; Runnable task = w.firstTask; // Set the initialization task object passed in the Worker to null. Wnlock (); wnlock (); wnlock (); // Allow interrupts // Logs whether threads are terminated due to user exceptions. The default is true Boolean completedAbruptly = true; Try {// Initialize the task object not null, or get the task from the task queue not empty (the task from the task queue is updated to the temporary variable task) // getTask() Because of the blocking queue, the while loop will block or timeout if the second half of the hit is blocked, GetTask () returns null causing the thread to jump out of the loop and terminate while (task! = null || (task = getTask()) ! = null) {// the Worker is locked and the CAS state is changed from 0 to 1. // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while Clearing Interrupt // If the thread pool is stopping (i.e. from RUNNING or SHUTDOWN to STOP), make sure that the current worker thread is interrupted // otherwise, To ensure that the current Thread is not interrupted status if ((runStateAtLeast (CTL) get (), STOP) | | (Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP))) &&! wt.isInterrupted()) wt.interrupt(); Try {// Hook method, beforeExecute(wt, task) before task; try { task.run(); AfterExecute (task, null); } catch (Throwable ex) {afterExecute(task, ex); throw ex; }} finally {// Empty the task temporary variable, this is important, otherwise the while will execute the same task in an infinite loop task = null; W.com pletedTasks++; // set state to 0 w.nlock (); GetTask () returns null and the thread exits completedAbruptly = false; ProcessWorkerExit (w, completedAbruptly); processWorkerExit(w, completedAbruptly); }}Copy the code

The code that determines the interrupt status of the current worker thread is dissected here:

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); Rs >= STOP(1) Boolean atLeastStop = runStateAtLeast(ctl.get(), STOP); // Check whether the thread pool state is at least STOP, And judge the current Thread state of disruption and to empty the current Thread state of interrupt Boolean interruptedAndAtLeastStop = Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP); if (atLeastStop || interruptedAndAtLeastStop && ! wt.isInterrupted()){ wt.interrupt(); }Copy the code

Thread.interrupted() gets the interrupted status of the Thread and clears the interrupted status. This method is called because it is possible to invoke shutdownNow() while executing the if logic. ShutdownNow () method also has the logic to interrupt all Worker threads. However, since shutdownNow() method will traverse all workers to do thread interruption, it may not be able to interrupt tasks before they are submitted to the Worker for execution. Therefore, the interrupt logic will be executed inside the Worker, which is the logic of the if code block.

It should also be noted that all newly submitted tasks will be rejected in the STOP state, tasks in the task queue will not be executed, and all Worker threads will be interrupted.

That is, even if the task Runnable has been pulled out of the first half of the runWorker() logic, it may be interrupted before it reaches the point of calling its Runnable#run().

Assuming that the logic that goes into the if block happens and the shutdownNow() method is called externally, the if logic will determine the thread abort status and reset, InterruptWorkers () called in the shutdownNow() method will not cause the thread to interrupt twice (which would cause an exception) because of a problem with the interrupt status judgment.

To summarize the core flow of the runWorker() method above:

  1. WorkerPerform the unlocking operation first to release the uninterruptible state.
  2. throughwhileCycle callgetTask()Method gets the task from the task queue (of course, the first loop could also be an external incoming firstTask instance).
  3. If the thread pool becomes moreSTOPState, you need to ensure that the worker thread is interrupted and interrupt processing, otherwise you must ensure that the worker thread is not interrupted state.
  4. Execution Task InstanceRunnale#run()Method, the hook method is called before and after the execution of the task instance, including normal completion and exception execution, respectivelybeforeExecute()andafterExecute().
  5. whileBreaking out of the loop meansrunWorker()Method end and worker thread lifecycle end (Worker#run()Lifecycle end), will be calledprocessWorkerExit()Handles the aftermath of worker thread exit.

Write in the last

Welcome to pay attention to my public number [calm as code], massive Java related articles, learning materials will be updated in it, sorting out the data will be placed in it.

If you think it’s written well, click a “like” and add a follow! Point attention, do not get lost, continue to update!!