Benefits of thread pools

  • Reuse threads instead of creating a new one for every task
  • Bound the number of threads, and so the resources they consume

Thread pool core configuration

  • corePoolSize: number of core threads
  • maximumPoolSize: maximum number of threads; temporary (non-core) threads are added up to this limit once the blocking queue is full
  • keepAliveTime: how long an idle temporary thread survives before it is destroyed
  • handler: rejection policy
  • threadFactory: the ThreadFactory used to create threads
  • workQueue: blocking queue that holds waiting tasks
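
To make the list concrete, here is a minimal sketch that passes all six settings to the ThreadPoolExecutor constructor. The class name PoolConfigDemo and the sizes (2, 4, 30 seconds, a queue of 10) are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigDemo {
    // Builds a pool with every core setting spelled out; the values are illustrative.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime for idle non-core threads
                new ArrayBlockingQueue<>(10),           // workQueue (bounded)
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (rejection policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " core, "
                + pool.getMaximumPoolSize() + " max");
        pool.shutdown();
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue the "queue is full" condition never occurs, so maximumPoolSize would have no effect.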

Thread pool control properties

Following the usual JUC pattern, a single int is often used to represent two pieces of state. For example, ReentrantReadWriteLock stores both lock counts in the AQS state field: the high 16 bits hold the shared read-lock count and the low 16 bits hold the exclusive write-lock count. The thread pool does the same with ctl, which packs the pool's run state and its worker count into one value

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

The high 3 bits of the int hold the state of the thread pool, and the low 29 bits hold the number of threads

private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

Two helper methods extract each value from ctl

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
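
The packing can be tried out in isolation. The sketch below copies the constants and helpers quoted above into a standalone class. The definitions of COUNT_BITS, CAPACITY and ctlOf are not shown in this article; they are filled in here from my reading of the JDK 8 source, so treat them as an assumption. The class name CtlDemo is made up for the example.

```java
class CtlDemo {
    // Same layout as the constants quoted above: 3 state bits, 29 count bits.
    static final int COUNT_BITS = Integer.SIZE - 3;          // assumed: 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // assumed: low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int ctlOf(int rs, int wc)  { return rs | wc; }        // pack state and count
    static int runStateOf(int c)      { return c & ~CAPACITY; }  // keep the high 3 bits
    static int workerCountOf(int c)   { return c & CAPACITY; }   // keep the low 29 bits

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states are zero or positive, the states are ordered numerically, which is why the source can compare them with expressions such as rs >= SHUTDOWN.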

How the thread pool executes tasks

Tasks are submitted with the execute method. Its code is best read as three separate cases

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Case 1: fewer workers than corePoolSize, so start a core thread
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Case 2: the core threads are full, so join the task queue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Case 3: enqueueing failed, so try a non-core thread, otherwise reject
        else if (!addWorker(command, false))
            reject(command);
    }


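execute first checks whether the number of active threads is smaller than the number of core threads. If it is, a core thread is created to run the task. If not, the task is offered to the queue; if the queue is full, the pool applies for a temporary thread to execute the task. If a temporary thread cannot be created either, the rejection policy is invoked. Note that threads in the thread pool are created lazily: none exist until tasks are submitted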
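
The lazy creation is easy to observe from outside. The following is a small sketch, with a made-up class name LazyStartDemo and an arbitrary pool size of 2, that reads getPoolSize() before and after the first execute call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LazyStartDemo {
    // Returns {pool size right after construction, pool size after one execute()}.
    static int[] poolSizes() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();      // no worker has been created yet
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);        // the first task creates the first worker
        int after = pool.getPoolSize();
        done.await();
        pool.shutdown();
        return new int[] {before, after};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = poolSizes();
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

If eager startup is wanted, ThreadPoolExecutor also offers prestartAllCoreThreads(), which creates the core threads before any task arrives.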

Case 1: If the number of active threads is smaller than the number of core threads, create a new thread and add it to the thread pool

 if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

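addWorker creates a new thread to do the work. The thread pool wraps each thread in a Worker. command is the task being submitted, and the second argument (true here) means the new thread is counted against corePoolSize rather than maximumPoolSize.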

Case 2: The core threads are all in use, so the task joins the queue. After enqueueing, the pool state is checked again: if the pool is no longer running, the task is removed and rejected; otherwise, if the current number of threads is 0, a new thread is added so that the queued task gets picked up.

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

Case 3: The core threads are all busy and enqueueing fails (typically because the queue is full). The pool then tries to add a temporary thread, which succeeds only while the thread count is below the maximum number of threads we set. If that fails too, the task is rejected

else if (!addWorker(command, false))
            reject(command);
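
The three cases can be reproduced with a deliberately tiny pool. This is only a sketch: the class name ExecuteCasesDemo and the sizes (1 core thread, 2 threads at most, a queue of 1) are chosen so that four submissions hit every branch, and the default rejection policy (AbortPolicy) is assumed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteCasesDemo {
    // Returns {pool size, queue size, rejected count} after four submissions.
    static int[] submitFour() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();               // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // 1st: core thread, 2nd: queued, 3rd: temporary thread, 4th: rejected
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = submitFour();
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

Note the order this demonstrates: the queue fills up before any temporary thread is created, which surprises many people the first time.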

The thread joins the pool

The key question is how addWorker creates a thread. It does four things:

  • Check whether the thread limit is exceeded
  • Increment the thread count
  • Create the thread
  • Store the worker in a container
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Here you see labeled break and continue (the retry: label), the closest thing Java has to goto. The first step is to update ctl: following the 3 + 29 split described above, the thread count in the low 29 bits is incremented with CAS inside a spin loop. This is a common pattern in JUC: a CAS can fail under contention, so it is usually wrapped in a loop that re-reads the value and tries again.

retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
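
Stripped of the pool-state checks, the CAS plus spin idea looks roughly like the sketch below. It is a simplification, not the JDK code: it uses a plain AtomicInteger and a single loop instead of ctl and the labeled outer loop, and the names CasSpinDemo, incrementIfBelow and race are invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinDemo {
    // Increments the counter only while it is below limit; retries when the CAS loses a race.
    static boolean incrementIfBelow(AtomicInteger counter, int limit) {
        for (;;) {
            int current = counter.get();
            if (current >= limit)
                return false;                              // bound reached, give up
            if (counter.compareAndSet(current, current + 1))
                return true;                               // our CAS won
            // CAS failed because another thread changed the value: loop and re-read
        }
    }

    // Lets several threads fight over the counter and returns its final value.
    static int race(int threads, int limit) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                while (incrementIfBelow(counter, limit)) {
                    // keep incrementing until the limit is hit
                }
            });
            ts[i].start();
        }
        for (Thread t : ts)
            t.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(8, 1000));
    }
}
```

The check and the increment are separate steps, yet the limit is never exceeded, because the CAS only succeeds if the value is still the one that was checked.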

After the CAS increments the thread count, a new Worker is created. The pool acquires mainLock, adds the worker to the container (a HashSet), releases the lock, and then starts the thread. If the thread did not start, the finally block calls addWorkerFailed, which rolls back the count in ctl, removes the worker from the HashSet, and tries to terminate the pool

boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;


The lock is needed because HashSet is not thread safe and largestPoolSize is also updated here. ConcurrentHashMap moved from segment locks in JDK 1.7 to synchronized plus CAS in JDK 1.8 to improve performance. ReentrantLock is still used here, and I wonder whether it will be replaced by another mechanism in the future.

The key point is that workers, a HashSet, is the container that holds the Worker objects, that is, the pool's threads
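
The same locking discipline can be shown without the rest of the pool. In this sketch a HashSet and a high-water mark are both guarded by one ReentrantLock, mirroring what addWorker does with workers and largestPoolSize. GuardedSetDemo and its methods are hypothetical names, and the set holds plain integers instead of Worker objects.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

class GuardedSetDemo {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<Integer> workers = new HashSet<>();   // not thread safe on its own
    private int largestSize;                                 // only touched under mainLock

    void add(int id) {
        mainLock.lock();
        try {
            workers.add(id);
            int s = workers.size();
            if (s > largestSize)
                largestSize = s;                             // track the high-water mark
        } finally {
            mainLock.unlock();                               // always release, even on error
        }
    }

    int largestSize() {
        mainLock.lock();
        try {
            return largestSize;
        } finally {
            mainLock.unlock();
        }
    }

    // Adds distinct ids from several threads and returns the recorded high-water mark.
    static int run(int threads, int perThread) throws InterruptedException {
        GuardedSetDemo demo = new GuardedSetDemo();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    demo.add(base + i);
            });
            ts[t].start();
        }
        for (Thread th : ts)
            th.join();
        return demo.largestSize();
    }
}
```

One lock covering both fields keeps the set and the statistic consistent with each other, which separate atomic variables would not guarantee.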

How workers work

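A Worker wraps a thread in the thread pool. How does it work? Its constructor stores the first task and asks the thread factory for a thread that will run the worker itself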

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // Get a thread from the thread factory
    this.thread = getThreadFactory().newThread(this);
}

Run method

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task passed to the worker if there is one, then
        // keep fetching tasks from the queue until getTask() returns null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

The w.unlock() call at the start clears the state of -1 that the constructor set, which means the worker can be interrupted from this point on. Before each task is executed, the state of the thread pool is checked: if the pool is stopping, the running thread must be interrupted as well. When the loop ends, processWorkerExit is called to retire the thread. Let's see what is done when a worker exits

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // This branch runs when the worker exits because of an exception;
    // in that case the thread count in ctl has not been reduced yet
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Accumulate the number of tasks this worker completed
        completedTaskCount += w.completedTasks;
        // Remove the worker from the set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
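
The replacement logic at the end of processWorkerExit can be seen from the outside: when a task submitted with execute throws, its worker thread dies and another thread takes over. The sketch below assumes a single-thread pool and a custom thread factory that numbers its threads. WorkerReplacementDemo and the "worker-N" naming are made up for the example, and the empty uncaught-exception handler is only there to keep the output quiet.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerReplacementDemo {
    // Returns the names of the threads that ran the failing task and the follow-up task.
    static List<String> run() throws InterruptedException {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((th, ex) -> { });   // swallow the stack trace
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        List<String> names = new CopyOnWriteArrayList<>();
        pool.execute(() -> {
            names.add(Thread.currentThread().getName());
            throw new IllegalStateException("boom");          // the worker exits abruptly
        });
        pool.execute(() -> names.add(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The second task runs on a different thread than the first, even though the pool never holds more than one worker at a time.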

So far the source has shown how threads join and exit the pool, but the pool is supposed to keep the core threads alive. Where is that enforced? The answer is in getTask, the method runWorker calls to fetch each task

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Whether this worker's wait for a task is allowed to time out
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

poll takes a timeout and waits up to that long for a task. If the waiting thread is interrupted, the InterruptedException is caught, timedOut is reset to false, and the loop retries. If the number of active threads is greater than the number of core threads, the worker waits up to keepAliveTime; if no task arrives from the queue in that time, poll returns null and timedOut is set. On the next pass, the worker count is decremented with CAS and getTask returns null. This exits the while loop in the runWorker() method, so the corresponding thread is destroyed, that is, there is one less thread in the thread pool. If a task does arrive within keepAliveTime, a thread that survived from earlier work is still there to run it. So as long as there are more threads in the thread pool than core threads, the idle extras are destroyed one by one.

If the current number of active threads is less than or equal to the number of core threads (and allowCoreThreadTimeOut is false), the worker also fetches tasks from the queue, but it uses take(): when the queue is empty, it blocks until a task can be fetched. The thread is therefore parked rather than destroyed when there is nothing to do. This keeps corePoolSize threads alive in the pool and ready to handle tasks, which is how reuse is achieved.
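
Both behaviours can be watched together in a small experiment. The sketch below assumes 1 core thread, up to 3 threads, a 50 ms keepAliveTime and a SynchronousQueue (so every submission that finds no idle worker forces a new thread). KeepAliveDemo and shrink are invented names, and the 5 second polling deadline is just a safety margin for slow machines.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while three tasks are running, pool size after the idle timeout}.
    static int[] shrink() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();               // hold the worker until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = pool.getPoolSize();             // 1 core worker plus 2 temporary workers
        release.countDown();

        // Wait for the idle temporary workers to time out in getTask().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline)
            Thread.sleep(10);
        int idle = pool.getPoolSize();             // the survivor is blocked in take()
        pool.shutdown();
        return new int[] {busy, idle};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = shrink();
        System.out.println("busy=" + sizes[0] + " idle=" + sizes[1]);
    }
}
```

Which of the three threads survives is not fixed: "core" is a count, not a property of a particular thread, so whichever worker sees the count at corePoolSize is the one that blocks in take().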