The thread pool ThreadPoolExecutor is the source of this article

Common thread pool types

Java Creates four thread pools by using the Executors static method.

  • NewSingleThreadExecutor creates a singleton thread pool to ensure that tasks are executed in order. Out-of-thread tasks will wait in the task and all tasks will be executed in FIFO queue order.
  • NewFixedThreadPool creates a fixed size thread group, specifies the number of worker threads, and queues tasks for execution when they exceed the specified number of workers.
  • NewCachedThreadPool Creates a pool of cacheable threads. This pool has 0 active threads and a maximum of integer.max. When new tasks are added to the pool, new threads can be created immediately to execute the task. When idle threads exceed 60 seconds, the system reclaims them.
  • NewScheduleThreadPool creates a thread pool of fixed length and supports timed and periodic task execution, similar to timers.
  • NewWorkStealingPool will create a thread pool containing enough threads to maintain the corresponding level of parallelism. It will make the multi-core CPU not idle by the way of work stealing, and there will always be a live thread for the CPU to run.

NewSingleThreadExecutor, newFixedThreadPool, and newCachedThreadPool all internally encapsulate ThreadPoolExecutor to generate thread pools. Let’s look at the ThreadPoolExecutor class in detail.

ThreadPoolExecutor constructor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Copy the code
  • CorePoolSize Specifies the number of core threads that will not be collected.
  • MaximumPoolSize Maximum number of threads that can be applied to a thread pool
  • WorkQueue Synchronizes tasks in a queue
  • KeepAliveTime When the number of threads is greater than the core, this is the maximum time that extra idle threads can wait for a new task before terminating.
  • ThreadFactory specifies the threadFactory
  • Handler When the number of tasks exceeds the queue capacity, this situation needs to be handled
    • AbortPolicy: Throws an exception directly. This is the default policy.
    • CallerRunsPolicy: Executes the task using the caller’s thread;
    • DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
    • DiscardPolicy: Directly discards the task.

Thread pool query

We already know the basic core construction parameters for creating a thread pool, but there are a lot of questions we don’t understand. How do you know if each thread in the thread pool is working or idle? Is there a special thread to mark idle thread activity? How threads share threads. Read the code with these questions in mind.

State of a thread in a thread pool

The following are comments from the ThreadPoolExecutor code. The thread state in the thread pool is maintained by an AtomicInteger CTL, which is an AtomicInteger that wraps two domain meanings.

  • WorkerCount Specifies the number of valid threads. The total number of threads is 2 ^ 29-1. The number of threads started does not include the number of threads stopped, which may be temporarily different from the actual number of active threads. For example, when ThreadFactory fails to create a thread and the thread is exiting, the count of threads still includes the exiting thread.

  • RunState Thread status

    • RUNNINGReceiving new tasks and processing tasks in the queue
    • SHUTDOWNDon’t take on new tasks, but can handle them
    • STOPCannot accept new tasks, cannot process tasks in the queue, but can interrupt ongoing tasks.
    • TIDYINGAll tasks terminate, workerCount is 0, and the thread is transitioned to TIDYING state and is about to run terminated() hook method
    • TERMINATEDTerminated () hook method is complete

Each of these states has a transition order

  • RUNNING -> SHUTDOWNPerformed shutdown ()
  • (RUNNING or SHUTDOWN) -> STOPPerform shutdownNow ()
  • SHUTDOWN -> TIDYINGWhen both the task queue and thread pool are empty
  • STOP -> TIDYINGThread pools are empty
  • TIDYING -> TERMINATEDWhen the terminated() hook method is finished executing these state-specific code implementations
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

Copy the code

Execute method parsing

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Handle 3 steps * 1. If the number of running threads is smaller than the number of core threads, directly create a new thread to perform the task * call the addWorker method to automatically check the thread status and number to avoid error alerts when adding threads when they cannot be added * * 2. If the task can be successfully enqueued, we still need to double check whether a thread is added * because there was a thread death last time we checked or the thread pool was closing when we entered the method * so we recheck the state and roll back the queue if stopped, or start a new thread if not. * * 3. Failed to add task, try to create a thread, if failed, If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // The number of current threads is smaller than the number of core threads
            if (addWorker(command, true)) // Create a thread
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // The thread pool is RUNNING and the task was added successfully
            int recheck = ctl.get(); // Second check
            if (! isRunning(recheck) && remove(command)) // Determine the thread pool status
                reject(command);
            else if (workerCountOf(recheck) == 0)  // The number of thread pools is 0
                addWorker(null.false);
        }
        else if(! addWorker(command,false)) // The thread pool state is not RUNNING or the queue is full. It is used to start non-core thread pull tasks
            reject(command);
    }
 
Copy the code

Next we go to the core method of addWorker to create threads

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry: // Retry flag, first seen at 😓
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) / / SHUTDOWN at least
                && (runStateAtLeast(c, STOP) // At least STOP is illegal|| firstTask ! =null
                    || workQueue.isEmpty()))
                return false;

            for (;;) { // The status is valid
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // There is no need to create threads that are larger than the core thread or the maximum thread, and the mask prevents the maximum thread count from exceeding 2 ^ 29-1 details
                    return false;
                if (compareAndIncrementWorkerCount(c))  // CTL auto-increment succeeds, and jumps out of the entire loop
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN)) // State is at least SHUTDOWN to re-enter the loop
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // Create a thread
            final Thread t = w.thread;
            if(t ! =null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Re-check thread pool status during lock
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // The thread that was just created has already started executing tasks, this is a problem
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true; }}finally {
                    mainLock.unlock();
                }
                if (workerAdded) { 
                    t.start(); // Start the task
                    workerStarted = true; }}}finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Copy the code

The main process of addWorker() is to check whether the thread pool status is valid, create a new thread, join workers, and call start() to execute the task. Let’s look at the Worker class

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run(a) {
            runWorker(this); }}Copy the code

Worker is actually a Runnable wrapper class, but it adds the function of task interruption. Its main task is to maintain the interrupted state. Inherits AQS, it can simplify the acquisition and release of the lock around each task execution, and prevent the interruption aimed at waking up the Worker thread waiting for the task. Learn how workers perform tasks by entering runWorker()

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // Retrieve the task
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while(task ! =null|| (task = getTask()) ! =null) { // If the current worker has no tasks, get the tasks from the queue until the queue is empty
                w.lock();
                // Handle thread interrupt mechanisms
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                    beforeExecute(wt, task); // Preprocessing, similar to interceptor mechanism, requires subclasses to implement
                    try {
                        task.run(); // Call the task method
                        afterExecute(task, null); // post-processing
                    } catch (Throwable ex) {
                        afterExecute(task, ex); // Exception handling
                        throwex; }}finally {
                    task = null;
                    w.completedTasks++;  // Number of tasks performed + 1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // When the thread life cycle is complete, the collection is performed}}Copy the code

In combination with the Worker constructor, the Worker locks itself during initialization, preventing the thread from being interrupted before the task has even started. The starting thread executes the runWorker method, fetches the task, releases the lock, and if the task in the Worker is empty, pulls the task from the queue. Processing thread interrupt, mainly based on the state of the first line has at least STOP state, and then clear the interrupt state, in the judgment of the thread has no interrupt signal, and then send interrupt signal. Threads should interrupt when the thread pool is already in the process of stopping, but must double-check to prevent competing relay signals during the closing process. Call the run method to perform the task. The reason why a task needs to be locked is mainly in the process of executing the task. The thread can be interrupted only when the task starts to close because the Worker does not support reentrant lock. This is where we finally see thread sharing, where threads continually fetch tasks from the queue and then call the run method to execute the task. When the thread exits the fetch queue loop, the thread life cycle ends.

geTask()

    private Runnable getTask(a) {
        boolean timedOut = false; // Whether the last pull timed out

        for (;;) {
            int c = ctl.get();

            // Check that the thread pool state is SHUTDOWN and no new tasks are accepted
            // The task queue is empty
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount(); // Number of core threads workercount-1
                return null;
            }

            int wc = workerCountOf(c); 

            AllowCoreThreadTimeOut Whether to reclaim the number of core threads when idle The default is false
           // The number of current threads is greater than the number of core threads
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // If wc is greater than the maximum number of threads, process the number of threads first
           // If a thread does not acquire a task during its lifetime, it needs to reclaim the number of threads in the previous loop
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { //wc should not be 0 and the task queue is empty
                if (compareAndDecrementWorkerCount(c)) // thread-1 succeeded, no other threads contested, no new tasks added
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // Return null after timeout
                    workQueue.take();
                if(r ! =null)
                    return r;
                timedOut = true; 
            } catch (InterruptedException retry) { // Abort the task execution
                timedOut = false; }}}Copy the code

Poll () + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll()

Enter processWorkerExit ()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If the task is not executed, the core thread is -1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // Remove the current worker and the thread will be reclaimed
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); // Check the state of the thread pool, whether the thread pool is closed signal

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // The thread pool can still execute or accept tasks when it is in the RUNNABLE or SHUTDOWN state
            if(! completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0&&! workQueue.isEmpty())// Threads in the thread pool have been reclaimed and the task has not been completed
                    min = 1;
                if (workerCountOf(c) >= min) // There are more threads in the thread pool than in the core thread pool
                    return; // replacement not needed
            }
            addWorker(null.false); // Create a new thread to process the task}}Copy the code

Enter tryTerminate ()

When the thread pool is SHUTDOWN and the task queue is empty, or the core queue is empty in the STOP state, the thread pool transitions to TIDYING and transmits the closed pool signal.

    final void tryTerminate(a) {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) || // The RUNNING state does not need to be handled
                runStateAtLeast(c, TIDYING) || // It has entered TIDYING, and does not deal with it
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) // The task queue is not empty and does not meet the condition
                return;
            if(workerCountOf(c) ! =0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE); Try to interrupt a workerreturn;
            }
        
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // Lock modifies the thread pool state
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // Enter the TIDYING state
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));  // the state of terminated() is terminated
                        termination.signalAll();
                    }
                    return; }}finally {
                mainLock.unlock();
            }
            // else retry on failed CAS}}Copy the code

shutdown()

Let’s look at thread pool termination methods

   public void shutdown(a) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // Change the state of the thread pool to SHUTDOWN
            interruptIdleWorkers(); // Interrupt the thread
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
Copy the code

How does interruptIdleWorkers() interrupt the thread

    private void interruptIdleWorkers(a) {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); // Workers are an unsafe set
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if(! t.isInterrupted() && w.tryLock()) {// If there is no interrupt and the lock can be acquired, the thread pool is not executing the task and the Worker does not support reentry
                    try {
                        t.interrupt(); 
                    } catch (SecurityException ignore) {
                    } finally{ w.unlock(); }}if (onlyOne)
                    break; }}finally{ mainLock.unlock(); }}Copy the code

The solution is simple: change the thread pool state to not accept new tasks, and pull out idle threads from Works to signal an interrupt.

shutdownNow

    public List<Runnable> shutdownNow(a) {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();  // Delete the task from the queue and return to Tasks
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

Copy the code

ShutdownNow removes all tasks from the queue that have not yet been processed, terminating the thread pool life cycle by calling tryTerminate() directly.

conclusion

Now we know how thread creation, thread sharing, idle collection, and thread pool life cycle work inside a thread pool. Execute () is called to submit the task. If the current thread pool is smaller than the number of core threads, addWorker() is called to create a new thread pool to execute the task, otherwise it is directly queued. In addWorker(), a thread is started to continually pull tasks from the queue and is not recycled until the queue is empty or no work has been executed for a lifetime. When setting the thread pool, pay attention to some details. The number of core threads is set according to the task situation. In most cases, it is the number of core threads that processes tasks. So when setting the maximum number of threads, be careful to set the queue size. If Integer.MAX, the number of threads will never exceed the number of core threads. Only when the task exceeds the queue capacity + the maximum number of threads, the saturation policy will be triggered. Select an appropriate processing method according to the task requirements.