preface

In multithreaded programming, it is not practical to allocate one thread for each task, and the overhead and resource consumption of thread creation are high. Thread pool emerges as our tool to manage threads. Through the Executor interface, Java provides a standard way to decouple the submission process from the execution process of a task and represent the task with a Runnable.

Let’s take a look at ThreadPoolExecutor, the implementation of the Java thread pool framework.

The following analysis is based on JDK1.7

The life cycle

ThreadPoolExecutor uses the higher three bits of CAPACITY to represent the running status:

RUNNING: Receives new tasks and processes tasks in the task queue

SHUTDOWN: does not receive new tasks, but processes tasks in the task queue

STOP: does not receive new tasks, does not exit the task queue, and interrupts all ongoing tasks

TIDYING: All tasks are terminated, the number of worker threads is 0, and terminated()

TERMINATED: TERMINATED () is TERMINATED

  


State transition diagram

ThreadPoolExecutor uses atomic classes to represent state bits

privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));

Thread pool model

Core parameter

CorePoolSize: Minimum number of viable worker threads (0 if allowCoreThreadTimeOut is set)

MaximumPoolSize: specifies the maximum number of threads, which is limited by CAPACITY

KeepAliveTime: indicates the keepAliveTime of a thread. The unit is specified by TimeUnit

WorkQueue: indicates a workQueue that stores tasks to be executed

RejectExecutionHandler: Reject the policy. It will be triggered when the thread pool is full

Maximum CAPACITY of the thread pool: The first three bits of CAPACITY are used as flag bits, meaning that the maximum CAPACITY of worker threads is (2^29)-1

Four kinds of model

CachedThreadPool: A cacheable thread pool in which idle threads are reclaimed if the current size of the thread pool exceeds processing requirements, and new threads can be added as demand increases. There is no limit to the size of the thread pool.

FixedThreadPool: A fixed size thread pool that creates a thread when a task is submitted until the maximum number of threads in the pool is reached, at which point the pool size does not change.

SingleThreadPool: a single-threaded thread pool that has only one worker thread to execute tasks, ensuring that tasks are executed sequentially in the order they are queued, and creating a new thread to execute tasks if this thread terminates abnormally.

ScheduledThreadPool: A thread pool of fixed size that executes tasks in a deferred or timed manner, similar to a Timer.

Execute task execute

Core logic:

AddWorker (command, true)

Number of current threads =corePoolSize, and the task is successfully added to the work queue

Check whether the thread pool is currently RUNNING

If no, the task is rejected

If so, determine whether the current number of threads is 0. If so, add a worker thread.

Enable addWorker(command, false) for common threads. If the command fails to be enabled, this task will be rejected

From the above analysis, four stages of thread pool operation can be summarized:

PoolSize corePoolSize and the queue is empty, a new thread is created to process the submitted task

PoolSize == corePoolSize, at this point, the submitted task enters the work queue, the worker thread obtains the task from the queue for execution, and the queue is not empty and not full.

PoolSize == corePoolSize, and the queue is full, a new thread will also be created to process the submitted task, but poolSize maxPoolSize

PoolSize == maxPoolSize, and the queue is full, the reject policy is triggered

Rejection policies

RejectedExecutionHandler RejectedExecutionHandler is the interface for processing rejected tasks. Here are four rejection strategies.

AbortPolicy: The default policy to terminate the task and throw RejectedException

CallerRunsPolicy: Executes the current task in the caller thread without throwing an exception

DiscardPolicy: Discards the policy and directly discards the task without throwing exceptions

DiscardOldersPolicy: Discards the oldest task and executes the current task without throwing exceptions

Worker in thread pool

Worker inherited AbstractQueuedSynchronizer and Runnable, the former provide the Worker with lock function, which is the main method of execution work thread runWorker Worker (w) (get task from the task queue). Worker reference is stored in workers set and guarded by mainLock.

privatefinalReentrantLockmainLock=newReentrantLock(); privatefinalHashSetWorkerworkers=newHashSetWorker();

The core function runWorker

To simplify the logic, note that each worker thread’s RUN executes the following functions

finalvoidrunWorker(Workerw){

Threadwt=Thread.currentThread();

Runnabletask=w.firstTask;

w.firstTask=null; while(task! =null||(task=getTask())! =null){

w.lock();

beforeExecute(wt,task);

task.run();

afterExecute(task,thrown);

w.unlock();

}

processWorkerExit(w,completedAbruptly);

}

Get the task from getTask()

Lock the worker

Execute beforeExecute(wt, Task), which is an extension provided by ThreadPoolExecutor to subclasses

Run the task. If the worker is configured with the first task, the first task will be executed first and only once.

Perform afterExecute (task, thrown);

Unlock the worker

If the obtained task is NULL, close the worker

Get the task getTask

The task queue inside the thread pool is a blocking queue, and the implementation is passed in at construction time.

privatefinalBlockingQueueRunnableworkQueue;

GetTask () fetches tasks from the task queue and supports blocking and timeout waiting tasks, four of which result in null being returned and the worker shutting down.

The number of existing threads exceeds the maximum number of threads. Procedure

The thread pool is stopped

The thread pool is SHUTDOWN and the task force is empty

The number of threads waiting for a task timed out and exceeded the number of reserved threads

Core logic: according to the timed timed wait on the blocking queue or the waiting task is blocked, the waiting task timeout will cause the worker thread to be shut down.

timed=allowCoreThreadTimeOut||wccorePoolSize; Runnabler=timed?

workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):

workQueue.take();

Waiting for a task can time out in two cases:

Allow core threads to wait for timeout, allowCoreThreadTimeOut(true)

The current thread is a normal thread, then wc corePoolSize

The work queue uses BlockingQueue, which I won’t expand here, and I’ll write a detailed analysis later.

conclusion

ThreadPoolExecutor is based on a producer-consumer model, with the actions of submitting tasks acting as producers and the threads of executing tasks acting as consumers.

Following the thread pool model provided by Executors, you can also inherit from ThreadPoolExecutor. Override the beforeExecute and afterExecute methods to customize thread pool task execution.

Whether to use a bounded queue or an unbounded queue depends on the situation, as do the size of the work queue and the number of threads.

The rejection policy is recommended to use CallerRunsPolicy, which does not discard the task or throw an exception, but instead rolls the task back to the caller’s thread for execution.