Preface

Originally I planned to go straight to the advanced CompletableFuture thread pool article, but it felt wrong to dive in without covering thread pools first, so this article explains the two kinds of thread pool in Java.

Why do we need thread pools

When we need to process tasks asynchronously, the most common and easiest way is to start a new thread. But creating and destroying threads consumes CPU resources, and as asynchronous tasks pile up, blindly spawning new threads can cost us control over CPU usage. So the first thing we need is a way to limit how many threads are created.

An analogy is the database connection pool. When accessing a database with JDBC, a Connection object must be created first. In web projects there are usually many requests hitting the database; with Spring + MyBatis, every database operation needs its own Connection if there is no transaction in the current context. Creating Connection objects this frequently is a waste of resources and reduces interface throughput.

So we need a pool that maintains database connections: the database connection pool. For the same reason, we need thread pools.

Introduction to Thread Pools

Thread pooling is a tool for managing threads based on pooling ideas, similar to database connection pooling.

The idea behind pooling is to create a certain number of objects in advance and keep them around, so that each time one is needed it can be taken directly from the pool, avoiding frequent creation and destruction. A thread pool caches Thread objects so that when a task is submitted, it goes straight to a pooled Thread instead of a newly created one. To summarize the advantages of thread pools:

  • Creating threads directly gives us no way to cap resource usage, which easily leads to resource exhaustion
  • Reusing threads in the pool avoids frequent thread creation, saves creation time, and improves response speed
  • The pool can cap the maximum number of threads, with excess tasks waiting in a queue, so a flood of threads cannot exhaust server resources
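To make the contrast concrete, here is a minimal sketch of submitting a task to a pool instead of starting a thread by hand (the pool size of 2 and the class name are arbitrary example choices):

```java
import java.util.concurrent.*;

public class FirstPool {
    public static int runTask() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2); // reuses 2 threads
        Future<Integer> result = pool.submit(() -> 1 + 1);      // no manual new Thread()
        int value = result.get();                               // wait for the result
        pool.shutdown();
        return value;                                           // 2
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTask());
    }
}
```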

Traditional thread pool ThreadPoolExecutor

ThreadPoolExecutor is the most widely used thread pool implementation class. Let's look at its class inheritance structure.

The top-level interface Executor decouples task submission from task execution. You do not need to worry about how to create or schedule a thread; you only provide a Runnable and hand the task's execution logic to an Executor, and the Executor framework takes care of thread allocation and task execution. In other words, you just hand the task to the thread pool.

When using ThreadPoolExecutor, you can instantiate it directly. There are many overloaded constructors; we'll use the one with the most arguments.
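For reference, a sketch of calling that fully parameterized constructor; all the numeric values here are arbitrary example settings, and each argument is explained in the sections that follow:

```java
import java.util.concurrent.*;

public class PoolConfig {
    public static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                4,                                // corePoolSize
                8,                                // maximumPoolSize
                60L, TimeUnit.SECONDS,            // keepAliveTime + its unit
                new LinkedBlockingQueue<>(100),   // workQueue
                Executors.defaultThreadFactory(), // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // rejection handler (the default)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```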

corePoolSize: the number of core threads

The ideal number of threads for executing tasks. When the number of running threads in the pool is smaller than corePoolSize, a new thread is created for each incoming task.

keepAliveTime: how long idle threads are kept alive

When the number of threads is greater than corePoolSize, this is the maximum time an excess idle thread waits for a new task before terminating: if no new task arrives within this time, the idle thread is destroyed. Obviously, keepAliveTime is meaningless when corePoolSize == maximumPoolSize (unless allowCoreThreadTimeOut is enabled).

workQueue: the work queue

When the number of running threads is greater than or equal to corePoolSize, new tasks are queued as they arrive. Java provides queues such as LinkedBlockingQueue, ArrayBlockingQueue, and LinkedBlockingDeque, which I won't go into here.

maximumPoolSize: the maximum number of threads

The maximum number of threads allowed in the pool. Many people assume it works like this: once tasks exceed corePoolSize, the pool keeps creating threads until it reaches maximumPoolSize. This understanding is completely wrong.

What it really does: when the number of threads is at least corePoolSize and the workQueue is full, the pool checks whether the current thread count is below maximumPoolSize. If it is, a new thread is created to execute the newly submitted task directly, rather than a task from the workQueue (why?). If the count has already reached maximumPoolSize, the rejection policy is executed.

Many people wonder: why does the pool create new threads once the queue is full? Perhaps the design intent is that if the core threads are busy and the workQueue is full, the pool is not keeping up, so it spends extra resources to catch up, effectively giving us a few spare threads.

Why does a newly created thread beyond the core count execute the new task instead of taking one from the workQueue? Isn't that cutting in line? Good question, and I don't know the answer: that is simply how it was designed. We can write some code to verify that it really behaves this way.
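Here is one possible minimal verification (my own sketch, not the article's sample code): with core=1, max=2, and a queue of capacity 1, the third submitted task should jump the queue and start before the second:

```java
import java.util.*;
import java.util.concurrent.*;

public class QueueJumpDemo {
    // core=1, max=2, queue capacity=1: T1 occupies the core thread,
    // T2 fills the queue, T3 forces a new thread and starts before T2.
    public static List<String> run() throws Exception {
        List<String> started = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        pool.execute(() -> { started.add("T1"); sleep(500); }); // core thread busy
        Thread.sleep(50);                                       // let T1 start first
        pool.execute(() -> started.add("T2"));                  // goes into the queue
        pool.execute(() -> started.add("T3"));                  // queue full -> extra thread

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return started; // [T1, T3, T2]: T3 cut in line ahead of T2
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```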

threadFactory: the thread factory

Threads in the pool are created through a thread factory, which lets you customize the thread name, priority, and other settings at creation time. In Java, the main thread cannot directly catch exceptions thrown by child threads; with a thread pool, the ThreadFactory can install the child thread's exception handling logic. Pass an anonymous implementation like the following as the parameter:

r -> {
    Thread t = new Thread(r);
    t.setName("custom thread name");        // set the thread name
    t.setPriority(10);                      // set the priority
    t.setDaemon(false);                     // whether it is a daemon thread
    t.setUncaughtExceptionHandler((t1, e) -> {
        System.out.println(e.getMessage()); // handle the child thread's exception
    });
    return t;
}

Normally we don't use this parameter and just take the default.

handler: the rejection policy

When the number of worker threads has reached maximumPoolSize and the workQueue is full, newly submitted tasks are rejected. The JDK provides four rejection policies:

  • AbortPolicy

Discards the task and throws a RejectedExecutionException. This is the default policy and is recommended for critical services, so that when the subsystem cannot handle more concurrency, the exception is detected promptly.

  • DiscardPolicy

Discards the task without throwing an exception. This policy may hide problems in the system from us, so it is generally used for non-critical business.

  • DiscardOldestPolicy

Discards the task at the head of the queue (the oldest task) and then retries submitting the rejected task.

  • CallerRunsPolicy

The calling thread (the one that submitted the task) runs the task itself. In some cases this is a good policy, because the submitting thread is blocked while it executes the task, which naturally slows down further submissions.

We can also write our own rejection strategy by implementing RejectedExecutionHandler and overriding the rejection logic.
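As a sketch of a custom policy, the handler below (a hypothetical count-and-drop example, not a JDK class) silently counts rejections instead of throwing:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CountAndDropDemo {
    public static int countRejections() throws Exception {
        AtomicInteger rejected = new AtomicInteger();
        // custom policy: count and drop instead of throwing
        RejectedExecutionHandler countAndDrop = (r, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(), countAndDrop);

        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> {});   // fills the queue
        pool.execute(() -> {});   // rejected: pool and queue are both full
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected.get();    // 1
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countRejections());
    }
}
```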

How ThreadPoolExecutor works

Let’s look at a diagram to understand how thread pools work

Task handling in the pool falls into three cases. First, the number of threads is below the core count: a thread is created and the task executes immediately. Second, the core threads are all busy: the task waits in the queue. Third, the queue is full and the thread count has reached the maximum: the task is rejected.

The state of the ThreadPoolExecutor

  • RUNNING: Indicates the RUNNING status. Can accept newly submitted tasks and also process tasks in a blocking queue
  • SHUTDOWN: indicates the SHUTDOWN state. Can no longer accept newly submitted tasks, can process already saved tasks in the blocking queue
  • STOP: indicates the STOP state. Cannot accept a newly submitted task or process a task in a blocking queue, interrupting the thread that is processing the task
  • TIDYING: indicates the TIDYING state. All tasks have terminated and workerCount is 0
  • TERMINATED: indicates the TERMINATED state. The state is entered after the terminated() method is executed.

ThreadPoolExecutor's source code stores the state in the high three bits of an int, which is a bit convoluted at first glance…

private static final int COUNT_BITS = Integer.SIZE - 3; // 29

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
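A small self-contained sketch of that bit packing, mirroring the JDK's ctl field (the mask is named CAPACITY in some JDK versions): the high 3 bits hold the run state, the low 29 bits the worker count.

```java
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // low 29 bits

    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~COUNT_MASK; } // keep high 3 bits
    static int workerCountOf(int c)  { return c & COUNT_MASK; }  // keep low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }         // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));            // 5
        System.out.println(runStateOf(c) == RUNNING);    // true
    }
}
```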

ThreadPoolExecutor Process for executing a task

How to choose the optimal number of core threads

Having said all that, how do we actually determine the number of core threads in a thread pool? Tasks can be divided into two types.

  • computational

For computational tasks, setting the core thread count to (CPU cores - 1) works well. Adding threads beyond the number of CPU cores does not help, because such tasks are CPU-intensive and extra threads only add scheduling overhead. Why not use exactly the number of CPU cores? Because if the thread pool occupies every core, the rest of the application cannot run normally.

  • IO type

For IO type tasks, we need to understand that IO operations are not actually performed by the CPU (if the CPU had to carry out slow IO itself, that would be a real waste of its capability). Instead, the CPU delegates the transfer to DMA (Direct Memory Access) and is free to do other work. We can therefore set the thread count above the number of cores to use resources sensibly. In practice there is no single optimal value; the usual approach is repeated load testing to find the configuration that best fits the current server.
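As a sketch, these heuristics can be written down directly. Note that the cores * (1 + wait/compute) formula for IO tasks is a commonly cited rule of thumb rather than something from this article, and the 50ms/5ms ratio below is a made-up example; only load testing yields real numbers.

```java
public class PoolSizing {
    // CPU-bound: leave one core free for the rest of the process
    public static int cpuBoundThreads(int cores) {
        return Math.max(1, cores - 1);
    }

    // IO-bound rule of thumb: cores * (1 + waitTime / computeTime).
    // The wait/compute ratio must come from real measurements.
    public static int ioBoundThreads(int cores, double waitMs, double computeMs) {
        return (int) (cores * (1 + waitMs / computeMs));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(cpuBoundThreads(cores));
        System.out.println(ioBoundThreads(cores, 50, 5)); // assumed 50ms IO wait per 5ms CPU
    }
}
```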

Special thread pool ForkJoinPool

Java 7 introduced a new thread pool implementation, ForkJoinPool. It is meant for special situations; many articles online talk about how awesome ForkJoinPool is, but it is not a universal upgrade. In most cases ThreadPoolExecutor handles concurrent tasks perfectly well, and ForkJoinPool is only for the special situations where the traditional thread pool falls short.

Why is ForkJoinPool needed

Let’s look at two scenarios first

Scenario 1

Suppose you now have a concurrent task with multiple layers of dependencies, as in the following figure.

There is a Task1 to complete, but completing Task1 requires completing Task2, Task3, Task4, and Task5, and completing Task2 requires completing Task6… and so on down a chain of dependent tasks. ThreadPoolExecutor has some limitations here.

ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

The complete code is on GitHub; see the sample ThreadPoolExecutorTest.java. Consider how execution plays out with the pool configuration above. Since corePoolSize == maximumPoolSize == 4, once the core thread count reaches 4, no additional threads are created regardless of whether the queue is full. So walk through the process: when the application starts submitting tasks, the four core threads pick up Task1, Task2, Task3, and Task4, and the rest go into the blocking queue. Task1 through Task4 cannot finish because the tasks they depend on have not finished; but those dependent tasks sit in the blocking queue, waiting for Task1 through Task4 to finish and free up a thread. They end up waiting for each other forever.

This is effectively a "deadlock", although jps or jstack will not report a deadlock, only a large number of WAITING threads. It is really a naive blocking situation rather than a true deadlock, but the blocking can never resolve. If a submitted task has not completed, FutureTask.get() calls awaitDone, and inside awaitDone, LockSupport.park(this) blocks the current thread. Only when the task a thread is waiting on completes does finishCompletion() run, whose internal LockSupport.unpark(t) unblocks that thread.

So the core of the cycle is that each core thread blocks in LockSupport.park(this) but never gets a chance to have LockSupport.unpark(t) called on its behalf.
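The mutual waiting can be reproduced in a few lines (my own sketch, not the article's sample): a single-threaded pool whose task submits a subtask to the same pool and then waits on it.

```java
import java.util.concurrent.*;

public class StarvationDemo {
    // The subtask sits in the queue forever because the only thread
    // is busy waiting for that very subtask.
    public static boolean detectStarvation() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        Future<Integer> outer = pool.submit(() -> {
            Future<Integer> inner = pool.submit(() -> 42); // queued, never runs
            return inner.get();                            // parks the only thread
        });
        try {
            outer.get(1, TimeUnit.SECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;            // mutual waiting detected
        } finally {
            pool.shutdownNow();     // interrupt the parked thread so the JVM can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(detectStarvation());
    }
}
```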

More generally, in the traditional thread pool a submitted task is executed by exactly one thread, no matter how complex or long-running it is. Say there are five tasks A, B, C, D, and E and five active threads: B, C, D, and E each take 1 second, while A takes 20 seconds. After B, C, D, and E finish, A still needs its remaining time while four threads sit idle. This clearly wastes thread resources, which raises the question: could we split the complex task A into multiple subtasks that, like B, C, D, and E, each finish in about a second, and let the newly idle four threads execute them, improving efficiency?

Scenario 2

Suppose we want to compute the sum of 1 to N for a very large N. With ForkJoinPool we can set a threshold for how many elements one subtask may sum, and keep splitting the range according to that threshold. For convenience, take 1 to 1000 with a threshold of 20; the task splits into:

  • 1~500, 501~1000
  • 1~250, 251~500, 501~750, 751~1000
  • 1~125, 126~250, 251~375, 376~500, 501~625, 626~750, 751~875, 876~1000
  • Continue splitting… until the granularity reaches the threshold we set. What is a reasonable threshold? For now, think of it this way: if a subtask can be executed immediately and return its result, the threshold is reasonable.

In this way a complex task is broken into simple subtasks scheduled and computed across multiple threads, making efficient use of CPU resources.

ForkJoinPool application

The core idea of this class is divide and conquer: a problem of size N is divided into K smaller subproblems. It has two core methods, fork() and join(). fork() keeps splitting a large task until it reaches the granularity of our threshold; for the example above, fork() eventually produces the splits shown earlier.

The job of fork() is to push the task onto the current thread's workQueue.

join() blocks the current thread and waits for the subtask's result.
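Putting fork() and join() together, here is a sketch of the 1-to-N sum from Scenario 2 as a RecursiveTask (threshold 20, as in the example; class and method names are my own):

```java
import java.util.concurrent.*;

public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 20;     // the splitting threshold from the example
    final int lo, hi;

    SumTask(int lo, int hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo + 1 <= THRESHOLD) {  // small enough: compute directly
            long sum = 0;
            for (int i = lo; i <= hi; i++) sum += i;
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left  = new SumTask(lo, mid);
        SumTask right = new SumTask(mid + 1, hi);
        left.fork();                     // push the left half onto this worker's deque
        long rightSum = right.compute(); // keep working on the right half
        return rightSum + left.join();   // wait for the (possibly stolen) left half
    }

    public static long sum(int n) {
        return new ForkJoinPool().invoke(new SumTask(1, n));
    }

    public static void main(String[] args) {
        System.out.println(sum(1000)); // 500500
    }
}
```

Forking one half and computing the other directly (instead of forking both) keeps the current worker busy rather than parking it immediately in join().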

ForkJoinPool Execution flowchart

Work stealing

Work stealing is the idea of idle threads helping busy ones. Unlike ThreadPoolExecutor, in ForkJoinPool each thread (worker) has its own workQueue, which is a double-ended queue (deque). When a worker operates on its own deque, it defaults to LIFO order, treating it as a stack; when it steals from another worker's deque, it uses FIFO order, taking tasks from the base.

Suppose threads A, B, C, and D each have 10 tasks, and thread A finishes its own first. It then "steals" tasks from the work queues of threads B, C, and D and helps execute them: thread A is doing work on behalf of the other threads.

Why do idle threads steal from the base of other workers' deques? Because tasks are pushed as they are forked, the task at the base is the earliest-forked and thus typically the largest-grained one. Stealing it gives the thief the biggest chunk of work in a single steal, and since the owner works from the top, stealing from the base also minimizes contention, letting every task join() as soon as possible.

Conclusion

Usually ThreadPoolExecutor already meets business requirements, and ForkJoinPool can be used when a genuine divide-and-conquer need arises. This article introduced the usage scenarios, basic usage, and execution flow of both thread pools. The internals of ForkJoinPool are very complex and require extensive knowledge of concurrent programming and data structures to read the source code.

If this article has helped you, please like it and follow it! Your support is my motivation to continue to create!