Preface
Thread pools are a commonly used concurrency framework in Java. Used properly, a thread pool reduces resource consumption, improves response time, and makes threads easier to manage. This article is a set of study notes on Chapter 9 of “The Art of Concurrent Programming in Java”. Following the original author's line of thought, it summarizes how a thread pool works, how to create one, how it executes tasks, and how to shut it down. It closes with a hands-on thread pool implementation. The case study originally came from a WeChat official account, and its source can no longer be traced. The original case had defects in how it shut down the pool, so I have corrected them and added explanations, and I share the result here for everyone to study.
Main text
How a thread pool works
When a task is submitted to the thread pool ThreadPoolExecutor, it is processed as shown in the figure below.
- If the number of running threads is less than corePoolSize (the number of core threads), a new thread is created to execute the task (this requires acquiring the global lock);
- If the number of running threads is equal to or greater than corePoolSize, the task is added to the BlockingQueue (the task blocking queue);
- If the BlockingQueue is full, a new thread is created to execute the task (this also requires acquiring the global lock);
- If creating a new thread would push the number of threads above maximumPoolSize (the maximum number of threads), the task is rejected and the rejectedExecution() method of the RejectedExecutionHandler is called.
Because ThreadPoolExecutor stores its worker threads in a HashSet, it must acquire a global lock in steps 1 and 3 above to stay thread-safe, and acquiring a global lock can easily become a performance bottleneck for the pool. Once the pool has warmed up (the current number of threads is greater than or equal to corePoolSize), execute() mostly takes step 2, which does not need the global lock.
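The four steps above can be observed directly on a small ThreadPoolExecutor. The following is a minimal sketch, not part of the original notes: the class name PoolFlowDemo and the sizes (1 core thread, at most 2 threads, queue capacity 1) are chosen only for illustration, and every task blocks on a latch so that the pool cannot make progress while the tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> run() {
        CountDownLatch release = new CountDownLatch(1);
        // Illustrative sizing: 1 core thread, 2 threads at most, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();   // keep the worker busy until all tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task" + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                // The default AbortPolicy throws when the pool is saturated.
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With these sizes, task1 starts a core thread, task2 is queued, task3 starts a second (non-core) thread because the queue is full, and task4 is rejected.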
Creating a thread pool
A thread pool is created through ThreadPoolExecutor, whose constructors have the following signatures.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
When creating a thread pool with ThreadPoolExecutor, you must specify the number of core threads, the maximum number of threads, the keep-alive time and its unit, and the task blocking queue. The thread factory and the saturation (rejection) policy are optional: if you omit them, ThreadPoolExecutor uses its default thread factory and default rejection policy. The parameters have the following meanings.
Parameter | Meaning |
---|---|
corePoolSize | The number of core threads, that is, the base size of the thread pool. When a task is submitted and the number of threads in the pool is less than corePoolSize, a new thread is created to execute the task regardless of whether other threads are idle. |
maximumPoolSize | The maximum number of threads the pool is allowed to create. When the number of threads is greater than or equal to corePoolSize, newly submitted tasks are added to the task blocking queue; if the queue is full and the number of threads is less than maximumPoolSize, a new thread is created to execute the task. |
keepAliveTime | The thread keep-alive time. When the number of threads is greater than the number of core threads, an excess idle thread survives for at most keepAliveTime. If it has not obtained a task to execute by then, it is reclaimed. |
unit | The time unit of the keep-alive time. Through TimeUnit you can choose DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, or NANOSECONDS, but whichever unit you specify, ThreadPoolExecutor converts the value to nanoseconds. |
workQueue | The task blocking queue. When the number of threads is greater than or equal to corePoolSize, new tasks are added to workQueue. After a thread finishes its current task, it keeps fetching tasks from workQueue in a loop. |
threadFactory | The factory that creates threads. A thread factory can give each created thread a more meaningful name. |
handler | The saturation (rejection) policy. If the task blocking queue is full and the number of threads equals maximumPoolSize, the pool is saturated and a rejection policy is applied to newly submitted tasks. |
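As a sketch of how the seven parameters fit together, the following example builds a pool with every argument spelled out. The class name PoolConfigDemo, the thread-name prefix "demo-worker-", and all sizes are assumptions made for illustration; CallerRunsPolicy is one of the rejection policies that ship with ThreadPoolExecutor and is used here only as an example of a non-default handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {

    // Builds a pool with every constructor parameter given explicitly.
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger();
        // Thread factory that gives each worker a readable name (the prefix is made up).
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30, TimeUnit.SECONDS,                       // keepAliveTime and unit
                new ArrayBlockingQueue<>(100),              // workQueue (bounded)
                factory,                                    // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler applied when saturated
    }

    // Runs one task and returns the name of the thread that executed it.
    static String workerName() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

Naming the threads through the factory makes thread dumps and log lines much easier to attribute to a particular pool.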
Executing tasks in a thread pool
A thread pool executes tasks through two methods, execute() and submit(). execute() is for tasks that do not need to return a value, and submit() is for tasks that do. execute() is defined by the Executor interface and submit() by the ExecutorService interface, and the related class diagram is shown below.
ThreadPoolExecutor implements execute() as follows.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Step 1: fewer than corePoolSize workers, so try to start a core thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: otherwise try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Steps 3 and 4: queue full, so try a non-core thread, and reject if that fails
    else if (!addWorker(command, false))
        reject(command);
}
AbstractExecutorService implements submit() as follows.
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
The execute() method determines whether to create a new thread to process the task or add the task to the task-blocking queue based on the current number of threads, while the submit() method wraps the task as a RunnableFuture and calls the execute() method.
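From the caller's side, the difference between the two methods looks roughly like this. The sketch below is illustrative only (the class name SubmitVsExecuteDemo and the trivial sum are made up): with submit() the result arrives through the returned Future, while with execute() the task has to publish its result by other means, here an AtomicInteger plus a CountDownLatch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitVsExecuteDemo {

    // submit(): the result comes back through the returned Future.
    static int sumViaSubmit() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> job = () -> 1 + 2 + 3;
            Future<Integer> future = pool.submit(job);
            return future.get();   // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // execute(): no return value, so the task publishes its result itself.
    static int sumViaExecute() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger holder = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                holder.set(1 + 2 + 3);
                done.countDown();
            });
            done.await();          // wait for the task to signal completion
            return holder.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaSubmit());
        System.out.println(sumViaExecute());
    }
}
```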
Shutting down the thread pool
You can shut down a thread pool by calling its shutdown() or shutdownNow() method.
The shutdown() method sets the pool state to SHUTDOWN. From then on the pool accepts no newly submitted tasks, idle threads are interrupted, and the pool is safely closed only after the running tasks and the tasks still in the task blocking queue have finished.
The shutdownNow() method sets the pool state to STOP. The pool accepts no newly submitted tasks, all threads are interrupted, the tasks in the task blocking queue are no longer executed (they are returned as a list), and the pool attempts to stop the tasks that are currently executing.
Note: an idle thread here means a thread that is waiting to fetch a task from the task blocking queue, that is, a thread that is not executing a task.
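The contrast between the two shutdown methods can be sketched with a single-threaded ThreadPoolExecutor. The class and method names below are hypothetical and exist only for this illustration. In the first method, queued tasks still run after shutdown(). In the second, shutdownNow() hands back the tasks that never started and interrupts the one that is running.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    private static void awaitEnd(ThreadPoolExecutor pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // shutdown(): already submitted tasks, including queued ones, still run.
    static long completedAfterShutdown() {
        ThreadPoolExecutor pool = singleThreadPool();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        awaitEnd(pool);
        return pool.getCompletedTaskCount();
    }

    // shutdownNow(): queued tasks are returned instead of being executed.
    static int pendingAfterShutdownNow() {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch never = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                never.await();      // occupies the only worker thread
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): let the task end
            }
        });
        pool.execute(() -> { });    // stays in the queue
        pool.execute(() -> { });    // stays in the queue
        List<Runnable> pending = pool.shutdownNow();
        awaitEnd(pool);
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(completedAfterShutdown());
        System.out.println(pendingAfterShutdownNow());
    }
}
```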
Thread pool implementation in practice
In ThreadPoolExecutor the collection of workers (worker threads) is a HashSet, so a global lock must be acquired every time that collection is modified. In this hands-on implementation, a ConcurrentHashSet built on ConcurrentHashMap is used to store the workers instead. It is implemented as follows.
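import java.util.AbstractSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentHashSet<T> extends AbstractSet<T> {

    // Backing map: the set elements are the keys
    private final ConcurrentHashMap<T, Object> MAP = new ConcurrentHashMap<>();
    // Placeholder value stored for every key
    private final Object PRESENT = new Object();
    // Number of elements in the set
    private final AtomicInteger COUNT = new AtomicInteger();

    @Override
    public boolean add(T t) {
        boolean added = MAP.put(t, PRESENT) == null;
        // Count only elements that were not already present
        if (added) {
            COUNT.incrementAndGet();
        }
        return added;
    }

    @Override
    public boolean remove(Object o) {
        boolean removed = MAP.remove(o) == PRESENT;
        // Count only elements that were actually removed
        if (removed) {
            COUNT.decrementAndGet();
        }
        return removed;
    }

    @Override
    public Iterator<T> iterator() {
        return MAP.keySet().iterator();
    }

    @Override
    public int size() {
        return COUNT.get();
    }
}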
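As an aside, since Java 8 the JDK can produce an equivalent concurrent set directly through ConcurrentHashMap.newKeySet(), so a hand-written wrapper is not strictly required. The sketch below is an alternative for comparison, not the approach this case study takes; the class name KeySetDemo and the element values are made up.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class KeySetDemo {

    // Returns a thread-safe Set backed by a ConcurrentHashMap (Java 8+).
    static <T> Set<T> newConcurrentSet() {
        return ConcurrentHashMap.newKeySet();
    }

    public static void main(String[] args) {
        Set<String> workers = newConcurrentSet();
        System.out.println(workers.add("worker-1"));    // first insertion succeeds
        System.out.println(workers.add("worker-1"));    // duplicate is refused
        System.out.println(workers.size());
        System.out.println(workers.remove("worker-1"));
    }
}
```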
Next, take a look at the fields of ThreadPool.
public class ThreadPool {

    // Global lock
    private final ReentrantLock lock = new ReentrantLock();

    // Number of core threads
    private final int coreSize;
    // Maximum number of threads
    private final int maxSize;
    // Thread keep-alive time
    private final long keepAliveTime;
    // Unit of the keep-alive time
    private final TimeUnit unit;
    // Task blocking queue
    private final BlockingQueue<Runnable> workQueue;
    // Thread factory
    private volatile ThreadFactory threadFactory;
    // Rejection policy
    private volatile RejectedExecutionHandler handler;

    // Collection of workers
    private final Set<Worker> workers = new ConcurrentHashSet<>();

    // Flags set by shutDown() and shutDownNow()
    private final AtomicBoolean shutDown = new AtomicBoolean(false);
    private final AtomicBoolean shutDownNow = new AtomicBoolean(false);
    // Total number of unfinished tasks
    private final AtomicInteger taskNum = new AtomicInteger();

    // ......

    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Update the rejection policy of the thread pool
     * @param handler rejection policy
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null) {
            throw new NullPointerException();
        }
        this.handler = handler;
    }

    /**
     * Update the thread factory of the thread pool
     * @param threadFactory thread factory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException();
        }
        this.threadFactory = threadFactory;
    }

    // ......
}
Because this exercise only needs to implement the basic functions of a thread pool, the number of core threads, the maximum number of threads, the keep-alive time, and its unit cannot be changed once they have been specified. Now take a look at the constructors.
public class ThreadPool {

    // ......

    /**
     * Create a thread pool using the default rejection policy and the default thread factory
     * @param coreSize number of core threads
     * @param maxSize maximum number of threads
     * @param keepAliveTime thread keep-alive time
     * @param unit unit of the keep-alive time
     * @param workQueue task blocking queue
     */
    public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
                      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        threadFactory = Executors.defaultThreadFactory();
        handler = new AbortPolicy();
    }

    /**
     * Create a thread pool using the default rejection policy
     * @param coreSize number of core threads
     * @param maxSize maximum number of threads
     * @param keepAliveTime thread keep-alive time
     * @param unit unit of the keep-alive time
     * @param workQueue task blocking queue
     * @param threadFactory thread factory
     */
    public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
                      TimeUnit unit, BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        handler = new AbortPolicy();
    }

    /**
     * Create a thread pool
     * @param coreSize number of core threads
     * @param maxSize maximum number of threads
     * @param keepAliveTime thread keep-alive time
     * @param unit unit of the keep-alive time
     * @param workQueue task blocking queue
     * @param threadFactory thread factory
     * @param handler rejection policy
     */
    public ThreadPool(int coreSize, int maxSize, int keepAliveTime,
                      TimeUnit unit, BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    // ......
}
Three constructors, similar to those of ThreadPoolExecutor, are provided. In each of them the user must specify the number of core threads, the maximum number of threads, the keep-alive time and its unit, and the task blocking queue. Next, let’s look at the submit() and execute() methods.
public class ThreadPool {

    // ......

    /**
     * Execute a task that returns a value
     * @param callable the task to execute
     */
    public <T> Future<T> submit(Callable<T> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        FutureTask<T> futureTask = new FutureTask<>(callable);
        execute(futureTask);
        return futureTask;
    }

    /**
     * Execute a task that returns no value
     * @param runnable the task to execute
     */
    public void execute(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException();
        }
        // After shutDown() or shutDownNow() has been called, the pool accepts no new tasks
        if (shutDown.get() || shutDownNow.get()) {
            return;
        }
        // Increase the total number of tasks by one
        taskNum.incrementAndGet();
        // If the current number of threads is less than the number of core threads,
        // create a new thread to execute the task
        if (workers.size() < coreSize) {
            addWorker(runnable);
            return;
        }
        // Otherwise add the task to the task blocking queue;
        // offer() returns false if the queue is full
        boolean offer = workQueue.offer(runnable);
        if (!offer) {
            // Queue full and fewer than maxSize threads: create a new thread
            if (workers.size() < maxSize) {
                addWorker(runnable);
            } else {
                // Maximum number of threads reached: apply the rejection policy
                // and decrease the total number of tasks by one.
                // Note: the JDK's RejectedExecutionHandler expects a ThreadPoolExecutor,
                // so this call assumes a handler type that accepts this ThreadPool.
                taskNum.decrementAndGet();
                handler.rejectedExecution(runnable, this);
            }
        }
    }

    private void addWorker(Runnable runnable) {
        Worker worker = new Worker(runnable);
        workers.add(worker);
        worker.getThread().start();
    }

    // ......
}
When ThreadPool receives a new task, its execute() method follows the decision flow summarized in the first section. The addWorker() method creates a Worker object and adds it to the worker collection. Because the worker collection is the thread-safe ConcurrentHashSet implemented above, this step does not need the global lock. The Worker class is an inner class of ThreadPool and is implemented as follows.
public class ThreadPool {

    // ......

    private final class Worker implements Runnable {

        // Initial task of the worker
        private final Runnable firstTask;
        // Thread that runs the worker
        private final Thread thread;

        public Worker(Runnable task) {
            firstTask = task;
            thread = getThreadFactory().newThread(this);
        }

        public Thread getThread() {
            return thread;
        }

        public void close() {
            thread.interrupt();
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                // Run the initial task first; when task is null, fetch one from the task blocking queue
                while (task != null || (task = getTask()) != null) {
                    try {
                        task.run();
                    } finally {
                        // After each task, set task to null so the next one is fetched
                        // from the task blocking queue, and decrease the task count
                        task = null;
                        taskNum.decrementAndGet();
                    }
                }
            } finally {
                // getTask() returned null: the loop has ended, so release this worker
                if (workers.remove(this)) {
                    // Needed to close the pool safely after shutDown() has been called
                    tryClose();
                }
            }
        }
    }

    // ......
}
The Worker has two fields: firstTask and thread. firstTask is assigned the task passed in when the Worker is created and represents the Worker's initial task, so a Worker always executes its initial task first and only then fetches tasks from the task blocking queue. The thread is created by the thread factory when the Worker is created, and the task that thread runs is the Worker itself (the Worker implements Runnable, so it is itself a task). After the Worker is created, its thread is started and executes the Worker's run() method. The idea behind run() is to execute the initial task first and then call ThreadPool's getTask() method in a loop to fetch tasks from the task blocking queue. If fetching a task fails (the keep-alive time has elapsed or the thread was interrupted), the loop exits and the logic that releases the Worker runs.
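The "initial task first, then loop on the queue" idea can be isolated in a few lines. The sketch below is a simplified, hypothetical stand-in (MiniWorker is not the article's Worker class): it has no pool, no lock, and no shutdown flags, and it simply exits once the queue has stayed empty for an assumed 100 ms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MiniWorkerDemo {

    // Simplified stand-in for a pool worker (hypothetical, for illustration only).
    static final class MiniWorker implements Runnable {
        private final Runnable firstTask;
        private final BlockingQueue<Runnable> queue;

        MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue) {
            this.firstTask = firstTask;
            this.queue = queue;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                // Run the initial task, then poll the queue until it stays empty for 100 ms.
                while (task != null || (task = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                    task.run();
                    task = null;   // forces the next iteration to fetch from the queue
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // treat an interrupt as a request to exit
            }
        }
    }

    // Returns the order in which the worker executed its tasks.
    static List<String> runOrder() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> order.add("queued-1"));
        queue.add(() -> order.add("queued-2"));
        Thread thread = new Thread(new MiniWorker(() -> order.add("first"), queue));
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder());
    }
}
```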
Let’s look at the implementation of the getTask() method.
public class ThreadPool {

    // ......

    private Runnable getTask() {
        Runnable task;
        // If the forced-shutdown flag is true, or shutDown is true and no tasks remain,
        // stop fetching tasks from the task blocking queue
        if (shutDownNow.get() || (shutDown.get() && taskNum.get() == 0)) {
            return null;
        }
        try {
            // Only one thread at a time may fetch from the task blocking queue;
            // the other threads block here
            lock.lockInterruptibly();
            if (workers.size() > coreSize) {
                // More threads than core threads: wait at most keepAliveTime for a task;
                // poll() returns null on timeout
                task = workQueue.poll(keepAliveTime, unit);
                if (task == null) {
                    System.out.println(Thread.currentThread().getName() + " time out");
                }
            } else {
                // No more threads than core threads: block until a task is available
                task = workQueue.take();
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " interrupted");
            task = null;
        } finally {
            // lockInterruptibly() may have thrown before the lock was acquired,
            // so unlock only if this thread actually holds the lock
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return task;
    }

    // ......
}
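The two blocking calls that getTask() relies on behave differently on an empty queue, which is what separates surplus threads from core threads. The following sketch (class name and timeout are made up for illustration) shows that poll() with a timeout gives up and returns null, while take() keeps waiting until the waiting thread is interrupted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollVsTakeDemo {

    // poll(timeout) on an empty queue gives up and returns null.
    static boolean pollTimesOut() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // take() on an empty queue waits until the thread is interrupted.
    static boolean takeIsInterrupted() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();   // effective whether or not the thread has reached take() yet
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(pollTimesOut());
        System.out.println(takeIsInterrupted());
    }
}
```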
To understand getTask(), it has to be read together with ThreadPool's shutDown() and shutDownNow() methods, which are implemented as follows.
public class ThreadPool {

    // ......

    /**
     * Shut down gracefully: stop accepting new tasks and
     * interrupt the workers once no unfinished tasks remain
     */
    public void shutDown() {
        shutDown.set(true);
        tryClose();
    }

    /**
     * Shut down immediately: stop accepting new tasks and interrupt all workers
     */
    public void shutDownNow() {
        shutDownNow.set(true);
        doClose();
    }

    private void tryClose() {
        if (shutDown.get() && taskNum.get() == 0) {
            for (Worker worker : workers) {
                worker.close();
            }
        }
    }

    private void doClose() {
        if (shutDownNow.get()) {
            for (Worker worker : workers) {
                worker.close();
            }
        }
    }
}
A Worker fetches tasks from the task blocking queue through getTask() in one of three situations.
Case 1: neither shutDown() nor shutDownNow() has been called, so both the shutDown and shutDownNow flags are false. The Worker fetches tasks from the task blocking queue normally. Because of the global lock, only one Worker at a time can fetch from the queue, and the other Workers block waiting for the lock. If the current number of threads is greater than the number of core threads, the Worker holding the lock blocks in the queue's poll() method. If it gets no task within the keep-alive time, poll() returns null and the Worker is released. If the current number of threads is less than or equal to the number of core threads, the Worker holding the lock blocks in the queue's take() method until a task arrives.
Case 2: shutDown() has been called, so the shutDown flag is true. While unfinished tasks remain, a Worker fetches tasks normally, as in case 1. If the queue is empty and no task is executing when shutDown() is called, the call interrupts all threads immediately, and every Worker blocked in getTask() is woken up and released. If the queue is empty but some tasks are still executing when shutDown() is called, all threads are interrupted once the last task finishes, and every Worker blocked in getTask() is then woken up and released.
Case 3: shutDownNow() has been called, so the shutDownNow flag is true. The call interrupts all threads regardless of whether the task blocking queue still holds tasks or tasks are executing. A thread blocked on the task blocking queue is woken up and released. A thread that is executing a task also receives the interrupt. A thread that finishes its task, or abandons it in response to the interrupt, gets null the next time it calls getTask() (because shutDownNow is true) and is released. Note in particular that a thread stuck in an endless loop that never responds to interrupts will never be released.
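The last point in case 3 deserves a small illustration. A busy loop never throws InterruptedException, so it reacts to an interrupt only if it checks the thread's interrupt flag itself. The sketch below (class name and loop body are assumptions for illustration) shows a task written that way, which is what allows a pool to release its thread after a forced shutdown.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleLoopDemo {

    // Starts a busy loop that polls the interrupt flag, interrupts it,
    // and reports whether the loop noticed and the thread ended.
    static boolean stopsOnInterrupt() {
        AtomicBoolean exited = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            long iterations = 0;
            // No blocking call here, so the loop must check the flag on every pass.
            while (!Thread.currentThread().isInterrupted()) {
                iterations++;
            }
            exited.set(true);
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join(2000);   // give the loop up to 2 s to notice the interrupt
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return exited.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsOnInterrupt());
    }
}
```

A loop written as `while (true) { ... }` without such a check would ignore the interrupt and keep its worker alive indefinitely.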
Finally, ThreadPool provides a size() method to get the current number of workers, which is only used for testing purposes.
public int size() {
    return workers.size();
}
Let’s write a test program to test ThreadPool.
Test case 1: test the shutDown() method of ThreadPool. Create a ThreadPool with 2 core threads, a maximum of 4 threads, and a task blocking queue of capacity 20. Define a fixed task that prints the numbers 0 to 2, sleeping for 1 second after each print. Submit the task three times, wait 4 seconds, and then call shutDown(). The expected result is that all three submitted tasks complete, and that the thread blocked on the task blocking queue is woken up and released once the tasks are done. The test code is as follows.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // JUnit 5 is assumed here

public class ThreadPoolTest {

    @Test
    void testThreadPool_1() {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
        ThreadPool threadPool = new ThreadPool(2, 4, 1000, TimeUnit.MILLISECONDS, workQueue);
        // Fixed task: print 0 to 2, sleeping 1 second after each print
        Runnable task = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " interrupted from sleep");
            }
            countDownLatch.countDown();
        };
        for (int i = 0; i < 3; i++) {
            threadPool.execute(task);
        }
        try {
            Thread.sleep(4000);
            threadPool.shutDown();
            Thread.sleep(3000);
            countDownLatch.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Worker size: " + threadPool.size());
        System.out.println("Task num: " + workQueue.size());
    }
}
The test results are shown below.
ThreadPool creates thread 1 and thread 2 to process task1 and task2, and task3 is added to the task blocking queue. When threads 1 and 2 finish their tasks, thread 2 acquires the global lock and takes task3 from the queue, while thread 1 blocks. shutDown() is then called, but the threads are not interrupted until thread 2 has finished task3. At that point thread 1 is woken up and released, leaving both the number of threads and the number of tasks in the ThreadPool at zero.
Test case 2: test the shutDownNow() method of ThreadPool. Create a ThreadPool with 2 core threads, a maximum of 4 threads, and a task blocking queue of capacity 20. Define a fixed task that prints the numbers 0 to 2, sleeping for 1 second after each print. Submit the task three times, wait 2 seconds, and then call shutDownNow(). The expected result is that none of the three submitted tasks finishes, and that all threads are interrupted and released after shutDownNow() is called. The test code is as follows.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // JUnit 5 is assumed here

public class ThreadPoolTest {

    @Test
    void testThreadPool_2() {
        // Only two tasks ever start, so the latch counts two
        CountDownLatch countDownLatch = new CountDownLatch(2);
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(20);
        ThreadPool threadPool = new ThreadPool(2, 4, 1000, TimeUnit.MILLISECONDS, workQueue);
        // Fixed task: print 0 to 2, sleeping 1 second after each print
        Runnable task = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " interrupted from sleep");
            }
            countDownLatch.countDown();
        };
        for (int i = 0; i < 3; i++) {
            threadPool.execute(task);
        }
        try {
            Thread.sleep(2000);
            threadPool.shutDownNow();
            Thread.sleep(1000);
            countDownLatch.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Worker size: " + threadPool.size());
        System.out.println("Task num: " + workQueue.size());
    }
}
The test results are shown below.
The results show that ThreadPool first creates threads 1 and 2 to process task1 and task2, and task3 is added to the task blocking queue. shutDownNow() is called while threads 1 and 2 are still executing their tasks. Because task1 and task2 respond to the interrupt and exit, threads 1 and 2 leave their tasks and are released, and task3 in the task blocking queue is never executed. In the end the number of threads in the ThreadPool is 0 and the number of tasks is 1.
Test case 3: create a ThreadPool with 2 core threads, a maximum of 4 threads, a task blocking queue of capacity 1, and a keep-alive time of 1 second. Define a fixed task that prints the numbers 0 to 2, sleeping for 1 second after each print. Submit the task five times and wait for the ThreadPool to finish them. The expected result is that once all tasks have finished, the number of threads in the ThreadPool is 2 (the number of core threads) and the number of tasks is 0. The test code is as follows.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // JUnit 5 is assumed here

public class ThreadPoolTest {

    @Test
    void testThreadPool_3() {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1);
        ThreadPool threadPool = new ThreadPool(2, 4, 1000, TimeUnit.MILLISECONDS, workQueue);
        // Fixed task: print 0 to 2, sleeping 1 second after each print
        Runnable task = () -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " interrupted from sleep");
            }
            countDownLatch.countDown();
        };
        for (int i = 0; i < 5; i++) {
            threadPool.execute(task);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Worker size: " + threadPool.size());
        System.out.println("Task num: " + workQueue.size());
    }
}
The test results are shown below.
ThreadPool creates threads 1 and 2 to process task1 and task2, then task3 is added to the task blocking queue, and then threads 3 and 4 are created to process task4 and task5. After the first tasks finish, thread 4 acquires the global lock, takes task3 from the task blocking queue, and continues executing. Threads 1 and 2 are then released because they did not obtain a task within the keep-alive time. In the end the number of threads in the ThreadPool is 2 and the number of tasks is 0.
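The same shrink-back behaviour can be seen on the JDK's ThreadPoolExecutor. The sketch below mirrors the sizing of test case 3 (2 core threads, at most 4 threads, queue capacity 1) but uses an assumed keep-alive time of 100 ms and latch-blocked tasks so that the run is quick and repeatable. The class name KeepAliveDemo is made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Returns {largest pool size reached, pool size after surplus threads timed out}.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        Runnable task = () -> {
            try {
                release.await();   // hold every thread until all five tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(task);    // 2 core threads, 1 queued task, 2 surplus threads
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        // Wait (at most 5 s) for the surplus threads to exceed the keep-alive time.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 2 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int afterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {largest, afterIdle};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("largest=" + sizes[0] + " afterIdle=" + sizes[1]);
    }
}
```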
Conclusion
Using a thread pool well comes down mainly to configuring its parameters, and configuring them sensibly requires some understanding of how a thread pool works. Writing a thread pool yourself that implements the basic functions is a good way to build that understanding.