@TOC

preface

Last time we looked at the FutureTask source code to see how to retrieve the return value of a task, today we’ll look at ThreadPoolExecutor.

ThreadPoolExecutor is a ThreadPoolExecutor. It is a ThreadPoolExecutor. This blog post will analyze how the next thread task will be executed in the thread pool from the source point of view

The thread pool

When we said threads above, we also said that threads are extremely precious resources in the system, so we should make reasonable use of him, so with the emergence of thread pool, what benefits can the thread pool bring

  • Reduce resource consumption: Reduce thread creation and destruction costs by reusing already created threads
  • Provides speed of response: When our creation character arrives, the task can be executed immediately without waiting for the creation of a thread
  • Improve thread manageability: Threads are a scarce resource and cannot be created indefinitely, so use thread pools for the same management and allocation, tuning, monitoring, and so on.

Source code analysis

Inheritance structure

First let’s look at the inheritance of ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService{}

public abstract class AbstractExecutorService implements ExecutorService{}

public interface ExecutorService extends Executor {<! -- Stop the thread pool, set the state to SHUTDOWN, and stop accepting new tasks.void shutdown(a); <! -- STOP the thread pool, set the state to STOP, no longer accept the previous task, try to interrupt the task in progress, return to the task not yet executed -->List<Runnable> shutdownNow(a); <! -- SHUTDOWN state -->boolean isShutdown(a); <! Whether all tasks have been terminated -->boolean isTerminated(a); <! -- Wait for the task to execute the task within the timeout period -->boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <! --Callable to submit tasks --> <T>Future<T> submit(Callable<T> task); <! --Runnable to submit tasks --> <T>Future<T> submit(Runnable task, T result); <! --Runnable to submit tasks --> Future<? > submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

public interface Executor {
    void execute(Runnable command);
}
Copy the code

Let’s start with the Executor interface at the bottom, which is the implementation, which is executing the execute method, and this interface is the entry point for thread execution

The ExecutorService interface inherits the Executor interface, which provides many methods, including shutdownNow, shutdown, and submit methods for submitting tasks to a thread pool. ExecutorService provides extensive task execution and management capabilities

Abstract class AbstractExecutorService is an abstract class that implements the ExecutorService interface. By the way, why do Java source code have a large number of abstract classes that implement interfaces, and then inherit from abstract classes? Why don’t classes implement interfaces directly? ‘ve set a layer, I don’t know before, afterwards I just know, abstract class to implement interface, is to achieve some common interface methods, this class to implement the interface again, just care about my different implementations, because we know that the implementation of the interface class is more than one, an abstract class is to remove them to achieve the realization of the interface of a class public extracted again, Avoid a lot of duplicate implementations, especially List and Set interfaces. Look at the abstract class implementations that almost always have responses!

Major variable

<! -- CTL stores thread pool state and thread count -->private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;/ / 32-3 = 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//2 to the 29th minus 1

    // runState is stored in the high-order bits<! -- Indicates that the thread pool is running and can accept tasks from the thread pool -->private static final int RUNNING    = -1<< COUNT_BITS; <! -- Does not accept new tasks, but still processes tasks in the queue.private static final int SHUTDOWN   =  0<< COUNT_BITS; <! Do not accept new tasks, do not process tasks in the team, interrupt ongoing tasks -->private static final int STOP       =  1<< COUNT_BITS; <! Task interrupted, processing collation stateprivate static final int TIDYING    =  2<< COUNT_BITS; <! -- Indicates terminal state -->private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl<! Get the running status of the current thread poolprivate static int runStateOf(int c)     { returnc & ~CAPACITY; } <! Get the number of working threads in the current thread poolprivate static int workerCountOf(int c)  { returnc & CAPACITY; } <! Get CTL value ->private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

About Ctl is how to deal with thread state and the number of threads, may have a look my another blog post: blog.csdn.net/zxlp520/art…

The constructor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code

This constructor is the method that all constructors end up calling, so let’s talk about these specific parameters

  1. Int corePoolSize Specifies the number of threads in the core
  2. Int maximumPoolSize Specifies the maximum number of threads
  3. Long keepAliveTime Maximum time for a thread to live
  4. TimeUnit unit TimeUnit that corresponds to keepAliveTime
  5. BlockingQueue workQueue blocks a queue that stores tasks to be executed
  6. ThreadFactory ThreadFactory create executing thread factory default: Executors. DefaultThreadFactory ()
  7. RejectedExecutionHandler Indicates the Hander method to be rejected by a handler task
    • The default is AbortPolicy which throws an exception,
    • DiscardOldestPolicy Discards the task with the longest waiting time in the queue. CallerRunsPolicy Allows the calling thread to process the policy

Worker

Why worker first? Because the task Runnabale we submitted is run after the object of Worker is wrapped, I will talk about this in detail when I talk about addWorker method later

Let’s take a look at Worker’s code:

 /** Worker inherits AQS and implements the Runnable interface */
 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** the main thread running the worker is the */ in which the task is running
        final Thread thread;
        /** The task to run */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);// This is the current Worker object
        }

        /** Run the current task runWorker is a method in ThreadPoolExecutor */
        public void run(a) {
            runWorker(this);
        }

        // Lock methods
        // 0 indicates that it is not locked
        // 1 indicates locked state
        protected boolean isHeldExclusively(a) {
            returngetState() ! =0; } <! -- We should be familiar with this method. I talked about this method in AQS. What is done here is to try to modify the state of state, which means that the worker is locked and other threads cannot execute, -->protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0.1)) {//CAS changes the value of State. 1 indicates that the State is lockedsetExclusiveOwnerThread(Thread.currentThread()); Set the occupier thread of the current lock to the current threadreturn true;
            }
            return false; } <! Release the lock, that is, change the value of State to0Unused the name of the field is also interesting, meaning 'unused' -->protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);// Set the thread occupant of the current lock to NULL
            setState(0);
            return true; } <! -- Lock the current Worker, join the queue if it can't get it, block the current thread -->public void lock(a)        { acquire(1); } <! -- this is equivalent to an unfair lock implementation to try to lock -->public boolean tryLock(a)  { return tryAcquire(1); } <! -- Release lock -->public void unlock(a)      { release(1); }
        public boolean isLocked(a) { returnisHeldExclusively(); } <! Try to interrupt a running thread task when we call shutdownNow -->void interruptIfStarted(a) {
            Thread t;
            if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
Copy the code

First look at the Worker’s inheritance structure, the first is to implement the Runnable, and such a relationship, the Worker can be Thread to execute, another one inherited an abstract class AbstractQueuedSynchronizer, AQS for short, this class ha ha really disappeared for a long time, I spent 5 articles to explain this AQS, you can imagine its importance, many implementations of JUC are based on this, or not clear friends can go to my blog to find.

This.thread = getThreadFactory().newThread(this); This is the Worker we built, and thread is a thread created with ThreadFactory and executed by the Worker. In other words, the Worker can be executed by calling thread.start()

execute

Execute is the method that implements the Executor interface, which is the entry method for the task to be executed. So what does that do for the submission of the next task

 public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState  and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
       int c = ctl.get();// Get the current CTL value
       If the current number of worker threads is less than the set number of core threads, addWorker is called to create a new worker thread. If the addWorker thread is added successfully, it will return directly. If the addWorker thread fails, it will continue and then go to the next CTL
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       /* * This step indicates that the current number of worker threads is greater than the number of core threads or that the addWorker has failed So let's get the CTL again, because we might change the CTL when we offer Runnable * the multiple validation here is very strict considering the high concurrency * the logic to proceed is to check again if the thread pool state is not Running, We remove the current task, reject it, and then do something different with it, depending on the reject strategy. Finally, we decide whether the current number of threads is zero, or we call addWorker, passing in an empty Runnalbe. False creates a non-core worker thread
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null.false);
       }
       /* * The current thread pool state is not Running or the queue has failed. The queue may be full. Create non-core threads to process the task
       else if(! addWorker(command,false))
           reject(command);
   }
Copy the code

I am not willing to delete the English notes here, readers can go to translate the description may be more accurate than me, I believe you can understand, and then compare my Chinese notes below, I believe I can clearly understand a new task is how to process!

When do you create a core thread? When to create non-core threads? When will the task be added to the blocking queue? What are the situations when the rejection policy is finally implemented? Knowing these answers, you should know the execute method by heart!

addWorker

Now let’s look at the next important method, which is called a lot, so let’s go in and look at it

 private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       // This is a spin with a spin for the purpose of CAS to increase the number of thread pools
       for (;;) {
           int c = ctl.get();// Get the value of CTL
           int rs = runStateOf(c);// Get the current thread state

           // This condition looks confusing, but if you look closely, you will see
           Rs >= SHUTDOWN indicates that the thread pool status is abnormal
           // If one of the conditions in parentheses fails, the condition is false
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null&&! workQueue.isEmpty()))return false;

           for (;;) {
               If the number of workers in the thread is greater than the maximum value or the set threshold, return false
               int wc = workerCountOf(c);
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
               // This means that if CAS modifies workerCount successfully, the entire outermost spin ends
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               // If WorkerCount fails to change, CTL will change
               // If it is not equal to the outer spin, return the outer spin to override itThat's why I'm using alphacontinue retry 
               c = ctl.get();  // Re-read ctl
               if(runStateOf(c) ! = rs)continue retry;
               // else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;// Whether the worker has started executing
       boolean workerAdded = false;// whether the worker is added successfully
       Worker w = null;
       try {
           w = new Worker(firstTask);// Pass the Runnable to the worker's constructor, which uses firstTask to construct the first Thread
           final Thread t = w.thread;// The current t is the Runnable thread created in the worker
           if(t ! =null) {
               final ReentrantLock mainLock = this.mainLock;/ / reentrant lock
               mainLock.lock();// Ensure thread-safety when adding workders
               try {
                   int rs = runStateOf(ctl.get());
                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);// Add worker to a HashSet stored in a worker set
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true; }}finally {
                   mainLock.unlock();/ / releases the lock
               }
               if (workerAdded) {// If added successfully
                   t.start();// This is where the Worker is actually executed
                   workerStarted = true; }}}finally {
           if (! workerStarted)
               addWorkerFailed(w);// If the Worker ends up not running, then clear the WorkerCount corresponding to his changes
       }
       return workerStarted;
   }
Copy the code

At the beginning of the method, 2 spins were used to solve the failure of CAS modification workerCount in the case of concurrency. Every detail and every situation were considered in place, and the state judgment was particularly rigorous. Truly understand, it feels how troublesome programming in the case of multi-threading is, thanks to help us do the packaging!

We know that t is the Thread body created in the Worker, which is passed to the Thread as its own task. We know that start is the Thread that starts to run, and finally the run method will be called, so that is to call the run method in the Worker. Let’s revisit the run method in Worker

public void run(a) {
   runWorker(this);// methods in ThreadPoolExecutor
}
Copy the code

runWorker

I also said that the thread start calls the run method, so it calls the runWorker method. Let’s see what it says in this

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;// Get the tasks in Worker
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // Go through the while loop
            while(task ! =null|| (task = getTask()) ! =null) {
                w.lock();/ / lock the Worker
                // Check if the current thread pool state is stop and check if the current thread is interrupted if it is false.
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                    beforeExecute(wt, task);// Perform the pre-action of the task
                    Throwable thrown = null;
                    try {
                        task.run();// Execute the final Runnable task
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);/// Execute the task after Action}}finally {
                    task = null;
                    w.completedTasks++;//Worker completes tasks +1
                    w.unlock();/ / releases the lock
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//Worker exits after execution}}Copy the code

The RunWorker method is the core method for running tasks in the entire thread pool. The thread uses a While loop to retrieve tasks from the blocking queue and then executes them. If there are no tasks in the blocking queue, the getTask() method blocks the thread until a new task arrives. So when we do unit tests, we use thread pools, and if you don’t call Shutdown, your debug red dot is always running, that’s why!

getTask

The method is to fetch the task from the blocking queue

 private Runnable getTask(a) {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            If the thread pool is SHUTDOWN and the queue is empty, or if the thread pool is null, the task is not fetched from the blocking queue
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            
            Timed is used to control whether a task in the waiting queue has a waiting time. The value of keepAliveTime set is used here. If a worker thread is waiting for a task and its value exceeds the set value, it quits the wait and the thread is recalled
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))// The number of worker threads is reduced by 1
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();// Get the task
                if(r ! =null)
                    return r;
                timedOut = true;// The wait timeout flag should be set in spin. This value will be used next time
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

We all know that when we call shutdown, the thread pool state is shutdown, and when we call shutdownNow, the thread state is Stop. So how do these two states deal with the tasks in the blocking queue? When we get the task in the queue, we return null, which means that the task in the queue will not be executed. However, when the state is shutdown, we only return NULL if the queue is empty, which means that the task in the queue can be retrieved even if the queue is not empty. If you want to know the answer, or through the source code to really understand, just by the answer I believe you will soon forget!

submit

Submit is usually used to add tasks that return values. Let’s take a look at the code

 publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);// Wrap Runnable as a FutureTask for the thread to execute
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);// Wrap Runnable as a FutureTask for the thread to execute
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
Copy the code

If you look at the code here, it should sound a little familiar. I’ve already covered a lot of it in my last article when I talked about FutureTask, including how Runnable and Callable are converted, and how the Future gets its return value. Do not know the small partners can go to see my previous article! Blog.csdn.net/zxlp520/art…

The above three constructors correspond to FutureTask constructors, which means that execute is passed in using FutureTask because FutureTask implements the Runable interface

Execution flow chart

Finally, use a flow chart to describe the way the next task goes from add to finish!

conclusion

ThreadPoolExecutor has a lot of methods to execute, but if you have a basic knowledge of the common logical operators, AQS, threads, FutureTask, etc., you will not be too tired to read the source code. Finally I draw the flow chart, is a task in the new thread pool to execute the entire process!

Finally, I would like to share a passage I have seen recently: What is a crisis?

Real crises come from doing the wrong thing at the right time. Not building up for the next step at the right time is the root of the crisis.

If you are in the path of growing friends, it is better to wake up early than late, that is what I would say. Don’t wait until middle age to find that you have not built their own moat, this time to know how to work hard. At the stage of their own efforts, not only do not work hard but choose to indulge themselves, which is the root of the crisis.

I hope you will have a harvest, live up to time, live up to your!