Preface

Have you used thread pools in Java? How do you use thread pools?

This question has come up several times in my interviews. I even considered titling this article [Java Thread Pool], but that felt too cliched. Thread pools may be a well-worn topic, but I still want to write this article to deepen my own understanding, and to approach it from a slightly different angle.

The thread pool

The purpose of using thread pools

First, why do we use thread pools?

  • Threads are a scarce resource and should not be created frequently. Creating and destroying them is also expensive.
  • Decoupling: thread creation is separated from task execution, which makes the code easier to maintain.
  • Reuse: because creating and destroying threads is expensive, threads are kept in a pool and reused for other tasks.
How is a thread pool created step by step

The first edition

Normally we create a thread to execute a task like this:

new Thread(r).start();

But this is only the most basic approach, and a project may need to create threads in many places. To reduce duplicated code, we can put the thread-creation code in a utility class and expose a utility method that callers invoke directly.

The second edition

/** Defines the interface (task executor). */
public interface Executor {
    /**
     * Executes the task.
     * @param runnable the task to run
     */
    void execute(Runnable runnable);
}

/** Implementation: creates a new thread for every task. */
class ExecutorImpl implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

This approach reuses the code that creates threads, but not the threads themselves: if 1000 call sites each need a thread, 1000 threads are still created.

The third edition

To reuse threads, add a blocking queue. When a task arrives, it is first put into the queue, and a single thread (the Worker) then takes tasks from the queue and runs them. This achieves thread reuse: one long-lived thread loops continuously, processing the tasks in the queue.
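The queue-plus-worker idea can be sketched in a few lines. This is only an illustration under assumptions: the class name SingleWorkerExecutor, the thread name worker-0, and the helper threadsUsed are hypothetical, and error handling is left out (a task that throws would end the worker).

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-worker executor: one queue, one long-lived thread. */
final class SingleWorkerExecutor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    SingleWorkerExecutor() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: let the worker exit
            }
        }, "worker-0");
        worker.setDaemon(true); // do not keep the JVM alive for this sketch
        worker.start();
    }

    /** Submitting a task only enqueues it; the worker runs it later. */
    void execute(Runnable task) {
        queue.add(task);
    }

    /** Runs n tasks and returns the names of the threads that executed them. */
    static Set<String> threadsUsed(int n) {
        SingleWorkerExecutor executor = new SingleWorkerExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        // However many tasks are submitted, only one thread name is collected.
        System.out.println(threadsUsed(1000));
    }
}
```

Submitting and running are now two separate steps: execute returns as soon as the task is queued, and the same thread serves every task.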

In this way, threads are reused and task submission is decoupled from task processing. However, a single worker thread is a bottleneck. The right number of worker threads depends on the business scenario, so we make it a parameter that is passed in when the thread pool is created and call it corePoolSize.

The task queue should also have a capacity, and that capacity likewise depends on the business scenario. The queue may also need custom rules, for example ordering tasks by some priority. So we make the task queue a parameter as well, pass it in when the thread pool is created, and call it workQueue.

When the queue is full, new tasks would simply be dropped, but tasks that matter to the business cannot be dropped. So when the queue is full and the pool has no capacity left to process a task, a rejection policy decides what happens. That policy also depends on the business scenario, so it too is passed in as a parameter at creation time: RejectedExecutionHandler.

Continue to optimize

Although the effect is greatly optimized after adding the above three parameters, it can still be further optimized:

  • There is no need to create all corePoolSize threads up front. We can add a variable, workCount, to record how many worker threads have been created. A new thread is created to run a task only while workCount < corePoolSize; once workCount >= corePoolSize, incoming tasks go to the queue.
  • For the rejection policy, define an interface, RejectedExecutionHandler. Users can then implement the interface themselves to provide their own rejection policy.
  • Add a thread factory parameter, ThreadFactory. Threads are then obtained from the ThreadFactory rather than created by hand each time, and the factory can also attach identifying information such as thread names.
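Putting the third edition and these optimizations together gives roughly the following shape. It is a simplified sketch, not the JDK implementation: SimplePool, RejectionPolicy, and demoRejections are hypothetical names, and shutdown and exception handling are omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical "third edition" pool: lazy core threads, a work queue, a rejection hook. */
final class SimplePool {
    /** Hypothetical stand-in for the article's rejection-policy interface. */
    interface RejectionPolicy {
        void rejected(Runnable task, SimplePool pool);
    }

    private final int corePoolSize;
    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final RejectionPolicy handler;
    private final AtomicInteger workCount = new AtomicInteger();

    SimplePool(int corePoolSize, BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory, RejectionPolicy handler) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    void execute(Runnable task) {
        // Create workers lazily, only while workCount < corePoolSize.
        int current = workCount.get();
        while (current < corePoolSize) {
            if (workCount.compareAndSet(current, current + 1)) {
                threadFactory.newThread(() -> runWorker(task)).start();
                return;
            }
            current = workCount.get();
        }
        // All core workers exist: queue the task, or reject it if the queue is full.
        if (!workQueue.offer(task)) {
            handler.rejected(task, this);
        }
    }

    private void runWorker(Runnable firstTask) {
        Runnable task = firstTask;
        try {
            while (true) {
                task.run();
                task = workQueue.take(); // reuse this thread for queued tasks
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workCount.decrementAndGet();
        }
    }

    /** One worker and a queue of one: the third blocked task has nowhere to go. */
    static int demoRejections() {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger rejected = new AtomicInteger();
        ThreadFactory daemons = r -> {
            Thread t = new Thread(r, "simple-pool-worker");
            t.setDaemon(true);
            return t;
        };
        SimplePool pool = new SimplePool(1, new ArrayBlockingQueue<>(1), daemons,
                (task, p) -> rejected.incrementAndGet());
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocked); // starts the only worker
        pool.execute(blocked); // waits in the queue
        pool.execute(blocked); // queue full: handed to the rejection policy
        gate.countDown();
        return rejected.get();
    }
}
```

The compare-and-set loop is one way to keep workCount from overshooting corePoolSize when several threads submit at once; a lock would work as well.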

The fourth edition

While version 3 of the thread pool is good enough for everyday work, it is still not flexible enough. A flexible pool should have more processing capacity when tasks are submitted frequently and less when they are submitted infrequently.

This version of the thread pool is not elastic.

If the number of submitted tasks surges for a period and both the corePoolSize threads and the queue are full, every further task goes to the rejection policy.

You might think of simply increasing corePoolSize so that more threads are created to handle the surge. But the surge only lasts a while; once it passes, most of those threads would sit idle, which wastes resources.

This leads to a dilemma: setting corePoolSize too large is bad, and setting it too small is bad too.

In order for the thread pool to scale flexibly, we can add another parameter to it: maximumPoolSize, which represents the maximum number of threads.

When the corePoolSize threads are all busy and the workQueue is full, a newly submitted task can still get a new thread created to process it. The threads created beyond corePoolSize are called non-core threads. Once the total number of core and non-core threads reaches maximumPoolSize, the rejection policy is executed.

In this way, corePoolSize covers thread usage in normal times, while maximumPoolSize covers temporary expansion during submission peaks.

However, this only handles expansion during a peak, and peaks are temporary. After the peak, the non-core threads would sit idle, which wastes resources. So we add a parameter for how long a non-core thread may stay idle: keepAliveTime. When a non-core thread has been idle longer than this value, it is destroyed and its resources are released.

This version of the thread pool expands temporarily during submission peaks and reclaims non-core threads promptly during quiet periods, saving resources. It is now truly elastic.
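The same expand-and-shrink behavior can be observed on the real java.util.concurrent.ThreadPoolExecutor. The sketch below uses deliberately tiny, illustrative values (one core thread, a maximum of two, a queue of one, a 100 ms keep-alive); ElasticPoolDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ElasticPoolDemo {
    /** Returns {size after queueing, size after burst, rejected tasks, size after idling}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                              // corePoolSize
                2,                              // maximumPoolSize
                100, TimeUnit.MILLISECONDS,     // keepAliveTime for idle non-core threads
                new ArrayBlockingQueue<>(1));   // workQueue with room for one task
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocked);              // runs on the core thread
            pool.execute(blocked);              // core thread busy: waits in the queue
            int afterQueueing = pool.getPoolSize();
            pool.execute(blocked);              // queue full: a non-core thread is created
            int afterBurst = pool.getPoolSize();
            try {
                pool.execute(blocked);          // maximumPoolSize reached: rejected
            } catch (RejectedExecutionException e) {
                rejected++;
            }
            gate.countDown();                   // the peak is over
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);               // wait for keepAliveTime to expire
            }
            return new int[] {afterQueueing, afterBurst, rejected, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

Note the order in which capacity is used: core threads first, then the queue, and only then the extra threads up to maximumPoolSize.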

With the improvements across these versions, we end up with a thread pool that closely resembles Java's own. This should also make the meaning of the key parameters passed when creating a thread pool easier to understand.

Here are a few commonly tested points about thread pools

Thread pool blocking queues in Java

  • ArrayBlockingQueue: a bounded, array-backed blocking queue. Fair access means that blocked threads get access to the queue in the order in which they blocked. By default fairness is not guaranteed, and guaranteeing it reduces throughput. Internally the queue relies on a ReentrantLock, and its conditions, to block and wake threads; the fair flag is passed straight to that lock.
 public ArrayBlockingQueue(int capacity) {
     this(capacity, false);
 }
 public ArrayBlockingQueue(int capacity, boolean fair) {
     if (capacity <= 0)
         throw new IllegalArgumentException();
     this.items = new Object[capacity];
     lock = new ReentrantLock(fair);
     notEmpty = lock.newCondition();
     notFull =  lock.newCondition();
 }
  • LinkedBlockingQueue: a linked-list-based blocking queue that orders elements first-in, first-out. If no capacity is given, it defaults to Integer.MAX_VALUE, so a LinkedBlockingQueue created without a capacity is effectively unbounded; giving it a capacity makes it a bounded queue. Its throughput is usually higher than that of ArrayBlockingQueue.
  • SynchronousQueue: a blocking queue that does not store elements. Each insert operation must wait until another thread calls the remove operation, otherwise the insert operation remains blocked. This queue can be used when tasks are not allowed to wait in the queue.
  • DelayQueue: an unbounded blocking queue from which an element can only be taken once its delay has expired.
  • PriorityBlockingQueue: an unbounded blocking queue that orders tasks by priority. By default it behaves as a min-heap, so the smallest element is at the head.
  • LinkedBlockingDeque: a double-ended blocking queue based on a linked list.
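A few of these differences are easy to check directly with the non-blocking offer and poll methods. The class and method names below (QueueBehaviorDemo and its helpers) are hypothetical; the queue classes and calls are the standard java.util.concurrent ones.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueBehaviorDemo {
    /** A bounded queue refuses a non-blocking offer once it is full. */
    static boolean boundedQueueRefusesWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        return !queue.offer("second");
    }

    /** Without an explicit capacity, LinkedBlockingQueue reports Integer.MAX_VALUE. */
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** SynchronousQueue stores nothing: with no consumer waiting, offer fails at once. */
    static boolean synchronousQueueStoresNothing() {
        return !new SynchronousQueue<String>().offer("task");
    }

    /** PriorityBlockingQueue hands out the smallest element first by default. */
    static int priorityQueueHead() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(5);
        queue.add(1);
        queue.add(3);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(boundedQueueRefusesWhenFull());
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE);
        System.out.println(synchronousQueueStoresNothing());
        System.out.println(priorityQueueHead());
    }
}
```

ThreadPoolExecutor hands tasks to its queue with offer, which is why a full bounded queue, or a SynchronousQueue with no idle worker, makes the pool create a new thread or reject the task.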

What default thread pools do Java provide, and why are they not recommended for actual development?

  • Executors.newCachedThreadPool(): uses a SynchronousQueue, which does not hold waiting tasks, and a maximum thread count of Integer.MAX_VALUE. During a submission peak it will therefore create threads essentially without limit. Idle threads are only reclaimed after 60 seconds, so a QPS peak can exhaust the server's resources. It is therefore not recommended in real applications.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • Executors.newFixedThreadPool(int nThreads): a thread pool with a fixed number of reusable threads.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}


The core pool size equals the maximum pool size, and the task queue is an unbounded blocking queue. Once the number of threads processing tasks reaches the core size, further submissions go into the queue. Because the queue is unbounded, a submission peak can make tasks pile up in the queue until they exceed available memory and cause an OutOfMemoryError.

  • Executors.newScheduledThreadPool(int corePoolSize): a fixed-size thread pool that supports delayed and periodic task execution. Its maximum thread count is set to Integer.MAX_VALUE, although with an unbounded queue the pool does not grow past corePoolSize; the real risk is that the unbounded DelayedWorkQueue lets tasks accumulate until resources are exhausted. Avoid it in ordinary scenarios.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  • Executors.newSingleThreadExecutor(): a thread pool with exactly one thread.

The task queue is an unbounded LinkedBlockingQueue, so there is a risk of OOM if tasks keep being added without limit.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • Executors.newWorkStealingPool(): a work-stealing thread pool.

The parallelism level is passed in as a parameter. This differs from the previous four pools, which are all configured with a core size, a maximum size, and so on; this one is backed by a ForkJoinPool and is sized by its target parallelism instead. As the name work-stealing suggests, it does not guarantee the order in which submitted tasks are executed.

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
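Since each factory method hides either an unbounded queue or an unbounded thread count, the usual alternative is to call the ThreadPoolExecutor constructor directly so that every limit is explicit. The following is one possible sketch; the class name BoundedPools, the method newBoundedPool, the 60-second keep-alive, and the choice of CallerRunsPolicy are illustrative assumptions, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPools {
    /** Builds a pool whose thread count and queue length are both bounded. */
    static ThreadPoolExecutor newBoundedPool(String name, int core, int max, int queueCapacity) {
        AtomicInteger sequence = new AtomicInteger(1);
        // Named threads make thread dumps and logs easier to read.
        ThreadFactory factory = r -> new Thread(r, name + "-" + sequence.getAndIncrement());
        return new ThreadPoolExecutor(
                core, max,
                60L, TimeUnit.SECONDS,                      // idle non-core threads are reclaimed
                new ArrayBlockingQueue<>(queueCapacity),    // bounded queue: no unbounded backlog
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy()); // overload slows the submitter down
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool("orders", 4, 8, 200);
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

With both bounds in place, a traffic peak ends in the rejection policy rather than in thread or memory exhaustion.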


Which rejection policies are provided by thread pools in Java

  • AbortPolicy: the default saturation policy. When a task cannot be accepted, it throws a RejectedExecutionException directly, which the caller can catch.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 	throw new RejectedExecutionException("Task " + r.toString() +
                                      " rejected from " +
                                      e.toString());
}
  • CallerRunsPolicy: when the thread pool cannot accept a task, the task is run by the thread that submitted it (for example, the main thread). While that thread is busy running the task it cannot submit more, which gives the pool time to work through its backlog.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • DiscardOldestPolicy: discards the oldest task, the one at the head of the queue, and then retries submitting the new task. If the blocking queue is a PriorityBlockingQueue, the head is the highest-priority task, so that is the one discarded. This policy is therefore not recommended with a PriorityBlockingQueue.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (!e.isShutdown()) {
         e.getQueue().poll();
         e.execute(r);
     }
 }
  • DiscardPolicy: a rather blunt strategy. When the thread pool cannot accept a task, it silently discards it, and it keeps discarding new tasks as they arrive.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
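  • RejectHandler:

Throws the rejection exception directly. Note that it is not one of the four policies built into the JDK's ThreadPoolExecutor listed above.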

public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
    throw new RejectedExecutionException();
}
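To see a policy in action, the sketch below saturates a one-thread pool with a one-slot queue and then submits one more task under CallerRunsPolicy. The class name CallerRunsDemo and the helper threadOfRejectedTask are hypothetical; the tiny pool sizes are chosen only to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    /** Returns the name of the thread that ended up running the rejected task. */
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocked); // occupies the only worker thread
            pool.execute(blocked); // fills the queue
            // Pool saturated: the policy runs this task on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + threadOfRejectedTask());
    }
}
```

Swapping in AbortPolicy would make the third execute call throw RejectedExecutionException instead, and DiscardPolicy would drop the task without any signal.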

How to configure the number of core threads and the maximum number of threads in Java thread pool

The configuration depends on the kind of task being submitted, and different kinds of tasks can be given separate thread pools.

  • CPU-intensive tasks: keep the number of threads small, for example N + 1 or N x 2, where N is the number of CPU cores. Because the task itself is CPU-bound, more threads only add context-switching overhead without improving throughput. A longer queue may be needed as a buffer, though.
  • I/O-intensive tasks: these run slowly and are usually fewer in number, so focus on the number of threads rather than on a large queue. More threads are needed than for computational tasks; how many depends on how long the I/O blocks.
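As a starting point, these rules of thumb can be turned into a small calculation. The N + 1 figure comes from the list above. The I/O formula, cores x (1 + wait time / compute time), is a commonly cited heuristic and is an assumption here, not something the article states; PoolSizing and its methods are hypothetical names, and any result should be validated by measurement.

```java
final class PoolSizing {
    /** CPU-bound work: one thread per core, plus one spare. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /**
     * I/O-bound work (assumed heuristic): the longer a task waits on I/O
     * relative to the time it computes, the more threads each core can keep busy.
     */
    static int ioBoundThreads(int cores, double waitMillis, double computeMillis) {
        double estimate = cores * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.round(estimate));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound threads: " + cpuBoundThreads(cores));
        // Example: each task waits about 90 ms on I/O and computes for about 10 ms.
        System.out.println("I/O-bound threads: " + ioBoundThreads(cores, 90, 10));
    }
}
```

For instance, on 8 cores with 90 ms of waiting per 10 ms of computing, the heuristic suggests 8 x (1 + 9) = 80 threads, compared with 9 for purely CPU-bound work.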

In practice, however, some tasks consume both CPU and I/O resources. In that case, an approach similar to the one described by the Meituan technical team can be adopted: monitor the thread pool's status in real time and adjust its parameters accordingly.

The following ThreadPoolExecutor methods are useful when monitoring a thread pool:

  • getTaskCount(): the approximate total number of tasks that have ever been scheduled for execution.
  • getCompletedTaskCount(): the number of tasks the pool has completed, less than or equal to the task count.
  • getLargestPoolSize(): the largest number of threads that have ever been in the pool at the same time. If it equals the maximum pool size, the pool has been full at some point.
  • getPoolSize(): the current number of threads in the pool. Core threads are kept alive by default, while idle non-core threads are reclaimed after keepAliveTime, so this value can go down as well as up.
  • getActiveCount(): the number of threads actively executing tasks.
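A monitoring job could collect these values into a single line on a schedule. The sketch below shows one way to do that; PoolMonitor, snapshot, and completedAfter are hypothetical names, while the getters are the real ThreadPoolExecutor methods.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolMonitor {
    /** Formats the main ThreadPoolExecutor gauges as one log-friendly line. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format(
                "tasks=%d completed=%d largest=%d poolSize=%d active=%d queued=%d",
                pool.getTaskCount(),
                pool.getCompletedTaskCount(),
                pool.getLargestPoolSize(),
                pool.getPoolSize(),
                pool.getActiveCount(),
                pool.getQueue().size());
    }

    /** Runs n trivial tasks to completion and returns the completed-task counter. */
    static long completedAfter(int n) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(n));
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        System.out.println(snapshot(pool));
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        completedAfter(10);
    }
}
```

While a pool is running, the counts are approximations because tasks and threads change state during the read; they are intended for trends and alerts rather than exact accounting.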


Welcome to follow the wechat official account: Jimoer