preface

Java’s thread pool is best known as the ExecutorService interface, an API added to the java.util.concurrent package in jdk1.5 that greatly simplifies multithreaded code development. Whether you use FixedThreadPool or CachedThreadPool the implementation behind it is ThreadPoolExecutor. ThreadPoolExecutor is the product of a typical cache pooling design, because pools have sizes, and rejection policies are involved when the pool is not large enough to hold. There are already four preset thread pool rejection policies in the JDK. Here’s a look at how these policies are used and what other rejection policies can be extended.

Pooling design idea

Pool design should not be a new term. We commonly such as Java thread pool, JDBC connection pool, Redis connection pool is the representative implementation of this kind of design. The problem is to offset the cost of each resource acquisition, such as the cost of creating a thread, the cost of acquiring a remote connection, etc. Just like you go to the canteen to make a meal, the old woman who makes a meal will first fill several portions of the meal and put them there, and you will directly take the lunch box to add food, without temporarily filling rice and making dishes, and the efficiency is high. In addition to initializing resources, pooling design includes characteristics such as the initial value of the pool, the active value of the pool, the maximum value of the pool, and so on, which can be mapped directly to the Java thread pool and database connection pool member properties.

When the thread pool triggers the reject policy

Unlike data source connection pools, thread pools have an additional blocking queue to buffer in addition to their initial size and pool maximum. When the number of connection requests from the data source connection pool exceeds the maximum value of the connection pool, the denial policy is triggered. The denial policy usually blocks waiting for the set time or throws exceptions directly. The trigger time of the thread pool is shown below:

As shown in figure, want to know what the thread pool when trigger refused to rough, it is necessary to define the specific meaning of the above three parameters, are the result of the three parameters of the overall coordination, rather than simply exceed the maximum number of threads will trigger the thread refused to rough, when submit task number greater than corePoolSize will be priority in the queue buffer, only fill the buffer, If the current running task is larger than maxPoolSize, a new thread will be created to process it. If the current number of submitted tasks is greater than (maxPoolSize + queueCapacity), the thread pool rejection policy is triggered.

There are four thread pool rejection policies built into the JDK

Reject the policy interface definition

The JDK defines a thread pool denial policy interface, as follows:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}Copy the code

When the reject policy is triggered, the thread pool will invoke the specified policy you set, passing the current submitted task and the thread pool instance itself to you for processing.

CallerRunsPolicy (CallerRun Policy)

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if(! e.isShutdown()) { r.run(); }}}Copy the code

Function: When the reject policy is triggered, it is handled by the current thread submitting the task as long as the thread pool is not closed.

Usage scenarios: generally not allowed in failure, the performance requirement is not high, concurrent scenarios using small amount, usually because the thread pool will not shut down, which is submitted by the task will be run, but because it is the caller thread its execution, when repeatedly submitting, can block the subsequent task execution, performance and efficiency naturally slow.

AbortPolicy (AbortPolicy)

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from "+ e.toString()); }}Copy the code

Function: When the reject policy is triggered, the exception that refuses to execute is thrown directly. The abort policy means that the current execution process is interrupted

Usage scenarios: There are no special scenarios for this, but it is important to handle the exceptions thrown correctly. The default policy for ThreadPoolExecutor is AbortPolicy. The ExecutorService interface family of ThreadPoolExecutors does not display a rejection policy, so it does by default. Note, however, that the thread pool instance queues in ExecutorService are unbounded, meaning that overflowing memory will not trigger a rejection policy. When customizing your own thread pool instance, use this policy to handle exceptions that are thrown when the policy is triggered, because it interrupts the current execution process.

DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }Copy the code

Function: Silently discards the quest without triggering any action

Usage scenario: You can use it if the task you are submitting is inconsequential. Because it’s an empty realization, a task that will silently devour you. So that strategy is basically gone

DiscardOldestPolicy discarding old policy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code

Function: If the thread pool is not closed, pop up the element in the queue header and try to execute

Usage scenario: This strategy still drops the task, again silently, but it typically drops the old unfinished task, and the high priority task to be executed. Based on this feature, the scenario I can think of is publishing a message, modifying a message, and then when the message is published and not yet executed, the updated message comes back, and the version of the message that is not executed is lower than the version of the message that is now committed and can be discarded. Because there may be messages of lower versions queued, it is important to compare the versions of messages before actually processing them

Rejection policies implemented by third parties

Thread rejection policy in Dubbo

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {// omit implementation}}Copy the code

As you can see, there are three main things that dubbo’s worker does when it triggers a thread-rejection policy. The principle is to make it as clear to the user as possible why the thread-rejection policy is triggered

  • A warning level log is output with details of the thread pool setting parameters, the current state of the thread pool, and some details of the current rejected task. It is safe to say that this log, which is seen more or less by anyone with production operations experience using Dubbo, is a good example of log printing. Other examples of log printing are spring. Thanks to such detailed logs, it is easy to locate problems
  • Dump thread context is a lifesaver when you can’t locate a problem with the current thread stack
  • The task fails by continuing to throw a reject exception, which inherits the JDK’s default reject policy feature

Thread pool rejection policy in Netty

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        NewThreadRunsPolicy() {
            super();
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e); }}}Copy the code

The implementation in Netty is much like the CallerRunsPolicy in the JDK, reluctant to drop tasks. The difference is that the CallerRunsPolicy is a task that is executed directly in the caller thread. Netty creates a new thread to handle it. As a result, the implementation of Netty can be extended to support high performance scenarios with respect to the use of caller execution policies. Note, however, that the implementation of Netty does not make any judgment constraints on the creation of threads, that is, as long as the system has resources to create a new thread to handle, until the new thread is not new, will throw thread creation failure exception

Thread pool rejection policy in activeMq

 new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }

                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); }});Copy the code

The policy in activeMq belongs to the maximum effort task execution type. When the rejection policy is triggered, the task will be re-inserted into the task queue within one minute. When the one-minute timeout fails, an exception will be thrown

Pinpoint thread pool rejection policy

public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
    private final RejectedExecutionHandler[] handlerChain;

    public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
        Objects.requireNonNull(chain, "handlerChain must not be null");
        RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
        return new RejectedExecutionHandlerChain(handlerChain);
    }

    private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
        this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        for(RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); }}}Copy the code

The e rejection strategy has a lot of characteristics, and other implementation are different. He defines a rejection policy chain and wraps a list of rejection policies. When a rejection policy is triggered, rejectedExecution in the policy chain will be executed one by one

conclusion

The definition of the Java thread pool rejection policy interface is introduced from the thread pool design idea and the time when the thread pool triggers the rejection policy. In addition, four kinds of denial policy definitions in JDK and four third-party open source software are introduced to describe various ideas and application scenarios of thread pool rejection policy implementation. Hopefully this article will give you a better understanding of the Java thread pool rejection policy and enable you to apply it flexibly according to different usage scenarios.

BLOG address: www.liangsonghua.com

Pay attention to wechat public number: songhua preserved egg bulletin board, get more exciting!

Introduction to our official account: We share our technical insights from working in JD, as well as JAVA technology and best practices in the industry, most of which are pragmatic, understandable and reproducible