ThreadPoolExecutor use

Tip: if you have any questions, please contact the private message, below the source code address, please take


preface


Tip: The following is the text of this article, the following case for reference

1. Technical introduction

1. What is a thread pool?

Two, the use of steps

1. Description of the ThreadPoolExecutor parameter

The parameter name The parameter types Parameter meaning
corePoolSize int Core thread pool size
maximumPoolSize int Maximum thread pool size
keepAliveTime long Maximum idle time of a thread
unit TimeUnit Unit of time
workQueue BlockingQueue Thread waiting queue
threadFactory ThreadFactory Thread creation factory
handler RejectedExecutionHandler Rejection policies
Let’s take a look at the underlying source code for the Execute method of the ThreadPoolExecutor class
OK, according to the judgment:

1. If fewer threads are running than the corePoolSize thread, try starting a new thread with the given command as its first task.

2. If the task can be queued successfully, then we still need to check again whether threads should be added (because the existing thread has died since the last check) or whether the pool has been closed since entering this method. So, we recheck the status and roll back the queue if necessary, or start a new thread if there is no thread.

3. If we cannot queue the task, try adding a new thread. If it fails, we know that we are closed or saturated and therefore reject the task.

2. NewSingleThreadExecutor use

The code is as follows (example) :

	@Test
    public void testNewSingleThreadExecutor(a) {
        ExecutorService threaPool = Executors.newSingleThreadExecutor();
        long start = System.currentTimeMillis();
        System.out.println("Thread pool execution begins");
        int idx = 10;
        while (--idx > 0) {
            threaPool.execute(() -> {
                try {
                    LOGGER.info("Thread executing");
                    TimeUnit.SECONDS.sleep(1);
                } catch(InterruptedException interruptedException) { interruptedException.printStackTrace(); }}); } threaPool.shutdown();for(; ;) {if (threaPool.isTerminated())
                break;
        }
        long end = System.currentTimeMillis();
        System.out.println("Thread pool execution finished, total time:" + (end - start) + " ms ");


    }

Copy the code

The results of running this test method are as follows:Notice that where I have marked with the red box, only 1 thread is used to execute, what is the principle? Let’s look at the source code for newSingleThreadExecutor

	public static ExecutorService newSingleThreadExecutor(a) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

Copy the code

We’ve built a ThreadPoolExecutor pool with one core thread and one maximum execution thread, and the queue is LinkedBlockingQueue. Let’s click on it again and see what the default constructor of LinkedBlockingQueue isYou can see that this is a queue with a capacity of interger.max_value by default

Conclusion: newSingleThreadExecutor is a pool with a core of 1 threads, a pool with a maximum of 1 threads, and an infinite queue, so you should know why it only opened one thread to execute.

3. Use newFixedThreadPool

The code is as follows (example) :

 	@Test
    public void testNewFixedThreadPool(a) {
      	ExecutorService threaPool = Executors.newFixedThreadPool(5);
        long start = System.currentTimeMillis();
        System.out.println("Thread pool execution begins");
        int idx = 20;
        while (--idx >= 0) {
            threaPool.execute(() -> {
                try {
                    LOGGER.info("Thread executing");
                    TimeUnit.SECONDS.sleep(1);
                } catch(InterruptedException interruptedException) { interruptedException.printStackTrace(); }}); } threaPool.shutdown();for(; ;) {if (threaPool.isTerminated())
                break;
        }
        long end = System.currentTimeMillis();
        System.out.println("Thread pool execution finished, total time:" + (end - start) + " ms ");
    }

Copy the code

Let’s take a look at the results

OK, look at the execution result, we can see that only 5 threads are opened, each batch of 5 execution, next let’s look at the source codeWe also construct a ThreadPoolExecutor pool with parameters as follows: The number of core threads and the maximum number of threads in the pool are passed in, and the unit test is passed in with 5, so we run on 5 threads and reuse these 5 threads to execute in the queue.

Conclusion: newFixedThreadPool is a pool of fixed size threads that execute based on the parameters passed in

4. NewCachedThreadPool use

The code is as follows (example) :

	@Test
    public void testNewCachedThreadPool(a) {
        ExecutorService threaPool = Executors.newCachedThreadPool();
        long start = System.currentTimeMillis();
        System.out.println("Thread pool execution begins");
        int idx = 200;
        while (--idx >= 0) {
            threaPool.execute(() -> {
                    LOGGER.info("Thread executing");

            });
        }
        threaPool.shutdown();
        for(; ;) {if (threaPool.isTerminated())
                break;
        }
        long end = System.currentTimeMillis();
        System.out.println("Thread pool execution finished, total time:" + (end - start) + " ms ");
    }

Copy the code

OK, this is different, let’s execute 200 threads, let’s look at the result first,It is obvious that the thread is repeatedly used to execute short time tasks. What is the reason? Let’s look at the source firstCreate a core queue with zero threads and a maximum execution thread of interger. MAX_VALUE. Note that SynchronousQueue has no capacity and is a blocking queue that does not store elements. The task is handed directly to the consumer, and you must wait for the added elements in the queue to be consumed before you can continue adding new elements.

SynchronousQueue, the underlying principles of which will be covered in a later article on queues, will not be covered here

Conclusion: newCachedThreadPool is a pool of threads that can be expanded indefinitely. It creates a new thread if there are no idle threads, and uses idle threads if there are

5. Recommended thread pool usage

Through the above test case and source code analysis, I believe you have a certain understanding of the thread pool, summarized as follows:

NewSingleThreadExecutor: open only one thread running, processing efficiency is slow, there is no limit to the size of the blocking queue size, if the queue accumulation data too much can cause resource consumption

2. NewFixedThreadPool: a fixed size pool of threads that can control the number of concurrent threads, but there is no limit to the blocking queue size, if the queue is too much data will cause resource consumption

3. NewCachedThreadPool: It is suitable for processing services with short execution time, but if the thread is created without limit, it may cause too much memory and generate OOM, and will cause excessive CPU switching and consume too many resources.

So the recommendation is to customize ThreadPoolExecutor for business scenarios, especially high-concurrency, high-traffic systems, which is why ali internally does not recommend using the above thread pools.

The author remarks

Does it feel simple? For more usage, please click below to see the source code, and follow me as I reveal more advanced usage

Source address: click here to view the source code.