Why use thread pools?

Are more threads always better?

  1. In Java, a thread is both an object and an operating system resource, so creating and destroying it takes time. If creation time + destruction time > execution time, the thread is not worth creating.
  2. Java objects occupy heap memory, while operating system threads occupy system memory. On common 64-bit HotSpot JVMs the default maximum stack size for a thread is 1 MB (adjustable with -Xss), and this stack space is allocated from system memory. Too many threads consume too much memory.
  3. The operating system has to switch thread contexts frequently (every thread wants its turn to run), which hurts performance.

Thread pools were introduced to make the number of threads easy to control.
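The point about controlling the number of threads can be illustrated with a small sketch. The class name `PoolReuseDemo` and the method `distinctWorkerThreads` are made up for this example; it assumes only the standard `java.util.concurrent` API and shows that many short tasks are served by a bounded set of reused threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuseDemo {
    /** Runs taskCount short tasks on a pool of poolSize threads and returns how many distinct threads ran them. */
    static int distinctWorkerThreads(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records which worker thread executed it.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 100 tasks never need more than 4 threads.
        System.out.println("distinct threads used: " + distinctWorkerThreads(4, 100));
    }
}
```

Without a pool, the same 100 tasks would each pay the cost of creating and destroying their own thread.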

The thread pool

Basic concept of thread pool

A thread pool consists of the following four basic components:

  1. Thread pool manager: creates and manages the thread pool, including creating the pool, destroying the pool, and adding new tasks;
  2. Worker thread: a thread in the pool that waits when no work is available and executes tasks in a loop;
  3. Task interface: the interface every task must implement so that worker threads can schedule and execute it. It mainly specifies the task's entry point, the cleanup work after the task finishes, and the task's execution state;
  4. Task queue: stores tasks that have not yet been processed, providing a buffer mechanism.
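To make the four components concrete, here is a deliberately tiny pool. It is an illustrative sketch, not how the JDK implements `ThreadPoolExecutor`; the class `TinyThreadPool` and its methods are invented for this example, and `Runnable` is assumed to play the role of the task interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy pool for illustration only; real code should use ThreadPoolExecutor. */
class TinyThreadPool {
    // Task queue: buffers tasks that no worker has picked up yet.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();
    private volatile boolean running = true;

    // Pool manager, part 1: create the pool and its worker threads.
    TinyThreadPool(int size) {
        for (int i = 0; i < size; i++) {
            Thread t = new Thread(this::workLoop, "tiny-worker-" + i);
            workers.add(t);
            t.start();
        }
    }

    // Pool manager, part 2: accept new tasks (Runnable is the task interface).
    void execute(Runnable task) {
        if (!running) {
            throw new IllegalStateException("pool is shut down");
        }
        queue.add(task);
    }

    // Pool manager, part 3: destroy the pool. Tasks still in the queue are dropped.
    void shutdown() {
        running = false;
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Worker thread: waits while the queue is empty and runs tasks in a loop.
    private void workLoop() {
        while (running) {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                return; // woken up by shutdown()
            } catch (RuntimeException e) {
                // a failing task must not kill the worker
            }
        }
    }

    /** Runs taskCount trivial tasks and returns how many completed. */
    static int runDemo(int poolSize, int taskCount) {
        TinyThreadPool pool = new TinyThreadPool(poolSize);
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                done.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runDemo(3, 20));
    }
}
```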

Thread pool interface definition and implementation classes

ScheduledThreadPoolExecutor can be regarded as the most feature-rich implementation class.

ExecutorService

public interface ExecutorService extends Executor {
    /** Gracefully shuts down the thread pool: previously submitted tasks are executed, but no new tasks are accepted. */
    void shutdown();

    /** Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns the list of tasks that were waiting. */
    List<Runnable> shutdownNow();

    /** Returns true if the thread pool has been shut down. */
    boolean isShutdown();

    /** Returns true if all tasks have completed following shutdown. */
    boolean isTerminated();

    /** Blocks until all tasks have completed after a shutdown request, the timeout occurs, or the current thread is interrupted, whichever happens first. */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /** Submits a value-returning Callable task for execution and returns a Future for retrieving its result. */
    <T> Future<T> submit(Callable<T> task);

    /** Submits a Runnable task for execution and returns a Future that yields the given result on completion. */
    <T> Future<T> submit(Runnable task, T result);

    /** Submits a Runnable task for execution and returns a Future whose get() returns null on completion. */
    Future<?> submit(Runnable task);

    /** Executes the given tasks and returns their Futures once all of them have completed. */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /** Executes the given tasks and returns their Futures once all have completed or the timeout expires; tasks that have not completed by then are cancelled. */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /** Executes the given tasks and returns the result of one that completed successfully; the other tasks are cancelled. */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /** Executes the given tasks and returns the result of one that completed successfully before the timeout; the other tasks are cancelled. Throws TimeoutException if none succeeds in time. */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
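A short usage sketch of the interface may help here. The class `ExecutorServiceTour` and its helper methods are hypothetical names for this example; the calls themselves (`submit`, `invokeAll`, `shutdown`, `awaitTermination`, `shutdownNow`) are the interface methods listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExecutorServiceTour {
    /** Sums the squares of 1..n by running one Callable per number through invokeAll. */
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                tasks.add(() -> x * x);
            }
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) { // blocks until every task is done
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            shutdownGracefully(pool);
        }
    }

    /** Calls shutdown() first and falls back to shutdownNow() if tasks do not finish in time. */
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> viaCallable = pool.submit(() -> "done");       // submit(Callable)
        Future<String> viaResult = pool.submit(() -> { }, "marker");  // submit(Runnable, result)
        Future<?> viaRunnable = pool.submit(() -> { });               // submit(Runnable)
        System.out.println(viaCallable.get() + " " + viaResult.get() + " " + viaRunnable.get());
        shutdownGracefully(pool);
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

The `shutdown()` then `awaitTermination()` then `shutdownNow()` sequence is a common idiom rather than something the interface requires.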


ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {

    /** Creates and executes a one-shot Runnable task that runs after the given delay. */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /** Creates and executes a one-shot Callable task that runs after the given delay and produces a result. */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Creates and executes a periodic task that first runs after the given initial delay and then repeats with the given period.
     * If an execution throws an exception, subsequent executions are suppressed.
     * If an execution takes longer than the period, the next execution starts immediately after it finishes (executions never overlap).
     * This is the important difference from scheduleWithFixedDelay.
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * Creates and executes a periodic task that first runs after the given initial delay.
     * If an execution throws an exception, subsequent executions are suppressed.
     * The delay before the next execution is always measured from the end of the previous one,
     * even if an execution takes longer than the delay.
     * This is the important difference from scheduleAtFixedRate.
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}
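The following sketch exercises two of these methods. The class `ScheduleDemo` and its method names are invented for the example, and the delays are arbitrary; it shows a one-shot `schedule` call with a `Callable` and a `scheduleAtFixedRate` task that is cancelled through its `ScheduledFuture`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduleDemo {
    /** Schedules a one-shot Callable and returns how many milliseconds passed until its result arrived. */
    static long delayedCallMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "fired", delayMillis, TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdownNow();
        }
    }

    /** Lets a fixed-rate task run at least `runs` times, then cancels it and returns the run count. */
    static int runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(runs);
        AtomicInteger count = new AtomicInteger();
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            count.incrementAndGet();
            latch.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false); // stop further periodic runs
            scheduler.shutdownNow();
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("one-shot task finished after ms: " + delayedCallMillis(100));
        System.out.println("periodic runs observed: " + runPeriodically(3, 50));
    }
}
```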

Thread pool utility class

The Executors utility class provides the following factory methods for creating thread pools:

newFixedThreadPool(int nThreads)

Creates a fixed-size thread pool whose task queue is unbounded. Number of core threads = maximum number of threads.
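These settings can be checked at runtime. The sketch below (with the made-up class name `FixedPoolInspection`) assumes that the object returned by `Executors.newFixedThreadPool` is a `ThreadPoolExecutor`, which holds for current JDK implementations but is not promised by the method's return type.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolInspection {
    /** Describes how Executors.newFixedThreadPool(n) is configured. */
    static String describe(int n) {
        // Assumption: the JDK returns a plain ThreadPoolExecutor here, so the cast succeeds.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(n);
        try {
            boolean unbounded = pool.getQueue().remainingCapacity() == Integer.MAX_VALUE;
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " queue=" + pool.getQueue().getClass().getSimpleName()
                    + " unbounded=" + unbounded;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: core=3 max=3 queue=LinkedBlockingQueue unbounded=true
        System.out.println(describe(3));
    }
}
```

"Unbounded" here means a capacity of Integer.MAX_VALUE, so a producer that outpaces the workers can still exhaust memory.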

newCachedThreadPool()

Creates a cached thread pool of unbounded size. Its task queue is a SynchronousQueue. When a task is submitted, it is executed by an idle thread if the pool has one; otherwise a new thread is created to execute it. Threads that stay idle for more than 60 seconds are destroyed and released, so the number of threads varies with the number of tasks. This pool suits asynchronous tasks with short execution times. Number of core threads = 0, maximum number of threads = Integer.MAX_VALUE.
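The growth behavior can be observed with tasks that deliberately block. In this sketch the class `CachedPoolGrowth` and the latch names are invented, and the cast to `ThreadPoolExecutor` again relies on current JDK behavior rather than on a documented guarantee.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolGrowth {
    /** Submits `tasks` blocking tasks to a cached pool and returns the pool size while all are running. */
    static int poolSizeWithBlockedTasks(int tasks) {
        // Assumption: newCachedThreadPool returns a ThreadPoolExecutor in the JDK in use.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // keep this worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(10, TimeUnit.SECONDS);
            // No thread was idle at submission time, so every task got its own thread.
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads for 6 blocked tasks: " + poolSizeWithBlockedTasks(6)); // 6
    }
}
```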

newSingleThreadExecutor()

Creates a single-threaded pool: exactly one thread works through an unbounded task queue. This pool guarantees that tasks are executed one at a time, in the order in which they were added. If the only thread terminates abnormally because of a task, a new thread is created to continue with the subsequent tasks. The difference from newFixedThreadPool(1) is that the pool size is hardcoded in the newSingleThreadExecutor method and cannot be changed afterwards.
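Both properties, ordered execution and the fixed size, can be sketched as follows. `SingleThreadDemo` and its methods are hypothetical names. The check relies on the JDK wrapping the single-thread executor in a delegating class that is not a `ThreadPoolExecutor`, while `newFixedThreadPool(1)` returns one directly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SingleThreadDemo {
    /** Submits tasks 0..n-1 and returns the order in which they actually ran. */
    static String executionOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            single.execute(() -> seen.add(id));
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    /** Tries to resize the executor to 4 threads; returns false if it does not expose ThreadPoolExecutor. */
    static boolean canBeResized(ExecutorService executor) {
        boolean resizable = executor instanceof ThreadPoolExecutor;
        if (resizable) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            pool.setMaximumPoolSize(4); // raise the maximum first so core <= max always holds
            pool.setCorePoolSize(4);
        }
        executor.shutdown();
        return resizable;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(5));                            // [0, 1, 2, 3, 4]
        System.out.println(canBeResized(Executors.newFixedThreadPool(1)));      // true
        System.out.println(canBeResized(Executors.newSingleThreadExecutor())); // false
    }
}
```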

newScheduledThreadPool(int corePoolSize)

Creates a thread pool that can execute tasks after a delay or periodically. The number of core threads is specified by the parameter; maximum number of threads = Integer.MAX_VALUE.

Thread pool task execution process
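The execution process (core threads first, then the queue, then extra threads up to the maximum, then rejection) can be traced with a deliberately small pool. This is a sketch with invented names (`ExecutionFlowDemo`, `trace`); the pool parameters 1/2/1 are chosen only to keep the trace short, and the default AbortPolicy is assumed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionFlowDemo {
    /** Submits four blocking tasks to a pool with 1 core thread, 2 max threads and a queue of capacity 1. */
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // occupy the worker until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    out.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size()).append('\n');
                } catch (RejectedExecutionException e) {
                    out.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // task1: threads=1 queued=0   (a core thread is created)
        // task2: threads=1 queued=1   (core threads busy, task is queued)
        // task3: threads=2 queued=1   (queue full, a non-core thread is created)
        // task4: rejected             (queue full and maximum reached)
        System.out.print(trace());
    }
}
```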

How do I determine the appropriate number of threads?

  • For CPU-intensive applications, set the thread pool size to N+1 (N is the total number of CPU cores).
  • For IO intensive applications, set the thread pool size to 2N+1 (N is the total number of CPU cores).
  • The higher the proportion of thread wait time (IO), the more threads are required.
  • The higher the percentage of thread CPU time, the fewer threads are required.
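The rules of thumb above translate into a few lines of code. The class `PoolSizing` is a made-up helper. The third method uses the commonly cited heuristic cores × (1 + wait time / compute time), which is not stated in this article and is included only as an assumption to make the last two bullets concrete.

```java
class PoolSizing {
    /** CPU-intensive rule from the list above: N + 1. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** IO-intensive rule from the list above: 2N + 1. */
    static int ioBound(int cores) {
        return 2 * cores + 1;
    }

    /**
     * Assumed heuristic (not from this article): cores * (1 + wait / compute).
     * More waiting per unit of CPU work yields more threads.
     */
    static int byWaitRatio(int cores, double waitMillis, double computeMillis) {
        return (int) Math.ceil(cores * (1 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBound(cores));
        System.out.println("IO-bound pool size: " + ioBound(cores));
        System.out.println("90 ms wait per 10 ms CPU: " + byWaitRatio(cores, 90, 10));
    }
}
```

These numbers are starting points; the right size for a real workload is best confirmed by measurement.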

The fastest part of a system is the CPU, so the CPU determines the upper limit of system throughput, and increasing CPU processing power raises that limit. However, because of the weakest-link (bottleneck) effect, real system throughput cannot be calculated from the CPU alone. To improve system throughput, start with the system's weak points (such as network latency and IO):

  1. Maximize the parallelism of the bottleneck operations, for example with multi-threaded downloading;
  2. Strengthen the weak points themselves, for example by using NIO instead of blocking IO.
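The first point can be sketched with simulated IO. In the example below `fetchChunk` is a hypothetical stand-in for a slow network read (it only sleeps), and `ParallelIoDemo` is an invented class name; the sketch compares running four such calls on one thread with running them on four.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelIoDemo {
    /** Hypothetical slow IO call: pretends to download one chunk by sleeping. */
    static int fetchChunk(int index, long ioMillis) throws InterruptedException {
        Thread.sleep(ioMillis);
        return index;
    }

    /** Fetches `chunks` chunks on a pool of `threads` threads and returns the elapsed milliseconds. */
    static long elapsedMillis(int threads, int chunks, long ioMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < chunks; i++) {
                final int index = i;
                tasks.add(() -> fetchChunk(index, ioMillis));
            }
            long start = System.nanoTime();
            pool.invokeAll(tasks); // returns once every chunk has been fetched
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + elapsedMillis(1, 4, 100) + " ms");
        System.out.println("4 threads: " + elapsedMillis(4, 4, 100) + " ms");
    }
}
```

Because the threads spend their time waiting rather than computing, adding threads shortens the total time even on a single core.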

Thread pool usage analysis

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorsUse {
    /**
     * Test: submit 15 tasks that each take 3 seconds and observe the state of the thread pool.
     *
     * @param threadPoolExecutor the pool under test; pass in differently configured pools to see different results
     * @throws Exception if the calling thread is interrupted while sleeping
     */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
        // Test: submit 15 tasks that each take 3 seconds and see how the tasks beyond the pool's capacity are handled
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(() -> {
                try {
                    System.out.println("Start execution: " + n);
                    Thread.sleep(3000L);
                    System.err.println("End of execution: " + n);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println("Task submitted successfully: " + i);
        }
        // Check the number of threads and the number of queued tasks
        Thread.sleep(500L);
        System.out.println("The current number of threads in the thread pool is: " + threadPoolExecutor.getPoolSize());
        System.out.println("The number of tasks currently waiting in the queue is: " + threadPoolExecutor.getQueue().size());
        // Wait 15 seconds, then check the threads and the queue again (in theory, threads beyond the core count have been destroyed by then)
        Thread.sleep(15000L);
        System.out.println("The current number of threads in the thread pool is: " + threadPoolExecutor.getPoolSize());
        System.out.println("The number of tasks currently waiting in the queue is: " + threadPoolExecutor.getQueue().size());
    }

    /**
     * 1. Thread pool information: 5 core threads, maximum of 10 threads, unbounded queue,
     * keep-alive time for threads beyond the core count: 5 seconds, default rejection policy.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest1() throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // Expected result: the pool has 5 threads; because the queue is unbounded, the other 10 tasks wait in the queue
    }

    /**
     * 2. Thread pool information: 5 core threads, maximum of 10 threads, queue capacity 3,
     * keep-alive time for threads beyond the core count: 5 seconds, custom rejection policy.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest2() throws Exception {
        // Create a pool with 5 core threads, a maximum of 10 threads, and a waiting queue of capacity 3, that is, at most 13 tasks are accepted.
        // The default policy is to throw RejectedExecutionException: java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("A task was rejected.");
            }
        });
        testCommon(threadPoolExecutor);
        // Expected results:
        // 1. 5 tasks are assigned directly to core threads and start executing
        // 2. 3 tasks enter the waiting queue
        // 3. The queue is full, so 5 temporary threads are added to execute tasks (destroyed after 5 idle seconds)
        // 4. The queue and the pool are both full, so the remaining 2 tasks are rejected
        // 5. Once the tasks are done and 5 seconds pass without work, the 5 temporary threads are destroyed
    }

    /**
     * 3. Thread pool information: 5 core threads, maximum of 5 threads, unbounded queue,
     * keep-alive time for threads beyond the core count: 0 milliseconds.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest3() throws Exception {
        // Same as Executors.newFixedThreadPool(int nThreads)
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // Expected result: the pool has 5 threads; the other 10 tasks wait in the queue
    }

    /**
     * 4. Thread pool information: 0 core threads, maximum of Integer.MAX_VALUE threads, SynchronousQueue,
     * keep-alive time for threads beyond the core count: 60 seconds.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest4() throws Exception {

        // A SynchronousQueue is not really a queue, because it keeps no storage for elements.
        // Unlike other queues, it maintains a set of threads that are waiting to add or remove elements.
        // With a SynchronousQueue as the work queue, when client code submits a task to the thread pool
        // and no idle thread in the pool is waiting to take a task from the SynchronousQueue instance,
        // the corresponding offer call fails (that is, the task is not queued).
        // In that case the ThreadPoolExecutor creates a new worker thread to run the task that could not be queued
        // (assuming the pool size has not reached maximumPoolSize).

        // Same as Executors.newCachedThreadPool()
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        testCommon(threadPoolExecutor);
        // Expected results:
        // 1. The pool has 15 threads: the SynchronousQueue holds no tasks, so each task that finds no idle thread gets a new one
        // 2. If no task arrives for 60 seconds, all threads are destroyed and the pool size returns to 0
        Thread.sleep(60000L);
        System.out.println("60 seconds later, the number of threads in the pool is: " + threadPoolExecutor.getPoolSize());
    }

    /**
     * 5. Scheduled thread pool information: 5 core threads, maximum of Integer.MAX_VALUE threads, DelayedWorkQueue,
     * keep-alive time for threads beyond the core count: 0 seconds.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest5() throws Exception {
        // Same as Executors.newScheduledThreadPool(int corePoolSize)
        ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        threadPoolExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("Task executed, current time: " + System.currentTimeMillis());
            }
        }, 3000, TimeUnit.MILLISECONDS);
        System.out.println(
                "Scheduled task submitted successfully, time: " + System.currentTimeMillis() + ", number of threads in the pool: " + threadPoolExecutor.getPoolSize());
        // Expected result: the task is executed once, after 3 seconds
    }

    /**
     * 6. Scheduled thread pool information: same configuration as test 5
     * (5 core threads, maximum of Integer.MAX_VALUE threads, DelayedWorkQueue).
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest6() throws Exception {
        ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        // Execute a task periodically. The pool offers two kinds of periodic scheduling; both are shown here with the same scenario.
        // Test scenario: the submitted task takes 3 seconds to run. Compare the two scheduling methods.
        // Effect 1: the first execution starts 2 seconds after submission, then the task repeats at a fixed rate of once per second
        // (if the previous execution has not finished, the next one waits and starts immediately after it finishes).
        // In this code the task therefore runs once every 3 seconds (each run takes 3 seconds, longer than the 1-second period,
        // so the next run starts immediately with no additional wait).
        threadPoolExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task 1 executed, current time: " + System.currentTimeMillis());
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);

        // Effect 2: the first execution starts 2 seconds after submission, then the task repeats with a fixed delay of 1 second,
        // measured from the end of the previous execution.
        // In this code the task therefore runs once every 4 seconds (each run takes 3 seconds, then waits 1 second: 3 + 1).
        threadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task 2 executed, current time: " + System.currentTimeMillis());
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * 7. Terminating a pool with shutdown(). Thread pool information: 5 core threads, maximum of 10 threads, queue capacity 3,
     * keep-alive time for threads beyond the core count: 5 seconds, custom rejection policy.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest7() throws Exception {
        // Create a pool with 5 core threads, a maximum of 10 threads, and a waiting queue of capacity 3, that is, at most 13 tasks are accepted.
        // The default policy is to throw RejectedExecutionException: java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("A task was rejected.");
            }
        });
        // Test: submit 15 tasks that each take 3 seconds and see how the 2 tasks beyond the pool's capacity are handled
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("Start execution: " + n);
                        Thread.sleep(3000L);
                        System.err.println("End of execution: " + n);
                    } catch (InterruptedException e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }
            });
            System.out.println("Task submitted successfully: " + i);
        }
        // Shut the thread pool down after 1 second
        Thread.sleep(1000L);
        threadPoolExecutor.shutdown();
        // Submitting again now fails
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("An additional task");
            }
        });
        // Result analysis
        // 1. 10 tasks are executed, 3 tasks are queued, and 2 tasks are rejected
        // 2. After shutdown() is called, no new tasks are accepted, and the pool terminates once the 13 accepted tasks have finished
        // 3. The additional task cannot be submitted after the pool is shut down and is rejected
    }

    /**
     * 8. Terminating a pool immediately with shutdownNow(). Thread pool information: 5 core threads, maximum of 10 threads,
     * queue capacity 3, keep-alive time for threads beyond the core count: 5 seconds, custom rejection policy.
     *
     * @throws Exception
     */
    private void threadPoolExecutorTest8() throws Exception {
        // Create a pool with 5 core threads, a maximum of 10 threads, and a waiting queue of capacity 3, that is, at most 13 tasks are accepted.
        // The default policy is to throw RejectedExecutionException: java.util.concurrent.ThreadPoolExecutor.AbortPolicy
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("A task was rejected.");
            }
        });
        // Test: submit 15 tasks that each take 3 seconds and see how the 2 tasks beyond the pool's capacity are handled
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("Start execution: " + n);
                        Thread.sleep(3000L);
                        System.err.println("End of execution: " + n);
                    } catch (InterruptedException e) {
                        System.out.println("Exception: " + e.getMessage());
                    }
                }
            });
            System.out.println("Task submitted successfully: " + i);
        }
        // Terminate the thread pool immediately after 1 second
        Thread.sleep(1000L);
        List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
        // Submitting again now fails
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("An additional task");
            }
        });
        System.out.println("Number of tasks that never started: " + shutdownNow.size());

        // Result analysis
        // 1. 10 tasks are executed, 3 tasks are queued, and 2 tasks are rejected
        // 2. After shutdownNow() is called, the 3 queued tasks are never executed and the 10 running tasks are interrupted
        // 3. The additional task cannot be submitted after the pool is shut down and is rejected
    }

    public static void main(String[] args) throws Exception {
        // new ExecutorsUse().threadPoolExecutorTest1();
        // new ExecutorsUse().threadPoolExecutorTest2();
        // new ExecutorsUse().threadPoolExecutorTest3();
        new ExecutorsUse().threadPoolExecutorTest4();
        // new ExecutorsUse().threadPoolExecutorTest5();
        // new ExecutorsUse().threadPoolExecutorTest6();
        // new ExecutorsUse().threadPoolExecutorTest7();
        // new ExecutorsUse().threadPoolExecutorTest8();
    }
}
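The shutdownNow() behavior from test 8 can also be checked without long sleeps. The following is a reduced sketch, not part of the original test class: `ShutdownNowDemo` and its latches are invented names, and the pool is shrunk to one thread so that exactly two tasks are left in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns {number of tasks handed back by shutdownNow, 1 if the running task was interrupted, else 0}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();

        // This task occupies the only worker thread.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                interrupted.set(true); // shutdownNow() interrupts running workers
            } finally {
                finished.countDown();
            }
        });
        // These two tasks can only wait in the queue.
        pool.execute(() -> { });
        pool.execute(() -> { });

        try {
            started.await();
            List<Runnable> neverStarted = pool.shutdownNow();
            finished.await(10, TimeUnit.SECONDS);
            return new int[] { neverStarted.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("tasks returned by shutdownNow: " + result[0]); // 2
        System.out.println("running task interrupted: " + (result[1] == 1)); // true
    }
}
```

Note that shutdownNow() only delivers an interrupt; a task that ignores interruption keeps running until it finishes on its own.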