“This is the 10th day of my participation in the First Challenge 2022. For details: First Challenge 2022”

Why use thread pools

In Java programs, we usually create a thread to execute after receiving a task, but if there are many tasks, we need to create and destroy threads frequently, which consumes more resources, and the response speed is relatively slow.

To avoid wasting resources by frequently creating and destroying threads, and to improve response times, we can optimize by using thread pools.

Thread pool advantage

Thread pool is to put threads into a “pool”. When a task needs to be executed, a thread from the pool is selected to execute it. Using thread pool has the following advantages:

  • Reduced resource consumption: Reduces the overhead of frequent thread creation and destruction by fetching threads directly from the pool.
  • Improved response time: When a task arrives, it is fetched directly from the pool without waiting for a thread to be created.
  • Improve thread manageability: threads can be managed to avoid creating multiple threads. Parameters can be tuned and monitored according to system characteristics.

ThreadPoolExecutor class

Java. Uitl. Concurrent. ThreadPoolExecutor class is the core of a thread pool class, there are four constructor:

public ThreadPoolExecutor(    int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}Copy the code

Detailed explanation of related parameters of the construction method

  • CorePoolSize: number of core threads. After the thread pool is created, the number of threads in the pool is 0.
  • MaximumPoolSize: Maximum number of threads: the maximum number of threads that can be held in the thread pool. When the number of threads reaches corePoolSize and the workQueue is full, additional threads are created and added to the thread pool if new tasks come in before maximumPoolSize is reached.
  • KeepAliveTime: indicates the keepAliveTime when a thread is idle
  • unit: Enumeration type, the unit of keepAliveTime, with the following enumeration values
    • TimeUnit.DAYS; / / day
    • TimeUnit.HOURS; / / hour
    • TimeUnit.MINUTES; / / minute
    • TimeUnit.SECONDS; / / SEC.
    • TimeUnit.MILLISECONDS; / / ms
    • TimeUnit.MICROSECONDS; / / subtle
    • TimeUnit.NANOSECONDS; / / nanoseconds
  • workQueue: blocking queue, used to temporarily store tasks waiting to be executed. Common options are:
    • ArrayBlockingQueue: array-based first-in, first-out queue that must be created with a specified size;
    • LinkedBlockingQueue: a first in, first out queue based on a list, default to integer.max_value if the queue size is not specified when created;
    • SynchronousQueue: This queue is special in that it does not hold submitted tasks and instead creates a new thread to execute the new tasks.
    • PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting
    • DelayQueue: An unbounded blocking queue implemented using priority queues.
  • ThreadFactory: A factory for creating threads.
  • handler: Reject policy. When the task queue of the thread pool reaches maximumPoolSize, a new task will be rejected. There are four types of reject policy:
    • ThreadPoolExecutor. AbortPolicy: discard task throws an exception;
    • ThreadPoolExecutor. DiscardPolicy: discard task but not throw exceptions;
    • Every time ThreadPoolExecutor. DiscardOldPolicy: new task, discarding the waiting queue in front of the tasks.

There are several very important methods in the ThreadPoolExecutor class

  • Execute () : submits the task to a thread pool for execution.

  • Submit () : As with execute(), submit a task to the thread pool, which executes it. As with any ExecutorService, this method declaration is implemented in AbstractExecutorService, and is not overridden in ThreadPoolExecutor. Instead, it calls the submit() method of its parent class, unlike execute(). It can return the result of task execution, using the Future to retrieve the result of task execution.

  • Shutdown () : The thread pool is not terminated immediately, but is not terminated until all tasks in the task cache queue have completed, but no new tasks are accepted.

  • ShutdownNow () : Immediately terminates the thread pool and attempts to interrupt tasks in progress, and empties the task cache queue to return tasks that have not yet been executed.

Thread pool state

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
Copy the code

CTL contains two parts of information: the running status and the number of threads. The higher three bits report runState and the lower 29 bits save workCount.

There are five thread pool states:

  • RUNNING: Can accept new tasks or process tasks in a blocking queue.
  • SHUTDOWN: Call SHUTDOWN () to enter this state, close the state, no longer accept new tasks, can continue to process the existing tasks in the blocking queue;
  • STOP: Call shutdownNow() to enter this state, no new task is accepted, no task is processed, and the thread being processed is interrupted;
  • TIDYING: All tasks have terminated, workCount is 0;
  • TERMINATED: State of death, entered after the TERMINATED () method is executed.

The instance

Create a thread pool:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5.10.20, 
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
Copy the code

The number of core threads is 5, the maximum number of threads is 10, the idle lifetime is 20 seconds, and the blocking queue is LinkedBlockingQueue with a size of 10.

Create a task class MyTask that implements the Runnable interface and override the run() method.

class MyTask implements Runnable{
    private final int taskNum;
    public MyTask(int num){
        this.taskNum=num;
    }

    @Override
    public void run(a) {
        System.out.println("Thread"+Thread.currentThread().getName()+"In action :"+taskNum);
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task"+taskNum+"Executed!"); }}Copy the code

Cyclic submit task

for (int i=0; i<15; i++){ MyTask task =new MyTask(i);
    executor.execute(task);
    System.out.println("Number of threads in the thread pool:"+executor.getPoolSize()
            +", number of tasks waiting in the queue:" +executor.getQueue().size()
    +", number of completed tasks:"+executor.getCompletedTaskCount());
}
Copy the code

Complete code:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test01 {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5.5.20,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        for (int i=0; i<15; i++){ MyTask task =new MyTask(i);
            executor.execute(task);
            System.out.println("Number of threads in the thread pool:"+executor.getPoolSize()
                    +", number of tasks waiting in the queue:" +executor.getQueue().size()
            +", number of completed tasks:"+executor.getCompletedTaskCount()); } executor.shutdown(); }}class MyTask implements Runnable{
    private final int taskNum;
    public MyTask(int num){
        this.taskNum=num;
    }

    @Override
    public void run(a) {
        System.out.println("Thread"+Thread.currentThread().getName()+"In action :"+taskNum);
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task"+taskNum+"Executed!"); }}Copy the code

Running results:

The thread pool- 1-thread- 1Performing tasks:0Number of threads in the thread pool:1, the number of tasks waiting in the queue:0, the number of completed tasks:0Number of threads in the thread pool:2, the number of tasks waiting in the queue:0, the number of completed tasks:0Number of threads in the thread pool:3, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread2 -Performing tasks:1The thread pool- 1-thread- 3Performing tasks:2Number of threads in the thread pool:4, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread4 -Performing tasks:3Number of threads in the thread pool:5, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread- 5Performing tasks:4Number of threads in the thread pool:5, the number of tasks waiting in the queue:1, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:2, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:3, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:4, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:5, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:6, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:7, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:8, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:9, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:10, the number of completed tasks:0task2Execution complete! task0Execution complete! task3Execution complete! task1Execution complete! task4Execution complete! The thread pool- 1-thread2 -Performing tasks:8The thread pool- 1-thread4 -Performing tasks:7The thread pool- 1-thread- 1Performing tasks:6The thread pool- 1-thread- 3Performing tasks:5The thread pool- 1-thread- 5Performing tasks:9task8Execution complete! task9Execution complete! The thread pool- 1-thread2 -Performing tasks:10task7Execution complete! task5Execution complete! task6Execution complete! The thread pool- 1-thread- 3Performing tasks:13The thread pool- 1-thread4 -Performing tasks:12The thread pool- 1-thread- 5Performing tasks:11The thread pool- 1-thread- 1Performing tasks:14task10Execution complete! task13Execution complete! task11Execution complete! task14Execution complete! task12Execution complete!Copy the code

Analysis of execution results:

The number of core threads in the thread pool is 5, so tasks 0,1,2,3, and 4 are executed by the threads created by the thread pool respectively. Tasks 5-14 enter the blocking queue, and the number of waiting tasks in the blocking queue is 10. When tasks 0,1,2,3, and 4 are completed, Tasks 5,6,7,8, and 9 are executed by threads pool-1-thread-1 – pool-1-thread-5 in sequence. The specific sequence is not determined. Tasks 10-14 in the blocking queue are executed after tasks 5-9 are executed.

If you change the blocking queue size to 5

ThreadPoolExecutor executor = new ThreadPoolExecutor(5.10.20, 
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5));
Copy the code

Execution Result:

The thread pool- 1-thread- 1Performing tasks:0Number of threads in the thread pool:1, the number of tasks waiting in the queue:0, the number of completed tasks:0Number of threads in the thread pool:2, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread2 -Performing tasks:1Number of threads in the thread pool:3, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread- 3Performing tasks:2Number of threads in the thread pool:4, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread4 -Performing tasks:3Number of threads in the thread pool:5, the number of tasks waiting in the queue:0, the number of completed tasks:0The thread pool- 1-thread- 5Performing tasks:4Number of threads in the thread pool:5, the number of tasks waiting in the queue:1, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:2, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:3, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:4, the number of completed tasks:0Number of threads in the thread pool:5, the number of tasks waiting in the queue:5, the number of completed tasks:0Number of threads in the thread pool:6, the number of tasks waiting in the queue:5, the number of completed tasks:0Number of threads in the thread pool:7, the number of tasks waiting in the queue:5, the number of completed tasks:0The thread pool- 1-thread- 6Performing tasks:10Number of threads in the thread pool:8, the number of tasks waiting in the queue:5, the number of completed tasks:0The thread pool- 1-thread7 -Performing tasks:11Number of threads in the thread pool:9, the number of tasks waiting in the queue:5, the number of completed tasks:0The thread pool- 1-thread- 8 -Performing tasks:12The thread pool- 1-thread9 -Performing tasks:13Number of threads in the thread pool:10, the number of tasks waiting in the queue:5, the number of completed tasks:0The thread pool- 1-thread- 10Performing tasks:14
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task testThread.MyTask@12a3a380 rejected from java.util.concurrent.ThreadPoolExecutor@29453f44[Running.pool size = 10.active threads = 10.queued tasks = 5.completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at testThread.Test01.main(Test01.java:16) tasks13Execution complete! task3Execution complete! task2Execution complete! task1Execution complete! task12Execution complete! task11Execution complete! task10Execution complete! task14Execution complete! task0Execution complete! task4Execution complete! The thread pool- 1-thread- 8 -Performing tasks:9The thread pool- 1-thread2 -Performing tasks:8The thread pool- 1-thread4 -Performing tasks:7The thread pool- 1-thread- 3Performing tasks:6The thread pool- 1-thread9 -Performing tasks:5task8Execution complete! task9Execution complete! task7Execution complete! task6Execution complete! task5Execution complete!Copy the code

Analysis of execution results:

We can see that when the number of threads in the thread pool is 10 and the number of tasks waiting in the queue is 5, the new task comes and throws an exception.

In the above execution result, when the wait queue is full, the number of core threads is also full, and the number of threads has not reached the maximum, the new task will arrive and be executed before the wait queue.

The default rejection policy is AbortPolicy, which throws an exception and rejects the new task.

The time to reject a task is when the number of threads is equal to maximumPoolSize and the blocking queue is full.

There are four common thread pools

ExecutorService is a class provided by Java for managing thread pools. This class has two functions: controlling the number of threads and reusing threads.

  • Executors.newCacheThreadPool()

    Cache the thread pool. When a new task is created, check whether there are threads available in the pool. If there are no threads available, create a new thread to execute the task in the pool.

public static ExecutorService newCachedThreadPool(a) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
Copy the code
The maximum number of threads is integer. MAX_VALUE, the thread idle lifetime is 60 seconds, and the SynchronousQueue is used. New tasks are not queued.Copy the code
  • Executors.newFixedThreadPool(int nThreads)

    A thread pool with a fixed number of threads is constructed with a fixed number of threads nThreads. When there are no idle threads, tasks are queued to wait.

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
Copy the code
  • Executors.newScheduledThreadPool(int corePoolSize)

    Create a fixed – length thread pool to support scheduled and periodic task execution.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
  }
Copy the code

Delay execution schedule() :

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(
          () -> System.out.println("Output after delay of 2 seconds"),
          2,
          TimeUnit.SECONDS
          );
        scheduledExecutorService.shutdown();
Copy the code

Execute scheduleAtFixedRate() periodically:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(
          ()-> System.out.println("Periodic cycle"),
          0.2,TimeUnit.SECONDS);
Copy the code

Another method is to execute it regularly: scheduleWithFixedDelay();

It is also deferred, but there are differences:

ScheduleAtFixedRate () executes a task at a fixed time. If the execution time of a previous task is longer than the delay time, the next task is executed immediately.

ScheduleWithFixedDelay () indicates that the next task is executed after the last task is executed.

Example:

scheduleAtFixedRate():

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(()-> {
            try {
                System.out.print("System time:"+ LocalDateTime.now().getMinute()+":"+LocalDateTime.now().getSecond());
                System.out.println("Postpone the next mission.");
                Thread.sleep(6000);
                System.out.println("Sleep 6 seconds, delay 5 seconds.");
            } catch(InterruptedException e) { e.printStackTrace(); }},0.5,TimeUnit.SECONDS);
Copy the code

Execution Result:

You can see that the next task is executed immediately after the last one

ScheduleWithFixedDelay () :

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleWithFixedDelay(()-> {
    try {
        System.out.print("System time:"+ LocalDateTime.now().getMinute()+":"+LocalDateTime.now().getSecond());
        System.out.println("Postpone the next mission.");
        Thread.sleep(6000);
        System.out.println("Sleep 6 seconds, delay 5 seconds.");
    } catch(InterruptedException e) { e.printStackTrace(); }},0.5,TimeUnit.SECONDS);
Copy the code

Execution Result:

You can see that the next task is executed 11 seconds later (5 seconds after the last task is completed).

  • Executors.newSingleThreadExecutor()

Single dog thread pool, there is only one thread, the idle thread is destroyed, all tasks in order to execute.

public static ExecutorService newSingleThreadExecutor(a) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
Copy the code

There is an OOM risk when using the newCacheThreadPool and newFixedThreadPool thread pools. If each thread created is always running, new tasks will be created continuously. You are advised to set thread pool parameters as required.

Dynamic parameter setting

JDK native thread pool ThreadPoolExecutor provides the following public setter methods to dynamically set thread pool parameters.


The resources

  • Implementation principle of Java thread pool and its practice in Meituan business

  • Four common thread pools are introduced

  • ScheduleAtFixedRate is different from scheduleWithFixedDelay

  • Java Thread Pool application and Principle Analysis (JDK1.8)