Java thread pool implementation principle and source analysis

Preface

I first planned this article in late November 2019, did not actually start writing it until the end of 2020, and did not finish it until the beginning of 2021.

Time passes both too fast and too slow!

I vaguely remember that in October 2019, Mou Dong resigned from a startup and was preparing for job interviews. He asked me whether I understood thread pools. I sent him a note I had written in 2017 called “Thread Pools for Concurrent Programming in Java,” and he said, “Is that it? I thought there was more to thread pools than that.”

On November 9, 2019, Mou Dong and I took bus 815 from Dawang Road to Yanjiao. I happened to be learning a little about multithreading at the time, and with nothing else to do on the bus we got to chatting. He asked me a few questions about thread pools, and I figured that knowing how to use a thread pool mostly came down to tuning the core thread count. Most of our discussion was still about multithreaded concurrency and locks.

Toward the end of the year I was busy with work and seldom studied on my own. A week later, I remembered a question someone had asked me: “How are threads created in a thread pool, and how do tasks wait to be executed?”

I was not very clear about this logic, so I recorded it as a TODO item, hoping to study it when I had time. As a result, it slipped from 2019 through 2020 straight into 2021.

Forgive me for being long-winded, but this article spans such a long time and left such a deep impression on me that I had to say it. Now, on to the topic!

This article uses the JDK 1.8 source code to analyze the core design and implementation of the Java thread pool.

It draws on the article “Java thread pool implementation principle and its practice in Meituan business.”

That article is very good. Beyond what is covered here, it also describes the background of thread pools, thread pools in business practice, dynamic thread pools, and more. If you want to learn more about thread pools, it is worth reading.

If you are a server developer, you are strongly advised to read it.

Appearance

The appearance covers the points we usually see when working with thread pools:

  • Inheritance relationship;
  • Constructor;
  • Parameters in the constructor;
  • Blocking queues in constructors;
  • Thread pool creation;
  • The rejection policy in the constructor;

Thread pool inheritance relationships

[Figure: ThreadPoolExecutor UML class diagram]

The top-level interface that ThreadPoolExecutor implements is Executor. With Executor, users do not need to care how threads are created or how they are scheduled to execute tasks; they only provide Runnable objects and submit the task's logic to the Executor. The Executor framework takes care of allocating threads and executing tasks.

The ExecutorService interface adds some capabilities:

  1. Extends the ability to execute tasks: adds methods that produce a Future for one asynchronous task or for a group of them;
  2. Provides methods for managing the thread pool, such as shutting it down.

AbstractExecutorService is a high-level abstract class that strings together the process of performing a task, ensuring that the underlying implementation only needs to focus on a single method to perform the task.

The bottom-level implementation class, ThreadPoolExecutor, implements the most complex part of the runtime:

  1. It automatically creates, manages, and reuses a specified number of threads; callers simply submit tasks.

  2. ThreadPoolExecutor keeps state such as the run state, the number of core threads, and the number of non-core threads. It makes extensive use of CAS and AQS-based locks to avoid concurrency conflicts.

  3. Core threads, a buffering blocking queue, non-core threads, and rejection policies can be combined to fit the actual application scenario.

  4. beforeExecute() and afterExecute() are provided as hooks for extending the thread pool's functionality.
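To make the last point concrete, here is a minimal sketch of a subclass that uses the two hooks. The class name CountingPool, its fields, and the helper runTasks are made up for illustration; only beforeExecute, afterExecute, and the ThreadPoolExecutor constructor come from the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical subclass that counts tasks through the two extension hooks. */
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();  // runs in the worker thread, after the task
    }

    /** Runs n trivial tasks and returns {started, finished}. */
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(3);
        System.out.println("started=" + counts[0] + ", finished=" + counts[1]);
    }
}
```

Both hooks run in the worker thread that executes the task, so anything heavier than a counter should be written with thread safety in mind.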

The constructor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize: the number of core threads in the thread pool. By default, core threads stay alive whether or not there are tasks. If allowCoreThreadTimeOut(boolean value) of ThreadPoolExecutor is set to true, idle core threads are subject to a timeout as well: if no new task arrives within the specified time, the core thread is also terminated. The interval is specified by the third parameter, keepAliveTime.
  • maximumPoolSize: the maximum number of threads the pool can hold. Once the number of threads reaches this value and the work queue is full, new tasks are handed to the rejection policy.
  • keepAliveTime: the timeout for an idle thread; a thread that stays idle longer than this is terminated. It normally applies to non-core threads and applies to core threads only when allowCoreThreadTimeOut(boolean value) of ThreadPoolExecutor is set to true.
  • unit: the time unit of the keepAliveTime parameter. TimeUnit is an enum; commonly used values include TimeUnit.HOURS, TimeUnit.MINUTES, TimeUnit.SECONDS, and TimeUnit.MILLISECONDS.
  • workQueue: the task queue of the thread pool. Runnable tasks submitted through execute(Runnable command) are stored in this queue while they wait for a thread.
  • threadFactory: the thread factory, an interface used to create new threads for the pool.
  • handler: the rejection policy, that is, the strategy applied when the thread pool rejects a task submitted to it.
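As a quick illustration of the seven parameters, the sketch below builds a pool and then turns on the core-thread timeout. The class name PoolConfigDemo and the concrete numbers are arbitrary choices for this example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigDemo {
    /** Builds a pool with illustrative values for every constructor parameter. */
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and unit
                new ArrayBlockingQueue<Runnable>(100),  // workQueue
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
        // Let idle core threads time out too; this requires keepAliveTime > 0.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
                + ", coreTimeout=" + pool.allowsCoreThreadTimeOut());
        pool.shutdown();
    }
}
```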

Member variables

/** Blocking queue that holds waiting tasks */
private final BlockingQueue<Runnable> workQueue;
/** Nonfair mutual-exclusion lock (reentrant) */
private final ReentrantLock mainLock = new ReentrantLock();
/** Each Worker corresponds to one thread. Workers are not marked as core or non-core; only the count matters */
private final HashSet<Worker> workers = new HashSet<Worker>();
/** Condition created from mainLock; allows finer control over making threads wait and waking them (used by awaitTermination) */
private final Condition termination = mainLock.newCondition();
/** Largest number of threads the pool has ever reached */
private int largestPoolSize;
/** Number of completed tasks */
private long completedTaskCount;
/** ThreadFactory used to create threads */
private volatile ThreadFactory threadFactory;
/** Rejection handler. Provided implementations: CallerRunsPolicy, AbortPolicy, DiscardOldestPolicy, DiscardPolicy */
private volatile RejectedExecutionHandler handler;
/** Idle time allowed for threads beyond the core count (stored in nanoseconds) */
private volatile long keepAliveTime;
/** Whether core threads may also time out and be destroyed */
private volatile boolean allowCoreThreadTimeOut;
/** Minimum number of threads the pool keeps alive, even when idle */
private volatile int corePoolSize;
/** Maximum number of threads in the pool; once it is reached and the queue is full, new tasks are rejected */
private volatile int maximumPoolSize;

Creating a thread pool

Executors provides methods for obtaining common thread pools:

  • Cached thread pool

newCachedThreadPool creates a thread pool that creates new threads as needed but reuses previously constructed threads when they are available. For programs that run many short-lived asynchronous tasks, such pools often improve performance. Calling execute() reuses a previously constructed thread if one is available; otherwise a new thread is created and added to the pool. Threads that have not been used for 60 seconds are terminated and removed from the cache, so a pool that stays idle for a long time consumes no resources. Note that you can use the ThreadPoolExecutor constructors to create pools with similar properties but different details, such as the timeout parameter.

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}

  • Single-threaded thread pool

newSingleThreadExecutor creates a single-threaded pool: only one thread in the pool does the work, and all tasks run sequentially. If that thread terminates because of an exception, a new thread is created to replace it. This pool guarantees that tasks execute in the order in which they were submitted.

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

  • Fixed size thread pool

newFixedThreadPool creates a fixed-size thread pool. A thread is created each time a task is submitted until the pool reaches its maximum size. After that the size stays constant; if a thread terminates because of an execution exception, a new thread is added to the pool to replace it.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(),
                  threadFactory);
}
  • Scheduled thread pool

newScheduledThreadPool creates a thread pool whose maximum size is unbounded (Integer.MAX_VALUE) and that supports delayed and periodic task execution.

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);
}

The methods above use DelayedWorkQueue, LinkedBlockingQueue, and SynchronousQueue. These are blocking queues, one of the core components of the thread pool.
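A quick way to confirm which queue each factory method uses is to ask the pool itself. This is only a sketch: FactoryQueueDemo and queueTypeOf are names invented here, and the cast works because these three factory methods return a ThreadPoolExecutor (or a subclass). The pool returned by newSingleThreadExecutor is wrapped in a delegate and cannot be cast this way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryQueueDemo {
    /** Returns the simple class name of the pool's work queue, then shuts the pool down. */
    static String queueTypeOf(ExecutorService pool) {
        String name = ((ThreadPoolExecutor) pool).getQueue().getClass().getSimpleName();
        pool.shutdown();
        return name;
    }

    public static void main(String[] args) {
        System.out.println("cached:    " + queueTypeOf(Executors.newCachedThreadPool()));
        System.out.println("fixed:     " + queueTypeOf(Executors.newFixedThreadPool(2)));
        System.out.println("scheduled: " + queueTypeOf(Executors.newScheduledThreadPool(1)));
    }
}
```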

Task blocking queue

Task queues are generally divided into direct handoff queues, bounded task queues, unbounded task queues, and priority task queues.

SynchronousQueue

1. Direct handoff queue: SynchronousQueue. SynchronousQueue is a special BlockingQueue with no capacity. Each insert blocks until another thread performs a remove, and each remove waits for an insert.

With SynchronousQueue, submitted tasks are not stored; they are always handed off for execution immediately. If no idle thread is available and the number of threads is smaller than maximumPoolSize, the pool tries to create a new thread; if the number of threads has already reached maximumPoolSize, the task is rejected through the handler you configured. With this queue you therefore need an accurate estimate of your application's concurrency before choosing maximumPoolSize; otherwise the rejection policy is triggered easily.
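The behaviour described above can be reproduced with a very small pool. In this sketch the pool allows a single thread (corePoolSize 0, maximumPoolSize 1) and uses the default AbortPolicy; DirectHandoffDemo and the latch-based blocker task are illustrative names, not part of the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffDemo {
    /** Returns true if the second task is rejected while the only thread is busy. */
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();            // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);          // nobody is waiting on the queue: a thread is created
            pool.execute(blocker);          // thread busy and maximumPoolSize reached: rejected
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```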

ArrayBlockingQueue

2. Bounded task queue: a bounded task queue can be implemented with ArrayBlockingQueue, as shown below:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

With a bounded ArrayBlockingQueue, when a new task arrives the pool creates a new thread until the number of threads reaches corePoolSize; after that, new tasks are added to the queue. If the queue is full (its size has reached the capacity given to ArrayBlockingQueue), the pool goes on creating threads until the count reaches maximumPoolSize. Once that limit is reached too, the rejection policy is executed. In this case the number of threads depends directly on the state of the bounded queue: if the queue has a large capacity or is never full, the number of threads stays at or below corePoolSize; when the queue is full, the number of threads is capped at maximumPoolSize.
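To watch that sequence happen, the sketch below shrinks the queue capacity to 1 and submits four tasks that all block. BoundedQueueDemo and its method name are made up for this example, and -1 is just a marker this sketch uses for a rejected submission.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    /** Submits four blocking tasks; records the pool size after each, or -1 if rejected. */
    static int[] poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1000, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[4];
        for (int i = 0; i < sizes.length; i++) {
            try {
                pool.execute(blocker);
                sizes[i] = pool.getPoolSize();
            } catch (RejectedExecutionException e) {
                sizes[i] = -1;              // marker used by this sketch for a rejection
            }
        }
        release.countDown();
        pool.shutdown();
        return sizes;
    }

    public static void main(String[] args) {
        // task 1: core thread, task 2: queued, task 3: non-core thread, task 4: rejected
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit()));
    }
}
```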

LinkedBlockingQueue

3. Unbounded task queue: an unbounded task queue can be implemented with LinkedBlockingQueue, as shown below:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

With an unbounded task queue, the queue can keep growing, so the maximum number of threads the pool creates is corePoolSize. In this case the maximumPoolSize parameter has no effect, even if the queue holds a large number of unexecuted tasks: once the number of threads reaches corePoolSize it no longer increases, and later tasks go straight into the queue to wait. When using this kind of queue, pay attention to the balance between task submission and processing; otherwise the queue keeps growing because tasks are not processed in time, until resources are exhausted.
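A small experiment shows maximumPoolSize being ignored. The sketch uses the same 1/2 configuration as above and submits tasks that all block; UnboundedQueueDemo and its method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {
    /** Submits the given number of blocking tasks and returns {poolSize, queuedTasks}. */
    static int[] poolSizeAndQueueLength(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < tasks; i++) {
            pool.execute(blocker);          // never rejected: the queue always has room
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = poolSizeAndQueueLength(5);
        System.out.println("threads=" + r[0] + ", queued=" + r[1]);
    }
}
```

With five tasks, one runs on the single core thread and the other four wait in the queue; the second thread allowed by maximumPoolSize is never created.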

PriorityBlockingQueue

4. Priority task queue: a priority task queue is implemented with PriorityBlockingQueue.

Tasks are reordered by priority, and the number of threads in the pool never exceeds corePoolSize (with corePoolSize set to 1, as in the examples above, that means a single thread).

PriorityBlockingQueue is a special unbounded queue. No matter how many tasks are added to it, the pool creates no more threads than corePoolSize. The difference is that other queues generally process tasks in first-in-first-out order, whereas PriorityBlockingQueue lets you define rules so that tasks execute in priority order.
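Here is a minimal sketch of priority ordering with a single-threaded pool. PriorityTask is a hypothetical task type introduced for this example (a lower number means a higher priority); the queued tasks must be mutually comparable, otherwise PriorityBlockingQueue throws a ClassCastException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityQueueDemo {
    /** Hypothetical task type: a lower number means a higher priority. */
    static final class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final int priority;
        final List<Integer> log;

        PriorityTask(int priority, List<Integer> log) {
            this.priority = priority;
            this.log = log;
        }

        @Override
        public void run() {
            log.add(priority);              // record the order of execution
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Queues tasks with priorities 3, 1, 2 behind a busy thread and returns the run order. */
    static List<Integer> executionOrder() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only thread; it is handed to the worker directly and never queued.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(new PriorityTask(3, log));
            pool.execute(new PriorityTask(1, log));
            pool.execute(new PriorityTask(2, log));
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder());
    }
}
```

The tasks are submitted with execute() rather than submit() on purpose: submit() wraps each task in a FutureTask, which is not Comparable and would fail inside the priority queue.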

LinkedBlockingQueue can also be bounded: its default capacity is Integer.MAX_VALUE, and a smaller capacity can be set at construction time.

Rejection policies

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

New tasks submitted with execute() are rejected when the Executor has been shut down (that is, after ExecutorService.shutdown() has been called), or when the Executor uses finite bounds for both the maximum number of threads and the work queue capacity and is saturated. In either case, execute() calls the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler.

AbortPolicy (the default rejection policy)

Also known as the abort policy: on rejection it throws the runtime exception RejectedExecutionException. By catching the exception, the business code gets timely feedback on the result of the submission.

public static class AbortPolicy implements RejectedExecutionHandler {
  public AbortPolicy() {}

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " + e.toString());
  }
}

CallerRunsPolicy

This policy provides a simple feedback control mechanism: the rejected task is run by the thread that submitted it, which slows down the submission of new tasks. It suits cases where every task must be completed.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

DiscardPolicy

A handler that silently discards the rejected task. With this policy we may not notice that the system is in an abnormal state. Be careful!

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

Discards the oldest task at the head of the queue and resubmits the rejected task. Whether to use this policy depends on whether the business allows old tasks to be replaced by new ones. Use it with caution.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
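Of the four policies, CallerRunsPolicy is the easiest one to observe from the outside. The following sketch saturates a one-thread pool with a queue of capacity 1 and checks which thread ends up running the overflow task. CallerRunsDemo and overflowRanInCaller are names chosen for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    /** Returns true if the overflow task was executed by the submitting thread. */
    static boolean overflowRanInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> runner = new AtomicReference<Thread>();

        pool.execute(() -> {                // occupies the single worker thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });            // fills the queue
        // Pool and queue are both full: the policy runs this task in the calling thread.
        pool.execute(() -> runner.set(Thread.currentThread()));

        boolean inCaller = runner.get() == Thread.currentThread();
        release.countDown();
        pool.shutdown();
        return inCaller;
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran in caller: " + overflowRanInCaller());
    }
}
```

Because the submitting thread is busy running the task itself, it cannot submit anything else in the meantime, which is where the throttling effect comes from.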

The kernel

Now that I have covered what a thread pool looks like from the outside, let's turn to its kernel.

Internally, a thread pool builds a producer-consumer model that decouples threads from tasks, so that the two are not directly tied to each other. This lets it buffer tasks and reuse threads effectively.

The operation of thread pool is mainly divided into two parts: task management and thread management.

The task management part acts as a producer, and when a task is submitted, the thread pool determines the subsequent flow of the task:

  1. Request a thread directly to execute the task;
  2. Buffer the task in a queue until a thread can execute it;
  3. Reject the task.

The thread management part is the consumer. Threads are maintained uniformly inside the pool and allocated according to task requests. When a thread finishes a task, it goes on to fetch a new task to execute.

Next, we will explain the thread pool operation mechanism in detail in the following three parts:

  1. How thread pools maintain their state.
  2. How thread pools manage tasks.
  3. How thread pools manage threads.

The thread pool life cycle

The running state of the thread pool is not explicitly set by the user, but is maintained internally along with the running of the thread pool.

Internally, a single variable maintains two values: the run state (runState) and the number of threads (workerCount). In the implementation, the thread pool maintains these two key parameters together:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl is an AtomicInteger field that controls both the run state of the thread pool and the number of valid threads in it.

It packs two pieces of information: the run state of the pool (runState) and the number of valid threads in the pool (workerCount). The high 3 bits hold runState and the low 29 bits hold workerCount. The two values do not interfere with each other.

Storing two values in one variable avoids inconsistency when decisions depend on both, without having to hold a lock to keep them in sync. Reading the thread pool source shows that it often needs to check the run state and the thread count at the same time. The thread pool also provides several internal methods for reading the current run state and thread count. These are all implemented with bitwise operations, which are very cheap (PS: this technique appears in many code bases).

The methods that extract the lifecycle state and the thread count are implemented as follows:

private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all 1, high 3 bits all 0

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
private static final int STOP       =  1 << COUNT_BITS; // 001
private static final int TIDYING    =  2 << COUNT_BITS; // 010
private static final int TERMINATED =  3 << COUNT_BITS; // 011

// Packing and unpacking ctl
// Extract the current run state (high 3 bits)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Extract the current worker count (low 29 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Build ctl from a run state and a worker count
private static int ctlOf(int rs, int wc) { return rs | wc; }
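Since these members are private, the easiest way to experiment with the packing is to copy the arithmetic into a standalone class. CtlBits below is such a copy made for illustration; it is not part of the JDK, and only a subset of the constants is reproduced.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;          // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;          // high bits 000
    static final int STOP = 1 << COUNT_BITS;              // high bits 001

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                         // RUNNING pool with 5 workers
        System.out.println("ctl bits     = " + Integer.toBinaryString(c));
        System.out.println("worker count = " + workerCountOf(c));
        System.out.println("is RUNNING   = " + (runStateOf(c) == RUNNING));
        // RUNNING is negative and every other state is >= 0,
        // so "c < SHUTDOWN" is enough to test whether the pool is running.
        System.out.println("c < SHUTDOWN = " + (c < SHUTDOWN));
    }
}
```

The ordering of the constants (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED) is what allows the source to compare states with plain < and >=.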

ThreadPoolExecutor can run in five states:

  • RUNNING: accepts newly submitted tasks and also processes tasks in the blocking queue.
  • SHUTDOWN: no longer accepts newly submitted tasks, but continues to process tasks in the blocking queue.
  • STOP: does not accept new tasks, does not process tasks in the queue, and interrupts the threads that are running tasks.
  • TIDYING: all tasks have terminated and workerCount is 0.
  • TERMINATED: entered after the terminated() method has run.
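From the outside, the states can only be observed indirectly through methods such as isShutdown() and isTerminated(). The sketch below walks a pool from RUNNING through SHUTDOWN to TERMINATED; LifecycleDemo and observe are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    /** Returns {isShutdown, newTaskRejected, isTerminated} observed after shutdown(). */
    static boolean[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(() -> { });           // accepted while RUNNING
        pool.shutdown();                   // RUNNING -> SHUTDOWN

        boolean rejected;
        try {
            pool.execute(() -> { });       // SHUTDOWN no longer accepts new tasks
            rejected = false;
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        boolean terminated = false;
        try {
            // Waits until queued tasks are drained and the pool reaches TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS) && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { pool.isShutdown(), rejected, terminated };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("isShutdown=" + r[0] + ", rejected=" + r[1] + ", isTerminated=" + r[2]);
    }
}
```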

Task scheduling mechanism

Task scheduling is the main entry point of the thread pool. When a user submits a task, this stage decides how the task will be executed. Understanding this part is equivalent to understanding the core operating mechanism of the thread pool.

All task scheduling is done by the execute method. Its job is to check the pool's current run state, the number of running threads, and the execution policy, and then decide what happens next: request a thread to run the task directly, buffer the task in the queue, or reject the task. The flow is as follows:

  1. Check the run state of the pool first. If it is not RUNNING, the task is rejected directly; the pool must be in the RUNNING state to accept tasks.
  2. If workerCount < corePoolSize, a thread is created and started to execute the newly submitted task.
  3. If workerCount >= corePoolSize and the blocking queue is not full, the task is added to the blocking queue.
  4. If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue is full, a thread is created and started to execute the newly submitted task.
  5. If workerCount >= maximumPoolSize and the blocking queue is full, the task is handled according to the rejection policy. The default is to throw an exception directly.
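The five steps above can be condensed into a single pure function. This is a deliberately simplified model, not the JDK code: SubmitDecision, Outcome, and decide are invented for this sketch, and the model ignores the rechecks and races that the real execute method has to handle.

```java
class SubmitDecision {
    enum Outcome { NEW_CORE_THREAD, QUEUED, NEW_NON_CORE_THREAD, REJECTED }

    /** Simplified model of the five steps; all inputs are snapshots supplied by the caller. */
    static Outcome decide(boolean running, int workerCount,
                          int corePoolSize, int maximumPoolSize, boolean queueFull) {
        if (!running) {
            return Outcome.REJECTED;                // step 1
        }
        if (workerCount < corePoolSize) {
            return Outcome.NEW_CORE_THREAD;         // step 2
        }
        if (!queueFull) {
            return Outcome.QUEUED;                  // step 3
        }
        if (workerCount < maximumPoolSize) {
            return Outcome.NEW_NON_CORE_THREAD;     // step 4
        }
        return Outcome.REJECTED;                    // step 5
    }

    public static void main(String[] args) {
        // corePoolSize = 2, maximumPoolSize = 4
        System.out.println(decide(true, 1, 2, 4, false));
        System.out.println(decide(true, 2, 2, 4, false));
        System.out.println(decide(true, 2, 2, 4, true));
        System.out.println(decide(true, 4, 2, 4, true));
    }
}
```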

Next, on to the source code analysis!

Submit a task

// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

// ThreadPoolExecutor.java
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get(); // read ctl
  // Is the current worker count below the core pool size?
  if (workerCountOf(c) < corePoolSize) {
    // Below the core size: add a core thread to run the task
    if (addWorker(command, true))
      return;
    // Adding failed: re-read ctl
    c = ctl.get();
  }
  // Check that the pool is still running, then try to enqueue the task
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get(); // re-read ctl
    // If the pool is no longer running, remove the task that was just enqueued
    if (!isRunning(recheck) && remove(command))
      reject(command); // removed successfully: hand the task to the rejection policy
    else if (workerCountOf(recheck) == 0) // no worker threads left
      // The pool is running, or removing the task failed.
      // Add a non-core thread without a first task;
      // once started, it fetches tasks from the wait queue.
      addWorker(null, false);
  }
  // The pool is not RUNNING, or the queue is full: try to create a non-core thread for the task.
  // If that fails (pool shut down or maximumPoolSize reached), hand the task to the rejection policy.
  else if (!addWorker(command, false))
    reject(command);
}

Add worker threads and execute tasks

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask; // initial task; may be null
    this.thread = getThreadFactory().newThread(this); // the thread this Worker runs in
  }
  /* some code omitted */
  public void run() {
    runWorker(this);
  }
}

Adding worker threads and executing tasks: the general idea is to create a Worker and find a matching Runnable for it.

Adding a worker thread

Threads are added through the pool's addWorker method. Its job is to add a single thread. It does not decide at which stage the pool should add a thread; that allocation policy was settled in the previous step (execute). This step only adds the thread, starts it, and returns whether it succeeded.

The addWorker method takes two parameters: firstTask and core.

The firstTask parameter specifies the first task to be executed by the new thread. It can be null.

If core is true, the method checks that the current number of threads is less than corePoolSize before adding a thread; if core is false, it checks against maximumPoolSize instead.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // label used by the break and continue statements below
    for (;;) {
        int c = ctl.get(); // read ctl
        int rs = runStateOf(c); // current run state of the pool

        /*
         * Condition 1: the pool is not RUNNING.
         * Condition 2: the pool is SHUTDOWN, no new task is being added (firstTask == null),
         *              and the wait queue is not empty, so a thread is still needed to drain it.
         * If 1 holds but 2 does not, the thread cannot be created: return false.
         */
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // inner loop
        for (;;) {
            int wc = workerCountOf(c); // current number of worker threads
            /*
             * 1. wc >= CAPACITY (the counter's maximum): return false.
             * 2. core is true and wc >= corePoolSize: return false.
             * 3. core is false and wc >= maximumPoolSize: return false.
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increase the worker count by 1 with a CAS operation
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs) // run state changed: restart the outer loop
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // At this point the worker count has been increased, but the Worker object has not been created yet.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // acquire the lock
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // Proceed only if the pool is RUNNING, or it is SHUTDOWN and no first task is given
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); // add the new worker to the set
                    int s = workers.size();
                    if (s > largestPoolSize) // track the largest pool size reached
                        largestPoolSize = s;
                    workerAdded = true; // the worker was added successfully
                }
            } finally {
                mainLock.unlock(); // release the lock
            }
            if (workerAdded) { // the worker was added: start its thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker failed to start, roll back:
        // decrement the worker count and remove the worker from the set
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
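The inner loop of addWorker follows a common lock-free pattern: read the counter, check the bound, try a CAS, and retry if another thread got there first. The sketch below isolates that pattern with a plain AtomicInteger. BoundedCounter is a made-up class for illustration; the real pool packs the count into ctl together with the run state and also rechecks that state on each retry.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    /** Tries to reserve one slot; returns false once the limit has been reached. */
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit) {
                return false;               // bound reached: give up, like addWorker returning false
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;                // CAS succeeded: the slot is ours
            }
            // CAS failed because another thread changed the count: re-read and retry
        }
    }

    int get() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedCounter slots = new BoundedCounter(3);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (slots.tryIncrement()) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("winners=" + winners.get() + ", count=" + slots.get());
    }
}
```

However the eight threads interleave, exactly three of them succeed, which is the guarantee addWorker needs before it goes on to create the Worker object.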

Perform a task

As we saw in the section on adding a worker thread, once the worker has been added successfully its thread is started to execute tasks.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Unlock so that the worker can be interrupted from now on
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the worker's first task if it has one; otherwise take tasks from the wait queue.
        // getTask() blocks while it waits for a task
        while (task != null || (task = getTask()) != null) {
            w.lock(); // Lock while the task executes
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the pool state is STOP, TIDYING, or TERMINATED and the current
            // thread has not been interrupted yet, interrupt it
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null; // Records any exception thrown by the task
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; // Clear the worker's current task
                w.completedTasks++; // Increment the number of tasks this worker has completed
                w.unlock(); // Release the lock
            }
        }
        completedAbruptly = false; // Normal exit: there are no more tasks to run
    } finally {
        // Perform the exit handling of the worker thread
        processWorkerExit(w, completedAbruptly);
    }
}
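
To make the order of calls inside runWorker easier to see, here is a minimal sketch that overrides the two hooks, beforeExecute and afterExecute, and records when each one fires around task.run(). The class name HookDemo and the method runOneTask are made up for this illustration; only the ThreadPoolExecutor methods come from the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class HookDemo {
    // Runs one task and returns the order in which the hooks and the task body ran.
    static List<String> runOneTask() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<String>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("before"); // called by runWorker just before task.run()
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                super.afterExecute(r, thrown);
                events.add("after"); // called from the finally block after task.run()
            }
        };
        pool.execute(() -> events.add("run"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOneTask()); // prints [before, run, after]
    }
}
```

Because afterExecute sits in a finally block, it also runs when the task throws, and the exception is passed in as its second argument.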

The worker thread retrieves the task

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get(); // Get the value of ctl
        int rs = runStateOf(c); // Get the thread pool state

        // Check if queue empty only if necessary.
        /*
         * If the pool is SHUTDOWN and the wait queue is empty, or the pool is
         * STOP, TIDYING, or TERMINATED, there is nothing left to execute:
         * decrement the worker count and return null.
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c); // Get the number of worker threads
        // Are workers subject to culling?
        // Timeout detection is needed if core threads may time out, or if the
        // current number of workers is greater than corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /*
         * 1. The number of workers exceeds maximumPoolSize (which can be changed
         *    at run time), or timeout control applies and the previous poll timed out.
         * 2. This is not the only worker, or the wait queue is empty.
         * If both conditions hold, this worker should be reclaimed, so try to
         * decrement the worker count.
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Return null if the CAS decrement succeeds; otherwise retry the loop
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // With timeout control, wait at most keepAliveTime; otherwise block until a task arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // poll() timed out, set the flag
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
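
The effect of the timed flag in getTask can be observed from the outside. The sketch below is an illustration under assumed settings (1 core thread, 3 maximum threads, a 100 ms keep-alive, and a SynchronousQueue so that every extra task forces a new worker); the class and method names are hypothetical. It watches the pool size fall back to corePoolSize, and then to 0 once allowCoreThreadTimeOut is switched on.

```java
import java.util.Arrays;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class KeepAliveDemo {
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Polls until the pool reports the expected size or the timeout elapses.
    static void waitForPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pool.getPoolSize() != expected && System.nanoTime() - deadline < 0) {
            sleepQuietly(10);
        }
    }

    static int[] observePoolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> sleepQuietly(200));
        }
        int busy = pool.getPoolSize();           // three workers, one per task
        waitForPoolSize(pool, 1, 5000);          // surplus workers time out in poll()
        int afterKeepAlive = pool.getPoolSize();
        pool.allowCoreThreadTimeOut(true);       // now timed is true for every worker
        waitForPoolSize(pool, 0, 5000);
        int afterCoreTimeout = pool.getPoolSize();
        pool.shutdown();
        return new int[] {busy, afterKeepAlive, afterCoreTimeout};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observePoolSizes())); // expected: [3, 1, 0]
    }
}
```

Which of the three workers survives the first shrink is not fixed in advance: whichever worker sees the count already at corePoolSize simply switches from poll() to take().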

Exit of worker thread

The thread pool's job is to hold references to a certain number of threads, according to its current state, so that these threads are not reclaimed by the JVM. When the pool decides that a thread should be reclaimed, it simply removes the reference. After a Worker is created, it loops, repeatedly fetching tasks and executing them. Core threads can wait indefinitely for a task, while non-core threads must obtain one within a limited time. When a Worker fails to obtain a task, that is, when getTask returns null, the loop ends and the Worker removes its own reference from the thread pool.

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly == true means the worker died from an exception in a task,
    // so the worker count has not been decremented yet; do it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // Otherwise the worker exited normally and getTask() already decremented the count
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); // lock
    try {
        completedTaskCount += w.completedTasks; // Add this worker's completed tasks to the pool total
        workers.remove(w); // Remove the worker from the worker collection
    } finally {
        mainLock.unlock(); // unlock
    }
    // Try to terminate the thread pool
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // The pool is RUNNING or SHUTDOWN
        if (!completedAbruptly) { // The worker exited normally
            // Minimum number of workers required: 0 if core threads may time out, otherwise corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If core threads may time out but the wait queue is not empty, keep at least 1 worker
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // If the current number of workers meets the minimum, no replacement is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // After an abrupt exit, or when there are fewer workers than the minimum, add a worker.
        // It is added with core == false, but core and non-core workers are otherwise identical
        addWorker(null, false);
    }
}
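
The replacement path at the end of processWorkerExit can be made visible with a counting thread factory. This is only a sketch: the class name, the method name, and the choice to silence the uncaught exception are assumptions made for the demo. A task passed to execute() throws, its worker exits abruptly, and the pool creates a second thread to keep going.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK source.
class WorkerReplacementDemo {
    static int countThreadsCreated() throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Keep the demo quiet: swallow the report of the uncaught exception.
            t.setUncaughtExceptionHandler((thread, e) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), countingFactory);
        // With execute() the exception escapes task.run(), so runWorker
        // finishes with completedAbruptly == true and the worker thread dies.
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { }); // runs on a second thread
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countThreadsCreated()); // prints 2
    }
}
```

The same experiment with submit() instead of execute() should need only one thread, because FutureTask catches the exception and stores it in the returned Future, so the worker never sees it.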

Special topics

Thread pool monitoring

A thread pool can be monitored through the values it exposes. ThreadPoolExecutor provides the following methods for this purpose:

  • getTaskCount: the approximate total number of tasks in the thread pool, both executed and not yet executed;
  • getCompletedTaskCount: the approximate number of tasks the thread pool has completed. The value is less than or equal to taskCount;
  • getLargestPoolSize: the largest number of threads that have ever been in the pool at the same time. It tells you whether the pool has ever been full, that is, whether it has reached maximumPoolSize;
  • getPoolSize: the current number of threads in the thread pool;
  • getActiveCount: the approximate number of threads that are currently executing tasks.
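
As a rough illustration of how these getters might be used together, the sketch below formats them into a single line that could be written to a log or a metrics system at intervals. The class name, the snapshot format, and the pool settings (2 fixed threads, 5 empty tasks) are choices made for this example only.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class PoolMonitorDemo {
    // Collects the monitoring values of a pool into one line.
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format(
                "poolSize=%d active=%d largest=%d tasks=%d completed=%d queued=%d",
                pool.getPoolSize(), pool.getActiveCount(), pool.getLargestPoolSize(),
                pool.getTaskCount(), pool.getCompletedTaskCount(), pool.getQueue().size());
    }

    static String runAndSnapshot() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot(pool); // taken after termination, so the values are stable
    }

    public static void main(String[] args) throws InterruptedException {
        // prints poolSize=0 active=0 largest=2 tasks=5 completed=5 queued=0
        System.out.println(runAndSnapshot());
    }
}
```

While the pool is still running, the task and active counts are approximations, because workers keep changing state while the values are being read.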

Dynamically resize the thread pool

The JDK lets users change a thread pool's core settings at run time through the ThreadPoolExecutor instance. Take the setCorePoolSize method as an example.

When this method is called at run time, the thread pool overwrites the original corePoolSize with the new value and then takes a different action depending on how the new value compares with the current state.

If the new value is smaller than the current number of worker threads, there are surplus workers. The pool sends an interrupt request to the idle workers so that they can be reclaimed, and the remaining surplus workers are reclaimed the next time they become idle. If the new value is greater than the original value and tasks are waiting in the queue, the pool creates new worker threads to execute the queued tasks. (Note: a worker is idle when it does not hold its lock, because it holds the lock for the whole time it runs a task.)

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    // Calculate the increment
    int delta = corePoolSize - this.corePoolSize;
    // Override the existing corePoolSize
    this.corePoolSize = corePoolSize;
    // If there are more workers than the new corePoolSize, interrupt the idle workers
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) { // The core size grew
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        // Start at most min(delta, number of queued tasks) new workers
        int k = Math.min(delta, workQueue.size());
        // Add core workers to run queued tasks until the queue is empty
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); // lock
    try {
        // Iterate over the collection of workers
        for (Worker w : workers) {
            Thread t = w.thread;
            // A worker whose lock can be acquired is idle: interrupt it if it
            // is not already interrupted, then release the lock
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // Whether to interrupt only one worker thread
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); // release the lock
    }
}
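
Both branches of setCorePoolSize can be tried out on a live pool. The following is a sketch under assumed settings (1 core thread, 4 maximum threads, a 100 ms keep-alive, an unbounded queue, and 4 tasks that block on a latch); the class and method names are invented for the demo. Raising the core size prestarts workers for the queued tasks, and lowering it lets the surplus workers exit once they are idle.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class ResizeDemo {
    static int[] resize() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the worker until the demo releases it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int before = pool.getPoolSize(); // 1 worker is busy, 3 tasks are queued
        pool.setCorePoolSize(3);         // delta = 2, so two more workers are started
        int grown = pool.getPoolSize();
        release.countDown();             // let all four tasks finish
        pool.setCorePoolSize(1);         // surplus workers exit once they are idle
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() - deadline < 0) {
            Thread.sleep(10);
        }
        int shrunk = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, grown, shrunk};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(resize())); // expected: [1, 3, 1]
    }
}
```

Note that the pool grows only because tasks are waiting in the queue: with an empty queue, k would be 0 and no worker would be prestarted.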

Gracefully close the thread pool

As the thread pool life cycle diagram also shows, calling ThreadPoolExecutor#shutdown changes the thread pool state from RUNNING to SHUTDOWN, and calling ThreadPoolExecutor#shutdownNow changes it from RUNNING (or SHUTDOWN) to STOP.

shutdown

Stops accepting new tasks; tasks that were already submitted continue to run.

  1. Stop accepting newly submitted tasks;
  2. Tasks that have already been submitted (both running and queued) continue until they complete;
  3. Once step 2 has finished, the pool really stops.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();// Check permissions
        advanceRunState(SHUTDOWN);// Set the thread pool state
        interruptIdleWorkers();// Interrupt the idle thread
        // The hook function is used to clean up some resources
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

The shutdown method first acquires the main lock and then checks that the caller has permission to shut the pool down. It then changes the thread pool state to SHUTDOWN, after which the pool no longer accepts newly submitted tasks. A task submitted at this point is handled by the pool's rejection policy. ThreadPoolExecutor uses AbortPolicy by default, which throws RejectedExecutionException.
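
A minimal sketch of this behavior, assuming a single-thread pool from Executors.newFixedThreadPool with the default AbortPolicy (the class and method names are made up for the illustration): a task submitted before shutdown() still runs, while a task submitted afterwards is rejected.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK source.
class RejectAfterShutdownDemo {
    static boolean[] submitAroundShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicBoolean earlierTaskRan = new AtomicBoolean(false);
        pool.execute(() -> earlierTaskRan.set(true)); // submitted before shutdown
        pool.shutdown();
        boolean rejected = false;
        try {
            pool.execute(() -> { });                  // submitted after shutdown
        } catch (RejectedExecutionException e) {
            rejected = true;                          // thrown by the default AbortPolicy
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {earlierTaskRan.get(), rejected};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(submitAroundShutdown())); // prints [true, true]
    }
}
```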

The interruptIdleWorkers method, covered in the source code for dynamically resizing the thread pool, interrupts only idle threads and leaves threads that are running tasks alone. Idle threads are the ones blocked on the thread pool's blocking queue.

shutdownNow

Stops accepting new tasks and tries to stop the tasks that are running.

  1. As with shutdown(), stop accepting newly submitted tasks;
  2. Tasks waiting in the queue are not executed;
  3. Try to interrupt the tasks that are currently running;
  4. Return the list of tasks that were never executed.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); // Check permissions
        advanceRunState(STOP); // Change the thread pool state to STOP
        interruptWorkers(); // Interrupt all started workers, both busy and idle
        tasks = drainQueue(); // Remove the tasks still in the work queue and collect them
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            // If the worker thread has already started, interrupt it
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // Remove all available elements from the queue and add them to the list
    q.drainTo(taskList);
    // For a DelayQueue or another queue where poll or drainTo may fail to
    // remove some elements, remove the remaining ones one by one
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

shutdownNow tries to stop threads by calling Thread.interrupt(), which has limited effect: if a thread is not in sleep, wait, a Condition wait, a timed lock acquisition, or a similar blocking call, interrupt() cannot stop it. Therefore shutdownNow() does not guarantee that the thread pool exits immediately; it may still have to wait for all running tasks to finish. In most cases, though, the pool exits right away.

Thread interrupt mechanism: Thread#interrupt only sets an interrupt flag and does not immediately stop a thread that is running normally. For the interruption to take effect promptly, the code running in the thread must check the interrupt status itself, for example by calling Thread.interrupted(). For a blocked thread, an interrupt makes the thread leave the blocked state immediately and throw InterruptedException, so blocking code needs to handle InterruptedException correctly.
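
The two cases above can be sketched as two cooperative tasks. This is one possible way to write them, not the only one; the class and method names are made up, and Thread.yield() merely stands in for a unit of real work. The first task never blocks and therefore polls the interrupt flag; the second blocks in sleep() and handles InterruptedException.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class InterruptAwareTasksDemo {
    static boolean stopsAfterShutdownNow() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch started = new CountDownLatch(2);

        // Non-blocking task: it has to check the interrupt flag itself.
        pool.execute(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield(); // stand-in for one unit of real work
            }
        });

        // Blocking task: sleep() reacts to the interrupt with InterruptedException.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and finish
            }
        });

        started.await();                                  // both tasks are running now
        List<Runnable> neverStarted = pool.shutdownNow(); // interrupts both workers
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && neverStarted.isEmpty();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopsAfterShutdownNow()); // expected: true
    }
}
```

If the first task looped on while (true) without checking the flag, shutdownNow() would return but the pool would never reach TERMINATED.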

awaitTermination

The shutdown and shutdownNow methods do not wait for tasks to finish. If you need to wait until the tasks in the thread pool have ended, call awaitTermination, which blocks until one of the following happens:

  • All submitted tasks (running and queued) have completed after a shutdown request;
  • The timeout elapses;
  • The waiting thread is interrupted, in which case InterruptedException is thrown.

awaitTermination returns true if the thread pool has terminated, and false if the timeout elapsed first.

// Helper for shutting down a thread pool
private static void shutdown(ExecutorService executorService) {
    // Step 1: Stop accepting new tasks
    executorService.shutdown();
    try {
        // Step 2: Wait for already submitted tasks to complete
        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            // Step 3: Cancel the tasks that are still running
            executorService.shutdownNow();
            // Step 4: Wait for the tasks to respond to the cancellation
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("Thread pool did not terminate");
            }
        }
    } catch (InterruptedException ie) {
        // Step 5: If the waiting thread is interrupted, cancel the running tasks
        executorService.shutdownNow();
        Thread.currentThread().interrupt(); // Restore the interrupt status
    }
}
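
To look at the two return values of awaitTermination in isolation, here is a small sketch with hypothetical class and method names. It assumes a single-thread pool whose only task is held back by a latch, so the first, short wait times out and the second wait succeeds once the task is released.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK source.
class AwaitTerminationDemo {
    static boolean[] observe() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // keeps the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // does not wait for the running task
        // The task is still running, so the short wait times out.
        boolean first = pool.awaitTermination(100, TimeUnit.MILLISECONDS);
        release.countDown();
        // The task can finish now, so the pool terminates within the timeout.
        boolean second = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {first, second};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observe())); // prints [false, true]
    }
}
```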

Other

There is still a lot to cover, more than I can finish writing here ~!

You cannot talk about thread pools without touching multithreaded concurrency: synchronous and asynchronous execution, CAS, AQS, fair and non-fair locks, reentrant and non-reentrant locks, and the other knowledge points that concurrency control requires.

I rarely use these in my daily work and have no systematic knowledge structure for them, so I keep learning and forgetting, then learning and forgetting again.

I hope to keep learning and sharing step by step in the future.

That is the end of the article. If there is anything else you would like to discuss, feel free to leave a message ~!