Moment For Technology

ThreadPoolExecutor learning Notes (2)

Posted on Nov. 27, 2023, 10:41 p.m. by Robert Maddox
Category: Back-end  Tags: back-end, java

In the addWorker method, the thread held by the Worker is started once the Worker has been added to the workers set successfully:

    w = new Worker(firstTask);
    final Thread t = w.thread;
    ...
    if (workerAdded) {
        t.start(); // Start the thread in the worker
        workerStarted = true;
    }

Now let's see what the Worker's run method does. It simply delegates to runWorker:

    public void run() {
        runWorker(this);
    }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try { // the finally below also runs if getTask throws
            // If the first task is not null, run it first;
            // otherwise loop, fetching tasks from the task queue
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool is stopping, make sure the thread is interrupted;
                // otherwise make sure the thread is not interrupted
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try { // exceptions thrown by the user task surface here
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // completedAbruptly is still true here if the loop was left
            // because of an exception thrown by the user task
            processWorkerExit(w, completedAbruptly);
        }
    }
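To see the beforeExecute/afterExecute wrapping from the outside, here is a minimal sketch. The class name HookDemo and the helper runFailingTask are made up for illustration. It assumes the task is submitted with execute() rather than submit(), because submit() wraps the task in a FutureTask that swallows the exception before runWorker can see it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HookDemo {
    /** Runs one failing task and returns the message of the Throwable afterExecute received. */
    static String runFailingTask() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch hookCalled = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r);
                    // Silence the default stack trace printed when the worker thread dies.
                    t.setUncaughtExceptionHandler((thread, e) -> { });
                    return t;
                }) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);            // runWorker passes the task's exception as "thrown"
                hookCalled.countDown();
            }
        };
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!hookCalled.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("afterExecute was not called");
            }
            return seen.get().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask());
    }
}
```

The hook runs in the inner finally block, so it sees the exception before the exception propagates out of runWorker and ends the worker thread.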

Starting the worker thread is fairly simple. When a worker leaves the loop in runWorker, processWorkerExit cleans up after it:


    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // Remove from workers
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // If there are still tasks in the queue, make sure at least
                // one thread remains to execute them
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // If the worker died from a user-task exception, add a replacement worker
            addWorker(null, false);
        }
    }
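The replacement behaviour at the end of processWorkerExit can be observed with a single-thread pool. The sketch below is illustrative only (ReplacementDemo and workerIsReplaced are hypothetical names): the first task throws, which kills its worker, and the second task should then run on a different thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ReplacementDemo {
    /** Returns true if the task submitted after a crash ran on a different thread. */
    static boolean workerIsReplaced() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r);
                    // Keep the demo quiet: ignore the uncaught exception of the dying worker.
                    t.setUncaughtExceptionHandler((thread, e) -> { });
                    return t;
                });
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                first.set(Thread.currentThread());
                throw new IllegalStateException("boom"); // completedAbruptly stays true
            });
            pool.execute(() -> {
                second.set(Thread.currentThread());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("second task never ran");
            }
            return first.get() != second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + workerIsReplaced());
    }
}
```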


    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // CAS the thread pool state to SHUTDOWN
            advanceRunState(SHUTDOWN);
            // Interrupt idle workers
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

interruptIdleWorkers interrupts the idle threads:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // w.tryLock() succeeds only for an idle worker: runWorker wraps
                // the code that runs a task in w.lock() / w.unlock(), while an
                // idle thread is blocked in runWorker at task = getTask()
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
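Because a busy worker holds its own lock, shutdown() should leave it alone and let the queue drain. The following sketch tries to show both effects. ShutdownDemo and shutdownWhileBusy are names invented for this example, and the latches only exist to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownDemo {
    /** Returns {busyWorkerWasInterrupted, queuedTaskRan}. */
    static boolean[] shutdownWhileBusy() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicBoolean queuedRan = new AtomicBoolean(false);
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await(); // would throw if shutdown() interrupted this worker
                } catch (InterruptedException e) {
                    interrupted.set(true);
                }
            });
            pool.execute(() -> queuedRan.set(true)); // waits in the queue behind the first task
            started.await();      // the first task is now running, so its worker holds the lock
            pool.shutdown();      // interrupts idle workers only
            release.countDown();  // let the busy task finish normally
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
            return new boolean[] { interrupted.get(), queuedRan.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = shutdownWhileBusy();
        System.out.println("interrupted=" + r[0] + ", queuedRan=" + r[1]);
    }
}
```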


    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // Check the current state of the thread pool
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // Perform the thread pool state transition
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Run the terminated hook
                        terminated();
                    } finally {
                        // Switch the thread pool state to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
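The terminated() hook called in tryTerminate can be overridden in a subclass. As a small sketch (the names TerminatedHookDemo and hookRanBeforeTerminated are hypothetical), the code below checks that the hook has already run by the time awaitTermination returns, which follows from the hook being called before the state is set to TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    /** Returns true if terminated() had run by the time the pool reported termination. */
    static boolean hookRanBeforeTerminated() {
        AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                super.terminated();
                hookRan.set(true); // runs while the pool is in the TIDYING state
            }
        };
        try {
            pool.execute(() -> { });
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
            return hookRan.get() && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("hook ran: " + hookRanBeforeTerminated());
    }
}
```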


    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // CAS the thread pool state to STOP
            advanceRunState(STOP);
            // Interrupt all started workers
            interruptWorkers();
            // Collect the tasks that have not been executed
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        // Return the unexecuted tasks
        return tasks;
    }

How shutdownNow closes a thread pool:

  1. Change the state of the thread pool to STOP. Once the state is STOP, every worker that passes through runWorker interrupts itself at the following check:
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
          wt.interrupt();

If a worker is still inside the while loop when the thread pool is closed, the interrupt takes effect the next time the worker tries to fetch a task. For example, when an interrupted thread reaches queue.take() again, take() throws an InterruptedException immediately:

    public static void main(String[] args) {
        // The element type does not matter here; nothing is ever put into the queue
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        Thread thread = new Thread(() -> {
            long count = 0;
            for (;;) {
                if (count < 160) {
                    count++;
                    System.out.println(count);
                }
                break;
            }
            try {
                queue.take(); // throws at once because the interrupt flag is already set
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        thread.interrupt();
        System.out.println(thread.isInterrupted());
    }

Result:

    true
    1
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(
        at java.util.concurrent.ArrayBlockingQueue.take(
        at com.networkbench.tingyun.demo.thread.Main.lambda$main$0(
        at com.networkbench.tingyun.demo.thread.Main$$Lambda$1/ Source)

  2. Interrupt all threads that have been started:
      interruptWorkers();
  3. Collect all tasks that have not been executed yet:
      tasks = drainQueue();
  4. Change the thread pool state:
      tryTerminate();
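The steps above can be observed from calling code. This is a minimal sketch, assuming a single-thread pool so that the two extra tasks are guaranteed to be queued. ShutdownNowDemo and stopWhileBusy are illustrative names, not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns "<number of drained tasks>:<whether the running task was interrupted>". */
    static String stopWhileBusy() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for a long-running task
                } catch (InterruptedException e) {
                    interrupted.set(true); // interruptWorkers() reached this busy worker
                } finally {
                    finished.countDown();
                }
            });
            pool.execute(() -> { }); // stays in the queue
            pool.execute(() -> { }); // stays in the queue
            started.await();
            List<Runnable> drained = pool.shutdownNow(); // STOP, interrupt, drain
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("running task was not interrupted");
            }
            return drained.size() + ":" + interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(stopWhileBusy());
    }
}
```

Unlike shutdown(), the running task is interrupted even though its worker holds the lock, and the queued tasks are handed back to the caller instead of being executed.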