The thread pool

Pooling technology

A running program, in essence, occupies system resources, and creating and destroying those resources (threads, connections, memory blocks) is expensive. To optimize how resources are used we turn to pooling technology: thread pools, JDBC connection pools, memory pools, object pools, and so on. The idea is to prepare some resources in advance; whoever needs one borrows it from the pool and returns it when finished, which improves efficiency.

Why use thread pools?

Java’s thread pool is the most widely used concurrency framework, and can be used by almost any program that needs to perform tasks asynchronously or concurrently. The benefits of using thread pools properly:

  • Reduce resource consumption. Reuse already created threads to reduce the cost of thread creation and destruction. For example, a worker thread (Worker) loops endlessly, taking tasks from a blocking queue and executing them.
  • Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.

  • Improve thread manageability. Threads are a scarce resource; Java's thread pool provides uniform allocation, tuning, and monitoring of thread resources.

1. Three methods

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Tool class Executors: three factory methods
public class Demo01 {
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newSingleThreadExecutor();   // Single-threaded pool
        ExecutorService threadPool2 = Executors.newFixedThreadPool(5);      // Fixed-size pool
        ExecutorService threadPool3 = Executors.newCachedThreadPool();      // Elastic pool that grows and shrinks on demand

        // The thread pool must be closed when you are done with it
        try {
            for (int i = 1; i <= 100; i++) {
                // Submit tasks to the pool instead of creating threads yourself
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

Source code analysis

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

The Alibaba Java Development Manual explicitly requires that thread pools not be created through Executors: FixedThreadPool and SingleThreadExecutor use a request queue whose capacity defaults to Integer.MAX_VALUE, and CachedThreadPool allows up to Integer.MAX_VALUE threads, so requests can pile up and exhaust memory. In practice, therefore, we generally create thread pools directly with the underlying ThreadPoolExecutor.

2. Seven parameters

public ThreadPoolExecutor(int corePoolSize,                       // Core pool size
                          int maximumPoolSize,                    // Maximum pool size
                          long keepAliveTime,                     // How long idle non-core threads stay alive
                          TimeUnit unit,                          // Time unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue,      // Queue holding waiting tasks
                          ThreadFactory threadFactory,            // Factory used to create new threads
                          RejectedExecutionHandler handler) {     // Rejection (saturation) policy
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

1. corePoolSize (the core/base size of the thread pool):

  • When a task is submitted, the pool creates a new thread to run it as long as the current thread count is below corePoolSize. Note: a new thread is created even if there are idle core threads that could run the task.

  • If the number of threads in the pool is already greater than or equal to corePoolSize, no new core thread is created.

  • If the pool's prestartAllCoreThreads() method is called, the pool creates and starts all core threads ahead of time.

2. maximumPoolSize (the maximum size of the thread pool):

  • Maximum number of threads allowed to be created in a thread pool.
  • If the blocking queue is full and the number of threads is less than maximumPoolSize, a new thread can be created to execute the task.
  • This parameter has no effect if unbounded blocking queues are used.

3. workQueue (the work queue):

  • A blocking queue used to hold tasks waiting to be executed.

4. keepAliveTime (how long idle worker threads are kept alive):

  • The amount of time that a worker thread in a thread pool remains alive after it is idle. If there are many tasks and the task execution time is short, you can increase the keepAliveTime to improve thread utilization.

5. unit (the time unit of keepAliveTime):

  • The unit can be DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, or NANOSECONDS (constants of TimeUnit).

6. handler (saturation policy, also known as the rejection policy): when both the queue and the thread pool are full, i.e., the pool is saturated, a policy must be adopted to handle newly submitted tasks.

  • AbortPolicy: cannot handle the new task and directly throws a RejectedExecutionException; this is the default policy.
  • CallerRunsPolicy: runs the task on the caller's thread, which effectively slows down or blocks the submitting (e.g., main) thread.
  • DiscardOldestPolicy: discards the oldest task at the head of the blocking queue and then tries to execute the current task.
  • DiscardPolicy: silently discards the task.

7. threadFactory: the factory used to create new threads.
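
Putting the seven parameters together, a hand-built pool might look like the sketch below; the pool sizes, queue capacity, and thread-name prefix are illustrative values, not recommendations.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                5,                                    // maximumPoolSize
                60L, TimeUnit.SECONDS,                // keepAliveTime + unit for idle non-core threads
                new LinkedBlockingQueue<>(3),         // workQueue: bounded, capacity 3
                new ThreadFactory() {                 // threadFactory: name threads for easier debugging
                    private final AtomicInteger n = new AtomicInteger(1);
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "demo-pool-" + n.getAndIncrement());
                    }
                },
                new ThreadPoolExecutor.AbortPolicy()  // handler: throw when saturated
        );
        try {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " ok"));
        } finally {
            pool.shutdown();
        }
    }
}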

3. The five states of the thread pool

The five states of the thread pool are RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED.

(Diagram: thread-pool state transitions.)

1. RUNNING

  • Status description: When the thread pool is in the RUNNING state, it can receive new tasks and process added tasks.

  • State switching: the initial state of the thread pool is RUNNING. In other words, once created, the thread pool is in the RUNNING state and the number of tasks in the pool is zero.

2.SHUTDOWN

  • Status description: When the thread pool is in the SHUTDOWN state, it does not receive new tasks but can process added tasks.

  • State switching: when the thread pool's shutdown() method is called, the pool goes from RUNNING -> SHUTDOWN.

3.STOP

  • Status description: When the thread pool is in the STOP state, it does not receive new tasks, does not process added tasks, and interrupts ongoing tasks.

  • State switching: when the thread pool's shutdownNow() method is called, the pool goes from RUNNING (or SHUTDOWN) -> STOP.

4.TIDYING

  • Status description: when all tasks have terminated and the worker count recorded in ctl is 0, the thread pool changes to the TIDYING state. When the pool enters TIDYING, the hook method terminated() is executed. terminated() is empty in the ThreadPoolExecutor class; if you want something to happen when the pool becomes TIDYING, you can do it by overriding terminated().
  • State switching: when the pool is in the SHUTDOWN state, the blocking queue is empty, and no tasks are running in the pool, the transition is SHUTDOWN -> TIDYING. When the pool is in the STOP state and no tasks are running in the pool, the transition is STOP -> TIDYING.

5.TERMINATED

  • Status description: the thread pool has completely terminated and is in the TERMINATED state.
  • State switching: when the hook method terminated() finishes executing while the pool is in the TIDYING state, the transition is TIDYING -> TERMINATED.
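
These states are internal, but the public API lets you observe the two transitions you normally care about. A minimal sketch (the pool size and timeout are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StateDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);   // RUNNING once created

        pool.execute(() -> System.out.println("working"));

        pool.shutdown();                                          // RUNNING -> SHUTDOWN: no new tasks accepted
        System.out.println("isShutdown = " + pool.isShutdown());  // true immediately

        // Wait for the submitted tasks to finish; afterwards the pool ends up TERMINATED
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("isTerminated = " + pool.isTerminated());
    }
}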

4. Thread pool workflow

When a new task is added to the thread pool, the thread pool process is as follows:

ThreadPoolExecutor

The core implementation class of the thread pool is ThreadPoolExecutor, which executes the submitted tasks. Therefore, when a task is submitted to the pool, the concrete processing is done by ThreadPoolExecutor's execute() method.

  1. After creating a thread pool, wait for submitted task requests.
  2. When the execute() method is called to submit a task, the thread pool makes the following judgments:
     2.1 If the number of running threads is smaller than corePoolSize, create a thread to run the task immediately;
     2.2 If the number of running threads is greater than or equal to corePoolSize, put the task into the queue;
     2.3 If the queue is full and the number of running threads is less than maximumPoolSize, create a non-core thread to run the task immediately;
     2.4 If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the thread pool invokes the saturation (rejection) policy.
  3. When a thread completes a task, it takes the next task from the queue and executes it.
  4. When a thread has nothing to do for more than a certain amount of time (keepAliveTime), the thread pool determines that if the number of threads currently running is greater than corePoolSize, the thread is stopped. So the thread pool eventually shrinks to corePoolSize after all its tasks are complete.
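
A minimal sketch of this flow, with illustrative sizes (2 core threads, a maximum of 4, a queue of capacity 2), so that submitting seven slow tasks fills the core threads, then the queue, then the non-core threads, and the last task triggers the rejection policy:

import java.util.concurrent.*;

public class WorkflowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),                 // bounded queue: capacity 2
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());       // throw when saturated

        // 2 (core) + 2 (queued) + 2 (non-core) tasks are accepted; the 7th is rejected
        for (int i = 1; i <= 7; i++) {
            final int task = i;
            try {
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " runs task " + task);
                    try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("task " + task + " was rejected");
            }
        }
        pool.shutdown();
    }
}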

Consider the analogy of a thread pool in banking

Specific business process:

  1. The number of core threads (corePoolSize) corresponds to the windows that are normally on duty; when there are few requests, only the core windows (threads) are open.
  2. When the core threads (windows) are all occupied, a new request goes into our blocking queue (the waiting area).
  3. When there are too many requests and the blocking queue is full (the waiting area is packed with grumbling customers), the manager calls in the staff who are on break to work overtime; the extra windows open and the thread count grows up to maximumPoolSize.
  4. If requests keep increasing beyond the maximum number of threads plus the blocking queue capacity (all windows busy and the waiting area full), the manager stands at the door to keep new guests from coming in: the rejection policy kicks in.
  5. When the guests are done and start leaving, the overtime windows free up, but those workers do not go home right away; they play a round of Honor of Kings while waiting to see whether more customers arrive, and only if nobody shows up by the time the game ends do they go home to sleep. This waiting period is the keepAliveTime of an idle thread: only threads beyond the core count are destroyed after being idle for keepAliveTime, until just corePoolSize threads remain.

5. Blocking queues

(1) features:

  • When the blocking queue is empty, fetching elements from the queue will be blocked.
  • When the blocking queue is full, adding elements to the queue will be blocked.
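
A tiny sketch of both behaviors using an ArrayBlockingQueue of capacity 1 (the capacity, values, and one-second delay are just illustrative): put() blocks when the queue is full, and take() blocks when it is empty.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        queue.put("a");                       // succeeds, queue is now full
        // queue.put("b");                    // would block here until someone takes "a"

        new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("taken: " + queue.take());   // frees a slot after 1 second
            } catch (InterruptedException ignored) { }
        }).start();

        queue.put("b");                       // blocks for about 1 second until the slot is free
        System.out.println("second put finished");
    }
}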

(2) categories:

①ArrayBlockingQueue: a bounded blocking queue backed by an array structure.

②LinkedBlockingQueue: a blocking queue backed by a linked-list structure; it is bounded, but the capacity defaults to Integer.MAX_VALUE, so in practice it is almost unbounded.

③PriorityBlockingQueue: an unbounded blocking queue that supports priority sorting.

④DelayQueue: DelayQueue is an unbounded queue with delay (sorted by delay time) implemented by priority queues.

⑤SynchronousQueue: a blocking queue that does not store elements; every insert must wait for a corresponding remove, so it hands off a single element at a time.

⑥LinkedTransferQueue: an unbounded blocking queue composed of linked lists.

⑦LinkedBlockingDeque: a bidirectional blocking queue consisting of a linked list structure

(3) BlockingQueue core methods:
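
As a quick reminder, the standard BlockingQueue API falls into four groups; the sketch below walks through them (the queue type, capacity, and values are illustrative).

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CoreMethods {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);

        // Group 1: throws an exception when it cannot complete
        q.add("a");                 // IllegalStateException if full
        q.element();                // NoSuchElementException if empty
        q.remove();                 // NoSuchElementException if empty

        // Group 2: returns a special value instead of throwing
        q.offer("b");               // false if full
        q.peek();                   // null if empty
        q.poll();                   // null if empty

        // Group 3: blocks until it can complete
        q.put("c");                 // waits for space
        q.take();                   // waits for an element

        // Group 4: blocks with a timeout
        q.offer("d", 2, TimeUnit.SECONDS);   // false if it times out
        q.poll(2, TimeUnit.SECONDS);         // null if it times out
    }
}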

6. Source code analysis of four rejection strategies

RejectedExecutionHandler interface

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

  • The interface has only one method. When the thread pool would need more threads than its maximum to handle a new task (and the queue is full), the task is rejected and this method is called.

  • You can implement this interface yourself to handle these excessive tasks.

ThreadPoolExecutor itself provides four rejection policies: CallerRunsPolicy, AbortPolicy, DiscardPolicy, and DiscardOldestPolicy.

These four rejection strategies are actually very simple when you look at the implementation method.

(1) AbortPolicy

The default rejection policy in ThreadPoolExecutor is AbortPolicy, which simply throws an exception.

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

Implementation:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " + e.toString());
    }
}

(2) CallerRunsPolicy

"The caller runs it": this policy neither discards the task nor throws an exception; instead it pushes the task back to the caller's thread to execute. Implementation:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

(3) DiscardPolicy

The handling of this strategy is much simpler, as can be seen from the implementation:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

It does nothing at all: with this policy, a task rejected by the thread pool is silently thrown away, without an exception being raised and without the task being executed.

(4) DiscardOldestPolicy

When a task is rejected, DiscardOldestPolicy discards the oldest task in the queue and then tries to submit the new task again.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

Inside rejectedExecution(), the task at the head of the queue (the one added earliest) is removed, freeing a slot, and then execute() is called again to submit the new task.

(5) Custom rejection policy

Looking at the four built-in rejection policies above, you can see that implementing a rejection policy is very simple, and writing your own is just as easy.

For example, if you want the rejected task to be executed in a new thread, you can write:

static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Instead of discarding the rejected task, run it on a brand-new thread
        new Thread(r, "New thread " + new Random().nextInt(10)).start();
    }
}
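
To plug it in, pass an instance as the handler argument of the ThreadPoolExecutor constructor. A minimal sketch, placed in the same class as the handler above (the sizes are illustrative):

public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            Executors.defaultThreadFactory(),
            new MyRejectedExecutionHandler());   // use the custom rejection policy
    // Submit tasks as usual; once the pool is saturated, rejected tasks run on fresh threads.
    pool.shutdown();
}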

7. How to properly configure the number of threads

(1) CPU-intensive

  • CPU intensive means that the task requires a lot of computation without blocking and the CPU runs at full speed all the time.
  • CPU-intensive tasks can only be truly sped up by multithreading on a multi-core CPU; on a single-core CPU, no matter how many threads you simulate, they cannot be sped up, because the CPU's total computing power is fixed.

CPU-intensive tasks should be configured with as few threads as possible. General formula: thread pool size = number of CPU cores + 1.

(2) IO-intensive

The first approach: since IO-intensive task threads are not always busy executing, you can configure more threads, for example CPU cores x 2.

The second approach: IO-intensive means the task performs a lot of IO, i.e., spends a lot of time blocked.

Running IO-intensive tasks on a single thread wastes a lot of CPU power, so using multithreading for IO-intensive tasks can greatly speed up a program, even on a single-core CPU, by making use of the time otherwise wasted blocking.

In IO-intensive scenarios most threads are blocked, so more threads are needed. Formula: number of threads = number of CPU cores / (1 - blocking coefficient), where the blocking coefficient is between 0.8 and 0.9.

For example, an 8-core CPU:

  • 8 / (1 - 0.9) = 80 threads
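
A small sketch that derives both numbers at runtime (the 0.9 blocking coefficient is just an illustrative value):

public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // CPU-intensive: cores + 1
        int cpuBoundThreads = cores + 1;

        // IO-intensive: cores / (1 - blocking coefficient), here 0.9 as an example
        double blockingCoefficient = 0.9;
        int ioBoundThreads = (int) (cores / (1 - blockingCoefficient));

        System.out.println("CPU-bound pool size: " + cpuBoundThreads);
        System.out.println("IO-bound pool size:  " + ioBoundThreads);
    }
}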

Learn more about the underlying principles of thread pools