1. Preface

I’m sure you’ve used thread pools in multi-threaded development. What is a thread pool? A thread pool is a way of managing threads: we hand the creation, destruction and scheduling of threads over to the pool. On the one hand, this greatly reduces the amount of code programmers have to write. On the other hand, the pool keeps a certain number of threads alive, which avoids the resources wasted by constantly creating and destroying threads. By the way, “pooling” is a very general idea. Basically, anything that is used frequently and is expensive or complex to create or destroy can be pooled, for example database connection pools, thread pools, the JVM constant pool, and so on. Extending this further, do caches count as pools? I think you can call them pools in a broad sense.

Anyway, let’s go back to thread pools. How do we create one? Java provides the Executor framework for this. Executor is the top-level interface, and in practice we usually use ThreadPoolExecutor, its main implementation. Let’s take a look at the source code of ThreadPoolExecutor (the listings below follow JDK 8).
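Before reading the source, here is a minimal usage sketch of creating a ThreadPoolExecutor directly. It assumes Java 8+. The class name PoolBasics and the pool sizes are arbitrary illustration choices, not recommendations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolBasics {
    // Submits n small tasks to a ThreadPoolExecutor and sums their results.
    static int sumOfSquares(int n) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                        // corePoolSize
                4,                                        // maximumPoolSize
                60L, TimeUnit.SECONDS,                    // keepAliveTime for idle non-core threads
                new LinkedBlockingQueue<Runnable>(100));  // bounded workQueue (capacity picked arbitrarily)
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                futures.add(pool.submit(() -> x * x));    // a Callable<Integer>
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();                           // blocks until that task has finished
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();                              // no new tasks; threads exit once the queue drains
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Calling shutdown() in finally matters. The core threads are non-daemon and block in the queue forever, so without it the JVM would not exit.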

2. Key attributes

2.1 Running status and number of workers

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

The ctl field is interesting: a single thread-safe AtomicInteger stores two values. The high 3 bits hold the run state of the pool. The remaining 29 bits (COUNT_BITS = Integer.SIZE - 3) hold the number of workers, so there can be at most 2^29 - 1 workers (the constant CAPACITY).

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
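To see how the two values share one int, here is a stand-alone sketch. It re-declares the private constants and helpers (runStateOf, workerCountOf, ctlOf) as they appear in JDK 8’s ThreadPoolExecutor. The class name CtlPacking is hypothetical, and newer JDKs rename some of these constants.

```java
class CtlPacking {
    // Re-declared copies of the private ctl helpers from JDK 8's ThreadPoolExecutor.
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // 2^29 - 1, also the count mask

    // Run states live in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);     // true
        System.out.println(workerCountOf(c));             // 5
        // Adding 1 to the packed value changes only the count, which is what the CAS increment relies on
        System.out.println(workerCountOf(c + 1));         // 6
        System.out.println(runStateOf(c + 1) == RUNNING); // true
        System.out.println(CAPACITY);                     // 536870911
    }
}
```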

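Each state uses only the high three bits, and the low 29 bits are all 0 (SHUTDOWN is 0 altogether). RUNNING is negative and the other states increase from there, so states can be compared numerically: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED. Checks such as rs >= SHUTDOWN rely on this ordering. What do these states mean, and how does the pool move between them? The descriptions below draw on blog.csdn.net/qq_24384579…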

  • RUNNING: the initial state. A newly created pool is RUNNING with 0 workers. It accepts new tasks (creating core threads or putting tasks into the wait queue) and processes queued tasks.
  • SHUTDOWN: entered from RUNNING when shutdown() is called. It accepts no new tasks but still processes the tasks already in the queue. It is a soft stop.
  • STOP: entered from RUNNING or SHUTDOWN when shutdownNow() is called. It accepts no new tasks, does not process queued tasks, and interrupts the running threads.
  • TIDYING: entered once all tasks have terminated and the worker count recorded in ctl is 0 (from SHUTDOWN when the queue is also empty, or from STOP). TIDYING is a precondition of termination, and as I understand it, it is short-lived. In this state the hook method terminated() is executed, and then the pool moves to the next state.
  • TERMINATED: terminated() has completed and the pool has fully terminated.
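The transitions above can be observed from outside through the public API. The sketch below assumes Java 8+, and LifecycleDemo and its Result type are hypothetical names. After shutdown() the pool rejects new tasks but still drains its queue, and the overridable terminated() hook runs before awaitTermination returns.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class LifecycleDemo {
    static final class Result {
        int completed;                   // tasks that ran to completion
        boolean rejectedAfterShutdown;
        boolean terminatedHookRan;
        boolean terminated;
    }

    static Result run() {
        AtomicInteger completed = new AtomicInteger();
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {    // hook invoked while the pool is TIDYING
                hookRan.set(true);
            }
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {            // one runs right away, the rest wait in the queue
                sleepQuietly(20);
                completed.incrementAndGet();
            });
        }
        pool.shutdown();                    // RUNNING -> SHUTDOWN

        Result r = new Result();
        try {
            pool.execute(() -> { });        // SHUTDOWN accepts no new tasks
        } catch (RejectedExecutionException e) {
            r.rejectedAfterShutdown = true; // the default AbortPolicy throws
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        r.completed = completed.get();      // queued tasks still ran after shutdown()
        r.terminatedHookRan = hookRan.get();
        r.terminated = pool.isTerminated();
        return r;
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.completed + " " + r.rejectedAfterShutdown + " "
                + r.terminatedHookRan + " " + r.terminated); // 5 true true true
    }
}
```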

2.2 Waiting queue and worker collection

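/** Queue holding tasks that are waiting to be executed */
private final BlockingQueue<Runnable> workQueue;
/** Set of all worker threads in the pool; accessed only while holding mainLock */
private final HashSet<Worker> workers = new HashSet<Worker>();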

workQueue is the wait queue for tasks. Once the core threads are all busy, newly submitted tasks are put into it to wait for a worker. What kinds of queue are commonly used?

  • ArrayBlockingQueue: a bounded blocking queue backed by an array, first in, first out.
  • LinkedBlockingQueue: a blocking queue backed by a linked list. You can set its capacity. If you don’t, it defaults to Integer.MAX_VALUE, so be careful: with an effectively unbounded queue, a backlog of tasks can lead to an OOM. Because it uses separate locks for insertion and removal, it typically has higher throughput than ArrayBlockingQueue, though its performance is less predictable.
  • DelayQueue: an unbounded queue of Delayed elements, used for delayed or scheduled execution. An element can only be taken once its delay has expired, and the head is the element whose delay expired furthest in the past. Pools created by newScheduledThreadPool (ScheduledThreadPoolExecutor) use a similar internal queue, DelayedWorkQueue, which orders tasks by execution time and breaks ties by submission order.
  • PriorityBlockingQueue: an unbounded blocking queue that orders elements by priority, using their natural ordering or a Comparator you supply.
  • SynchronousQueue: a very special queue with no capacity; it never holds elements. A thread inserting an element must wait until another thread removes it, and vice versa. newCachedThreadPool uses this queue.
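execute() hands tasks to the queue with workQueue.offer(command), which never blocks. So the way each queue’s offer behaves decides when the pool falls through to creating extra threads. The sketch below uses only standard java.util.concurrent queues, and the class name QueueBehaviour is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueBehaviour {
    // offer() returns false when the element cannot be accepted right now.
    static boolean arrayQueueAcceptsThird() {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(2); // capacity 2
        q.offer(1);
        q.offer(2);
        return q.offer(3);                 // false: the queue is full
    }

    static int linkedDefaultCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity(); // Integer.MAX_VALUE
    }

    static boolean synchronousOfferWithoutTaker() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        return q.offer(1);                 // false: no thread is waiting in take()/poll()
    }

    static int priorityHead() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.add(5);
        q.add(1);
        q.add(3);
        return q.poll();                   // 1: natural ordering, smallest first
    }

    public static void main(String[] args) {
        System.out.println(arrayQueueAcceptsThird());                     // false
        System.out.println(linkedDefaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(synchronousOfferWithoutTaker());               // false
        System.out.println(priorityHead());                               // 1
    }
}
```

The SynchronousQueue result explains newCachedThreadPool. offer only succeeds when an idle worker is already waiting, so otherwise execute moves on and adds a new thread.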

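The workers set is the actual “pool”. Each Worker wraps one thread, and the pool keeps track of its live threads through this set. Because HashSet is not thread-safe, it is only accessed while holding mainLock. So why is the thread wrapped in a Worker? See section 3 for details.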

2.3 Locks and number of threads

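/** Lock held on access to the workers set and related bookkeeping */
private final ReentrantLock mainLock = new ReentrantLock();
/** Largest pool size ever reached; accessed only under mainLock */
private int largestPoolSize;
/** Timeout in nanoseconds for idle threads waiting for work */
private volatile long keepAliveTime;
/** Number of workers kept alive, unless allowCoreThreadTimeOut is set */
private volatile int corePoolSize;
/** Upper bound on the number of workers */
private volatile int maximumPoolSize;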

mainLock is a reentrant lock that guards important code. For example, addWorker holds it while updating workers and largestPoolSize. workers is a HashSet, which is not thread-safe, and largestPoolSize is updated at the same time, so both must be changed under the lock. Why is mainLock reentrant? As I understand it, some paths acquire it repeatedly when adding workers or in tryTerminate. For example, addWorkerFailed acquires mainLock and, before releasing it, calls tryTerminate, which acquires mainLock again. If the lock were not reentrant, this would deadlock. (Hey, an interview question: how many locks are there in ThreadPoolExecutor?)
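The nested locking described above can be reproduced in isolation. ReentrancyDemo is a hypothetical class: its method names echo addWorkerFailed and tryTerminate, but the bodies are stand-ins that only show the same thread taking mainLock twice.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: method names echo addWorkerFailed/tryTerminate,
// but the bodies only demonstrate the nested locking.
class ReentrancyDemo {
    private final ReentrantLock mainLock = new ReentrantLock();
    private int holdCountInside;          // hold count observed in the nested call

    void addWorkerFailedSketch() {
        mainLock.lock();
        try {
            // ... roll back worker bookkeeping here ...
            tryTerminateSketch();         // called while mainLock is still held
        } finally {
            mainLock.unlock();
        }
    }

    void tryTerminateSketch() {
        mainLock.lock();                  // same thread locks again: fine for a reentrant lock
        try {
            holdCountInside = mainLock.getHoldCount();
        } finally {
            mainLock.unlock();
        }
    }

    int holdCountInside() { return holdCountInside; }
    boolean isLockFree()  { return !mainLock.isLocked(); }

    public static void main(String[] args) {
        ReentrancyDemo d = new ReentrancyDemo();
        d.addWorkerFailedSketch();
        System.out.println(d.holdCountInside()); // 2
        System.out.println(d.isLockFree());      // true: both acquisitions were released
    }
}
```

With a non-reentrant lock, the inner lock() would wait forever for a lock that its own thread already holds.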

corePoolSize, maximumPoolSize and workQueue work together. At first the pool has no threads. Each submitted task creates a new core thread until there are corePoolSize threads. After that, new tasks are added to workQueue to wait. If workQueue is also full, non-core threads are created until maximumPoolSize is reached. Beyond that, tasks are rejected according to the rejection policy (the RejectedExecutionHandler).
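This submission order can be seen with a deliberately tiny pool. The sketch assumes Java 8+. SubmissionOrderDemo is a hypothetical name, and the sizes (core 1, max 2, queue capacity 1) were chosen only to make each step visible. Every task blocks on a latch, so no worker frees up during the submissions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionOrderDemo {
    // Returns one {poolSize, queuedTasks, rejectedSoFar} snapshot after each of 4 submissions.
    static int[][] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                   // corePoolSize = 1, maximumPoolSize = 2
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),    // workQueue holds at most one waiting task
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {                      // keeps its worker busy until released
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[][] snapshots = new int[4][];
        int rejected = 0;
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
                snapshots[i] = new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return snapshots;
    }

    public static void main(String[] args) {
        for (int[] s : run()) {
            System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
        }
        // poolSize=1 queued=0 rejected=0   task 1 -> new core thread
        // poolSize=1 queued=1 rejected=0   task 2 -> workQueue
        // poolSize=2 queued=1 rejected=0   task 3 -> new non-core thread
        // poolSize=2 queued=1 rejected=1   task 4 -> rejected
    }
}
```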

3. Inner class — Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock state: 0 = unlocked, 1 = locked, -1 = not yet started (set in the constructor)
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker is an interesting inner class. It does the job of executing submitted tasks: it implements Runnable, and its run method delegates to runWorker. Look further and you will see it extends AbstractQueuedSynchronizer, so it is a lock! Look closely at tryLock, together with juejin.cn/post/703887…, and you can see it is a non-reentrant lock. tryAcquire only succeeds by CASing the state from 0 to 1, so even the thread that already holds it cannot acquire it again. Why is Worker designed as a lock? Check out the source comment:

We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).

Put plainly, it says:

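Worker uses a simple non-reentrant mutex instead of ReentrantLock because a task running in a worker must not be able to reacquire that worker’s lock when it calls a pool control method such as setCorePoolSize. (setCorePoolSize may call interruptIdleWorkers, which uses tryLock on each worker. With a reentrant lock, the calling worker would acquire its own lock and interrupt itself.) In addition, to suppress interrupts before the thread actually starts running tasks, the lock state is initialized to a negative value (-1) and cleared when the thread starts (in runWorker).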
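To make both points concrete, here is a stand-alone copy of Worker’s lock logic, lifted out of the pool. The class name MiniWorkerLock is hypothetical. The AQS methods it calls (setState, compareAndSetState, setExclusiveOwnerThread, acquire, release) are the real AbstractQueuedSynchronizer API. The demo shows that tryLock fails while the state is still -1, and that the owning thread cannot acquire the lock a second time.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Stand-alone copy of the lock part of Worker; the class name is hypothetical.
// State: -1 = not started yet, 0 = unlocked, 1 = locked.
class MiniWorkerLock extends AbstractQueuedSynchronizer {
    MiniWorkerLock() {
        setState(-1);                     // like Worker: inhibit interrupts until runWorker
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {   // succeeds only from 0, so it is never re-entrant
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    public static void main(String[] args) {
        MiniWorkerLock w = new MiniWorkerLock();
        System.out.println(w.tryLock()); // false: state is -1, the "worker" has not started
        w.unlock();                      // what runWorker does first: -1 -> 0 ("allow interrupts")
        System.out.println(w.tryLock()); // true
        System.out.println(w.tryLock()); // false: the same thread cannot acquire it again
        w.unlock();
    }
}
```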

All right, now look at where the lock is acquired. lock() is called only in runWorker, around the execution of each task. tryLock() is used in interruptIdleWorkers, right before a worker is interrupted. Putting these together: a worker that is running a task holds its own lock and must not be interrupted, so it locks before running. The pool tries to acquire a worker’s lock before interrupting it and only interrupts if it succeeds, which means only idle workers are interrupted. Can you answer the question now? How many locks are there in ThreadPoolExecutor?
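The rule “interrupt a worker only if its lock can be acquired” can be sketched with two fake workers, one busy and one idle. This is a hypothetical simplification, not pool code: FakeWorker and interruptIdle are made-up names. ReentrantLock stands in for Worker’s non-reentrant lock, which makes no difference here because the interrupting thread never already holds a worker’s lock.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class IdleInterruptDemo {
    // Hypothetical stand-in for Worker: a thread plus the lock it holds while running a task.
    static final class FakeWorker {
        final ReentrantLock lock = new ReentrantLock();
        Thread thread;
    }

    // Same shape as interruptIdleWorkers: interrupt a worker only if its lock can be acquired.
    static void interruptIdle(List<FakeWorker> workers) {
        for (FakeWorker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.lock.tryLock()) {
                try {
                    t.interrupt();
                } finally {
                    w.lock.unlock();
                }
            }
        }
    }

    // Returns {busy worker was interrupted, idle worker was interrupted}.
    static boolean[] run() {
        CountDownLatch busyHoldsLock = new CountDownLatch(1);
        CountDownLatch finishTask = new CountDownLatch(1);
        AtomicBoolean busyInterrupted = new AtomicBoolean();
        AtomicBoolean idleInterrupted = new AtomicBoolean();
        LinkedBlockingQueue<Runnable> emptyQueue = new LinkedBlockingQueue<>();

        FakeWorker busy = new FakeWorker();
        busy.thread = new Thread(() -> {
            busy.lock.lock();                // "running a task": the lock is held
            try {
                busyHoldsLock.countDown();
                finishTask.await();          // simulated work
            } catch (InterruptedException e) {
                busyInterrupted.set(true);
            } finally {
                busy.lock.unlock();
            }
        });

        FakeWorker idle = new FakeWorker();
        idle.thread = new Thread(() -> {
            try {
                emptyQueue.take();           // "idle": waiting for a task, lock not held
            } catch (InterruptedException e) {
                idleInterrupted.set(true);
            }
        });

        try {
            busy.thread.start();
            idle.thread.start();
            busyHoldsLock.await();           // the busy worker now holds its lock
            interruptIdle(Arrays.asList(busy, idle));
            finishTask.countDown();
            busy.thread.join();
            idle.thread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] {busyInterrupted.get(), idleInterrupted.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [false, true]
    }
}
```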

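Worker is also the Runnable that each pool thread actually executes. The thread is created with the Worker itself as its task (getThreadFactory().newThread(this)), and every submitted Runnable is run through it. So every “live” thread in the pool can be understood as a thread held by some Worker. Such a thread may be core or non-core, but the distinction is not stored on the thread. It depends only on the current worker count (see getTask).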

4. Key methods

execute method: the method we call most often to submit a task

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. Fewer than corePoolSize workers: start a new core worker with this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. Pool is running: try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check: the pool may have shut down, or all workers died, since the first check
        if (!isRunning(recheck) && remove(command))
            reject(command);          // no longer running: undo the enqueue and reject
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);   // make sure some worker exists to drain the queue
    }
    // 3. Queue full (or pool not running): try a non-core worker; if that fails, reject
    else if (!addWorker(command, false))
        reject(command);
}
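The workerCountOf(recheck) == 0 branch matters most when corePoolSize is 0. The sketch assumes Java 8+, and CoreZeroDemo is a hypothetical name. With corePoolSize 0 and an unbounded LinkedBlockingQueue, step 1 is skipped and offer() always succeeds. Without addWorker(null, false), the task would therefore sit in the queue with no thread to run it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreZeroDemo {
    // corePoolSize = 0: execute() queues the task in step 2, then sees workerCount == 0
    // and adds a worker with no first task, which picks the task up from the queue.
    static boolean taskRuns() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch ran = new CountDownLatch(1);
        try {
            pool.execute(ran::countDown);
            return ran.await(5, TimeUnit.SECONDS);   // true once the task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(taskRuns()); // true
    }
}
```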

addWorker method: creates a Worker, adds it to the workers set and starts its thread

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /* Return false (add no worker) if the pool is not RUNNING, unless it is SHUTDOWN,
         * firstTask is null and the queue is not empty:
         * 1. STOP and later states accept no tasks and run none, so no worker is needed;
         * 2. SHUTDOWN accepts no new tasks (firstTask must be null) but still runs queued
         *    tasks, so a worker may be added only while the queue is non-empty. */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Refuse at CAPACITY, or at the core/maximum bound depending on whether this is a core worker
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS the worker count up; on success leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // Run state changed: restart the outer loop to re-check whether a worker may be added
            if (runStateOf(c) != rs)
                continue retry;
            // else the CAS failed because the worker count changed; retry the inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();   // guards the workers HashSet and largestPoolSize
            try {
                // Recheck while holding the lock. Back out on ThreadFactory
                // failure or if shut down before the lock was acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // t is the Worker's thread: start() leads to Worker.run(), which calls runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Worker not started: remove it, decrement the count, and call tryTerminate
        // in case this worker was holding up termination
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
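The first half of addWorker is a classic check-then-CAS retry loop. Below is a simplified, hypothetical sketch of that pattern: WorkerSlotReservation and tryReserve are made-up names, a plain AtomicInteger stands in for ctl, and the run-state checks are left out. A small race shows that concurrent callers never push the count past the limit.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper with the same shape as addWorker's first phase:
// check a bound, then CAS the counter up, retrying if another thread changed it first.
class WorkerSlotReservation {
    static boolean tryReserve(AtomicInteger count, int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;                 // like wc >= corePoolSize / maximumPoolSize
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                  // like compareAndIncrementWorkerCount succeeding
            }
            // CAS lost a race with another thread: re-read and re-check the bound
        }
    }

    // Starts `threads` threads that each make `attempts` reservations against `limit`.
    // Returns the total number of successful reservations.
    static int race(int threads, int attempts, int limit) {
        AtomicInteger count = new AtomicInteger();
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    if (tryReserve(count, limit)) {
                        successes.incrementAndGet();
                    }
                }
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8, 100, 50)); // 50: the bound is never exceeded
    }
}
```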

tryTerminate method: tries to terminate the pool. It only proceeds if the pool is SHUTDOWN with no workers and an empty queue, or STOP with no workers.

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /* Do not terminate if the pool is RUNNING, is already TIDYING or TERMINATED,
         * or is SHUTDOWN with tasks still in the queue (they must still be executed). */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        /* Workers still exist, so we cannot terminate yet. Interrupt one idle worker so
         * that the shutdown signal propagates, then return. The exiting worker will call
         * tryTerminate again. (There may be no idle worker at all.) */
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS the state to TIDYING with a worker count of 0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();   // hook, empty by default; subclasses may override it
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));   // final state
                    termination.signalAll();         // wake threads blocked in awaitTermination
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else the CAS failed; retry
    }
}

The comment about interrupting an idle worker so that the shutdown signal propagates is confusing at first. To understand it, look at getTask, the method runWorker uses to take a task from the blocking queue. Here is the relevant code:

Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

When the queue is empty, the worker blocks here, in take() or in poll() until keepAliveTime elapses. So a worker is in one of two situations: 1. it is running a task, holding its own lock; 2. it is blocked waiting on an empty queue, or has not obtained a task yet (the worker is idle). When interruptIdleWorkers interrupts an idle worker, the blocked take()/poll() throws InterruptedException, which releases the block. getTask catches it and loops again. The first thing the loop does is check whether the pool is SHUTDOWN with an empty queue (or STOP). If so, getTask decrements the worker count and returns null to runWorker. runWorker then exits its loop and enters processWorkerExit in its finally block, the worker’s exit routine, which calls tryTerminate again. This is why tryTerminate interrupts only one worker and then returns. It cannot know whether other workers remain, so each exiting worker calls tryTerminate in turn and interrupts the next idle worker, until all workers are gone and the pool can terminate.
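This is why idle workers have to be interrupted at all: a worker blocked in take() never re-checks the pool state on its own. The sketch below uses only public ThreadPoolExecutor methods, and IdleShutdownDemo is a hypothetical name. Two core threads are prestarted and sit idle in getTask, and shutdown() still lets the pool reach TERMINATED with no workers left.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleShutdownDemo {
    // Returns {workers after prestart, workers after termination, terminated ? 1 : 0}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        int started = pool.prestartAllCoreThreads(); // 2 idle workers, blocked in workQueue.take()
        pool.shutdown();   // interrupts idle workers; getTask() then sees SHUTDOWN + empty queue
        boolean terminated;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new int[] {started, pool.getPoolSize(), terminated ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 2 0 1
    }
}
```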

runWorker and getTask came up above, so let’s look at the code of both.

runWorker method: the worker’s main loop, called from Worker.run()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;   // take the worker's first task, if any
    w.firstTask = null;
    w.unlock(); // allow interrupts: state goes from -1 to 0
    boolean completedAbruptly = true;
    try {
        /* If there is a first task, run it directly; otherwise getTask() takes one from
         * the queue. When getTask() returns null, the loop ends and the worker exits. */
        while (task != null || (task = getTask()) != null) {
            w.lock();   // mark this worker busy so interruptIdleWorkers skips it
            /* If the pool is STOP or later, make sure this thread is interrupted; otherwise
             * clear any stray interrupt. The second runStateAtLeast check handles a
             * shutdownNow() racing with Thread.interrupted() clearing the flag. */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);   // hook for subclasses, runs before the task
                Throwable thrown = null;
                try {
                    task.run();            // run the actual task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);   // hook for subclasses, runs after the task
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);   // worker exit: bookkeeping, tryTerminate, maybe a replacement
    }
}
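beforeExecute and afterExecute are empty protected hooks meant for subclasses, and afterExecute receives whatever task.run() threw. The sketch assumes Java 8+, and HookDemo and RecordingPool are hypothetical names. The thread factory installs a no-op uncaught-exception handler only to stop the dying worker from printing a stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class HookDemo {
    // Hypothetical subclass that records what the runWorker hooks observe.
    static final class RecordingPool extends ThreadPoolExecutor {
        final AtomicInteger beforeCalls = new AtomicInteger();
        final AtomicReference<Throwable> lastThrown = new AtomicReference<>();
        final CountDownLatch afterCalls;

        RecordingPool(int expectedTasks) {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                    r -> {
                        Thread t = new Thread(r);
                        t.setUncaughtExceptionHandler((th, ex) -> { }); // silence the dying worker
                        return t;
                    });
            afterCalls = new CountDownLatch(expectedTasks);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            beforeCalls.incrementAndGet();   // runs just before task.run()
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (t != null) {
                lastThrown.set(t);           // the exception task.run() threw
            }
            afterCalls.countDown();          // runs in finally, even when the task fails
        }
    }

    // Runs one normal task and one failing task; returns "<beforeExecute calls>:<exception message>".
    static String run() {
        RecordingPool pool = new RecordingPool(2);
        try {
            pool.execute(() -> { });                                          // completes normally
            pool.execute(() -> { throw new IllegalStateException("boom"); }); // fails inside run()
            if (!pool.afterCalls.await(5, TimeUnit.SECONDS)) {
                return "timed out";
            }
            return pool.beforeCalls.get() + ":" + pool.lastThrown.get().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2:boom
    }
}
```

This only works for tasks passed to execute(). With submit(), the task is wrapped in a FutureTask that catches the exception itself. afterExecute then sees a null Throwable, and the exception surfaces from Future.get() instead.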

getTask method: takes a task from the blocking queue

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    // Loop until a task is obtained, or return null to make the worker exit
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Pool is shutting down and either STOP or the queue is empty:
        // decrement the worker count and return null so this worker exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Is this worker subject to timeout (culling)?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /* Remove this worker if both conditions hold:
         * 1. there are too many workers (wc > maximumPoolSize), or this worker is subject
         *    to timeout and its last poll timed out; and
         * 2. it is not the last worker, or the queue is empty (a non-empty queue needs at
         *    least one worker left to drain it). */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Timed workers wait at most keepAliveTime; the others block indefinitely
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;   // interrupted (e.g. by shutdown): loop and re-check the state
        }
    }
}
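The timed flag in getTask is what makes keepAliveTime work. The sketch assumes Java 8+. KeepAliveDemo and awaitValue are hypothetical names, and the 100 ms keep-alive and the polling timeouts are arbitrary. A SynchronousQueue forces a second thread while both tasks are busy. Once idle, the extra worker times out, and allowCoreThreadTimeOut(true) lets the last one go too.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

class KeepAliveDemo {
    // Polls `supplier` every 10 ms until it returns `expected` or `timeoutMs` passes; returns the last value.
    static int awaitValue(IntSupplier supplier, int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int value = supplier.getAsInt();
        while (value != expected && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            value = supplier.getAsInt();
        }
        return value;
    }

    // Returns {pool size while busy, after keepAliveTime, after allowCoreThreadTimeOut(true)}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS,   // corePoolSize 1, maximumPoolSize 2, keepAlive 100 ms
                new SynchronousQueue<Runnable>());  // holds nothing, so a busy pool must add a thread
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);             // first worker (core bound)
            pool.execute(blocker);             // offer() fails, so a second (non-core) worker is added
            int busy = pool.getPoolSize();
            release.countDown();               // both workers go back to getTask()
            // wc > corePoolSize, so getTask() uses poll(keepAliveTime): one worker times out and exits
            int afterKeepAlive = awaitValue(pool::getPoolSize, 1, 5000);
            pool.allowCoreThreadTimeOut(true); // now `timed` is true for the last worker as well
            int afterCoreTimeout = awaitValue(pool::getPoolSize, 0, 5000);
            return new int[] {busy, afterKeepAlive, afterCoreTimeout};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 2 1 0
    }
}
```

If both idle workers time out at once, only one compareAndDecrementWorkerCount succeeds. The other re-reads the count, finds wc == corePoolSize, and goes back to an untimed take(). That is why the size settles at 1 rather than 0 until core timeout is enabled.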