JAVA thread pool source code parsing

preface

This article will only parse ThreadPoolExecutor in 1.8, focusing on the execute method, the thread declaration cycle in the pool, and the entire process from the commit to the end of the Worker.

Thread pool building

Direct look at the source code:









Let’s take a look at all the thread pool inputs in turn:

  1. corePoolSizeCore threads

  2. maximumPoolSizeMaximum number of threads

  3. keepAliveTimeIdle threads maintain active wait times

  4. unitThe unit of waiting time

  5. workQueueWaiting queue

  6. threadFactoryThread factory

  7. handlerRejection policies

We’ll see what these parameters do in turn in the source code later.

execute

Take a look at itexecuteThe code:









See firstctl.private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));This atomic int is very powerful, using the low 29 bits to indicate the number of threads in the thread pool and the high 3 bits to indicate the running state of the thread pool:

  1. RUNNING: -1 << COUNT_BITS, i.e. 111, the thread pool in this state receives new tasks and processes tasks in the blocking queue
  2. SHUTDOWN: 0 << COUNT_BITS, that is, the high three bits are 000. The thread pool in this state will not receive new tasks, but will process tasks in the blocking queue
  3. STOP: 1 << COUNT_BITS, i.e. 001, a thread in this state does not receive new tasks, does not process tasks in a blocking queue, and interrupts running tasks
  4. TIDYING: 2 << COUNT_BITS, i.e. 010,terminatedThe thread pool is set to TIDYING after the method is executed

  5. TERMINATED: 3 << COUNT_BITS, that is, the high three bits are 011

The workerCount method takes the number of workers from the CTL. If it is smaller than the number of core threads, the addWorker operation will be attempted. The command is the task we want to run, and the addWorker method will be slowly resolved below. Return if added successfully; If not, update the value of c.

If the number of workers is greater than or equal to the number of core threads, check whether the thread pool is still running according to c (c is the value of CTL) and try to add tasks to the queue. A rollback operation is required (the rollback operation is to remove the task from the queue and process it with a reject policy). If the double Check passes but the number of workers is 0, the worker will be added.

Workqueue. offer(command) returns false. In this case, a direct attempt is made to add a worker. If this fails, a rejection policy will be executed to process the task.

As can be seen from the execute method, the process of execute:

  1. ifworkerIf the number of threads does not reach the number of core threadsworkerPerform this task
  2. ifworkerExceeds the number of core threads and is placed in a blocking queue for execution
  3. If the blocking queue is also full, it simply tries to add a new oneworkerExecute the task. If the attempt fails, the reject policy is executed.

Worker

All the way up hereWorkerIs a class in the thread pool:









As you can see, this class is inherited from AQS, and is also the parent class of various locks, which is convenient for concurrent control, and is implementedrunMethod,runIs to throw yourself as a taskrunWorkerMethods.

Glance at theWorkerAnd then look ataddWorkerMethods:









This is just the first half, so when you add it, let’s get itcIf the state is greater than or equal to SHUTDOWN, do not commit and return false.

If core is true, then the worker count is greater than or equal to corePoolSize. Otherwise, check whether the value is greater than or equal to maximumPoolSize. Return false if greater than or equal to. Otherwise, increase the number of workers through CAS and the increase succeeds, then jump out of the loop and start to create a new thread. Failure indicates that in the competition state, the new C is acquired again, and then the above process is recycled.

The thread pool maintains a set of threads called core threads. If enough core threads have been created, we can create more threads, but not more than maximumPoolSize. (If the values are the same, there will only be corePoolSize threads.)

Look at the second half:









First createWorker, in the case of a reentrant lock, theWorkerIn aworkers(theworkersIs ahashset).workerEnter the build method offirsttaskIs ourexecuteSubmitted tasks. We’ll talk about whyfirsttask. If you join theworkers, will performworkerIn thethreadthethreadIs the representativeworkerThe thread, then, thisworker runWhat is to be carried out, as mentioned above, carried outrunWorker(this). Now to be specificrunWorkerMethods:









This is the heart of the entire thread pool:

  1. workerReleases the lock.

  2. Access to taskfirsttask, why is it called the first task? Because when a worker is created, there will be a task at the beginning, and the task will be executed after it is finishedgetTaskMethod to retrieve tasks from a queue.

  3. You can customize tasks based on service scenarios before and after task executionbeforeExecuteandafterExecuteMethods.

  4. If there are no tasks in the queue,getTaskWill wait.

  5. getTaskOut of the task, will be executedtasktherunMethods. Note:This is not the start method!Always remember that this is an executionworker! whileworkerIt is already bound to a thread when it is created, so we are taking the task out inworkerHow to create the Worker threadWorkerClass.

To summarize, the entire process from addWorker to runWorker:

  1. Check the thread pool state and the number of core threads, and then createWorker,WorkerIn theworkersIn the.

  2. performworker.workerExecute the task assigned at creation time, and then block the queue for the task to execute.

The worker’s life cycle

Finally, let’s talk about worker’s declaration cycle, which is very important but neglected in most online blogs. Let’s take a closer look.

The created process has already been described, without further elaboration, just looking at the end of the worker’s life cycle. Recall above that the worker executes the firsttask assigned to him at the beginning and then takes the task from the queue to execute it. Let’s look at the code for this loop:

while (task ! = null || (task = getTask()) ! = null) { //.... }Copy the code

taskThe beginning isfirsttask, the implementation of thefirsttask,task = getTask(). We look atgetTask:









The thread pool state is checked when a task is fetched from a queue. Let’s go straight to where the quest is obtained:





We see it in the red boxkeepAliveTimeWe know by combining the meaning of this field, oneWorkerFetching a task from a blocking queue only waitskeepAliveTimeSo long time, if you can’t get it, it will returntaskisnullAnd combined with thewhile (task ! = null || (task = getTask()) ! = null)This sentence, iftaskfornull, then jump out of the loop, so jump out of the loop why, look at the end of the loop code:









So here’s the code for the whole while loop, pasted again for clarity, so we can see the wholewhileLoop intryIn the block, let’s see there’s a block calledcompletedAbruptlyIs set only at the end of the while loopfalse! ifwhileIn the loop, if an exception occurs during the execution of the task and the operation will be thrown, this parameter will not be set tofalseIt’s at initializationtrue.

No matterwhileNormal end, or abnormal interruption, will enterfinally,processWorkerExitMethods:









thiscompletedAbruptlyThe meaning of:If it is true, it means that the worker ends because the user’s task ends abnormally.

Look at the code to see what the exit worker does:

  1. If it’s a user problemworkerEnd, then decreaseworkerThe number.

  2. Global lock, the thread pool execution completedtaskQuantity plus thisworkerThe executed task is equivalent to a performance review.

  3. fromworkersTo remove thisworker.

  4. Attempted to terminate the thread pool.
  5. The thread pool state is less than STOP, and if the thread pool is interrupted due to a user problem, replenish one directlyworkerIt’s the following sentenceaddWorker(null, false)). If the interruption is due to a user problem, determine the number of core threads andworker countIf necessary, add another oneworker. Note: added hereworkerStudent: No allocationfirsttaskOf means this supplementaryWorkerThe task is fetched directly from the blocking queue.

At this point, we summarize the end of worker’s life cycle:

  1. workerIn awhileIn a loop, tasks are continuously fetched from the blocking queue.

  2. keepAliveTimeIf the task cannot be obtained within the specified time, or the task is successfully obtained but fails to be executed, exit the while loop and enterprocessWorkerExit.

  3. If you end theWorkerAfter that, it needs to be replenishedworker, then add one, otherwise, oneworkerEnd of life, waiting to be GC (fromworkersAfter it was removed, thisworkerNo more quotes).

conclusion

At this point, the source code analysis of the entire thread pool is complete. We can see that the most important thing in the thread pool is the whole life cycle of workers, which is closely related to the state flow of the thread pool and has a very rigorous structure. One last homage to Doug Lea.

Blog.shiwuliang.com/2017/11/03/… (Other code can follow me on Github)