“This is the second day of my participation in the Gwen Challenge in November. See details: The Last Gwen Challenge in 2021”

The phenomenon of

First, let’s talk about the new bug that came out last week. According to business feedback, the function of exporting purchase orders online timed out. Originally, I thought that there were too many export purchase orders, so I asked the business to shorten the export date, but the export still failed.

The online code this time mainly calls the downstream system in batches, which is changed to call through the thread pool. Since it is not strongly dependent on the downstream system, the rejection policy set by the thread pool call is DiscardPolicy.

Problem of repetition

Test code:

public class DiscardTest {
    public static void main(String[] args) throws Exception {

        // A thread with a maximum queue size of 1
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(1.1.1L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());

        Future future1 = executorService.submit(new Runnable() {
            @Override
            public void run(a) {
                System.out.println("start Runnable one");
                try {
                    Thread.sleep(2000);
                } catch(InterruptedException e) { e.printStackTrace(); }}}); Future future2 = executorService.submit(new Runnable() {
            @Override
            public void run(a) {
                System.out.println("start Runnable two"); }}); Future future3 = executorService.submit(new Runnable() {
            @Override
            public void run(a) {
                System.out.println("start Runnable three"); }}); System.out.println("task one finish " + future1.get());// Wait until task 1 is complete
        System.out.println("task two finish " + future2.get());// Wait until task 2 is complete
        System.out.println("task three finish " + future3.get());// Wait until task 3 is complete

        executorService.shutdown();// Close the thread pool and block until all tasks are completed}}Copy the code

Execution result: stuck all the time

Cause analysis,

When the number of threads in the thread pool reaches 3, subsequent submitted tasks execute a DiscardPolicy. Let’s look at the DiscardPolicy implementation as follows:

/** * A handler for rejected tasks that silently discards the * rejected task. */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy(a) {}// The reject policy does nothing, the thread's state is still NEW
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}Copy the code

When a discard policy is executed, the discard policy is executed by doing nothing. The thread’s state is still NEW.

To analyze this, we also need to look at what happens in the thread pool’s Submit method. When a task is submitted to the thread pool, it is wrapped as FutureTask, with an initial state of NEW. The task is performed by the wrapped FutureTask object.

/ * * *@throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc} * /
publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

// The initial state is NEW
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

// He will be waiting well
public V get(a) throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false.0L);
    return report(s);
}

Copy the code

FutureTask state >COMPLETING will return. Because the reject policy does not modify the state of FutureTask, the state of FutureTask is always NEW, so it does not return and waits.

Will other rejection policies cause blocking

DiscardOldestPolicy: DiscardOldestPolicy: AbortPolicy throws an exception and the caller can get the result immediately. DiscardOldestPolicy: AbortPolicy throws an exception and the caller can get the result immediately. CallerRunsPolicy: Allows the main thread to execute an exception. So the task generated by poll is the NEW state

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy(a) {}/**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from "+ e.toString()); }}public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy(a) {}/**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if(! e.isShutdown()) { r.run(); }}}public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy(a) {}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code

conclusion

  1. If future.get () is used, the timeout period needs to be set based on the actual service situation
  2. Don’t use a DiscardPolicy if you can, and implement it yourself if you really need to use it.

What are the other considerations for using thread pools?

  • Using CallerRunsPolicy will not cause a freeze, but be careful. If the main thread is blocked, the service will be affected as well.
  • Do not use Executors to create a thread pool. Use ThreadPoolExecutor to clear the running rules of the thread pool and avoid resource depletion. 1) newFixedThreadPool and newSingleThreadExecutor: The main problem is that the stack of request processing queues can consume very large memory, or even OOM. 2) newCachedThreadPool and newScheduledThreadPool: The main problem is that the maximum number of threads is integer. MAX_VALUE, which may create a very large number of threads, or even OOM.
  • If the company has full-link trace (such as The Trace analysis of Ali Cloud), remember to pass trace information in the thread, otherwise the trace information will be lost.

This article has participated in the activity of “New person creation Ceremony”, and started the road of digging gold creation together.