This is the fifth day of my participation in Gwen Challenge


1. Why use thread pools

Hardware, memory, and performance constraints make it impossible to create an unlimited number of threads: every machine has an upper bound on the number of threads it allows. A thread pool uses a finite number of threads to execute submitted tasks. In multi-user, high-concurrency applications, however, the number of submitted tasks can be far larger than the maximum number of threads. To handle this, a queuing mechanism is needed, either in memory or on larger storage media such as hard disks. Java's ThreadPoolExecutor only queues tasks in memory, using a BlockingQueue to hold tasks that have not yet been executed.

2. Benefits of using thread pools

  • Reduced resource consumption: reusing threads that already exist lowers the cost of creating and destroying threads.
  • Improved response time: when a task arrives, it can run immediately without waiting for a thread to be created.
  • Improved manageability: threads are a scarce resource. Created without limit, they consume system resources and reduce system stability. A thread pool allows threads to be allocated, tuned, and monitored in one place.

3. Java implementation

Task management is the easier part. Thread management is the complicated part, because it involves thread counts, waiting and waking, synchronization and locking, and thread creation and termination. ThreadPoolExecutor has several thread-related member variables: keepAliveTime, allowCoreThreadTimeOut, poolSize, corePoolSize, and maximumPoolSize, which together govern thread creation and destruction. A commonly used constructor:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                   TimeUnit unit, BlockingQueue<Runnable> workQueue);

A few important parameters:

  • corePoolSize: the number of core threads. Once created, core threads stay alive even when they are idle (unless allowCoreThreadTimeOut is enabled).
  • maximumPoolSize: the maximum number of threads the pool may hold.
  • keepAliveTime: how long a thread beyond corePoolSize may stay idle before it is terminated.
  • unit: the time unit of keepAliveTime.
  • workQueue: the queue that buffers tasks waiting to be executed.
  • allowCoreThreadTimeOut: whether core threads may also time out when idle. It is set through the allowCoreThreadTimeOut(boolean) method rather than the constructor.
  • RejectedExecutionHandler: the task rejection handler. A task is rejected in either of the following cases:
    • The number of threads has reached maximumPoolSize and the work queue is full.
    • shutdown() has been called. The pool finishes the tasks it has already accepted before it terminates, but any task submitted after the call to shutdown() is rejected.
  • The pool calls the RejectedExecutionHandler to deal with a rejected task. If none is set, the default is AbortPolicy, which throws an exception.
  • ThreadPoolExecutor ships with four nested implementations:
    • AbortPolicy: rejects the task and throws a RejectedExecutionException (a runtime exception).
    • CallerRunsPolicy: runs the task in the thread that submitted it.
    • DiscardPolicy: silently drops the task; nothing else happens.
    • DiscardOldestPolicy: drops the oldest task waiting in the queue (the one at the head), then retries submitting the new task.
  • You can also implement the RejectedExecutionHandler interface to define a custom handler.
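
As an illustration of the last point, here is a minimal sketch of a custom handler. The class names CountingRejectionDemo and CountingHandler are hypothetical, chosen only for this example; the handler simply counts and logs rejected tasks instead of throwing. The pool is deliberately tiny (one thread, a queue of one) so that it saturates quickly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejectionDemo {

    /** Hypothetical handler: counts rejected tasks and logs them instead of throwing. */
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            rejected.incrementAndGet();
            System.err.println("Rejected " + task + ", queue size = " + pool.getQueue().size());
        }
    }

    /** Saturates a tiny pool (1 thread, queue of 1) and returns how many tasks were rejected. */
    static int rejectedCount(int submissions) {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < submissions; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy so the pool stays saturated
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        // 1 task runs, 1 is queued, the remaining 3 are rejected
        System.out.println(rejectedCount(5));
    }
}
```

In a real application the handler would more likely record a metric or persist the task for a later retry; counting is used here only to keep the sketch short.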

Execution mechanism

  • If the number of threads in the pool is less than corePoolSize, a new thread is created to handle the submitted task, even if other threads in the pool are idle.
  • If the number of threads in the pool has reached corePoolSize and the buffer queue workQueue is not full, the task is put into the queue.
  • If the number of threads in the pool has reached corePoolSize, workQueue is full, and the number of threads is less than maximumPoolSize, a new thread is created to handle the submitted task.
  • If workQueue is full and the number of threads in the pool equals maximumPoolSize, the task is handled according to the policy specified by the handler. In other words, a task is tried against corePoolSize, then workQueue, then maximumPoolSize, in that order. Only when all three are exhausted is the task passed to the handler.
  • If the number of threads in the pool is greater than corePoolSize, a thread that stays idle for longer than keepAliveTime is terminated. This is how the pool dynamically adjusts its number of threads.
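
The order described above (core threads, then the queue, then extra threads, then rejection) can be observed with a small experiment. This is a sketch under stated assumptions: the class name PoolGrowthDemo is made up for the example, and the tasks block on a latch so that the pool state cannot change between submissions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /** Submits five blocking tasks and records "poolSize/queueSize" after each submission. */
    static List<String> run() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 2, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // block so that no thread becomes free
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                log.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        // prints [1/0, 1/1, 1/2, 2/2, rejected]
        System.out.println(run());
    }
}
```

The first task starts the core thread, the next two fill the queue, the fourth forces a second thread because the queue is full, and the fifth is rejected because both the queue and maximumPoolSize are exhausted.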

4. Configure the thread pool after it is created

Let's look at what allowCoreThreadTimeOut and keepAliveTime mean. Under heavy load the pool is busy: every thread is working on newly submitted or queued tasks. Under light load, many threads in the pool may be idle. To save system resources, those idle threads should be reclaimed, so a timeout mechanism is part of the pool's sizing strategy. corePoolSize and maximumPoolSize control how threads are created; allowCoreThreadTimeOut and keepAliveTime control how they are destroyed.

Set the core and maximum number of threads

threadPool.setCorePoolSize(10);
threadPool.setMaximumPoolSize(10);

Allow core threads to time out and exit when idle

threadPool.allowCoreThreadTimeOut(true);

Setting a Rejection Policy

threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
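
Put together, the three settings might look like the sketch below. The class name PoolReconfigDemo and the initial sizes are assumptions made for the example. Two details are worth noting: the maximum is raised before the core size, because setMaximumPoolSize rejects a value below the current core size and newer JDKs also reject a core size above the current maximum; and allowCoreThreadTimeOut(true) throws IllegalArgumentException when keepAliveTime is zero, so the pool is created with a non-zero keep-alive time.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolReconfigDemo {

    /** Creates a small pool and then resizes and reconfigures it at runtime. */
    static ThreadPoolExecutor reconfigure() {
        // Initial sizes (2 core, 4 max) are arbitrary values for illustration
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

        // Raise the maximum first so the core size never exceeds it
        pool.setMaximumPoolSize(10);
        pool.setCorePoolSize(10);

        // Requires keepAliveTime > 0, which holds here (30 seconds)
        pool.allowCoreThreadTimeOut(true);

        // Replace the default AbortPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = reconfigure();
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", coreTimeout=" + pool.allowsCoreThreadTimeOut());
        pool.shutdown();
    }
}
```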

5. Monitor thread pool status

/** Approximate number of threads that are actively executing tasks */
int activeCount = threadPool.getActiveCount();

/** Approximate total number of tasks that have ever been scheduled for execution */
long taskCount = threadPool.getTaskCount();

/** Approximate total number of tasks that have completed execution */
long completedTaskCount = threadPool.getCompletedTaskCount();

/** Current number of threads in the pool */
int poolSize = threadPool.getPoolSize();

/** Largest number of threads that have ever been in the pool at the same time */
int largestPoolSize = threadPool.getLargestPoolSize();

/** Currently configured maximum number of threads */
int maximumPoolSize = threadPool.getMaximumPoolSize();

largestPoolSize is easily mistaken for maximumPoolSize, the configured maximum number of threads. So what does it actually mean, and how does it relate to the configured limit (for example, could it ever read 20 when the maximum is 10)? The source code comment explains:

/**
 * Returns the largest number of threads that have ever
 * simultaneously been in the pool.
 *
 * @return the number of threads
 */
public int getLargestPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}

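Reading the addWorker method shows two things:

  • largestPoolSize is the historical maximum number of workers: the largest number of threads that have been in the pool at the same time. It records a peak and is not a capacity setting.
  • largestPoolSize <= maximumPoolSize, because addWorker never lets the pool grow beyond maximumPoolSize. The exception is when maximumPoolSize is lowered with setMaximumPoolSize after the peak was recorded.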
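
One way to see that largestPoolSize is a recorded peak and not a configured limit is to lower maximumPoolSize after a burst. The sketch below uses a hypothetical class name, LargestPoolSizeDemo, and blocks its tasks on a latch so that the pool is forced to grow to three threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LargestPoolSizeDemo {

    /** Returns {largestPoolSize, maximumPoolSize} after a burst followed by lowering the maximum. */
    static int[] peakVersusLimit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);

        // 4 blocking tasks: 1 core thread, 1 queued task, 2 extra threads
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();

        // Lower the limit after the pool has already peaked at 3 threads
        pool.setMaximumPoolSize(2);

        int[] result = {pool.getLargestPoolSize(), pool.getMaximumPoolSize()};
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = peakVersusLimit();
        // prints largest=3, maximum=2
        System.out.println("largest=" + r[0] + ", maximum=" + r[1]);
    }
}
```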

6. Rejection strategy

The four rejection strategies mentioned above are explained and verified in detail below

1. AbortPolicy (the default): rejects the task and throws a RejectedExecutionException

// Assumes static imports of java.lang.Thread.sleep and java.util.concurrent.TimeUnit.SECONDS
private static final ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(1, 2, 5, SECONDS, new ArrayBlockingQueue<>(2));

private static void task() {
    try {
        sleep(1000);
        System.out.println(Thread.currentThread().getName()
                + ":queue size:" + threadPool.getQueue().size()
                + ", active:" + threadPool.getActiveCount());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

@Test
public void test_abort_policy() {
    for (int i = 0; i < 10; i++) {
        threadPool.submit(TestThread::task);
    }
}

Result:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6833ce2c rejected from java.util.concurrent.ThreadPoolExecutor@725bef66[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

2. CallerRunsPolicy: runs the rejected task in the submitting thread (here, the main thread)

@Test
public void test_CallerRunsPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 10; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000); // Block the main thread; otherwise it exits before the output is printed
}

Result:

pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:2, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:0, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2

The main thread executes some of the tasks itself, and while it is running a task it cannot submit further tasks to the pool. The active values look imprecise because thread state changes quickly: getActiveCount() only returns an approximation of the number of active threads.

3. DiscardPolicy: silently discards the rejected task; nothing else happens

@Test
public void test_DiscardPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    for (int i = 0; i < 20; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000); // Block the main thread; otherwise it exits before the output is printed
}

Result:

pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2

Only the tasks handed to the pool's threads and the tasks that fit in the queue are executed. Everything else is discarded, and no exception is thrown.

4. DiscardOldestPolicy: discards the oldest task in the queue and retries the new one

private static void task2(int count) {
    try {
        sleep(1000);
        System.out.println(count);
        System.out.println(Thread.currentThread().getName()
                + ":queue size:" + threadPool.getQueue().size()
                + ", active:" + threadPool.getActiveCount());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

@Test
public void test_DiscardOldestPolicy() throws InterruptedException {
    threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    for (int i = 0; i < 20; i++) {
        int finalI = i;
        threadPool.submit(() -> task2(finalI));
    }
    Thread.sleep(1000000); // Block the main thread; otherwise it exits before the output is printed
}

Result:

0
3
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
18
19
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2

Every task is discarded except the two that were handed directly to threads (0 and 3) and the last two that were queued (18 and 19).
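
The same effect can be reproduced without sleeps or timing assumptions. The following is a sketch with a hypothetical class name, DiscardOldestDemo: a single-thread pool with a queue of two receives five tasks that block on a latch, so the only thing that changes between submissions is which tasks sit in the queue.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestDemo {

    /** Submits tasks 0..4 to a saturated pool and returns the ids that actually ran. */
    static List<Integer> executedTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> executed = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    release.await(); // hold the single worker until all tasks are submitted
                    executed.add(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        // prints [0, 3, 4]
        System.out.println(executedTasks());
    }
}
```

Task 0 goes straight to the worker and tasks 1 and 2 fill the queue. Submitting task 3 evicts task 1 from the head of the queue, and submitting task 4 evicts task 2, so only 0, 3, and 4 run.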

7. ThreadFactory

The thread pool constructor shows that a default thread factory, Executors.defaultThreadFactory(), is passed in when none is specified:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

Every thread factory implements the ThreadFactory interface, which has a single method, newThread, used to produce threads. Implementations provide this method to create threads according to their own rules.

DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

DefaultThreadFactory does two things: it creates each thread as a non-daemon thread, and it names the thread in the form pool-N-thread-M (for example, pool-2-thread-1). That naming format does not distinguish pools well. It is better to name threads after the purpose of the pool they belong to, which is where other thread factories come in, such as Guava's ThreadFactoryBuilder and Spring's CustomizableThreadFactory:

@Test
public void test_thread_factory() throws InterruptedException {
    threadPool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("Clear Task-%d").build());
    for (int i = 0; i < 4; i++) {
        threadPool.submit(TestThread::task);
    }
    Thread.sleep(1000000); // Block the main thread; otherwise it exits before the output is printed
}

Result:

Clear Task-0:queue size:2, active:2
Clear Task-1:queue size:2, active:2
Clear Task-0:queue size:0, active:2
Clear Task-1:queue size:0, active:2
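
If adding Guava or Spring is not an option, a similar naming scheme can be written by hand with only the JDK. The sketch below is one possible approach and not a replacement for either library; NamedThreadFactoryDemo, NamedThreadFactory, and the "clear-task" prefix are names invented for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {

    /** Hypothetical factory that names threads "<prefix>-<n>", starting at 1. */
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false); // same choice as the default factory: non-daemon threads
            return t;
        }
    }

    /** Runs one task in a pool built with the factory and returns the worker thread's name. */
    static String firstWorkerName(String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new NamedThreadFactory(prefix));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            Future<String> name = pool.submit(whoAmI);
            return name.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints clear-task-1
        System.out.println(firstWorkerName("clear-task"));
    }
}
```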

8. The thread pool utility class that comes with Java

Java provides the following thread pools through Executors:

  • newCachedThreadPool creates a cacheable thread pool. If the pool holds more threads than the workload needs, idle threads are reclaimed; if no idle thread is available, a new one is created.
  • newFixedThreadPool creates a thread pool of fixed size, which caps the number of concurrently running threads. Tasks beyond that number wait in the queue.
  • newScheduledThreadPool creates a thread pool that supports delayed and periodic task execution.
  • newSingleThreadExecutor creates a single-threaded pool that uses one worker thread to execute tasks, ensuring that tasks run sequentially in the order they were submitted (FIFO).
  • newWorkStealingPool, added in Java 1.8, creates a work-stealing thread pool backed by ForkJoinPool, with parallelism based on the number of available CPUs.

However, the Alibaba Java Development Manual prohibits using Executors to create thread pools.

A thread pool created through these factory methods can cause an OutOfMemoryError (OOM). Either the blocking queue is effectively unbounded, with a default capacity of Integer.MAX_VALUE, or the maximum number of threads that can be created is Integer.MAX_VALUE:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Therefore, it is best to create thread pools directly with the ThreadPoolExecutor constructor, so that the queue capacity and the maximum number of threads are stated explicitly.
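
A minimal sketch of what that might look like follows. The class and method names (BoundedPoolDemo, newBoundedPool) and the concrete sizes are assumptions for illustration; the point is only that both the thread count and the queue capacity are bounded, and that the behaviour on overload is chosen deliberately.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {

    /** Builds a fixed-size pool whose queue is bounded, unlike Executors.newFixedThreadPool. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),    // bounded: at most queueCapacity waiting tasks
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // on overload, the submitter runs the task
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(4, 100);
        System.out.println("threads=" + pool.getMaximumPoolSize()
                + ", queue capacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

CallerRunsPolicy is used here because it slows down the submitter instead of dropping work; any of the other policies could be substituted depending on what the application can tolerate.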

Conclusion: How to properly configure the size of thread pool

More threads does not mean the program runs faster!

NCPU = Runtime.getRuntime().availableProcessors()

  • For CPU-intensive tasks, the goal is to keep the CPUs as busy as possible. A reference value is NCPU + 1. Typical scenario: complex computation.

  • For I/O-intensive tasks, a reference value is 2 x NCPU. Typical scenarios include database access, file upload and download, and network transfer. While a thread is performing I/O it is blocked and moves from the running state to the waiting state, and the CPU switches context to run something else. When the I/O completes, the CPU receives an interrupt from the device and the thread it was executing is interrupted and returned to the ready queue. The thread that was waiting for the I/O also returns to the ready queue and may be selected to run. Because threads spend much of their time blocked on I/O, more threads than cores can make progress concurrently.

Of course, these are only reference values. The actual settings need to be adjusted to the situation: for example, start with the reference value, then tune it based on how tasks run, the system load, and resource utilization.
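
The two reference values can be computed as in the sketch below. PoolSizing and its method names are hypothetical; the formulas are the NCPU + 1 and 2 x NCPU rules of thumb given above and are meant as starting points only.

```java
public class PoolSizing {

    /** Reference size for CPU-intensive work: NCPU + 1. */
    static int cpuBoundSize(int nCpu) {
        return nCpu + 1;
    }

    /** Reference size for I/O-intensive work: 2 x NCPU. */
    static int ioBoundSize(int nCpu) {
        return 2 * nCpu;
    }

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        System.out.println("NCPU = " + nCpu);
        System.out.println("CPU-bound reference size = " + cpuBoundSize(nCpu));
        System.out.println("I/O-bound reference size = " + ioBoundSize(nCpu));
    }
}
```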