1. What is a thread pool

A thread pool is a tool that manages threads based on the idea of pooling. It often appears in multi-threaded servers such as MySQL.

Creating and destroying threads carries resource overhead, and doing so frequently reduces overall performance. So is there a way to avoid frequent thread creation and destruction? This leads to the concept of the thread pool, which provides a number of benefits:

  • Reduced resource consumption: Pooling reuses threads that have already been created, which cuts the cost of repeatedly creating and destroying threads.
  • Improved response time: When a task arrives, it can usually run right away on an existing thread without waiting for a thread to be created.
  • Improved manageability: Threads are a scarce resource. If they are created without limit, they not only consume system resources but can also cause scheduling imbalance because threads are distributed unreasonably, which reduces system stability. A thread pool allows uniform allocation, tuning, and monitoring.
  • Extensibility: Thread pools are extensible, so developers can add functionality to them. For example, the scheduled thread pool ScheduledThreadPoolExecutor allows a task to run after a delay or periodically.
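The reuse described in the first benefit can be observed directly. The following is a minimal sketch, not a benchmark: the class name ThreadReuseDemo and the method distinctThreadsUsed are made up for illustration, and the pool size and task count are arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    // Runs taskCount small tasks on a pool of poolSize threads and returns
    // how many distinct threads actually executed them.
    public static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting tasks; already submitted ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 100 tasks are executed, but never by more than 4 distinct threads
        System.out.println("distinct threads: " + distinctThreadsUsed(4, 100));
    }
}
```

However many tasks are submitted, the number of threads created stays within the pool size, which is the saving that pooling is meant to provide.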

2. Thread pool design

ThreadPoolExecutor is the core thread pool implementation class in Java. To understand it, start with its inheritance chain (a UML class diagram in the original): ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor.

The top-level interface is Executor, which embodies the idea of decoupling task submission from task execution. You do not need to worry about how to create a thread or how to schedule it to run a task. You only need to provide a Runnable that contains the task's logic and submit it to an Executor, and the Executor framework takes care of thread allocation and task execution. The ExecutorService interface adds two kinds of capability: (1) it extends task execution with methods that produce a Future for one asynchronous task or for a group of them; (2) it provides methods to manage the thread pool itself, such as shutting it down. AbstractExecutorService is an abstract class that strings together the steps of submitting a task, so that a concrete implementation only needs to focus on the single method that executes a task. The concrete class at the bottom, ThreadPoolExecutor, implements the most complex part: it maintains its own life cycle while managing both threads and tasks, and combines the two to execute tasks in parallel.
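The division of labor between the two interfaces can be sketched as follows. This is an illustrative example only: the class name ExecutorFrameworkDemo and the method sumOfSquares are made up, and the pool sizes are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorFrameworkDemo {

    // ExecutorService: submit a group of Callables and collect their Futures.
    public static int sumOfSquares(int n) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                tasks.add(() -> x * x);
            }
            int sum = 0;
            // invokeAll returns once every task in the group has completed
            for (Future<Integer> future : service.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown(); // life-cycle management added by ExecutorService
        }
    }

    public static void main(String[] args) {
        // Executor: the caller only hands over a Runnable
        ExecutorService service = Executors.newSingleThreadExecutor();
        Executor executor = service;
        executor.execute(() ->
                System.out.println("task ran on " + Thread.currentThread().getName()));
        service.shutdown();

        System.out.println(sumOfSquares(4)); // 1 + 4 + 9 + 16 = 30
    }
}
```

In both cases the calling code never creates or schedules a thread itself; it only describes the work.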

3. Implementation of thread pool

3.1 Four thread pools provided by Executors

1. newCachedThreadPool creates a cacheable thread pool. If the pool holds more threads than the current workload needs, idle threads are reclaimed. The number of threads is effectively unbounded (up to Integer.MAX_VALUE). If the first task has already finished when the second task is submitted, the thread that ran the first task is reused instead of a new thread being created.

2. newFixedThreadPool creates a thread pool of fixed size, which caps the number of concurrently running threads. The core pool size equals the maximum pool size. If more tasks are submitted than there are threads, the extra tasks wait in a queue until a thread becomes free.

3. newScheduledThreadPool creates a thread pool with a fixed number of core threads that supports delayed and periodic task execution.

4. newSingleThreadExecutor creates a single-threaded pool that uses exactly one worker thread, which guarantees that tasks execute sequentially in the order they were submitted (FIFO).
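Two of these factories have behavior that is easy to check in a few lines. The sketch below is illustrative: the class name ExecutorsFactoriesDemo and its two helper methods are made up, and the delay value is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ExecutorsFactoriesDemo {

    // newSingleThreadExecutor: one worker, so tasks run in submission order.
    public static List<Integer> runInOrder(int n) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    // newScheduledThreadPool: run a task once after a delay.
    public static String runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));   // [0, 1, 2, 3, 4]
        System.out.println(runDelayed(100)); // done
    }
}
```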

Thread pools created through Executors have a serious drawback. Take newCachedThreadPool as an example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newCachedThreadPool();
    }
}

If we open the source of the newCachedThreadPool() method, we see the following:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Here Integer.MAX_VALUE is the maximum number of worker threads. If too many worker threads are created, the application can run out of memory (OOM).

So we have to specify the parameters ourselves with new ThreadPoolExecutor().
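As a rough illustration of the difference, the sketch below builds a pool with explicit bounds and compares its limits with those of the cached pool. The class name BoundedPoolDemo, the method newBoundedPool, and all numeric values are assumptions chosen for the example, not recommendations. The cast relies on newCachedThreadPool() returning a ThreadPoolExecutor, as the source shown above does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    // Illustrative values only: at most 4 threads and at most 100 queued tasks.
    public static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2, 4,
                30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        ThreadPoolExecutor bounded = newBoundedPool();

        System.out.println(cached.getMaximumPoolSize() == Integer.MAX_VALUE); // true
        System.out.println(bounded.getMaximumPoolSize());                     // 4
        System.out.println(bounded.getQueue().remainingCapacity());           // 100

        cached.shutdown();
        bounded.shutdown();
    }
}
```

With both the thread count and the queue length bounded, the memory the pool can consume has an upper limit that the developer chose explicitly.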

3.2 Create a thread pool using ThreadPoolExecutor

The constructor is as follows:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

  • corePoolSize: the number of core threads, that is, the number of threads the pool keeps alive even when they are idle.
  • maximumPoolSize: the maximum number of threads allowed in the pool, reached only when there are many tasks.
  • keepAliveTime: the maximum idle time of a thread. When the pool holds more than corePoolSize threads and some of them are idle, an idle thread is terminated once it has been idle for keepAliveTime, until only corePoolSize threads remain.
  • unit: the time unit of keepAliveTime.
  • workQueue: the work queue. Once the number of worker threads reaches corePoolSize, newly submitted tasks are placed in this queue.
  • threadFactory: the thread factory, used to create new threads.
  • handler: the rejection handler, which decides what the pool does with a new task when the maximum number of worker threads has been reached and the work queue is full.
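To show where each parameter goes, here is a small sketch that passes all seven arguments, including a custom thread factory. The class name NamedThreadFactoryDemo, the helper namedFactory, the "demo-pool" prefix, and the numeric values are all hypothetical and exist only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {

    // Hypothetical helper: names threads "<prefix>-1", "<prefix>-2", ...
    public static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    // Builds a pool with every constructor parameter spelled out and returns
    // the name of the thread that ran the first task.
    public static String firstWorkerName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                     // corePoolSize
                2,                                     // maximumPoolSize
                10L, TimeUnit.SECONDS,                 // keepAliveTime and unit
                new ArrayBlockingQueue<>(4),           // workQueue
                namedFactory("demo-pool"),             // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // demo-pool-1
    }
}
```

Giving pool threads recognizable names is a common reason to supply a thread factory, since it makes thread dumps and log lines easier to attribute.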

3.3 Workflow of thread pool

The workflow is as follows:

When a task arrives, the pool first checks whether the number of worker threads has reached the number of core threads. If not, it creates a new worker thread to run the task directly. If so, it checks whether the work queue is full. If the queue is not full, the task is placed in the queue. If the queue is full and the number of worker threads is still below the maximum, a new worker thread is created to run the task. If the queue is full and the maximum number of worker threads has been reached, the task is rejected according to the rejection policy.
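These steps can be watched one by one if the tasks are prevented from finishing. The sketch below uses the same sizes as the demos later in this article (2 core threads, 5 maximum, queue of 3) and a CountDownLatch to keep every task blocked; the class name WorkflowDemo and the method observe are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkflowDemo {

    // Returns {pool size after 2 tasks, queue size after 5 tasks,
    //          pool size after 8 tasks, 1 if the 9th task was rejected else 0}.
    public static int[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no thread becomes free early.
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        int[] seen = new int[4];
        try {
            for (int i = 0; i < 2; i++) pool.execute(blocking); // tasks 1-2: core threads
            seen[0] = pool.getPoolSize();
            for (int i = 0; i < 3; i++) pool.execute(blocking); // tasks 3-5: queued
            seen[1] = pool.getQueue().size();
            for (int i = 0; i < 3; i++) pool.execute(blocking); // tasks 6-8: extra threads
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocking);                          // task 9: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown(); // let all blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [2, 3, 5, 1]
    }
}
```

Note that the extra threads are created only after the queue is full, so tasks 6 to 8 start running before the queued tasks 3 to 5 do.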

3.4 Rejection Policy

  • AbortPolicy (default): throws a RejectedExecutionException directly, which interrupts the normal flow of the submitting code.

  • CallerRunsPolicy: a throttling mechanism that neither discards the task nor throws an exception. Instead, it hands the rejected task back to the caller, which runs it itself, and this slows down the rate at which new tasks are submitted.

  • DiscardOldestPolicy: discards the task that has waited longest in the queue, then tries to submit the current task again.

  • DiscardPolicy: silently discards the task, without processing it or throwing an exception. This is a reasonable choice if losing tasks is acceptable.

The following code demonstrates the rejection policies:

1. AbortPolicy

// Requires: import java.util.concurrent.*;
public static void main(String[] args) throws Exception {
    // 2 core threads, at most 5 threads, and a bounded queue of 3 tasks
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 1L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    try {
        for (int i = 0; i < 10; i++) {
            poolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "\t Transact business");
            });
        }
    } catch (Exception e) {
        // AbortPolicy surfaces a rejection as RejectedExecutionException
        e.printStackTrace();
    } finally {
        poolExecutor.shutdown();
    }
    System.out.println(Thread.currentThread().getName() + "\t Transact business");
}

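When all worker threads are busy and the work queue is full, the thread pool rejects the task and throws an exception. Because the tasks here finish quickly, whether a rejection actually occurs can vary from run to run.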

2. CallerRunsPolicy

// Requires: import java.util.concurrent.*;
public static void main(String[] args) throws Exception {
    // 2 core threads, at most 5 threads, and a bounded queue of 3 tasks
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 1L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    try {
        for (int i = 0; i < 10; i++) {
            poolExecutor.execute(() -> {
                // A rejected task prints the caller's thread name (main)
                System.out.println(Thread.currentThread().getName() + "\t Transact business");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        poolExecutor.shutdown();
    }
    System.out.println(Thread.currentThread().getName() + "\t Transact business");
}

When all worker threads are busy and the work queue is full, the rejected task is handed back to the thread that called execute (here, the main thread), which runs it itself.
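Since the demo above depends on timing, a deterministic variant can make the hand-back visible. The following sketch keeps the pool saturated with a CountDownLatch and then checks which thread ran the rejected task. The class name CallerRunsDemo, the method name, and the deliberately tiny pool (1 thread, queue of 1) are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Returns true if the rejected task was executed by the submitting thread.
    public static boolean rejectedTaskRanOnCaller() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocking); // occupies the only worker thread
            pool.execute(blocking); // fills the only queue slot
            // The pool is saturated, so this task is rejected and
            // CallerRunsPolicy runs it synchronously on the current thread.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedTaskRanOnCaller()); // true
    }
}
```

While the caller is busy running the rejected task it cannot submit further tasks, which is how this policy slows down submission.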

3. DiscardOldestPolicy and DiscardPolicy

// Requires: import java.util.concurrent.*;
public static void main(String[] args) throws Exception {
    // 2 core threads, at most 5 threads, and a bounded queue of 3 tasks
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 1L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy());
    try {
        for (int i = 0; i < 10; i++) {
            poolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "\t Transact business");
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        poolExecutor.shutdown();
    }
    System.out.println(Thread.currentThread().getName() + "\t Transact business");
}

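In this run a total of 10 lines were printed (out of a possible 11: ten tasks plus the final line from main), indicating that one task was discarded. The exact count can vary between runs because it depends on timing. DiscardPolicy is used in the same way, by passing new ThreadPoolExecutor.DiscardPolicy() as the handler.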
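To see which task each of the two policies drops, the pool can be held saturated while numbered tasks are submitted. The sketch below is an illustration under assumed names (DiscardPoliciesDemo, executedIds) and an assumed pool shape (1 thread, queue of 2); a CountDownLatch blocks the only worker so the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardPoliciesDemo {

    // Submits tasks 1, 2, 3 to a saturated pool (1 thread, queue of 2) and
    // returns the ids of the tasks that were actually executed, in order.
    public static List<Integer> executedIds(RejectedExecutionHandler policy) {
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory(), policy);
        try {
            // Occupy the only worker until all numbered tasks are submitted.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            for (int id = 1; id <= 3; id++) {
                final int taskId = id;
                // Tasks 1 and 2 fill the queue; task 3 triggers the policy.
                pool.execute(() -> executed.add(taskId));
            }
        } finally {
            release.countDown();
            pool.shutdown(); // queued tasks still run after shutdown()
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed;
    }

    public static void main(String[] args) {
        // DiscardPolicy drops the new task (3)
        System.out.println(executedIds(new ThreadPoolExecutor.DiscardPolicy()));       // [1, 2]
        // DiscardOldestPolicy drops the oldest queued task (1)
        System.out.println(executedIds(new ThreadPoolExecutor.DiscardOldestPolicy())); // [2, 3]
    }
}
```

Neither policy throws or reports anything, so a dropped task leaves no trace unless the application records it itself.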