preface

There are two paragraphs in the Alibaba Java development manual, as shown in the figure below

You can see the two points mentioned. The first requirement is that the creation Thread cannot be displayed, that is, the form of new Thread, and the Thread pool should be used to manage the Thread. The second requirement is that the official four Thread pools are not allowed, but the Thread pool should be created by oneself, so as to better understand the allowed rules of Thread pool

This article is based on JDK1.8 code, the thread pool source code for parsing, with you can better understand the concept of thread pool and its running rules, if there are errors, please point out

One, ThreadPoolExecutor source code

1. Constructor

Start with the constructor:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Copy the code
  • CorePoolSize: The number of core threads that are not reclaimed by default, but if allowCoreTimeOut is set to true, core threads will also be reclaimed if they are idle.
  • MaximumPoolSize: Maximum number of threads, maximum CAPACITY that a thread pool can hold. The upper limit is limited by CAPACITY (2^29-1) (as you can see in the subsequent code).
  • KeepAliveTime: The time limit for an idle thread to be reclaimed, i.e. the lifetime of the idle thread
  • Unit: Indicates the unit of keepAliveTime
  • WorkQueue: a queue for storing tasks
  • ThreadFactory: Factory class that creates threads
  • Handler Notifies the caller when a task fails, using handler to represent a rejection policy

Some friends may not be very clear, for example, a company, the core thread is the internal core staff on behalf of the company, the largest number of threads is the maximum number of employees, may include the staff, because there are some pilot or simple project, need some outsourcing staff to do, which is core thread, so when the project completed or failed, In order to save on staffing costs, companies are laying off non-core employees, or the lifetime of idle threads. If everyone in the core is busy, but the demand comes in waves, then the task scheduling, the task queue, will continue to demand when the task queue is full? Sorry, do not accept, direct reject, this is the handler corresponding to the rejection strategy, can not be very appropriate examples, but mainly to help you understand the general meaning.

2. Thread pool status

When you open the source class, you see the following variables

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

AtomicInteger is an atomic operation class that is thread safe, with the lowest 29 bits representing the maximum number of threads and the highest 3 bits representing the five thread pool states. It maintains two parameters, workCount and runState. WorkCount indicates the number of valid threads, and runState indicates the running state of the thread pool.

  • RUNNING: RUNNING state. New tasks can be accepted and processed
  • New tasks will not be accepted, but existing tasks will be processed
  • STOP: Indicates the STOP state. New tasks are not accepted, and queue tasks are not processed
  • TIDYING: all missions have been terminated
  • TERMINATEDSaid:terminated()The method has been executed

Cite a picture to help you understand the 5 states

3. Execute the process

The execute ()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState  and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();
    	// If the current number of threads is less than the number of core threads, run addWorker to create a new thread to execute the command task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    	// If the current status is running, put the task into the blocking queue and double-check the thread pool status
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // If the thread pool status is not running, remove the added task and refuse to change the task
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If you are in the running state but have no thread, create a thread
            else if (workerCountOf(recheck) == 0)
                addWorker(null.false);
        }
    	// Reject the task when a new thread fails to be created in the thread pool
        else if(! addWorker(command,false))
            reject(command);
    }
Copy the code

Here is a summary of the execution process of the execute method, in fact, you see the source code method annotation is a good way to learn

  • First, determine whether the current number of threads is less than the number of core threads, if yes, directly create a core thread to execute the task, otherwise go to the second step
  • If the current number of threads is equal to the number of core threads, then the task is scheduled, and the task is queued. When the task is queued, check the pool state again. In a multithreaded environment, the ctl.get() method is not an atomic operation. If it is not in the RUNNING state, the task will never be executed, so check again. If it is not in the RUNNING state, remove the task and reject it. If it is in the RUNNING state and there is no thread, create the thread directly
  • The prerequisite for this step is that the add queue in step 2 fails, that is, the task queue is full, then consider creating non-core threads to execute the task, if the add non-core threads also fail, then reject directly

Note here, when the core thread full, will not go to create non-core thread to perform the task directly, but in the first task queue, can be understood as demand task is first need to make the internal core staff to complete, the task queue priority is higher than that of core employees, addWorker (), Boolean value of entered here, This means creating core or non-core threads

reject()

final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
}
Copy the code

The reject method calls the rejectedExecution(Command,this) method of the Handler interface, which is RejectedExecutionHandler. The default implementation is AbortPolicy. Here is the implementation of AbortPolicy:

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy(a) {}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from "+ e.toString()); }}Copy the code

As you can see, the default policy is to throw an exception directly. This is just the policy that is used by default, and you can implement your own logic by implementing the interface.

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Return false in the following cases
           	The current status is stop or above. 2. The current status is SHUTDOWN, but the firstTask is not empty
            //3. The current state is SHUTDOWN, but the queue is empty
            // We know from section 1 that the SHUTDOWN state does not execute incoming tasks, but will continue to execute queued tasks
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null&&! workQueue.isEmpty()))return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if(t ! =null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true; }}finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true; }}}finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Copy the code

Here is the main process analysis

  • If the thread pool is in the stop or above state, return the thread pool status. If the thread pool is in SHUTDOWN state and the firstTask is not empty or the queue is empty, return the thread pool status
  • The inner loop queries the number of threads, passes the Boolean value, and compares it with the core thread and the maximum number of threads, if true, the number of workers +1, and breaks out of the loop.
  • Breaking out of the loop is the actual execution of the task, and the Worker encapsulates the Worker thread and task into its own internal. We can regard the Worker as a Worker thread. How does the Worker execute the task and fetch the task from the blocking queue

Worker()

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run(a) {
            runWorker(this); }... }Copy the code

As you can see, Worker internal maintenance, a thread variable and a task variable, starting the thread thread contained in a Worker object is equivalent to executing the runWorker() method and taking the Worker object as a parameter to the method.

runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // The getTask method keeps fetching tasks from the queue. // The getTask method keeps fetching tasks from the queue
            while(task ! =null|| (task = getTask()) ! =null) {
                w.lock();
                // Check the thread pool status again. If the thread pool status is stop, interrupt the task directly
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally{ afterExecute(task, thrown); }}finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally{ processWorkerExit(w, completedAbruptly); }}Copy the code

GetTask () ¶ getTask () ¶ getTask (); getTask (); getTask (); getTask ();

getTask()

private Runnable getTask(a) {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! =null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

There are mainly two judgments to be made:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize

AllowCoreThreadTimeOut: if this property is set for a core thread, it is required to recycle. Wc > corePoolSize: if the current thread is not a core thread, the timed property is set to true

timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take()

If the timed flag bit is true, call the poll method to get the task and set the timeout time. If there is no task, the timeout will return null and the loop of the runWorker will be recovered. If the timed flag bit is false, call the take method and null will not be returned if there is no task. Instead, it goes into a blocked state, waiting for the task, and is not reclaimed

Second, the summary

There are many details in the thread pool, so let’s make a summary

  • Structure parameters of the thread pool determines the operation strategy of thread pool, you need to understand the meaning of each parameter, because each of the parameters of different largely determines the rules of the operation of the thread pool, which is why alibaba development mentioned in the manual to create ways to create a thread pool, instead of using official offer 4 kinds of thread pool
  • Thread pool involves multiple threads, and the status changes frequently. During task execution, you need to check the thread pool status multiple times to ensure the accuracy of task execution
  • The priority of the task queue is higher than that of non-core threads. When the core threads are full, tasks are put into the task queue first, and non-core threads are enabled for execution
  • Core and non-core threads are essentially the same, and when a core thread sets its allowCoreThreadTimeOut property to true, it will eventually be destroyed because of a timeout
  • The thread pool overview is to control the whole process through two variables, the state of the thread pool and the number of threads in the thread pool. There are not many methods involved, but there are many loops. It is necessary to understand the jump out condition of each loop and the corresponding state.

Probably analysis is so much, hope to help some friends to better understand the working principle of thread pool and in the use of better use, if there is any question or error, welcome to discuss

Please like the top/comments! Because your encouragement is the biggest motivation for my writing!