Thread pool call Execute Submission Task — > creates Worker(set properties thead, firstTask) — > Worker.Thread.start () — > actually calls Worker.run () — > thread pool RunWorker (Worker) — > Work Er. FirstTask. The run ();

RunWork method core (thread reuse)

Final void runWorker(Worker w) {// Get the currentThread Thread wt = Thread.currentThread(); // Get W FirstTask Runnable Task = W. FerstTask; // Set W's firstTask to NULL; // Release the lock and set the state of AQS to 0, allowing the interrupt of w.unlock(); // The processWorkerExit() method in finally has a different logic. Boolean completedAbruptly = true; Try {// call getTask() through a loop to fetch the task, continually fetching the task from the task cache queue and executing while (task! = null || (task = getTask()) ! = null) {// The Worker object will be locked to ensure that the Worker object will not be interrupted during the execution of the task; If ((runStateAtLeast (CTL) get (), STOP) | | / / if the state of the thread pool is greater than or equal to STOP, (Thread.get (), STOP)) &&&/ / Because of the state change in the Thread pool (Thread.get (), STOP)) Wt.isInterrupted ()) // Make sure the thread is not interrupted // Make an interrupt request wt.interrupt(); BeforeExecute (WT, Task); beforeExecute(WT, Task); Throwable thrown = null; Try {// Task.run (); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally {// AfterExecute (Task, Thrown); }} finally {// empty task, ready to get the next task through getTask() task = null; / / increasing completedTasks w.com pletedTasks++; // Frees the worker's exclusive lock w.unlock(); } } completedAbruptly = false; } finally {// At this point, finally {// Finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally {// At this point, finally { // In the first case, if the workCount () returns null, then the workCount () will be decrement. The decrement operation processes processWorkerExit(w, completedAbruptly) in processWorkerExit(); }}

Loop for task getAsk ()

Private Runnable getTask() {Boolean timedOut = false; for (;;) Int c = ctl.get(); Int rs = runStateOf(c); // If the thread pool is SHUTDOWN and the blocking queue is empty, the number of threads in the pool decreases, and the method returns NULL. Recycling thread if (rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {decrementWorkerCount (); return null; } // Get the number of workers int wc = WorkerCountOf (c); If allowCoreThreadTimeout is true, or if the current number of threads is greater than the core pool size, Requires a timeout recovery Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize; / / if the worker number greater than maximumPoolSize (may call the setMaximumPoolSize (), lead to the worker number greater than maximumPoolSize) if ((wc > maximumPoolSize | | (timed && timedOut)) / / or obtaining mission timeouts && (wc > 1 | | workQueue. IsEmpty ())) {/ / workerCount is greater than 1 or blocking the queue is empty (in blocking queue is not empty, Need to make sure that at least one working thread) if (compareAndDecrementWorkerCount (c)) / / diminishing number of threads, thread pool work method returns null, recycling thread return null; // Fail to decrement the number of worker threads in the pool. Skip the remainder and continue with the loop. } try {if timeout is allowed, then call poll() of the blocking queue and wait for the fetch task only for keepAliveTime, and return null if this is passed. Otherwise, call take(). If the queue is empty and the thread is blocked, wait indefinitely for the fetch task. Runnable R = Timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // If the task is not null, return the successfully retrieved task object if (r! = null) return r; Timeout = true; Timeout = true; Timeout = true; } catch (interruptedException retry) {// If the worker is interrupted, the alternative is to retry without timeout // Under what circumstances does the interrupt occur? Call setMaximumPoolSize(), shutDown(), shutdownNow () timedOut = false; }}}

PS: How to deal with the occurrence of an exception in the thread pool worker?

ProcessWorkerExit method

private void processWorkerExit(Worker w, Boolean completedAbruptly) {// If runWorker's getTask method WorkerCount has been decrementCount () if (if) DecrementWorkerCount ();  final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// completedTasks; completedTaskCount += w.completedTasks; Workers. Remove (w); Workers. Remove (w); Workers. } finally { mainLock.unlock(); } // Try to stop thread pool tryTerminate(); int c = ctl.get(); // SHUTDOWN if (runStateLessThan(c, STOP)) {// The thread is not terminated if (! completedAbruptly) {allowCorePoolSize int min = allowCoreThreadTimeout? 0 : corePoolSize; If (min == 0 &&!) and the queue is not empty, make sure that there is a thread to execute the task in the queue if (min == 0 &&!! workQueue.isEmpty()) min = 1; If (WorkerCountOf (c) >= min) return; // replacement not needed } // 1. AddWorker (null, false); addWorker(null, false); addWorker(false); addWorker(null, false); }}