Introduction

Multithreading on multi-core processors can indeed improve the throughput and performance of production systems. In a real production environment, however, blindly creating large numbers of threads can hurt performance and can even bring the system down. Creating and destroying threads takes time, and each thread occupies memory. Too many threads also compete for CPU and memory, and reclaiming a large number of threads lengthens garbage collection and puts pressure on the GC. Limiting the number of concurrently running tasks avoids these problems.

Once the number of threads is under control, it also makes sense to reuse threads that have already been created, so they are not created and destroyed over and over. Database connection pooling works the same way, and Java thread pools apply this idea to threads.

Thread pools are the most widely used concurrency framework in Java. Almost any program that needs to run tasks asynchronously or concurrently can use one. Used properly, thread pools bring three benefits:

  1. Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.
  2. Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.
  3. Improve thread manageability. Threads are a scarce resource, and creating them without limit consumes system resources and reduces system stability. A thread pool lets threads be allocated, tuned, and monitored in one place.
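To make the first benefit concrete, here is a minimal sketch that submits ten trivial tasks to a fixed pool of two threads, created with Executors.newFixedThreadPool, and records which threads ran them. The class and method names are hypothetical. Because the pool reuses its threads, at most two thread names should appear instead of ten.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs `tasks` trivial tasks on a fixed pool of `poolSize` threads and
    // returns the names of the threads that actually executed them.
    static Set<String> threadsUsed(int poolSize, int tasks) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // stop accepting new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for submitted tasks to finish
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = threadsUsed(2, 10);
        // At most two names, e.g. pool-1-thread-1 and pool-1-thread-2
        System.out.println(names.size() + " thread(s) ran 10 tasks: " + names);
    }
}
```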

Executor framework

The java.util.concurrent (JUC) package is one of the core class libraries shipped with the JDK. For thread control, the JDK provides the Executor framework, which is essentially a thread pool. Let's take a look at the main classes and interfaces of the Executor framework:

  • Executor is an interface that forms the foundation of the Executor framework. It decouples task submission from task execution.

  • ExecutorService extends Executor. It adds life-cycle management methods and convenience methods for submitting tasks.

  • ThreadPoolExecutor is the core implementation class of the thread pool. It executes the submitted tasks.

  • Executors is a utility class that acts as a thread pool factory. Its static methods create thread pools configured for specific purposes.

  • ScheduledThreadPoolExecutor is an implementation class that can run a task after a given delay or execute tasks periodically.

In addition, implementations of the Runnable and Callable interfaces can be executed by both ThreadPoolExecutor and ScheduledThreadPoolExecutor.

The Executor framework also contains classes for asynchronous computation: the Future interface and the FutureTask class that implements it. The main classes and interfaces are shown below:

As the figure above shows, FutureTask also implements the Runnable interface, so you can create a FutureTask directly and submit it to an ExecutorService for execution.
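The following minimal sketch shows this in action. A Callable is wrapped in a FutureTask and handed to an ExecutorService through execute(), which accepts any Runnable. The result is then read back from the FutureTask itself. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    // Wraps a Callable in a FutureTask, submits it as a plain Runnable,
    // and reads the result back through the FutureTask itself.
    static int sumWithFutureTask(int n) throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) sum += i;
            return sum;
        });
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task);   // accepted because FutureTask implements Runnable
            return task.get();    // blocks until the Callable has finished
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumWithFutureTask(100)); // prints 5050
    }
}
```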

Now that you have a general idea of what these classes and interfaces can do, let’s put them together and take a look at how the Executor framework works:

1. The main thread creates a task object that implements the Runnable or Callable interface.

2. The task object is submitted to an ExecutorService in one of two ways: execute(), which does not return the result of the asynchronous computation, or submit(), which does.

3. If the task is submitted with submit(), the ExecutorService returns an object that implements the Future interface (in ThreadPoolExecutor, a FutureTask).

4. The main thread can call get() on that FutureTask to wait for the task to complete, or call cancel(boolean mayInterruptIfRunning) to cancel the task.
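The following sketch walks through steps 2 to 4. It assumes a two-thread pool from Executors.newFixedThreadPool, and the class and method names are hypothetical. Calling execute() gives the caller nothing back. Calling submit() returns a Future whose get() blocks for the result. Calling cancel(true) stops a long-running task that has not finished yet.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitAndCancelDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Step 2, execute(): no handle on the task or its outcome is returned
            pool.execute(() -> System.out.println("fire-and-forget task ran"));

            // Steps 2-3, submit(): a Future for the asynchronous result is returned
            Future<String> quick = pool.submit(() -> "done");
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(10_000);   // long-running; will be cancelled
                return "never";
            });

            // Step 4: get() blocks until the result is ready ...
            String result = quick.get();
            // ... and cancel(true) cancels a task that has not completed,
            // interrupting it if it is already running
            boolean cancelled = slow.cancel(true);
            return result + "," + cancelled + "," + slow.isCancelled();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints done,true,true
    }
}
```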

ThreadPoolExecutor

ThreadPoolExecutor is the core thread pool implementation class. Executors, the Executor framework's utility class, can then be used to create specific types of ThreadPoolExecutor.

Creation of a thread pool

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

This is the most complete ThreadPoolExecutor constructor, and the other constructors delegate to it with default values. Its parameters help explain how the thread pool works internally:

  • corePoolSize: the base size of the thread pool (the number of core threads). By default, core threads are not started when the ThreadPoolExecutor is created. Each one is created only when a task is submitted. To create and start all core threads in advance, call the pool's prestartAllCoreThreads method, which is implemented as follows:
/**
 * Starts all core threads, causing them to idly wait for work. This
 * overrides the default policy of starting core threads only when
 * new tasks are executed.
 *
 * @return the number of threads started
 */
public int prestartAllCoreThreads() {
    int n = 0;
    // Loop to add core size number of worker threads
    while (addWorker(null, true))
        ++n;
    return n;
}

Note also that, unless prestartAllCoreThreads has been called, submitting a new task while the pool has fewer than corePoolSize threads always creates a new thread. This happens even if existing core threads are idle and could run the task. Threads stop being created this way once the number of threads reaches corePoolSize.

  • maximumPoolSize: the maximum number of threads allowed in the pool (the maximum number that can be active at the same time). Threads beyond the core size are created only when the work queue is full, so with an unbounded queue this setting has no effect.

  • keepAliveTime: how long an idle thread may live while the pool holds more than corePoolSize threads. When the thread count exceeds the core size, a thread that has been idle for longer than keepAliveTime is terminated and reclaimed.

  • unit: the time unit of keepAliveTime.

  • workQueue: the task queue, a blocking queue that holds tasks waiting to be executed.

  • threadFactory: the factory used to create threads. A thread factory can also give each new thread a meaningful name. The ThreadFactoryBuilder in the open-source Guava library makes this quick, for example:

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  • handler: the rejection policy applied to newly submitted tasks when both the pool and the queue are full. It is also invoked for tasks submitted after the pool has been shut down.
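ThreadPoolExecutor's getPoolSize() method lets you observe the effect of several of these parameters. The sketch below is illustrative only. The class, the timings, and the helper named(...) are assumptions, and named(...) is a JDK-only stand-in for Guava's ThreadFactoryBuilder, not Guava's implementation. The sketch shows three behaviors:

- Core threads are created lazily unless prestartAllCoreThreads() is called.
- While the pool is below corePoolSize, a new thread is created even if another thread is idle.
- Threads above the core size are reclaimed after keepAliveTime.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class PoolParamsDemo {
    // threadFactory: hypothetical JDK-only helper that names threads "XX-task-0",
    // "XX-task-1", ... (a stand-in for Guava's ThreadFactoryBuilder, not its code)
    static ThreadFactory named(String format) {
        AtomicInteger count = new AtomicInteger();
        ThreadFactory delegate = Executors.defaultThreadFactory();
        return r -> {
            Thread t = delegate.newThread(r);
            t.setName(String.format(format, count.getAndIncrement()));
            return t;
        };
    }

    static ThreadPoolExecutor newPool(int core, int max, long keepAliveMs,
                                      BlockingQueue<Runnable> queue) {
        return new ThreadPoolExecutor(core, max, keepAliveMs, TimeUnit.MILLISECONDS,
                queue, named("XX-task-%d"), new ThreadPoolExecutor.AbortPolicy());
    }

    // corePoolSize: core threads are created lazily unless prestarted
    static int[] prestart() {
        ThreadPoolExecutor pool = newPool(3, 3, 0, new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();              // 0
        int started = pool.prestartAllCoreThreads();  // 3
        int after = pool.getPoolSize();               // 3
        pool.shutdown();
        return new int[] {before, started, after};
    }

    // corePoolSize: below the core size a new thread is created even if one is idle
    static String namesOfTwoSequentialTasks() throws Exception {
        ThreadPoolExecutor pool = newPool(3, 3, 0, new LinkedBlockingQueue<>());
        String a = pool.submit(() -> Thread.currentThread().getName()).get(); // thread 0, now idle
        String b = pool.submit(() -> Thread.currentThread().getName()).get(); // 1 < 3: new thread 1
        pool.shutdown();
        return a + "," + b;                           // XX-task-0,XX-task-1
    }

    // maximumPoolSize + keepAliveTime: the SynchronousQueue cannot hold the second
    // task, so a second thread is added; after 50 ms idle the pool shrinks to 1
    static int[] keepAlive() throws InterruptedException {
        ThreadPoolExecutor pool = newPool(1, 2, 50, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);            // creates the first thread
        pool.execute(blocker);            // queue refuses it: a second thread is created
        int busy = pool.getPoolSize();    // 2
        release.countDown();
        Thread.sleep(1000);               // far longer than keepAliveTime
        int idle = pool.getPoolSize();    // 1
        pool.shutdown();
        return new int[] {busy, idle};
    }
}
```

ThreadPoolExecutor does not tag individual threads as core or non-core. When the pool shrinks, whichever idle thread times out first is removed, until corePoolSize threads remain.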

With these parameters in mind, here is the main processing flow of a thread pool:

Next, we will introduce the important parameters workQueue and handler in detail.

workQueue

workQueue is the blocking queue that holds tasks waiting to be executed. It is a BlockingQueue of Runnable objects, declared as follows:

/**
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

The thread pool can choose from the following queues:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array that orders elements FIFO (first-in, first-out). Because it is bounded, its capacity must be specified at construction time. The constructor is as follows:
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
  • LinkedBlockingQueue: a blocking queue backed by a linked list that orders elements FIFO. A capacity may be specified. If it is not, the capacity defaults to Integer.MAX_VALUE, which is effectively unbounded. Linked queues typically have higher throughput than ArrayBlockingQueue. However, if tasks are submitted much faster than threads can process them, an unbounded queue keeps growing and may exhaust memory. The constructors are as follows:
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
  • SynchronousQueue: a direct hand-off queue that stores no elements. Each insert operation must wait for a corresponding remove operation by another thread, and vice versa. Its throughput is often higher than that of LinkedBlockingQueue. Because submitted tasks are not stored, each new task is handed directly to an idle thread. If there is none, the pool creates a new thread for the task, and once the pool has reached maximumPoolSize, the rejection policy is applied. A pool using this queue therefore usually needs a large maximumPoolSize; otherwise tasks are easily rejected. The constructor is as follows:
/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
// The fair parameter selects the fairness policy. If true, waiting threads
// compete for access in FIFO order.
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
  • PriorityBlockingQueue: an unbounded blocking queue that orders elements by priority (their natural ordering or a supplied Comparator) instead of FIFO. The constructors are as follows:
/**
 * Creates a {@code PriorityBlockingQueue} with the default
 * initial capacity (11) that orders its elements according to
 * their {@linkplain Comparable natural ordering}.
 */
// Default initial capacity is 11 (private static final int DEFAULT_INITIAL_CAPACITY = 11)
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

/**
 * Creates a {@code PriorityBlockingQueue} with the specified
 * initial capacity that orders its elements according to their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
// Constructor with an explicit initial capacity
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

/**
 * Creates a {@code PriorityBlockingQueue} with the specified initial
 * capacity that orders its elements according to the specified
 * comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param comparator the comparator that will be used to order this
 *        priority queue.  If {@code null}, the {@linkplain Comparable
 *        natural ordering} of the elements will be used.
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
// Constructor with an initial capacity and a custom comparator
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
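A quick way to see the practical differences between these queues is to call offer(), which returns false instead of blocking. ThreadPoolExecutor.execute() also uses offer() to enqueue tasks. This sketch uses a hypothetical class name and shows four behaviors:

- A full ArrayBlockingQueue refuses a new element.
- A LinkedBlockingQueue created without a capacity reports Integer.MAX_VALUE remaining capacity.
- A SynchronousQueue refuses an element when no consumer is waiting.
- A PriorityBlockingQueue returns elements in priority order rather than insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class QueueBehaviourDemo {
    // Bounded: a capacity-2 ArrayBlockingQueue refuses a third element
    static boolean arrayQueueAcceptsThird() {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(2);
        q.offer(1);
        q.offer(2);
        return q.offer(3);                          // false: the queue is full
    }

    // The default capacity of LinkedBlockingQueue is Integer.MAX_VALUE
    static int linkedQueueRemainingCapacity() {
        return new LinkedBlockingQueue<Integer>().remainingCapacity();
    }

    // No storage: offer() on a SynchronousQueue fails unless a consumer is waiting
    static boolean synchronousQueueAcceptsWithoutConsumer() {
        return new SynchronousQueue<Integer>().offer(1); // false
    }

    // Priority order: elements come out smallest first, not in insertion order
    static List<Integer> priorityPollOrder() {
        BlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.offer(5);
        q.offer(1);
        q.offer(3);
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = q.poll()) != null) {
            out.add(head);
        }
        return out;                                 // [1, 3, 5]
    }

    public static void main(String[] args) {
        System.out.println(arrayQueueAcceptsThird());                 // false
        System.out.println(linkedQueueRemainingCapacity());           // 2147483647
        System.out.println(synchronousQueueAcceptsWithoutConsumer()); // false
        System.out.println(priorityPollOrder());                      // [1, 3, 5]
    }
}
```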
Rejection Policy (Handler)

The JDK provides the following built-in rejection policies:

  • AbortPolicy: throws a RejectedExecutionException directly. This is the default policy.
  • CallerRunsPolicy: if the pool has not been shut down, runs the rejected task in the caller's thread (the thread that called execute); otherwise the task is discarded.
  • DiscardPolicy: silently discards the rejected task.
  • DiscardOldestPolicy: if the pool has not been shut down, discards the oldest waiting task (the one at the head of the queue, which would run next) and then retries submitting the current task.

You can choose the policy that fits your requirements. All policies, including custom ones, implement the RejectedExecutionHandler interface:

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

The four built-in policies are static nested classes of ThreadPoolExecutor, implemented as follows:

/* Predefined RejectedExecutionHandlers */

/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // If the pool has not been shut down, run the task in the caller's thread
            r.run();
        }
    }
}

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Throw RejectedExecutionException directly
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // If the pool is not shut down, drop the task at the head of the
            // queue (the next one that would run) and try to execute r again
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
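The sketch below observes two of these policies. It saturates a pool that has one thread and a one-slot queue, then submits one more task. The class and helper names are hypothetical. CallerRunsPolicy runs the rejected task on the submitting thread. DiscardOldestPolicy drops the queued task so the new one can take its place.

```java
import java.util.concurrent.*;

class PolicyDemo {
    // Hypothetical helper: one thread (blocked on `release`) plus a one-slot queue
    static ThreadPoolExecutor busyPool(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return pool;
    }

    // CallerRunsPolicy: the rejected task runs synchronously on the submitting thread
    static boolean callerRunsOnCallerThread() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        pool.execute(() -> { });                               // fills the queue
        Thread[] ranOn = new Thread[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread()); // rejected: runs right here
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn[0] == Thread.currentThread();
    }

    // DiscardOldestPolicy: the queued task is dropped so the new one can be queued
    static String discardOldestOutcome() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(new ThreadPoolExecutor.DiscardOldestPolicy(), release);
        StringBuffer log = new StringBuffer();            // thread-safe appends
        pool.execute(() -> log.append("old"));            // queued
        pool.execute(() -> log.append("new"));            // queue full: "old" is polled away
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log.toString();                            // "new"
    }
}
```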

A simple example of a rejection policy:

import java.util.concurrent.*;

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:"
                    + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        // 5 threads (core = max) and a bounded queue of 10; the custom handler
        // only logs rejected tasks instead of throwing an exception
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + "is discard");
                    }
                });
        // A task arrives every 10 ms but each one takes 100 ms, so the pool saturates quickly
        for (int i = 0; i < 100; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

The execution results are as follows:

1631374743886:Thread ID:11
1631374743898:Thread ID:12
1631374743910:Thread ID:13
1631374743922:Thread ID:14
1631374743929:Thread ID:15
java.util.concurrent.FutureTask@37bba400is discard
java.util.concurrent.FutureTask@179d3b25is discard
java.util.concurrent.FutureTask@254989ffis discard
java.util.concurrent.FutureTask@5d099f62is discard
1631374743987:Thread ID:11
1631374743998:Thread ID:12
1631374744010:Thread ID:13
1631374744023:Thread ID:14
1631374744031:Thread ID:15
java.util.concurrent.FutureTask@37f8bb67is discard
java.util.concurrent.FutureTask@49c2faaeis discard
java.util.concurrent.FutureTask@20ad9418is discard

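Rejected tasks show up almost immediately. Tasks arrive every 10 ms but each takes 100 ms, so the five threads and the ten-slot queue fill up quickly. Exactly when rejections start depends on the pool parameters and on how long each task runs.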

Submit tasks to the thread pool

As described in the Executor Framework section above, the execute() and submit() methods can both submit tasks to a thread pool. Let’s look at them separately.

  • execute method: submits a task that does not return a value, so the caller cannot tell whether the pool executed the task successfully. The code is as follows:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl holds both the run state and the worker count
    int c = ctl.get();
    // Step 1: workerCount < corePoolSize, so try to start a new core thread with this task.
    // The second argument of addWorker selects the bound on the thread count:
    // true means corePoolSize, false means maximumPoolSize.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker failed (state or count changed); re-read ctl
        c = ctl.get();
    }
    // Step 2: the pool is RUNNING, so try to put the task into workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        // Read the pool state again
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the task and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If workerCount is 0, add a thread so that at least one exists.
        // A null first task means the new thread is started without a task of its
        // own and takes tasks (including this one) from the queue instead.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the pool is not RUNNING, or it is RUNNING but the queue is full
    // (workerCount >= corePoolSize), so try to add a thread bounded by
    // maximumPoolSize. Step 4: if that fails, reject the task.
    else if (!addWorker(command, false))
        reject(command);
}

The flow of the code above can be summarized as follows:

1. If workerCount < corePoolSize, a new thread is created to execute the task.

2. If workerCount >= corePoolSize and the BlockingQueue is not full, the task is added to the BlockingQueue.

3. If workerCount >= corePoolSize, workerCount < maximumPoolSize, and the BlockingQueue is full, a new thread is created to process the task.

4. If workerCount >= maximumPoolSize and the BlockingQueue is full, the task is rejected.
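These four steps can be observed directly. In the sketch below, the class name and parameters are illustrative assumptions. Every task blocks on a latch, and the pool has corePoolSize = 1, maximumPoolSize = 2, and a queue of capacity 1, with AbortPolicy set explicitly. Each of the four submissions then triggers exactly one step.

```java
import java.util.concurrent.*;

class ExecuteFlowDemo {
    static String walkThroughSteps() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread frees up during the demo
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                trace.append("task").append(i)
                     .append(": threads=").append(pool.getPoolSize())
                     .append(" queued=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                trace.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown();
        pool.shutdown();
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(walkThroughSteps());
        // task1: threads=1 queued=0   (step 1: new core thread)
        // task2: threads=1 queued=1   (step 2: queued)
        // task3: threads=2 queued=1   (step 3: queue full, extra thread)
        // task4: rejected             (step 4: at maximumPoolSize, queue full)
    }
}
```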

Step 2 is the most common path. Notice that once the task has been queued, the code checks the pool state again. Why is a second check needed? There are two reasons:

  • The pool may have been shut down after the earlier check. In that case, the task that was just queued is removed and the rejection policy is applied. Presumably this is done so that the task is dealt with promptly and the caller gets feedback about the pool's current state.
  • The pool is still RUNNING but has no live worker threads. This can happen, for example, when corePoolSize is 0 and maximumPoolSize is greater than 0, so that all threads are reclaimed once their keepAliveTime expires. In this case, a thread with no first task is created, so that at least one thread exists and the queued task is executed as soon as possible. Why not run the task directly in the newly created thread? Because the task has already been placed in the blocking queue. Running it directly as well would execute it twice, since tasks in the queue are also executed by worker threads. Why not take the task back out of the queue and hand it to the new thread? In my opinion, this is not feasible either. The current task may already have been taken and executed by another thread, and competing for the queue at this point would lengthen task submission. Probably for these reasons, all interaction between worker threads and the queue is handled uniformly by the workers later, rather than at submission time.
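You can reproduce the second case with a pool whose corePoolSize is 0. In this sketch (hypothetical class name), step 1 never applies. execute() queues the task, finds no worker threads on the recheck, and calls addWorker(null, false). The new thread then takes the task from the queue. If that thread were not created, get() would time out.

```java
import java.util.concurrent.*;

class ZeroCoreDemo {
    // corePoolSize = 0: step 1 never applies, so execute() queues the task, sees
    // workerCount == 0 on the recheck, and calls addWorker(null, false)
    static String runWithZeroCoreThreads() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        try {
            Future<String> f = pool.submit(() -> "ran on " + Thread.currentThread().getName());
            // Would throw TimeoutException if no worker had been started for the queue
            return f.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithZeroCoreThreads()); // e.g. ran on pool-1-thread-1
    }
}
```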

The source declares ctl, which packs the pool's runState and workerCount into a single int, as follows:

// Atomic variable holding the pool state: runState in the high 3 bits,
// workerCount (the number of live worker threads) in the low 29 bits
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of bits used for workerCount: Integer.SIZE - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum workerCount: (2^29) - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// RUNNING: accept new tasks and process queued tasks
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: don't accept new tasks, but process queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING: all tasks have terminated, workerCount is 0, and the terminated() hook method will run
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: terminated() has completed
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine the run state and the worker count into a ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

This shows that the high 3 bits of ctl hold the run state and the low 29 bits hold the number of live worker threads. It also defines the pool's five states: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED.
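To see the bit packing in isolation, the sketch below copies the constants and helper methods shown above into a standalone class. The real ones are private to ThreadPoolExecutor, and the class name here is hypothetical.

```java
class CtlBitsDemo {
    // Copies of the private constants and helpers shown above, for illustration
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 536870911
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;       // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;       // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toHexString(c));     // e0000005
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c));           // 5
        // RUNNING is negative, so the states are ordered numerically; this is
        // what makes comparisons such as rs >= SHUTDOWN work
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```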

Let’s take a look at the description of state transitions in the source code comments:

/**
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in  finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 */
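You can observe the TIDYING -> TERMINATED transition through the protected terminated() hook, which ThreadPoolExecutor lets subclasses override. In this sketch (hypothetical class name), shutting down an idle pool moves it from RUNNING through SHUTDOWN and TIDYING to TERMINATED. The hook runs before awaitTermination() returns.

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical subclass that logs life-cycle events, including the terminated()
// hook that runs during the TIDYING -> TERMINATED transition
class TracingPool extends ThreadPoolExecutor {
    final List<String> events = new CopyOnWriteArrayList<>();

    TracingPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void terminated() {
        events.add("terminated() hook");
        super.terminated();
    }

    static List<String> lifecycle() throws InterruptedException {
        TracingPool pool = new TracingPool();
        pool.events.add("isShutdown=" + pool.isShutdown());  // RUNNING
        pool.shutdown();  // RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED (queue and pool empty)
        pool.awaitTermination(5, TimeUnit.SECONDS);
        pool.events.add("isTerminated=" + pool.isTerminated());
        return pool.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lifecycle());
        // [isShutdown=false, terminated() hook, isTerminated=true]
    }
}
```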

These state transitions can be drawn as the following flow chart:

The method that execute relies on most is addWorker:

private boolean addWorker(Runnable firstTask, boolean core) {
    // Outer loop: check the pool state
    retry:
    for (;;) {
        int c = ctl.get();
        // Current run state
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * Return false (no worker is added) if the pool is SHUTDOWN or later,
         * unless all of the following hold:
         *   - rs == SHUTDOWN (shutting down, but not stopped),
         *   - firstTask == null (no new task is being submitted), and
         *   - the queue is not empty (there are still tasks to process).
         * In that case a worker may still be added to drain the queue.
         * For example, if rs == SHUTDOWN and firstTask != null, the new task is
         * refused; if the queue is empty, no more threads are needed.
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // Inner loop: try to increment workerCount
        for (;;) {
            int wc = workerCountOf(c);
            /*
             * Return false if either holds:
             * 1. the worker count has reached CAPACITY ((2^29) - 1, about 536 million);
             * 2. the worker count has reached corePoolSize (core == true)
             *    or maximumPoolSize (core == false).
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS-increment workerCount; on success, leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the run state has changed, restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker, which creates its thread through the ThreadFactory
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                /*
                 * A worker may be added if:
                 * 1. rs < SHUTDOWN, i.e. the pool is RUNNING; or
                 * 2. rs == SHUTDOWN and firstTask == null (no new tasks are
                 *    accepted in SHUTDOWN, but queued tasks are still processed).
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers holds every worker thread in the pool; guarded by mainLock
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize tracks the largest pool size ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread once the worker has been added
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Roll back if the thread failed to start
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

From the code above, we can see that:

  1. addWorker adds a worker thread and starts it.
  2. The new thread either runs the current task immediately (when firstTask is not null) or starts without a task and takes tasks from the queue (when firstTask is null).
  3. Depending on the core argument, a thread is added only while the thread count is below corePoolSize (core is true) or maximumPoolSize (core is false).

addWorker also relies on other important pieces. These include the Worker class, which implements the Runnable interface, and addWorkerFailed, which rolls back a worker that failed to start.

  • submit method: submits a task that returns a value. It returns a Future, which can be used to check whether the task completed successfully and to retrieve its result with get(). The Future's get() blocks the calling thread until the task completes. The overload get(long timeout, TimeUnit unit) waits at most the given time. ThreadPoolExecutor inherits submit from AbstractExecutorService, and the code looks like this:
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // Wrap the Runnable in a FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // Hand the FutureTask to execute()
    execute(ftask);
    return ftask;
}

Notice that submit also calls execute internally, but it passes a FutureTask and returns that same FutureTask. When a pool thread runs the FutureTask, the FutureTask records the task's result or exception. FutureTask is covered in more detail later.
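One consequence of this wrapping is that an exception thrown by a task submitted with submit() does not propagate to the worker thread. Instead, it is stored in the FutureTask and rethrown, wrapped in an ExecutionException, when get() is called. Here is a minimal sketch with a hypothetical class name:

```java
import java.util.concurrent.*;

class SubmitExceptionDemo {
    // The exception thrown by a submitted task is captured by its FutureTask
    // and only surfaces, wrapped in an ExecutionException, when get() is called
    static String causeSeenByGet() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> f = pool.submit(failing);
            try {
                f.get();
                return "no exception";
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();   // the task's original exception
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeSeenByGet()); // IllegalStateException: boom
    }
}
```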

Closing the thread pool

A thread pool can be shut down by calling its shutdown or shutdownNow method. First, let's look at the life-cycle management methods of the ExecutorService interface:

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
// ...

ThreadPoolExecutor implements the interface as follows:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check that the caller is allowed to shut down the pool
        checkShutdownAccess();
        // Set the pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle worker threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check that the caller is allowed to shut down the pool
        checkShutdownAccess();
        // Set the pool state to STOP
        advanceRunState(STOP);
        // Interrupt all worker threads, including those running tasks
        interruptWorkers();
        // Remove the tasks that have not started from the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool
    tryTerminate();
    return tasks;
}

public boolean isShutdown() {
    // true if the state is anything other than RUNNING
    return ! isRunning(ctl.get());
}

public boolean isTerminating() {
    int c = ctl.get();
    // true if the state is at least SHUTDOWN but not yet TERMINATED
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

public boolean isTerminated() {
    // true if the state is TERMINATED
    return runStateAtLeast(ctl.get(), TERMINATED);
}

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Loop until the pool has terminated or the timeout expires
        for (;;) {
            // The pool is TERMINATED: return true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            // The timeout has expired: return false
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
  • shutdown performs a graceful shutdown: it sets the pool state to SHUTDOWN and interrupts idle threads. The pool accepts no new tasks, but tasks already submitted, including those still waiting in the queue, are still executed. The call itself returns immediately; use awaitTermination to block until those tasks finish.

  • shutdownNow performs an abrupt shutdown: it sets the pool state to STOP, interrupts all worker threads (including those running tasks), and removes and returns the tasks still waiting in the queue. Those queued tasks are never started. Interruption is only a request, so a running task that ignores interrupts keeps running until it finishes.

Once shutdown or shutdownNow has been called, isShutdown returns true and newly submitted tasks are rejected. When all remaining tasks have finished, the pool has terminated and isTerminated returns true. isTerminating returns true while the pool is shutting down but has not yet terminated. awaitTermination blocks until the pool terminates or the timeout expires: it returns true if the pool terminated within the timeout, and false otherwise.
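These methods are usually combined into a two-phase shutdown: call shutdown, wait a bounded time with awaitTermination, and only then fall back to shutdownNow. The sketch below follows the pattern shown in the ExecutorService Javadoc; the class name GracefulShutdown and the method shutdownAndAwait are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical helper; the two-phase pattern follows the ExecutorService Javadoc.
class GracefulShutdown {

    // Returns true if the pool terminated, false otherwise.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // phase 1: reject new tasks, let submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true; // everything finished in time
            }
            // phase 2: interrupt running tasks and drain the queue
            List<Runnable> neverStarted = pool.shutdownNow();
            System.out.println(neverStarted.size() + " queued task(s) were never started");
            // give interrupted tasks a chance to exit
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();                 // we were interrupted: cancel everything
            Thread.currentThread().interrupt(); // and preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id));
        }
        System.out.println("terminated: " + shutdownAndAwait(pool, 1, TimeUnit.SECONDS));
        System.out.println("isShutdown: " + pool.isShutdown() + ", isTerminated: " + pool.isTerminated());
    }
}
```

Preserving the interrupt status in the catch block lets code further up the call stack notice that the thread was asked to stop.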

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor extends ThreadPoolExecutor and adds the ability to run tasks at a specified time. It is mainly used for delayed tasks and periodic tasks.

The scheduling methods of ScheduledThreadPoolExecutor are defined in the ScheduledExecutorService interface (which extends ExecutorService). Here is the definition of those methods:

// Creates and executes a one-shot action that becomes enabled after the given delay.
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

// Creates and executes a ScheduledFuture that becomes enabled after the given delay.
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

// Creates and executes a periodic action that becomes enabled first after the given
// initialDelay and subsequently with the given period: executions start after
// initialDelay, then at initialDelay + period, then initialDelay + 2 * period, and so on.
// If any execution of the task throws an exception, subsequent executions are suppressed.
// Otherwise, the task terminates only through cancellation or termination of the executor.
// If any execution takes longer than its period, subsequent executions may start late,
// but will not run concurrently.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

// Creates and executes a periodic action that becomes enabled first after the given
// initialDelay and subsequently with the given delay between the end of one execution
// and the start of the next.
// If any execution of the task throws an exception, subsequent executions are suppressed.
// Otherwise, the task terminates only through cancellation or termination of the executor.
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
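The Callable overload of schedule returns a ScheduledFuture whose get() blocks until the delay has passed and the callable has run, then yields its result. The following is a minimal sketch; the class OneShotScheduleDemo and its runLater method are hypothetical, and wrapping failures in IllegalStateException is just a choice made for this example.

```java
import java.util.concurrent.*;

class OneShotScheduleDemo {

    // Hypothetical helper: runs `job` once after delayMillis and returns its result.
    static <V> V runLater(Callable<V> job, long delayMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<V> future = ses.schedule(job, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // waits for the delay plus the job's own run time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("scheduled job failed", e);
        } finally {
            ses.shutdown(); // no more work for this single-use executor
        }
    }

    public static void main(String[] args) {
        Integer answer = runLater(() -> 6 * 7, 500);
        System.out.println("result after ~500 ms: " + answer);
    }
}
```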

The schedule method runs a task once after a given delay. The scheduleAtFixedRate and scheduleWithFixedDelay methods both run tasks periodically, but they differ slightly:

scheduleAtFixedRate: the next execution is scheduled at the start time of the previous execution plus period. If an execution takes longer than period, the next execution starts as soon as the current one finishes.

scheduleWithFixedDelay: the next execution starts delay after the previous execution finishes.
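With a fixed delay, the time between two consecutive starts is roughly the task's run time plus the delay. The sketch below measures this; the class FixedDelayDemo and the method startGaps are hypothetical, and Thread.sleep stands in for real work.

```java
import java.util.*;
import java.util.concurrent.*;

class FixedDelayDemo {

    // Runs a task that takes ~workMillis with scheduleWithFixedDelay(delayMillis)
    // and returns the gaps (in ms) between consecutive start times.
    static List<Long> startGaps(long workMillis, long delayMillis, int runs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        ses.scheduleWithFixedDelay(() -> {
            starts.add(System.nanoTime()); // record when this execution started
            try {
                Thread.sleep(workMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // wait until `runs` executions have finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow(); // cancel the periodic task
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < runs; i++) {
            gaps.add((starts.get(i) - starts.get(i - 1)) / 1_000_000);
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Each gap should be about 100 ms of work + 150 ms of delay.
        System.out.println(startGaps(100, 150, 4));
    }
}
```

With scheduleAtFixedRate and a 150 ms period instead, the same task would start roughly every 150 ms, because the period is measured from start to start.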

We will not analyze ScheduledThreadPoolExecutor's internals further here. Interested readers can explore, for example, the blocking queue it uses (DelayedWorkQueue), how pool threads take tasks from that queue, and how delayed execution is implemented.

Executors: factory methods for preconfigured thread pools

Executors mainly provides the following factory methods:

// Returns a thread pool with a fixed number of threads. If no thread is free,
// new tasks wait in an unbounded LinkedBlockingQueue until a thread becomes available.
public static ExecutorService newFixedThreadPool(int nThreads)

// Returns a thread pool with only one thread. If more than one task is submitted,
// the extra tasks wait in a LinkedBlockingQueue and run one at a time, in order.
public static ExecutorService newSingleThreadExecutor()

// Returns a thread pool with no upper bound on the number of threads: idle threads
// are reused, and a new thread is created when none is free. Threads idle for 60 s are reclaimed.
public static ExecutorService newCachedThreadPool()

// Returns a ScheduledExecutorService with only one thread. It can run a task after
// a given delay, or periodically at a fixed rate or with a fixed delay.
public static ScheduledExecutorService newSingleThreadScheduledExecutor()

// Same as above, but the pool keeps corePoolSize threads.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
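Under the hood, the two most common factories are thin wrappers around the ThreadPoolExecutor constructor. The sketch below builds pools with the same parameters that the OpenJDK sources pass for newFixedThreadPool and newCachedThreadPool; the class FactoryEquivalents and its method names are hypothetical.

```java
import java.util.concurrent.*;

class FactoryEquivalents {

    // Same parameters as Executors.newFixedThreadPool(n) in OpenJDK:
    // core = max = n threads, no idle timeout, unbounded LinkedBlockingQueue.
    static ThreadPoolExecutor fixed(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Same parameters as Executors.newCachedThreadPool() in OpenJDK:
    // no core threads, effectively unbounded max, 60 s idle timeout,
    // and a SynchronousQueue that hands each task directly to a thread.
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor f = fixed(4);
        ThreadPoolExecutor c = cached();
        System.out.println("fixed:  core=" + f.getCorePoolSize() + " max=" + f.getMaximumPoolSize());
        System.out.println("cached: core=" + c.getCorePoolSize() + " max=" + c.getMaximumPoolSize());
        f.shutdown();
        c.shutdown();
    }
}
```

Seeing the parameters side by side makes the trade-offs visible: the fixed pool's queue is unbounded, so tasks can pile up without limit, while the cached pool can create threads without limit. Calling the ThreadPoolExecutor constructor directly lets you choose bounded values instead.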

1. Fixed-size thread pool

Here is a simple example using newFixedThreadPool:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ": Thread Id:" + Thread.currentThread());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        //ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            es.submit(task);
        }
        es.shutdown(); // let the JVM exit once all submitted tasks have run
    }
}

The following output is displayed:

1630981722160: Thread Id:Thread[pool-1-thread-1,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-5,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-4,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-3,5,main]
1630981722160: Thread Id:Thread[pool-1-thread-2,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-4,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-5,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-1,5,main]
1630981727167: Thread Id:Thread[pool-1-thread-3,5,main]

The first batch of tasks and the second batch start about 5 s apart (the sleep time), and the same thread IDs appear in both batches: the pool runs five tasks at a time on its five reused threads. If newFixedThreadPool above is replaced with newCachedThreadPool, the output becomes:

1630981903770: Thread Id:Thread[pool-1-thread-1,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-5,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-3,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-2,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-6,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-7,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-8,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-9,5,main]
1630981903770: Thread Id:Thread[pool-1-thread-10,5,main]

Now all ten tasks start at the same moment. A cached thread pool has no upper bound on its thread count, so whenever no idle thread is available it creates a new thread for the submitted task.
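The difference can also be checked without reading timestamps, through getLargestPoolSize. In the sketch below (the class PoolGrowthDemo and method peakThreads are hypothetical), every task blocks on a latch, so no thread ever becomes idle while tasks are being submitted. The casts rely on OpenJDK's factory methods returning ThreadPoolExecutor instances, which is an implementation detail rather than an API guarantee.

```java
import java.util.concurrent.*;

class PoolGrowthDemo {

    // Submits `tasks` tasks that all block until released, then reports the
    // largest number of threads that were ever in the pool at the same time.
    static int peakThreads(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getLargestPoolSize();
        release.countDown(); // let all tasks finish
        pool.shutdown();
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("fixed(5): " + peakThreads((ThreadPoolExecutor) Executors.newFixedThreadPool(5), 10));
        System.out.println("cached:   " + peakThreads((ThreadPoolExecutor) Executors.newCachedThreadPool(), 10));
    }
}
```

The fixed pool stops at five threads and queues the other five tasks, while the cached pool creates one thread per blocked task.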

2. Scheduled tasks

We use scheduleAtFixedRate as an example:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

The possible output is as follows:

1631442979
1631442981
1631442983
1631442985
1631442987
1631442989
1631442991
1631442993
...

The task runs every 2 s. To see what happens when a task takes longer than the period, change Thread.sleep(1000) to Thread.sleep(5000). The modified code prints:

1631444201
1631444206
1631444211
1631444216
1631444221
1631444226
1631444231
1631444236

The interval is now 5 s. So when a task's execution time exceeds the period, the next execution starts only after the previous one has finished: executions are delayed, but they never overlap.
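The "no overlap" guarantee holds even when the scheduler has several threads available. The sketch below (the class FixedRateOverlapDemo and method maxConcurrentRuns are hypothetical) runs a 150 ms task with a 50 ms period on a four-thread scheduled pool and records the highest number of executions that were ever running at once.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateOverlapDemo {

    // Returns the highest number of executions of one fixed-rate task
    // that were ever running simultaneously.
    static int maxConcurrentRuns(long workMillis, long periodMillis, int runs) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ses.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max); // track the peak
            try {
                Thread.sleep(workMillis); // longer than the period on purpose
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow(); // cancel the periodic task
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent executions: " + maxConcurrentRuns(150, 50, 3));
    }
}
```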

Choosing the number of threads

For CPU-bound tasks, a pool of N+1 threads on a system with N processors usually achieves optimal utilization: the extra thread keeps a CPU busy when one of the compute threads occasionally pauses, for example on a page fault. For tasks that include I/O or other blocking operations, threads are not always runnable, so the pool can be larger. In general, choosing a pool size requires considering the number of CPUs, the amount of memory, and other resources.

To keep the processor at the desired utilization, the optimal thread pool size is:

Optimal thread pool size = number of CPUs * target CPU utilization * (1 + thread wait time / thread compute time)
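As a worked example, with 8 CPUs, a target utilization of 100%, and tasks that wait 50 ms for every 10 ms of computation, the formula gives 8 * 1.0 * (1 + 50/10) = 48 threads. With no waiting at all it gives exactly N; the N+1 rule above simply adds one spare thread. Below is a minimal sketch of the calculation; the class PoolSizing, the method optimalThreads, and the choice to round up are assumptions made for illustration.

```java
class PoolSizing {

    // threads = cpus * targetUtilization * (1 + waitTime / computeTime), rounded up.
    // waitTime and computeTime only need to use the same unit.
    static int optimalThreads(int cpus, double targetUtilization, double waitTime, double computeTime) {
        if (cpus < 1 || targetUtilization <= 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double n = cpus * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.ceil(n));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical workload: 90 ms of waiting for every 10 ms of CPU work, 80% target utilization.
        System.out.println("suggested pool size: " + optimalThreads(cpus, 0.8, 90, 10));
    }
}
```

The wait and compute times have to be measured or estimated for the actual workload; the result is a starting point for tuning rather than a final answer.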

In practice, the pool size is usually not hard-coded. It is supplied through a configuration mechanism (such as distributed configuration) or derived at runtime from the number of available CPUs with the following code:

Runtime.getRuntime().availableProcessors()

Disappearing exceptions

Let’s start with a simple example:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DivTask implements Runnable {
    int a, b;

    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        double re = a / b; // integer division; throws ArithmeticException when b == 0
        System.out.println(re);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        for (int i = 0; i < 5; i++) {
            pools.submit(new DivTask(100, i));
        }
    }
}

The following output is displayed:

100.0
25.0
33.0
50.0

One result is missing: the task with divisor i = 0 fails, yet no exception is printed. In a real project, a silently swallowed exception like this can be very difficult and time-consuming to track down, which is unacceptable.

The above problem can be dealt with in two ways:

1. Replace submit with execute

After the change, the output is:

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.wk.manage.web.controller.DivTask.run(DivTask.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
100.0
25.0
33.0
50.0

2. Keep submit, but call get() on the returned Future:

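// Inside the for loop of main
try {
    Future<?> re = pools.submit(new DivTask(100, i));
    re.get(); // blocks until the task finishes; rethrows its exception wrapped in ExecutionException
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}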

After the change, the output is:

java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.test.manage.web.controller.DivTask.main(DivTask.java:27)
Caused by: java.lang.ArithmeticException: / by zero
	at com.test.manage.web.controller.DivTask.run(DivTask.java:16)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
100.0
50.0
33.0
25.0

With the first approach, we only learn where the exception was thrown. With the second, the ExecutionException trace also shows the caller that retrieved the result through Future.get() (here, in main, right next to where the task was submitted). The second approach is therefore recommended, because its exception information is more complete.

If you must use the first approach, you need to extend ThreadPoolExecutor. For example, the following extended thread pool captures the stack trace of the submitting thread before scheduling each task:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                   TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    // Captures the stack of the thread that submits the task
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                // Print the submitting thread's stack, then rethrow the task's exception
                clientStack.printStackTrace();
                throw e;
            }
        };
    }
}

Then we change the main method to:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    for (int i = 0; i < 5; i++) {
        pools.execute(new DivTask(100, i));
    }
}

The following output is displayed:

java.lang.Exception: Client stack trace
	at com.wk.manage.web.controller.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:24)
	at com.wk.manage.web.controller.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:15)
	at com.wk.manage.web.controller.DivTask.main(DivTask.java:25)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at com.wk.manage.web.controller.DivTask.run(DivTask.java:16)
	at com.wk.manage.web.controller.TraceThreadPoolExecutor.lambda$wrap$0(TraceThreadPoolExecutor.java:31)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
100.0
25.0
33.0
50.0

Both the location where the exception was thrown and the place where the task was submitted are now printed.

More generally, the examples above show that an exception thrown by a task submitted with execute is passed to the worker thread's uncaught exception handler, whereas for a task submitted with submit, any exception, checked or unchecked, is treated as part of the task's result. If a task submitted with submit ends by throwing an exception, Future.get rethrows it wrapped in an ExecutionException.
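When handling the ExecutionException, the original exception is available through getCause(). The sketch below shows this unwrapping; the class SubmitExceptionDemo and its failureOf method are hypothetical names used only for this example.

```java
import java.util.concurrent.*;

class SubmitExceptionDemo {

    // Submits a task computing a / b and returns the exception it failed with,
    // or null if it completed normally.
    static Throwable failureOf(ExecutorService pool, int a, int b) {
        Future<?> f = pool.submit(() -> System.out.println(a / b));
        try {
            f.get(); // rethrows a task failure as ExecutionException
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the exception the task actually threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println("100/4 -> " + failureOf(pool, 100, 4));
        System.out.println("100/0 -> " + failureOf(pool, 100, 0));
        pool.shutdown();
    }
}
```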

Thread pool monitoring

Monitoring thread pools is essential for troubleshooting. ThreadPoolExecutor exposes the following properties for monitoring:

  • getTaskCount: returns the approximate total number of tasks that have ever been scheduled for execution. Because task and thread states may change during the computation, the value is only an approximation.

  • getCompletedTaskCount: returns the approximate number of tasks that have completed execution. It is always less than or equal to getTaskCount.

  • getLargestPoolSize: returns the largest number of threads that have ever been in the pool at the same time. Comparing it with the maximum pool size tells you whether the pool has ever been full.

  • getPoolSize: returns the current number of threads in the pool.

  • getActiveCount: returns the approximate number of threads actively executing tasks.
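A simple way to use these properties is to log a one-line snapshot periodically or on demand. In the sketch below, the class PoolStats and the methods snapshot and runAndDrain are hypothetical; the output format is just one possible choice.

```java
import java.util.concurrent.*;

class PoolStats {

    // Formats a one-line snapshot of the monitoring properties.
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("pool=%d active=%d largest=%d tasks=%d completed=%d queued=%d",
                pool.getPoolSize(), pool.getActiveCount(), pool.getLargestPoolSize(),
                pool.getTaskCount(), pool.getCompletedTaskCount(), pool.getQueue().size());
    }

    // Runs n trivial tasks on a fresh two-thread pool and returns the terminated pool.
    static ThreadPoolExecutor runAndDrain(int n) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        System.out.println(snapshot(runAndDrain(10)));
    }
}
```

Once the pool has terminated, the counts are no longer changing, so the task and completed counts match exactly.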

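In addition, a custom thread pool that extends ThreadPoolExecutor can override the following hook methods. beforeExecute runs in the worker thread just before each task, afterExecute runs just after each task (t is the exception that terminated the task, or null), and terminated runs once after the pool has fully terminated: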

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
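The hooks also offer another answer to the disappearing-exception problem. For tasks passed to submit, the exception is captured inside the FutureTask, so afterExecute receives t == null; the ThreadPoolExecutor Javadoc shows how to recover it from the completed Future. The sketch below follows that pattern; the class CountingThreadPool and its counters are hypothetical.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass illustrating the three hooks.
class CountingThreadPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    volatile boolean terminatedHookRan;

    CountingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // runs in the worker thread, just before r
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), unwrap the exception stored in the completed Future
        // (pattern from the ThreadPoolExecutor Javadoc).
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failed.incrementAndGet();
            System.err.println("task failed: " + t);
        }
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedHookRan = true; // called once, when the pool has fully shut down
    }
}
```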

conclusion

This article covered why thread pools are used, how to use them, how they work, and the pitfalls to watch out for, to help you use thread pools well in everyday development.

Reference books: Java High Concurrency Programming (2nd Edition), Java Concurrency in Practice, The Art of Java Concurrency Programming