Instead of using the Thread class directly for multithreaded programming, we usually use the more convenient thread pool to schedule and manage tasks. A thread pool is like a fleet of shared bikes: we just grab one when we need it. In fact a thread pool is even better than that, because we only have to submit a task to it and it runs the task at the right time. If we used the Thread class directly, we would have to create, start, and wait for a thread every time we executed a task, and managing all of those threads would be difficult. With a thread pool, we can leave that work to the pool.

My previous article on thread pool usage and its source code was too long and covered too much ground to digest easily, so I have reorganized the material into two articles and added some content to make it easier to follow.

This article starts with the concept of a thread pool and its basic use. It then introduces the common configurable items of a thread pool, such as the task queue and the rejection policy, and finally looks at four commonly used thread pool configurations. After reading it, you should be able to use thread pools and schedule tasks flexibly in practice.

Reading this article requires a basic understanding of multithreaded programming, such as what a thread is and what problems multithreading solves. If you are not familiar with these topics, please refer to my previous post, “This time, Let’s fully Master Java Multithreading (2/10).”

The most commonly used thread pool implementation class is ThreadPoolExecutor, and that is the one we will use here. The JDK already encapsulates thread pools quite well, so getting started is easy.

Basic use of thread pools

Creating a thread pool

Since a thread pool is a Java class, the most straightforward way to use it is to new an object of the ThreadPoolExecutor class, for example: ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()). So what does each parameter in this constructor mean? We can set these details aside for a while, continue our thread pool journey, and come back to them later.

Submit a task

Once a thread pool is created, we can submit tasks to it for execution. Submitting a task is fairly simple: we pass the same kind of Runnable object we would pass to the Thread constructor into the pool's execute or submit method. The two methods are basically the same; the only difference is that submit returns a Future object, which lets us check the status of the asynchronous task and obtain its result once the task has completed.
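As a quick illustration of that difference, here is a minimal sketch (the class name and task bodies are made up for this example): execute is fire-and-forget, while submit returns a Future we can wait on.

import java.util.concurrent.*;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // execute: no handle to the result is returned
        pool.execute(() -> System.out.println("running via execute"));

        // submit: returns a Future we can use to wait for and read the result
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("submit result = " + future.get()); // blocks until the task finishes

        pool.shutdown();
    }
}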

Let's try the simpler execute method first; the code example is as follows:

import java.util.concurrent.*;

public class ThreadPoolTest {

    private static int count = 0;

    public static void main(String[] args) throws Exception {
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1000000; ++i) {
                    synchronized (ThreadPoolTest.class) {
                        count += 1;
                    }
                }
            }
        };

        // Important: create the thread pool
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // Important: submit two tasks to the thread pool
        threadPool.execute(task);
        threadPool.execute(task);

        // Wait for all tasks in the thread pool to complete
        threadPool.shutdown();
        while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) {
            System.out.println("Not yet. Still waiting for termination");
        }

        System.out.println("count = " + count);
    }
}

When we run it, we get two million, and we have successfully written our first program that uses a thread pool. So back to the question: what do the parameters passed in when creating the thread pool actually mean?

Parse thread pools in depth

Parameters for creating a thread pool

Here is the constructor definition for ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

Each parameter has the following meaning (a construction sketch follows the list):

  1. corePoolSize, the size of the core pool; the pool generally keeps at least this many threads alive;
  2. maximumPoolSize, the maximum number of threads the pool may hold;
  3. keepAliveTime and unit together make up a timeout: keepAliveTime is the amount and unit is the time unit, and combined they give the final timeout. If the pool currently holds more than corePoolSize threads, a thread that stays idle longer than this timeout is destroyed;
  4. workQueue, a blocking queue that holds tasks; when no thread in the pool is free, newly submitted tasks are put into this queue to wait for execution. The queue type passed in determines how the pool handles these tasks, as described below;
  5. threadFactory, the factory object from which the pool creates threads. We can pass in a custom implementation of the ThreadFactory interface to change how threads are created; if we pass nothing, Executors.defaultThreadFactory() is used as the default;
  6. handler, the rejection policy object used when the pool can neither execute nor queue a newly submitted task. The following policy classes are commonly used:
    • ThreadPoolExecutor.AbortPolicy, the default policy, which simply throws a RejectedExecutionException
    • ThreadPoolExecutor.CallerRunsPolicy, which runs the task on the caller's own thread
    • ThreadPoolExecutor.DiscardOldestPolicy, which discards the oldest task in the blocking queue and then retries execute
    • ThreadPoolExecutor.DiscardPolicy, which silently discards the task without reporting any error
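To make the parameters concrete, here is a sketch that passes all seven arguments explicitly. The specific values are arbitrary and chosen only for illustration.

import java.util.concurrent.*;

public class PoolConfigSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize: keep at least 2 threads alive
                4,                                     // maximumPoolSize: grow to at most 4 threads
                60L, TimeUnit.SECONDS,                 // keepAliveTime + unit: idle non-core threads die after 60s
                new ArrayBlockingQueue<Runnable>(100), // workQueue: bounded queue, at most 100 waiting tasks
                Executors.defaultThreadFactory(),      // threadFactory: default thread factory
                new ThreadPoolExecutor.AbortPolicy()); // handler: reject by throwing RejectedExecutionException
        pool.shutdown();
    }
}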

If the role of each parameter still feels abstract, let's dig into the thread pool source code that uses these parameters to see what each of them really does.

Implementation of the execute method

We normally submit tasks with the execute method, so what does the thread pool do during this call? The execute() method of the ThreadPoolExecutor class mainly does four things:

  1. If the number of threads in the pool is less than the core pool size corePoolSize, a new thread is created via threadFactory and the incoming task becomes that thread's first task;
  2. If the number of threads has already reached corePoolSize, the task is added to the blocking queue workQueue via its offer method, where it waits until a thread becomes idle;
  3. If the number of threads has reached corePoolSize and the task cannot be inserted into the blocking queue (for example, because it is full), the pool adds another thread to execute the task, unless the number of threads has already reached maximumPoolSize;
  4. If the maximum number of threads has indeed been reached, the task is rejected through the rejection policy object handler.

The overall execution process is as follows, with solid black dots on the left representing the start of the process and black concentric circles on the bottom representing the end of the process:

This flow uses every constructor parameter except the timeout, and from it you can see what each parameter actually controls. But there is one term we have not explored in depth yet: the blocking queue.

Blocking queues in thread pools

A blocking queue is a queue-like data structure (similar to a List) that can hold anywhere from 0 to N elements. We can insert elements into it and take elements out of it, where taking an element both reads and removes it from the queue. When the queue is empty, an attempt to take an element blocks until an element is inserted; when the queue is full, an attempt to insert blocks until an element is taken out.

Such a data structure is well suited to the thread pool scenario, where a worker thread is blocked when there is no work to process and is not woken up until a new task is submitted.

In a thread pool, different blocking queue types lead to different pool behavior. Here are the three most common blocking queue types (a construction sketch follows the list):

  1. Direct handoff queue, represented by the SynchronousQueue class; this queue stores no tasks at all. When a submitting thread tries to add a task to the queue, it blocks until a worker thread tries to take a task from the queue, at which point the task is handed directly from the submitting thread to the worker thread.
  2. Unbounded queue, represented by the LinkedBlockingQueue class; an unlimited number of tasks can be stored in it. Insertion never fails because the queue is full, so with an unbounded queue the number of threads in the pool only ever reaches the core size and never grows beyond it; the maximumPoolSize parameter has no effect.
  3. Bounded queue, represented by the ArrayBlockingQueue class; it holds a fixed maximum number of tasks. This type is common in practice because it neither consumes too many resources by buffering unlimited tasks (the unbounded queue's problem) nor hurts throughput by blocking the submitting thread too eagerly (the direct handoff's problem). In general, the bounded queue is the most balanced choice.
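As an illustration only (the pool sizes and queue capacity below are arbitrary), here is how the three queue types are typically plugged into a ThreadPoolExecutor:

import java.util.concurrent.*;

public class QueueChoices {
    public static void main(String[] args) {
        // Direct handoff: no buffering, extra threads (up to the maximum) absorb bursts
        ThreadPoolExecutor handoff = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        // Unbounded queue: never rejects, but maximumPoolSize is effectively ignored
        ThreadPoolExecutor unbounded = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        // Bounded queue: at most 100 waiting tasks, then extra threads, then rejection
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

        handoff.shutdown();
        unbounded.shutdown();
        bounded.shutdown();
    }
}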

Read the source code for the execute method

In an IDE such as IDEA, we can jump to the JDK source code of the ThreadPoolExecutor class by clicking on the class name in our sample code. In the source we can see the extensive comments left by Doug Lea, the author of the java.util.concurrent package.

These comments are a very valuable reference, and readers who feel up to it are encouraged to read them on their own. Below we walk step by step through the source code of the thread pool class ThreadPoolExecutor; this part is optional and can be skipped.

Here is the source code of the execute method of ThreadPoolExecutor, with explanatory comments added:

public void execute(Runnable command) {
    // Reject a null task outright
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Check whether the current number of threads has reached the core pool size
    if (workerCountOf(c) < corePoolSize) {
        // Not yet: create a new thread and make the incoming task its first task.
        // If the thread is added successfully, return; otherwise continue below.
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the pool is still running, try to put the task on the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check the state: if the pool is no longer running, remove the task
        // we just queued and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If there are no threads in the pool, create one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If queueing fails (e.g. the queue is full), try to add another thread;
    // if that also fails (e.g. the maximum pool size is reached), reject the task
    else if (!addWorker(command, false))
        reject(command);
}

In this source we can see that threads are created by the addWorker method, which works with the Worker objects that ThreadPoolExecutor uses to wrap and manage threads. If you want to know how the Worker class actually runs tasks, read the next article, which dissects the thread pool's task execution process in depth.

Timeout

So what role does the timeout, which we have not discussed yet, play in this process? As the previous section shows, the thread count is bounded by two numbers: the core pool size and the maximum pool size. A thread that blocks because it has no task to execute is called an idle thread; once a new task is submitted, the thread leaves the blocked state and starts executing it.

If the total number of threads in the pool is greater than the core pool size, any thread that stays idle longer than the timeout is destroyed. If the total number of threads is less than or equal to the core pool size, idle threads are not destroyed even after the timeout (except in one special case, shown below). This is exactly where the timeout parameter comes into play.
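One such special case is the allowCoreThreadTimeOut switch. Here is a sketch (the pool sizes and timeout are arbitrary) of how the timeout interacts with it:

import java.util.concurrent.*;

public class KeepAliveSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));

        // Default behaviour: only threads beyond corePoolSize (here, the 3rd and 4th threads)
        // are destroyed after being idle for more than 30 seconds.

        // Special case: with this switch on, even the core threads are destroyed after the
        // same idle timeout, so the pool can shrink all the way down to 0 threads.
        pool.allowCoreThreadTimeOut(true);

        pool.shutdown();
    }
}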

Other thread pool operations

Closing the thread pool

The earlier example already used the shutdown() method, which is one way to close a thread pool, in order to wait for all tasks in the pool to finish. ThreadPoolExecutor offers two main ways to close a pool (a shutdown sketch follows the list):

  1. shutdown() closes the pool gracefully: tasks that have already been submitted are allowed to finish, but no new tasks are accepted;
  2. shutdownNow() closes the pool immediately: running tasks are interrupted, tasks waiting to execute are not started, and the tasks still sitting in the blocking queue are returned as the method's return value.
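A common shutdown pattern, sketched below with an arbitrary timeout value, is to try a graceful shutdown first and fall back to shutdownNow:

import java.util.List;
import java.util.concurrent.*;

public class ShutdownSketch {
    static void close(ExecutorService pool) {
        pool.shutdown();                       // stop accepting new tasks, let submitted tasks finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                List<Runnable> dropped = pool.shutdownNow();  // interrupt running tasks
                System.out.println(dropped.size() + " queued tasks were never started");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}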

Monitor thread pool health

We can obtain the pool's current runtime information by calling several methods on the thread pool object (a monitoring sketch follows the list):

  • getTaskCount, the approximate total number of tasks that have been completed, are executing, or are waiting in the pool; the result is not exact because tasks keep changing state while it is computed;
  • getCompletedTaskCount, the total number of completed tasks, also an approximation;
  • getLargestPoolSize, the largest number of threads the pool has ever held; this tells us whether the pool has ever been full, i.e. reached maximumPoolSize;
  • getPoolSize, the current number of threads in the pool;
  • getActiveCount, an approximation of the number of threads currently executing tasks.
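The sketch below simply prints these values; the output format is made up for illustration.

import java.util.concurrent.*;

public class PoolMonitorSketch {
    static void printStats(ThreadPoolExecutor pool) {
        System.out.println("tasks submitted (approx): " + pool.getTaskCount());
        System.out.println("tasks completed (approx): " + pool.getCompletedTaskCount());
        System.out.println("largest pool size so far: " + pool.getLargestPoolSize());
        System.out.println("current pool size:        " + pool.getPoolSize());
        System.out.println("active threads (approx):  " + pool.getActiveCount());
    }
}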

There are four common thread pools

Most of the time we do not create ThreadPoolExecutor objects directly. Instead, we create purpose-specific thread pools through a few static methods of the Executors class. There are four common thread pools:

  1. Cacheable thread pool, created with the Executors.newCachedThreadPool method
  2. Fixed-length thread pool, created with the Executors.newFixedThreadPool method
  3. Delayed task thread pool, created with the Executors.newScheduledThreadPool method
  4. Single-threaded thread pool, created with the Executors.newSingleThreadExecutor method

Let’s take a look at the characteristics and scenarios of different types of thread pools using the source code for these static methods.

Cacheable thread pools

We can easily jump to the JDK source in the IDE; below is the source of the Executors.newCachedThreadPool method. As the code shows, the cacheable thread pool is created simply by calling the ThreadPoolExecutor constructor with preset parameters, so we do not need to configure anything ourselves. The interesting questions are how this method produces a pre-configured ThreadPoolExecutor object and which scenarios such a pool suits.

The code below passes the following values to the ThreadPoolExecutor constructor:

  • corePoolSize is 0, meaning the pool may hold zero threads;
  • maximumPoolSize is Integer.MAX_VALUE, meaning the number of threads is effectively unlimited;
  • keepAliveTime is 60 seconds, meaning idle threads are reclaimed after 60 seconds;
  • the blocking queue is a SynchronousQueue, meaning each new task is handed directly to a worker thread.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Therefore, a cacheable thread pool prefers an idle thread when a task is added and creates a new thread if none is available. There is no upper limit on the number of threads, so every task is handed to a worker thread immediately instead of waiting in a blocking queue. If the pool stays idle long enough, all of its threads are destroyed, saving system resources. A usage sketch follows.
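Here is a sketch that fires off a burst of short tasks (the task body is only an illustration):

import java.util.concurrent.*;

public class CachedPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            final int id = i;
            // Each short task goes to an idle thread if one exists,
            // otherwise a new thread is created immediately (no queueing).
            pool.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}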

  • advantages
    • Tasks can be executed immediately after they are added without having to wait in a blocking queue
    • Threads are not kept when idle, saving system resources
  • disadvantages
    • There is no limit to the number of threads, which may consume excessive system resources
  • Applicable scenario
    • Suitable for scenarios with a large number of short-lived tasks and strict response-time requirements

Fixed-length thread pool

The values passed to the ThreadPoolExecutor constructor are:

  • corePoolSize and maximumPoolSize are both set to nThreads, so the number of threads in the pool stays at nThreads; hence the name "fixed-length thread pool"
  • The timeout is set to 0 milliseconds; since the pool only contains core threads, timeout-based release is not a concern
  • The last parameter is an unbounded queue, so when all threads are busy, tasks can be added to the blocking queue without limit while they wait for execution
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

The number of threads in a fixed-length pool grows up to nThreads and then stays there; idle threads are not released. If all threads are busy when a task is added, the task goes into the blocking queue to wait, and there is no upper bound on how many tasks the queue can hold.

  • advantages
    • The number of threads is fixed and the consumption of system resources is controllable
  • disadvantages
    • When the number of tasks spikes, the pool cannot grow to absorb the load, which delays task completion
    • If the number of threads is set too small, the unbounded queue may accumulate a large backlog of tasks, delaying completion times and consuming excessive resources
  • Applicable scenario
    • Scenarios where task volume peaks are moderate and response-time requirements are not strict

Delayed task thread pool

Unlike the previous two methods, Executors.newScheduledThreadPool returns an object of the ScheduledExecutorService interface, which supports delayed and periodic execution. Its configuration has the following characteristics:

  • maximumPoolSize is unlimited, so a large number of new threads can be created when the task volume is high
  • The timeout is 0, so a thread beyond the core size is destroyed as soon as it becomes idle
  • A DelayedWorkQueue is used; each element in this queue has an expiry time, and only expired (due) elements can be taken out
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

The delayed task thread pool implements the ScheduledExecutorService interface and is mainly used for delayed and periodic execution. A usage sketch follows.
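In the sketch below, the delay and period values are arbitrary and chosen only for illustration:

import java.util.concurrent.*;

public class ScheduledPoolSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Run once, 5 seconds from now
        scheduler.schedule(() -> System.out.println("delayed task"), 5, TimeUnit.SECONDS);

        // Run repeatedly: first after 1 second, then every 10 seconds
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("periodic task"), 1, 10, TimeUnit.SECONDS);

        // The scheduler keeps running these tasks until shutdown() is called.
    }
}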

Single-threaded thread pool

A single-threaded thread pool contains only one worker thread, which guarantees that tasks are executed in the order they are arranged (FIFO, LIFO, or by priority). But if there is only one thread in the pool, why use a thread pool instead of a plain Thread? There are two main advantages: first, we can easily submit tasks for asynchronous execution through a shared pool instead of managing thread lifecycles ourselves; second, the task queue lets us control the order of execution, making task management easy (a sketch follows the code below).

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
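Here is a sketch showing the ordering guarantee of the single-threaded pool (the task bodies are made up for illustration):

import java.util.concurrent.*;

public class SingleThreadSketch {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // The three tasks share one worker thread, so they always print step 1, 2, 3 in order.
        pool.execute(() -> System.out.println("step 1"));
        pool.execute(() -> System.out.println("step 2"));
        pool.execute(() -> System.out.println("step 3"));
        pool.shutdown();
    }
}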

Conclusion

In this article we started from the concept and basic use of thread pools, then used the source code of the execute method to analyze the whole task submission process and the role each constructor parameter plays while the pool is running; you have in fact already read the source of the execute method of the thread pool class ThreadPoolExecutor. Finally, we looked at other common thread pool operations and the four common thread pools.

This concludes our thread pool source code tour, and I hope it gives you a general picture of how a thread pool is used and how it runs. Why only a general picture? Because I think many readers without prior thread pool knowledge will only form an overall impression at first, and some details may not stick. So I encourage you to come back to the beginning of this article and read it a few more times; it was only on my third reading of the ThreadPoolExecutor source code that I really grasped some of the key points.

What's next

In this article, we’ve only explored the basic use of thread pools and the source code for the commit task method Execute. So how does the task get executed by the thread pool after it’s submitted? We’ll find out in the next article, which will delve into the thread pool task execution process.