Original text: chenmingyu. Top/concurrent -…

The thread pool

Thread pools are used to process asynchronous or concurrent tasks

Advantages:

  1. Reuse existing threads, reducing the resource cost of creating and destroying threads
  2. Improve response time, since a task can run on a pooled thread without waiting for one to be created
  3. Make threads easier to manage, since the pool controls them centrally

ThreadPoolExecutor

Thread pools in Java are implemented using ThreadPoolExecutor

The constructor

ThreadPoolExecutor provides four constructors, and the other three constructors end up calling the following constructor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The arguments:

  1. corePoolSize: the number of core threads in the thread pool

    The number of core threads the pool maintains. A newly created pool has no threads; threads are created to execute tasks as they arrive. Once the number of worker threads equals corePoolSize, newly arrived tasks are placed in the work queue

  2. maximumPoolSize: the maximum number of threads the pool is allowed to create

    When the blocking queue is full and the number of threads in the pool is less than maximumPoolSize, a new thread is created to execute the task

  3. keepAliveTime: how long an idle thread is kept alive

    By default, keepAliveTime takes effect only when the pool holds more threads than corePoolSize. If a thread stays idle for keepAliveTime while the pool is above corePoolSize, that thread terminates; this continues until the pool is back down to corePoolSize threads

  4. unit: the time unit of keepAliveTime

    One of TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.MINUTES, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, or TimeUnit.NANOSECONDS

  5. workQueue: the task queue, a blocking queue that holds tasks waiting to be executed

    ArrayBlockingQueue: a bounded blocking queue backed by an array

    LinkedBlockingQueue: a blocking queue backed by a linked list

    SynchronousQueue: a blocking queue that stores no elements. Each insert must wait for another thread to perform a matching remove; until then the insert blocks

    PriorityBlockingQueue: an unbounded blocking queue with priority ordering

  6. threadFactory: the factory used to create threads

  7. handler: the saturation policy. When both the thread pool and the queue are full, a policy is needed to handle new tasks. The default policy is AbortPolicy

    AbortPolicy: throws a RejectedExecutionException

    CallerRunsPolicy: runs the task on the caller's thread

    DiscardOldestPolicy: discards the oldest task in the queue (the one at the head) and then retries submitting the current task

    DiscardPolicy: silently discards the task

    It is also possible to customize the implementation strategy by implementing RejectedExecutionHandler
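
As a minimal sketch of a custom policy, the handler below only counts rejected tasks instead of throwing. The class name CountingRejectionHandler and the demo() method are hypothetical names made up for this illustration; the pool is kept deliberately tiny (one thread, a queue of one) so that the third task has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical policy for illustration: count rejected tasks instead of throwing.
class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }

    // Saturates a tiny pool (1 thread, queue of 1) and returns how many tasks were rejected.
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only thread
        pool.execute(blocker); // fills the queue
        pool.execute(blocker); // pool and queue are full: handed to the handler
        release.countDown();   // let the blocked tasks finish
        pool.shutdown();
        return handler.rejectedCount();
    }
}
```

A real policy would usually do more than count, for example log the task or persist it for a later retry; which of those is appropriate depends on the application.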

The thread pool behaves differently depending on these parameters, so understanding what each one means makes the pool's implementation easier to follow

Submit a task

The thread pool processes a submitted task as follows

Processing flow:

  1. If fewer than corePoolSize threads are running, create a thread to execute the task; otherwise try to add the task to the blocking queue
  2. If the blocking queue is not full, the task is stored in the queue
  3. If the blocking queue is full, check whether the pool has reached maximumPoolSize. If not, create a thread to execute the task
  4. If the pool has reached maximumPoolSize, the task is handled according to the saturation policy
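
The four steps can be observed on a small pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2, and a queue of capacity 1, and submits four tasks that all block on a latch, so that each submission falls through to the next step. SubmissionFlowDemo and run() are hypothetical names for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            pool.execute(blocker);                 // step 1: a core thread is created
            observed[0] = pool.getPoolSize();
            pool.execute(blocker);                 // step 2: core is busy, task is queued
            observed[1] = pool.getQueue().size();
            pool.execute(blocker);                 // step 3: queue is full, a second thread is created
            observed[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // step 4: pool is at its maximum, task is rejected
            } catch (RejectedExecutionException e) {
                observed[3] = 1;                   // default AbortPolicy throws
            }
        } finally {
            release.countDown();                   // unblock the workers
            pool.shutdown();
        }
        return observed;
    }
}
```

Because the first task never returns until the latch is released, the core thread cannot drain the queue in the meantime, which is what makes the observed sizes stable.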

ThreadPoolExecutor provides execute(Runnable command) and submit(Runnable task) for submitting tasks to the pool. submit(Runnable task) calls execute(Runnable command) internally, so execute(Runnable command) is the method to understand.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read ctl, which packs both the pool's run state and its worker count
    int c = ctl.get();
    // If there are fewer workers than corePoolSize, create a new thread to execute the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Otherwise, if the pool is running, try to add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double check: if the pool is no longer running, remove the task from the queue and apply the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker threads are left, start one so the queued task gets executed
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The task could not be queued: try to create a non-core thread, and apply the rejection policy if that fails
    else if (!addWorker(command, false))
        reject(command);
}

In execute(Runnable command), addWorker(command, true) creates a new thread to execute the task

The workQueue.offer(command) method adds the task to the blocking queue

The reject(command) method handles the task according to the saturation policy passed in when the pool was created. With the default AbortPolicy, for example, the source shows that it simply throws a RejectedExecutionException. The source of the other saturation policies is just as simple

How the thread pool state and the number of worker threads are represented

ThreadPoolExecutor represents both with a single AtomicInteger variable

/*
 * ctl carries two pieces of information: the state of the thread pool (high 3 bits)
 * and the number of threads currently in the pool (low 29 bits). This is the same
 * idea as the read-write lock state variable covered earlier, where one 32-bit int
 * records two things: the high 16 bits for the read lock and the low 16 bits for
 * the write lock
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// The low 29 bits store the number of worker threads
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum number of worker threads
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// The run state is stored in the high 3 bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
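
To make the packing concrete, here is a small standalone sketch that copies the constants above and adds the three helpers that combine and split the value. The helper bodies follow the JDK 8 source of ThreadPoolExecutor; the wrapping class CtlPackingSketch is a hypothetical name for this illustration.

```java
class CtlPackingSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    // Combine a run state (high 3 bits) and a worker count (low 29 bits) into one int
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    // Keep only the high 3 bits
    static int runStateOf(int ctl) {
        return ctl & ~CAPACITY;
    }

    // Keep only the low 29 bits
    static int workerCountOf(int ctl) {
        return ctl & CAPACITY;
    }
}
```

Since RUNNING is negative and the other states are zero or positive, the states are ordered numerically, which is why the source can write comparisons such as rs >= SHUTDOWN.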

addWorker(Runnable firstTask, boolean core) creates a worker thread to execute the task

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Run state of the thread pool
        int rs = runStateOf(c);
        // Refuse to add a worker once the pool is shutting down, unless the pool is in
        // SHUTDOWN, no first task was given, and the queue still holds tasks to drain
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            // Current number of worker threads
            int wc = workerCountOf(c);
            // Fail if the count has reached CAPACITY, or the bound selected by core:
            // corePoolSize when core is true, maximumPoolSize otherwise
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the worker count with CAS
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the pool's run state changed, restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker, which creates a new thread internally
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // Only add the worker if the pool is still running, or is in
                // SHUTDOWN and this worker is being added just to drain the queue
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the new worker to the pool's worker set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread; the worker runs its firstTask, if any, and then takes tasks from the queue
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker thread failed to start
        if (!workerStarted)
            // Remove the worker
            addWorkerFailed(w);
    }
    return workerStarted;
}
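
The inner loop of addWorker is a bounded compare-and-set retry: read the count, give up if the limit is reached, otherwise try to swap in count + 1 and loop again if another thread got there first. A simplified standalone sketch of that pattern follows. It is not the JDK code: BoundedCounterSketch is a hypothetical class, and unlike ThreadPoolExecutor it keeps the count in its own AtomicInteger and does not re-check any run state.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounterSketch {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounterSketch(int limit) {
        this.limit = limit;
    }

    // Returns true if the count was incremented, false if the limit was already reached.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false;                              // bound reached, like wc >= maximumPoolSize
            if (count.compareAndSet(current, current + 1))
                return true;                               // CAS won, like break retry
            // CAS lost a race with another thread; re-read and try again
        }
    }

    int get() {
        return count.get();
    }
}
```

The point of the loop is that the limit check and the increment act as one atomic step without taking a lock; the lock in addWorker is only needed later, when the worker is added to the workers set.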
Closing the thread pool

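A ThreadPoolExecutor is shut down with shutdown() or shutdownNow(). Both work by iterating over the pool's workers and interrupting their threads: shutdown() interrupts only idle workers and lets already-submitted tasks finish, while shutdownNow() interrupts every started worker and returns the tasks still waiting in the queue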

for (Worker w : workers) {
    Thread t = w.thread;
    if (!t.isInterrupted() && w.tryLock()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
            w.unlock();
        }
    }
    if (onlyOne)
        break;
}
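
From the caller's side, the two methods are often combined: ask for an orderly shutdown first and fall back to shutdownNow() only if the pool does not terminate in time. The sketch below follows the usage pattern shown in the ExecutorService documentation; the class GracefulShutdown, the method shutdownGracefully, and the choice to reuse one timeout for both waits are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    // Returns true if the pool terminated within the given time budget.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued and running tasks continue
        try {
            if (pool.awaitTermination(timeout, unit))
                return true;
            pool.shutdownNow(); // interrupt running tasks and drop the queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

Note that shutdownNow() can only interrupt; a task that ignores interruption will keep running, in which case the method above returns false.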
Executor framework

The Executor framework separates the submission of a task from its execution

Executors provides a series of factory methods for creating thread pools; the pools they return implement the ExecutorService interface

Factory Method:

  1. newFixedThreadPool: creates a thread pool with a fixed number of threads
  2. newCachedThreadPool: creates a cached thread pool. A call to execute reuses a previously created thread if one is available; otherwise it creates a new thread and adds it to the pool. Threads that have been idle for 60 seconds are terminated and removed from the cache
  3. newSingleThreadExecutor: creates a thread pool with only one thread
  4. newScheduledThreadPool: creates a thread pool that supports scheduled and periodic task execution

The Alibaba manual prohibits creating thread pools with the factory methods provided by Executors

This is a serious problem in practice. Our department once used a FixedThreadPool and ran into an OOM: threads were blocked or tasks took a long time to execute, so tasks kept piling up in the blocking queue until memory ran out and an OOM was reported
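
The reason the queue can grow that far is visible from the pool itself: in JDK 8, Executors.newFixedThreadPool(n) builds the pool on a LinkedBlockingQueue created without a capacity, which makes the queue effectively unbounded. A small check, as a sketch; FixedPoolQueueCheck is a hypothetical name, and the cast assumes the factory returns a plain ThreadPoolExecutor, as the JDK 8 implementation does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolQueueCheck {
    // Returns how many more tasks the queue of a fresh fixed pool would accept.
    static int remainingQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the returned object is a ThreadPoolExecutor (true for JDK 8)
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }
}
```

On such a pool the method returns Integer.MAX_VALUE, so the saturation policy never gets a chance to run; the heap fills up first.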

So when we use a thread pool, we create it with the ThreadPoolExecutor constructor, set the number of core threads and the maximum number of threads according to the task type, and choose an appropriate blocking queue and queue length
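
A minimal sketch of what that can look like, with every parameter spelled out. The class BoundedPoolFactory, the method newBoundedPool, the 60-second keep-alive, and the choice of CallerRunsPolicy are illustrative assumptions, not recommendations from the original text; the right values depend on the workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolFactory {
    static ThreadPoolExecutor newBoundedPool(String namePrefix, int coreThreads,
                                             int maxThreads, int queueCapacity) {
        // Name the threads so they are easy to identify in thread dumps
        AtomicInteger sequence = new AtomicInteger(1);
        ThreadFactory threadFactory =
                task -> new Thread(task, namePrefix + "-" + sequence.getAndIncrement());

        return new ThreadPoolExecutor(
                coreThreads,
                maxThreads,
                60L, TimeUnit.SECONDS,                      // idle non-core threads exit after 60 s (assumed value)
                new ArrayBlockingQueue<>(queueCapacity),    // bounded queue: backlog cannot grow without limit
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the submitting thread runs the task
    }
}
```

With a bounded queue, overload shows up as the saturation policy firing rather than as memory growth, which is usually easier to notice and to handle.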

Configure thread pools properly

Configuring a thread pool properly requires analyzing the nature of its tasks (with the pool created through ThreadPoolExecutor):

  1. CPU-intensive tasks should be configured with as few threads as possible, such as number of CPUs + 1

  2. IO-intensive tasks spend part of their time waiting on IO rather than executing, so they should be configured with as many threads as possible, such as number of CPUs x 2

    The number of CPUs is available via Runtime.getRuntime().availableProcessors()

  3. If tasks spend a long time calling external interfaces, the CPU sits idle longer. The pool can be given more threads so that the idle CPU time is used to run other tasks

  4. A bounded queue is recommended to prevent OOM. The queue length can be set larger if needed
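
The two rules of thumb translate directly into code. This is only a sketch of the starting points named above; PoolSizing and its method names are hypothetical, and real sizing should be confirmed by measuring the actual workload.

```java
class PoolSizing {
    // Number of processors available to the JVM
    static int availableCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    // CPU-intensive tasks: number of CPUs + 1
    static int cpuBoundThreads(int cpus) {
        return cpus + 1;
    }

    // IO-intensive tasks: number of CPUs x 2
    static int ioBoundThreads(int cpus) {
        return cpus * 2;
    }
}
```

For example, on a machine reporting 4 processors these give 5 threads for CPU-intensive work and 8 for IO-intensive work.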

Reference: The Art of Concurrent Programming in Java

Recommended reading:

Concurrent Java programming | Threads explained

Concurrent Java programming | Locks: AQS, Lock, ReentrantLock, ReentrantReadWriteLock