The core class for thread pools in Java is ThreadPoolExecutor.

What’s great about using thread pools?

Why use thread pools

If we don't use a thread pool, we create one thread per task: 1,000 tasks means 1,000 threads.

Problems:

First, repeatedly creating and destroying threads is expensive. Every thread takes time to create and destroy, so if a task is relatively simple, creating and destroying its thread may consume more resources than the task itself.

Second, if too many threads are created, they occupy too much memory and other resources and cause excessive context switching, which makes the system unstable.

How thread pools solve these problems

(1) To address the high system overhead of repeatedly creating and destroying threads, the thread pool keeps a fixed set of threads alive to execute tasks repeatedly; this is the number of core threads.

(2) To address the problem of too many threads occupying too much memory, the thread pool creates threads on demand and controls the total number of threads. This is why concepts such as the core thread count, the maximum thread count and the blocking queue are introduced.

How do I use thread pools

The core class for thread pools in Java is ThreadPoolExecutor. The role of each parameter will be explained later.

public static void main(String[] args) {
    // Create a thread pool
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            // Number of core threads
            Runtime.getRuntime().availableProcessors(),
            // Maximum number of threads
            Runtime.getRuntime().availableProcessors() * 2 + 1,
            // Keep-alive time
            5,
            // Time unit
            TimeUnit.SECONDS,
            // Blocking queue
            new ArrayBlockingQueue<>(10),
            // Thread factory that creates the threads
            Executors.defaultThreadFactory(),
            // Rejection policy
            new ThreadPoolExecutor.AbortPolicy());
    // Submit a task to the thread pool
    poolExecutor.execute(() -> {
        System.out.println("Perform tasks from thread pools");
    });
}

The benefits of using thread pools

Three benefits:

1. Thread pools reduce the system overhead of thread creation and destruction and also speed up response time.

Because threads in the pool are reusable, a small number of threads can execute a large number of tasks, which greatly reduces the overhead of the thread life cycle. The threads are generally already created and waiting for tasks (even core threads are created lazily, only when tasks arrive, but they stay alive afterwards), so there is no thread-creation delay when a new task comes in, which improves response time and the user experience.

2. Thread pools can manage the number of threads to avoid improper use of resources.

The thread pool flexibly controls the number of threads based on its configuration and the number of tasks, creating threads when there are not enough and reclaiming them when there are too many. This avoids memory overflow caused by too many threads and wasted CPU resources caused by too few, striking a good balance.

3. Thread pools can centrally manage resources.

For example, a thread pool can manage the task queue and the threads in one place and can start or stop tasks uniformly, which is more convenient than handling tasks one by one with individual threads. It also makes statistics easy; for example, we can readily count how many tasks have been executed.

Thread pool processing flow

(1) Submit the task to the thread pool

(2) Check whether the core threads are all busy:

  • If they are all busy and the number of core threads has reached the configured limit, move on to the blocking queue (step 3).
  • If a core thread is idle, it processes the task. Alternatively, if the existing core threads are all busy but the number of core threads has not yet reached the configured limit, a new core thread is created to process the task.

(3) Determine whether the queue is full:

  • If the queue is not full, the task is added to the queue and waits to be processed.
  • If the queue is full, move on to checking whether the current number of threads has reached the maximum (step 4).

(4) Judge whether the current number of threads exceeds the maximum number of threads:

  • If it has been reached, the rejection policy is invoked to handle the task (such as discarding it; the JDK offers four rejection policies, described below).
  • If not, a thread is created to execute the task.
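
To make the flow concrete, here is a small, self-contained demonstration; the class name FlowDemo and all of the pool sizes are made up purely for illustration. With 2 core threads, a queue of capacity 2 and a maximum of 4 threads, submitting 7 slow tasks exercises every branch of the flow above.

import java.util.concurrent.*;

public class FlowDemo {
    public static void main(String[] args) {
        // 2 core threads, maximum 4 threads, queue capacity 2, AbortPolicy on rejection
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // Tasks 1-2 occupy the core threads, tasks 3-4 wait in the queue,
        // tasks 5-6 trigger two extra threads (up to the maximum),
        // and task 7 exceeds both the queue and the maximum, so it is rejected.
        for (int i = 1; i <= 7; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    System.out.println("Task " + id + " running on " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("Task " + id + " was rejected");
            }
        }
        pool.shutdown();
    }
}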

The seven thread pool parameters

ThreadPoolExecutor constructor

corePoolSize

corePoolSize is the number of core threads. When the thread pool is initialized, there are 0 threads by default; when a new task is submitted, a new thread is created to execute it. Core threads are not destroyed even if there are no further tasks to execute.

maximumPoolSize

maximumPoolSize is the maximum number of threads in the thread pool. After the queue is full, the pool creates new threads, up to maximumPoolSize, to handle the extra tasks. If those threads later become idle, the threads beyond corePoolSize are reclaimed.

Normally, the number of threads in a thread pool is in a closed range between corePoolSize and maximumPoolSize.

keepAliveTime

keepAliveTime is the idle keep-alive time of a thread. This parameter applies to the threads beyond corePoolSize, up to maximumPoolSize.

When the number of threads in the thread pool exceeds the number of core threads, and there is no work to do, the thread pool checks the keepAliveTime of the thread. If the specified time is exceeded, the thread with nothing to do is destroyed to reduce memory usage and resource consumption.
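
As a side note, the JDK also allows idle core threads to be reclaimed if that is what you want. A one-line sketch, assuming poolExecutor is the ThreadPoolExecutor created in the example above:

// By default keepAliveTime only applies to threads beyond corePoolSize;
// with this switch enabled, idle core threads time out and are reclaimed as well.
poolExecutor.allowCoreThreadTimeOut(true);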

TimeUnit

TimeUnit refers to the keepAliveTime unit.

This class is also commonly used for thread sleeping; for example, TimeUnit.SECONDS.sleep(1); makes the current thread sleep for 1 second.

workQueue

workQueue is the blocking queue that temporarily holds tasks, discussed further below.

threadFactory

threadFactory is the thread factory that produces the threads used to execute tasks.

We can use the default thread factory, which creates threads in the same thread group, with the same priority, as non-daemon threads; or we can supply our own custom thread factory, for example to give threads custom names. Threads in different thread pools are usually named according to the specific business they serve.

The source code of the interface is as follows:

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
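As an illustration, a minimal custom factory might look like the sketch below; the class name NamedThreadFactory and the naming scheme are assumptions made for this example.

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// A minimal custom factory that names threads after the business they serve.
public class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);                  // same as the default factory
        t.setPriority(Thread.NORM_PRIORITY); // same as the default factory
        return t;
    }
}

An instance such as new NamedThreadFactory("order-pool") can then be passed to the ThreadPoolExecutor constructor in place of Executors.defaultThreadFactory().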

handler

Handler refers to the rejection policy for tasks; it kicks in when both the queue is full and the maximum number of threads has been reached, and it decides how the task is handled. More on that later.

Blocking queues commonly used by thread pools

ArrayBlockingQueue

Underneath it is an array, and when you use it you have to specify the size.

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

LinkedBlockingQueue

Underneath it is a linked list. If we do not specify a size, the default capacity is Integer.MAX_VALUE.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

DelayedWorkQueue

DelayedWorkQueue is characterized by the fact that the internal elements are not sorted by the time they were put in, but by the amount of time they were delayed, using a “heap” data structure.

SynchronousQueue

This queue is a bit special and has no capacity. That is, we cannot store tasks in this queue.

Using this queue means: the task I submit must be executed immediately rather than being held. If the core threads are all busy, the task is offered to the queue; the queue simply refuses to hold it, so a new thread has to be created to execute it (or, if the maximum has been reached, the task is rejected).
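
A small sketch of that behavior follows; the pool sizes and class name are illustrative only. With a SynchronousQueue, a submitted task either gets a thread immediately or, once the maximum number of threads is reached, is rejected.

import java.util.concurrent.*;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        // 1 core thread, maximum 2 threads, and a queue with no capacity
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // Tasks 1 and 2 each get a thread immediately; task 3 cannot be queued
        // and no more threads may be created, so it is rejected.
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("Task " + id + " done");
                });
            } catch (RejectedExecutionException e) {
                System.out.println("Task " + id + " rejected");
            }
        }
        pool.shutdown();
    }
}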

The four thread pool rejection policies

The thread pool will reject a newly submitted task in two cases.

  • The first case is after we call shutdown() to close the thread pool: even though there may still be unfinished tasks in the pool, newly submitted tasks are rejected.
  • The second case is when the thread pool has no capacity to continue processing newly submitted tasks.

AbortPolicy

When a task is rejected, this policy directly throws a RejectedExecutionException (a RuntimeException), so you can tell that the task was rejected and decide, according to your business logic, whether to retry the submission or give up.

DiscardPolicy

This rejection policy is just as its name describes. When a new task is submitted, it will be directly discarded without any notification. Relatively speaking, there is a certain risk, because we do not know that the task will be discarded at the time of submission, which may cause data loss.

DiscardOldestPolicy

If the thread pool is not shut down and cannot execute the task, this policy discards the head node of the task queue, which is usually the task that has been waiting the longest. Unlike DiscardPolicy, what it discards is not the newly submitted task but the oldest one in the queue, which frees up room for the new submission. Even so, it still carries a certain risk of data loss.

CallerRunsPolicy

When a new task is submitted, if the thread pool is not closed and cannot execute it, the task is handed over to the thread that submitted the task. That is, whoever submits the task is responsible for executing the task.

This has two main advantages:

  • First, newly submitted tasks are not discarded, so there is no loss of business.
  • Second, because whoever submits a task is responsible for executing it, the submitting thread is occupied while the task runs; it cannot submit new tasks during that time, which slows down the submission rate and acts as a form of negative feedback. Meanwhile the threads in the pool can use this period to work through the backlog and free up capacity, which effectively gives the thread pool a buffer period.

Thread pools provided by the JDK

FixedThreadPool

It is created with the Executors utility class; the source code is as follows:

Executors.newFixedThreadPool(5);

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

We can see that the number of core threads is the same as the maximum number of threads.

CachedThreadPool

It is created with the Executors utility class; the source code is as follows:

Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

As you can see, its core thread count is 0 and its maximum thread count is Integer.MAX_VALUE, which effectively means it can create as many threads as it wants. This thread pool is not recommended.

ScheduledThreadPool

It supports delayed or periodic execution of tasks, for example running a task every 10 seconds. There are three main ways to achieve this:

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
// Task is a Runnable defined elsewhere
// (1)
service.schedule(new Task(), 10, TimeUnit.SECONDS);
// (2)
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
// (3)
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
  • The first method, schedule: with the parameter set to 10 seconds as above, the task is executed once after a 10-second delay.
  • The second method, scheduleAtFixedRate, executes the task at a fixed rate. The second argument, initialDelay, is the delay before the first execution; the third argument, period, is the interval at which the task runs after that first delay.
  • The third method, scheduleWithFixedDelay, also executes the task periodically; the difference lies in how the period is measured. scheduleAtFixedRate starts timing when a task starts, so the next run begins as soon as the period elapses, regardless of how long the task takes; scheduleWithFixedDelay starts timing the next cycle when the task ends. For example, with a 10-second period and a task that takes 3 seconds, scheduleAtFixedRate starts runs at 10 s, 20 s, 30 s, and so on, while scheduleWithFixedDelay starts them at 10 s, 23 s, 36 s, and so on.

SingleThreadExecutor

It is created with the Executors utility class; the source code is as follows:

Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

As you can see, it has a core thread count of 1 and a maximum thread count of 1. Suitable for scenarios where all tasks need to be executed in the order in which they are submitted.

ForkJoinPool

It differs from other thread pools in two ways:

(1) It is very suitable for performing tasks that can generate subtasks.

(2) Each thread in a ForkJoinPool has its own separate task queue.

ForkJoinPool is ideal for recursive scenarios such as tree traversal, optimal path searching, and so on.
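
As a sketch of the recursive style that ForkJoinPool is designed for, here is a classic RecursiveTask example; computing Fibonacci numbers this way is only an illustration, and real code would split larger chunks of work per subtask.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Computes the nth Fibonacci number by splitting the work into subtasks.
public class Fibonacci extends RecursiveTask<Integer> {

    private final int n;

    public Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();                       // run the first subtask asynchronously
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join(); // compute the second directly, then join the first
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.invoke(new Fibonacci(10))); // prints 55
    }
}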

Should we use the thread pools provided by the JDK?

The short answer: generally no; in practice you usually define your own custom thread pool.

Let’s take a look at some of the drawbacks of the thread pools provided by the JDK.

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

The important point is that the queue we use is LinkedBlockingQueue with unlimited capacity. If we process tasks slowly, as the number of requests increases, more and more tasks will accumulate in the queue. Eventually, a large number of tasks will occupy a large amount of memory and result in OOM, which will affect the whole program. There could be serious consequences.

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

The task queue is still an unbounded LinkedBlockingQueue, so it causes the same problem, which can take up a lot of memory and result in OOM when tasks pile up.

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

The maximum number of threads is set to Integer.MAX_VALUE. When the number of tasks is extremely high, this can lead to the creation of so many threads that the operating system cannot create new threads or the process runs out of memory.

Number of CPU cores versus number of threads?

The main reason we adjust the number of threads in the thread pool is to maximize application performance by using resources such as CPU and memory efficiently. In practical work, we need to choose corresponding strategies according to different task types.

CPU intensive task

For example encryption, decryption, compression, computation, and other tasks that require a lot of CPU resources.

The optimal number of threads for such a task is 1-2 times the number of CPU cores.

Time-consuming I/O tasks

For example database access, file reading and writing, and network communication. Such tasks consume little CPU, but the I/O operations are time-consuming and take up most of each task's time.

Brian Goetz, author of Java Concurrency in Practice, recommends the following calculation:

Number of threads = number of CPU cores * (1 + average wait time / average working time)
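
For example (the numbers are purely illustrative): on an 8-core machine, if each task spends on average 100 ms waiting on I/O for every 5 ms of CPU work, the formula suggests roughly 8 * (1 + 100 / 5) = 168 threads.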

Conclusion

  • The higher the percentage of average working time of threads, the fewer threads are needed;
  • The higher the average waiting time ratio of threads, the more threads are needed;
  • For different programs, the corresponding actual test can get the most appropriate choice.

How to set the parameters of the custom thread pool?

Core threads

For the number of core threads: the higher the proportion of time threads spend working, the fewer threads are needed; the higher the proportion of time they spend waiting, the more threads are needed.

Maximum number of threads

If the type of task we execute is not fixed, for example CPU-intensive at one moment and IO-intensive at another, or a mix of both at the same time, we can set the maximum number of threads to several times the number of core threads to absorb unexpected bursts of tasks.

It is better to have different thread pools for different types of tasks, so that tasks are separated by type rather than jumbled together.

Blocking queue

ArrayBlockingQueue is a common blocking queue, which is also commonly used in thread pools. It is an array that requires a capacity to be passed in to create an object and cannot be expanded.

Thread factory

We can use the default defaultThreadFactory, or we can pass in custom thread factories with additional capabilities. Since we may have multiple thread pools and it is necessary to distinguish thread pools by different names, we can pass in thread factories that can be named based on business information. This allows you to quickly locate the problem code by distinguishing between different services based on thread names.

This can be implemented with Guava's com.google.common.util.concurrent.ThreadFactoryBuilder:

ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory rpcFactory = builder.setNameFormat("rpc-pool-%d").build();

The above code generates thread names in a fixed format: “rpc-pool-1”, “rpc-pool-2”, and so on.

In practice, thread pool parameters are hard to set precisely because we cannot estimate the workload well beforehand.

Rejection policies

Besides the four built-in policies, you can implement the RejectedExecutionHandler interface to customize the rejection behavior to your business requirements, for example logging the rejection, temporarily storing the task, or re-executing it later.
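
A minimal sketch of such a custom policy is shown below; the class name and the fallback behavior are assumptions for this example, not a standard implementation. It logs the rejection and then runs the task in the submitting thread.

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Custom rejection policy: log the rejection, then fall back to running
// the task in the caller's thread if the pool is still open.
public class LogAndCallerRunsPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected, pool state: " + executor);
        if (!executor.isShutdown()) {
            r.run(); // execute in the submitting thread as a fallback
        }
    }
}

An instance of this class can be passed as the last constructor argument of ThreadPoolExecutor instead of new ThreadPoolExecutor.AbortPolicy().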

Closing the thread pool

shutdown()

It can safely shut down a thread pool.

The thread pool is not shut down immediately after shutdown() is called, because there may still be tasks being executed in the pool and a large number of tasks waiting in the queue. After shutdown() is called, the pool finishes the tasks in progress and the tasks waiting in the queue, and only then shuts down completely.

If a new task is submitted after the shutdown() method is called, the thread pool will reject subsequent new tasks based on the rejection policy.

isShutdown()

It can return true or false to determine whether the thread pool has started its shutdown, that is, whether the shutdown or shutdownNow methods have been executed.

Note that if the isShutdown() method returns true, it does not mean that the thread pool has been shut down. It simply means that the thread pool has started the process of shutting down. That is, there may still be threads executing tasks in the thread pool, and there may still be tasks waiting to be executed in the queue.

isTerminated()

This method detects whether the thread pool is truly “terminated,” which means not only that the pool is closed, but that all tasks in the pool have been completed.

awaitTermination()

This method is mainly used to determine the thread pool state.

After awaitTermination is called, the calling thread waits for up to the specified amount of time. If during that time the thread pool is shut down and all its internal tasks are completed, meaning the pool is truly "terminated", the method returns true; otherwise it returns false when the timeout elapses.
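
Putting shutdown(), awaitTermination() and shutdownNow() together, a common shutdown pattern looks like the sketch below, assuming pool is the ExecutorService to close and a 60-second grace period:

pool.shutdown();                                  // stop accepting new tasks
try {
    // wait up to 60 seconds for the remaining tasks to finish
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow();                       // interrupt the tasks that are still running
    }
} catch (InterruptedException e) {
    pool.shutdownNow();
    Thread.currentThread().interrupt();
}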

shutdownNow()

This method closes the thread pool immediately.

After shutdownNow is executed, an interrupt signal is first sent to every thread in the pool to try to interrupt the running tasks, and then all tasks still waiting in the task queue are moved into a List and returned. We can take remedial action based on the returned task list, such as logging the tasks and retrying them later.

The source code is as follows:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // Send an interrupt signal
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    // Return the tasks that were still waiting in the queue
    return tasks;
}

Note that even if we call shutdownNow, tasks may not stop if the interrupted threads do not respond to the interrupt signal, because Java's design discourages forcibly stopping a thread.

Further reading

Implementation principle of Java thread pool and its practice in Meituan business

Remember a thought about the use of thread pools triggered by failures

An online thread pool task problem processing process