This article covers the basics of Java thread pools.

Preface

I am currently reading “Java Concurrency in Practice” and have reached “Chapter 7: Cancellation and Shutdown”, which touches on thread pools; Chapter 8 is devoted to thread pools. So I decided to review thread pools first, so that I can better understand the rest of the book.

I once read a blog post that explains thread pools very well. Here I excerpt only its basic-knowledge part: the plan is to master the fundamentals of Java first and then dig into the underlying principles. The link to that post is attached at the end.

Getting to know thread pools

As we know, creating and destroying a Java thread maps to creating and destroying an operating system thread, so it can be expensive. Thread pools exist to avoid frequent thread creation and destruction and to make threads easier to manage.

Advantages of thread pools

  • Reduced resource consumption: A thread pool typically maintains a set of core threads (corePoolSize) that are reused for different tasks and are not destroyed when a task completes. When there are many tasks to process, reusing threads avoids creating and destroying them over and over, which reduces the consumption of system resources.
  • Improved response time: Because the pool keeps a batch of threads alive, an arriving task can be executed by an existing thread right away instead of waiting for a new thread to be created.
  • Improved thread manageability: Threads can be uniformly allocated, tuned, and monitored using thread pools.
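The reuse described in the list above can be observed directly. The following is a minimal sketch, not taken from the original post: the class name ThreadReuseDemo and the method distinctThreadsUsed are made up for illustration. It submits many short tasks to a small fixed pool and counts how many distinct threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
final class ThreadReuseDemo {

    /** Runs taskCount short tasks on poolSize threads and returns the number of distinct threads used. */
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that executes it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 100 tasks are executed, but the pool never creates more than 4 threads.
        System.out.println("distinct threads: " + distinctThreadsUsed(4, 100));
    }
}
```

However many tasks are submitted, the count never exceeds the pool size, which is exactly the saving the first bullet describes.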

The design idea behind thread pools

There is a saying that art comes from life, and the same is true of programming. Many design ideas, such as object orientation, encapsulation, and inheritance, can be mapped to everyday life. Thread pools, today's topic, also have a real-world counterpart: the factory. Imagine a factory's production process:

A factory has a fixed group of workers, called regular workers, who fulfill the orders the factory receives. When orders increase and the regular workers are all busy, the factory temporarily piles the production materials up in the warehouse and handles them when workers become idle (idle workers do not go to the warehouse on their own initiative, so a dispatcher has to assign the work in real time). What if orders keep increasing after the warehouse is full? The factory has to hire extra workers to cope with the production peak. These workers leave when the peak is over, so they are called temporary workers. The number of temporary workers is capped, so once that cap is reached as well, further orders have to be reluctantly rejected. Let's make the following mapping:

  • Factory – thread pool
  • Order – task (Runnable)
  • Regular workers – core threads
  • Temporary workers – non-core threads
  • Warehouse – task queue
  • Dispatcher – getTask()

getTask() is the method through which an idle thread obtains its next task from the task queue.
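To make the factory analogy concrete, here is a toy pool written from scratch. It is only a sketch of the idea and not how ThreadPoolExecutor is implemented: the names ToyFactory, warehouse, and runOrders are invented for illustration, there are only regular workers, and there is no rejection or shutdown logic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy illustration of the factory analogy; all names here are hypothetical.
final class ToyFactory {
    // The "warehouse": orders wait here until a worker is free.
    private final BlockingQueue<Runnable> warehouse = new LinkedBlockingQueue<>();

    ToyFactory(int workers) {
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(this::workLoop, "toy-worker-" + i);
            t.setDaemon(true); // lets the JVM exit without an explicit shutdown
            t.start();
        }
    }

    /** The "dispatcher": blocks until the warehouse has an order for this worker. */
    private Runnable getTask() throws InterruptedException {
        return warehouse.take();
    }

    /** Each worker repeatedly asks the dispatcher for the next order and processes it. */
    private void workLoop() {
        try {
            while (true) {
                getTask().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the worker exits when interrupted
        }
    }

    void execute(Runnable order) {
        warehouse.add(order);
    }

    /** Submits n counting orders and returns how many were completed within the timeout. */
    static int runOrders(int workers, int n) {
        ToyFactory factory = new ToyFactory(workers);
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            factory.execute(() -> {
                done.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("orders completed: " + runOrders(2, 10));
    }
}
```

Two workers complete all ten orders because each one goes back to the warehouse for more work as soon as it finishes an order.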

After this mapping, the flow chart of a thread pool looks as follows. Aren't the two very similar?

Comment: I think the author's analogy is an absolute classic! It captures the essence of how a thread pool schedules work. This is why you shouldn't just stare at a book when studying: you may fail to understand a passage for a long time, and then a good blog post clears everything up at once and leaves a lasting impression.

With this picture in mind, the working principle of a thread pool is easy to summarize: while fewer than corePoolSize threads exist, a new task gets a core thread; otherwise it waits in the task queue; if the queue is full, a non-core thread is created, up to maximumPoolSize; beyond that, the task is rejected.

A deeper look at thread pools

So, the question is: how exactly is a thread pool implemented? In Java's Executor framework, the real implementation class of the thread pool is ThreadPoolExecutor, so we will focus on this class next.

Constructors

To study a class, start with its constructors. ThreadPoolExecutor provides four constructors:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

The four constructors differ only in the last two, optional parameters: the thread factory and the rejection policy. Each of them can be either supplied or left at its default, which gives 2*2=4 combinations.

Explain the parameters involved in the constructor:

  • corePoolSize (required): Number of core threads. By default these threads stay alive in the pool even when they are idle. However, if allowCoreThreadTimeOut is set to true, a core thread is also reclaimed after it has been idle for longer than keepAliveTime.
  • maximumPoolSize (required): Maximum number of threads allowed in the pool. When the core threads are all busy and the task queue is full, additional threads are added to the pool until the total number of threads reaches maximumPoolSize.
  • keepAliveTime (required): Idle timeout of a thread. A non-core thread that stays idle for longer than this time is reclaimed. With allowCoreThreadTimeOut set to true, core threads are reclaimed in the same way.
  • unit (required): Time unit of the keepAliveTime parameter. The available values are TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.MINUTES, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, and TimeUnit.NANOSECONDS.
  • workQueue (required): The task queue, implemented as a blocking queue. When all the core threads are busy, Runnables subsequently submitted through the execute method are stored in the task queue, waiting to be picked up by a thread.
  • threadFactory (optional): The thread factory. Specifies how the thread pool creates its threads.
  • handler (optional): The rejection policy. When the number of threads in the pool has reached maximumPoolSize and the workQueue is full, subsequently submitted tasks are rejected, and the handler determines how the rejection is handled.
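How corePoolSize, workQueue, maximumPoolSize, and the handler interact can be watched with a deliberately tiny pool. The sketch below is my own illustration (the class name SubmissionFlowDemo and the method observe are hypothetical): it uses 1 core thread, at most 2 threads, and a queue of capacity 1, and submits four tasks that block until released, so nothing finishes while we observe.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class SubmissionFlowDemo {

    /** Submits four blocking tasks to a pool with core=1, max=2, queue capacity=1 and logs each outcome. */
    static String observe() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // No handler is passed, so the default AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(observe());
        // task1: threads=1 queued=0   (core thread created)
        // task2: threads=1 queued=1   (core thread busy, task waits in the queue)
        // task3: threads=2 queued=1   (queue full, non-core thread created)
        // task4: rejected             (queue full and maximumPoolSize reached)
    }
}
```

Note that the queue is filled before a non-core thread is created, which is the same order as in the factory analogy: the warehouse fills up first, and only then are temporary workers hired.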

Once you understand the design idea behind thread pools, these constructor parameters are very easy to understand.

Task queue

Using ThreadPoolExecutor requires specifying a task wait queue that implements the BlockingQueue interface. In the API documentation for the ThreadPoolExecutor thread pool, three types of wait queues are recommended:

  1. SynchronousQueue: a synchronous hand-off queue. It is a blocking queue with no internal capacity: every insert operation must wait for a corresponding remove operation by another thread, and vice versa.
  2. LinkedBlockingQueue: a queue based on a linked list, usually described as unbounded (strictly speaking, its default capacity is Integer.MAX_VALUE). With an unbounded queue, tasks keep queueing while the core threads are busy, so the number of threads in the pool never exceeds the number of core threads. This can improve throughput, but at the expense of memory, possibly up to an OutOfMemoryError. A capacity can also be specified at construction, which makes the queue bounded.
  3. ArrayBlockingQueue: a bounded queue based on an array. Its capacity is specified when the queue is created and cannot be adjusted later. Bounded queues help prevent resource exhaustion, but they can be harder to tune and control.
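The capacity differences between the three recommended queues show up when tasks are offered without blocking and nothing is consuming them. This is a small sketch under that assumption; QueueCapacityDemo and acceptedOffers are names I made up for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class for illustration.
final class QueueCapacityDemo {
    static final Runnable NOOP = () -> { };

    /** Counts how many non-blocking offer() calls the queue accepts out of 'attempts'. */
    static int acceptedOffers(BlockingQueue<Runnable> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(NOOP)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No consumer is waiting, so a SynchronousQueue accepts nothing.
        System.out.println(acceptedOffers(new SynchronousQueue<>(), 3));    // 0
        // Capacity 2: the third offer is refused.
        System.out.println(acceptedOffers(new ArrayBlockingQueue<>(2), 3)); // 2
        // Default capacity is Integer.MAX_VALUE, so everything is accepted.
        System.out.println(acceptedOffers(new LinkedBlockingQueue<>(), 3)); // 3
        System.out.println(new LinkedBlockingQueue<Runnable>().remainingCapacity()
                == Integer.MAX_VALUE);                                      // true
    }
}
```

ThreadPoolExecutor also submits tasks to its queue with offer(), which is why a refused offer is the signal to create a non-core thread or reject the task.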

In addition, Java provides four other blocking queues:

  1. PriorityBlockingQueue: an unbounded blocking queue that orders its elements by priority. Elements must implement the Comparable interface and its compareTo() method (alternatively, a Comparator can be passed to the queue's constructor). The element with the highest priority is always at the head of the queue. PriorityBlockingQueue does not guarantee the relative order of elements with equal priority, and apart from the head, the remaining elements are not kept fully sorted at all times.
  2. DelayQueue: a delay queue. It is backed by a binary heap and combines the characteristics of an unbounded queue, a blocking queue, and a priority queue. Its elements must implement the Delayed interface, and an element can be taken from the queue only after its delay has expired. See the DelayQueue documentation for more information.
  3. LinkedBlockingDeque: a double-ended queue based on a linked list. Elements can be inserted and removed at both the head and the tail.
  4. LinkedTransferQueue: an unbounded blocking queue based on a linked list. If a consumer arrives while the queue is empty, a node whose element is null is created and enqueued, and the consumer thread waits on that node. When a producer later arrives and finds such a node, it does not enqueue its element; it fills the node with the element directly and wakes up the waiting consumer, which then takes the element.
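Two of these queues are easy to try out in isolation. The sketch below is illustrative only: DelayedJob is a hypothetical element type written for this example, and OrderedQueuesDemo and its methods are made-up names. It shows that PriorityBlockingQueue hands out elements in priority order and that DelayQueue hides an element until its delay has expired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class OrderedQueuesDemo {

    /** Hypothetical element type: becomes available delayMillis after it is created. */
    static final class DelayedJob implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedJob(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Drains a PriorityBlockingQueue; poll() always returns the smallest remaining element. */
    static List<Integer> drainByPriority(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.add(v);
        }
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    /** Returns true if a job delayed by one hour is still in the queue but invisible to poll(). */
    static boolean unexpiredJobIsHidden() {
        DelayQueue<DelayedJob> queue = new DelayQueue<>();
        queue.add(new DelayedJob("later", TimeUnit.HOURS.toMillis(1)));
        return queue.poll() == null && queue.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(drainByPriority(5, 1, 3)); // [1, 3, 5]
        System.out.println(unexpiredJobIsHidden());   // true
    }
}
```

With the natural ordering of Integer, the smallest value counts as the highest priority, so it is returned first.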

That looks like a lot of queues, seven in total! To be honest, I don't know which one suits which real-world scenario best, but later we can look at which queues the thread pools packaged with Java use.

Rejection policies

Thread pools have an important mechanism: rejection policies. When the pool's workQueue is full and no new thread can be created, subsequent tasks are rejected. A rejection policy must implement the RejectedExecutionHandler interface, and the Executor framework already provides four implementations for us (as nested classes of ThreadPoolExecutor):

  1. AbortPolicy (default): discards the task and throws a RejectedExecutionException.
  2. CallerRunsPolicy: runs the task's run method directly, not on a pool thread but on the thread that submitted the task.
  3. DiscardPolicy: discards the task without throwing any exception.
  4. DiscardOldestPolicy: removes the task at the head of the wait queue (the one that has waited longest) and then tries to submit the rejected task to the pool again.
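The difference between the policies becomes visible when a saturated pool receives one more task. The following sketch is my own illustration, with the made-up names RejectionPolicyDemo and runOverflowTask: it saturates a pool with one thread and one queue slot and reports what happened to the overflow task. DiscardOldestPolicy is left out because its result depends on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration.
final class RejectionPolicyDemo {

    /** Saturates a 1-thread, 1-slot pool, submits one more task, and reports what happened to it. */
    static String runOverflowTask(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        AtomicReference<String> outcome = new AtomicReference<>("not run");
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // occupies the only queue slot
            // The overflow task records the name of whichever thread runs it.
            pool.execute(() -> outcome.set(Thread.currentThread().getName()));
        } catch (RejectedExecutionException e) {
            outcome.set("rejected with exception");
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runOverflowTask(new ThreadPoolExecutor.AbortPolicy()));      // rejected with exception
        System.out.println(runOverflowTask(new ThreadPoolExecutor.DiscardPolicy()));    // not run
        System.out.println(runOverflowTask(new ThreadPoolExecutor.CallerRunsPolicy())); // main
    }
}
```

CallerRunsPolicy is often chosen as a simple form of back-pressure: the submitting thread is slowed down because it has to run the task itself.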

The thread factory specifies how threads are created. If none is supplied, the Executors class thoughtfully provides a default thread factory:

/** The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
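The default factory produces names such as pool-1-thread-1, which say little in a thread dump. As a sketch of how a custom factory could be plugged in, here is a hypothetical NamedThreadFactory (the class and its workerNameFor helper are my own, not part of the JDK) that gives threads a meaningful prefix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory for illustration; only ThreadFactory itself is JDK API.
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads "<prefix>-1", "<prefix>-2", ... and keep them non-daemon.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);
        return t;
    }

    /** Runs one task on a pool built with this factory and returns the worker thread's name. */
    static String workerNameFor(String prefix) {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new NamedThreadFactory(prefix));
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerNameFor("order-worker")); // order-worker-1
    }
}
```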

Thread pool state

Thread pools have five states:

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState represents the current state of the thread pool. In the JDK 6 source it is a volatile field, which guarantees visibility between threads; since JDK 7 the state is stored in the high-order bits of an AtomicInteger named ctl, which is why the constants above are shifted by COUNT_BITS.

The static final constants above are the possible values of the state:

  • RUNNING: the initial state after the thread pool is created. New tasks are accepted and queued tasks are processed.
  • SHUTDOWN: entered when the shutdown() method is called. The pool accepts no new tasks, but it waits for all tasks already submitted, including queued ones, to complete.
  • STOP: entered when the shutdownNow() method is called. The pool accepts no new tasks, does not process queued tasks, and tries to interrupt the tasks in progress.
  • TIDYING: all tasks have terminated and there are no worker threads left; the pool is about to run the terminated() hook method.
  • TERMINATED: reached from SHUTDOWN or STOP once all worker threads have been destroyed, the task queue has been emptied, and the terminated() hook has completed.
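The state constants are private to ThreadPoolExecutor, so application code cannot read them directly. What it can do is observe the lifecycle through the public methods isShutdown(), isTerminated(), and awaitTermination(). The sketch below (PoolStateDemo and lifecycle are made-up names) walks one pool from RUNNING to TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class PoolStateDemo {

    /** Walks a pool through its lifecycle and logs what the public API reports at each step. */
    static String lifecycle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        StringBuilder log = new StringBuilder();

        // RUNNING: neither shut down nor terminated.
        log.append("new: shutdown=").append(pool.isShutdown())
           .append(" terminated=").append(pool.isTerminated()).append('\n');

        pool.execute(() -> { });
        pool.shutdown(); // RUNNING -> SHUTDOWN
        log.append("after shutdown(): shutdown=").append(pool.isShutdown()).append('\n');

        try {
            pool.execute(() -> { }); // new tasks are no longer accepted
            log.append("late task: accepted\n");
        } catch (RejectedExecutionException e) {
            log.append("late task: rejected\n");
        }

        try {
            // Blocks until the pool has reached TERMINATED (or the timeout elapses).
            boolean finished = pool.awaitTermination(10, TimeUnit.SECONDS);
            log.append("after awaitTermination: terminated=")
               .append(finished && pool.isTerminated()).append('\n');
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.append("interrupted while waiting\n");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(lifecycle());
    }
}
```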

Initialization & Capacity adjustment & Close

Thread initialization

By default, a newly created thread pool contains no threads; threads are created only when tasks are submitted.

In practice, if you need to create a thread immediately after the thread pool is created, you can do this in two ways:

  • prestartCoreThread(): boolean prestartCoreThread(), starts one core thread
  • prestartAllCoreThreads(): int prestartAllCoreThreads(), starts all core threads and returns the number of threads started

// JDK 6 source; newer JDKs call addWorker(null, true) instead
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); // Note that the argument passed in is null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null)) // Note that the argument passed in is null
        ++n;
    return n;
}
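The effect of pre-starting can be checked with getPoolSize(). This is a minimal sketch; PrestartDemo and poolSizes are names invented for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class PrestartDemo {

    /** Returns {pool size before prestart, threads started, pool size after prestart}. */
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();             // no task submitted yet, so no threads
            int started = pool.prestartAllCoreThreads(); // creates all core threads up front
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println(sizes[0] + " -> started " + sizes[1] + " -> " + sizes[2]); // 0 -> started 3 -> 3
    }
}
```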

Thread pool shutdown

ThreadPoolExecutor provides two methods for closing thread pools:

  • shutdown(): does not terminate the thread pool immediately. No new tasks are accepted, and the pool terminates only after all tasks in the task queue have completed.
  • shutdownNow(): stops the thread pool right away. It tries to interrupt the tasks in progress, empties the task queue, and returns the tasks that have not been executed yet.
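The return value of shutdownNow() is the practical difference that is easiest to overlook. In the sketch below (ShutdownNowDemo and pendingAfterShutdownNow are hypothetical names), one task keeps the single worker busy while further tasks wait in the queue; shutdownNow() interrupts the running task and hands the waiting ones back.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class ShutdownNowDemo {

    /** Queues 'queued' tasks behind one long-running task and returns how many shutdownNow() hands back. */
    static int pendingAfterShutdownNow(int queued) {
        CountDownLatch started = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // cut short by the interrupt from shutdownNow()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < queued; i++) {
            pool.execute(() -> { }); // these wait in the queue behind the sleeping task
        }
        try {
            started.await(); // make sure the first task is actually running
            List<Runnable> neverStarted = pool.shutdownNow();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return neverStarted.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks handed back: " + pendingAfterShutdownNow(3)); // 3
    }
}
```

With shutdown() instead, the three queued tasks would still be executed, and the call would return nothing.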

Thread pool capacity adjustment

ThreadPoolExecutor provides methods to dynamically adjust thread pool capacity:

  • setCorePoolSize: sets the number of core threads
  • setMaximumPoolSize: sets the maximum number of threads that the pool may create

When one of these values is increased, ThreadPoolExecutor takes it into account immediately and may create new threads right away to execute waiting tasks.
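Resizing a live pool can be sketched as follows. ResizeDemo and resize are made-up names. The maximum is raised before the core size here because newer JDK versions reject a core size that is larger than the current maximum.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
final class ResizeDemo {

    /** Returns {pool size before resizing, pool size after resizing} while tasks are waiting. */
    static int[] resize() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker); // 1 task runs, 2 wait in the unbounded queue
            }
            int before = pool.getPoolSize();
            pool.setMaximumPoolSize(4); // raise the maximum first so core <= max always holds
            pool.setCorePoolSize(3);    // the pool starts extra core threads for the queued tasks
            int after = pool.getPoolSize();
            return new int[] { before, after };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = resize();
        System.out.println(sizes[0] + " -> " + sizes[1]); // 1 -> 3
    }
}
```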

Using thread pools

ThreadPoolExecutor

Using a ThreadPoolExecutor constructor is the most direct way to use a thread pool. Here’s an example:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest {
    public static void main(String args[]) {
        // Create a thread pool (core thread count is 3, maximum thread count is 5, idle timeout is 5 seconds)
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        // Submit tasks to the thread pool
        for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 2; j++) {
                        System.out.println(Thread.currentThread().getName() + ":" + j);
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        // Close the thread pool
        threadPool.shutdown(); // Set the state of the thread pool to SHUTDOWN, and then interrupt all threads that are not executing tasks
        // threadPool.shutdownNow(); // Set the state of the thread pool to STOP, then try to stop all threads that are executing or waiting for tasks, and return the list of tasks still waiting. Use this method with caution because it may have uncontrollable consequences
    }
}

// Output (the order of the threads may vary):
// pool-1-thread-1:0
// pool-1-thread-3:0
// pool-1-thread-2:0
// pool-1-thread-2:1
// pool-1-thread-1:1
// pool-1-thread-3:1

Thread pools wrapped by Executors

In addition, Executors wraps four commonly used thread pools for us (again, very thoughtful):

FixedThreadPool

Fixed-size thread pool. Its maximum number of threads equals its number of core threads, which means the pool only ever creates core threads. keepAliveTime is 0, but this has no practical effect here because there are no non-core threads to reclaim; the core threads stay alive after finishing a task. No capacity is specified for the task queue, so the default value Integer.MAX_VALUE is used. Suitable for scenarios where the number of concurrent threads needs to be controlled.

// Use the default thread factory
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// A custom thread factory is required
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

Example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolTest {
    public static void main(String args[]) {
        // Create a thread pool object and set the core thread and maximum number of threads to 5
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        fixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("Throw Exception.");
                }
                System.out.println(Thread.currentThread().getName() + " after sleep, is still running.");
            }
        });
        // fixedThreadPool.shutdown();
        fixedThreadPool.shutdownNow(); // This is not recommended, it is dangerous, it is only used for testing
    }
}

// Output:
// pool-1-thread-1 is running.
// Throw Exception.
// pool-1-thread-1 after sleep, is still running.
// java.lang.InterruptedException: sleep interrupted
// at java.lang.Thread.sleep(Native Method)
// at com.java.parallel.pool.FixedThreadPoolTest$1.run(FixedThreadPoolTest.java:15)
// at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
// at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
// at java.lang.Thread.run(Thread.java:748)

I spiced up the author's example a little. I wanted to see what happens when a running task is interrupted directly: the sleeping thread throws an InterruptedException, which I catch and then print some output. Normally a caught exception should be handled properly, but here I am only testing.

SingleThreadExecutor

Single-threaded thread pool. The pool has exactly one thread (a core thread) and keepAliveTime is 0. Tasks wait in a LinkedBlockingQueue whose capacity is not specified, so it defaults to Integer.MAX_VALUE and the queue is effectively unbounded.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
// To save space, the overload that takes a custom thread factory is omitted

Example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {
    public static void main(String args[]) {
        // Create a thread pool object and set the core thread and maximum thread count to 1
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        singleThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " is running.");
            }
        });
        singleThreadPool.shutdown();
    }
}

// Output:
// pool-1-thread-1 is running.

ScheduledThreadPool

Scheduled thread pool. The number of core threads is specified by the caller, the maximum number of threads is nominally Integer.MAX_VALUE, keepAliveTime is 0, and the task queue is a delayed blocking queue. Because that queue is unbounded, the pool in practice behaves like a fixed-size pool of core threads. This is a special thread pool suited to delayed or periodic tasks.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// Inherits ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    // Constructor; the overload that takes a custom thread factory is omitted
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    // Execute a task after a delay
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        ...
    }

    // Execute a task periodically
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

Example:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolTest {
    public static void main(String args[]) {
        // Create a scheduled thread pool
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        // Submit a task to the thread pool
        scheduledThreadPool.schedule(new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName() + "--->run");
            }
        }, 5, TimeUnit.SECONDS); // The task will be executed after 5 seconds
        scheduledThreadPool.shutdown();
    }
}

// Output:
// pool-1-thread-1--->run
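The example above runs a task once after a delay. For the periodic case, scheduleAtFixedRate can be sketched like this; FixedRateDemo and runTicks are made-up names, and the 20 ms period is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for illustration.
final class FixedRateDemo {

    /** Runs a periodic task until it has fired at least 'ticks' times and returns the observed count. */
    static int runTicks(int ticks) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(ticks);
        AtomicInteger count = new AtomicInteger();
        // First run immediately, then every 20 ms (arbitrary demo period).
        scheduler.scheduleAtFixedRate(() -> {
            count.incrementAndGet();
            latch.countDown();
        }, 0, 20, TimeUnit.MILLISECONDS);
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // a periodic task never ends on its own
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks observed: " + runTicks(3));
    }
}
```

A periodic task keeps being rescheduled until the pool is shut down or the returned ScheduledFuture is cancelled, so the explicit shutdownNow() matters here.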

CachedThreadPool

Cached thread pool. There are no core threads; the number of non-core threads can go up to Integer.MAX_VALUE, and an idle thread is reclaimed after 60 seconds. The task queue is a SynchronousQueue, which has no capacity. This pool is recommended for scenarios with a large number of short-lived tasks.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolTest {
    public static void main(String args[]) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName() + "--->run");
            }
        });
        cachedThreadPool.shutdown();
    }
}

// Output:
// pool-1-thread-1--->run

So which queue does each of the thread pools wrapped by Executors use?

  • FixedThreadPool: LinkedBlockingQueue (unbounded queue)
  • SingleThreadExecutor: LinkedBlockingQueue (unbounded queue)
  • ScheduledThreadPool: DelayedWorkQueue (delay queue)
  • CachedThreadPool: SynchronousQueue (synchronous hand-off queue)

A little interpretation:

  • FixedThreadPool: the number of core threads equals the maximum number of threads and keepAliveTime=0, so the thread count is fixed and excess tasks have to wait somewhere; an unbounded LinkedBlockingQueue holds them.
  • SingleThreadExecutor: the core and maximum thread counts are both 1 and keepAliveTime=0, so for the same reason it also uses a LinkedBlockingQueue.
  • ScheduledThreadPool: tasks must run after a delay, so a delay queue is the natural choice.
  • CachedThreadPool: there are no core threads and the maximum is practically unlimited, so tasks are handed straight to a thread through the zero-capacity SynchronousQueue instead of being stored.
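The queue choices listed above can be confirmed at runtime. This sketch relies on an implementation detail, namely that these three factory methods return a ThreadPoolExecutor (or a subclass) that can be cast; newSingleThreadExecutor returns a wrapper that cannot be cast this way, so it is left out. ExecutorsQueueTypes and queueTypeOf are made-up names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class for illustration.
final class ExecutorsQueueTypes {

    /** Returns the simple class name of the pool's work queue, then shuts the pool down. */
    static String queueTypeOf(ExecutorService service) {
        try {
            // Assumes the service is a ThreadPoolExecutor; this holds for the pools used below.
            return ((ThreadPoolExecutor) service).getQueue().getClass().getSimpleName();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(queueTypeOf(Executors.newFixedThreadPool(2)));     // LinkedBlockingQueue
        System.out.println(queueTypeOf(Executors.newCachedThreadPool()));     // SynchronousQueue
        System.out.println(queueTypeOf(Executors.newScheduledThreadPool(2))); // DelayedWorkQueue
    }
}
```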

Conclusion

This is an article I found when I first came into contact with thread pools, and I felt it was a classic. Now that I need to study Java concurrent programming systematically, I have reorganized it and added my own interpretation in some places. I hope it helps you.

Source link: blog.csdn.net/mu_wind/art…

If you found this useful, please give it a like. For more articles, follow the WeChat official account “Lou Zai advanced road” so you don't lose track of it.