Introduction

In the previous article, “[Java Concurrent Programming] Interview Essential Thread Pool”, we introduced the basic execution flow of a thread pool and its seven core parameters, explained the role of each parameter and how to use a thread pool, and left a few small questions open. It is recommended to read the previous article before this one. In this article, we analyze those questions:

  • Does the thread pool distinguish between core and non-core threads?
  • How to ensure that core threads are not destroyed?
  • How can threads in a thread pool be reused?

Thread.start() can be called only once per Thread object. Once a thread has been started, it can never be started again, even after it has finished running; calling start() a second time throws IllegalThreadStateException. The run() method, by contrast, is an ordinary method and can be called any number of times. Hence a common interview question: what is the difference between the run() and start() methods in Thread? A thread pool therefore cannot reuse a thread by restarting it; the reuse has to happen inside run(). The ThreadPoolExecutor#execute method is the entry point for executing tasks in a thread pool.
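
To make the difference between start() and run() concrete, here is a minimal sketch. The class and method names (StartVsRunDemo, secondStartThrows, runCalledTwice) are made up for illustration; only Thread and AtomicInteger come from the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

class StartVsRunDemo {

    /** Starts a thread, waits for it to finish, then tries to start it again. */
    static boolean secondStartThrows() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.join(); // the thread is now terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start(); // a Thread object cannot be started twice
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    /** Calls run() directly, twice; no new thread is created. */
    static int runCalledTwice() {
        AtomicInteger calls = new AtomicInteger();
        Thread t = new Thread(() -> calls.incrementAndGet());
        t.run(); // executes on the calling thread
        t.run(); // and can be repeated
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("second start() throws: " + secondStartThrows());
        System.out.println("run() invocations: " + runCalledTwice());
    }
}
```

Because a finished thread cannot be restarted, a pool has to reuse its threads some other way, which is what the execute source that follows leads up to.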

 public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();

     int c = ctl.get();
     // Step 1: fewer than corePoolSize workers exist, so try to start a new worker (core == true) with this task as its first task
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         // addWorker failed because the pool state or worker count changed concurrently, so re-read ctl
         c = ctl.get();
     }
     // Step 2: the worker count is at least corePoolSize (or step 1 failed), so try to put the task into the workQueue
     if (isRunning(c) && workQueue.offer(command)) {
         // Re-read ctl (pool run state and worker count) after the task has been queued
         int recheck = ctl.get();
         // If the pool is no longer running, take the task back out of the queue and apply the rejection policy
         if (!isRunning(recheck) && remove(command))
             reject(command);
         // The pool is running but has no workers left, so start one without a first task to drain the queue
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     // Step 3: the task could not be queued, so try to start a worker with core == false
     // If that also fails, the pool is shut down or saturated, and the rejection policy is applied
     else if (!addWorker(command, false))
         reject(command);
 }

The main logic of the execute method is as follows:

  • If the number of threads in the pool is smaller than corePoolSize, a new thread is created to execute the task.
  • If the number of threads is equal to or greater than corePoolSize (it exceeds corePoolSize once extra threads have been created for a full queue, or after corePoolSize has been lowered at runtime), the task is put on the blocking queue.
  • If the queue is full and cannot accept the task, a new thread is created to execute it.
  • If the number of threads has already reached maximumPoolSize, the task is rejected.
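
The four steps above can be observed with a deliberately tiny pool. The following is a sketch under stated assumptions: corePoolSize = 1, maximumPoolSize = 2, a queue with capacity 1, and the default rejection policy (AbortPolicy). The class and method names (ExecuteFlowDemo, submitFourTasks) are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static String[] submitFourTasks() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        String[] log = new String[4];
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log[i] = "threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size();
                } catch (RejectedExecutionException e) {
                    log[i] = "rejected";
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : submitFourTasks()) {
            System.out.println(line);
        }
    }
}
```

The first task gets a new thread, the second waits in the queue, the third gets a second thread because the queue is full, and the fourth is rejected because both the queue and the pool are at their limits.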

The addWorker method

The core of the execute method is the addWorker method:

private boolean addWorker(Runnable firstTask, boolean core) {
        // The first part is omitted...

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Before going through this method, let’s look at the Worker class first:

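private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // Thread this worker runs in; null if the ThreadFactory fails
        final Thread thread;
        // Initial task to run; may be null
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // The worker passes itself as the Runnable, so thread.start() ends up in Worker.run()
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // Other members omitted...
    }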

The Worker class implements the Runnable interface, and its run method calls the runWorker method:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // Take the first task bound to this worker (may be null)
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // If the worker has a first task, run it; otherwise (and on every
            // later iteration) call getTask() to fetch the next task from the workQueue
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool is stopping, make sure the thread is interrupted;
                // otherwise, make sure it is not
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
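
The while loop in runWorker is the whole reuse mechanism, and it can be reproduced in a few lines. The following is a stripped-down sketch, not the JDK implementation: MiniWorker and its getTask are hypothetical stand-ins that keep only the loop and drop locking, pool state, and hooks. The 200 ms poll timeout is an arbitrary value chosen so that the demo thread eventually exits.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MiniWorkerDemo {

    /** Hypothetical, simplified stand-in for ThreadPoolExecutor.Worker. */
    static final class MiniWorker implements Runnable {
        private final BlockingQueue<Runnable> queue;
        private Runnable firstTask;
        final Thread thread;

        MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue) {
            this.firstTask = firstTask;
            this.queue = queue;
            // Like Worker, pass ourselves as the Runnable of our own thread
            this.thread = new Thread(this, "mini-worker");
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            // Same loop shape as runWorker: first task, then the queue
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
            // getTask() returned null: run() returns and the thread ends
        }

        private Runnable getTask() {
            try {
                return queue.poll(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /** Runs three tasks on one worker and returns the thread name seen by each. */
    static List<String> runThreeTasks() {
        List<String> threadNames = new CopyOnWriteArrayList<>();
        Runnable record = () -> threadNames.add(Thread.currentThread().getName());
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(record);
        queue.add(record);
        MiniWorker worker = new MiniWorker(record, queue);
        worker.thread.start(); // start() is called exactly once
        try {
            worker.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runThreeTasks());
    }
}
```

All three tasks record the same thread name: the thread is started once and then runs one task after another inside a single run() call.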

The runWorker method first runs the task bound to the worker. If the worker was created without a task, or once that first task has finished, it fetches tasks from the queue and runs them. The worker thread is not destroyed after a task completes: the while loop keeps calling getTask to take the next task from the blocking queue and then calls task.run() on it, which is how a thread is reused. The loop condition while (task != null || (task = getTask()) != null) stays true as long as getTask returns a non-null value, so the thread keeps running. How, then, are core threads kept alive after their tasks finish, while non-core threads are destroyed? The answer is in the getTask() method:

private Runnable getTask() {
  // Timeout flag, false by default; set to true when the last workQueue.poll() call timed out
  // This flag is very important, as discussed below
  boolean timedOut = false;
  for (;;) {
    // Read ctl, which packs the run state and the worker count
    int c = ctl.get();
    int rs = runStateOf(c);

    // If the state is SHUTDOWN and the workQueue is empty, or the state is STOP or later,
    // decrement the worker count and return null so that this worker exits
    // In SHUTDOWN with a non-empty workQueue, the pool keeps executing the remaining tasks
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      // Decrement the worker count with a CAS loop on ctl
      decrementWorkerCount();
      return null;
    }

    // Get the current number of worker threads in the pool
    int wc = workerCountOf(c);

    // A worker waits with a timeout if allowCoreThreadTimeOut is enabled or the worker count exceeds corePoolSize
    // allowCoreThreadTimeOut defaults to false, i.e. idle threads are kept while the count is at most corePoolSize
    // Threads above corePoolSize are "temporary": they are reclaimed after being idle for keepAliveTime
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // A worker becomes eligible to exit in two cases:
    // 1. wc > maximumPoolSize, which can happen after setMaximumPoolSize() has lowered the limit at runtime
    // 2. timed && timedOut: this worker waits with a timeout, and its last workQueue.poll() timed out
    // In either case, the worker exits only if it is not the last one (wc > 1) or the workQueue is empty
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // Try to decrement the worker count with a single CAS
      if (compareAndDecrementWorkerCount(c))
        // The decrement succeeded: return null, and the thread is reclaimed
        return null;
      // Otherwise retry
      continue;
    }

    try {
      // If timed is true, wait for a task for at most keepAliveTime; otherwise block until a task arrives
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
      if (r != null)
        return r;
      // poll() timed out without a task, so set timedOut to true
      // On the next iteration, timed && timedOut makes this worker eligible to exit
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

So the key to keeping a thread from being destroyed is this single line:

   Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

As long as timed is false, the worker calls workQueue.take(), which blocks until a task arrives, so the thread is not destroyed. The value of timed is in turn determined by allowCoreThreadTimeOut and by whether the current number of worker threads is greater than corePoolSize.
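
The difference between the two queue calls can be seen in isolation. The following is a minimal sketch on an empty LinkedBlockingQueue. The class and method names are made up for illustration, and the 100 ms and 300 ms waits are arbitrary demo values.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollVsTakeDemo {

    /** poll(timeout) on an empty queue gives up and returns null. */
    static boolean pollReturnsNullOnTimeout() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** take() on an empty queue keeps the calling thread waiting. */
    static boolean takeKeepsThreadAlive() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Thread waiter = new Thread(() -> {
            try {
                queue.take(); // blocks: nothing is ever added to the queue
            } catch (InterruptedException e) {
                // interrupted by the demo so that the JVM can exit
            }
        });
        waiter.start();
        try {
            waiter.join(300); // give it time; it will not finish on its own
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stillAlive = waiter.isAlive();
        waiter.interrupt(); // release the waiting thread
        return stillAlive;
    }

    public static void main(String[] args) {
        System.out.println("poll timed out with null: " + pollReturnsNullOnTimeout());
        System.out.println("thread in take() still alive: " + takeKeepsThreadAlive());
    }
}
```

A null from poll is what lets getTask set timedOut and eventually return null, whereas a worker parked in take() never gets that chance.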

  • As soon as the getTask method returns null, the worker thread is reclaimed (runWorker leaves its loop and calls processWorkerExit).
  • The source code of this method also explains why core threads are destroyed as well once allowCoreThreadTimeOut(true) has been called on the thread pool.
  • It also answers the first question above: the thread pool does not distinguish between core and non-core threads. A Worker carries no "core" flag; whether an idle thread survives depends only on the worker count at the moment it checks.
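
The effect of allowCoreThreadTimeOut can be checked with a single-thread pool. The following is a sketch under stated assumptions: keepAliveTime is 100 ms and the pool is left idle for one second, both arbitrary demo values. The class and method names (CoreTimeoutDemo, poolSizeAfterIdle) are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    /** Runs one task, leaves the pool idle, and reports how many threads remain. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        // corePoolSize = maximumPoolSize = 1, keepAliveTime = 100 ms
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            Thread.sleep(1000); // idle for much longer than keepAliveTime
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + poolSizeAfterIdle(false) + " thread(s) left");
        System.out.println("allowCoreThreadTimeOut(true): "
                + poolSizeAfterIdle(true) + " thread(s) left");
    }
}
```

With the default setting the single thread stays parked in take() and the pool size remains 1. With allowCoreThreadTimeOut(true) the same thread uses the timed poll, times out, and the pool size is expected to drop to 0.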

The end

  • My knowledge is limited, so mistakes are inevitable. If you find one, please leave me a message pointing it out, and I will correct it.
  • If you think the article is good, your forwarding, sharing, appreciation, likes, and messages are the biggest encouragement to me.
  • Thank you for reading, and thank you for following.

Picking apples from the shoulders of giants (reference): Objcoding.com/2019/04/25/…