Overall structure of this article

Introduction to ThreadPoolExecutor

Let’s think about why we use thread pools in the first place. Java provides a multi-threading mechanism that lets us run multiple tasks at the same time, much as several people can work on several tasks in parallel instead of one person performing them one after another. However, if we create a new thread every time we execute a task, two problems result:

  1. Each time a thread is created and destroyed, there is some overhead
  2. The number of threads is difficult to control. Too many threads can lead to excessive memory usage, and exceptions may occur when the operating system limits are exceeded

To solve these problems, Java provides thread pools. With ThreadPoolExecutor, you can configure different thread creation and recycling strategies to suit different requirements.

Thread pools can be used in several scenarios:

  1. Batch processing. For example, when a scan runs over several sub-tables and the per-table tasks do not depend on each other, we can submit each sub-table's task to the thread pool to speed up the overall job.
  2. Reducing interface latency. If an interface performs three independent, time-consuming IO operations, we can submit all three to the thread pool and wait for them to complete. The interface time then drops from the sum of the three operations to roughly the duration of the slowest one.
  3. In-process asynchronous decoupling. For example, during registration an email is sent to the user after the user information has been written to the database. Sending the email can be handed to the thread pool, which shortens the registration call and prevents an email failure from failing the registration. A message queue such as Kafka can serve the same purpose.
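Scenario 2 can be illustrated with a minimal sketch. The class name ParallelCallsDemo, the slowCall helper, and the three call labels are hypothetical stand-ins for real IO operations; only ExecutorService.invokeAll and Future.get come from the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelCallsDemo {
    /** Hypothetical stand-in for a slow, independent IO call. */
    static String slowCall(String name) throws InterruptedException {
        Thread.sleep(200);
        return name + "-done";
    }

    /** Runs the three calls on the pool and returns their results in submission order. */
    static List<String> fetchAll(ExecutorService pool) throws Exception {
        List<Callable<String>> calls = Arrays.asList(
                () -> slowCall("user"),
                () -> slowCall("orders"),
                () -> slowCall("coupons"));
        List<String> results = new ArrayList<>();
        // invokeAll blocks until every task has completed and preserves the input order.
        for (Future<String> future : pool.invokeAll(calls)) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            System.out.println(fetchAll(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With three threads available, the three calls overlap, so the total wait is close to one call rather than three.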

ThreadPoolExecutor parameters and a usage example

To use thread pools properly, we need to understand a few important parameters, which can be seen in the constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

  • corePoolSize: the number of core threads in the thread pool; its role is described in the thread creation and recycling mechanism below
  • maximumPoolSize: the maximum number of threads in the thread pool
  • keepAliveTime: how long a recyclable thread may stay idle before it is recycled (unit gives the time unit). By default only non-core threads are recycled, but allowCoreThreadTimeOut allows core threads to be recycled as well
  • workQueue: the task queue; common choices include SynchronousQueue, ArrayBlockingQueue, LinkedBlockingQueue, DelayQueue, and PriorityBlockingQueue
  • threadFactory: the factory that creates threads; it controls thread names, priority, and so on
  • handler: the RejectedExecutionHandler, which is invoked when the queue is full and the number of threads has reached maximumPoolSize. Common rejection policies include AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy
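As an illustration of the threadFactory and keepAliveTime parameters, here is a minimal sketch. The class NamedPoolDemo, the namedFactory helper, the "order-worker" prefix, and the concrete sizes are all assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPoolDemo {
    /** Hypothetical factory that names threads "<prefix>-<n>" so they are easy to spot in thread dumps. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-" + sequence.getAndIncrement());
    }

    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                         // corePoolSize, maximumPoolSize
                30, TimeUnit.SECONDS,         // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),
                namedFactory("order-worker"),
                new ThreadPoolExecutor.AbortPolicy());
        // Let idle core threads be recycled too (requires keepAliveTime > 0).
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            System.out.println(pool.submit(() -> Thread.currentThread().getName()).get());
        } finally {
            pool.shutdown();
        }
    }
}
```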

With these parameters in mind, we are ready to create a thread pool and use it. The comments below give us an overview of the use and mechanism of thread pools, which we will examine in detail.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) throws Exception {
        // Create a thread pool with 1 core thread, at most 4 threads, a keepAliveTime of 1 minute,
        // and an array-backed blocking queue that holds up to 10 tasks
        ExecutorService executorService = new ThreadPoolExecutor(1, 4, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());

        // Submit a Callable task through submit; the Future yields a String
        Future<String> task1Result = executorService.submit(() -> {
            Thread.sleep(1000);
            return "hello";
        });
        // Submit a second Callable task in the same way
        Future<String> task2Result = executorService.submit(() -> {
            Thread.sleep(1000);
            return "hello";
        });

        // Get the result of the first task with Future.get
        System.out.println(task1Result.get());
        // Get the result of the second task with Future.get
        System.out.println(task2Result.get());

        // We are done with the thread pool, so it must be shut down. Otherwise the JVM
        // will not exit, because the pool's worker threads are non-daemon threads.
        // Tasks submitted after shutdown are rejected and handled by the rejection policy;
        // tasks already in the queue continue to be processed.
        executorService.shutdown();
        // Wait for the thread pool to terminate
        if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
            // If it has not terminated in time, call shutdownNow to clear the task queue
            executorService.shutdownNow();
            // Wait one more minute
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}

ThreadPoolExecutor implementation mechanism analysis

Now that you understand the use of ThreadPoolExecutor, let’s take a look at the internal architecture of a thread pool

From this architecture we can see that the important components of ThreadPoolExecutor are the workerPool (the pool of worker threads), the workQueue (the task queue), and the RejectedExecutionHandler (the rejection policy).

  1. The workerPool contains Worker objects. Each Worker has a thread that executes the submitted task and then keeps fetching new tasks from the workQueue.
  2. The workQueue holds tasks that are waiting to run. When the core threads are all in use, new tasks are placed in the workQueue first; when the workQueue is full, non-core threads are added.
  3. Tasks that are submitted when the non-core threads are also at their limit, or after the thread pool has been shut down, are handled by the rejection policy.

The most important parts of ThreadPoolExecutor are:

  1. The task submission and execution process
  2. The worker creation and recycling mechanism
  3. The shutdown process

Thread pool state

Let’s take a look at the states of the thread pool. There are five states:

  • RUNNING: the normal state after the thread pool is created; new tasks are accepted and queued tasks are processed
  • SHUTDOWN: entered after the shutdown method is called. New tasks passed to execute are rejected, while tasks already in the queue or running in workers continue to be executed
  • STOP: entered after the shutdownNow method is called. The task queue is cleared and the workers are interrupted
  • TIDYING: entered once the SHUTDOWN or STOP phase is complete, that is, all tasks have finished and no workers remain
  • TERMINATED: entered after the terminated hook method has completed

The state machine is as follows
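In words, the transitions are: shutdown() moves RUNNING to SHUTDOWN; shutdownNow() moves RUNNING or SHUTDOWN to STOP; the pool becomes TIDYING once the queue and the pool are empty (from SHUTDOWN) or the pool is empty (from STOP); and it becomes TERMINATED when terminated() has completed. The listings later in this article use helpers such as ctl, workerCountOf, runStateAtLeast, and ctlOf. The sketch below mirrors how the OpenJDK source packs the run state and the worker count into a single int; the class name CtlSketch is hypothetical, and the real fields are private to ThreadPoolExecutor.

```java
final class CtlSketch {
    // The low 29 bits hold the worker count, the high 3 bits hold the run state.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // Run states are ordered numerically, so they can be compared with < and >=.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int c)                    { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)                 { return c & COUNT_MASK; }
    static boolean runStateAtLeast(int c, int s)    { return c >= s; }
    static boolean runStateLessThan(int c, int s)   { return c < s; }
    static boolean isRunning(int c)                 { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(isRunning(c));       // true
        System.out.println(workerCountOf(c));   // 5
        System.out.println(runStateAtLeast(ctlOf(STOP, 3), SHUTDOWN)); // true
    }
}
```

Because both values live in one atomic integer, a single compare-and-set can check the state and change the worker count together.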

Task submission process (execute method logic)

Let’s look at the logic of ThreadPoolExecutor’s execute method:

  1. If the number of workers is smaller than the number of core threads, a core thread is created to handle the task.
  2. If the number of workers is greater than or equal to the number of core threads, the task is offered to the queue. If the queue is full, a non-core thread is created to handle the task.
  3. If the thread pool has been shut down, or the queue is full and no non-core thread can be created, the task is handed to the rejection policy.
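The three steps can be observed from the outside with a deliberately small pool. This is a sketch under assumed settings (1 core thread, at most 2 threads, a queue of capacity 1, the default AbortPolicy); the class name ExecuteOrderDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteOrderDemo {
    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch opens, so threads and queue slots stay occupied.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);                 // step 1: a core thread is created
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);                 // step 2: core is busy, the task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);                 // step 2: queue is full, a non-core thread is created
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // step 3: queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(observe())); // [1, 1, 2, 1]
    }
}
```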

More details can be viewed through comments in the code.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl holds the control state: run state plus worker count
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // Try to add a core worker; if that succeeds, return directly
        if (addWorker(command, true))
            return;
        // Adding failed: another thread calling execute may have won the race,
        // or the pool may have been shut down. Re-read ctl.
        c = ctl.get();
    }
    // workerCount is greater than or equal to corePoolSize: offer the task to workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool has been shut down in the meantime, remove the task and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker is left, add one so that the queued task gets executed
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // We get here if the thread pool is shut down or the queue is full:
    // 1. Pool shut down: addWorker fails and the task is rejected
    // 2. Queue full: try to add a non-core worker; if that fails, the task is rejected
    else if (!addWorker(command, false))
        reject(command);
}

The Worker class

The addWorker method is responsible for creating Worker objects, so let's first look at the Worker class. A Worker needs to distinguish whether it is currently waiting to acquire a task or is in the middle of executing one. It does this with a non-reentrant lock: a task can be executed only after the Worker has acquired the lock.

The lock makes it possible to tell an idle worker that is waiting for a task apart from one that is executing a task, so that an interrupt aimed at idle workers does not hit a running task.

The lock is non-reentrant for a reason. If a task calls a pool control method such as setCorePoolSize from inside a worker thread, that method tries to interrupt idle workers by acquiring each worker's lock. With a reentrant lock, the worker could reacquire its own lock and set its own interrupt status, so the task it is running might be interrupted later. A worker thread that is running a task may only be interrupted once the thread pool has reached the STOP state or a later one.
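The difference between a non-reentrant and a reentrant lock can be shown with a small sketch. The Mutex class below is a hypothetical, stripped-down lock written in the same style as Worker (state 0 means free, 1 means held); it is not the JDK's Worker class itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

final class NonReentrantDemo {
    /** Hypothetical minimal non-reentrant mutex built on AQS. */
    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            // Succeeds only when the lock is free, even if the caller already owns it.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** Holds the mutex and tries to take it again from the same thread. */
    static boolean mutexReacquire() {
        Mutex mutex = new Mutex();
        mutex.lock();
        try {
            return mutex.tryLock();
        } finally {
            mutex.unlock();
        }
    }

    /** Same experiment with a ReentrantLock. */
    static boolean reentrantReacquire() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            boolean again = lock.tryLock();
            if (again) {
                lock.unlock(); // undo the nested acquisition
            }
            return again;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(mutexReacquire());     // false
        System.out.println(reentrantReacquire()); // true
    }
}
```

Because the second tryLock fails on the Mutex, a worker that is busy with a task looks locked even to code running on its own thread, which is what keeps it from being treated as idle.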

Take a look at the Worker class definition

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // The thread responsible for executing tasks; null if the ThreadFactory fails
    final Thread thread;
    // When the worker is created through execute, an initial task may be passed in
    Runnable firstTask;

    Worker(Runnable firstTask) {
        // Prevent other threads from interrupting this worker before it starts running
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // Delegates to the runWorker method of the enclosing pool
    public void run() {
        runWorker(this);
    }

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // Implement the AQS tryAcquire method to provide the (non-reentrant) lock
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Implement the AQS tryRelease method to release the lock
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    // Used by shutdownNow, which is allowed to interrupt the thread even while it runs a task
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

The flow of the addWorker method

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // In the SHUTDOWN state, no new worker is needed if the task queue is empty,
        // and no worker may be created to execute a firstTask.
        // In the STOP state or later, always return false and do not create a worker.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            // The thread limit has been reached: return false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS succeeded: leave both loops; on failure, retry
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the pool has been shut down, restart the outer loop
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker object
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                // Check the thread pool state again
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the worker's thread, which runs its run method, i.e. runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If startup failed (for example because of an OOM), roll back workerCount and so on
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Take a look at the runWorker method, which contains the worker’s task acquisition and execution logic.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // The worker's state was initialised to -1, so it could not be interrupted before this point
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If there is a firstTask, run it directly; otherwise block in getTask until a task
        // arrives from the queue. If getTask returns null, the worker needs to exit.
        while (task != null || (task = getTask()) != null) {
            // Lock before executing the task
            w.lock();
            // If the pool is in the STOP state, the thread must be interrupted. If it is not,
            // a pending interrupt (for example from a parameter change) is cleared by Thread.interrupted().
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Callback before the task is executed
                beforeExecute(wt, task);
                try {
                    // Execute the run method of the submitted Runnable
                    task.run();
                    // Callback after the task has been executed
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                // Release the lock after the task has finished or exited with an exception
                w.unlock();
            }
        }
        // The while loop ended normally
        completedAbruptly = false;
    } finally {
        // Call processWorkerExit. If the worker exits abnormally,
        // a new worker is created to replace the current one.
        processWorkerExit(w, completedAbruptly);
    }
}
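The beforeExecute and afterExecute callbacks in runWorker are protected hook methods that subclasses may override. A minimal sketch, in which the class name HookedPool and the event labels are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HookedPool extends ThreadPoolExecutor {
    /** Records the order in which the hooks and the task run. */
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable ex) {
        super.afterExecute(r, ex);
        // ex is non-null only when a task passed to execute threw an exception.
        events.add(ex == null ? "after" : "after-with-error");
    }

    /** Runs one task and returns the recorded events. */
    static List<String> runOnce() throws InterruptedException {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return pool.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // [before, task, after]
    }
}
```

Hooks like these are a common place for timing, logging, or cleaning up thread-local state.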

getTask implementation

The getTask method is responsible for fetching tasks from the task queue. When the worker's thread is eligible for recycling, getTask waits on the queue with a timed poll of keepAliveTime instead of blocking indefinitely. This is how a worker thread is recycled after it has been idle for longer than the keep-alive time.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // If the pool is in SHUTDOWN and the task queue is empty, or the pool is in STOP
        // or later, the current worker needs to exit, so return null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // If allowCoreThreadTimeOut is true, core threads can be recycled as well;
        // otherwise only non-core threads are recycled
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Too many workers, or the last poll timed out: this worker may exit
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task from the blocking queue: with a timeout if the thread
            // can be recycled, otherwise wait indefinitely
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // The poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            // The interrupt is used when the pool is shut down or reconfigured;
            // go around the loop again and re-check the state
            timedOut = false;
        }
    }
}
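The effect of the timed poll can be observed from the outside. The sketch below assumes a 50 ms keepAliveTime and allowCoreThreadTimeOut(true) so that every thread is eligible for recycling; the class name IdleTimeoutDemo and the 5 second polling deadline are arbitrary choices for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class IdleTimeoutDemo {
    /** Starts two workers, lets them go idle, and returns the pool size once they have timed out. */
    static int poolSizeAfterIdle() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            // Poll instead of sleeping a fixed time, so the demo is not timing sensitive.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

Without allowCoreThreadTimeOut(true), the two core threads would block in take() and the pool size would stay at 2.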

Shutdown implementation

After shutdown is called, existing tasks continue to be executed, but new tasks are not accepted.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Change the thread pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt the threads of idle workers that are not executing tasks
        interruptIdleWorkers();
        // Shutdown callback
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

The shutdownNow method differs from shutdown in that it changes the state to STOP, interrupts all workers, and drains the remaining tasks from the queue.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Change the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupt all workers
        interruptWorkers();
        // Drain the tasks that are still in the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
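A minimal sketch of what a caller sees from shutdownNow: the running task is interrupted, and the tasks that never started are handed back. The class name ShutdownNowDemo and the pool configuration (one thread, unbounded queue) are assumptions for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ShutdownNowDemo {
    /** Returns the number of queued tasks that shutdownNow handed back to the caller. */
    static int pendingAfterShutdownNow() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch neverOpened = new CountDownLatch(1);
        // The first task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            try {
                neverOpened.await();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow: simply finish.
            }
        });
        // These two tasks wait in the queue and never get to run.
        pool.execute(() -> { });
        pool.execute(() -> { });

        List<Runnable> pending = pool.shutdownNow();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate");
        }
        return pending.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pendingAfterShutdownNow()); // 2
    }
}
```

Note that a task which ignores interruption keeps running after shutdownNow; the pool can only request the interrupt.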

The tryTerminate method attempts to close the thread pool

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // If the pool is RUNNING, there is nothing to terminate
        if (isRunning(c) ||
            // If it is TIDYING or later, another thread is already terminating the pool,
            // so the current thread can return as well
            runStateAtLeast(c, TIDYING) ||
            // If it is SHUTDOWN and the task queue still holds tasks, those tasks must run first
            (runStateLessThan(c, STOP) && !workQueue.isEmpty()))
            return;
        // If workers remain, interrupt one idle worker and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // No workers remain and the task queue is empty: change the state to TIDYING
        // and call the terminated callback
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Changing the state with CAS ensures that terminated() is not invoked twice
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // Change the state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up the threads waiting in awaitTermination
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
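The terminated() callback is another overridable hook. The following sketch logs the order of events around termination; the class name TerminationDemo and the log labels are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class TerminationDemo {
    /** Runs one task, shuts the pool down, and returns the order of observed events. */
    static List<String> lifecycle() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Called once, while the pool is in the TIDYING state.
                log.add("terminated-hook");
            }
        };
        pool.execute(() -> log.add("task"));
        pool.shutdown();
        // awaitTermination returns only after the state has become TERMINATED.
        pool.awaitTermination(10, TimeUnit.SECONDS);
        log.add("isTerminated=" + pool.isTerminated());
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lifecycle()); // [task, terminated-hook, isTerminated=true]
    }
}
```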

BlockingQueue choice

Queue type | Features
ArrayBlockingQueue | Array-based blocking queue; bounded
LinkedBlockingQueue | Linked-list-based blocking queue. The functional difference from ArrayBlockingQueue is that it can be created unbounded. For example, the thread pool created by Executors.newFixedThreadPool(int) uses an unbounded queue, in which case a backlog of queued tasks may lead to OOM problems
SynchronousQueue | Synchronous hand-off queue with no capacity; a task is passed directly to a waiting thread, which ensures that it is processed quickly and spends no time in the queue
PriorityBlockingQueue | Blocking queue whose elements are ordered by priority
DelayQueue | Delay queue. ScheduledThreadPoolExecutor uses a queue of this kind (its internal DelayedWorkQueue) to implement delayed and periodic execution
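The thread pool hands tasks to the queue with the non-blocking offer method, so the behaviour of offer is what distinguishes these queues in practice. A minimal sketch (the class name QueueBehaviourDemo is hypothetical):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueBehaviourDemo {
    /** Returns the outcome of four probes, in the order listed in the comments below. */
    static boolean[] probe() {
        BlockingQueue<Runnable> bounded = new ArrayBlockingQueue<>(1);
        BlockingQueue<Runnable> handOff = new SynchronousQueue<>();
        BlockingQueue<Runnable> unbounded = new LinkedBlockingQueue<>();
        Runnable noop = () -> { };
        return new boolean[] {
            bounded.offer(noop),   // true: one free slot
            bounded.offer(noop),   // false: the queue is full, so the pool would add a thread or reject
            handOff.offer(noop),   // false: no consumer is waiting, so nothing can be handed off
            unbounded.remainingCapacity() == Integer.MAX_VALUE // true: effectively never full
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(probe())); // [true, false, false, true]
    }
}
```

With an unbounded queue, offer practically never fails, which means the pool never grows beyond corePoolSize and never rejects; the backlog grows instead.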

RejectedExecutionHandler choice

This section introduces the common RejectedExecutionHandler implementations.

RejectedExecutionHandler | Behavior
AbortPolicy | Rejects the task by throwing a RejectedExecutionException; this is the default rejection policy
DiscardPolicy | Silently discards the task; no exception is thrown to the submitter
DiscardOldestPolicy | Removes the oldest task from the task queue and retries submitting the current task
CallerRunsPolicy | Uses the thread that submitted the task, that is, the thread that called the execute method, to execute the task
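CallerRunsPolicy is easy to observe with a saturated pool. The sketch below assumes a pool with a single thread and a queue of capacity 1; the class name CallerRunsDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    /** Returns the name of the thread that ran the task which overflowed the pool. */
    static String overflowThread() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // Pool and queue are full: the policy runs this task on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(overflowThread().equals(Thread.currentThread().getName())); // true
    }
}
```

Because the submitter is busy running the task, it cannot submit more work in the meantime, which acts as simple back-pressure.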

Of course, we can also customize our own rejection policy, such as implementing a rejection policy that blocks the submission thread, which, like the CallerRunsPolicy, can slow the submission thread down, but does not use the submission thread to perform the task.

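class BlockSubmitRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // After shutdown nothing would ever take the task from the queue, so reject it
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            // Block the submitting thread until the queue has room for the task
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            // Restore the interrupt status and report the task as rejected
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to enqueue the task", e);
        }
    }
}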

Dynamically modifying parameters

ThreadPoolExecutor also provides methods to modify parameters such as corePoolSize and maximumPoolSize, allowing us to adjust thread pool parameters dynamically.

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}
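A short usage sketch of these setters follows. Given the check maximumPoolSize < corePoolSize in both methods above, the order of the calls matters: when growing, raise the maximum first; when shrinking, lower the core size first. The class name ResizeDemo and the concrete sizes are assumptions for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ResizeDemo {
    /** Returns {core size after growing, core size after shrinking, maximum size after shrinking}. */
    static int[] resize() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100));
        try {
            // Growing: raise the maximum first so that core never exceeds it.
            pool.setMaximumPoolSize(8);
            pool.setCorePoolSize(6);
            int grownCore = pool.getCorePoolSize();

            // Shrinking: lower the core size first, then the maximum.
            pool.setCorePoolSize(1);
            pool.setMaximumPoolSize(2);
            return new int[] {grownCore, pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(resize())); // [6, 1, 2]
    }
}
```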


How do we change the queue length? We can implement a variable-capacity blocking queue, which is fairly easy to do on the basis of LinkedBlockingQueue. Its capacity is stored only as an int field and, unlike in ArrayBlockingQueue, does not determine the length of a backing array. The capacity must be changed while holding the queue's lock, and when the capacity grows, threads blocked on a full queue should be woken up with notFull.signalAll().
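The idea can be sketched as follows. ResizableBuffer is a hypothetical, deliberately simplified bounded buffer, not a full BlockingQueue implementation, and it uses a single lock where LinkedBlockingQueue uses separate put and take locks. It only shows the capacity field and the notFull.signalAll() wake-up.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical simplified buffer whose capacity can be changed at runtime. */
final class ResizableBuffer<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private int capacity; // guarded by lock

    ResizableBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Non-blocking insert; returns false when the buffer is full. */
    boolean offer(E e) {
        lock.lock();
        try {
            if (items.size() >= capacity) return false;
            items.addLast(e);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Blocking insert; waits while the buffer is full. */
    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() >= capacity) notFull.await();
            items.addLast(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Blocking remove; waits while the buffer is empty. */
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) notEmpty.await();
            E e = items.removeFirst();
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** Changes the capacity; when it grows, wakes every producer blocked in put. */
    void setCapacity(int newCapacity) {
        if (newCapacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        lock.lock();
        try {
            int old = capacity;
            capacity = newCapacity;
            if (newCapacity > old) notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Shrinking below the current size does not drop elements in this sketch; inserts simply fail or block until enough elements have been taken.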
