Abstract: For ThreadPoolExecutor, the core class of the thread pool, what are the important attributes and inner classes that provide important guarantees for the correct operation of the thread pool?

The ThreadPoolExecutor class is designed to ensure that the thread pool runs correctly.

For ThreadPoolExecutor, the core class of the thread pool, what are the important attributes and inner classes that provide an important guarantee that the thread pool will work correctly? Today, let’s take a closer look at these questions!

An important attribute in the ThreadPoolExecutor class

There are several very important properties and methods in the ThreadPoolExecutor class, and we’ll look at them next.

CTL related attributes

The AtomicInteger constant CTL is an important property throughout the lifetime of the thread pool. It is an atomic class object that stores the number of threads and the state of the thread pool. The code associated with this property is shown below.

Private final AtomicInteger CTL = new AtomicInteger(ctlOf(RUNNING, 0)); Private static final int COUNT_BITS = integer.size -3; private static final int COUNT_BITS = integer.size -3; Private static final int CAPACITY = (1 << COUNT_BITS) -1; private static final int CAPACITY = (1 << COUNT_BITS) -1; Private static final int RUNNING = -1 << COUNT_BITS; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; Private static int runStateOf(int c) {return c & ~CAPACITY; } private static int workerCountOf(int c) {return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }Copy the code

The states of the thread pool are described below.

  • RUNNING: A RUNNING state that can receive newly submitted tasks and process tasks in a blocking queue

  • SHUTDOWN: A closed state that can no longer accept newly submitted tasks, but can process tasks already saved in the blocking queue. When the thread pool is in the RUNNING state, the SHUTDOWN () method is called to put the thread pool into this state

  • STOP: Cannot receive new tasks or process tasks already saved in the blocking queue. The thread that is processing the task is interrupted. If the thread pool is in the RUNNING or SHUTDOWN state, the shutdownNow() method is called to put the thread pool into that state

  • TIDYING: The thread pool enters this state if all tasks have terminated and the number of valid threads is 0 (the blocking queue is empty and the number of worker threads in the pool is 0).

  • TERMINATED: The thread pool in TIDYING state calls TERMINATED () and uses the thread pool to get into the state

The transitions between the states of the thread pool can also be summarized as shown in the following figure, following the ThreadPoolExecutor class.

  • RUNNING -> SHUTDOWN: Call SHUTDOWN () explicitly, or call Finalize () implicitly

  • (RUNNING or SHUTDOWN) -> STOP: Explicitly call the shutdownNow() method

  • SHUTDOWN -> TIDYING: When both the thread pool and task queue are empty

  • STOP -> TIDYING: When the thread pool is empty

  • TIDYING -> TERMINATED: when the TERMINATED () hook method is executed

Other important attributes

In addition to CTL related attributes, some other important attributes in the ThreadPoolExecutor class are shown below.

Private final BlockingQueue<Runnable> workQueue; Private final ReentrantLock mainLock = new ReentrantLock(); private final ReentrantLock mainLock = new ReentrantLock(); Private final HashSet<Worker> workers = new HashSet<Worker>(); Private final Condition termination = mainlock. newCondition(); private final Condition termination = mainlock. newCondition(); Private volatile ThreadFactory ThreadFactory; Private volatile RejectedExecutionHandler Handler; Private static Final RejectedExecutionHandler defaultHandler = new AbortPolicy();Copy the code

An important inner class in the ThreadPoolExecutor class

Within the ThreadPoolExecutor class there are inner classes that are critical to thread pool execution, the Worker inner class and the rejection policy inner class. Next, let’s look at each of these inner classes.

The Worker inner class

From the source code, Worker class implements the Runnable interface, indicating that it is essentially a thread used to execute tasks. Next, let’s take a look at the source code of Worker class, as shown below.

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; // The Thread that actually executes the task is final Thread Thread; // The first Runnable task is stored in this variable if the first task to be executed is specified when the thread is created. This variable can also be null. Get Runnable firstTask from BlockingQueue via getTask; Volatile long completedTasks; volatile long completedTasks; volatile long completedTasks; FirstTask can be passed as null Worker(Runnable firstTask) {// Prevent setState(-1) from being interrupted before calling runWorker; this.firstTask = firstTask; // use ThreadFactory to create a newThread to execute the task this.thread = getThreadFactory().newthread (this); } public void run() {runWorker(this);} public void run() {runWorker(this); } state=0: the lock is not acquired //state=1: the lock is acquired. = 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) ! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

In the constructor of Worker class, it can be seen that the synchronization state is first set to -1 to prevent the runWorker method from being interrupted before it runs. This is because if another thread calls the shutdownNow() method of the thread pool, the thread will be interrupted if the state value in the Worker class is greater than 0, and the thread will not be interrupted if the state value is -1.

The Worker class implements the Runnable interface and needs to override the run method, which essentially calls the runWorker method of ThreadPoolExecutor. In the runWorker method, unlock is first called. This method sets state to 0, so calling shutDownNow interrupts the current thread while entering the runWork method and not interrupting the thread before executing the runWorker method.

Note: It is important to understand the implementation of the Worker class.

Reject the policy inner class

In the thread pool, if the workQueue blocking queue is full and there are no free thread pools, the task continues to be submitted and a strategy needs to be adopted to handle the task. The thread pool provides a total of four policies, as shown below.

  • Throw an exception directly, which is also the default strategy. The implementation class is AbortPolicy.

  • Execute the task with the caller’s thread. The implementation class is CallerRunsPolicy.

  • Discards the most advanced task in the queue and executes the current task. Implementation class DiscardOldestPolicy.

  • The current task is discarded. The implementation class is DiscardPolicy.

Four inner classes are provided in the ThreadPoolExecutor class to implement the corresponding policy by default, as shown below.

public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());  } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code

We can also customize the rejectedExecution policy by implementing the RejectedExecutionHandler interface and rewriting the rejectedExecution method of the RejectedExecutionHandler interface. Call the constructor of ThreadPoolExecutor, passing in our own rejection policy.

For example, a custom rejection policy is shown below.

public class CustomPolicy implements RejectedExecutionHandler { public CustomPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (! E.isshutdown ()) {system.out.println (" use the caller's thread to execute the task ") r.run(); }}}Copy the code

Create a thread pool using a custom reject policy.

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
		       new CustomPolicy());
Copy the code

Click to follow, the first time to learn about Huawei cloud fresh technology ~