1. Follow the development protocol when using threads

There are three mandatory rules on the use of threads and thread pools in the Alibaba Java development manual.

[Mandatory] Specify a meaningful thread name when creating a thread or thread pool to facilitate backtracking in case of errors.

Example: customize the thread factory and group threads by an external characteristic, such as the machine room the call comes from, assigning the machine room number to whatFeatureOfGroup:


```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class UserThreadFactory implements ThreadFactory {

    private final String namePrefix;

    private final AtomicInteger nextId = new AtomicInteger(1);

    /** Define the thread group name, which is very helpful when using jstack to troubleshoot problems. */
    UserThreadFactory(String whatFeatureOfGroup) {
        namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";
    }

    @Override
    public Thread newThread(Runnable task) {
        String name = namePrefix + nextId.getAndIncrement();
        Thread thread = new Thread(null, task, name, 0);
        System.out.println(thread.getName());
        return thread;
    }
}
```
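A minimal sketch of wiring such a naming factory into a pool (the pool sizes, group name, and class name here are illustrative; the factory is inlined as a lambda for brevity):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger nextId = new AtomicInteger(1);
        // Same idea as UserThreadFactory, inlined as a lambda.
        ThreadFactory factory = task ->
                new Thread(null, task, "machineRoom1-Worker-" + nextId.getAndIncrement(), 0);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16), factory);
        // The worker prints its own name, which now carries the group prefix.
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With names like this, a jstack dump immediately tells you which group a busy thread belongs to.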

[Mandatory] Thread resources must be provided through a thread pool. Explicit creation of threads in an application is not allowed.

Note: Thread pooling reduces the time spent creating and destroying threads and the overhead on system resources, and mitigates the problem of insufficient resources.

If you don't use thread pools, the system may run out of memory or suffer excessive context switching from creating a large number of similar threads.

[Mandatory] Thread pools must not be created through Executors; use ThreadPoolExecutor instead.

This makes the thread pool's running rules explicit and avoids the risk of resource exhaustion.

The thread pool objects returned by Executors have the following disadvantages:

1) FixedThreadPool and SingleThreadPool:

The allowed queue length is Integer.MAX_VALUE, which may let a large number of requests accumulate and result in OOM.

2) CachedThreadPool:

The number of threads allowed to be created is Integer.MAX_VALUE, which may create a large number of threads, resulting in OOM.

2. ThreadPoolExecutor source code

1. Constructor

ThreadPoolExecutor has four constructors, all of which ultimately delegate to the same, most general one.
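The claim is easy to verify with reflection (the comment lists the parameter types of the most general constructor, which the other three delegate to):

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ThreadPoolExecutor;

public class CtorCount {
    public static void main(String[] args) {
        // ThreadPoolExecutor declares four public constructors; the most general takes
        // (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        //  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        //  RejectedExecutionHandler handler).
        Constructor<?>[] ctors = ThreadPoolExecutor.class.getConstructors();
        System.out.println(ctors.length); // 4
    }
}
```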

2. Core parameters

  1. corePoolSize => Number of core threads in the thread pool

  2. maximumPoolSize => Maximum number of threads allowed in the thread pool

  3. keepAliveTime => How long an idle worker thread is kept alive. If there are many tasks and each task runs briefly, increase keepAliveTime to improve thread reuse.

  4. unit => Time unit of keepAliveTime

  5. workQueue => Buffer queue used by the thread pool. The queue types are:

    • ArrayBlockingQueue, a bounded blocking queue based on an array structure that sorts tasks according to FIFO (first-in, first-out). Using this queue, the maximum number of threads that can be created in the thread pool is maximumPoolSize

    • LinkedBlockingQueue, an unbounded queue based on a linked-list structure that orders tasks FIFO (first in, first out) and usually has higher throughput than ArrayBlockingQueue. With this queue, the pool never creates more than corePoolSize threads. The static factory method Executors.newFixedThreadPool() uses this queue.

    • SynchronousQueue, a blocking queue that does not store elements. Each insert must wait until another thread takes the element; otherwise the insert stays blocked. The static factory method Executors.newCachedThreadPool() uses this queue.

    • PriorityBlockingQueue: an unbounded blocking queue that supports priorities. With this queue, the pool never creates more than corePoolSize threads.

  6. ThreadFactory => The factory used by the thread pool to create threads

  7. Handler => Processing policies of the thread pool for rejected tasks. There are four types of rejection policies:

    • AbortPolicy: Throws an exception when a new task cannot be processed. This is the default policy.

    • CallerRunsPolicy: Executes the task with the caller’s thread.

    • DiscardOldestPolicy: Discards the oldest task at the head of the blocking queue, then retries submitting the current task.

    • DiscardPolicy: Discards tasks directly.
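Putting the seven parameters together, a hand-built pool might look like this (the sizes, queue capacity, and thread-name prefix are illustrative, not prescriptive):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConfigDemo {
    public static void main(String[] args) {
        AtomicInteger id = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                   // corePoolSize
                8,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime + unit
                new ArrayBlockingQueue<>(100),       // workQueue: bounded, FIFO
                task -> new Thread(task, "order-Worker-" + id.getAndIncrement()), // threadFactory
                new ThreadPoolExecutor.AbortPolicy());                            // handler
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 4/8
        pool.shutdown();
    }
}
```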

3. The execute() method

  1. If fewer than corePoolSize threads are currently running, a new worker thread is created to run the task (this step requires a global lock).

  2. If the number of running threads is greater than or equal to corePoolSize and the BlockingQueue is not full, the task is added to the BlockingQueue.

  3. If the BlockingQueue is full and fewer than maximumPoolSize threads are running, a new worker thread is created to run the task (this step requires a global lock).

  4. If the number of running threads is greater than or equal to maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called; that is, the saturation policy handles the task.

3. Thread pool workflow

Execution logic description:

  1. Check whether the core thread pool is full; this depends on the corePoolSize parameter. If it is not full, create a thread to execute the task.

  2. If the core thread pool is full, check whether the queue is full; this depends on the workQueue parameter. If the queue is not full, add the task to the queue.

  3. If the queue is full, check whether the thread pool is full; this depends on the maximumPoolSize parameter. If the pool is not full, create a thread to execute the task.

  4. If the thread pool is full, use the rejection policy to handle the task that cannot be executed; this depends on the handler parameter.
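The four steps above can be reproduced with a deliberately tiny pool (core = 1, max = 2, queue capacity = 1; all values chosen only for the demonstration). Each submitted task blocks on a latch so it occupies its slot:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        pool.execute(blocker); // 1st: fills the single core thread
        pool.execute(blocker); // 2nd: core full -> goes into the queue
        pool.execute(blocker); // 3rd: queue full -> a non-core thread is created
        boolean rejected = false;
        try {
            pool.execute(blocker); // 4th: queue full, pool at maximumPoolSize -> rejected
        } catch (RejectedExecutionException e) {
            rejected = true; // default AbortPolicy throws
        }
        System.out.println("rejected = " + rejected);
        gate.countDown(); // release the blocked tasks
        pool.shutdown();
    }
}
```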

4. Creating a ThreadPoolExecutor via Executors (not recommended)

Executors has three factory methods that return a ThreadPoolExecutor:

1. Executors#newCachedThreadPool => Create a cacheable thread pool

  • corePoolSize => 0, the number of core threads is 0

  • maximumPoolSize => Integer.MAX_VALUE, the maximum number of threads is effectively unlimited

  • keepAliveTime => 60L

  • unit => TimeUnit.SECONDS

  • workQueue => SynchronousQueue

Disadvantage: maximumPoolSize => Integer.MAX_VALUE, which may create so many threads that it causes OOM.
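For reference, this is essentially what newCachedThreadPool() constructs when written out by hand (a sketch; the class name is ours):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedEquivalent {
    public static void main(String[] args) {
        // Equivalent of Executors.newCachedThreadPool(), spelled out:
        ThreadPoolExecutor cached = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,           // no core threads, "unlimited" max
                60L, TimeUnit.SECONDS,          // idle threads die after 60 s
                new SynchronousQueue<Runnable>()); // hand-off queue, stores nothing
        System.out.println(cached.getCorePoolSize() + "," + cached.getMaximumPoolSize());
        cached.shutdown();
    }
}
```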

2. Executors#newSingleThreadExecutor => Create a single-threaded thread pool

SingleThreadExecutor is a single-threaded thread pool with only one core thread:

  • corePoolSize => 1, the number of core threads is 1

  • maximumPoolSize => 1, so no non-core threads are ever created

  • keepAliveTime => 0L

  • unit => TimeUnit.MILLISECONDS

  • workQueue => LinkedBlockingQueue

Disadvantage: LinkedBlockingQueue has a capacity of Integer.MAX_VALUE, which can be considered unbounded. An unlimited number of tasks can therefore be inserted into the queue, which can cause OOM when resources are limited.

3. Executors#newFixedThreadPool => Create a fixed-length thread pool

  • corePoolSize => nThreads, the value passed to the factory method

  • maximumPoolSize => nThreads, so no non-core threads are ever created

  • keepAliveTime => 0L

  • unit => TimeUnit.MILLISECONDS

  • workQueue => LinkedBlockingQueue

It is similar to SingleThreadExecutor except for the number of core threads; because it also uses LinkedBlockingQueue, it too can cause OOM when resources are limited.
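The explicit equivalents of the two factory methods make the shared weakness obvious (a sketch; nThreads and the class name are illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedAndSingleEquivalents {
    public static void main(String[] args) {
        int nThreads = 4; // illustrative
        // Equivalent of Executors.newFixedThreadPool(nThreads):
        ThreadPoolExecutor fixed = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Equivalent of Executors.newSingleThreadExecutor() (minus the delegating wrapper):
        ThreadPoolExecutor single = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // The unbounded queue is why both can OOM: capacity is Integer.MAX_VALUE.
        System.out.println(fixed.getQueue().remainingCapacity());
        fixed.shutdown();
        single.shutdown();
    }
}
```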

5. Proper configuration of thread pools

Analyze the characteristics of tasks from the following perspectives:

  1. The nature of the task: CPU intensive task, IO intensive task, and hybrid task.

  2. Task priority: high, medium, low.

  3. Task execution time: long, medium, short.

  4. Task dependencies: Whether they depend on other system resources, such as database connections.

Tasks of different natures can be handled separately by thread pools of different sizes. Use Runtime.getRuntime().availableProcessors() to obtain the number of CPU cores on the current device.

  • CPU-intensive tasks: configure as few threads as possible, for example a pool of Ncpu + 1 threads.

  • IO-intensive tasks: since the threads are not executing all the time, configure more threads, for example 2 * Ncpu.

  • Hybrid tasks: split each into a CPU-intensive part and an IO-intensive part where possible. As long as the execution times of the two parts do not differ too much, decomposed execution yields higher throughput than serial execution; if they differ greatly, splitting is not worthwhile.
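The two sizing rules of thumb above, as a sketch (the variable names are ours; real systems should validate these numbers with load testing):

```java
public class PoolSizing {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        int cpuBound = nCpu + 1; // CPU-intensive: roughly one thread per core, plus one spare
        int ioBound = 2 * nCpu;  // IO-intensive: threads often wait on IO, so allow more
        System.out.println(cpuBound == nCpu + 1 && ioBound == 2 * nCpu);
    }
}
```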

Tasks with different priorities can be processed using the PriorityBlockingQueue, which allows tasks with higher priorities to be executed first. However, if high-priority tasks are always added to the blocking queue, low-priority tasks may never execute.

Tasks with different execution times can be assigned to thread pools of different sizes, or priority queues can be used to allow shorter tasks to be executed first.

Tasks that depend on a database connection pool: because the threads submit SQL and then wait for the database to return results, set the thread count larger so the CPU is better utilized during the waits.

It is recommended to use a bounded queue, which increases the system's stability and early-warning capability. The bound can be set generously, say a few thousand. With an unbounded queue, the queue grows without limit, potentially exhausting memory and rendering the entire system unusable.


6. Rejection strategy

The following rejection strategies are recommended:

  • With the default rejection policy (AbortPolicy), catch RejectedExecutionException in the program and handle the rejected task inside the catch block.

  • Use the CallerRunsPolicy rejection policy, which runs the task on the thread that called execute (usually the main thread). The main thread then cannot submit new tasks for a while, giving the worker threads time to finish the tasks in hand. Meanwhile incoming requests pile up in the TCP queue; when the TCP queue fills, clients are affected. This is a graceful performance degradation.

  • Customize the rejection behavior by implementing the RejectedExecutionHandler interface.

  • If the tasks are not particularly important, discard them with DiscardPolicy or DiscardOldestPolicy. If the ThreadPoolExecutor is created with the static methods of Executors, a Semaphore can be used to throttle task submission and avoid OOM.

  • See article: 8 Rejection Strategies
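A custom handler from the list above might look like this minimal sketch (the class name and the log message are ours; a real handler might persist the task or raise an alert instead of printing):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoggingRejectedHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Custom policy: log and drop instead of throwing.
        System.out.println("rejected one task, poolSize=" + executor.getPoolSize());
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new LoggingRejectedHandler());
        // Occupy the only thread so the next submission has nowhere to go.
        pool.execute(() -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        pool.execute(() -> {}); // no free thread, SynchronousQueue holds nothing -> handler runs
        gate.countDown();
        pool.shutdown();
    }
}
```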

The five running states of the thread pool


Distinct from the states of an individual thread, a thread pool has the following states:

• RUNNING: The thread pool in this state can accept newly submitted tasks and process tasks in the blocking queue.

• SHUTDOWN: The thread pool in this state cannot accept newly submitted tasks, but can still process tasks in the blocking queue. (The government service hall no longer lets people take a number; once the current customer and the queue are handled, it closes for the day.)

A RUNNING pool enters this state when the shutdown() method is called. Note: shutdown() is implicitly called when finalize() is executed.

• STOP: The thread pool in this state does not accept newly submitted tasks, does not process tasks in the blocking queue, and interrupts ongoing tasks. (The government service hall stops serving entirely: no new numbers, the queue is dismissed, and work in hand is dropped.)

A pool in the RUNNING or SHUTDOWN state enters this state when the shutdownNow() method is called.

• TIDYING: All tasks have terminated and workerCount is 0.

A thread pool entering the TIDYING state calls the hook method terminated().

• TERMINATED: Entered after the terminated() hook method finishes. By default, terminated() does nothing.
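The transitions RUNNING → SHUTDOWN → TIDYING → TERMINATED can be observed through the pool's query methods (the internal state field itself is not exposed; the class name here is ours):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolStateDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        System.out.println(pool.isShutdown());      // false: pool is RUNNING
        pool.shutdown();                            // RUNNING -> SHUTDOWN
        System.out.println(pool.isShutdown());      // true
        pool.awaitTermination(5, TimeUnit.SECONDS); // SHUTDOWN -> TIDYING -> TERMINATED
        System.out.println(pool.isTerminated());    // true
    }
}
```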

Reference articles:

[1] The Art of Java Concurrency Programming

[2] tech.meituan.com/2020/04/02/…