Inheritance relationships

The Executor interface

public interface Executor {
    void execute(Runnable command);
}

The ExecutorService interface

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

The ExecutorService interface extends the Executor interface and adds methods such as submit, shutdown, and invokeAll.

AbstractExecutorService abstract class

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {...}

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {...}
}

The AbstractExecutorService abstract class implements the ExecutorService interface and provides default implementations of methods such as submit, invokeAny, and invokeAll.

It provides no default implementation for execute or for the methods that shut the pool down (shutdown, shutdownNow, and so on); concrete subclasses such as ThreadPoolExecutor supply those.
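To make the division of labour concrete, here is a minimal sketch of a subclass that supplies only execute and the lifecycle methods and inherits submit from AbstractExecutorService. The class name DirectExecutorService and its run-on-the-caller behaviour are assumptions made for illustration; they are not part of the JDK or of the original article.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical executor that runs every task on the calling thread.
class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor is shut down");
        }
        command.run(); // no worker threads: run synchronously
    }

    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

    // submit() is inherited: it wraps the task in a FutureTask and hands it to execute().
    static int demo() {
        DirectExecutorService executor = new DirectExecutorService();
        Future<Integer> future = executor.submit(() -> 6 * 7);
        executor.shutdown();
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the inherited submit ends by calling execute, the Future is already complete when submit returns in this synchronous variant.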

Constructor and thread pool state

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,  // blocking task queue
                          ThreadFactory threadFactory,        // factory that creates threads
                          RejectedExecutionHandler handler) { // handler for rejected tasks
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
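As a usage sketch for the constructor above, the following builds a pool with every parameter spelled out and checks the argument validation. The class PoolConstruction, the helper namedFactory, the "demo-pool" thread-name prefix, and the chosen sizes are all assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConstruction {
    // Hypothetical factory that gives worker threads readable names.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + sequence.incrementAndGet());
    }

    static ThreadPoolExecutor newPool(int core, int max) {
        return new ThreadPoolExecutor(
                core, max,
                60L, TimeUnit.SECONDS,           // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),   // bounded work queue
                namedFactory("demo-pool"),
                new ThreadPoolExecutor.AbortPolicy());
    }

    // The constructor throws IllegalArgumentException when maximumPoolSize < corePoolSize.
    static boolean rejectsInvalidSizes() {
        try {
            newPool(4, 2);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2, 4);
        System.out.println(pool.getCorePoolSize() + " core / " + pool.getMaximumPoolSize() + " max");
        System.out.println("invalid sizes rejected: " + rejectsInvalidSizes());
        pool.shutdown();
    }
}
```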

Thread pool state

// ctl records both the state of the thread pool and the number of threads.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE is 32, so 29 bits hold the number of threads.
private static final int COUNT_BITS = Integer.SIZE - 3;
// Capacity:   000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// Running:    111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// Shutdown:   000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// Stop:       001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// Tidying:    010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// Terminated: 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Extract the run state (high 3 bits)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Extract the worker count (low 29 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine run state and worker count into one ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

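An int is 4 bytes (32 bits): the high 3 bits of ctl store the run state and the low 29 bits store the number of worker threads.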

RUNNING: accepts new tasks and processes tasks in the blocking queue
SHUTDOWN: rejects new tasks but processes tasks in the blocking queue
STOP: rejects new tasks, abandons tasks in the blocking queue, and interrupts tasks in progress
TIDYING: all tasks have terminated (including tasks in the blocking queue) and the number of active threads is 0; the terminated() hook method is about to be called
TERMINATED: the state of the thread pool after the terminated() method call completes

State transitions:
RUNNING -> SHUTDOWN: shutdown() is called explicitly, or implicitly from finalize()
RUNNING or SHUTDOWN -> STOP: shutdownNow() is called
SHUTDOWN -> TIDYING: both the queue and the pool are empty
STOP -> TIDYING: the pool is empty
TIDYING -> TERMINATED: the terminated() hook method has finished executing
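The bit layout can be checked outside the JDK with a small stand-alone sketch. The class name CtlBits is an assumption; the constants and the three helper methods mirror the ones quoted above, with RUNNING defined as -1 << COUNT_BITS so that its top three bits are 111.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));   // state bits followed by count bits
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, the states are ordered numerically, which is why the JDK code can test the state with comparisons such as rs >= SHUTDOWN.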

The difference between submit and execute methods

The submit method

  • Call the submit method and pass in a Runnable or Callable object
  • Check whether the object passed in is null. If it is, throw a NullPointerException; otherwise continue
  • Wrap the incoming object in a RunnableFuture object
  • Call the execute method, passing in the RunnableFuture object
  • Return the RunnableFuture object
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
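One practical consequence of wrapping the task in a RunnableFuture is that both the result and any failure travel back through the returned Future. The sketch below illustrates this; the class name SubmitDemo and the method names are assumptions for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // submit(Callable) hands the task's return value back through the Future.
    static int resultThroughFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 1 + 2);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // A failure inside the task is stored in the Future and rethrown by get()
    // wrapped in an ExecutionException.
    static String failureThroughFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable failing = () -> { throw new IllegalStateException("boom"); };
            pool.submit(failing).get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultThroughFuture());
        System.out.println(failureThroughFuture());
    }
}
```

With execute, by contrast, there is no Future: an exception thrown by the task propagates out of the worker thread instead.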

The execute method

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    /* Proceed in 3 steps. */
    // 1. If fewer than corePoolSize threads are running, call addWorker to
    //    create a new thread with the Runnable as its first task.
    if (workerCountOf(c) < corePoolSize) {
        // Add a core thread
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. private static boolean isRunning(int c) { return c < SHUTDOWN; }
    //    SHUTDOWN is 0, so the pool is running when c is less than 0.
    //    If the pool is running, try to put the task in the queue.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the task from the queue
        // and execute the rejection policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // This branch is only reached when there are no workers, for example
        // when the core pool size is 0: add a non-core thread with no first
        // task, which will then take the command from the queue.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. If the pool is stopped or the queue is full, try to add a worker up
    //    to maximumPoolSize; if that fails, execute the rejection policy.
    else if (!addWorker(command, false))
        reject(command);
}
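The three steps of execute can be observed from outside with a deliberately tiny pool. This is a sketch under stated assumptions: the class name ExecuteFlowDemo, the sizes (1 core thread, 2 maximum, queue capacity 1), and the blocking task are chosen only to make each branch visible.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Records "poolSize/queueSize" after each execute() call, or "rejected".
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    out.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    out.append("rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // 1st task: core thread; 2nd: queued; 3rd: non-core thread; 4th: rejected
        System.out.println(trace()); // prints "1/0 1/1 2/1 rejected"
    }
}
```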

ThreadPoolExecutor.addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Get the running state
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Return false if the number of active threads has reached the limit:
            // 1. core is true: compare against corePoolSize
            // 2. core is false: compare against maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS: if ctl still equals c, change it to c + 1, leave both
            // loops, and go on to create the Worker thread
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the pool state has changed, retry the outer loop;
            // otherwise retry the inner loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the created worker to the workers container
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The Worker class

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
        
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

Worker is an inner class of ThreadPoolExecutor that implements the Runnable interface. It has a single constructor, which passes the Worker itself (this) to the thread factory. As a result, the statement final Thread t = w.thread; in addWorker() above obtains a thread whose Runnable is that same Worker.

So the call to t.start() executes the run() method of the Worker class:

/** Delegates main run loop to outer runWorker */
public void run() {
    // Runs runWorker in ThreadPoolExecutor
    runWorker(this);
}

ThreadPoolExecutor.runWorker()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // Get the task in the Worker
    w.firstTask = null;          // Clear the task in the Worker
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task if there is one; afterwards get tasks from getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook called before task execution
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook called after task execution
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Put simply, runWorker executes tasks, with some processing around each one: obtaining the task, handling before the task starts, running the task, and handling after it finishes. The key piece is the getTask() method. beforeExecute(Thread t, Runnable r) and afterExecute(Runnable r, Throwable t) contain no logic in ThreadPoolExecutor itself; a subclass of the thread pool can override them, which lets you monitor the execution of tasks.
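A minimal sketch of overriding the two hooks follows. The subclass name HookedPool, its counters, and the quiet thread factory (which only suppresses the default stack-trace printing for the deliberately failing task) are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts hook invocations.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quietFactory());
    }

    // Swallows uncaught exceptions so the demo does not print a stack trace.
    private static ThreadFactory quietFactory() {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, e) -> { });
            return thread;
        };
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        if (thrown != null) {
            failed.incrementAndGet(); // non-null when a task passed to execute() threw
        }
    }

    // Returns {tasks started, tasks failed} after running one good and one failing task.
    static int[] demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("started=" + counts[0] + " failed=" + counts[1]);
    }
}
```

Note that tasks passed to submit are wrapped in a FutureTask, which captures exceptions itself, so afterExecute receives a null Throwable for them.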

processWorkerExit

  • As the body of the while loop shows, when a task throws an exception while the thread is running, the worker leaves the loop and enters processWorkerExit()
  • If getTask() returns null, the worker also enters processWorkerExit()

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed is true if allowCoreThreadTimeOut(true) has been set, or if
        // the current number of workers is greater than the core pool size
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        /* The checks above are similar to earlier code. */
        /* The key lies in the code below: taking a task from the blocking queue. */
        try {
            // poll(long timeout, TimeUnit unit) returns null if no task
            // arrives within the timeout; take() blocks until one arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

This is how the thread pool keeps its threads alive while they wait for tasks: a worker simply blocks on the queue, in take() or in a timed poll(), until a task arrives.
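The effect of the timed flag in getTask() can be seen by toggling allowCoreThreadTimeOut. The sketch below is timing-based and therefore only illustrative: the class name KeepAliveDemo, the 50 ms keep-alive, and the 500 ms wait are assumptions chosen to leave a wide margin.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns the pool size observed after the workers have been idle for a while.
    static int idleSize(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            Thread.sleep(500); // well beyond the 50 ms keep-alive
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Core threads block in take() and survive when the timeout is disabled.
        System.out.println("core timeout off: " + idleSize(false));
        // With the timeout enabled they use the timed poll() and exit once it expires.
        System.out.println("core timeout on:  " + idleSize(true));
    }
}
```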

ThreadPoolExecutor.processWorkerExit()

/**
 * @param completedAbruptly whether the worker died because of an exception in a task
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If the worker ended abruptly, the worker count has not been
    // decremented yet, so decrement it here
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        // Normal exit: getTask() returned null
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Thread pool shutdown

A thread pool can be shut down by calling its shutdown or shutdownNow method. Both work by iterating over the worker threads in the pool and interrupting them by calling each thread's interrupt method, so tasks that do not respond to interrupts may never terminate. The two differ in scope: shutdownNow first sets the state of the thread pool to STOP, then tries to stop all threads, whether they are executing tasks or idle, and returns the list of tasks waiting to be executed, while shutdown only sets the state of the thread pool to SHUTDOWN and then interrupts the threads that are not executing tasks.

The isShutdown method returns true as soon as either of the two shutdown methods has been called. The thread pool has shut down successfully once all tasks have finished, at which point isTerminated returns true. Which method to call depends on the nature of the tasks submitted to the pool. shutdown is the usual choice; shutdownNow can be used if the tasks do not need to run to completion.

shutdown

After the shutdown method is called, the thread pool accepts no new tasks but still completes the tasks already placed in the queue.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check permission
        checkShutdownAccess();
        // CAS updates the thread pool state
        advanceRunState(SHUTDOWN);
        // Interrupt idle workers
        interruptIdleWorkers();
        // Hook; does nothing in ThreadPoolExecutor
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // Try to move the pool to the terminated state
    tryTerminate();
}

shutdownNow

Attempts to stop all executing tasks immediately and returns the tasks still waiting in the queue.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // Interrupt all workers
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
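The behaviour of shutdownNow described above (interrupt the running task, hand back the queued ones) can be sketched as follows. The class name ShutdownDemo and the sleeping task are assumptions for the example; the task cooperates by reacting to the interrupt.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownDemo {
    // Describes how many queued tasks shutdownNow() returned and whether the
    // running task observed the interrupt.
    static String shutdownNowOutcome() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // responds to interruption
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // stays in the queue behind the sleeping task
        pool.execute(() -> { });
        try {
            started.await(); // make sure the first task is running
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size() + " pending, interrupted=" + interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "demo interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(shutdownNowOutcome()); // prints "2 pending, interrupted=true"
    }
}
```

Had the demo called shutdown instead, the two queued tasks would have run to completion and the sleeping task would not have been interrupted.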

Conclusion

  • The thread pool first uses up to corePoolSize threads to run tasks
  • Once corePoolSize threads are running, new tasks are placed in the queue
  • If the queue is full, up to maximumPoolSize - corePoolSize additional threads are created to process tasks; when that limit is also reached, the rejection policy is applied.
  • When a worker exits, the pool compares the current number of workers with the tasks still queued. If tasks remain and there are too few workers, it creates a worker with no initial task to take tasks from the queue.
  • The thread pool is not destroyed automatically after its tasks finish. You need to shut it down explicitly, which changes the pool state and interrupts its threads.

The basis for assigning thread pool sizes

Consider it from the following perspectives

  • The nature of the task: CPU intensive task, IO intensive task, and hybrid task.
  • Task priority: high, medium and low.
  • Task execution time: long, medium and short.
  • Task dependencies: Whether they depend on other system resources, such as database connections.

Tasks of different natures can be handled separately by thread pools of different sizes. CPU-intensive tasks should be configured with as few threads as possible, for example a pool of (number of CPUs + 1) threads. IO-intensive tasks spend much of their time waiting rather than executing, so configure more threads, for example 2 x the number of CPUs. If a mixed task can be split into one CPU-intensive task and one IO-intensive task, running the two parts separately gives higher throughput than running them serially, as long as their execution times do not differ too much. If the execution times differ greatly, there is no need to split the task. Runtime.getRuntime().availableProcessors() returns the number of CPUs available on the current machine. Tasks with different priorities can be handled with a PriorityBlockingQueue, which lets high-priority tasks run first.
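The two rules of thumb above can be written down directly. This is only a sketch of the heuristics as stated in the text, not a tuning recommendation; the class name PoolSizing and its method names are assumptions.

```java
class PoolSizing {
    // CPU-intensive work: number of CPUs + 1
    static int cpuBound(int cpus) {
        return cpus + 1;
    }

    // IO-intensive work: 2 x number of CPUs
    static int ioBound(int cpus) {
        return 2 * cpus;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPUs: " + cpus);
        System.out.println("CPU-bound pool size: " + cpuBound(cpus));
        System.out.println("IO-bound pool size:  " + ioBound(cpus));
    }
}
```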

Tasks with different execution times can be assigned to thread pools of different sizes, or priority queues can be used to allow shorter tasks to be executed first.
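A minimal sketch of a priority-ordered pool follows. The names PriorityPoolDemo and PrioritizedTask are assumptions, as is the convention that a lower number means higher priority. Tasks are passed to execute rather than submit, because submit would wrap them in a FutureTask, which is not Comparable and would make the queue throw ClassCastException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolDemo {
    // Hypothetical task type: a lower number means higher priority.
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final List<Integer> log;

        PrioritizedTask(int priority, List<Integer> log) {
            this.priority = priority;
            this.log = log;
        }

        @Override public void run() { log.add(priority); }

        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Returns the priorities in the order in which the tasks actually ran.
    static List<Integer> executionOrder() {
        List<Integer> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        // Occupy the single worker so that the following tasks pile up in the queue.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(new PrioritizedTask(3, log));
        pool.execute(new PrioritizedTask(1, log));
        pool.execute(new PrioritizedTask(2, log));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder()); // prints [1, 2, 3]
    }
}
```

Keep in mind that PriorityBlockingQueue is unbounded, so it never causes the pool to grow beyond corePoolSize or to reject tasks.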

Tasks that depend on a database connection pool spend time waiting for the database to return results after submitting SQL. The longer they wait, the longer the CPU sits idle, so the number of threads should be set larger to make better use of the CPU.

Use bounded queues

Bounded queues improve the stability and early-warning capability of a system, and can be made as large as needed, for example several thousand entries. In one of our systems, both the queue and the threads of the background-task pool filled up, and rejected-task exceptions were thrown continuously. Troubleshooting showed that the database was the problem: SQL execution had become very slow, and because every task in the background pool queries and inserts data in the database, all worker threads in the pool were blocked and tasks backed up in the pool. Had the queue been unbounded, it would have kept growing until it exhausted memory and made the entire system unusable, not just the background tasks. Of course, all tasks in our system are deployed on separate servers, and we use thread pools of different sizes for different types of tasks, but a problem like this can still affect other tasks.
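The warning effect of a bounded queue can be sketched with a rejection handler that counts overflowing tasks. The class name BoundedQueueDemo, the counting handler, and the sizes are assumptions for illustration; a real system would more likely log or raise an alert in the handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedQueueDemo {
    // Submits blocked tasks to a one-thread pool and returns how many were rejected.
    static int rejectedCount(int queueCapacity, int submissions) {
        AtomicInteger rejected = new AtomicInteger();
        // Hypothetical handler: count the rejection instead of throwing.
        RejectedExecutionHandler countRejections =
                (task, executor) -> rejected.incrementAndGet();
        CountDownLatch gate = new CountDownLatch(1);
        Runnable slowTask = () -> {
            try {
                gate.await(); // stands in for a task stuck on a slow database
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                countRejections);
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(slowTask);
            }
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        // 1 task runs, 2 wait in the queue, the remaining 2 are rejected.
        System.out.println(rejectedCount(2, 5)); // prints 2
    }
}
```

With an unbounded queue the handler would never fire, and the backlog would only show up as growing memory use.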

Thread pool monitoring

If a system uses many thread pools, the pools need to be monitored so that problems can be located quickly from their usage. The thread pool exposes the following properties for monitoring.

  • taskCount: the number of tasks that the thread pool has been asked to execute.

  • completedTaskCount: the number of tasks the thread pool has completed while running; less than or equal to taskCount.

  • largestPoolSize: the largest number of threads that have ever existed in the thread pool at the same time. This value tells you whether the pool has ever been full: if it equals the maximum size of the thread pool, the pool has been full at some point.

  • getPoolSize: the number of threads currently in the thread pool. Core threads are not destroyed while the pool is alive (unless allowCoreThreadTimeOut is enabled), so the size does not fall below the number of core threads already created; only threads above corePoolSize are reclaimed after keepAliveTime.

  • getActiveCount: the number of threads that are actively executing tasks.

  • Monitor by extending the thread pool. You can subclass the thread pool and override its beforeExecute, afterExecute, and terminated methods to run monitoring code before each task, after each task, and when the pool terminates, for example to track the average, maximum, and minimum execution time of tasks. These methods are empty in ThreadPoolExecutor.
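The properties listed above are read through getter methods on ThreadPoolExecutor (getTaskCount, getCompletedTaskCount, getLargestPoolSize, getPoolSize, getActiveCount). A small sketch of collecting them into one line follows; the class name PoolStats and the output format are assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStats {
    // Formats the monitoring values of a pool as a single line.
    static String snapshot(ThreadPoolExecutor pool) {
        return "tasks=" + pool.getTaskCount()
                + " completed=" + pool.getCompletedTaskCount()
                + " largest=" + pool.getLargestPoolSize()
                + " size=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount();
    }

    // Runs three trivial tasks on a two-thread pool and reports the final values.
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot(pool);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

While a pool is running, getTaskCount and getCompletedTaskCount are approximations, because the state of tasks and threads can change during the computation. In practice a snapshot like this would be taken periodically, for example from a scheduled task, rather than once at the end.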
