How to create a thread pool

The necessity and role of thread pools

Threads make it possible to coordinate the use of CPU, memory, I/O and other system resources efficiently. However, creating a thread means allocating its private memory areas (the virtual machine stack, the native method stack, the program counter and so on), and all of this has to be reclaimed when the thread is destroyed. Creating and destroying threads frequently wastes system resources, so a thread pool is used to manage threads and improve thread reuse (of course, thread pools do more than that).

What thread pools do:

  • Reuse threads and cap the maximum number of concurrent threads.
  • Implement scheduled, periodic and other time-related execution.
  • Implement a task queue caching policy and a rejection mechanism.
  • Isolate thread environments. For example, suppose a file upload service and a data query service run on the same server, and file uploads take a long time. If both services share one thread pool, slow uploads will degrade data queries. Giving each service its own thread pool isolates them and prevents them from affecting each other.
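The isolation idea in the last point can be sketched as follows. This is only a minimal illustration, not a production setup: the class name IsolatedPoolsDemo, the pool sizes, the queue capacities and the "upload-" / "query-" name prefixes are all assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class IsolatedPoolsDemo {

    // Builds a small bounded pool whose threads carry the given name prefix.
    static ThreadPoolExecutor newPool(String prefix, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                r -> new Thread(r, prefix + counter.getAndIncrement()),
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Runs a task on the pool and returns the name of the thread that executed it.
    static String runAndGetThreadName(ExecutorService pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        Future<String> future = pool.submit(task);
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor uploadPool = newPool("upload-", 2, 10);
        ThreadPoolExecutor queryPool = newPool("query-", 4, 100);
        try {
            // A slow "upload" occupies a thread of the upload pool only...
            uploadPool.execute(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // ...so a query still runs at once on its own pool.
            System.out.println(runAndGetThreadName(queryPool)); // prints "query-1"
        } finally {
            uploadPool.shutdown();
            queryPool.shutdown();
        }
    }
}
```

Because each service has its own threads and its own queue, a backlog of uploads can fill only the upload pool's queue; the query pool keeps its full capacity.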

How do I create a thread pool

You can use the Executors class to create a thread pool, but this is not recommended (for reasons discussed below). Executors is just a static factory: it provides static methods that create thread pools while hiding the configuration details. The real thread pool class is ThreadPoolExecutor. Its constructor looks like this:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize must be at least 1 and no smaller than corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
        AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Parameter Description:

  1. corePoolSize: the number of core threads. If it is 0, the threads in the pool are destroyed once their tasks are finished and no new task arrives. If it is greater than 0, the core threads are kept alive even after their tasks complete. A value that is too high wastes system resources; a value that is too low causes threads to be created and destroyed frequently.
  2. maximumPoolSize: the maximum number of threads. It must be greater than or equal to 1 and greater than or equal to corePoolSize. If it equals corePoolSize, the pool has a fixed size. If it is greater than corePoolSize, tasks that arrive while all core threads are busy are first placed in the workQueue; only when the queue is full are additional threads created, up to maximumPoolSize in total. Once the load drops, threads beyond corePoolSize that stay idle longer than keepAliveTime are reclaimed.
  3. keepAliveTime: the idle time allowed for a thread. When a thread has been idle for keepAliveTime it is destroyed, until only corePoolSize threads are left. By default, keepAliveTime takes effect only while the number of threads in the pool is greater than corePoolSize. If allowCoreThreadTimeOut is set to true, keepAliveTime also applies to core threads.
  4. unit: the TimeUnit in which keepAliveTime is expressed.
  5. workQueue: the task cache queue. When all core threads are busy, newly submitted tasks wait in this BlockingQueue.
  6. threadFactory: the thread factory, used to produce a group of threads that perform the same kind of task. It sets the name prefix of the threads it creates, whether they are daemon threads, and their priority. A meaningful name prefix tells you which thread factory created a thread when you analyze the virtual machine (for example in a thread dump).
  7. handler: the object that implements the rejection policy. It is invoked when a task cannot be accepted because the workQueue is full and the pool has already reached maximumPoolSize. It can be regarded as a simple form of rate limiting protection.
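The way corePoolSize, workQueue, maximumPoolSize and handler interact is easier to see in a small experiment. The sketch below is an illustration under assumed values (1 core thread, at most 2 threads, a queue of capacity 1); the class and method names are hypothetical. It submits four tasks that all block on a latch and records the pool size after each submission.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Returns {pool size after task 1, after task 2, after task 3, 1 if task 4 was rejected else 0}.
    static int[] observeGrowth() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                   // corePoolSize
                2,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keepAliveTime and unit
                new ArrayBlockingQueue<>(1),         // bounded workQueue
                Executors.defaultThreadFactory(),    // threadFactory
                new ThreadPoolExecutor.AbortPolicy() // handler
        );
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[4];
        try {
            pool.execute(blocker);
            result[0] = pool.getPoolSize(); // task 1 starts the core thread
            pool.execute(blocker);
            result[1] = pool.getPoolSize(); // task 2 waits in the queue, no new thread
            pool.execute(blocker);
            result[2] = pool.getPoolSize(); // queue is full, so a second thread is created
            try {
                pool.execute(blocker);      // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                result[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeGrowth())); // prints [1, 1, 2, 1]
    }
}
```

The output shows the order of events: core thread first, then the queue, then extra threads up to maximumPoolSize, and finally the rejection handler.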

Thread pool-related class structure



The ExecutorService interface extends the Executor interface and defines methods for submitting and managing tasks. The abstract class AbstractExecutorService implements ExecutorService and provides implementations of methods such as submit() and invokeAll(), but it does not implement the core method Executor.execute(). All tasks are ultimately run through that method, and different thread pools use different execution strategies, so its implementation is left to the concrete thread pool.
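To make the division of labour concrete, here is a minimal sketch of a hypothetical AbstractExecutorService subclass (DirectExecutorService is an invented name, not a JDK class). It only supplies execute() and the lifecycle methods, running each task directly in the calling thread, yet submit() works because it is inherited from AbstractExecutorService.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DirectExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown;

    // The one method a concrete pool must decide on: how a task is actually run.
    // This toy version runs it synchronously in the caller's thread.
    @Override
    public void execute(Runnable command) {
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }

    // submit() is inherited: it wraps the task in a future and hands it to execute().
    static int submitAndGet() {
        DirectExecutorService service = new DirectExecutorService();
        Callable<Integer> task = () -> 6 * 7;
        Future<Integer> future = service.submit(task);
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 42
    }
}
```

ThreadPoolExecutor follows the same pattern, except that its execute() hands the task to worker threads and the queue instead of running it inline.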

Core methods of Executors

  • Executors.newFixedThreadPool: creates a thread pool with a fixed number of threads. The number of core threads equals the maximum number of threads, so there are no extra threads to reclaim, and keepAliveTime is 0.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  • Executors.newSingleThreadExecutor: creates a thread pool with a single thread. Both the number of core threads and the maximum number of threads are 1, so tasks are effectively executed serially.
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

  • Executors.newScheduledThreadPool: creates a thread pool that supports scheduled and periodic task execution. The maximum number of threads is Integer.MAX_VALUE and the delay queue is unbounded, so there is an OOM risk. keepAliveTime is 0, and the core threads are not reclaimed by default.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    // ThreadPoolExecutor is the parent class of ScheduledThreadPoolExecutor;
    // the maximum number of threads is Integer.MAX_VALUE
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

  • Executors.newCachedThreadPool: the number of core threads is 0 and the maximum number of threads is Integer.MAX_VALUE, which makes it a highly elastic thread pool. There is an OOM risk. keepAliveTime is 60 seconds: a worker thread that stays idle longer than that is reclaimed.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  • Executors.newWorkStealingPool: introduced in JDK 8. It creates a thread pool that keeps enough threads to support a given level of parallelism and uses multiple queues to reduce contention.
public static ExecutorService newWorkStealingPool() {
    // By default, the parallelism is set to the number of available CPUs
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

Do not create thread pools with Executors. The reasons:

Executors.newCachedThreadPool and Executors.newScheduledThreadPool both set the maximum number of threads to Integer.MAX_VALUE. No server can keep working anywhere near that limit: newCachedThreadPool can keep creating threads and newScheduledThreadPool can keep accepting tasks into its unbounded queue, so sustained overload ends in an OOM.

Executors.newSingleThreadExecutor and Executors.newFixedThreadPool both pass new LinkedBlockingQueue<Runnable>() as the workQueue parameter, and the capacity of that queue is Integer.MAX_VALUE. If a very large burst of requests arrives, the tasks pile up in the queue and an OOM may occur.

Conclusion: of the five core methods above, all except Executors.newWorkStealingPool carry an OOM risk.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
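The effect of the two constructors can be checked directly. The following small sketch (the class and method names are made up for illustration) compares the capacity of a default LinkedBlockingQueue with an explicitly bounded one, which is the kind of queue you would normally pass to a hand-built ThreadPoolExecutor.

```java
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {

    // Capacity of a queue built with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Runnable>().remainingCapacity();
    }

    // Fills a bounded queue and reports whether one more element is still accepted.
    static boolean offerBeyondCapacity(int capacity) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(() -> { });
        }
        return queue.offer(() -> { }); // false once the queue is full
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity());      // prints 2147483647
        System.out.println(offerBeyondCapacity(2)); // prints false
    }
}
```

With a bounded queue, offer() fails as soon as the limit is reached, which is what lets the pool fall back to its rejection policy instead of accumulating tasks until memory runs out.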

How do I customize a ThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class UserThreadFactory implements ThreadFactory {
    // Sequence number appended to each thread name
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public UserThreadFactory() {
        namePrefix = "UserThreadFactory's " + "-Worker-";
    }

    @Override
    public Thread newThread(Runnable r) {
        String name = namePrefix + threadNumber.getAndIncrement();
        Thread t = new Thread(null, r, name);
        // Normalize the daemon flag and priority inherited from the creating thread
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

As shown in the code above, the class implements the ThreadFactory interface and sets properties such as the thread name, the daemon flag and the priority in the newThread method. Doing so helps to locate problems such as deadlocks and StackOverflowError quickly. As shown in the figure below, the thread names created by the custom thread factory (green box) carry significantly more information than the names created by the default thread factory (blue box).
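The difference in thread names can also be reproduced without a thread dump. The sketch below is only an illustration: the class name and the "order-service-worker-" prefix are assumptions, and the custom factory is written as a lambda for brevity.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadNameDemo {

    // Name given to the first thread of a fresh default thread factory.
    static String defaultName() {
        return Executors.defaultThreadFactory().newThread(() -> { }).getName();
    }

    // Name given to the first thread of a factory that uses a descriptive prefix.
    static String customName(String prefix) {
        AtomicInteger sequence = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, prefix + sequence.getAndIncrement());
        return factory.newThread(() -> { }).getName();
    }

    public static void main(String[] args) {
        System.out.println(defaultName());                      // e.g. pool-1-thread-1
        System.out.println(customName("order-service-worker-")); // prints order-service-worker-1
    }
}
```

A name of the form pool-N-thread-M says nothing about which part of the application owns the thread, whereas a descriptive prefix points straight at the responsible component.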



Thread pool rejection policies

ThreadPoolExecutor provides four public static nested classes:

  • AbortPolicy: the default. Discards the task and throws a RejectedExecutionException.
  • DiscardPolicy: discards the task silently, without throwing an exception (not recommended).
  • DiscardOldestPolicy: discards the task that has been waiting longest in the queue, then tries to submit the current task again.
  • CallerRunsPolicy: bypasses the thread pool and calls the task's run() method directly in the thread that submitted it.
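As an illustration of one of these policies, the sketch below saturates a deliberately tiny pool (1 thread, queue capacity 1; both values and the class name are assumptions for the example) that uses CallerRunsPolicy, and checks which thread ends up running the overflow task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns true if the task that could not be accepted ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // Rejected by the pool, so CallerRunsPolicy runs it right here, synchronously.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller()); // prints true
    }
}
```

Because the submitting thread is busy running the task itself, it cannot submit more work in the meantime, which gives this policy a natural throttling effect.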

Friendlier rejection strategies:

  • Save the task to a database to smooth out traffic peaks, and pick it up for execution when the system is idle.
  • Redirect the user to a notice page.
  • Write a log entry.

Custom rejection policy:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Logger;

// Implements the RejectedExecutionHandler interface
public class UserRejectedHandler implements RejectedExecutionHandler {

    private static final Logger logger = Logger.getLogger(UserRejectedHandler.class.getName());

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // The Runnable r could also be recorded in a database here and executed once the traffic peak has passed
        logger.warning("task rejected: " + executor.toString());
    }
}
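The "save it and run it later" strategy can be sketched in the same style. In the example below an in-memory queue stands in for the database, which is purely an assumption to keep the code self-contained; BacklogRejectedHandler, replay() and demo() are hypothetical names.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BacklogRejectedHandler implements RejectedExecutionHandler {
    // Stand-in for persistent storage such as a database table.
    private final Queue<Runnable> backlog = new ConcurrentLinkedQueue<>();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        backlog.add(r); // park the task instead of dropping it
    }

    int size() {
        return backlog.size();
    }

    // Hands the tasks parked so far to the target executor and returns how many were handed over.
    // The count is fixed up front so a task rejected again is not retried in the same call.
    int replay(Executor target) {
        int pending = backlog.size();
        int replayed = 0;
        for (int i = 0; i < pending; i++) {
            Runnable task = backlog.poll();
            if (task == null) {
                break;
            }
            target.execute(task);
            replayed++;
        }
        return replayed;
    }

    // Returns {number of tasks parked during the peak, number replayed afterwards}.
    static int[] demo() {
        BacklogRejectedHandler handler = new BacklogRejectedHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int parked;
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(task); // 1 running, 1 queued, 2 rejected and parked
            }
            parked = handler.size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        int replayed = handler.replay(Runnable::run); // run the parked tasks on the calling thread
        return new int[] {parked, replayed};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [2, 2]
    }
}
```

In a real system the backlog would have to survive restarts and the tasks would need a serializable form, so treat this only as an outline of the control flow.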
