GitHub address of the dynamic-threadpool project

A thread pool is a tool for managing threads based on the pooling idea. Using a thread pool reduces the overhead of creating and destroying threads and prevents an excessive number of threads from exhausting system resources

Thread pools are widely used in business systems, but the industry has no solid standard for choosing thread pool initialization parameters. Thread pools in our production environment ran into some pain points due to business-specific workloads, which prompted the author's thoughts below on how thread pools are used

The online configuration cannot be properly evaluated

The biggest pain point is that key thread pool parameters cannot be evaluated accurately up front. Parameters such as the core thread count, maximum thread count, and blocking queue size cannot be changed once the system is online

Imagine whether you have run into any of these scenarios after happily adopting thread pools in your business

  1. The core thread count, blocking queue, and maximum thread count are all too small, so the rejection policy is triggered frequently and the interface keeps throwing exceptions
  2. The core thread count and blocking queue are too small while the maximum thread count is too large, which increases thread scheduling overhead and slows down processing, especially when periodic bursts of traffic occur
  3. The core thread count is too small and the blocking queue is too large, so tasks pile up and interface responses or program execution take longer
  4. The core thread count is too large, so too many idle threads occupy system resources and go to waste
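For context, all of these knobs are fixed when the pool is constructed. A minimal sketch (the parameter values and the `newBizPool` name are illustrative guesses, not recommendations):

```java
import java.util.concurrent.*;

public class PoolSetupDemo {
    // Build a pool whose parameters are guesses baked in at construction time.
    static ThreadPoolExecutor newBizPool() {
        return new ThreadPoolExecutor(
                4,                              // corePoolSize: a guess
                8,                              // maximumPoolSize: a guess
                60L, TimeUnit.SECONDS,          // idle timeout for surplus threads
                new LinkedBlockingQueue<>(512), // queue capacity: a guess
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); // throws when saturated
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBizPool();
        System.out.println(pool.getCorePoolSize()); // 4
        pool.shutdown();
    }
}
```

Once traffic invalidates any of those guesses, a redeploy is the only way to change them, which is exactly the pain point.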

Some of the above scenarios are affected by other parameters as well and are not absolute

I used to think it would be enough to calculate the thread pool parameters in advance. Why would anyone need them to be dynamic?

To be fair, in most business scenarios roughly-right thread pool parameters are good enough: a pool that wastes a small amount of resources, or rejects a small number of tasks while the business is running, is usually tolerable

However, some business fluctuations are unpredictable. Imagine a restaurant owner: from Monday to Thursday there are few guests, so he does not stock many ingredients. Then a tour group suddenly arrives for dinner and the restaurant's stock is stretched thin. Emergencies like this cannot be predicted

When a business system hits this situation, it may need to re-estimate the thread pool parameters based on the incoming traffic, redeploy the system, and then check whether the new parameters are reasonable; if they are not, the whole process has to be repeated

What a dynamic thread pool does is decouple parameter changes from system releases

There was no proper monitoring

How do you detect the misconfiguration scenarios mentioned above? The answer is thread pool runtime monitoring

Knowing some of the thread pool runtime metrics can greatly prevent the above problems. Here are some examples

  1. Monitors the current and peak load of the business thread pool
  2. Monitor the number of core threads, maximum threads, and active threads in the thread pool at different time periods
  3. Monitor indicators related to thread pool blocking queues to determine whether there is a risk of backlog of tasks
  4. Monitor the number of exceptions thrown by threaded tasks at run time and diagnose whether posted tasks are “healthy”
  5. Monitor the number of times the thread pool rejects policy execution to determine whether the thread pool parameters are reasonable

If monitoring is paired with sensible alerting, developers can largely avoid learning about online business problems after the fact, effectively prevent some issues, and fix business bugs faster

The above thoughts on dynamic parameters, monitoring, and alerting for thread pools are inspired by the Meituan technology blog post "The Implementation Principle of Java Thread Pool and Its Practice in Meituan's Business".

How do I update parameters dynamically

Setting thread pool parameters dynamically involves two questions: which parameters can be updated dynamically, and how are dynamic updates performed?

Let’s take a look at the set of parameters that the original thread pool API supports

CorePoolSize (number of core threads)

The core thread count is the minimum number of threads kept alive in the pool, even when idle. It can be changed at runtime with #setCorePoolSize

Compared with the other dynamically settable parameters, the process for setting the core thread count is the most involved

  1. Validate the new corePoolSize: it must not be negative, otherwise an exception is thrown
  2. Directly replace the pool's corePoolSize with the new corePoolSize
  3. If the number of worker threads in the pool is greater than the new corePoolSize, interrupt the redundant idle threads
  4. Otherwise, if the old corePoolSize is less than the new corePoolSize, new core threads may need to be created
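The steps above can be exercised directly through the public API; a minimal sketch:

```java
import java.util.concurrent.*;

public class SetCoreDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

        // Raising corePoolSize takes effect on the configuration immediately;
        // new core threads are only created if there are queued tasks.
        pool.setCorePoolSize(5);
        System.out.println(pool.getCorePoolSize()); // 5

        // Lowering it marks the surplus idle core threads for interruption.
        pool.setCorePoolSize(1);
        System.out.println(pool.getCorePoolSize()); // 1

        pool.shutdown();
    }
}
```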

As for step 4, there is a small tidbit: the thread pool authors added an optimization to ensure thread resources are not wasted

In step 4, as the source comment notes, we cannot know in advance how many new threads are actually needed; to avoid wasting thread resources, the number of threads to create, k, is computed from workQueue#size and delta (the increase in core size)

Math#min returns the smaller of the two values. Three scenarios come to mind

  1. If workQueue#size == 0, then k is also 0: there are no queued tasks waiting to run, the expression k-- > 0 is false, and #addWorker is never executed
  2. If 0 < workQueue#size < delta, there are queued tasks waiting to run, the expression k-- > 0 holds, and up to workQueue#size new core threads are created; if other threads in the pool drain workQueue first, the creation loop exits early
  3. If workQueue#size >= delta, at most delta new core threads are created
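The capping can be illustrated in isolation. The `threadsToStart` helper below is mine, mirroring the JDK-8-style `k = Math.min(delta, workQueue.size())` calculation:

```java
public class CoreDeltaDemo {
    // Mirrors the JDK-8-style calculation: start at most min(delta, queued)
    // new core threads, so no thread is created without work waiting for it.
    static int threadsToStart(int delta, int queuedTasks) {
        return Math.min(delta, queuedTasks);
    }

    public static void main(String[] args) {
        System.out.println(threadsToStart(5, 0)); // 0: no queued work, start none
        System.out.println(threadsToStart(5, 3)); // 3: only as many as queued tasks
        System.out.println(threadsToStart(5, 9)); // 5: capped at the core-size delta
    }
}
```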

The lesson here is not to write code just for yourself, but to think globally about whether the code can be improved

If the core thread count is set too large, the extra threads waste resources once the peak has passed. Do we have to manually adjust the core thread count back down?

Not necessarily: this can be controlled by a flag on the thread pool. allowCoreThreadTimeOut defaults to false, meaning core threads stay alive even when idle. If set to true, core threads also use keepAliveTime as an idle timeout and exit when it expires
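A minimal sketch of the flag (the method names are the real `ThreadPoolExecutor` API):

```java
import java.util.concurrent.*;

public class CoreTimeoutDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        System.out.println(pool.allowsCoreThreadTimeOut()); // false by default

        // Let idle core threads die after keepAliveTime as well
        // (requires a positive keepAliveTime, or an exception is thrown).
        pool.allowCoreThreadTimeOut(true);
        System.out.println(pool.allowsCoreThreadTimeOut()); // true

        pool.shutdown();
    }
}
```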

A pitfall of dynamically setting core threads

It is important to note that the core thread count setting may fail. For example, if the maximum number of threads is 5 and the number of active threads in the current thread pool is 5, setting the number of core threads to 10 will not work.

Assume that the runtime state of the thread pool is 3 core threads, 5 maximum threads, and 5 active threads. At this point, call #setCorePoolSize to dynamically set the number of core threads to 10

After doing the above, call #execute to initiate task execution to the thread pool with the following internal processing logic

  1. The pool's corePoolSize is now 10 while the current worker count is 5, so #addWorker is invoked to add a thread
  2. #addWorker first increments the worker count by 1; at this point the Worker has not actually been added to the pool
  3. Next the thread's wrapper class Worker is created and started; the Worker holds the thread object, and starting it is what makes the thread execute tasks
  4. The thread then fetches work via #getTask, which checks whether the current worker count is greater than the maximum thread count (inside #getTask queued tasks are taken first, which is one reason the dynamic core-count change appears to have no effect)
  5. Because of the +1 in step 2, the worker count is now 6, the check succeeds, so the worker count is decremented by 1 and the Worker is destroyed

For reference, the relevant check lives in the thread pool's #getTask method
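An abridged look at that check, paraphrased from the JDK 8 source (the field and helper names are the JDK's own; this fragment is not runnable on its own):

```java
// Inside java.util.concurrent.ThreadPoolExecutor#getTask (JDK 8, abridged)
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// Cull this worker if the pool holds more threads than maximumPoolSize
// (or an idle timeout expired) and the pool can afford to lose one.
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null; // the caller then destroys the Worker
    continue;
}
```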

Now that you know what the problem is, how do you fix dynamic setting failures

The fix is actually very simple: when setting the core thread count, set the maximum thread count at the same time. The dynamic setting takes effect as long as the worker count does not exceed the maximum thread count
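A runnable sketch of the fix (note that recent JDKs reject a core size above the current maximum outright, so raising the maximum first is the safe order):

```java
import java.util.concurrent.*;

public class CoreSetFixDemo {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++)               // occupy all 5 (maximum) threads
            pool.execute(() -> { try { release.await(); } catch (InterruptedException e) { } });
        System.out.println(pool.getPoolSize());   // 5: the pool is saturated

        // Raise the maximum first, then the core count, so the change sticks.
        pool.setMaximumPoolSize(10);
        pool.setCorePoolSize(10);
        pool.execute(() -> { try { release.await(); } catch (InterruptedException e) { } });
        System.out.println(pool.getPoolSize());   // 6: a new worker survived

        release.countDown();
        pool.shutdown();
    }
}
```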

See also: "How do I set thread pool parameters? Meituan gave an answer that blew the interviewer's mind"

MaximumPoolSize (maximum number of threads)

This represents the maximum number of threads that can be created in the thread pool. It can be reset with #setMaximumPoolSize

The source code for setting the maximum number of threads in the thread pool is relatively simple and does not contain complex logic. The flow is as follows

  1. Validate the new maximumPoolSize; if the check fails, an exception is thrown and the process terminates
  2. Replace the pool's maximum thread count with the new maximumPoolSize
  3. If the pool's worker count is greater than the new maximumPoolSize, interrupt the redundant idle workers
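A minimal sketch of the setter and its validation:

```java
import java.util.concurrent.*;

public class MaxPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        pool.setMaximumPoolSize(4);                    // shrink the ceiling
        System.out.println(pool.getMaximumPoolSize()); // 4

        try {
            pool.setMaximumPoolSize(1);                // below corePoolSize (2)
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: max must be >= core");
        }
        pool.shutdown();
    }
}
```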

ThreadFactory (thread factory)

A thread factory creates threads for the thread pool. When creating a thread it can set a custom thread name prefix (important), whether the thread is a daemon, the thread priority, and the uncaught-exception handler

Although the thread factory can be replaced at runtime, this is not recommended: threads that are already running are not destroyed, so the pool can easily end up containing two semantically different groups of threads

In a quick experiment, I created a thread pool whose thread factory used the prefix "before" and ran tasks so that the pool held threads created by the before factory

Then I created a new thread factory with the prefix "after", replaced the pool's factory, and ran enough tasks to reach the maximum thread count
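A runnable sketch of that experiment (the `named` factory helper and the prefixes `before`/`after` are illustrative):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FactorySwapDemo {
    // A minimal named thread factory; the name prefix is the only customization.
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.incrementAndGet());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), named("before"));
        pool.prestartAllCoreThreads();          // 2 threads: before-1, before-2

        pool.setThreadFactory(named("after"));  // replace the factory...
        pool.setMaximumPoolSize(4);
        pool.setCorePoolSize(4);
        pool.prestartAllCoreThreads();          // ...and force 2 more threads

        // The old "before-*" threads keep running alongside the new "after-*" ones.
        System.out.println(pool.getPoolSize()); // 4
        pool.shutdown();
    }
}
```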

Unsurprisingly, the threads created by the two factories coexist independently of each other, and this persists unless something special is done. Modifying the thread factory of a live pool in business code is therefore not recommended; otherwise we dig the pit for ourselves ~

Other parameters

The remaining two dynamically adjustable parameters are comparatively simple, so there is no walkthrough here; interested readers can check the source code of their setters

  • KeepAliveTime
  • RejectedExecutionHandler
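A minimal sketch of both setters (the method names are the real `ThreadPoolExecutor` API):

```java
import java.util.concurrent.*;

public class OtherParamsDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // Idle timeout for surplus threads (and for core threads if
        // allowCoreThreadTimeOut is enabled).
        pool.setKeepAliveTime(30L, TimeUnit.SECONDS);
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 30

        // Swap the saturation policy: run rejected tasks on the caller thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.shutdown();
    }
}
```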

Another important parameter that needs dynamic updates is the blocking queue size. Some readers may wonder: why not simply replace the blocking queue?

Replacing the blocking queue outright is possible, but it raises a lot of problems, for example what to do with the tasks piled up in the old queue. Modifying the capacity solves the problem without that complexity, so the dynamic design only concerns itself with the blocking queue's size, not with replacing the queue

Taking LinkedBlockingQueue as an example, there is no built-in way to change the queue size, because capacity, the field representing the queue bound, is declared with the final keyword

Given that final modifier, let's consider how the blocking queue's capacity could be made modifiable

Dynamic blocking queues

In the thread pool, the producer consumer mode is used to cache tasks through a blocking queue from which worker threads fetch tasks. The interface to a work queue is the BlockingQueue. When the queue is empty, the thread fetching the element waits for the queue to become non-empty, and when the queue is full, the thread storing the element waits for the queue to become available
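The full/empty blocking behavior described above can be seen with a capacity-1 queue (the names are illustrative):

```java
import java.util.concurrent.*;

public class BlockingQueueDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> tasks = new LinkedBlockingQueue<>(1);
        tasks.put("job-1");                   // producer: blocks if the queue is full

        // A consumer thread takes the element, freeing a slot for the producer.
        Thread consumer = new Thread(() -> {
            try { System.out.println(tasks.take()); } catch (InterruptedException e) { }
        });
        consumer.start();

        tasks.put("job-2");                   // unblocks once "job-1" is taken
        consumer.join();
        System.out.println(tasks.size());     // 1: "job-2" is still queued
    }
}
```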

There are several ways to make a blocking queue's size dynamically settable: extend the original logic, or rewrite it in some specific way; the implementation is not fixed. Here are a few approaches that can implement a dynamic blocking queue

  1. Copy the blocking queue source code and add a setter so that capacity becomes variable
  2. Extend the blocking queue and override the core methods
  3. Extend the blocking queue and change capacity via reflection

If you do not need to rewrite the original blocking queue for extra functionality, I prefer the first option, which keeps the code cleaner and more stable. The following uses LinkedBlockingQueue as the example

Copying the blocking queue

This approach is simple and crude: copy the LinkedBlockingQueue source code into a new class, ResizableCapacityLinkedBlockIngQueue, remove the final modifier from capacity, and add a #setCapacity method
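A sketch of what the added setter might look like in the copied class (assumptions: `capacity` has lost its final modifier and been made volatile; `count` and `signalNotFull` are the fields and helpers LinkedBlockingQueue already has). This is an illustration of the idea, not the project's code verbatim:

```java
// In the copied ResizableCapacityLinkedBlockIngQueue: capacity is now
// a plain (ideally volatile) int, so a setter becomes possible.
public void setCapacity(int capacity) {
    final int oldCapacity = this.capacity;
    this.capacity = capacity;
    final int size = count.get();
    // If the bound grew past the old one, wake producers blocked on "queue full".
    if (capacity > size && size >= oldCapacity) {
        signalNotFull();
    }
}
```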

Override core methods

GitHub: LinkedBlockingQueue semaphore implementation

This queue tracks the blocking queue's size together with a self-managed semaphore; each time the queue capacity is adjusted, permits are added to or removed from the semaphore
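A toy sketch of the semaphore idea (all names are illustrative, not the linked implementation verbatim): the semaphore's permits track free slots, and resizing releases or removes permits. `Semaphore#reducePermits` is protected, so a small subclass exposes it:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class ResizableQueueDemo {
    static class AdjustableSemaphore extends Semaphore {
        AdjustableSemaphore(int permits) { super(permits); }
        void reduce(int n) { super.reducePermits(n); } // expose the protected API
    }

    static class ResizableQueue<E> {
        private final LinkedBlockingQueue<E> delegate = new LinkedBlockingQueue<>();
        private final AdjustableSemaphore freeSlots;
        private int capacity;

        ResizableQueue(int capacity) {
            this.capacity = capacity;
            this.freeSlots = new AdjustableSemaphore(capacity);
        }

        synchronized boolean offer(E e) {        // fails when no free slot remains
            return freeSlots.tryAcquire() && delegate.offer(e);
        }

        synchronized E poll() {                  // taking an element frees a slot
            E e = delegate.poll();
            if (e != null) freeSlots.release();
            return e;
        }

        synchronized void setCapacity(int newCapacity) {
            int delta = newCapacity - capacity;
            capacity = newCapacity;
            if (delta > 0) freeSlots.release(delta); // grow: add permits
            else freeSlots.reduce(-delta);           // shrink: remove permits
        }
    }

    public static void main(String[] args) {
        ResizableQueue<Integer> q = new ResizableQueue<>(2);
        System.out.println(q.offer(1)); // true
        System.out.println(q.offer(2)); // true
        System.out.println(q.offer(3)); // false: full at capacity 2
        q.setCapacity(3);               // grow by one slot
        System.out.println(q.offer(3)); // true
    }
}
```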

Modifying capacity via reflection

The blocking queue capacity can also be modified via reflection. Before adopting this we worried about thread-safety; using JMeter thread groups to enqueue tasks while alternately modifying capacity, several rounds of testing showed no thread-safety problems
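For completeness, a sketch of what the reflective approach looks like (illustration only; on JDK 9+ this needs `--add-opens java.base/java.util.concurrent=ALL-UNNAMED`, and on JDK 16+ it fails outright without it):

```java
import java.lang.reflect.Field;
import java.util.concurrent.LinkedBlockingQueue;

public class ReflectCapacityDemo {
    // Not recommended: overwrite the private final `capacity` field.
    static void setCapacity(LinkedBlockingQueue<?> queue, int newCapacity) throws Exception {
        Field capacity = LinkedBlockingQueue.class.getDeclaredField("capacity");
        capacity.setAccessible(true); // may throw InaccessibleObjectException on JDK 16+
        capacity.setInt(queue, newCapacity);
    }

    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);
        q.offer(1);
        System.out.println(q.offer(2)); // false: full
        setCapacity(q, 2);
        System.out.println(q.offer(2)); // true after the reflective resize
    }
}
```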

All things considered, using reflection to modify the blocking queue size is not recommended, even though it can achieve the desired result: first, it is inelegant, and second, it is not guaranteed to remain compatible with future JDK versions

To summarize

In this article I summarized two widespread problems with thread pool usage in the industry

  1. Thread pool parameters cannot be adjusted quickly and dynamically
  2. Lack of proper monitoring means problems are discovered late and cannot be effectively prevented

This first article in the dynamic thread pool series grew out of the first problem, dynamic parameter tuning. Yes, there will be more articles on dynamic thread pools, covering ideas including but not limited to the following

  1. How to monitor thread pools in real time and aggregate historical metric data
  2. How to integrate alert messages with different platforms, in a configurable and elegantly reusable way
  3. Whether the dynamic thread pool can sync with a configuration center and connect to the Server for unified management of parameter changes

Meituan's dynamic thread pool actually implements all of these functions, but it is not open source. My own projects had the same pain points, so I had to reinvent the wheel

The first version, which relied on middleware and could integrate with a platform, took about three days to write. Maybe that was too easy; something felt missing. Later, during a sleepless night, I wondered: what if it did not rely on middleware at all?

That led to the Dynamic ThreadPool (dynamic-threadpool) project, DTP for short, whose project groupId is named with io. At present DTP is mainly a project for exercising my development skills and product sense, with daily development time concentrated after work and on Saturdays

The DTP project has two main components, the Server and a SpringBoot Starter. The Server acts as the registry for all client thread pools and stores historical metric data for the Console to query and display; the Starter is a Jar that the client depends on to interact with the Server

At present, DTP implements dynamic parameter updates between the Server and the Client; the source implementation draws on the long-polling and event-listening mechanisms used by Nacos before 2.0

Having chosen not to rely on middleware, an obvious problem appears: a single point of failure in the online environment, because once deployed as a cluster, data cannot yet be broadcast between nodes. This area may later borrow from Eureka's distributed AP model

Finally, if the project looks good to you, please click ⚡️ Star; wish you all the best.

The code continues to be updated, source address: github.com/longtai94/d…