Hi, I'm Jack Xu. This is the second part of my series on concurrent programming, and this time I'm going to talk about thread pools. The article is a bit long, so bear with me and read it through.

Why use thread pools

1) Reduce the performance overhead of creating and destroying threads

2) Improve response time: when a new task arrives, it can run immediately without waiting for a thread to be created

3) A reasonably sized pool avoids the problems caused by running more threads than the hardware resources can handle

Alibaba's Java coding guidelines agree: threads in a project must be provided by a thread pool rather than created explicitly, for the reasons I mentioned above.

Use of thread pools

First let’s look at the UML class diagram

  • Executor: You can see that the top layer is the Executor interface. This interface is simple, with only one execute method. The purpose of this interface is to decouple task submission from task execution.

  • ExecutorService: also an interface. It extends Executor and defines more pool-related operations, such as submit and shutdown.

  • AbstractExecutorService: Provides a partial default implementation of the ExecutorService.

  • ThreadPoolExecutor: The actual thread pool implementation we use is ThreadPoolExecutor. It implements the complete mechanism for thread pools to work. It is also the focus of our next analysis.

  • ForkJoinPool: like ThreadPoolExecutor, it inherits from AbstractExecutorService. It is suited to divide-and-conquer recursive computations.

  • ScheduledExecutorService: this interface extends ExecutorService and defines methods for running tasks after a delay or periodically.

  • ScheduledThreadPoolExecutor: this class extends ThreadPoolExecutor and implements the ScheduledExecutorService interface, adding the ability to run tasks after a delay and periodically.
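The last two bullets become clearer with a small sketch of the delayed and periodic scheduling that ScheduledExecutorService defines. The class and method names (ScheduledDemo, runDemo) are made up for illustration. schedule and scheduleAtFixedRate are the real interface methods. The 50 ms and 20 ms timings are arbitrary.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledDemo {
    /** Runs one delayed task, then a periodic one; returns how often the periodic task ran. */
    static int runDemo() {
        // Backed by a ScheduledThreadPoolExecutor with 2 core threads
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            // One-shot: run a Callable once, 50 ms from now
            ScheduledFuture<String> delayed =
                    scheduler.schedule(() -> "delayed result", 50, TimeUnit.MILLISECONDS);
            System.out.println(delayed.get());     // blocks until the delayed task has run

            // Periodic: first run immediately, then every 20 ms
            AtomicInteger runs = new AtomicInteger();
            CountDownLatch threeRuns = new CountDownLatch(3);
            scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                threeRuns.countDown();
            }, 0, 20, TimeUnit.MILLISECONDS);
            threeRuns.await();                      // wait for at least three executions
            return runs.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();                // periodic tasks never end on their own
        }
    }

    public static void main(String[] args) {
        System.out.println("periodic runs observed: " + runDemo());
    }
}
```

A periodic task keeps running until it is cancelled or the pool is shut down, so the sketch always shuts the scheduler down in finally.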

So let's look at the two ways to create a thread pool. The first is to use the factory methods provided by Executors, of which there are four:

        Executor executor1 = Executors.newFixedThreadPool(10);
        Executor executor2 = Executors.newSingleThreadExecutor();
        Executor executor3 = Executors.newCachedThreadPool();
        Executor executor4 = Executors.newScheduledThreadPool(10);

The second way is to call the ThreadPoolExecutor constructor directly:

        ExecutorService executor5 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

In fact, if you look at the source of the first approach, you will find:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
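You can check that the factory methods really just call the constructor. The sketch below casts two factory-created pools back to ThreadPoolExecutor and prints the parameters they were built with. It assumes the OpenJDK implementation, where newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor. newSingleThreadExecutor returns a wrapper, so it is left out. FactoryPoolsDemo and describe are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FactoryPoolsDemo {
    /** Describes the constructor arguments a factory-created pool ended up with. */
    static String describe(ExecutorService es) {
        // Assumption: the factory returned a ThreadPoolExecutor (true for fixed and cached pools in OpenJDK)
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) es;
        return "core=" + tpe.getCorePoolSize()
                + " max=" + tpe.getMaximumPoolSize()
                + " queue=" + tpe.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(10);
        ExecutorService cached = Executors.newCachedThreadPool();
        System.out.println(describe(fixed));   // core=10 max=10 queue=LinkedBlockingQueue
        System.out.println(describe(cached));  // core=0 max=2147483647 queue=SynchronousQueue
        fixed.shutdown();
        cached.shutdown();
    }
}
```

The output also shows why these pools can be risky. The fixed pool has an unbounded LinkedBlockingQueue, and the cached pool allows up to Integer.MAX_VALUE threads.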

So in essence there is only one way to create a pool: the constructor. I won't go through the Executors factory methods here. Instead, let's look at one more Alibaba rule.

The rule says thread pools must not be created with Executors. They must be created with ThreadPoolExecutor directly, so that whoever writes the code understands how the pool runs and avoids the risk of exhausting resources. If you don't really understand how the four factory-created pools behave, don't use them. So let's focus on what each parameter of the ThreadPoolExecutor constructor means. There are several constructors; I've chosen the most complete one.

    public ThreadPoolExecutor(int corePoolSize,                   // number of core threads
                              int maximumPoolSize,                // maximum number of threads
                              long keepAliveTime,                 // how long an idle non-core thread survives
                              TimeUnit unit,                      // unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue,  // queue of tasks waiting to run
                              ThreadFactory threadFactory,        // factory that creates the threads
                              RejectedExecutionHandler handler)   // what to do when a task cannot be accepted
  • corePoolSize: the number of core threads. Once created, this is in effect the minimum number of threads the pool keeps. Unless allowCoreThreadTimeOut is set, core threads never time out. When there is no work, an idle core thread does not exit; it blocks waiting on the queue until the application submits another task.

  • maximumPoolSize: the maximum number of threads allowed in the pool.

  • keepAliveTime and unit: how long a thread beyond the core count may stay idle before it is reclaimed, and the time unit of that value.

  • workQueue: a blocking queue that holds tasks waiting to be executed. Three kinds are commonly used:

1) ArrayBlockingQueue: an array-backed FIFO queue. Its capacity must be specified when it is created.
2) LinkedBlockingQueue: a linked-list-backed FIFO queue. If no capacity is given, it defaults to Integer.MAX_VALUE.
3) SynchronousQueue: a special queue that does not store tasks at all. Each submitted task must be handed directly to a thread, so the pool creates a new thread whenever no idle thread is waiting (up to maximumPoolSize).

  • threadFactory: we usually use the default factory, Executors.defaultThreadFactory(). A factory standardizes how threads are created. Calling new Thread in many places can produce threads with inconsistent settings; with a factory, every thread is created the same way (for example, with a consistent naming scheme).

  • handler: the saturation (rejection) policy applied when both the queue and the pool are full, or when the pool has been shut down. The JDK provides four:

1. AbortPolicy: throws a RejectedExecutionException. This is the default policy.
2. CallerRunsPolicy: runs the task in the caller's thread.
3. DiscardOldestPolicy: discards the oldest task at the head of the queue, then retries the current task.
4. DiscardPolicy: silently discards the task.

You can also implement RejectedExecutionHandler yourself to suit the application, for example to log or persist tasks that cannot be handled.
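Here is a sketch of the last two parameters. It wires in a custom ThreadFactory that names its threads and a RejectedExecutionHandler that records rejections instead of throwing. NamedThreadFactory, LoggingRejectHandler and the "order-worker" prefix are hypothetical. A production handler would more likely log through your logging framework or persist the task. With 1 core thread, a maximum of 2 and a queue of 2, the pool can hold 4 tasks at once. While all tasks are blocked, the 5th and 6th are therefore rejected.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPoolDemo {
    /** Hypothetical factory that gives every thread a readable name. */
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger seq = new AtomicInteger(1);
        NamedThreadFactory(String prefix) { this.prefix = prefix; }
        @Override public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + seq.getAndIncrement());
            t.setDaemon(false);   // match Executors.defaultThreadFactory(), which makes non-daemon threads
            return t;
        }
    }

    /** Hypothetical handler that counts and reports rejected tasks instead of throwing. */
    static class LoggingRejectHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();
        @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
            System.err.println("rejected task; pool=" + executor.getPoolSize()
                    + " queued=" + executor.getQueue().size());
            // A real handler might persist r somewhere so it can be retried later.
        }
    }

    /** Submits `tasks` blocking tasks to a 1-core / 2-max pool with a queue of 2; returns the rejection count. */
    static int rejectedCount(int tasks) {
        LoggingRejectHandler handler = new LoggingRejectHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new NamedThreadFactory("order-worker"),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();            // keep the thread busy so the pool saturates
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();                    // let the blocked tasks finish
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        // capacity = maximumPoolSize (2) + queue capacity (2) = 4, so 6 tasks give 2 rejections
        System.out.println("rejected: " + rejectedCount(6));
    }
}
```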

Using a pool once it is created is also easy. To run a task without a return value, pass an implementation of Runnable to execute. To get a return value, pass a Callable to submit:

        // No return value
        executor5.execute(() -> System.out.println("jack xushuaige"));
        // With a return value
        String message = executor5.submit(() -> { return "jack xushuaige"; }).get();
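Here is a minimal runnable version of the snippets above that puts construction and submission together. It also shuts the pool down, which the snippets leave out. SubmitDemo and runOnce are hypothetical names, and the one-second timeout on get is an arbitrary choice.

```java
import java.util.concurrent.*;

public class SubmitDemo {
    static String runOnce() {
        ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            // Runnable via execute: no result; an exception here reaches the worker thread, not the caller
            executor.execute(() -> System.out.println("jack xushuaige"));
            // Callable via submit: the Future carries the result (or the exception) back to the caller
            Future<String> future = executor.submit(() -> "jack xushuaige");
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();   // stop accepting tasks; already-queued tasks still run
        }
    }

    public static void main(String[] args) {
        System.out.println("submit returned: " + runOnce());
    }
}
```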

Source code analysis

The execute method

Let's start the analysis from the entry point in the source, the execute method:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

There is a key comment in the source that I didn't include above, so let me translate it first:

It proceeds in three steps:

1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount. By returning false, it prevents false alarms that would add threads when it shouldn't.

2. If a task can be queued successfully, we still need to double-check two things. Should we have added a thread (because existing ones died since the last check)? Did the pool shut down after we entered this method? So we recheck the state. If the pool has stopped, we roll back the enqueue; if there are no threads, we start a new one.

3. If the task cannot be queued, try to add a new thread. If that fails, the pool is shut down or saturated, so reject the task.

If it still doesn't click after reading that, no problem: I drew the process as a flowchart. Take your time going through it and it becomes easy to understand.
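The three steps can also be observed on a real pool. In this sketch (class and method names are hypothetical), every task blocks on a latch, so nothing leaves the queue while we watch. A pool with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1 then walks through all three branches of execute:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ExecuteFlowDemo {
    /** Feeds four blocking tasks to a pool with core=1, max=2, queue capacity=1. */
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));   // default handler is AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {                      // keeps its thread busy until released
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                steps.add("task" + i + ": poolSize=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("task" + i + ": rejected");
            }
        }
        release.countDown();                            // let every running task finish
        pool.shutdown();
        return steps;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task1: poolSize=1 queued=0   step 1, a new core thread
        // task2: poolSize=1 queued=1   step 2, queued
        // task3: poolSize=2 queued=1   step 3, a new non-core thread
        // task4: rejected              step 3 fails, pool and queue are full
    }
}
```

Note the order. The queue fills up before any thread beyond corePoolSize is created, which surprises people who expect the pool to grow to maximumPoolSize first.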

Next, let's look at what ctl is. In the source, ctl is an AtomicInteger field initialized as new AtomicInteger(ctlOf(RUNNING, 0)). It packs two values into one int.

An int is 32 bits. The high 3 bits hold the run state of the pool, and the low 29 bits hold the number of threads (the worker count).

Take ctlOf(RUNNING, 0). RUNNING is -1 << COUNT_BITS, where COUNT_BITS is 29. The binary value of -1 is 32 ones; shifting it left by 29 bits leaves 111 in the high three bits, followed by 29 zeros. ORing that with a worker count of 0 keeps the high bits at 111. The bit patterns of the other states are derived the same way. Bit operations like this are fun (the HashMap source uses them too), and you can try them in everyday development: they are fast, and they also let you show off a little. Now let's introduce the five thread pool states:

  • RUNNING: Receives new tasks and executes tasks in the queue

  • SHUTDOWN: No new tasks are received, but tasks in the queue are executed

  • STOP: Does not receive new tasks, does not execute tasks in the queue, and interrupts ongoing tasks

  • TIDYING: all tasks have terminated and the worker count is zero. A pool that transitions to this state runs the terminated() hook method.

  • TERMINATED: the terminated() method has completed.
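The packing described above can be replayed outside the JDK. The constants below are copied from the JDK 8 ThreadPoolExecutor source, where they are private; newer JDKs rename CAPACITY to COUNT_MASK. CtlBitsDemo and highBits are hypothetical helpers, so treat this as a standalone replica rather than the real class.

```java
public class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // high bits 000
    static final int STOP       =  1 << COUNT_BITS;        // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;        // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;        // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both into one int

    /** Returns the top 3 bits of a value as a binary string, e.g. "111" for RUNNING. */
    static String highBits(int value) {
        String bits = String.format("%32s", Integer.toBinaryString(value)).replace(' ', '0');
        return bits.substring(0, 3);
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                     // a running pool with 5 workers
        System.out.println(highBits(runStateOf(c)));  // 111
        System.out.println(workerCountOf(c));         // 5
        System.out.println(highBits(STOP));           // 001
        // RUNNING is the only negative state, so "c < SHUTDOWN" means "still running"
        System.out.println(runStateOf(c) < SHUTDOWN); // true
    }
}
```

Because the states are ordered numerically, checks such as rs >= SHUTDOWN in addWorker become a single integer comparison.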

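Their transitions are as follows:

  • RUNNING -> SHUTDOWN: shutdown() is called.

  • RUNNING or SHUTDOWN -> STOP: shutdownNow() is called.

  • SHUTDOWN -> TIDYING: both the queue and the pool are empty.

  • STOP -> TIDYING: the pool is empty.

  • TIDYING -> TERMINATED: the terminated() hook method has completed.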

The addWorker method

The core method in the execute flow is addWorker, so let's analyze its source next. It looks intimidating, but it really does only two things.

Step 1: Update the number of workers. The code is as follows:

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

retry is a label used together with the loops. continue retry jumps back to the start of the outer loop labeled retry, and break retry exits both loops at once. The code reads ctl and checks the run state. It then checks the worker count against corePoolSize or maximumPoolSize, depending on which kind of thread is being created, and tries to increment the count with CAS. If the CAS succeeds, it breaks out of both loops. Otherwise it re-reads ctl: if the run state has changed, it starts over from the outer loop; if not, it retries the count update in the inner loop. The flowchart is as follows:
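The retry/CAS shape is easier to see in isolation. The sketch below is a hypothetical bounded counter, not JDK code. The AtomicInteger stands in for the worker count, the open flag for the run state, and limit for corePoolSize or maximumPoolSize. Even when 100 tasks race for 10 slots, the CAS loop hands out exactly 10.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CasSlots {
    private final AtomicInteger count = new AtomicInteger(); // plays the role of the worker count
    private volatile boolean open = true;                    // plays the role of the run state
    private final int limit;                                 // plays the role of the size limit

    CasSlots(int limit) { this.limit = limit; }

    void close() { open = false; }

    /** Tries to reserve one slot, using the same retry/CAS shape as step 1 of addWorker. */
    boolean tryReserve() {
        retry:
        for (;;) {
            if (!open)
                return false;                     // like "rs >= SHUTDOWN": refuse
            for (;;) {
                int c = count.get();
                if (c >= limit)
                    return false;                 // like "wc >= maximumPoolSize"
                if (count.compareAndSet(c, c + 1))
                    break retry;                  // CAS succeeded: leave both loops
                if (!open)
                    continue retry;               // state changed: re-run the outer checks
                // otherwise another thread changed the count: retry the inner loop
            }
        }
        return true;
    }

    /** Races `attempts` reservations on 8 threads; returns how many succeeded. */
    static int race(int limit, int attempts) {
        CasSlots slots = new CasSlots(limit);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger granted = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(attempts);
        for (int i = 0; i < attempts; i++) {
            pool.execute(() -> {
                if (slots.tryReserve())
                    granted.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return granted.get();
    }

    public static void main(String[] args) {
        System.out.println("granted: " + race(10, 100)); // granted: 10
    }
}
```

A lock would also work here. The CAS loop avoids blocking, at the cost of occasionally retrying when two threads collide.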

Step 2: create the worker, add it to the workers set, and start the thread the worker holds. The code is as follows:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

As you can see, adding a worker requires holding mainLock, so that concurrent additions are thread-safe. If the worker is added successfully, the start method of its thread is called to start it. If startup fails, addWorkerFailed is called to roll back. By now you should have noticed two things:

1. ThreadPoolExecutor does not create or start any threads when it is constructed. Threads are created by addWorker when execute is called.

2. The addWorker method creates a new worker and starts the thread it holds to execute the task.
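Both points are easy to check. This sketch (hypothetical names) records the pool size right after construction and after each execute call. It also shows prestartAllCoreThreads(), a real ThreadPoolExecutor method, as the way to create core threads eagerly instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LazyStartDemo {
    /** Pool size right after construction and after each of three execute calls. */
    static List<Integer> poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        List<Integer> sizes = new ArrayList<>();
        sizes.add(pool.getPoolSize());        // the constructor starts no threads
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });          // a no-op task
            sizes.add(pool.getPoolSize());    // grows only while workerCount < corePoolSize
        }
        pool.shutdown();
        return sizes;                         // [0, 1, 2, 2]
    }

    /** Starts the core threads up front instead of waiting for execute. */
    static int prestarted() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        int started = pool.prestartAllCoreThreads();  // returns the number of threads started
        pool.shutdown();
        return started;                               // 2
    }

    public static void main(String[] args) {
        System.out.println(poolSizes());   // [0, 1, 2, 2]
        System.out.println(prestarted());  // 2
    }
}
```

Note that the second execute call still creates a new thread, even if the first thread is already idle. Below corePoolSize, the pool always grows rather than reusing threads.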

As mentioned above, once the number of threads reaches corePoolSize, execute only adds the command to the workQueue. So how do the commands in the workQueue get executed? Next, let's analyze the Worker source.

The Worker class

A Worker wraps a thread and is the unit of work in the executor. Worker extends AbstractQueuedSynchronizer and implements Runnable; put simply, it is a Runnable that owns a thread and overrides run. Let's look at its constructor:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

Now let's look at its two important fields:

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;

firstTask holds the task passed in. thread is created by the ThreadFactory when the constructor runs, and it is the thread that actually processes tasks; note again that it comes from a ThreadFactory rather than new Thread. Worker implements Runnable and passes itself (this) to newThread. As a result, the t.start() call in addWorker actually runs the run method of the Worker that owns t. Worker's run method is as follows:

        public void run() {
            runWorker(this);
        }

The runWorker source is as follows:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Just a quick analysis

1. Take the firstTask out of the worker and clear the field;

2. If there is no firstTask, get a task from the workQueue by calling getTask;

3. Acquire the worker's own lock (w.lock());

4. Call beforeExecute. It is an empty method here; override it in a subclass if needed;

5. Run the task (task.run());

6. Call afterExecute. It is also an empty method here; override it in a subclass if needed;

7. Clear task, increment completedTasks, and release the lock;

8. When an exception is thrown or there are no more tasks to run, control enters the outer finally block. It calls processWorkerExit to retire the current worker and remove it from workers. A replacement worker is then added if this one exited because of an exception, or if the number of workers has fallen below corePoolSize, so that the pool keeps corePoolSize threads.

The loop condition while (task != null || (task = getTask()) != null) is what keeps the worker fetching tasks from the workQueue and running them. getTask takes tasks from the blocking queue workQueue, using either poll with a keepAliveTime timeout or a blocking take.
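Here is a deliberately tiny pool with the same shape as Worker, runWorker and getTask. It is a teaching sketch under simplifying assumptions, not ThreadPoolExecutor: there is no maximumPoolSize and no rejection policy (the queue is unbounded). Also, idle workers notice shutdown by polling with a short timeout rather than being interrupted. MiniPool, MiniWorker and runAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** Teaching sketch that mimics the Worker / runWorker / getTask shape; not ThreadPoolExecutor. */
public class MiniPool {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> threads = new ArrayList<>();
    private final ThreadFactory factory = Executors.defaultThreadFactory();
    private final int coreSize;
    private volatile boolean shutdown;

    MiniPool(int coreSize) { this.coreSize = coreSize; }

    /** Like Worker: a Runnable that owns its thread and an optional first task. */
    private final class MiniWorker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        MiniWorker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = factory.newThread(this);   // thread.start() will call this.run()
        }

        @Override public void run() {               // plays the role of runWorker(this)
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // The real runWorker rethrows and replaces the worker; this sketch keeps going.
                } finally {
                    task = null;
                }
            }
        }
    }

    /** Like getTask: returning null tells the worker to exit. */
    private Runnable getTask() {
        for (;;) {
            if (shutdown && workQueue.isEmpty())
                return null;
            try {
                // The real pool blocks in take() or a keep-alive poll() and interrupts idle
                // workers on shutdown; this sketch polls briefly so it notices the flag.
                Runnable r = workQueue.poll(20, TimeUnit.MILLISECONDS);
                if (r != null)
                    return r;
            } catch (InterruptedException e) {
                return null;
            }
        }
    }

    synchronized void execute(Runnable command) {
        if (shutdown)
            throw new RejectedExecutionException("pool is shut down");
        if (threads.size() < coreSize) {            // like addWorker(command, true)
            MiniWorker w = new MiniWorker(command);
            threads.add(w.thread);
            w.thread.start();
        } else {
            workQueue.offer(command);                // unbounded queue, so this always succeeds
        }
    }

    void shutdownAndWait() throws InterruptedException {
        List<Thread> snapshot;
        synchronized (this) {
            shutdown = true;
            snapshot = new ArrayList<>(threads);
        }
        for (Thread t : snapshot)
            t.join();
    }

    /** Runs n counting tasks on a 4-thread MiniPool and returns how many actually ran. */
    static int runAll(int n) {
        MiniPool pool = new MiniPool(4);
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < n; i++)
            pool.execute(ran::incrementAndGet);
        try {
            pool.shutdownAndWait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runAll(100));   // tasks run: 100
    }
}
```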

That completes the analysis of how the executor creates and starts threads to run tasks. shutdown(), shutdownNow() and the other methods are left for you to explore.

How do I properly configure the thread pool size

Thread pool size shouldn't be guessed, and more is not better.

  • CPU-intensive tasks: tasks that mostly compute, so the CPU is busy all the time and utilization is high. Configure the thread count according to the number of CPU cores and keep it small, for example equal to the number of CPUs.

  • IO-intensive tasks: tasks dominated by long-running IO operations. During those operations the thread is blocked and the CPU sits idle. In this case you can make the pool larger, for example the number of CPUs × 2.

Of course, these are only rules of thumb. The best approach is to test and find the best configuration for your actual workload.
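The rules of thumb above can be written down as a small helper. The first two methods encode the article's heuristics. The third is an assumption added here: the commonly cited estimate threads = cores × (1 + wait time / compute time) from Java Concurrency in Practice (taking target CPU utilization as 100%). It reduces to the CPU-bound case when tasks never wait. All names are hypothetical. Treat the numbers as starting points for load testing, not answers.

```java
public class PoolSizing {
    /** Article's heuristic for CPU-bound work: about one thread per core. */
    static int cpuBound(int cores) {
        return Math.max(1, cores);
    }

    /** Article's heuristic for IO-bound work: about twice the core count. */
    static int ioBound(int cores) {
        return Math.max(1, cores * 2);
    }

    /** Assumption, not from the article: cores * (1 + wait / compute), rounded up. */
    static int byWaitRatio(int cores, double waitMillis, double computeMillis) {
        if (computeMillis <= 0)
            throw new IllegalArgumentException("computeMillis must be positive");
        return (int) Math.ceil(cores * (1 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBound(cores)
                + " ioBound=" + ioBound(cores)
                + " wait90/compute10=" + byWaitRatio(cores, 90, 10));
    }
}
```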

Thread pool monitoring

If thread pools are used on a large scale in a project, you need a monitoring system that shows the current state of each pool and helps you locate problems quickly when they occur. We can monitor a pool by overriding its beforeExecute, afterExecute, and shutdown methods.

As their names and definitions suggest, these let a subclass run custom logic before a task executes, after it executes, and when the pool shuts down.
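Here is a minimal monitoring sketch, assuming you only need counters and the average task time. The subclass below overrides beforeExecute, afterExecute, shutdown and terminated. It combines its own counters with ThreadPoolExecutor's built-in getters (getPoolSize, getActiveCount, getQueue). MonitoredPool, snapshot and the output format are hypothetical; in a real project the snapshot would go to your metrics system instead of standard output.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical monitoring subclass; names and output format are illustrative only. */
public class MonitoredPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong finished = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    public MonitoredPool(int core, int max) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());       // runs on the worker thread, just before r.run()
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
            if (t != null)                       // set for tasks passed to execute();
                failed.incrementAndGet();        // submit() keeps the failure inside the Future
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    @Override public void shutdown() {
        System.out.println("shutting down: " + snapshot());
        super.shutdown();
    }

    @Override protected void terminated() {
        System.out.println("terminated: " + snapshot());
        super.terminated();
    }

    String snapshot() {
        long done = finished.get();
        return "poolSize=" + getPoolSize() + " active=" + getActiveCount()
                + " queued=" + getQueue().size() + " finished=" + done
                + " failed=" + failed.get()
                + " avgMicros=" + (done == 0 ? 0 : totalNanos.get() / done / 1000);
    }

    /** Runs `ok` normal and `bad` throwing tasks; returns {finished, failed}. */
    static long[] runDemo(int ok, int bad) {
        MonitoredPool pool = new MonitoredPool(2, 2);
        for (int i = 0; i < ok; i++)
            pool.execute(() -> { });
        for (int i = 0; i < bad; i++)
            pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { pool.finished.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        long[] r = runDemo(20, 5);
        System.out.println("finished=" + r[0] + " failed=" + r[1]);
    }
}
```

The failing tasks in runDemo also print stack traces. An exception thrown from a task passed to execute() reaches the worker thread's uncaught exception handler, and that worker is then replaced.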


Thread pools are simple and hard at the same time. They are simple because they are easy to use, so you might think there isn't much to say about them. They are hard because it takes effort to understand the source underneath and how it schedules threads. I'll close with two points.

First, this article relies heavily on flowcharts. When you read source code or work on complex business logic, calm down and draw a diagram first. Otherwise you'll get lost, and after an interruption you'll either be confused or have to start over from the beginning.

Second, read the source. A fresh graduate may only need to know how to use something. But if you've worked for five years and still only know how to use it, without knowing how it works inside, where is your advantage over a fresh graduate, and why should you be paid more? That's it. My knowledge is limited, so if you have any questions, you're welcome to discuss them with me.