This article is written from the Huawei Cloud community. Learn how to implement ThreadPoolExecutor Thread Pools! , author: Little Fuge.

Thanks for the plane, note! , last time suffered in the thread, this may be a pit dropped twice!

Thank plane: you ask, I am ready!!

Interviewer: Well, how is thread pool state designed to be stored?

Xie Airplane: Here! Next! Next!

Interviewer: Why not use ReentrantLock to implement Worker’s implementation class instead of inheriting AQS?

Xie Airplane: I… !

Interviewer: Can you briefly describe the process of execute?

Xie Airplane: Goodbye!

Thread pool explanation

1. Let’s start with an example

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10.10.0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
    System.out.println("Hi thread pool!");
});
threadPoolExecutor.shutdown();

// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();
Copy the code

This is an example of creating a thread pool that you have used many times.

The core purpose of thread pools is resource utilization, avoiding the resource consumption caused by the repeated creation of threads. Therefore, the idea of a pooling technique is introduced to avoid the performance overhead of repeated creation and destruction.

So, let’s take a hands-on look at the pool construction and see how it handles threads.

2. Write a thread pool

2.1 Implementation Process

In order to better understand and analyze the source code of the thread pool, let’s first follow the idea of the thread pool, write a very simple thread pool.

In fact, many times the core logic of a piece of functional code may not be very complicated, but in order for the core process to run smoothly, additional branches of the auxiliary process need to be added. Like I always say, the butt wipe paper was made that big to protect my hands!

With regard to Figure 21-1, the implementation of the pen-based thread pool is also very simple, showing only the core processes, including:

  1. There are n threads running all the time, which is the size of the thread pool we allowed when we created it.
  2. Commit threads to a thread pool to run.
  3. If the running thread pool is full, the thread is queued.
  4. Finally, when there is idle, the thread in the queue is acquired to run.

2.2 Implementation Code

public class ThreadPoolTrader implements Executor {

    private final AtomicInteger ctl = new AtomicInteger(0);

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    @Override
    public void execute(Runnable command) {
        int c = ctl.get();
        if (c < corePoolSize) {
            if(! addWorker(command)) { reject(); }return;
        }
        if(! workQueue.offer(command)) {if(! addWorker(command)) { reject(); }}}private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;

        Worker worker = new Worker(firstTask);
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    private final class Worker implements Runnable {

        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run(a) {
            Runnable task = firstTask;
            try {
                while(task ! =null|| (task = getTask()) ! =null) {
                    task.run();
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null; }}finally{ ctl.decrementAndGet(); }}private Runnable getTask(a) {
            for(; ;) {try {
                    System.out.println("WorkQueue. Size:" + workQueue.size());
                    return workQueue.take();
                } catch(InterruptedException e) { e.printStackTrace(); }}}}private void reject(a) {
        throw new RuntimeException("Error! CTL. The count," + ctl.get() + " workQueue.size:" + workQueue.size());
    }

    public static void main(String[] args) {
        ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2.2.new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task No. :"+ finalI); }); }}}// Test resultsTask No. :1Task No. :0WorkQueue. Size:8WorkQueue. Size:8Task No. :3WorkQueue. Size:6Task No. :2WorkQueue. Size:5Task No. :5WorkQueue. Size:4Task No. :4WorkQueue. Size:3Task No. :7WorkQueue. Size:2Task No. :6WorkQueue. Size:1Task No. :8Task No. :9WorkQueue. Size:0WorkQueue. Size:0
Copy the code

Above, the implementation of thread pool is still very simple, from the test results have been able to reflect the most core pooling ideas. The main functional logic includes:

  • ctlIs used to record the number of threads in the thread pool.
  • corePoolSize,maximumPoolSizeTo limit thread pool capacity.
  • workQueueThread pool queues, i.e. threads that cannot be run in time, are loaded into this queue.
  • executeFor submitting threads, this is the generic interface method. In this method, the main implementation is whether the current submitted thread is added to the worker, queue or abandoned.
  • addWorker, mainly classesWorkerTo create and execute a thread. And there are alsogetTask() Method, which continuously fetches unexecuted threads from the queue.

Ok, so this is the embodiment of this simple thread pool implementation. But if you think about it, you’ll see that there are a lot of improvements to be made. For example: Thread pool state, you can’t run all the time! ? What about thread pool locks and concurrency issues? , thread pool rejection policy? None of these issues are addressed in the main flow, and because there is no flow, the code above is easier to understand.

Next, we will start to analyze the thread pool source, compared to our implementation of a simple thread pool reference, it will be easier to understand 😄!

3. Thread pool source code analysis

3.1 Thread pool class diagram

The implementation and inheritance relationships between classes are centered around the implementation of the core class ThreadPoolExecutor, as shown in Figure 21-2.

  • interfaceExecutor,ExecutorService, the basic method for defining a thread pool. especially execute(Runnable command)Submit the thread pool method.
  • An abstract classAbstractExecutorService, the implementation of the basic universal interface methods.
  • ThreadPoolExecutorIs the core utility class method of the entire thread pool. All the other classes and interfaces provide functionality around this class.
  • WorkerIs the method of the task class, which is the thread that ultimately executes.
  • RejectedExecutionHandlerIs the rejection policy interface, which has four implementation classes;AbortPolicy(reject by throwing exceptions),DiscardPolicy(direct discard),DiscardOldestPolicy(Discarding the task with the longest lifetime),CallerRunsPolicy(whoever submits it executes it).
  • ExecutorsIs a thread pool for creating different policies that we use commonly,newFixedThreadPool,newCachedThreadPool,newScheduledThreadPool,newSingleThreadExecutor.

3.2 High 3 bits and low 29 bits

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
Copy the code

In the ThreadPoolExecutor thread pool implementation class, a CTL of type AtomicInteger is used to record the thread pool state and the number of thread pools. To record multiple values on a type, it uses split data area, high 3 bits record state, low 29 bits store thread number, default RUNNING state, thread number is 0.

3.2 Thread pool status

Figure 22-4 shows the state flow relationship in the thread pool, including the following states:

  • RUNNING: Running state, accepting new tasks and processing tasks in the queue.
  • SHUTDOWN: Closed state (shutdown method called). Does not accept new tasks, but processes tasks in the queue.
  • STOP: Stopped state (the shutdownNow method was called). No new tasks are accepted, no tasks in the queue are processed, and ongoing tasks are interrupted.
  • TIDYING: All tasks are terminated, workerCount is 0, and the thread pool terminated() method is called into terminated state.
  • TERMINATED: terminated state, state after the method call terminated.

3.3 Submit Thread (Execute)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null.false);
    }
    else if(! addWorker(command,false))
        reject(command);
}
Copy the code

When reading this section of the source code, you can refer to our own implementation of the thread pool. In fact, the end goal is the same, that is, this section of the submitted thread, start execution, join the queue, decision policy, these three ways.

  • ctl.get(), takes a value that records the thread state and the number of threads, and ultimately needs to use methodsworkerCountOf()To get the current thread count.‘workerCountOf performs c & CAPACITY
  • Based on the number of threads in the current thread pool, and the number of core threadscorePoolSizeFor comparison, if the number is smaller than that, add the thread to the task execution queue.
  • If the number of threads is full at this point, you need to determine whether the thread pool is runningisRunning(c). If it is running, the thread that cannot be executed is placed in the thread queue.
  • After it is put into the thread queue, it is necessary to re-determine whether the thread is running and remove the operation. If it is not running and removed, it will reject the policy. Otherwise, the number of threads is 0 and a new thread is added.
  • Finally, it tries to add the task execution again. At this time, the second input parameter of the addWorker method is false, which will affect the judgment of the number of added tasks. If the policy fails to be added, the policy is rejected.

3.5 Adding an Execution Task (addWorker)

private boolean addWorker(Runnable firstTask, boolean core)

Part one: Increase the number of threads

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null&&! workQueue.isEmpty()))return false;
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if(runStateOf(c) ! = rs)continue retry;
        // else CAS failed due to workerCount change; retry inner loop}}Copy the code

The first part is to create a startup thread

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if(t ! =null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true; }}finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true; }}}finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
Copy the code

The process of adding an execution task can be divided into two parts: the upper part of the code is used to record the number of threads, and the lower part of the code is used to create and start the execution thread in the exclusive lock. This part of the code does not look at locking, CAS, etc., so it is basically the same as our original handwritten thread pool

  • if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())), checks whether the current thread pool state isSHUTDOWN,STOP,TIDYING,TERMINATEDOne of them. And the current state isSHUTDOWN, and the incoming task is null, and the queue is not empty. So return false.
  • compareAndIncrementWorkerCount, CAS operation, increase the number of threads, success will break out of the token body.
  • runStateOf(c) ! = rs, and finally, the thread pool state judgment to decide whether to loop or not.
  • After the number of thread pools is recorded successfully, it is necessary to enter the locking process, create the execution thread, and record the status. In the end, if the startup is not successful, you need to execute the addWorkerFailed method, remove to the thread method and other operations.

3.6 Execution Thread (runWorker)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupts
    boolean completedAbruptly = true;
    try {
        while(task ! =null|| (task = getTask()) ! =null) 
            w.lock();
            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } finally{ afterExecute(task, thrown); }}finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally{ processWorkerExit(w, completedAbruptly); }}Copy the code

In fact, with the basics of a handwritten thread pool, this is basically what a thread pool does. The core point here is task.run() to get the thread running. Additional additional processes are as follows;

  • beforeExecute,afterExecuteDo some statistics before and after the thread executes.
  • In addition, the lock operation here is the non-reentrant exclusive lock that the Worker inherits from AQS.
  • processWorkerExitIf you are interested, similar methods can also be studied in depth.It is also interesting that workers do some removal processing and the number of completed tasks when the thread exits

3.7 Obtaining a Task from a Queue (getTask)

If you’ve already started reading the source code, you can see a loop like this in the runWorker method while (task! = null || (task = getTask()) ! = null). This is the same way we operate in a handwritten thread pool, with the core purpose of fetching thread methods from a queue.

private Runnable getTask(a) {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if(r ! =null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code
  • The getTask method fetches the tasks waiting to be executed from the blocking queue, which is the thread fetching method.
  • if (rs >= SHUTDOWN ...To determine whether the thread is closed.
  • Wc = workerCountOf(c), wc > corePoolSizeIf the number of worker threads exceeds the number of core threadscorePoolSizeIf the workQueue is not empty, the worker thread is added. However, if the timeout does not fetch the thread, the thread larger than corePoolSize is destroyed.
  • timed, it isallowCoreThreadTimeOut. In the endtimedWhen true, timeout is controlled by the poll method of the blocking queue.
  • If thekeepAliveTimeIf no task is obtained within the specified time, null is returned. If false, it blocks.

Second, the summary

  • This section does not cover all thread pools, or it would be a bit bloated. In this chapter, we start from handwriting thread pool, and gradually analyze how these codes are implemented in Java thread pool, involving knowledge points including: queue, CAS, AQS, reentrant lock, exclusive lock and so on. And the knowledge is basically interlinked, so it’s better to have some foundation or it’s a little bit hard to understand.

  • Beyond what we’ve covered in this chapter, we haven’t covered the thread destruction process, the choice and use of the four thread pool methods, or how to configure for CPU-intensive or IO intensive tasks. Spring also has its own thread pool implementation. These points are very close to the actual operation.

Click to follow, the first time to learn about Huawei cloud fresh technology ~