State definition

ThreadPoolExecutor tracks two properties: its run state and its number of threads (the worker count).

The number of threads is straightforward: it is simply how many worker threads are in the pool. The run state takes one of five values:

  1. RUNNING: the initial state. The pool accepts new tasks and processes queued tasks.
  2. SHUTDOWN: the pool stops accepting new tasks but keeps processing the tasks already in the queue.
  3. STOP: the pool stops accepting new tasks, stops processing queued tasks, and interrupts tasks in progress.
  4. TIDYING: all tasks have terminated and the worker count has dropped to zero. The thread that moves the pool into this state runs the terminated() hook.
  5. TERMINATED: the terminated() hook has finished and the pool is fully terminated.

State transitions

The flow diagram for the five states is shown below

The source code comments describe this very well, so they are reproduced here:

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
*    On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
*    On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
*    When both queue and pool are empty
* STOP -> TIDYING
*    When pool is empty
* TIDYING -> TERMINATED
*    When the terminated() hook method has completed

Code implementation

The number of threads is a non-negative integer, which can be represented by an int.

There are only 5 running states. To save space, three bits can be used, because three bits can represent up to eight states.

These two attributes must be read and updated together, that is, atomically. If they were stored in two separate variables as described above, every update would likely need a lock.

Is there a more elegant implementation? Let’s look at Doug Lea’s implementation

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of low-order bits used for the worker count (32 - 3 = 29)
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum number of threads supported by the thread pool (536870911, more than enough)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits.
// Note that the values of the run states increase in order
// 1110 0000 0000 0000 0000 0000 0000 0000 (-536870912)
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000 (0)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000 (536870912)
private static final int STOP       =  1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000 (1073741824)
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000 (1610612736)
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl. These are plain bit operations on an int
// snapshot; atomicity comes from reading and CASing the AtomicInteger itself.
// Get the run state (high 3 bits)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Get the worker count (low 29 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine a run state and a worker count into a ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Because the run state occupies the high three bits and its values
 * increase in order, two ctl values can be compared by state directly
 * by magnitude, whatever the worker count in the low bits.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// Check whether the pool is RUNNING.
// Uses c < SHUTDOWN rather than c == RUNNING because the low 29 bits of ctl
// hold the worker count, so ctl equals RUNNING only when there are no workers.
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

Doug Lea makes clever use of bit operations. Packing both attributes into a single AtomicInteger solves the atomicity problem of updating two fields together, and placing the state in the high three bits makes it easy to compare pool states by magnitude.
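To experiment with the packing scheme outside the JDK, here is a minimal standalone sketch. CtlDemo is a hypothetical class that copies the JDK 8 constants and helper methods shown above. It shows that one atomic increment changes only the worker count, and that the ordering between states survives packing even when the low 29 bits are all ones.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone copy of the JDK 8 ctl packing scheme,
// used only to experiment with the bit layout.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29 bits for workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        ctl.incrementAndGet();               // each atomic update bumps only workerCount
        ctl.incrementAndGet();
        int c = ctl.get();
        System.out.println(runStateOf(c) == RUNNING);                        // true
        System.out.println(workerCountOf(c));                                 // 2
        // Any RUNNING ctl, even with a full worker count, is below any SHUTDOWN ctl.
        System.out.println(ctlOf(RUNNING, CAPACITY) < ctlOf(SHUTDOWN, 0));   // true
    }
}
```

Because the state and the count live in one AtomicInteger, a single compareAndSet can update both together without a lock.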

Doug Lea also kindly explains in a source comment why an int is used instead of a long:

* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.

With 500 million threads, the thread stacks alone would occupy about 500 TB of memory, assuming a typical 1 MB stack per thread. It is hard to imagine a machine ever having that much memory.

If an int ever proves too small, ctl can be switched to an AtomicLong. Until then, AtomicInteger is faster and simpler, which avoids over-engineering.

State operations

State checks

public boolean isShutdown() {
    return ! isRunning(ctl.get());
}

public boolean isTerminating() {
    int c = ctl.get();
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}

With the state definitions explained above, the state-check methods here are easy to follow.

isShutdown: once the pool is no longer RUNNING, it counts as shut down.

isTerminating: the pool is no longer RUNNING but has not yet reached TERMINATED, that is, it is SHUTDOWN, STOP, or TIDYING.

isTerminated: in the TERMINATED state the high three bits are 011. TERMINATED is the largest state value, so ctl >= TERMINATED holds exactly when the pool has terminated.
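The three predicates can be tabulated for every state. The sketch below assumes the same JDK 8 constants. StatePredicates and its static methods are hypothetical stand-ins that take a ctl value as an argument instead of reading a pool's field. Three workers are OR-ed into the low bits to show that the count never affects the result.

```java
// Hypothetical standalone class: re-declares the JDK 8 ctl constants and
// evaluates isShutdown / isTerminating / isTerminated for every run state.
class StatePredicates {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static boolean isRunning(int c)     { return c < SHUTDOWN; }
    static boolean isShutdown(int c)    { return !isRunning(c); }
    static boolean isTerminating(int c) { return !isRunning(c) && c < TERMINATED; }
    static boolean isTerminated(int c)  { return c >= TERMINATED; }

    public static void main(String[] args) {
        String[] names = {"RUNNING", "SHUTDOWN", "STOP", "TIDYING", "TERMINATED"};
        int[] states = {RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED};
        for (int i = 0; i < states.length; i++) {
            // Pretend three workers are counted (a real pool has none in
            // TIDYING or TERMINATED); the low bits never change the outcome.
            int c = states[i] | 3;
            System.out.printf("%-10s shutdown=%b terminating=%b terminated=%b%n",
                names[i], isShutdown(c), isTerminating(c), isTerminated(c));
        }
    }
}
```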

Adding a worker thread: addWorker

When a new task is submitted, a worker thread is added if fewer than corePoolSize threads are running, or if the blocking queue is full. In both cases addWorker is called. addWorker takes two arguments. The first, firstTask, is the Runnable the new worker should run first (normally the task just submitted), and it may be null. The second, core, indicates whether a core thread is being added, that is, whether the thread count is checked against corePoolSize or maximumPoolSize.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // step 1
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // step 2.1
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // step 2.2
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // step 2.3
            if (runStateOf(c) != rs)
                continue retry;
            // step 2.4
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // step 3
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Let's walk through the process step by step.

The method begins with an infinite loop labeled retry. In Java, a label lets break and continue target an outer loop from inside a nested one, which makes control flow across multiple loops easier to manage. Here there are two nested loops.
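For readers less used to labeled loops, here is a small self-contained illustration that has nothing to do with thread pools. The hypothetical findPair method uses continue outer to abandon the current inner loop, as continue retry does, and break outer to leave both loops at once, as break retry does.

```java
// Minimal illustration of labeled break/continue in nested loops.
// LabeledLoops and findPair are hypothetical names.
class LabeledLoops {
    /** Returns the first pair {i, j} with i * j == target, or null if none exists. */
    static int[] findPair(int limit, int target) {
        int[] result = null;
        outer:
        for (int i = 1; i <= limit; i++) {
            for (int j = 1; j <= limit; j++) {
                if (i * j > target)
                    continue outer;        // give up on this row, like "continue retry"
                if (i * j == target) {
                    result = new int[] {i, j};
                    break outer;           // leave both loops at once, like "break retry"
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] p = findPair(10, 12);
        System.out.println(p[0] + " x " + p[1]);   // 2 x 6
    }
}
```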

Step 1: the outer loop checks the state of the thread pool.

if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))

This condition is hard to read. Doug Lea's style is to fold a lot of logic into one big condition, which makes the code difficult to understand, so let's transform it using De Morgan's law:

// Convert according to De Morgan's law
if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
    return false;
// Simplify further: distributing && over || removes the parentheses and yields an OR
// of three conditions, so meeting any one of them makes adding a worker fail
if ((rs >= SHUTDOWN && rs != SHUTDOWN) || (rs >= SHUTDOWN && firstTask != null) ||
    (rs >= SHUTDOWN && workQueue.isEmpty()))
    return false;

Now that we’ve simplified this a little bit, let’s look at the three subconditions separately

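Condition 1, (rs >= SHUTDOWN && rs != SHUTDOWN), simply means rs > SHUTDOWN: the pool is in STOP, TIDYING, or TERMINATED. In these states the pool no longer processes any tasks, so no worker may be added.

Condition 2, (rs >= SHUTDOWN && firstTask != null): a non-null firstTask means a new task is being submitted. A SHUTDOWN pool no longer accepts new tasks, and any state beyond SHUTDOWN is already rejected by condition 1.

Condition 3, (rs >= SHUTDOWN && workQueue.isEmpty()): the pool is shutting down and the task queue is empty, so a new worker would have nothing to do.

So the only non-RUNNING case in which a worker may still be added is rs == SHUTDOWN with firstTask == null and a non-empty queue, that is, a worker added to help drain the remaining queued tasks.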
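The rewrite can be checked mechanically. This hypothetical sketch enumerates all five run states and both values of the two booleans, and compares the original guard with the three-condition OR form. Here hasFirstTask stands for firstTask != null and queueEmpty for workQueue.isEmpty().

```java
// Brute-force check that the original addWorker guard and its De Morgan
// rewrite agree on every input. Constants copy the JDK 8 values;
// GuardEquivalence is a hypothetical class name.
class GuardEquivalence {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int SHUTDOWN = 0;
    static final int[] STATES = {            // RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED
        -1 << COUNT_BITS, 0, 1 << COUNT_BITS, 2 << COUNT_BITS, 3 << COUNT_BITS
    };

    // Original form from addWorker: true means "reject, return false".
    static boolean original(int rs, boolean hasFirstTask, boolean queueEmpty) {
        boolean firstTaskIsNull = !hasFirstTask;
        return rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTaskIsNull && !queueEmpty);
    }

    // Rewritten as an OR of three simpler conditions.
    static boolean rewritten(int rs, boolean hasFirstTask, boolean queueEmpty) {
        return (rs >= SHUTDOWN && rs != SHUTDOWN)
            || (rs >= SHUTDOWN && hasFirstTask)
            || (rs >= SHUTDOWN && queueEmpty);
    }

    static boolean allAgree() {
        for (int rs : STATES)
            for (boolean t : new boolean[] {false, true})
                for (boolean q : new boolean[] {false, true})
                    if (original(rs, t, q) != rewritten(rs, t, q))
                        return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allAgree());                       // true
        // The one non-RUNNING case that is not rejected:
        System.out.println(original(SHUTDOWN, false, false)); // false
    }
}
```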

Step 2.1: after passing the state check in the outer loop, execution enters the inner loop, where the checks continue one layer deeper. If core is true, the worker count is compared with corePoolSize; if core is false, it is compared with maximumPoolSize. Once the relevant limit is reached, no more threads can be added.

Step 2.2: increment ctl by one with CAS, which increments the worker count. Once the CAS succeeds, break retry leaves both loops and execution proceeds to step 3.

Step 2.3: if the CAS fails, the only explanation is that another thread modified ctl at the same time. Since ctl holds both the run state and the worker count, either of them may have changed. So ctl is re-read and the run state is compared with the one seen earlier; if it changed, continue retry goes back to the outer loop to redo the state check.

Step 2.4: there is no code for this step, but it still matters. If the state is unchanged, only the worker count changed, so there is no need to go back to the outer loop; the inner loop simply runs again.
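Steps 2.1 to 2.4 can be reproduced in isolation. The sketch below is a simplified, hypothetical version of the retry loop: tryReserve uses the JDK 8 ctl layout but, unlike addWorker, rejects every non-RUNNING state, and it only reserves a slot in the count instead of creating a Worker. Eight threads race for five slots.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of addWorker's two-level retry loop. WorkerReservation
// and tryReserve are hypothetical names; as a simplification, any
// non-RUNNING state is rejected outright.
class WorkerReservation {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    static boolean tryReserve(AtomicInteger ctl, int limit) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN)                    // step 1 (simplified)
                return false;
            for (;;) {
                if (workerCountOf(c) >= limit)     // step 2.1
                    return false;
                if (ctl.compareAndSet(c, c + 1))   // step 2.2
                    break retry;
                c = ctl.get();                     // lost the race: re-read
                if (runStateOf(c) != rs)           // step 2.3: state changed
                    continue retry;
                // step 2.4: only the count changed, retry the inner loop
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        AtomicInteger granted = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (tryReserve(ctl, 5)) granted.incrementAndGet();
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(granted.get() + " " + workerCountOf(ctl.get())); // 5 5
    }
}
```

However the threads interleave, exactly five reservations succeed, because each successful CAS applies to the exact ctl value that was checked against the limit.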

Step 3: the worker count in ctl has been incremented, but that is only bookkeeping, so now the worker is actually created. This step is relatively simple: create a Worker object and add it to the workers set. It runs under mainLock because it rechecks the ctl state and adds an element to workers, a HashSet, which is not thread-safe.

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker threads: the Worker class

After all these checks, what does a Worker actually look like?

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

The Worker class implements the Runnable interface and extends the abstract AQS class (AbstractQueuedSynchronizer), so it also maintains a synchronization state, state, which takes three values:

-1: the initial state, assigned in the Worker constructor. The worker has just been created and has not started running yet.

0: unlocked. As shown later, this is the state between tasks, when the worker is idle.

1: locked. The worker is busy executing a submitted task.

tryAcquire succeeds only if it can CAS state from 0 to 1, so the lock is not reentrant: even the thread that already holds it cannot acquire it a second time.
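The non-reentrancy is easy to observe with a cut-down copy of Worker's lock methods. NonReentrantLock is a hypothetical class; AbstractQueuedSynchronizer and the methods it calls are the real java.util.concurrent.locks API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Cut-down copy of Worker's lock logic: state 0 = unlocked, 1 = locked.
// NonReentrantLock is a hypothetical name.
class NonReentrantLock extends AbstractQueuedSynchronizer {
    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {       // succeeds only from the unlocked state
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;                          // no owner check, hence not reentrant
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    public static void main(String[] args) {
        NonReentrantLock lock = new NonReentrantLock();
        System.out.println(lock.tryLock()); // true: first acquisition
        System.out.println(lock.tryLock()); // false: the same thread cannot re-acquire
        lock.unlock();
        System.out.println(lock.tryLock()); // true again after release
    }
}
```

A ReentrantLock would return true for the second tryLock. The comment on the JDK's Worker class explains the choice: worker tasks should not be able to reacquire the lock when they invoke pool control methods such as setCorePoolSize.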

interruptIfStarted interrupts the worker's thread only if the worker has already started. How does it know? A worker that has not started still has the initial state -1, so the method checks state >= 0; a non-negative state means the worker has started.

Looking at the Worker class, you may notice there is no field saying whether it is a core thread. So how are core and non-core threads told apart? In Doug Lea's implementation, they are not: all worker threads are alike. The core and maximum thread counts only control parts of the pool's logic, such as addWorker above and getTask below.

The Worker class holds a thread variable, which is created like this:

this.thread = getThreadFactory().newThread(this);

So when the thread starts, it executes the Worker class’s run method, which delegates directly to ThreadPoolExecutor’s runWorker method.
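The wiring between a Worker and its thread can be sketched on its own. The object passes itself to ThreadFactory.newThread, so starting the thread runs the object's own run(), which then delegates. MiniWorker and loop are hypothetical stand-ins for Worker and runWorker; Executors.defaultThreadFactory() is the real JDK factory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Sketch of the Worker/thread wiring. MiniWorker and loop() are
// hypothetical stand-ins for Worker and runWorker.
class MiniWorker implements Runnable {
    final Thread thread;
    volatile String ranOn;

    MiniWorker(ThreadFactory factory) {
        this.thread = factory.newThread(this);   // same pattern as Worker's constructor
    }

    @Override
    public void run() {
        loop(this);                               // delegate, like run() -> runWorker(this)
    }

    static void loop(MiniWorker w) {
        w.ranOn = Thread.currentThread().getName();
    }

    /** Starts a worker and reports whether run() executed on the worker's own thread. */
    static boolean demo() {
        MiniWorker w = new MiniWorker(Executors.defaultThreadFactory());
        w.thread.start();
        try {
            w.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return w.ranOn.equals(w.thread.getName());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```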

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // step 1
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // step 2
        while (task != null || (task = getTask()) != null) {
            // step 3
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // step 4
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // step 5
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // step 6
        processWorkerExit(w, completedAbruptly);
    }
}

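runWorker is fairly long; the six key steps are marked in the code.

Step 1, unlock: the worker is now running, so w.unlock() moves state from -1 to 0. From this point on the worker can be interrupted: interruptIfStarted requires state >= 0, and interruptIdleWorkers can now acquire it with tryLock.

Step 2, fetch a task: the task is either the firstTask passed in when the worker was created, or one taken from the queue by getTask(), which may block.

Step 3, lock: the worker locks itself before starting work to tell others it is busy, much like locking the door when you go into the bathroom. Because interruptIdleWorkers only interrupts workers it can tryLock, shutdown() does not interrupt a busy worker.

Step 4, do the work: task.run() executes the task, wrapped by the beforeExecute and afterExecute hooks. Any exception is passed to afterExecute and then rethrown (a checked Throwable is wrapped in an Error).

Step 5: once the task finishes, whether normally or not, the finally block increments completedTasks and unlocks the worker, signaling that it is idle again.

Step 6: the while loop ends in one of two ways. Either getTask() returned null, because the pool is shutting down or an idle worker timed out, or a task threw an exception, in which case completedAbruptly stays true. In either case the worker is cleaned up by processWorkerExit.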
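Steps 4 and 6 can be observed through the real extension hooks. This sketch subclasses ThreadPoolExecutor (HookedPool and captureFailure are hypothetical names) and records the Throwable that runWorker passes to afterExecute when a task submitted with execute() throws. The exception still propagates afterwards, so the worker's thread dies (its stack trace is printed by the default uncaught-exception handler) and processWorkerExit replaces it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Subclass that records what runWorker hands to the afterExecute hook.
// HookedPool and captureFailure() are hypothetical names.
class HookedPool extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastThrown = new AtomicReference<>();
    final CountDownLatch afterCalled = new CountDownLatch(1);

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        lastThrown.set(t);          // the catch blocks of step 4 pass the exception here
        afterCalled.countDown();
    }

    /** Runs a failing task via execute() and returns the Throwable seen by afterExecute. */
    static Throwable captureFailure() {
        HookedPool pool = new HookedPool();
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!pool.afterCalled.await(5, TimeUnit.SECONDS))
                return null;
            return pool.lastThrown.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(captureFailure()); // java.lang.IllegalStateException: boom
    }
}
```

Tasks submitted with submit() behave differently: the FutureTask wrapper catches the exception and stores it in the Future, so afterExecute receives null.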

getTask

Thread reuse in a pool rests on one core idea: each worker keeps fetching and executing tasks, so its thread stays alive across many tasks. Let's look at how a worker thread fetches a task.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Briefly, the method works like this. If the pool is shutting down (STOP, or SHUTDOWN with an empty queue), it decrements the worker count and returns null so that the worker exits. Otherwise it decides whether this worker is subject to a timeout: timed is true when allowCoreThreadTimeOut is set or there are more than corePoolSize workers. A timed worker waits with poll(keepAliveTime); if that poll times out, the next iteration lets the worker exit, because it has been idle longer than keepAliveTime. But when several idle workers time out at the same moment, how does the pool make sure they do not all exit at once? With this line:

if (compareAndDecrementWorkerCount(c))
    return null;

compareAndDecrementWorkerCount(c) decrements the count by one only if ctl still equals the snapshot c, so among workers holding the same snapshot only one CAS succeeds, and only that worker exits. The losers go around the loop again, and on that pass timed may no longer be true because the count has already dropped. The worker count is thus maintained dynamically without each worker carrying a "core" flag, which keeps the implementation simple. It is really elegant.
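The effect of the CAS can be simulated without a real pool. In this hypothetical sketch, several workers whose poll() has timed out all run the exit decision at once. timed is recomputed on every pass, allowCoreThreadTimeOut is assumed false, and the extra (wc > 1 || workQueue.isEmpty()) guard from getTask is omitted for brevity.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Models getTask()'s exit decision for workers that have all timed out.
// IdleExitRace, decideExit and simulate are hypothetical names;
// allowCoreThreadTimeOut is assumed to be false.
class IdleExitRace {
    /** Returns true if this worker should exit (it won a CAS), false if it keeps waiting. */
    static boolean decideExit(AtomicInteger workerCount, int corePoolSize) {
        for (;;) {
            int wc = workerCount.get();
            boolean timed = wc > corePoolSize;   // recomputed on every pass
            if (!timed)
                return false;                    // within corePoolSize now: keep waiting
            if (workerCount.compareAndSet(wc, wc - 1))
                return true;                     // only the CAS winner exits
            // CAS lost: another worker changed the count, so re-read and decide again
        }
    }

    /** Starts `workers` threads that have all timed out; returns how many exited. */
    static int simulate(int workers, int corePoolSize) {
        AtomicInteger count = new AtomicInteger(workers);
        AtomicInteger exited = new AtomicInteger();
        Thread[] ts = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            ts[i] = new Thread(() -> {
                if (decideExit(count, corePoolSize)) exited.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return exited.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 2)); // 3: the pool shrinks back to corePoolSize
    }
}
```

Whatever the interleaving, the count ends at corePoolSize: each successful CAS removes exactly one worker, and a worker that loses re-reads the count before deciding again.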

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

When the worker exits, the following things are done:

  1. If completedAbruptly is true, the worker exited because a task threw an exception, so the worker count is decremented here. On a normal exit, getTask has already decremented it.
  2. Add the worker's completed-task count to the pool's completedTaskCount for statistics, and remove the worker from the workers set.
  3. During shutdown the pool can only terminate once every worker has gone, so each worker exit calls tryTerminate() to check whether the pool can now terminate (more on that below).
  4. As long as the pool has not reached STOP, a replacement worker is added if the worker exited abruptly, or if it exited normally but too few workers remain to handle the queued tasks. This fills the gap so that task processing is not affected.
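Item 4 boils down to a small decision rule. The hypothetical needsReplacement function below restates the tail of processWorkerExit as a pure function; its parameters stand in for the pool fields and the ctl value that the real method reads.

```java
// Pure-function restatement of the end of processWorkerExit (JDK 8).
// ExitPolicy and needsReplacement are hypothetical names.
class ExitPolicy {
    static boolean needsReplacement(boolean stopping, boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut, int corePoolSize,
                                    boolean queueEmpty, int workerCount) {
        if (stopping)                 // runState >= STOP: never replace
            return false;
        if (completedAbruptly)        // a task threw: always replace the lost worker
            return true;
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty)  // keep at least one worker for queued tasks
            min = 1;
        return workerCount < min;     // replace only if below the floor
    }

    public static void main(String[] args) {
        // Normal exit, core threads may time out, tasks are still queued,
        // and no workers are left: a replacement is needed.
        System.out.println(needsReplacement(false, false, true, 4, false, 0)); // true
    }
}
```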

Closing the thread pool

There are two ways to shut down a thread pool:

  1. shutdown: changes the pool state to SHUTDOWN and interrupts only idle workers. Running tasks, and tasks still in the queue, are allowed to finish.
  2. shutdownNow: changes the pool state to STOP, interrupts all started workers, stops processing queued tasks, and returns the unprocessed tasks as a list.
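The difference in what the two methods leave behind can be seen through the public ExecutorService API. In this sketch (ShutdownDemo and queuedAfterShutdownNow are hypothetical names), a single worker is kept busy while further tasks wait in the queue; shutdownNow() interrupts the busy worker and hands back the queued tasks without running them.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Shows that shutdownNow() returns the tasks still waiting in the queue.
// ShutdownDemo and queuedAfterShutdownNow are hypothetical names.
class ShutdownDemo {
    /** Occupies the single worker, queues `queued` more tasks, then calls shutdownNow(). */
    static int queuedAfterShutdownNow(int queued) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);                 // busy until interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // interrupted by shutdownNow(); finish
            }
        });
        for (int i = 0; i < queued; i++)
            pool.execute(() -> { });                  // these wait in the queue
        try {
            started.await();                          // the first task now holds the worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Runnable> pending = pool.shutdownNow(); // drained from the queue, never run
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(queuedAfterShutdownNow(3)); // 3
    }
}
```

With shutdown() instead, the sleeping task would not be interrupted, since it holds its worker's lock, and the queued tasks would still run once it finished.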

shutdown

Let’s start with the shutdown method

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// The no-argument form called by shutdown()
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdown calls the following methods in turn:

  1. advanceRunState, which again uses an infinite loop plus CAS to change the state.
  2. interruptIdleWorkers, which uses tryLock to determine whether a worker is idle and interrupts it if so.
  3. onShutdown, a hook method left for subclasses to extend (ScheduledThreadPoolExecutor uses it).
  4. tryTerminate, which is a little more complicated and is explained below.

tryTerminate first checks whether termination is possible at all. If the pool is RUNNING, or SHUTDOWN with tasks still in the queue, it must not terminate yet. If it is already TIDYING or TERMINATED, another thread has taken care of termination, so there is nothing left to do.

If the worker count is not zero, it tries to interrupt one idle worker and returns. The main scenario: shutdown only interrupts idle workers, not those busy with a task, but once a busy worker has finished all remaining tasks it becomes idle and eventually runs processWorkerExit. As shown above, processWorkerExit calls tryTerminate, which interrupts another idle worker, so the shutdown signal propagates from worker to worker until all of them have exited.

Otherwise, it CASes the state to TIDYING. On success it runs the terminated() hook and then sets the state to TERMINATED, which matches the state transition diagram at the beginning. If the CAS fails, the loop runs again and repeats the state checks.

Once ctl is set to TERMINATED, termination.signalAll() wakes up the threads waiting on the termination condition. Those are the threads that called the pool's awaitTermination method:

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
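The last two transitions can be observed from outside the pool. This sketch (TerminationDemo and runToTermination are hypothetical names) overrides the real terminated() hook to set a flag, calls shutdown(), and then blocks in awaitTermination. By the time awaitTermination returns true, the hook has already run and isTerminated() reports true.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Watches TIDYING -> TERMINATED through the public hooks.
// TerminationDemo and runToTermination are hypothetical names.
class TerminationDemo extends ThreadPoolExecutor {
    volatile boolean hookRan;

    TerminationDemo() {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void terminated() {
        hookRan = true;               // runs in TIDYING, before ctl becomes TERMINATED
        super.terminated();
    }

    /** Returns true if the pool terminated in time and the hook had already run. */
    static boolean runToTermination() {
        TerminationDemo pool = new TerminationDemo();
        for (int i = 0; i < 4; i++)
            pool.execute(() -> { });
        pool.shutdown();                                   // RUNNING -> SHUTDOWN
        try {
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return done && pool.isTerminated() && pool.hookRan;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runToTermination()); // true
    }
}
```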

shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

Now that you understand the shutdown method, shutdownNow is much easier. There are three main differences:

  1. Change the thread pool state to STOP.
  2. All worker threads are interrupted using the interruptIfStarted method, whether idle or not.
  3. Drains every task that has not yet run from the blocking queue (drainQueue) and returns them.

Conclusion

  1. Thread pools have five states, but do not necessarily experience all of them.
  2. The Worker class extends AQS and uses its lock state to indicate whether it is idle.
  3. There is no distinction between core and non-core threads; corePoolSize and maximumPoolSize are just two numbers that steer the pool's processing logic.
  4. Reading the thread pool source shows how complex multithreaded concurrency is, with many cases to consider; the code repeatedly relies on the pattern of infinite loop + state check + CAS. Concurrency expert Doug Lea has given us the tried and tested J.U.C. (java.util.concurrent) toolkit, so we normally do not need to reinvent the wheel.
