1. Why do we need thread pools

In practice, threads consume system resources, and poorly managed threads can easily cause system problems. Most concurrency frameworks therefore use thread pools to manage threads. The main benefits of managing threads with a thread pool are as follows:

  • 1. A thread pool reuses existing threads to execute new tasks, avoiding the overhead of creating and destroying threads.
  • 2. Because no thread has to be created or destroyed for each task, the system responds faster.
  • 3. The pool manages threads centrally, so the number of running threads can be tuned to the system's capacity.

2. The working principle

Flow chart:

The thread pool processes a submitted task as follows:

▪ 1. Check whether the number of threads in the pool has reached the core pool size. If it has not, create a new thread to execute the submitted task; otherwise, go to step 2. For example, suppose the core pool size is set to 30. Once 30 threads have been created, the pool keeps them alive regardless of how many user connections there are. The value does not have to be 30; choose it according to your business needs and expected concurrency.

▪ 2. If the number of core threads has reached the limit (30 in this example), check whether the blocking queue is full. If it is not, put the submitted task in the blocking queue to wait for execution; otherwise, go to step 3.

▪ 3. Check whether the number of threads in the pool has reached the maximum pool size. If it has not, create a new thread to execute the task; otherwise, hand the task to the saturation (rejection) policy.

Note: the core pool size and the maximum pool size are two different concepts. The core pool size is the number of threads the pool keeps around for everyday load, while the maximum pool size is the largest number of threads the pool will ever create. Think of a household well in the countryside: half a well of water is usually enough for daily use, and that corresponds to the core pool size. The full capacity of the well is the most water it can hold; anything beyond that overflows, and that corresponds to the maximum pool size.
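The three steps can be observed directly on a small pool. The following is a minimal sketch, not taken from the original article: the class name SubmissionFlowDemo and the sizes (2 core threads, at most 4 threads, a queue of capacity 2) are assumptions chosen so that seven blocking tasks exercise every step.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {
    /** Submits 7 blocking tasks and returns {poolSize, queuedTasks, rejectedTasks}. */
    static int[] run() throws InterruptedException {
        // Assumed sizes for illustration: 2 core threads, at most 4 threads, queue capacity 2
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < 7; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: pool and queue are both full
            }
        }
        // Tasks 1-2 started core threads, 3-4 were queued, 5-6 started extra threads, 7 was rejected
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = run();
        System.out.println("threads=" + s[0] + ", queued=" + s[1] + ", rejected=" + s[2]);
    }
}
```

With these sizes the program prints threads=4, queued=2, rejected=1. The tasks block on a latch so that no worker becomes free while the snapshot is taken.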

3. Classification of thread pools

1. newCachedThreadPool: creates a thread pool that creates new threads as needed but reuses previously constructed threads when they are available. An overload accepts a ThreadFactory that is used to create new threads when needed.

Features:

(1) The number of threads in the pool is not fixed and can grow up to Integer.MAX_VALUE (2147483647).

(2) Idle threads are cached for reuse and reclaimed once they have been idle for the keep-alive time (60 seconds by default).

(3) When no idle thread is available in the pool, a new thread is created.

2. newFixedThreadPool: creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads are actively processing tasks. If additional tasks are submitted while all threads are active, they wait in the queue until a thread becomes available. If any thread terminates due to a failure during execution prior to shutdown, a new thread takes its place if needed to execute subsequent tasks. The threads in the pool exist until the pool is explicitly shut down.

Features:

(1) The number of threads in the pool is fixed, which makes it easy to control the level of concurrency.

(2) Threads are reused and remain alive until the pool is explicitly shut down.

(3) When more tasks are submitted than there are threads, the extra tasks wait in the queue.

3. newSingleThreadExecutor: creates an Executor that uses a single worker thread operating off an unbounded queue. (Note that if this single thread terminates due to a failure during execution prior to shutdown, a new thread will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task is active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1), the returned executor is guaranteed not to be reconfigurable to use additional threads.

Features:

(1) At most one thread executes tasks in the pool; tasks submitted while it is busy wait in the queue.

4. newSingleThreadScheduledExecutor: creates a single-threaded executor that can schedule commands to run after a given delay or to execute periodically.

Features:

(1) At most one thread executes tasks in the pool; tasks submitted while it is busy wait in the queue.

(2) Tasks can be scheduled to run after a delay or periodically.

5. newScheduledThreadPool: creates a thread pool that can schedule commands to run after a given delay or to execute periodically.

Features:

(1) The pool keeps the specified number of threads, even when they are idle.

(2) Tasks can be scheduled to run after a delay or periodically.

6. newWorkStealingPool: creates a thread pool with a given parallelism level. The parallelism level determines the maximum number of threads actively executing tasks at one time. If no parallelism level is passed, it defaults to the number of CPUs available in the current system.
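The article has no example for the work-stealing pool, so here is a minimal sketch. The class name WorkStealingPoolDemo and the sum-of-squares workload are assumptions made for illustration; Executors.newWorkStealingPool() and invokeAll are standard JDK calls (Java 8 and later).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingPoolDemo {
    /** Sums the squares of 1..n by running one small task per number. */
    static long sumOfSquares(int n) throws Exception {
        // No argument: the parallelism level defaults to the number of available processors
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long sum = 0;
            // invokeAll blocks until every task has completed
            for (Future<Long> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // 1 + 4 + ... + 100 = 385
    }
}
```

The example waits for results through invokeAll because the worker threads of this pool are daemon threads and would not keep the JVM alive on their own.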

All of the above factory methods are defined in the java.util.concurrent.Executors utility class.

4. Implementation of a thread pool: ThreadPoolExecutor

Task class used by the examples below:

public class Task implements Runnable {
    @Override
    public void run() {
        try {
            // Sleep for 1 second
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Print the name of the thread that ran this task
        System.out.println(Thread.currentThread().getName() + "-------running");
    }
}

4.1 newCachedThreadPool

Source code implementation:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Case study:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CacheThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            // Submit the task
            executorService.execute(new Task());
        }
        // Initiate an orderly shutdown: previously submitted tasks are executed, but no new tasks are accepted
        executorService.shutdown();
    }
}

Output: the 20 tasks run on 20 different threads (pool-1-thread-1 to pool-1-thread-20):

pool-1-thread-2-------running
pool-1-thread-6-------running
pool-1-thread-1-------running
pool-1-thread-3-------running
pool-1-thread-5-------running
pool-1-thread-4-------running
pool-1-thread-7-------running
pool-1-thread-11-------running
pool-1-thread-9-------running
pool-1-thread-10-------running
pool-1-thread-17-------running
pool-1-thread-15-------running
pool-1-thread-18-------running
pool-1-thread-16-------running
pool-1-thread-8-------running
pool-1-thread-20-------running
pool-1-thread-13-------running
pool-1-thread-19-------running
pool-1-thread-14-------running
pool-1-thread-12-------running

4.2 newFixedThreadPool

Source code implementation:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

Case study:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        // Create a thread pool that allows up to five threads to execute
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            // Submit the task
            executorService.execute(new Task());
        }
        // Initiate an orderly shutdown: previously submitted tasks are executed, but no new tasks are accepted
        executorService.shutdown();
    }
}

Output: at most five threads (pool-1-thread-1 to pool-1-thread-5) execute tasks at any time, and they are reused for all 20 tasks:

pool-1-thread-4-------running
pool-1-thread-2-------running
pool-1-thread-1-------running
pool-1-thread-3-------running
pool-1-thread-5-------running
pool-1-thread-4-------running
pool-1-thread-5-------running
pool-1-thread-3-------running
pool-1-thread-2-------running
pool-1-thread-1-------running
pool-1-thread-4-------running
pool-1-thread-2-------running
pool-1-thread-1-------running
pool-1-thread-3-------running
pool-1-thread-5-------running
pool-1-thread-4-------running
pool-1-thread-5-------running
pool-1-thread-2-------running
pool-1-thread-1-------running
pool-1-thread-3-------running

4.3 newSingleThreadExecutor

Source code implementation:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

Case study:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            // Submit the task
            executorService.execute(new Task());
        }
        // Initiate an orderly shutdown: previously submitted tasks are executed, but no new tasks are accepted
        executorService.shutdown();
    }
}

Output: every task is executed by thread 1:

pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running
pool-1-thread-1-------running

5. Implementation of a thread pool: ScheduledThreadPoolExecutor

5.1 newScheduledThreadPool

Case study:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        System.out.println(System.currentTimeMillis());
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("Execution delayed by three seconds");
                System.out.println(System.currentTimeMillis());
            }
        }, 3, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }
}

Output result:

1606744468814
Execution delayed by three seconds
1606744471815

5.2 newSingleThreadScheduledExecutor

Case study:

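import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadScheduledDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            int i = 1;
            @Override
            public void run() {
                System.out.println(i);
                i++;
            }
        }, 0, 1, TimeUnit.SECONDS);
        // scheduledExecutorService.shutdown(); // left out so that the periodic task keeps running
    }
}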

Output result:

1
2
3
4
5

6. Life cycle of a thread pool

Broadly speaking, a thread pool has only two stable states, RUNNING and TERMINATED; every state in between is transitional.

  • RUNNING: accepts newly submitted tasks and processes the tasks in the blocking queue.
  • SHUTDOWN: stops accepting newly submitted tasks but continues to process the tasks already saved in the blocking queue.
  • STOP: stops accepting new tasks, does not process queued tasks, and interrupts tasks that are in progress.
  • TIDYING: all tasks have terminated and workerCount is 0. On entering this state, the thread pool calls the terminated() hook method.
  • TERMINATED: the thread pool enters this state after the terminated() method completes.
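The internal state field of ThreadPoolExecutor is not public, but the transitions can be observed through isShutdown(), isTerminated() and awaitTermination(). The sketch below is illustrative only; the class name LifecycleDemo and the log format are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    /** Returns what can be observed while the pool moves from RUNNING to TERMINATED. */
    static String observe() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        StringBuilder log = new StringBuilder();
        pool.execute(() -> { });                      // RUNNING: new tasks are accepted
        log.append("beforeShutdown=").append(pool.isShutdown());
        pool.shutdown();                              // RUNNING -> SHUTDOWN
        log.append(" afterShutdown=").append(pool.isShutdown());
        try {
            pool.execute(() -> { });                  // SHUTDOWN: new tasks are rejected
        } catch (RejectedExecutionException e) {
            log.append(" rejected=true");
        }
        // Wait for SHUTDOWN -> TIDYING -> TERMINATED
        boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        log.append(" terminated=").append(finished && pool.isTerminated());
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: beforeShutdown=false afterShutdown=true rejected=true terminated=true
        System.out.println(observe());
    }
}
```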

7. Creating a thread pool

7.1 ThreadPoolExecutor constructor source

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

7.2 Parameter Description

  • corePoolSize: the size of the core thread pool.
  • maximumPoolSize: the maximum number of threads that can be created in the thread pool.
  • keepAliveTime: how long an idle thread beyond the core size is kept alive before it is reclaimed.
  • unit: the time unit of keepAliveTime.
  • workQueue: the blocking queue that holds tasks waiting to be executed.
  • threadFactory: the factory used to create new threads.
  • handler: the saturation policy (rejection policy).
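To make the parameters concrete, the following sketch passes all seven arguments explicitly. The values, the class name CustomPoolDemo and the "worker-N" thread naming scheme are assumptions chosen for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {
    /** Builds a pool with every constructor argument spelled out. */
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger(1);
        // "worker-N" is a hypothetical naming scheme used only in this example
        ThreadFactory factory = r -> new Thread(r, "worker-" + counter.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(10),           // workQueue
                factory,                                // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        // Prints: worker-1 running
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Naming threads through a ThreadFactory is optional, but it makes thread dumps and log lines easier to attribute to a specific pool.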

8. Blocking queues

ArrayBlockingQueue:

An array-based blocking queue. Internally, ArrayBlockingQueue maintains a fixed-length array that buffers the elements in the queue. Besides the array, it also keeps two integer variables that identify the positions of the head and tail of the queue in the array.

Unlike LinkedBlockingQueue, ArrayBlockingQueue uses the same lock object for producers putting data and consumers taking data, which means the two cannot truly run in parallel. In principle, ArrayBlockingQueue could have used separate locks so that producer and consumer operations run fully in parallel. Doug Lea probably chose not to because the write and fetch operations of ArrayBlockingQueue are already so lightweight that a separate-lock mechanism would add complexity to the code without any performance benefit.

Another significant difference between ArrayBlockingQueue and LinkedBlockingQueue is that ArrayBlockingQueue does not create or destroy any additional object instances when inserting or removing elements, while LinkedBlockingQueue creates an additional Node object for each element. In systems that must process large volumes of data efficiently and concurrently over long periods, this leads to a different impact on GC. When creating an ArrayBlockingQueue, we can also control whether its internal lock is fair; by default it is not fair.

LinkedBlockingQueue:

A blocking queue based on a linked list. Like ArrayBlockingQueue, it maintains an internal data buffer (here made up of a linked list). When a producer puts data into the queue, the queue takes the data from the producer and caches it internally, and the producer returns immediately. Only when the buffer reaches its maximum capacity (which can be specified through the constructor) is the producer blocked, until a consumer takes an element from the queue and the producer thread is woken up. The same principle applies in reverse to consumers. LinkedBlockingQueue handles concurrent data efficiently because it uses separate locks for the producer side and the consumer side to control data synchronization. This means that under high concurrency, producers and consumers can operate on the queue in parallel, which improves the concurrency performance of the whole queue.

DelayQueue:

An element in a DelayQueue can only be taken from the queue once its specified delay has expired. DelayQueue has no size limit, so operations that insert data (producers) never block; only operations that take data (consumers) block.

Usage scenarios: DelayQueue has few use cases, but they tend to be clever ones. A common example is using a DelayQueue to manage a queue of connections that have timed out without responding.
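A minimal sketch of the behavior described above. The Timeout element type, its field names and the delays of 100 ms and 300 ms are assumptions made for this example; DelayQueue and the Delayed interface are standard JDK types.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueDemo {
    /** A hypothetical element that becomes available delayMillis after it is created. */
    static final class Timeout implements Delayed {
        final String name;
        final long expiresAtNanos;

        Timeout(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns the element names in the order in which take() releases them. */
    static String releaseOrder() throws InterruptedException {
        DelayQueue<Timeout> queue = new DelayQueue<>();
        queue.put(new Timeout("slow", 300)); // put never blocks: the queue is unbounded
        queue.put(new Timeout("fast", 100));
        String first = queue.take().name;    // blocks until the shortest delay has expired
        String second = queue.take().name;   // blocks until the next delay has expired
        return first + "," + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseOrder()); // fast,slow
    }
}
```

Elements come out in order of expiry, not insertion, which is what makes the queue suitable for timeout management.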

PriorityBlockingQueue:

A priority-based blocking queue (the priority is determined by the Comparator object passed to the constructor). Note that PriorityBlockingQueue never blocks data producers; it only blocks consumers when there is no data to consume. Producers therefore must not produce data faster than consumers can consume it, or over time the queue will eventually exhaust all available heap memory. Internally, PriorityBlockingQueue synchronizes threads with a single ReentrantLock, which in the JDK source is created with the default (non-fair) policy.
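As a small illustration, the sketch below orders strings by length. The class name PriorityQueueDemo and the sample values are assumptions; the two-argument constructor (initial capacity, comparator) is part of the JDK.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityQueueDemo {
    /** Inserts three strings and returns them in the order the queue hands them out. */
    static String drain() {
        // Shortest string first; the comparator is passed to the constructor
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);
        queue.put("medium");        // put never blocks: the queue grows as needed
        queue.put("a");
        queue.put("longest-item");
        StringBuilder order = new StringBuilder();
        String next;
        while ((next = queue.poll()) != null) { // poll returns null once the queue is empty
            order.append(next).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drain()); // a medium longest-item
    }
}
```

The first constructor argument is only an initial capacity, not a bound, which is why producers are never blocked.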

SynchronousQueue:

An unbuffered waiting queue, similar to trading directly without a middleman. It is a bit like producers and consumers in a primitive society: producers bring their goods to the market and sell them directly to the final consumers, and consumers must go to the market to find a producer of the goods they want. If either side fails to find a suitable counterpart, then, sorry, everyone waits in the market. Compared with a buffered BlockingQueue, there is no intermediate dealer (the buffer). With a dealer, producers wholesale their goods to the dealer and need not care which customers the goods are eventually sold to, and the dealer can hold some stock. So, generally speaking, the dealer model has higher throughput than direct trading (goods can be bought and sold in batches). On the other hand, introducing a dealer adds an extra step between producer and consumer, which may increase the response time for an individual item.

A SynchronousQueue can be created in two modes that behave differently: fair and unfair. In fair mode, SynchronousQueue uses a FIFO queue to hold waiting producers and consumers, which gives an overall fair policy. In unfair mode (the default), it uses a LIFO structure to manage waiting producers and consumers. In unfair mode, if producers and consumers process at different speeds, starvation can easily occur: some producers or consumers may never have their data processed.
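The direct hand-off can be sketched as follows. The class name SynchronousQueueDemo and the string values are assumptions; the boolean constructor argument selects fair mode.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueDemo {
    /** Returns "<offer result without a consumer>,<value received by the consumer>". */
    static String handOff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // true = fair (FIFO) mode
        // No consumer is waiting and there is no buffer, so a non-blocking offer fails
        boolean acceptedWithoutConsumer = queue.offer("early");
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("hello");  // blocks until the consumer takes the element
        consumer.join();     // join makes the consumer's write to received[0] visible here
        return acceptedWithoutConsumer + "," + received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff()); // false,hello
    }
}
```

This is also why newCachedThreadPool pairs a SynchronousQueue with an effectively unbounded maximum pool size: a task that cannot be handed to an idle thread immediately causes a new thread to be created.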

Note the differences between ArrayBlockingQueue and LinkedBlockingQueue:

1. Locks are implemented differently in the two queues

ArrayBlockingQueue does not separate its locks, i.e. production and consumption use the same lock.

LinkedBlockingQueue separates its locks: putLock for production and takeLock for consumption.

2. The queue size is initialized differently

ArrayBlockingQueue requires the queue size to be specified. LinkedBlockingQueue does not; if no size is given, the capacity defaults to Integer.MAX_VALUE.
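The capacity difference is easy to check with remainingCapacity(). This is a minimal sketch; the class name QueueCapacityDemo and the capacity of 2 are assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {
    /** Returns "<array remaining capacity>,<third offer accepted>,<linked queue unbounded>". */
    static String compare() {
        // Capacity is mandatory; the optional second argument requests a fair lock
        BlockingQueue<Integer> array = new ArrayBlockingQueue<>(2, true);
        // No capacity given: it defaults to Integer.MAX_VALUE
        BlockingQueue<Integer> linked = new LinkedBlockingQueue<>();
        boolean linkedUnbounded = linked.remainingCapacity() == Integer.MAX_VALUE;
        array.offer(1);
        array.offer(2);
        boolean thirdAccepted = array.offer(3); // queue is full: offer returns false instead of blocking
        return array.remainingCapacity() + "," + thirdAccepted + "," + linkedUnbounded;
    }

    public static void main(String[] args) {
        System.out.println(compare()); // 0,false,true
    }
}
```

An effectively unbounded LinkedBlockingQueue means a pool built on it never reaches step 3 of the submission flow, so maximumPoolSize has no effect; this is the case for newFixedThreadPool.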

9. Rejection strategy

ThreadPoolExecutor.AbortPolicy (the default): discards the task and throws a RejectedExecutionException, so you know the task was rejected and can decide, according to your business logic, whether to retry the submission or give up.

ThreadPoolExecutor.DiscardPolicy: also discards the task, but without throwing an exception. This is relatively risky because the submitter is not told that the task was discarded, which may cause data loss.

ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, which is usually the one that has waited longest, and then retries submitting the new task (repeating if it is rejected again). This also carries a risk of data loss.

ThreadPoolExecutor.CallerRunsPolicy: the task is executed by the calling thread, that is, the thread that submitted it.
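The four policies can be compared on a deliberately tiny pool. In this sketch the class name RejectionPolicyDemo, the outcome strings, and the pool shape (one thread, one queue slot) are assumptions chosen so that the third task always hits the handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    /** Saturates a pool with one thread and one queue slot, then reports what happens to a third task. */
    static String thirdTaskOutcome(RejectedExecutionHandler handler) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker until the outcome has been recorded
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String caller = Thread.currentThread().getName();
        String[] ranOn = new String[1];
        Runnable third = () -> ranOn[0] = Thread.currentThread().getName();
        String outcome;
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(third);   // the pool is saturated: the handler decides
            if (caller.equals(ranOn[0])) {
                outcome = "ran on the calling thread";
            } else if (pool.getQueue().contains(third)) {
                outcome = "queued after the oldest queued task was discarded";
            } else {
                outcome = "discarded silently";
            }
        } catch (RejectedExecutionException e) {
            outcome = "RejectedExecutionException thrown";
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("AbortPolicy: " + thirdTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("DiscardPolicy: " + thirdTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + thirdTaskOutcome(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("CallerRunsPolicy: " + thirdTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

CallerRunsPolicy also acts as a simple form of back-pressure: while the submitting thread is busy running the rejected task, it cannot submit more work.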

10. execute() and submit() methods

10.1 Execution logic of the execute() method

  • If fewer threads are currently running than corePoolSize, a new thread is created to execute the new task.
  • If the number of running threads is equal to or greater than corePoolSize, the submitted task is placed in the blocking queue workQueue.
  • If workQueue is full and the number of threads is below maximumPoolSize, a new thread is created to execute the task.
  • If creating a new thread would exceed maximumPoolSize, the task is handed to the saturation policy, RejectedExecutionHandler.

10.2 Submit

submit() extends the base method Executor.execute(Runnable) by creating and returning a Future object that can be used to cancel execution and/or wait for completion.
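Both uses of the returned Future, waiting for a result and cancelling, are shown in the sketch below. The class name SubmitDemo, the trivial sum and the 60-second sleep are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    /** Returns "<computed value>,<cancel() result>,<isCancelled()>". */
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> sum = () -> 1 + 2;
            Future<Integer> result = pool.submit(sum);
            int value = result.get(); // blocks until the task completes, then returns its result

            Runnable sleeper = () -> {
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled: exit promptly
                }
            };
            Future<?> pending = pool.submit(sleeper);
            boolean cancelled = pending.cancel(true); // true: interrupt the task if it is already running
            return value + "," + cancelled + "," + pending.isCancelled();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // 3,true,true
    }
}
```

Unlike execute(), an exception thrown inside a submitted task is not printed by the worker thread; it is stored in the Future and rethrown, wrapped in an ExecutionException, when get() is called.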

11. Closing the thread pool

  • To shut down a thread pool, you can use either shutdown or shutdownNow.
  • How it works: iterate over the worker threads in the pool and interrupt them in turn.
  • shutdownNow first sets the state of the thread pool to STOP, then attempts to stop all threads, whether or not they are executing a task, and returns the list of tasks that were waiting to be executed.
  • shutdown only sets the state of the thread pool to SHUTDOWN and then interrupts the threads that are not executing tasks.
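The difference between the two methods can be sketched as follows. The class name ShutdownDemo, the task counts and the 60-second sleep are assumptions chosen so that the result does not depend on timing.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    /** Returns {tasks completed after shutdown(), tasks handed back by shutdownNow()}. */
    static int[] run() throws InterruptedException {
        // shutdown(): tasks that are already queued still run to completion
        ExecutorService graceful = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            graceful.execute(completed::incrementAndGet);
        }
        graceful.shutdown();
        graceful.awaitTermination(5, TimeUnit.SECONDS);

        // shutdownNow(): the running task is interrupted and queued tasks are returned unexecuted
        ExecutorService abrupt = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        abrupt.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow()
            }
        });
        abrupt.execute(() -> { });
        abrupt.execute(() -> { });
        started.await(); // the first task is running, so exactly two tasks are waiting in the queue
        List<Runnable> neverStarted = abrupt.shutdownNow();
        abrupt.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {completed.get(), neverStarted.size()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("completed=" + r[0] + ", returnedByShutdownNow=" + r[1]); // completed=3, returnedByShutdownNow=2
    }
}
```

Note that shutdownNow can only interrupt a running task; a task that ignores interruption keeps running until it finishes on its own.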