Keywords:

Thread, Thread Pool, Single Thread, Multi-Thread, Benefits of Thread Pool, Thread Recycle, Creation Method, Core Parameters, underlying mechanism, Rejection Policy, Parameter Setting, Dynamic Monitoring, Thread Isolation

Threads and thread pools are core Java knowledge that comes up in both study and interviews. We start with threads and processes, parallelism and concurrency, and single-threading versus multithreading, then work through the thread pool itself: its benefits, creation methods, core parameters, key methods, underlying implementation, rejection policies, parameter setting, dynamic adjustment, thread isolation, and so on.

The benefits of thread pools

A thread pool manages threads using the idea of pooling. Pooling means managing resources in a unified way in order to maximize benefit and minimize risk for the user. The idea is used in many places beyond computing, such as finance, business management, and equipment management.

Why thread pools? If, in a concurrent scenario, every coder creates threads on demand, the following problems may occur:

  • It is hard to know how many threads are running in the system, and repeatedly creating and destroying threads is costly.
  • Under a flood of requests (from crawlers, say), threads get created without limit and may exhaust system resources.

What are the benefits of implementing a thread pool?

  • Lower resource consumption: Pooling technology can reuse threads that have been created, reducing the cost of thread creation and destruction.
  • Improved response time: Use existing threads for processing, reducing thread creation time
  • Better manageability: Threads are scarce resources and cannot be created indefinitely. A thread pool can allocate and monitor them in a unified way
  • Room for extra features: for example, a scheduled thread pool can run tasks at fixed times

In fact, pooling technology is used in many places, such as:

  • Database connection pooling: Database connections are scarce resources. Create them first to improve response time and reuse existing connections
  • Instance pool: create objects ahead of time, put them into the pool, and recycle them, reducing the cost of repeatedly creating and destroying them

Thread pool-related classes

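The classes related to the thread pool form the following inheritance chain: ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor.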

Executor

Executor is the top-level interface. It has only one method, execute(Runnable command), which defines the basic contract for handing a task to a thread pool for execution.

ExecutorService

ExecutorService inherits Executor, but it’s still an interface with a few more methods:

  • void shutdown(): Starts an orderly shutdown: no new tasks are accepted, but previously submitted tasks still run. It does not block; use awaitTermination() to wait for them to finish.
  • List<Runnable> shutdownNow(): Shuts down immediately: tries to stop all actively executing tasks, stops processing waiting tasks, and returns the list of tasks that were waiting to be executed.
  • boolean isShutdown(): Returns whether the thread pool has been shut down; its threads may still be executing tasks.
  • boolean isTerminated(): Returns true if all tasks have completed after shutdown() or shutdownNow().
  • boolean awaitTermination(long timeout, TimeUnit unit): After a shutdown request, blocks until all tasks have completed, the timeout elapses, or the current thread is interrupted.
  • <T> Future<T> submit(Callable<T> task): Submits a task that returns a value and returns a Future representing the pending result. Call future.get() to obtain the result once the task has completed.
  • <T> Future<T> submit(Runnable task, T result): Submits a task together with a result. The task does nothing with the result; it only fixes the type and the value that get() returns.
  • Future<?> submit(Runnable task): Submits the task and returns a Future.
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Executes a batch of tasks and returns the list of Futures once all of them have completed.
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): Executes a batch of tasks with a timeout.
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks): Blocks until one task completes successfully and returns its result.
  • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): Blocks until one task completes successfully or the timeout elapses, and returns that task's result.
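As a rough illustration of the methods listed above, here is a minimal sketch that uses invokeAll(), submit(Runnable, T result), shutdown(), and awaitTermination() together. The class name ExecutorServiceBasics and its helper methods are made up for this example; only the java.util.concurrent calls are real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class ExecutorServiceBasics {
    // Runs three Callables through invokeAll() and sums their results
    static int sumWithInvokeAll() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                tasks.add(() -> n * 10);
            }
            int sum = 0;
            // invokeAll() returns only after every task has completed
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shutdownAndWait(pool);
        }
    }

    // submit(Runnable, result) hands back the given result once the task has run
    static String submitWithResult() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noOp = () -> { };
            return pool.submit(noOp, "done").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shutdownAndWait(pool);
        }
    }

    // shutdown() does not block, so wait explicitly with awaitTermination()
    static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithInvokeAll());
        System.out.println(submitWithResult());
    }
}
```

The fixed pool of two threads is an arbitrary choice for the sketch; any ExecutorService would behave the same way here.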


A note on submit(Runnable task, T result): after the task has completed, calling future.get() returns the result that was passed in. Internally, newTaskFor(task, result) creates an ftask, which wraps the Runnable in an adapter class, RunnableAdapter (a nested class of Executors), without doing anything special with the result. When the adapter's call() method is invoked, it runs the task and returns the result.

// In AbstractExecutorService
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

// In Executors
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        // Return the result that was passed in
        return result;
    }
}

One more method worth mentioning is invokeAny(). The implementation that ThreadPoolExecutor inherits returns the result of the first task to complete successfully. Once that task has completed, the remaining tasks are cancelled, which interrupts the threads running them.

Note that ExecutorService is an interface: it contains declarations, not implementations. The explanations above are based on its documented contract and its usual implementation.

As you can see, ExecutorService defines the operations of a thread pool: shutting down, checking whether it is shut down or terminated, submitting a task, submitting tasks in batches, and so on.
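The behaviour of invokeAny() can be sketched as follows. The class name InvokeAnyDemo and the two toy tasks are assumptions made for the example; the slow task sleeps long enough that the fast one is the first to complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK
class InvokeAnyDemo {
    // Returns the result of whichever task completes successfully first
    static String firstResult() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "fast");
            tasks.add(() -> {
                Thread.sleep(5_000); // interrupted when invokeAny() cancels it
                return "slow";
            });
            return pool.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```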

AbstractExecutorService

AbstractExecutorService is an abstract class that implements the ExecutorService interface and serves as the base implementation for most thread pools.

It not only implements submit(), invokeAll(), invokeAny(), and other methods, but also provides a newTaskFor() method for building RunnableFuture objects. The objects through which a task's result can be obtained all come from newTaskFor(). Instead of expanding the entire source code, take the submit() method as an example:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // Wrap the task
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // Hand it over for execution
    execute(ftask);
    // Return the RunnableFuture object
    return ftask;
}

But AbstractExecutorService does not implement the most important method, execute(). How tasks are actually executed is left to the concrete thread pool, and different pools can implement it differently. Pools generally extend AbstractExecutorService (scheduled pools implement additional interfaces); the one we use most often is ThreadPoolExecutor.
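To make the division of labour concrete, here is a minimal sketch of a custom executor that extends AbstractExecutorService and supplies only execute() plus the lifecycle methods. CallerThreadExecutor is a hypothetical class invented for this example; it runs every task on the calling thread, which is far simpler than what ThreadPoolExecutor does, but it shows that submit() works as soon as execute() exists.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical executor: runs each task synchronously on the caller's thread
class CallerThreadExecutor extends AbstractExecutorService {
    private volatile boolean shutdown;

    // The one method AbstractExecutorService leaves open
    @Override public void execute(Runnable command) {
        if (shutdown) throw new RejectedExecutionException("executor is shut down");
        command.run();
    }

    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    // Tasks run synchronously, so nothing is left running after shutdown
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

    // submit() is inherited: it wraps the task with newTaskFor() and calls execute()
    static int demo() {
        CallerThreadExecutor executor = new CallerThreadExecutor();
        Callable<Integer> task = () -> 6 * 7;
        Future<Integer> future = executor.submit(task);
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```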

ThreadPoolExecutor

Here comes the main event! ThreadPoolExecutor is the class we normally use to create a thread pool, unless we need a scheduled thread pool.

Let’s start with the internal structure (properties) of ThreadPoolExecutor:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // Packs the pool state (high 3 bits) and the number of threads (low 29 bits) into one int
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of bits used to represent the number of threads: 32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum number of threads: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // Pool states, stored in the high 3 bits
    // 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001
    private static final int STOP       =  1 << COUNT_BITS;
    // 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Extract the run state from ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Extract the number of threads from ctl
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine the run state and the number of threads into ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    // Task queue
    private final BlockingQueue<Runnable> workQueue;
    // Global lock of the pool
    private final ReentrantLock mainLock = new ReentrantLock();
    // The set of worker threads
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // Condition for awaitTermination. A Condition offers await(), signal(), and signalAll()
    // in place of the traditional wait(), notify(), and notifyAll()
    private final Condition termination = mainLock.newCondition();
    // Largest pool size ever reached
    private int largestPoolSize;
    // Number of completed tasks
    private long completedTaskCount;
    // Thread factory
    private volatile ThreadFactory threadFactory;
    // Task rejection handler
    private volatile RejectedExecutionHandler handler;
    // How long an idle thread may survive
    private volatile long keepAliveTime;
    // Whether core threads may time out
    private volatile boolean allowCoreThreadTimeOut;
    // Number of core threads
    private volatile int corePoolSize;
    // Maximum number of threads
    private volatile int maximumPoolSize;
    // Default rejection handler
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
    private final AccessControlContext acc;
    private static final boolean ONLY_ONE = true;
}

Thread pool state

As can be seen from the above code, a single 32-bit int (held in an AtomicInteger) stores both the state of the thread pool and the number of threads. The top 3 bits are the state of the thread pool, and the remaining 29 bits are the number of threads:

// 111
private static final int RUNNING    = -1 << COUNT_BITS;
// 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001
private static final int STOP       =  1 << COUNT_BITS;
// 010
private static final int TIDYING    =  2 << COUNT_BITS;
// 011
private static final int TERMINATED =  3 << COUNT_BITS;

Each state has a different meaning:

  • RUNNING: Accepts new tasks and processes queued tasks
  • SHUTDOWN: Accepts no new tasks, but still processes queued tasks
  • STOP: Accepts no new tasks, does not process queued tasks, and interrupts tasks in progress
  • TIDYING: All tasks have terminated and the number of threads is zero
  • TERMINATED: The final state of the thread pool
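The bit packing described above can be tried out in isolation. The sketch below copies the constants and the three helper methods from the source into a standalone class; the class name CtlBits is made up, and the real fields are private to ThreadPoolExecutor.

```java
// Hypothetical standalone copy of the ctl arithmetic used by ThreadPoolExecutor
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;       // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;       // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(ctl));
        System.out.println(runStateOf(ctl) == RUNNING);
        System.out.println(workerCountOf(ctl));
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, the states can be compared numerically, which is what checks such as rs >= SHUTDOWN in the source rely on.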

Worker implementation

A thread pool must have a pool, a place to put its threads. In ThreadPoolExecutor that role is played by Worker, which is an inner class:

A thread pool is a collection of workers, using a HashSet:

private final HashSet<Worker> workers = new HashSet<Worker>();

How does Worker implement this?

Worker extends AbstractQueuedSynchronizer (AQS) and implements Runnable. AQS is essentially a queue-based lock framework; Worker uses it as a simple mutex, mainly when interrupting a Worker or changing its state.

AQS is used for thread safety. When the thread starts, its run() method calls runWorker(Worker w), which is not a method of Worker but of ThreadPoolExecutor. As the code below shows, every change to the Worker's state is made in a thread-safe way. A Worker holds a Thread and can be understood as a wrapper around it.

How does runWorker(Worker w) work? Keep that question open, and we’ll go into more detail later.

// Implements Runnable and wraps a thread
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // Serialization id
    private static final long serialVersionUID = 6138294804551838833L;

    // The thread this worker runs in
    final Thread thread;
    // Initial task, may be null. If it is not null it is run directly, without going through the task queue
    Runnable firstTask;
    // Number of tasks completed by this thread
    volatile long completedTasks;

    // Create a worker with an initial task, which may be null
    Worker(Runnable firstTask) {
        // Initialize the state of the AQS lock
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Create the thread through the thread factory
        this.thread = getThreadFactory().newThread(this);
    }

    // Delegates to the outer runWorker
    public void run() {
        runWorker(this);
    }

    // State 0 means unlocked, 1 means locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // Exclusive mode. Attempts to acquire the lock: true on success, false on failure
    protected boolean tryAcquire(int unused) {
        // CAS, optimistic locking
        if (compareAndSetState(0, 1)) {
            // Success: the current thread owns the lock exclusively
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Exclusive mode. Releases the lock
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Lock
    public void lock()        { acquire(1); }
    // Try to lock
    public boolean tryLock()  { return tryAcquire(1); }
    // Unlock
    public void unlock()      { release(1); }
    // Whether the lock is held
    public boolean isLocked() { return isHeldExclusively(); }

    // Interrupt the thread if it has been started
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
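The locking part of Worker can be pulled out into a tiny mutex to see how the AQS state is used. The following is a sketch under that assumption: SimpleMutex is a made-up class that copies only the tryAcquire/tryRelease logic shown above, without the thread or task fields.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical mutex that mirrors the lock methods of Worker: state 0 = unlocked, 1 = locked
class SimpleMutex extends AbstractQueuedSynchronizer {
    @Override protected boolean tryAcquire(int unused) {
        // CAS from 0 to 1; fails if the lock is already held, even by the same thread
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        System.out.println(mutex.tryLock()); // acquires the lock
        System.out.println(mutex.tryLock()); // second attempt by the same thread fails
        mutex.unlock();
        System.out.println(mutex.isLocked());
    }
}
```

Note that the lock is not reentrant: a second tryLock() from the same thread fails. A held lock therefore means "this worker is running a task", which is how the pool can tell idle workers from busy ones when it interrupts idle workers.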

Task queue

Besides the pool of threads, there must be somewhere to put tasks when there are many tasks but not enough threads. This buffer is the task queue, which appears in the code as:

private final BlockingQueue<Runnable> workQueue;

Rejection policies and handlers

Memory is always limited, so we cannot keep adding to the queue forever; the thread pool therefore lets us choose among several kinds of queue. And when there are so many tasks that both the threads and the task queue are full, the pool has to respond somehow: reject the task, throw an error, or drop a task? And if dropping, which one? All of this may need to be customized.
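As a sketch of what such customization can look like, the example below plugs in a handler that merely counts rejected tasks. The class name CountingRejectionDemo, the pool sizes, and the blocking tasks are all choices made for the illustration; RejectedExecutionHandler and the six-argument ThreadPoolExecutor constructor are real JDK API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK
class CountingRejectionDemo {
    static int countRejections() {
        AtomicInteger rejected = new AtomicInteger();
        // Custom policy: count the rejected task instead of throwing
        RejectedExecutionHandler handler = (task, executor) -> rejected.incrementAndGet();

        // One thread and one queue slot, so the third task has nowhere to go
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // runs on the single worker
            pool.execute(blocker); // waits in the queue
            pool.execute(blocker); // no thread and no queue slot left: goes to the handler
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(countRejections());
    }
}
```

A real handler would usually do more than count, for example log the task or retry it later; the counter just keeps the sketch short.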

How do I create a thread pool?

ThreadPoolExecutor provides constructors for creating the pool. The main arguments are as follows. If the thread factory or the rejection handler is not specified, a default is used:

  • Number of core threads: Core threads are usually permanent and are not destroyed when there are no tasks
  • Maximum number of threads: The maximum number of threads the thread pool is allowed to create
  • Keep-alive time of non-core threads: How long a non-core thread can stay alive without a task
  • Unit of time: The unit of the keep-alive time
  • Task queue: The queue used to store tasks
  • Thread factory: Used to create the threads
  • Rejection handler: If a task cannot be added, it is handed to this handler

// Specify the number of core threads, the maximum number of threads, the time a non-core thread
// can survive without a task, the unit of time, and the task queue
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

// As above, plus the thread factory
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

// As above, but with the rejection handler instead of the thread factory
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

// All seven parameters
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...
}
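For illustration, here is one way the seven-argument constructor might be called. Everything specific in this sketch is an assumption: the class name ExplicitPoolFactory, the sizes 2 and 4, the 30 second keep-alive, the queue capacity of 100, the "demo-worker-" thread names, and the choice of CallerRunsPolicy. None of these values is a recommendation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK
class ExplicitPoolFactory {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Named daemon threads make the pool easy to spot in a thread dump
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(
                2,                                          // core threads
                4,                                          // maximum threads
                30L, TimeUnit.SECONDS,                      // keep-alive for non-core threads
                new ArrayBlockingQueue<>(100),              // bounded task queue
                factory,                                    // thread factory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection handler
    }

    // Runs one task and reports the name of the thread that executed it
    static String runOne() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOne());
    }
}
```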

Besides specifying these parameters explicitly, the JDK also wraps several ready-made ways of creating a thread pool in a factory class called Executors:

// Thread pool with a fixed number of threads and an unbounded queue
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// Thread pool with a single thread and an unbounded queue; tasks run in submission order
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

// Adjusts dynamically: no core threads, all threads are ordinary threads, each idle thread lives for 60s
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// Thread pool for scheduled tasks
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

But using these pre-packaged thread pools is generally not recommended!
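One way to see what these factory methods hide is to inspect the pools they return. The sketch below assumes that newFixedThreadPool() and newCachedThreadPool() return a plain ThreadPoolExecutor, as the source shown above does; the cast is an implementation detail rather than a documented guarantee, and the class name ExecutorsDefaults is made up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class; the casts rely on the implementation shown above
class ExecutorsDefaults {
    // Remaining capacity of the queue behind a fixed pool
    static int fixedPoolQueueCapacity() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Upper bound on the number of threads of a cached pool
    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
    }
}
```

In both cases one of the limits is effectively unbounded, either the queue or the thread count, which is the usual argument for spelling the parameters out yourself.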

The underlying parameters and core methods of the thread pool

You might be a little confused after reading the creation parameters above, but that is OK. Let’s walk through them:

When a task comes in, the pool first checks whether the number of core threads has been reached; if not, it creates a new thread to run the task. Note a subtle case: suppose an earlier task came in, a thread was created to run it, and that thread has finished and is now idle. When another task comes in while the pool is still below the core size, does the pool reuse the idle thread or create a new one?

The answer is that it creates a new thread, so that the pool quickly grows to the core number of threads and can respond quickly to subsequent tasks.

If the number of threads has already reached the core size, the pool checks whether the task queue is full. If the queue still has space, the task is put into the queue, where it waits for a thread to pick it up.

If the task queue is full, the pool checks whether the number of threads has reached the maximum. If not, it creates a new thread to execute the task; threads created at this point are non-core threads.

If the maximum number of threads has been reached, no more threads can be created and the rejection policy is executed. The default rejection policy, AbortPolicy, rejects the task by throwing a RejectedExecutionException. We can also customize the rejection policy.

It is worth noting that if a burst of tasks caused some non-core threads to be created, then once the load drops and those threads can no longer get tasks, they are destroyed after a certain idle time, leaving only the core threads. This idle time is the keepAliveTime.
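This behaviour is easy to check. The sketch below (the class name CoreGrowthDemo and the core size of 2 are assumptions for the example) runs two tasks one after the other, waiting for the first to finish before submitting the second, and then asks the pool how many threads it has.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class CoreGrowthDemo {
    static int poolSizeAfterTwoSequentialTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            // The first task has finished by the time get() returns, so its thread is idle
            pool.submit(() -> { }).get();
            // Still below corePoolSize: a second thread is created instead of reusing the idle one
            pool.submit(() -> { }).get();
            return pool.getPoolSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterTwoSequentialTasks());
    }
}
```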
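The whole sequence of core thread, queue, non-core thread, and rejection can be observed with a deliberately tiny pool. In this sketch the class name SubmissionFlowDemo, the sizes (1 core thread, 2 maximum, queue capacity 1), and the use of -1 as a marker for a rejected task are all choices made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class SubmissionFlowDemo {
    // Records the pool size after each submission; -1 marks a rejected task
    static List<Integer> poolSizesWhileFilling() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep every worker busy until the end
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Integer> sizes = new ArrayList<>();
        try {
            pool.execute(blocker);          // below core size: core thread created
            sizes.add(pool.getPoolSize());
            pool.execute(blocker);          // core size reached: task is queued
            sizes.add(pool.getPoolSize());
            pool.execute(blocker);          // queue full: non-core thread created
            sizes.add(pool.getPoolSize());
            try {
                pool.execute(blocker);      // queue full and maximum reached: rejected
            } catch (RejectedExecutionException e) {
                sizes.add(-1);              // default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(poolSizesWhileFilling());
    }
}
```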

Submit a task

When a task is submitted: if the number of threads is below the core size, a new worker is added to run it. Otherwise the task is added to the task queue. If the queue is full, the pool tries to add a non-core worker, and if that also fails, the task is rejected.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Fewer threads than the core size
    if (workerCountOf(c) < corePoolSize) {
        // Add a worker directly
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The pool is running and the task was put into the queue
    if (isRunning(c) && workQueue.offer(command)) {
        // Check again
        int recheck = ctl.get();
        // If the pool is no longer running, remove the task and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no threads left, add one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full: try a non-core worker, reject if that fails
    else if (!addWorker(command, false))
        reject(command);
}

The important method called here is addWorker(Runnable firstTask, boolean core), which adds a worker thread. Let’s see how it works:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // rs >= SHUTDOWN means the pool is no longer running.
        // The only case still allowed through is: rs == SHUTDOWN, firstTask == null,
        // and the queue is not empty (a worker is needed to drain the queue).
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Check against CAPACITY and the core or maximum size
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Count incremented successfully: leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // Run state changed: retry the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a worker that wraps the task
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // The thread was created successfully
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-check the state while holding the lock
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // Fail if the thread has already been started
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the new worker to the set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread did not start, roll back
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Processing tasks

When we introduced Worker, we explained that its run() method calls the outer runWorker() method, so let’s look at the runWorker() method:

First, it processes its firstTask directly, which is not in the task queue, but is held by itself:

final void runWorker(Worker w) {
    // The current thread
    Thread wt = Thread.currentThread();
    // The first task
    Runnable task = w.firstTask;
    // Reset to null
    w.firstTask = null;
    // Allow interrupts
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // Run the first task, then keep fetching tasks from the queue
        while (task != null || (task = getTask()) != null) {
            // Lock
            w.lock();
            // If the pool is stopping, make sure the thread is interrupted;
            // if not, make sure the thread is not interrupted. The second case
            // needs a recheck to deal with a shutdownNow race while clearing the interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook called before execution
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook called after execution
                    afterExecute(task, thrown);
                }
            } finally {
                // Set to null
                task = null;
                // Update the number of completed tasks
                w.completedTasks++;
                w.unlock();
            }
        }
        // Finished normally
        completedAbruptly = false;
    } finally {
        // Handle the exit of the worker
        processWorkerExit(w, completedAbruptly);
    }
}

As you can see above, when the current task is null the worker goes to fetch one. Let’s look at getTask(). Two conditions matter here: whether core threads are allowed to time out, and whether the number of threads exceeds the core size. If either holds, the task is fetched from the queue with a timeout of keepAliveTime, and null is returned if nothing arrives in time; otherwise the thread blocks in take() until a task is available. If no task is fetched, the loop above ends, which triggers processWorkerExit() and the rest of the thread teardown.

private Runnable getTask() {
    // Whether the last poll() timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // In the SHUTDOWN state the queue is still drained, but no new tasks are accepted
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // Number of threads
        int wc = workerCountOf(c);

        // Core threads may time out, or the number of threads exceeds the core size
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // If the count was decremented successfully, return null;
            // processWorkerExit() takes over afterwards
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // With a timeout if this thread may be reclaimed, otherwise block until a task arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // A task was fetched, so it can be executed
            if (r != null)
                return r;
            // Otherwise the poll timed out
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Destruction of the thread

As mentioned above, if a thread has no current task and either core threads are allowed to time out or the number of threads exceeds the core size, then after waiting for the keep-alive time without getting a task from the queue, the thread leaves the loop and runs the thread destruction (termination) procedure. What happens when a thread is destroyed?

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If the thread ended abruptly, the number of threads was not adjusted before it ended, so adjust it here
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add up the completed tasks
        completedTaskCount += w.completedTasks;
        // Remove the worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Try to terminate the pool
    tryTerminate();

    int c = ctl.get();
    // The pool is in the RUNNING or SHUTDOWN state
    if (runStateLessThan(c, STOP)) {
        // The thread ended normally
        if (!completedAbruptly) {
            // The minimum is either 0 or the number of core threads
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If the minimum is 0 and the queue is not empty, keep one thread
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Otherwise add a replacement worker
        addWorker(null, false);
    }
}

How do I stop the thread pool?

The thread pool can be stopped using shutdown() or shutdownNow(). shutdown() lets the pool continue processing the tasks already in the queue, while shutdownNow() clears the queue immediately, interrupts the running tasks, and returns the tasks that were never executed.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check permissions
        checkShutdownAccess();
        // Update the state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads
        interruptIdleWorkers();
        // Callback hook
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check permissions
        checkShutdownAccess();
        // Update the state to STOP
        advanceRunState(STOP);
        // Interrupt all threads
        interruptWorkers();
        // Drain the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    // Return the tasks that were never executed
    return tasks;
}
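The difference shows up clearly when tasks are still waiting in the queue. In the sketch below (the class name ShutdownDemo and the single-thread pool are assumptions made for the example), one long task occupies the only worker while two more sit in the queue; shutdownNow() interrupts the first and hands back the other two.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class ShutdownDemo {
    // Returns how many queued tasks shutdownNow() hands back unexecuted
    static int pendingAfterShutdownNow() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Runnable longTask = () -> {
            try {
                Thread.sleep(60_000); // ends early when shutdownNow() interrupts the worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(longTask);   // handed straight to the single worker
        pool.execute(() -> { });  // queued
        pool.execute(() -> { });  // queued
        try {
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdownNow());
    }
}
```

With shutdown() instead, the two queued tasks would still be executed after the long task finishes, and nothing would be returned to the caller.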

execute() and submit() methods

  • The execute() method submits tasks that need no return value; there is no way to tell whether the task was executed successfully by the thread pool.
  • The submit() method submits a task that needs a return value. The thread pool returns an object of type Future, and calling its get() method blocks until the task has finished and the result is available. We can also use the timed version, get(long timeout, TimeUnit unit), which stops waiting once the time is up: if the task has not finished by then, it throws a TimeoutException instead of blocking any longer. The object actually returned is a RunnableFuture, which extends two interfaces, Runnable and Future<V>:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
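The timed get() can be sketched like this. The class name FutureTimeoutDemo, the 10 second task, and the 100 millisecond timeout are illustrative choices; the point is only that a task which is still running when the timeout elapses produces a TimeoutException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK
class FutureTimeoutDemo {
    // Returns true if get(timeout) gave up before the slow task finished
    static boolean timesOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slow = () -> {
                Thread.sleep(10_000);
                return "late";
            };
            Future<String> future = pool.submit(slow);
            try {
                future.get(100, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task that is still running
                return true;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut());
    }
}
```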

Why do thread pools use blocking queues?

A blocking queue is first of all a queue, so it is normally first-in, first-out.

Blocking is an evolution of the ordinary queue model. An ordinary queue fits the producer-consumer model, which is a form of data sharing: one side keeps putting tasks in and the other keeps pulling tasks out. That is the ideal state.

But things are not always ideal: tasks may be produced and consumed at different speeds. If tasks pile up in the queue because consumption is slow, the consumers can work through them gradually, or the producer has to pause (the producer thread blocks). You can use offer(E e, long timeout, TimeUnit unit) to set a wait time: if the element cannot be added to the BlockingQueue within that time, it returns false. Or you can use put(E e) to put the object into the BlockingQueue; if there is no space, this method blocks until there is.

If consumers are faster than producers, poll(long timeout, TimeUnit unit) can be used to fetch a task: if there is data it returns immediately, otherwise it waits up to the timeout and then returns null. You can also use take() to fetch the first task; if there is none, it blocks until the queue has one.

Those are the properties of a blocking queue. So why use one?

  • If every task were simply buffered without bound as it arrived, resources could easily be exhausted.
  • Creating a thread requires acquiring a lock, and it is a global lock for the thread pool. If threads kept being created and destroyed, the locking, unlocking, and thread context switching would be relatively expensive. It is cheaper to let a thread block and wait while the queue is empty.
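The timed offer() and poll() calls described above can be tried on a queue with a single slot. The class name BlockingQueueOpsDemo and the 50 millisecond timeouts are assumptions for this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class BlockingQueueOpsDemo {
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Space available: succeeds immediately
            boolean first = queue.offer("a", 50, TimeUnit.MILLISECONDS);
            // Queue full and nobody consuming: gives up after the timeout
            boolean second = queue.offer("b", 50, TimeUnit.MILLISECONDS);
            // Data available: returned immediately
            String head = queue.poll(50, TimeUnit.MILLISECONDS);
            // Queue empty and nobody producing: null after the timeout
            String none = queue.poll(50, TimeUnit.MILLISECONDS);
            return first + "," + second + "," + head + "," + none;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

put() and take() behave like the two calls above without the timeout: they simply block until space or data becomes available.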

Common blocking queues

  • ArrayBlockingQueue: Array-based blocking queue. It holds a fixed-length array internally, along with the head and tail positions of the queue.
  • LinkedBlockingQueue: Linked-list-based blocking queue. Producers and consumers use separate locks, which gives good concurrency. If no capacity is specified, it is effectively unbounded (Integer.MAX_VALUE), which makes it easy to run out of memory.
  • DelayQueue: Delay queue. Its size is unbounded, so producing never blocks; only consuming does.
  • PriorityBlockingQueue: Priority-based blocking queue. Elements are consumed in priority order, and synchronization is handled by an internal lock.
  • SynchronousQueue: No buffering. The producer hands the task directly to the consumer, with no intermediate storage.
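A few of these properties can be observed directly. The sketch below uses only standard java.util.concurrent queues; the class name QueueTraitsDemo and its helper methods are invented for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK
class QueueTraitsDemo {
    // A SynchronousQueue has no buffer: offer() fails when no consumer is waiting
    static boolean synchronousQueueAcceptsWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    // A LinkedBlockingQueue created without a capacity is effectively unbounded
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // A PriorityBlockingQueue hands out elements in priority order, not insertion order
    static int priorityHead() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(synchronousQueueAcceptsWithoutConsumer());
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE);
        System.out.println(priorityHead());
    }
}
```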

How does a thread pool reuse threads? What about threads that have completed execution

When a worker thread in the pool runs, its run() method calls runWorker(), which is an infinite loop that keeps fetching tasks from the task queue and only exits when it fails to get one. Fetching fails when the wait on the task queue times out. In that case the current thread is terminated if core threads are allowed to be destroyed or if the number of threads exceeds the core thread count.

Otherwise, the thread stays in the loop and never ends.

We know that the start() method of a thread can only be called once. That is why the worker's run() method delegates to the external runWorker() and lets it loop: runWorker() gets a task and then calls the task's run() method directly, so the same thread executes one task after another.

When the worker thread finishes, it calls processWorkerExit(). On analysis, this method takes the lock, decrements the thread count, removes the worker from the worker collection, and then checks whether there are now too few threads. If so, it adds a worker back, which acts as a repair step.
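
The reuse idea can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the JDK source: the class WorkerLoopSketch, the method runOnSingleWorker and the thread name "sketch-worker" are hypothetical, and a timed poll() on a BlockingQueue stands in for getTask() returning null after a timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the worker loop; not the real ThreadPoolExecutor code.
class WorkerLoopSketch {

    /**
     * Runs every queued task on ONE worker thread and returns how many tasks ran.
     * The worker exits once poll() times out, mirroring a failed task fetch.
     */
    static int runOnSingleWorker(BlockingQueue<Runnable> queue, long idleTimeoutMillis) {
        AtomicInteger executed = new AtomicInteger();
        Thread worker = new Thread(() -> {
            try {
                Runnable task;
                // Keep fetching tasks; a timed-out poll() returns null and ends the loop.
                while ((task = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                    task.run(); // plain method call on the worker thread, no new thread is started
                    executed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Leaving the loop lets the thread terminate; the real pool does its
            // bookkeeping in processWorkerExit() at this point.
        }, "sketch-worker");
        worker.start(); // start() is called exactly once for this thread
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }
}
```

If three tasks are queued before the call, all three run on the same "sketch-worker" thread, and the thread ends only after the queue has stayed empty for the idle timeout.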

How do I configure thread pool parameters?

In general, there is a rule of thumb: set the number of core threads to N+1 for computation (CPU) intensive tasks and to 2N for IO intensive tasks (such as many network requests), where N is the number of processor cores. But this is not a silver bullet. Everything should be based on reality, and it is best to test in a test environment, because real knowledge comes from practice. Often a machine runs more than one thread pool, or has other threads as well, so the parameters should not be set too high.

On a typical 8-core machine, 10 to 12 core threads is about right, but the actual value must be worked out from the specific business. With too many threads, context switches are frequent and contention is fierce; with too few, there is no way to make the most of the machine's resources.

Computation (CPU) intensive tasks mainly consume CPU resources, so the number of threads can be set to N+1. The one extra thread beyond the number of CPU cores guards against a thread being paused by an occasional page fault or for some other reason. Once a task is paused, the CPU would sit idle, and the extra thread can make use of that idle time.

IO intensive systems spend most of their time on I/O, and a thread does not occupy the CPU while it is waiting for I/O, so it can give up the CPU to other threads. Therefore, for IO intensive tasks we can configure more threads; the usual calculation is 2N.
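
The two rules of thumb can be written down directly. The following is a minimal sketch; the class PoolSizing and its method names are hypothetical, and the results are only starting points that should still be validated by load testing, as the text above stresses.

```java
// Hypothetical helper, for illustration only.
class PoolSizing {

    /** CPU intensive rule of thumb from the text: N + 1. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /** IO intensive rule of thumb from the text: 2N. */
    static int ioBoundThreads(int cores) {
        return 2 * cores;
    }

    public static void main(String[] args) {
        // Number of processors available to the JVM on this machine.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBoundThreads(cores)
                + " ioBound=" + ioBoundThreads(cores));
    }
}
```

For N = 8 this gives 9 threads for CPU intensive work and 16 for IO intensive work.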

Why is the default way of creating thread pools not recommended?

The Alibaba Java coding guidelines recommend against creating thread pools with the default factory methods. Such pools are created with default parameters that the developer may not be aware of. It is better to call new ThreadPoolExecutor() directly so that the parameters are under control. The problems with the default factory methods are as follows:

  • Executors.newFixedThreadPool(): the queue is unbounded, so memory may be exhausted.
  • Executors.newSingleThreadExecutor(): a single thread, so tasks run serially and efficiency is low.
  • Executors.newCachedThreadPool(): there are no core threads and the maximum number of threads is effectively unlimited, so memory may be exhausted.

When creating a thread pool with explicit parameters, the developer must understand the role of each parameter and not set them arbitrarily, so as to reduce problems such as memory overflow.

This generally comes down to the following questions:

  • How should the task queue be set up?
  • How many core threads are there?
  • What is the maximum number of threads?
  • How should tasks be rejected?
  • Are the threads given a name? Unnamed threads make problems hard to trace.
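
One way to answer all five questions explicitly is sketched below. The class NamedPoolFactory, the method newOrderPool, the "order-pool-" thread name prefix and every number used are assumptions made for illustration; the right values depend on the business and should be tested.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory, for illustration only; all sizes are placeholder values.
class NamedPoolFactory {

    static ThreadPoolExecutor newOrderPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Named threads make stack traces and logs easy to attribute to this pool.
        ThreadFactory namedThreads = r -> new Thread(r, "order-pool-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                4,                                     // core threads
                8,                                     // maximum threads
                60L, TimeUnit.SECONDS,                 // idle time before extra threads are reclaimed
                new ArrayBlockingQueue<>(100),         // bounded task queue
                namedThreads,
                new ThreadPoolExecutor.AbortPolicy()); // explicit rejection policy
    }
}
```

Because the queue is bounded and the maximum thread count is fixed, a burst of tasks ends in rejections that can be observed and handled, not in unbounded memory growth.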

Rejection policies for thread pools

A thread pool generally has the following four rejection policies, which can be seen in the inner classes of ThreadPoolExecutor:

  • AbortPolicy: does not execute the new task and throws an exception to indicate that the thread pool is full.
  • DiscardPolicy: does not execute the new task and does not throw an exception; the task is silently discarded.
  • DiscardOldestPolicy: discards the oldest task in the task queue and then submits the new task in its place.
  • CallerRunsPolicy: runs the task directly in the thread that called execute().

In general, none of the policies above is ideal. When the pool is full, the first thing to do is decide whether the task is necessary. If it is non-essential and non-core, consider rejecting it and raising an error alert. If it is essential, it must be preserved, whether through an MQ or by other means, and must not be lost. Logging is essential in all of these paths. You have to protect the thread pool and also be responsible for the business.
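
A custom policy can be plugged in by implementing RejectedExecutionHandler. The sketch below is one possible shape under stated assumptions: the class LoggingRejectionHandler and its counter are hypothetical, logging goes to System.err only to keep the example self-contained, and the step that would persist an essential task or forward it to an MQ is left as a comment because it depends on the business.

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler, for illustration only.
class LoggingRejectionHandler implements RejectedExecutionHandler {

    private final AtomicLong rejectedCount = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        long n = rejectedCount.incrementAndGet();
        // Always leave a trace so that rejections are visible to monitoring.
        System.err.printf("Rejected task #%d: %s (poolSize=%d, queued=%d)%n",
                n, task, executor.getPoolSize(), executor.getQueue().size());
        // An essential task could be persisted or sent to an MQ here (not shown).
    }

    /** Number of tasks rejected so far. */
    public long rejectedCount() {
        return rejectedCount.get();
    }
}
```

The handler is passed as the last constructor argument of ThreadPoolExecutor, and the counter can be exported as a metric to drive alerts.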

Thread pool monitoring and dynamic tuning

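ThreadPoolExecutor provides APIs to read the state of a thread pool at runtime and to change its parameters:

To view the thread pool status, use getters such as getPoolSize(), getActiveCount(), getQueue().size(), getCompletedTaskCount() and getLargestPoolSize().

To modify thread pool parameters, use setters such as setCorePoolSize(), setMaximumPoolSize(), setKeepAliveTime() and setRejectedExecutionHandler().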
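
As a rough illustration of both directions, the sketch below reads a few counters and resizes a pool through the public getters and setters of ThreadPoolExecutor. The class PoolMonitorSketch and the methods snapshot and resize are hypothetical names. The ordering inside resize is an assumption that matters on recent JDKs, where the setters throw IllegalArgumentException if the core size would temporarily exceed the maximum size.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper, for illustration only.
class PoolMonitorSketch {

    /** Formats a one-line view of the pool's current configuration and activity. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("core=%d max=%d poolSize=%d active=%d queued=%d completed=%d",
                pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getPoolSize(),
                pool.getActiveCount(), pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    /** Changes core and maximum size at runtime; requires core <= max. */
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core > max) {
            throw new IllegalArgumentException("core must not exceed max");
        }
        if (max >= pool.getCorePoolSize()) {
            // New max is at least the current core size: raise or set max first.
            pool.setMaximumPoolSize(max);
            pool.setCorePoolSize(core);
        } else {
            // New max is below the current core size: lower core first.
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(max);
        }
    }
}
```

A scheduled job could log snapshot() periodically, and a configuration listener could call resize() when an operator changes the values.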

Meituan's thread pool article explains this very clearly. It even describes a platform for tuning thread pool parameters in real time and for tracking and monitoring thread pool activity, task transactions (frequency, time taken), reject exceptions, thread pool internal statistics, and so on. I won't expand on it here; the original is at https://tech.meituan.com/2020… Its line of thinking is something we can refer to.

Thread pool isolation

Thread isolation, as many readers may know, means that different tasks run in different threads, while thread pool isolation is generally organized by business type. For example, order processing is placed in one thread pool and membership-related processing is placed in another.

Core and non-core flows can also be separated: core flows share one pool and non-core flows share another. The two pools use different parameters and different rejection policies, so that the pools do not affect each other. The core threads are kept running as far as possible, while the non-core threads can tolerate failure.

Hystrix uses this technique. Its Thread Isolation prevents an avalanche across different network requests: even if the thread pool for one dependent service is full, the rest of the application is not affected.
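
The core versus non-core split can be sketched with two independently configured pools. This is a minimal illustration and not how Hystrix is implemented; the class IsolatedPools, the method newPool, and the pool names and sizes used in the usage note are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, for illustration only.
class IsolatedPools {

    /** Builds a fixed-size pool with its own bounded queue, thread names and rejection policy. */
    static ThreadPoolExecutor newPool(String name, int threads, int queueSize,
                                      RejectedExecutionHandler onReject) {
        AtomicInteger seq = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                threads, threads,
                30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize),
                r -> new Thread(r, name + "-" + seq.getAndIncrement()),
                onReject);
    }
}
```

For example, a core pool could be created with newPool("core", 8, 200, new ThreadPoolExecutor.CallerRunsPolicy()) so that no core task is dropped, and a non-core pool with newPool("non-core", 2, 50, new ThreadPoolExecutor.DiscardPolicy()) so that overload there is simply shed. Since the pools share neither threads nor queues, saturating the non-core pool does not delay tasks submitted to the core pool.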

About the author

Qin Huai, author of the public account [Qin Huai grocery store]. The road of technology is not traveled in a day; the mountains are high and the waters long, and even if progress is slow, I keep going without rest. Personal writing direction: Java source analysis, JDBC, MyBatis, Spring, Redis, distributed systems, Sword Offer, LeetCode, etc. I write each article carefully and dislike clickbait titles and gimmicks. Most articles belong to a series. I cannot guarantee that everything I write is completely correct, but I do guarantee that everything I write has been practiced or researched. If there is any omission or error, please correct it.
