This article focuses on parsing the source code for ThreadPoolExecutor.

This article converts the bit operations buried in ThreadPoolExecutor to binary so that its methods and fields become more intuitive. It then walks in detail through submitting a task to the thread pool, running a worker thread, releasing a worker thread, and terminating the thread pool, so that every condition and every line of code can be understood. Doug Lea's implementation of ThreadPoolExecutor really is a model to follow.

Attributes

NOTE: The bit operations in the code are not intuitive. Converting the values to decimal and binary makes them much easier to follow. Binary -> decimal: Integer.parseInt("00111100", 2). Decimal -> binary: Integer.toBinaryString(1).
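The two conversions in the note can be tried with a small sketch. The class and helper names (BinaryConversionDemo, fromBinary, toBinary) are made up for this illustration. One assumption worth flagging: Integer.parseInt rejects a 32-bit pattern whose top bit is set, so the sketch uses Integer.parseUnsignedInt for such values.

```java
class BinaryConversionDemo {
    // Binary string -> int. parseUnsignedInt accepts a full 32-bit pattern,
    // including patterns whose top bit is set (negative ints).
    static int fromBinary(String bits) {
        return Integer.parseUnsignedInt(bits, 2);
    }

    // int -> binary string (two's complement, without leading zeros).
    static String toBinary(int value) {
        return Integer.toBinaryString(value);
    }

    public static void main(String[] args) {
        System.out.println(Integer.parseInt("00111100", 2)); // 60
        System.out.println(toBinary(1));                      // 1
        System.out.println(toBinary(-1 << 29));               // 11100000000000000000000000000000
        System.out.println(fromBinary("11100000000000000000000000000000")); // -536870912
    }
}
```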

/**
 * ctl packs two fields into a single int:
 *   workerCount: the effective number of threads
 *   runState:    the state of the pool
 *
 * Run state is stored in the high-order bits; worker count is stored in the low-order bits:
 *
 *   |--- high 3 bits ---|--- low 29 bits ---|
 *   |---- runState -----|--- workerCount ---|
 *
 * ctl = ctlOf(RUNNING, 0): -536870912 -> 11100000000000000000000000000000 -> size: 32
 * While the pool is RUNNING, ctl starts at -536870912 and grows by 1 for each worker,
 * always staying below 0.
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;

/**
 * (1 << COUNT_BITS) = (1 << 29)  ->  536870912 -> 100000000000000000000000000000 -> size: 30
 * CAPACITY  = (1 << 29) - 1      ->  536870911 -> 11111111111111111111111111111 -> size: 29
 * ~CAPACITY = ~((1 << 29) - 1)   -> -536870912 -> 11100000000000000000000000000000 -> size: 32
 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

/**
 * NOTE: these states are in numerical order; runState is stored in the high-order bits.
 *
 * COUNT_BITS = 29
 * RUNNING    (-1 << 29): -536870912 -> 11100000000000000000000000000000 -> size: 32
 * SHUTDOWN   ( 0 << 29):          0 -> 0 -> size: 1
 * STOP       ( 1 << 29):  536870912 -> 100000000000000000000000000000 -> size: 30
 * TIDYING    ( 2 << 29): 1073741824 -> 1000000000000000000000000000000 -> size: 31
 * TERMINATED ( 3 << 29): 1610612736 -> 1100000000000000000000000000000 -> size: 31
 *
 * Practice shows that each constant is only the lower boundary of a state.
 * In other words, each state is a range of ctl values:
 *   RUNNING:    [11100000000000000000000000000000, 0)
 *   SHUTDOWN:   [0, 100000000000000000000000000000)
 *   STOP:       [100000000000000000000000000000, 1000000000000000000000000000000)
 *   TIDYING:    [1000000000000000000000000000000, 1100000000000000000000000000000)
 *   TERMINATED: [1100000000000000000000000000000, ...)
 */
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

State methods

/**
 * Extracts the run state from ctl by masking away the low 29 bits.
 * A negative result means the pool is RUNNING, 0 means SHUTDOWN,
 * and a positive result means STOP, TIDYING or TERMINATED.
 *
 * Because
 *   ~CAPACITY = ~((1 << 29) - 1) -> -536870912 -> 11100000000000000000000000000000 -> size: 32
 *   ctl = ctlOf(RUNNING, 0)      -> -536870912 -> 11100000000000000000000000000000 -> size: 32
 * we get
 *   ctl & ~CAPACITY -> -536870912 & -536870912 = -536870912
 * After the first ctl++:
 *   ctl -> -536870911 -> 11100000000000000000000000000001
 *   ctl & ~CAPACITY -> 11100000000000000000000000000001 & 11100000000000000000000000000000
 *                    = 11100000000000000000000000000000
 * So as workers are added, ctl itself moves from -536870912 towards 0,
 * while runStateOf(ctl) stays at -536870912, that is, RUNNING.
 */
private static int runStateOf(int c) { return c & ~CAPACITY; }

/**
 * Extracts the worker count from ctl by keeping only the low 29 bits.
 *
 * Because
 *   CAPACITY = (1 << 29) - 1 -> 536870911 -> 11111111111111111111111111111 -> size: 29
 *   ctl = ctlOf(RUNNING, 0)  -> -536870912 -> 11100000000000000000000000000000 -> size: 32
 * we get
 *   ctl & CAPACITY -> -536870912 & 536870911 = 0
 * With each ctl++, workerCountOf(ctl) becomes 1, 2, 3, ...
 */
private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }
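To check the arithmetic above by hand, here is a minimal standalone sketch that copies the three helper methods and the constants into a demo class. CtlDemo and addWorkers are names invented for the illustration, and a plain int stands in for the AtomicInteger.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;        // high 3 bits set

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Simulates adding n workers to a RUNNING pool by incrementing ctl n times.
    static int addWorkers(int n) {
        int ctl = ctlOf(RUNNING, 0);
        for (int i = 0; i < n; i++) {
            ctl++;
        }
        return ctl;
    }

    public static void main(String[] args) {
        int ctl = addWorkers(3);
        System.out.println(ctl);                          // -536870909
        System.out.println(Integer.toBinaryString(ctl));  // 11100000000000000000000000000011
        System.out.println(runStateOf(ctl) == RUNNING);   // true
        System.out.println(workerCountOf(ctl));           // 3
    }
}
```

The raw ctl value changes with every worker, but the masked run state does not, which is why the state checks in the pool compare against constants.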

The overall execution flow of the thread pool is explained very clearly on this website

Add task method – execute

public void execute(Runnable command) {
    /*
     * Proceed in 3 steps:
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker returns false if, for example, another thread has called
        // shutdown or shutdownNow in the meantime, so ctl is read again here.
        c = ctl.get();
    }
    // isRunning(c) is simply c < SHUTDOWN
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If another thread has shut the pool down since the task was queued,
        // remove the task that was just queued and reject it.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // By the time the submitting thread reaches this line, worker T1 may have
        // finished and decremented ctl, so the worker count is 0 and this branch is
        // taken. The task is already in the workQueue, and runWorker fetches from
        // the workQueue when a worker's firstTask is null. So null is passed here
        // only to give addWorker the chance to call t.start(), which runs runWorker.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // (1)
    }
    // Reaching this branch means one of two things:
    // 1. The pool is RUNNING, but workerCount >= corePoolSize and the workQueue is full.
    // 2. The pool is not RUNNING, that is, c >= SHUTDOWN. In that case addWorker
    //    returns false and the task is rejected.
    else if (!addWorker(command, false))
        reject(command);
}
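The three branches of execute can be observed from the outside with a small experiment. This is only a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, a bounded queue of capacity 1, and the default AbortPolicy); ExecuteFlowDemo and its run method are names made up for the illustration. Four tasks that block on a latch are submitted: the first starts a core thread, the second is queued, the third starts a non-core thread, and the fourth is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Submits four blocking tasks and returns {poolSize, queueSize, rejectedCount}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();            // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocking);
            } catch (RejectedExecutionException e) {
                rejected++;                 // the default AbortPolicy throws
            }
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();                // let the tasks finish
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```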

This raises a question (Question A): why does execute use the non-blocking workQueue.offer(command) rather than a blocking method such as put? Think about it first; we come back to it at the end of this article
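As background for Question A, the following sketch shows only the API difference on a full queue. OfferDemo and offerTwice are hypothetical names, and the capacity of 1 is chosen just to fill the queue quickly. offer returns false immediately when there is no room, whereas put would park the calling thread until space becomes available.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OfferDemo {
    // Offers two tasks to a queue of capacity 1 and returns both results.
    static boolean[] offerTwice() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        Runnable noop = () -> { };
        boolean first = queue.offer(noop);   // accepted: the queue had room
        boolean second = queue.offer(noop);  // queue full: returns false at once,
                                             // where put(noop) would block the caller
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = offerTwice();
        System.out.println(r[0] + " " + r[1]);
    }
}
```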

Add task method – addWorker

Remember one premise: this method is entered either when the current number of threads is below the core pool size, or when the queue is full and the current number of threads is below the maximum pool size.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Return false in any of these cases:
        // 1. rs > SHUTDOWN (STOP or later);
        // 2. rs == SHUTDOWN and firstTask is not null;
        // 3. rs == SHUTDOWN, firstTask is null, and the workQueue is empty.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // Perform the CAS on c until it succeeds
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // (rs == SHUTDOWN && firstTask == null) only applies to
                // the call at (1) in the execute method
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // (1)
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The addWorker method does two things: 1. It increments the worker count in ctl with a CAS loop. 2. It creates a Worker object, adds it to the workers set, and starts worker.thread (that is, the thread in the thread pool). To summarize, this method exists to start worker.thread. This raises two points:

  • Question B: the incoming Runnable task only needs to be started, so why add the Worker to the workers set with workers.add(w)? What is the workers set for? Think it over yourself; the answer is at the end of this article

  • Notice one detail: in which thread is the addWorker method executed? That helps us understand Question A

One more question (Question E): is it threads that reside in the thread pool (ThreadPoolExecutor)? No, it is a set of Worker objects. A Worker is neither a thread nor a task to be performed. So what does it do?

Let’s look at the Worker constructor

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker  (1)
    this.firstTask = firstTask;                       // (2)
    this.thread = getThreadFactory().newThread(this); // (3)
}

From the constructor we can see that the task to be executed becomes a field of the Worker, and that the Worker also has a thread field. Look at the line marked (3): I think it is very important. It can be read as this.thread.target = this. In other words, the Worker itself is the Runnable behind its own thread field (by definition, Worker implements Runnable). Therefore, when t.start() is executed at (1) in addWorker, Worker.run() runs on that thread, and our task is executed as well, as shown in the figure

How Worker.run() and runWorker(Worker) are triggered
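The "worker is its own thread's target" idea can be reproduced in a few lines. This is a simplified sketch, not the JDK class: MiniWorker and runOnce are names invented here, and it creates the thread with new Thread(this, ...) instead of going through a ThreadFactory.

```java
class MiniWorker implements Runnable {
    final Thread thread;
    Runnable firstTask;

    MiniWorker(Runnable firstTask) {
        this.firstTask = firstTask;
        // The worker passes itself as the thread's target, so thread.start()
        // ends up calling this.run() on the new thread.
        this.thread = new Thread(this, "mini-worker");
    }

    @Override
    public void run() {
        // Stand-in for Worker.run() -> runWorker(this): run the first task.
        Runnable task = firstTask;
        firstTask = null;
        if (task != null) {
            task.run();
        }
    }

    // Starts one worker and returns the name of the thread that ran the task.
    static String runOnce() {
        String[] seen = new String[1];
        MiniWorker w = new MiniWorker(() -> seen[0] = Thread.currentThread().getName());
        w.thread.start();
        try {
            w.thread.join();   // join makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // mini-worker
    }
}
```

The task never calls start() itself. It runs because the thread's target is the worker, and the worker's run() invokes the task.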

Task execution method – runWorker

Calling Thread.start() on the Worker's thread field triggers the execution of this method.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts (b)
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool is stopping, make sure the current thread is interrupted;
            // if not, make sure the current thread is not interrupted.
            // runStateAtLeast(ctl.get(), STOP) means ctl >= STOP
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook for the application to implement
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (Throwable x) {
                    // Abbreviated: the JDK rethrows RuntimeException and Error unchanged
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // completedAbruptly records whether an exception occurred while running
        // a task; its value is evaluated in the processWorkerExit method.
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

  • Notice one detail: in which thread is the runWorker method executed? That helps us understand Question D

The purpose of the runWorker method is to execute tasks (task.run()). It first executes the Worker's firstTask and then keeps fetching tasks from the workQueue to execute. In simple terms, it is a loop of fetch a task, run it, fetch the next one. Three details deserve attention: how tasks are fetched, the ctl check, and w.lock() and w.unlock():

  • How tasks are fetched: this is the job of the getTask() method, covered later
  • ctl check: runStateAtLeast(ctl.get(), STOP) => ctl.get() >= STOP. It returns true only once the pool has reached STOP or beyond, for example after shutdownNow()
  • w.lock() and w.unlock(): the Worker's lock is used to tell whether a thread is idle. This is best understood together with the shutdown() method, see shutdown below

From the while condition we can see that null has a special meaning for runWorker(): it tells the worker thread that asked for a task to terminate and exit. So the cases in which getTask returns null are special

Task execution method – getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() call time out?

    for (;;) {
        int c = ctl.get();
        // c & ~CAPACITY
        int rs = runStateOf(c);

        /*
         * Two checks are combined here:
         * 1. rs >= SHUTDOWN: the pool is not RUNNING.
         * 2. rs >= STOP, or the blocking queue is empty.
         * rs >= SHUTDOWN means the pool is at least shutting down and accepts
         * no new tasks, so nothing more can be added to the blocking queue.
         * rs >= STOP means the pool is stopping and will not process queued tasks.
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // The pool is closing or closed, so null is returned and runWorker
            // exits: the worker thread that asked for a task is about to quit.
            // Accordingly, the worker count in ctl is decremented.
            decrementWorkerCount();
            return null;
        }

        // c & CAPACITY
        int wc = workerCountOf(c);

        // timed: is this worker subject to the keepAliveTime timeout?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // How can wc > maximumPoolSize happen? Someone lowered the maximum
        // pool size dynamically.
        // timedOut records whether the last fetch timed out. If timed and timedOut
        // are both true, there are more workers than needed and this worker has
        // been idle, so it qualifies for reclamation.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // This worker is going to exit: decrement the worker count in ctl
            // and return null.
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // The idea here: as long as a thread is alive, it should be executing tasks.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // (1)
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Let's analyze the getTask method at (1). If core threads may time out (allowCoreThreadTimeOut = true), or the current number of threads exceeds the core pool size, the worker calls poll(keepAliveTime, ...): it stays alive for up to keepAliveTime, executes a task as soon as one arrives in the queue, and otherwise times out so that it can be reclaimed. In all other cases the worker calls take() and blocks until a task arrives. Here is a question (Question D): why use blocking methods rather than non-blocking ones? The answer is given at the end of the article
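The two queue calls at (1) behave differently on an empty queue, which the sketch below tries to show. PollVsTakeDemo and its method names are made up, and the 50 ms timeout stands in for keepAliveTime. A timed poll gives up and returns null, which is what lets getTask set timedOut = true, while take() parks the thread until an element arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollVsTakeDemo {
    // Timed poll on an empty queue: returns null once the timeout elapses.
    static Runnable timedPollOnEmptyQueue() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
        try {
            return q.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // take() parks the consumer without spinning until a producer offers an element.
    static boolean takeReturnsAfterOffer() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
        CountDownLatch got = new CountDownLatch(1);
        Thread consumer = new Thread(() -> {
            try {
                q.take();
                got.countDown();
            } catch (InterruptedException ignored) {
                // an interrupt would also wake a thread blocked in take()
            }
        });
        consumer.start();
        q.offer(() -> { });
        try {
            return got.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timedPollOnEmptyQueue() == null); // true
        System.out.println(takeReturnsAfterOffer());          // true
    }
}
```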

To better understand the logic of the execute, addWorker, runWorker and getTask methods, especially their conditional checks, it helps to read them together with shutdown, shutdownNow, tryTerminate and awaitTermination. So let's look at these methods

Shutdown-related methods

The shutdown and shutdownNow methods are similar, as shown in the following figure

shutdown

Calling the shutdown() method puts the pool into the SHUTDOWN state. In the SHUTDOWN state, the thread pool does not accept new tasks, but continues to execute the tasks already in the task queue. You can verify this by experiment

After shutdown() is called, no more tasks can be submitted to the pool, but tasks submitted before the call are still executed. The pool does not begin terminating until the task queue is empty
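One way to run that experiment is sketched below, under assumed settings (a single worker thread, an unbounded queue, the default AbortPolicy); ShutdownDemo and run are hypothetical names. Three tasks are submitted and held back by a latch, shutdown() is called, and a fourth submission is attempted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns {completedTasks, rejectedAfterShutdown, terminated (1 or 0)}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    gate.await();               // hold tasks until shutdown() was called
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();                        // state -> SHUTDOWN, queued tasks stay
        int rejected = 0;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = 1;                       // new tasks are refused after shutdown()
        }
        gate.countDown();
        boolean terminated;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new int[] { completed.get(), rejected, terminated ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("completed=" + r[0] + " rejected=" + r[1] + " terminated=" + r[2]);
    }
}
```

All three earlier tasks complete even though two of them were still in the queue when shutdown() was called, and only the late submission is rejected.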

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

The core of shutdown method consists of three parts:

  1. advanceRunState(SHUTDOWN): changes the thread pool state to SHUTDOWN
  2. interruptIdleWorkers(): interrupts idle threads in the thread pool
  3. tryTerminate(): transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)

advanceRunState

Advances the thread pool state to the given target, which can only be SHUTDOWN or STOP. With SHUTDOWN, ctl >= 0 (that is, >= SHUTDOWN) after advanceRunState returns; if the current number of threads is 3, ctl is 3. With STOP, ctl >= 536870912 (that is, >= STOP) after advanceRunState returns; if the current number of threads is 3, ctl is 536870912 + 3 = 536870915

So, this also confirms that the state of the thread pool is a range, not a value, as described at the beginning of the document

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
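The numbers quoted for advanceRunState can be reproduced with a standalone sketch. AdvanceRunStateDemo and advance are names invented for the illustration; the loop mirrors the method above on a local AtomicInteger, with runStateAtLeast, ctlOf and workerCountOf written inline.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AdvanceRunStateDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    // Applies the advanceRunState loop to a starting ctl value and returns the result.
    static int advance(int startCtl, int targetState) {
        AtomicInteger ctl = new AtomicInteger(startCtl);
        for (;;) {
            int c = ctl.get();
            if (c >= targetState                                         // runStateAtLeast
                || ctl.compareAndSet(c, targetState | (c & CAPACITY)))   // keep worker count
                break;
        }
        return ctl.get();
    }

    public static void main(String[] args) {
        int runningWith3 = RUNNING | 3;
        System.out.println(advance(runningWith3, SHUTDOWN)); // 3
        System.out.println(advance(runningWith3, STOP));     // 536870915
        System.out.println(advance(STOP | 3, SHUTDOWN));     // 536870915, never moves backwards
    }
}
```

The last call shows why the runStateAtLeast check comes first: a pool that is already at STOP is left untouched when asked to go to SHUTDOWN.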

interruptIdleWorkers

The method's purpose is clear from its name: interrupt the threads of idle workers. But how does it know which worker threads are idle?

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

Note w.tryLock(): why only try to take the lock? Look back at the runWorker method: a worker locks itself (w.lock()) before running each task, and every worker thread does this. So w.tryLock() tells whether the worker being checked is idle. If the lock can be acquired, the worker is idle and interrupt() is sent to its thread. While the interrupt is being sent the worker stays locked, so the interrupted thread cannot start executing a task at the same time.
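The idle check can be imitated with an ordinary lock. In this sketch a ReentrantLock stands in for the Worker's own lock (an assumption made for brevity), and IdleCheckDemo and probe are made-up names. The "worker" holds the lock while it runs its task, so a tryLock from another thread fails while it is busy and succeeds once it is idle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class IdleCheckDemo {
    // Returns {tryLock result while the worker is busy, tryLock result once it is idle}.
    static boolean[] probe() {
        ReentrantLock workerLock = new ReentrantLock();
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            workerLock.lock();                 // like w.lock() before task.run()
            try {
                running.countDown();
                finish.await();                // the "task" is in progress
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                workerLock.unlock();           // like w.unlock() after the task
            }
        });
        worker.start();
        try {
            running.await();
            boolean busyProbe = workerLock.tryLock();  // fails: worker holds the lock
            if (busyProbe) {
                workerLock.unlock();
            }
            finish.countDown();
            worker.join();
            boolean idleProbe = workerLock.tryLock();  // succeeds: worker is idle
            if (idleProbe) {
                workerLock.unlock();
            }
            return new boolean[] { busyProbe, idleProbe };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println("busy=" + r[0] + " idle=" + r[1]); // busy=false idle=true
    }
}
```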

tryTerminate

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Return without terminating in any of these cases:
        // 1. The pool is RUNNING and cannot be terminated.
        // 2. The pool is already TIDYING or TERMINATED, so there is nothing left to do.
        // 3. The pool is SHUTDOWN but the queue is not empty: queued tasks must still run.
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // A worker may still be blocked in workQueue.take() inside getTask().
            // Such a thread is idle (outside w.lock()) and stays blocked until it
            // is interrupted. You might say that interruptIdleWorkers(false) in
            // shutdown() already interrupted every idle thread. But imagine a worker
            // that was busy at that moment: afterwards it returns to the while loop,
            // calls getTask(), and can end up blocked in take() once the queue runs
            // empty. That is why every exiting worker calls tryTerminate(), which
            // interrupts one idle worker here.
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Set the pool state to TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // After terminated() has run, set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Notify threads waiting in awaitTermination that the pool has terminated
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdownNow

Calling shutdownNow() puts the pool into the STOP state. In the STOP state, the thread pool neither accepts new tasks nor processes tasks already in the queue. It sends an interrupt request to worker threads that are still executing tasks, and it drains the queue of tasks that have not yet been executed.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

The core of shutdownNow method consists of four parts:

  1. advanceRunState(STOP): changes the thread pool state to STOP, with the same logic as advanceRunState(SHUTDOWN)
  2. interruptWorkers(): interrupts every started worker thread in the pool, idle or busy (a busy worker is one that has executed w.lock() but not yet w.unlock()). The interrupt may or may not take effect, depending on whether the running task responds to interruption
  3. drainQueue(): removes all unexecuted tasks from the task queue. The list of unexecuted tasks is returned to the caller
  4. tryTerminate(): tries to terminate the pool, as described above
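The behaviour described in these steps can be observed with a sketch like the following, under assumed settings (one worker thread, an unbounded queue); ShutdownNowDemo and run are hypothetical names. One task sleeps on the worker thread while two more wait in the queue, then shutdownNow() is called.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Returns {drainedTasks, runningTaskInterrupted (1 or 0), terminated (1 or 0)}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);          // a long task that responds to interrupts
            } catch (InterruptedException e) {
                interrupted.set(true);         // woken early by shutdownNow()
            }
        });
        pool.execute(() -> { });               // queued behind the sleeping task
        pool.execute(() -> { });               // queued behind the sleeping task
        try {
            started.await();
            List<Runnable> drained = pool.shutdownNow();
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { drained.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("drained=" + r[0] + " interrupted=" + r[1] + " terminated=" + r[2]);
    }
}
```

A task that ignores interruption would keep running until it finished on its own. The sleeping task here exits early only because Thread.sleep responds to the interrupt.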

interruptWorkers

// Interrupts all threads, even if active
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

One catch: interruptWorkers is meant to interrupt all threads in the thread pool (idle and executing), yet interruptIfStarted() looks as if it interrupts only executing threads. If that is confusing, look at getState() >= 0 together with the runWorker method, which first calls w.unlock() and then w.lock() for each task. The constructor sets state to -1, unlock() sets it to 0 and lock() sets it to 1, so once runWorker has started, state is >= 0 whether the worker is idle or not. All started threads are therefore interrupted

drainQueue

Returns the tasks still waiting in the workQueue

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    // Some queues (such as DelayQueue) may not hand over every element
    // through drainTo, so remove what is left one element at a time.
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

Question and answer

Question A: Why use the non-blocking workQueue.offer(command) instead of a blocking method like put?
Answer: execute runs in the thread that submits the task, for example the main thread. If that thread blocked in put, it could not go on to add subsequent tasks to the thread pool.

Question B: The incoming Runnable task only needs to be started, so why add the Worker to the workers set with workers.add(w)? What is the workers set for?
Answer: The purpose of the workers set is to keep hold of each Worker and its thread. Otherwise, how could workers and threads be locked and interrupted later? addWorker and runWorker run in parallel, and shutdown and shutdownNow may have to act on the workers at any time.

Question C: terminate [the text of this question and its answer is lost in the source]

Question D: Why does getTask use the blocking methods workQueue.poll(keepAliveTime, timeUnit) and workQueue.take()?
Answer: getTask contains a for loop that keeps looking for tasks. Without blocking, that loop would spin and occupy the CPU all the time. The principle is the same as with synchronized.

Question E: Is it threads that reside in the thread pool?
Answer: No, it is Workers. A Worker is the link between a thread and its tasks in the thread pool.

Question F: The object being locked is sometimes a Worker and sometimes a ReentrantLock. Why?
Answer: In essence both are locks. mainLock is a ReentrantLock, and Worker implements a simple lock of its own (it extends AbstractQueuedSynchronizer).

Question G: How are idle threads distinguished from executing threads in the thread pool?
Answer: By whether the Worker is locked.

How does ThreadPoolExecutor run in a multi-threaded environment?

Summary

Every time I dig deeper into ThreadPoolExecutor, even a seemingly casual line of code or a single condition leaves me amazed by the author's programming ability. Each time I understand a little more, I feel that the author's implementation techniques open windows I had never seen before.

ThreadPoolExecutor

ThreadPoolExecutor attribute source code and the author's own debugging example: ThreadPoolExecutorTest0

This article is part of a ThreadPoolExecutor series