Jane book Jiang Yi Jonny, reprint please indicate the original source, thank you!

This article is based on JDK1.7 source code analysis and interpretation.

ThreadExecutorPool is a kind of thread pool tool provided by JUC. It is also the most common concurrency framework in the Java language. It can be said that almost anything that requires asynchronous or concurrent execution can use Java thread pool. So first, let’s compare the “pure thread solution” and “ThreadExecutorPool solution” to solve the problem.

Case study: the scribe

In the Middle Ages, there were jobs called Scribe s, or Scribe s. Their duties were like copying a copy machine from book to book. Let’s say there’s a scribes’ studio at this time, and there’s only 2 scribes, and they have to copy 10 books.

In this example, we have “write thread management yourself” and “ThreadExecutorPool does thread management.”

public static class Book { private static AtomicInteger id = new AtomicInteger(0); // Private String bookName; Public void copy() {system.out.println ("start copy "+ bookName); try { Thread.sleep(100L); // sleep 100ms } catch (Exception e) { // ignore } System.out.println("end copy " + bookName); } public Book() { bookName = "book-" + String.valueOf(id.incrementAndGet()); }}Copy the code

Implement thread management yourself

Final BlockingQueue<Book> books = new BlocdBlockingdeque <Book>(10); for (int i = 0; i < 10; i++) { try { books.put(new Book()); } catch (Exception e) { // ignore } } System.out.println("start work..." ); // Create two book scribes Thread Thread[] scribes = new Thread[2]; for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) { scribes[scribeIndex] = new Thread(new Runnable() { public void run() { for (; ;) { if (Thread.currentThread().isInterrupted()) { System.out.println("time arrives, stop writing..." ); } try { Book currentBook = books.poll(5, TimeUnit.SECONDS); currentBook.copy(); } catch (Exception e) { System.out.println("time arrives, stop writing..." ); return; }}}}); scribes[scribeIndex].setDaemon(false); // set to non-daemon thread scribes[scribeIndex].start(); } // try {thread.sleep (10000L); } catch (Exception e) {// ignore} // Stop copying for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) { scribes[scribeIndex].interrupt(); } System.out.println("end work..." );Copy the code

We’ve written a bunch of code to do this, so let’s take a look at what ThreadExecutorPool does.

System.out.println("start work..." ); ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i ++) { executorService.submit(new Runnable() { public void run() { new Book().copy(); }}); } // try {thread.sleep (10000L); } catch (Exception e) { // ignore } executorService.shutdownNow(); System.out.println("end work..." );Copy the code

The whole process is very clear: task writing, thread creation, thread start, thread termination.

But many times, the problem is not limited to the above.

Developer dilemma

The earliest developers of concurrent programming had to do a lot of things themselves, and by using Java thread pools, they could do the following:

1) Thread management, including thread creation, start and destruction;

2) Thread reuse, thread creation will bring a certain cost to the server, how to reduce the cost of frequent repeated thread creation;

3) Elastic scaling. The server usually has peak and low peak periods. Whether the thread pool can be flexibly scaled, for example, whether the thread pool can be recycled after it is not used for a long time to reduce the waste of system resources, or whether the capacity of the thread pool can be increased at any time;

4) Rejection strategy: the number of threads is limited and many tasks need to be processed. Whether to reject or block the tasks beyond the scope of the system;

5) Exception handling. The thread may encounter exceptions or errors during execution, and how the developer should properly deal with these exceptions or errors;

6) Assignment of tasks, whether the assignment of tasks is based on first-in, first-out or some kind of priority strategy.

Let’s take a look at Doug Lea’s ThreadExecutorPool framework to solve these problems.

ThreadExecutorPool source brief analysis

First, before reading the source code, let’s introduce some important concepts about ThreadExecutorPool

The life cycle

In the ThreadExecutorPool thread pool design, the entire task execution framework thread pool is divided into five life cycles:

RUNNING: Allows to receive new tasks and process tasks in the queue

SHUTDOWN: no new tasks are received. Only tasks in the queue are digested

STOP: not only does it STOP receiving new tasks, but it stops processing the tasks in the queue and tries to interrupt the thread that is executing the task

TIDYING: All tasks terminated, workCount is set to 0, the thread state is set to TIDYING and terminated hook function terminated().

TERMINATED: The hook function TERMINATED () is TERMINATED

The transformation diagram of each life cycle is as follows:


image.png

As you can see from the diagram, the changes throughout the life cycle are irreversible.

Status word

ThreadExecutorPool packs the thread pool state and thread pool capacity into an int variable, as shown below


image.png

Thread pool state bits

state High value enumeration Plus or minus sex
RUNNING 111 Negative number (-536870912)
SHUTDOWN 000 0
STOP 001 Positive number (536870912)
TIDYING 010 Positive (1073741824)
TERMINATED 011 Positive number (1610612736)

The state data are TERMINATED > TIDYING > STOP >SHUTDOWN > RUNNING

The code in ThreadExecutorPool looks like this:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; Private static final int RUNNING = -1 << COUNT_BITS; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; Private static int runStateOf(int c) {return c & ~CAPACITY; private static int runStateOf(int c) {return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static Boolean isRunning(int c) {return c < SHUTDOWN; }Copy the code

Thread pools mainly execute processes


image.png

The code is introduced

First, we create a thread pool.

1. Create a thread pool
ExecutorService executorService = Executors.newFixedThreadPool(2);
Copy the code

Use the factory method provided by Executors to create the following four types of thread pools:

NewFixedThreadPool. This method will be used to create a fixed size thread pool (corePoolSize = maxPoolSize at this point), one thread pool will be created for each task submitted until the maximum number of thread pools is reached, and the size of the thread pool will not change after that;

NewCachedThreadPool. This method creates a cacheable thread pool (corePoolSize = 0, maxPoolSize = integer.max_value) where idle threads are automatically reclaimed after 60 seconds. The risk with this thread pool is that if the server application reaches peak request times, New threads are created until memory runs out.

NewSingleThreadExecutor. This method creates a single-threaded thread pool that executes sequentially in order of tasks in the queue (FIFO, LIFO, priority);

NewScheduledThreadPool. This method creates a pool of fixed-length threads that can execute tasks in a deferred or timed manner.

2. Task submission

The general logic of task submission is as follows:

1) When the thread pool is smaller than corePoolSize, a new submitted task will create a new thread to execute the task, even if there are idle threads in the thread pool;

2) When the thread pool reaches the corePoolSize, the new submitted task will be put into the workQueue and wait for the task scheduling in the thread pool to execute;

3) If the workQueue is full and maximumPoolSize > corePoolSize, a new thread will be created to execute the newly submitted task.

4) If the number of submitted tasks exceeds maximumPoolSize, RejectedExecutionHandler will handle the new submitted task.

5) When there are more corePoolSize threads in the thread pool and the idle time reaches keepAliveTime, the idle thread is closed;

So how does the source code implement the above description

After the thread pool is created successfully, we submit the task to the thread pool:

executorService.submit(new Runnable() { public void run() { new Book().copy(); }});Copy the code

Submit to thread pool:

public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); // Wrap a new task execute(ftask); // thread pool entry return ftask; }Copy the code

As you can see, the ThreadExecutorPool entry method is execute(Runnable commad). The execution logic of this method is as follows:

int c = ctl.get(); // 1. If the number of threads in the current thread pool is less than the number of core threads, add new threads to the thread pool. If (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // If the thread pool is already closed, then roll back the added task if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. If corePoolSize is already full and the task queue is also full, // Try to increase the number of threads to maximumPoolSize, if still failed, execute the reject policy else if (! addWorker(command, false)) reject(command);Copy the code

Ctl.get (), workerCountOf(), and isRunning() are all used to read and write status words.

Next, let’s look at what addWorker does:

Private Boolean addWorker(Runnable firstTask, Boolean core) {private Boolean addWorker(Runnable firstTask, Boolean core) { // Need to increment the work count (thread-safe operation)... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { ... w = new Worker(firstTask); // This encapsulates a new Work class, which we will cover later with final Thread t = w.htread; if (t ! = null) { ... Int c = ctl.get(); int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new  IllegalThreadStateException(); workers.add(w); int s = workers.size(); . workerAdded = true; . if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! WorkerStarted) // If the task fails to be started or submitted to the thread pool, // the rollback operation is performed (remove the failed worker from the worker pool and reduce the task count in the status word) addWorkerFailed(w); } return workerStarted; }Copy the code
3. Task execution

Tasks are executed in the Worker class, which is a class that inherits the Runnable interface.

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... public void run() { runWorker(this); }... }Copy the code

You can see that the external runWorker() method is called in the Worker class. As a result, you can see that the main logic for task execution is performed in the external runWorker() method

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; . boolean completedAbruptly = true; try { while (task ! = null || (task = getTask()) ! = null) {// Loop read task... try { beforeExecute(wt, task); Thrown = null; thrown = null; try { task.run(); } catch (RuntimeException x) {thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }} finally {task = null; w.completedTasks++; . } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}Copy the code

BeforeExecute and afterExecute are hook methods that specify actions to be performed when a thread starts execution and after it completes execution, which the developer needs to implement.

Also note the getTask() method called within the runWorker method, which returns null to terminate the worker thread’s execution loop if: 3) The current thread count is greater than corePoolSize and smaller than maxPoolSize, and the timeout time for fetching data from BlockingQueue is exceeded (default: 60 seconds).

The code implementation is as follows:

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); / / check status if the current thread pool (rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {decrementWorkerCount (); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) ! = rs) continue retry; // else CAS failed due to workerCount change; Retry inner loop} try {// If a thread does not obtain a retry task within the specified time (60 seconds by default), a thread is about to expire Runnable r = timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}}Copy the code
4. Task rejection

If a thread is committed to a thread pool, one of the following conditions occurs in the current thread pool: ThreadPoolExecutor AbortPolicy: AbortPolicy: AbortPolicy: AbortPolicy: AbortPolicy This strategy will directly thrown RejectedExecutionException abnormalities, the caller will get the exception; 2) DiscardPolicy: With this policy, the thread pool will quietly discard the task without the caller knowing; 3) CallerRunsPolicy (caller run) : this policy neither abandons the task nor throws an exception, but returns the task to the caller, thus reducing the traffic of the new task; DiscardOldestPolicy: This policy will discarding the next task whose turn it is to execute. DiscardOldestPolicy results in discarding the highest priority task. Therefore, it is best not to use the DiscardOldestPolicy together with the priority queue. Here, the code implementation we will only show the CallerRunsPolicy (caller run) policy:

public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ Public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (! e.isShutdown()) { r.run(); }}}Copy the code

Developers can also choose to define their own saturation strategy based on business needs.

5. Thread pool destruction

ThreadExecutorPool provides two methods to destroy the thread pool: shutdown() and shutdownNow()

The shutdown() method simply sets the state of the thread pool to shutdown and rejects all subsequent attempts to submit requests, but tasks already in the queue will still be consumed as normal.

The shutdownNow() method is much simpler. It forcibly shuts down ExecutorService, tries to cancel tasks that are being executed, and returns all tasks that have been committed but not yet started, which the developer can log for later processing. In addition, the attempt to cancel the executing task is only an attempt to interrupt the executing thread, and the specific thread response to interrupt policy needs to be written by the user. The code implementation is as follows:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
Copy the code

Tread carefully: thread pool experience talk

Do not use ThreadLocal

Do not use ThreadLocal in a ThreadPoolExecutor thread pool, where threads are reused and therefore shared by multiple tasks, thus potentially polluting dirty data. It needs to be used with care

Set the value of corePoolSize appropriately

Take a piece of code as an example:

Private static final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); private static final ExecutorService service = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, queue );Copy the code

If the number of tasks exceeds 10, maximumPoolSize threads will be created first and the rest will be queued for execution.

In the implementation of ThreadPoolExecutor, when the workQueue is full and maximumPoolSize>corePoolSize, the new submitted task creates a new thread to execute the task.

Therefore, the queue will never be full, so no maximumPoolSize threads will ever be created, which means that our task will always be running on one thread, not the desired number of threads.

Thread the interrupt

Although ThreadPoolExecutor provides the shutdownNow() method, which attempts to interrupt all threads after calling it, this interrupt is not a guarantee that the thread will terminate, so it is up to the developer to implement a thread interrupt strategy. We’ve covered all of this In Section 7.1.2 of Doug Lea’s Java Concurrency In Practice, but I won’t repeat it here.

Finalize function

In particular, ThreadPoolExecutor has a Finalize function, which is implemented as follows:

protected void finalize() {
    shutdown();
}
Copy the code

The shutdown() function is called in this method, so don’t let the thread pool go out of your code’s scope if you don’t really want to stop it.

Other personal blog address: blog.csdn.net/jonnyhsu_09…

I’m Jonny Jiang yi. Magic city Java worker. Read my article have harvest, remember to pay attention and like ~ if must reward, I will not refuse ~ 🙂