Thread 1.

1.1 define

  • Process: process is a program execution, process is a program and its data in the processing machine sequential execution of the activities, process is a program with independent functions in a data set to run on the process, it is the system for resource allocation and scheduling an independent unit
  • Thread: The smallest unit in which an operating system can schedule operations. It is contained within the process and is the actual operating unit within the process

1.2 Why Multithreading

  • More processor cores
  • Faster response times
  • A better programming model

1.3 priority

Modern operating systems basically schedule running threads in the form of time. The operating system will divide time slices and the threads will be allocated several time slices. When the time slices are used up, thread scheduling will occur and wait for the next allocation. The number of time slices allocated by a thread determines how much processor resources it uses, and thread priority is a thread attribute that determines whether a thread should allocate more or less processor resources.

In Java threads, the priority is controlled by an integer member variable priority, which ranges from 1 to 10. The setPriority(int) method can be used to change the priority during thread construction. The default priority is 5.

1.4 state

There are six states of a Java thread

Java thread state transitions

1.5 the Daemon thread

Daemon threads are a support thread because they are used primarily for background scheduling and support work within a program. This means that a Java virtual machine will exit when there are no non-Daemon threads in the machine. Threads can be set to Daemon threads by calling thread.setdaemon (true).

Note: The finally block of Daemon threads does not necessarily execute

The sample code

package Daemon; import util.SleepUtils; public class Daemon { public static void main(String[] args) { Thread thread = new Thread(new DaemonRunner(), "Daemon"); thread.setDaemon(true); thread.start(); } static class DaemonRunner implements Runnable { @Override public void run() { try { System.out.println("start"); SleepUtils.second(10); }finally { System.out.println("Daemon"); }}}}Copy the code
package util;

import java.util.concurrent.TimeUnit;

public class SleepUtils {

    public static void second(long seconds){
        try {
            TimeUnit.SECONDS.sleep(seconds);
        }catch (InterruptedException ignored){

        }
    }

}

Copy the code

2. The thread pool

“Thread pool”, as the name implies, is a thread cache. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability.

2.1 Introduction to Thread Pools

In Web development, the server needs to accept and process requests, so each request is allocated a thread to process. This is easy to implement if you create a new thread for each request, but there is a problem:

If the number of concurrent requests is very large, but the execution time of each thread is very short, threads will be created and destroyed frequently, which will greatly reduce the efficiency of the system. It may occur that the server spends more time and consumes more system resources creating and destroying new threads for each request than it does processing the actual user request.

Is there a way to finish a task without being destroyed, and then move on to other tasks?

That’s what thread pools are for. Thread pools provide a solution to the overhead and under-resourcing problems of the thread life cycle. By reusing threads for multiple tasks, the overhead of thread creation is spread over multiple tasks.

When to use thread pools?

  • The processing time of a single task is short
  • The number of tasks to handle is large

2.2 Advantages of thread pools

  • Reusing existing threads reduces thread creation and death overhead and improves performance
  • Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.
  • Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.

3. The Executor framework

The Executor interface is the most basic part of the thread pool framework and defines a execute method for executing Runnable.

The following figure shows its inheritance and implementation:

As you can see from the figure, there is an important subinterface ExecutorService under Executor, which defines the specific behavior of the thread pool:

  • Execute (Runnable command) : executes tasks of the Ruannable type
  • Submit (Task) : You can submit a Callable or Runnable task and return a Future object representing the task
  • Shutdown () : Shuts down after completing a submitted task, no longer taking over new tasks
  • ShutdownNow () : Stop all ongoing tasks and shutdown
  • IsTerminated () : tests whether all tasks are completed
  • IsShutdown () : tests whether the ExecutorService isShutdown

3.1 Thread pool key attributes

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
Copy the code

CTL is a field that controls the running state of the thread pool and the number of valid threads in the thread pool. It contains two parts of information: The running state of the thread pool (runState) and the number of valid threads in the thread pool (workerCount), as you can see here, are stored using the Integer type, with the runState stored in the upper 3 bits and the workerCount stored in the lower 29 bits. COUNT_BITS is 29, and CAPACITY is 1 shifted 29 bits to the left minus 1 (29 1s). This constant represents the upper limit of workerCount, which is about 500 million.

3.2 CTL related methods

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code
  • RunStateOf: Obtains the running status.
  • WorkerCountOf: Gets the number of active threads;
  • CtlOf: Gets the value of the health status and number of active threads.

3.3 Thread pool status

Thread pools exist in five states

private static final int RUNNING = -1 << COUNT_BITS; Private static final int SHUTDOWN = 0 << COUNT_BITS; Private static final int STOP = 1 << COUNT_BITS; Private static final int TIDYING = 2 << COUNT_BITS; Private static final int TERMINATED = 3 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // The top three bits are 011Copy the code
  • RUNNING
    • Status description: When the thread pool is in the RUNNING state, it can receive new tasks and process added tasks.
    • State switching: The initial state of the thread pool is RUNNING. In other words, once the thread pool is created, it is RUNNING and the number of tasks in the pool is 0!
  • SHUTDOWN
    • Status description: When the thread pool is in the SHUTDOWN state, it does not receive new tasks but can process added tasks
    • State switching: When the shutdown() interface of the thread pool is called, the thread pool is run -> shutdown.
  • STOP
    • Status description: When the thread pool is in the STOP state, it does not receive new tasks, does not process added tasks, and interrupts ongoing tasks.
    • State switch: When the shutdownNow() interface of the thread pool is called, the thread pool is run or SHUTDOWN -> STOP.
  • TIDYING
    • State description: When all tasks have terminated, the “task quantity” recorded by CTL is 0, and the thread pool changes to TIDYING state. The hook function terminated() is executed when the thread pool is in TIDYING state. Terminated () is empty in the ThreadPoolExecutor class and is processed if the user wants the thread pool to become TIDYING; This can be done by overloading the terminated() function.
    • State switching: SHUTDOWN -> TIDYING occurs when the thread pool is in SHUTDOWN state, the blocking queue is empty, and the tasks executed in the thread pool are empty. When the thread pool is in the STOP state and the task executed in the thread pool is empty, STOP -> TIDYING is invoked.
  • TERMINATED
    • The thread pool is TERMINATED completely and becomes TERMINATED.
    • State switching: the thread pool terminated in TIDYING state is terminated by TIDYING – > terminated(). Entry into TERMINATED conditions is as follows:
      • The thread pool is not RUNNING;
      • The thread pool state is not TIDYING or TERMINATED;
      • If the thread pool state is SHUTDOWN and the workerQueue is empty;
      • WorkerCount 0;
      • Setting the TIDYING state succeeded.

State switch:

4. Parse the source code of ThreadPoolExecutor

4.1 Creating a thread pool

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Copy the code

4.2 Task Submission

Public void execute() // Submit the task. No return value. Public Future<? > submit() // Returns a value when the task completesCopy the code

4.3 Parameter Description

4.3.1 corePoolSize

The number of core threads in the thread pool that, when a task is submitted, creates a new thread to execute the task until the current number of threads equals corePoolSize; If the current number of threads is corePoolSize, further submitted tasks are stored in a blocking queue, waiting to be executed. If the thread pool’s prestartAllCoreThreads() method is executed, the thread pool creates and starts all core threads ahead of time.

4.3.2 maximumPoolSize

The maximum number of threads allowed in the thread pool. If the current blocking queue is full and the task continues to be submitted, a new line is created to execute the task if the current number of threads is less than maximumPoolSize;

4.3.3 keepAliveTime

Thread pools maintain the idle time allowed by threads. When the number of threads in the thread pool is greater than corePoolSize, threads outside the core thread will not be destroyed immediately if no new tasks are submitted. Instead, they will wait until the keepAliveTime is exceeded.

4.3.4 unit

The unit of keepAliveTime;

4.3.5 workQueue

A blocking queue used to hold tasks that are waiting to be executed and that must implement the Runable interface. The JDK provides the following blocking queue:

  • ArrayBlockingQueue: a bounded blocking queue based on an array structure that sorts tasks by FIFO;
  • LinkedBlockingQuene: a blocking queue based on a linked list structure that sorts tasks by FIFO and typically has a higher throughput than ArrayBlockingQuene;
  • SynchronousQuene: a blocking queue that does not store elements. Each insert operation must wait until another thread calls a remove operation. Otherwise, the insert operation is blocked and throughput is usually higher than LinkedBlockingQuene.
  • PriorityBlockingQuene: an unbounded blocking queue with priority;

4.3.6 threadFactory

It is a variable of type ThreadFactory that is used to create a new thread. Default Executors. DefaultThreadFactory () to create a thread. When a thread is created using the default ThreadFactory, the new thread is created with the same NORM_PRIORITY priority and is non-daemon, and the thread name is set.

4.3.7 handler

The saturation strategy of the thread pool. When the blocking queue is full and there are no idle worker threads, if the task continues to be submitted, a policy must be adopted to process the task. The thread pool provides four strategies:

  • AbortPolicy: Directly throws an exception, the default policy.
  • CallerRunsPolicy: Executes the task with the caller’s thread;
  • DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
  • DiscardPolicy: Directly discards the task.

The above four policies are all inner classes of ThreadPoolExecutor.

You can also implement the RejectedExecutionHandler interface according to application scenarios to customize saturation policies, such as logging or persistent storage of tasks that cannot be processed.

4.4 Thread Pool Monitoring

Public long getCompletedTaskCount() public int getPoolSize() Public int getActiveCount() // The number of threads executing tasks in the thread poolCopy the code

4.5 Thread Pool Principles

5. Source code analysis

5.1 the execute method

5.1.1 Overall flow chart

5.1.2 code

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); Int c = ctl.get(); /* * Retrieve the last 29 bits of CTL to get the number of current working threads; * If the current number of threads is smaller than the core thread (corePoolSize), create a thread to put into the thread pool; */ if (workerCountOf(c) < corePoolSize) {/* The second parameter in addWorker indicates whether the number of added threads is limited by corePoolSize or maximumPoolSiz; * If true, judge by corePoolSize; */ if (addWorker(command, true)) return; */ c = ctl.get(); } /* * If (isRunning(c) &&workqueue.offer (command)) {// get the CTL value int recheck = ctl.get(); // If the thread pool is not running, remove the command if (! IsRunning (recheck) && remove(command)) // After the command is executed, the handler processes the task with a reject policy (command). Else if (workerCountOf(recheck) == 0) // The first argument is null, indicating that a thread is created in the pool, but not started; The second parameter is false to set the maximum number of threads in the thread pool to maximumPoolSize. addWorker(null, false); } // The thread pool is not in the RUNNING state; WorkerCount >= corePoolSize and workQueue is full. // At this point, the addWorker method is called again, but the second argument is passed false, setting the upper limit of the thread pool's // finite number of threads to maximumPoolSize; // Reject the task if it fails else if (! addWorker(command, false)) reject(command); }Copy the code

5.2 addWorker method

The main job of addWorker is to create a new thread in the thread pool and execute it. The firstTask parameter specifies the firstTask to be executed by the new thread. The core parameter is true to determine whether the number of active threads is less than corePoolSize. False indicates whether the number of active threads is less than maximumPoolSize before adding a new thread.

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); Int rs = runStateOf(c); /* * if rs >= SHUTDOWN, no new task will be received; * 1. Rs == SHUTDOWN, if any of the following three conditions are not met, return false: * 1. Rs == SHUTDOWN, if any of the following three conditions are not met, return false: * 1. The blocking queue is not empty * * First consider rs == SHUTDOWN * in which case no new submitted task will be accepted, so f alse will be returned if firstTask is not empty; * Then, if firstTask is empty and workQueue is also empty, return false, * because there are no more tasks in the queue, Check if queue empty only if necessary. If (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) Int wc = workerCountOf(c); // If wc exceeds CAPACITY, which is the maximum of the lower 29 bits of the CTL (binary is 29 1s), return false; // Core is the second argument to the addWorker method, which compares corePo olSize if true, and maximumPoolSize if false. if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; / / try to increase workerCount, if successful, the jump out of the first for loop if (compareAndIncrementWorkerCount (c)) break retry. C = ctl.get(); c = ctl.get(); // re-read CTL // If the current running state is not rs, the state has been changed. Return the first for loop and continue if (runStateOf(c)! = rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; W = new Worker(firstTask); w = new Worker(firstTask); // Each Worker object creates a Thread. Final Thread t = w.htread; if (t ! = null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs < SHUTDOWN; // If rs is RUNNING or rs is SHUTDOWN and firstTask is null, add threads to the thread pool. // Since no new tasks are being added during SHUTDOWN, But will carry out the task of workQueue if (rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {if (t.i sAlive ()) / / precheck that t is  startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); If (s > largestPoolSize) largestPoolSize = s; if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {// start the thread. workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

5.3 the Worker class

Each thread in the ThreadPool is encapsulated into a Worker object, and ThreadPool maintains a set of Worker objects. See JDK source code.

The Worker class inherits AQS and implements the Runnable interface. Note firstTask and Thread attributes: firstTask saves incoming tasks; A thread is a thread created from a ThreadFactory when the constructor is called, and is used to process tasks.

When the constructor is called, the task is passed in, in this case via getThreadFactory().newThread(this); To create a newThread, the newThread method passes in the parameter this. Since the Worker itself inherits the Runnable interface, that is, a thread, a Worker object will call the run method in the Worker class when it is started.

Worker inherits AQS and uses AQS to realize the function of exclusive lock. Why not use ReentrantLock? You can see the tryAcquire method, which does not allow reentrant, whereas ReentrantLock does:

  • Once the lock method obtains an exclusive lock, it indicates that the current thread is executing a task.
  • If a task is being executed, the thread should not be interrupted;
  • If the thread is not in the exclusive lock state, that is, the idle state, it is not processing the task, then you can interrupt the thread.
  • The thread pool calls the interruptIdleWorkers method to interrupt idle threads when the shutdown or tryTerminate methods are executed. The interruptIdleWorkers method uses the tryLock method to determine whether threads in the pool are idle.
  • It is set to non-reentrant because we do not want the task to regain the lock when it calls a thread pool control method like setCorePoolSize. If you use ReentrantLock, it is reentrant, so that if a thread pool-controlled method such as setCorePoolSize is called in the task, the running thread is interrupted.

Therefore, Worker inherits from AQS and is used to determine whether threads are free and can be interrupted.

5.4 runWorker method

5.4.1 Overall process

  • The while loop keeps getting tasks through the getTask() method;
  • The getTask() method takes the task from the blocking queue;
  • If the thread pool is stopping, make sure the current thread is interrupted, otherwise make sure the current thread is not interrupted.
  • Call task.run() to execute the task;
  • If task is null, it breaks out of the loop and executes processWorkerExit();
  • When the runWorker method completes execution, it also means that the run method in the Worker completes execution and destroys the thread.

5.4.2 code

The run method in the Worker class calls the runWorker method to perform the task. The runWorker method has the following code:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // Get the first task Runnable task = w.firstTask; w.firstTask = null; // allow interrupt w.nlock (); // Exit loop because of exception Boolean completedAbruptly = true; Try {// If the task is empty, get the task by getTask while (task! = null || (task = getTask()) ! = null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}Copy the code

Here is an illustration of the first if judgment, which aims to:

  • If the thread pool is stopping, make sure the current thread is interrupted.
  • If not, ensure that the current thread is not interrupted;

Consider that the shutdownNow method may also be executed during the execution of the if statement, which sets the state to STOP. Recall the STOP state:

New tasks cannot be accepted and tasks in the queue are not processed, which interrupts the thread that is processing the task. When a thread pool is in the RUNNING or SHUTDOWN state, calling the shutdownNow() method brings the pool to that state.

To interrupt all threads in the thread pool, Using thread.interrupted () ensures that the Thread is in the RUNNING or SHUTDOWN state because thread.interrupted () will restore the interrupted state.

5.5 getTask method

The getTask method is used to get the task from the blocking queue as follows:

Private Runnable getTask() {// the value of timeOut indicates whether the task was timedOut the last time it was fetched from the blocking queue. Boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * Check if queue empty only if necessary. * 1. Rs >= STOP, whether the thread pool is stopping; * 2. Whether the blocking queue is empty. * If the above conditions are met, the workerCount is reduced by 1 and null is returned. * Because no more tasks are allowed to be added to the blocking queue if the current thread pool state is SHUTDOWN or above. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // The timed variable is used to determine whether timeout control is required. // allowCoreThreadTimedOut defaults to false, meaning that core threads are not allowed to timeout; // wc > corePoolSize, indicating that the number of threads in the current thread is greater than the number of core threads; / / for these threads over the core number of threads, the need for overtime control Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize; /* * wc > maximumPoolSize because setMaximumPoolSize may be executed at the same time; * Timed && timedOut if true, the current operation needs to be timed out and the last task fetched from the blocking queue timed out * If the number of valid threads is greater than 1 or the blocking queue is empty, then try to decrement workerCount by 1; * If the reduction fails, retry is returned. * If wc == 1, the current thread is the only thread in the pool. * / if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try {/* * Timed: if timed is true, the poll method of the blocking queue is used for timeout control. If no task is obtained within keepAliveTime, null is returned. If the queue is empty at this point, the take method blocks until the queue is not empty. * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; // If r == null, timeout has occurred. TimedOut = true; } catch (InterruptedException retry) {// If the current thread is interrupted while retrieving the task, set timedOut to false and loop back timedOut = false; }}}Copy the code

The important thing here is the second if judgment, which controls the effective number of threads in the thread pool. If the number of threads in the current thread pool exceeds corePoolSize and is smaller than maximumPoolSize, and the workQueue is full, you can add worker threads. If the number of threads in the current thread pool exceeds corePoolSize and is smaller than maximumPoolSize, you can add worker threads. If timedOut is true, the workQueue is empty, which means that there are no more threads in the current thread pool to perform the task. You can destroy the number of threads that are larger than corePoolSize, leaving the number of threads at corePoolSize.

When will it be destroyed? When the runWorker method is finished executing, the JVM automatically retrieves it.

When the getTask method returns NULL, the while loop is broken out of the runWorker method, followed by the processWorkerExit method.

5.6 processWorkerExit method

Private void processWorkerExit(Worker w, Boolean completedAbruptly) {// If completedAbruptly is true, an exception occurred during thread execution, You need to subtract workerCount by 1; // If the thread executes without exception, the workerCount is already decrement by 1 in the getTask() method and no further decrement is required. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// Count the number of completed tasks completedTaskCount += w.completedTasks; Worker. remove(w); worker. remove(w); worker. remove(w); } finally { mainLock.unlock(); } // Decide whether to terminate the thread pool based on the thread pool state. TryTerminate (); int c = ctl.get(); /** when the thread pool is RUNNING or SHUTDOWN, if the worker is terminated abnormally, addWorker will be added directly; * Keep at least one worker if allowCoreThreadTimeOut=true and wait queue has tasks; * If allowCoreThreadTimeOut=false, workerCount is not less than corePoolSize. */ if (runStateLessThan(c, STOP)) { if (! completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}Copy the code

At this point, the Worker thread is destroyed after processWorkerExit is executed. This is the lifetime of the entire Worker thread. Starting with the execute method, the Worker uses ThreadFactory to create a new Worker thread. The task is then executed, and if getTask returns NULL, the processWorkerExit method is entered, and the entire thread terminates, as shown: