This article has authorized the public account “code egg”, and will be improved later. Reproduced please indicate the source, thank you ~

General layout

Thread

Threads are a resource, not just in the world of programs. A program is just an abstract representation of life.

For example, the ticket window, refund window, check window of the station, each window is doing a different thing, is running different threads at the same time.

Many threads, need to manage, different threads also to ensure that they will not interfere with each other, each to do their own.

Thread is like human energy, and thread pool is like the total field of human energy. This analogy may not be appropriate, because when people do several things at the same time, even if they seem to be “simultaneous”, they are distracted, and their OWN CPU is constantly switching time slices, which is only “concurrent”, not “parallel”.

Threads of Life

ThreadPoolExecutor

Thread pool state

public class ThreadPoolExecutor extends AbstractExecutorService {
    /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completedCopy the code
  • RunState: running status of the thread pool
  • WorkerCount: indicates the number of worker threads
    // Android-added: @ReachabilitySensitive
    @ReachabilitySensitive
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

The thread pool uses a 32-bit int to hold both runState and workerCount, where the top three bits (bits 31 through 29) are runState and the remaining 29 bits are workerCount (about 500 million).

Let’s draw a picture of the storage structure

Construtor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code

Its parameters

  • CorePoolSize Number of core threads, such as class cadres.

  • MaximumPoolSize Specifies the maximum number of threads, such as the number of seats in a classroom. When the number of submitted tasks exceeds this maximum, the thread also has a RejectExecutionHandler policy.

  • KeepAliveTime indicates the keepAliveTime of idle threads except core threads. When the number of threads in the thread pool exceeds the number of corePoolSize, keepAliveTime is up, then the idle thread is disabled.

  • WorkQueue Through workQueue, thread pools implement the blocking function. When the number of threads in the thread pool exceeds its corePoolSize, the thread enters the blocking queue and waits.

  • ThreadFactory Creates a factory for threads. All threads are created through the Factory (through the addWorker method).

  • Handler thread pool saturation strategy.

Four strategies:

  • AbortPolicy: Directly throws an exception, the default policy.
  • CallerRunsPolicy: Executes the task with the caller’s thread;
  • DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
  • DiscardPolicy: Discards tasks directly.

More constructors called:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
Copy the code

Queuing strategy

  • SynchronousQueue commits directly. Submit tasks directly to the thread instead of saving them.

  • LinkedBlockingQueue Unbounded queue. An unbounded queue is used to store submitted tasks when the core threads are busy. The number of threads cannot exceed the number of core threads, and the maximumPoolSize setting is invalid.

  • ArrayBlockingQueue A bounded queue. Queues are also capped in case resources run out.

Executors

Executors. DefaultThreadFactory () is the default threadFactory Executors static factory. More on that later.

Source code analysis

Worker — Worker thread

When a thread is created in a thread pool, it is encapsulated as a Worker thread Worker, which is a person working in the thread pool.

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
Copy the code

Implements Runnable and inherited AbstractQueuedSynchronizer (AQS), so it is not only the task of an executable, and can achieve the effect of the lock.

The difference between Worker and Task: Worker is a thread in the thread pool, while Task is a runnable, but it is not really executed. It just calls the run method by Worker.

/**
  * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
   }
Copy the code

Let’s take a look at the overall flow chart ~~

Note:

  • There are many workers in the thread pool, and the core member is corePoolSize. The maximum number of workers in the pool is maximumPoolSize.
  • WorkQueue is a queue of waiting tasks.
  • Threads within corePoolSize are not collected by default.

Look at the source code

execute(Runnable command)

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState  and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, * * If the number of currently running threads < corePoolSize, attempt to start a new thread with the given command as the first task. * Call the addWorker method, check runState and workerCount, and prevent error alerts if additional threads are added, or return false if not. * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back * * The enqueuing if * stopped, or start a new thread if there are none. You still need a double check to see if you need to create a new thread * (because it's possible that existing threads have died since the last check) or if the thread pool has been closed after entering this method. * So we need to check state again, roll back to queue if the thread pool stops, and create a new thread if there are no more threads in the pool. * 3. If we cannot queue task, then we try to add a new * thread. If it fails, We know we are shut down or saturated * and so reject the task. * If the task cannot be queued (perhaps the thread pool is closed or full), a new thread needs to be opened (heading to maxPoolSize).  * if it fails, the thread pool is shutdown or full, and the task will be rejected. * /
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /** * 2, if the thread pool is in the RUNNING state, and the thread pool is queued successfully */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            
            // If the thread pool is not in the RUNNING state,
            // and remove(command)-- workqueue.remove () succeeds, rejecting the current command
            if (! isRunning(recheck) && remove(command))
                reject(command);

            // Why only check if the number of workers running is 0? Why not compare corePoolSize??
            // Only one worker thread can execute tasks from the queue.
            // Because as long as there are active worker threads, you can consume tasks in the workerQueue
            else if (workerCountOf(recheck) == 0)
                addWorker(null.false);
        }
        /** * 3, if the thread pool is not in the running state or cannot be queued ** start a new thread, expand to maxPoolSize, * if addWork(command, false) fails, reject the current command */
        else if(! addWorker(command,false))
            reject(command);
    }
Copy the code

It’s a bit of a hassle to look at the annotations, so let’s list the comments in the execute method separately, step by step:

  1. If the number of threads currently running is < corePoolSize, try to start a new thread with the given command as the first task. The addWorker method is called to check runState and workerCount, and to prevent error alerts if additional threads are added, or to return false if not.

  2. If a task is successfully added to the queue, you still need to double check to see if a new thread is needed. The thread pool may have died since the last check or has been closed after entering this method. So we need to check state again, roll back to the queue if the thread pool stops, and create a new thread if there are no more threads in the pool.

  3. If the task cannot be queued (perhaps the thread pool is closed or full), you need to start a new thread (heading to maxPoolSize). If it fails, the thread pool is shutdown or full, and the task must be rejected.

Draw one yourself:

It’s still complicated. Let’s make it simple:

In general, it is:

  1. Let’s see if we can add it to the core thread,
  2. And see if I can join the workQueue,
  3. If the maximum number of threads in the pool is exceeded, then the task is rejected.

Executors

It is a Java utility class. Provides factory methods to create different types of thread pools. It makes it easy to create the following thread pools.

Common thread pool The characteristics of To adapt to the scene
newSingleThreadExecutor A single-threaded thread pool Used in scenarios where sequential execution is required and only one thread is executing
newFixedThreadPool Fixed size thread pool Limit the number of threads when concurrency pressure is known.
newCachedThreadPool A thread pool that can be expanded indefinitely It is suitable for tasks with small execution time.
newScheduledThreadPool A thread pool that can be delayed and timed to start Applicable to scenarios where multiple background threads are required to perform periodic tasks.
newWorkStealingPool A thread pool with multiple task queues You can reduce the number of connections and create threads with the current number of available cpus to execute in parallel.

For example:

ExecutorService singleService = Executors.newSingleThreadExecutor();

ExecutorService fixedService = Executors.newFixedThreadPool(9);

ExecutorService cacheService = Executors.newCacheThreadPool();
Copy the code

Or you can customize the required thread pool through the constructor of ThreadPoolExecutor.

Let me write it here

ref

  • Take an in-depth look at the implementation of Java thread pools
  • Deep interpretation of Java thread pool design ideas and source code implementation
  • Java thread pool ThreadPoolExecutor (ThreadPoolExecutor
  • Speed-read Java thread pools
  • Do you really understand thread pools?