Question why

* If you can’t use Executors to create a thread pool, follow the following problems: * If you can’t use Executors to create a thread pool, follow the following problems:

  • FixedThreadPool and SingleThreadPool: The allowed request queue length is integer. MAX_VALUE, which may accumulate a large number of requests and result in OOM.
  • CachedThreadPool: The number of threads allowed to be created is integer. MAX_VALUE, which may create a large number of threads, resulting in OOM.

Then useless Executors. NewScheduledThreadPool (), and their new a ScheduledThreadPoolExecutor object, and rewrite the afterExecute method, and custom refused to strategy.

As a result, the log is not printed after only one execution. This problem bothered me for a long time, so I left a note to record it. The code is as follows:

@Slf4j
@Component
public class PlanStartAndEndTask implements ApplicationRunner {

  /** * Initializes the scheduled task thread pool */
  private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1.new RecordExceptionExecutionHandler()) {

    /** * Custom exception handling *@paramA runnable task *@paramThrowable exception *@date2021/3/31 * /
    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
      final Logger log = LoggerFactory.getLogger(this.getClass());
      if (runnable instanceof Thread) {
        if(throwable ! =null) {
          log.error("The scheduled task of automatic start/end share plan is abnormal. Time: {}, exception information: {}", LocalDateTime.now(), throwable.getMessage()); }}else if (runnable instanceofFutureTask) { FutureTask<? > futureTask = (FutureTask<? >) runnable;try {
          That's the problem!!
          futureTask.get();
        } catch (InterruptedException e) {
          log.error("Automatic start/end share schedule scheduled task interrupted at: {}", LocalDateTime.now());
          Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
          log.error("The scheduled task of automatic start/end share plan is abnormal. Time: {}, exception information: {}", LocalDateTime.now(), e.getMessage()); }}}};@Override
  public void run(ApplicationArguments args) throws Exception {
    // For simulation, the first delay time is 0 and the cycle is 5 seconds
    executor.scheduleAtFixedRate(() -> {
      long startTime = System.currentTimeMillis();
        log.info("Start performing automated tasks");
        /** omit the business code **/
        log.info("End automatic task execution, time: {} ms;", System.currentTimeMillis() - startTime);
    }, 0.5000, TimeUnit.MILLISECONDS);
  }

  /** * Custom implement reject policy, log, when the queue is full, the new task is directly discarded **@author Zhu Lin
   * @date2021/3/30 * /
  @Slf4j
  static class RecordExceptionExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
      log.error("Task: {}, rejected by {}", runnable.toString(), threadPoolExecutor.toString()); }}}Copy the code

Then at first I thought there was a problem with the exception that caused it to not execute once and the console didn’t print the exception because JavaDoc said so too

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is executions will commence after
     * {@code initialDelay} then {@code initialDelay+period}, then
     * {@code initialDelay + 2 * period}, and so on.
     * If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.  If any execution of this task
     * takes longer than its period, then subsequent executions
     * may start late, but will not concurrently execute.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param period the period between successive executions
     * @param unit the time unit of the initialDelay and period parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if period less than or equal to zero
     */
    publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
Copy the code

In simple terms, a task can only be stopped if it encounters an exception, otherwise it can only be stopped if it cancels and terminates the execution so I changed it to this

  @Override
  public void run(ApplicationArguments args) throws Exception {
    // The first delay time is 0 and the cycle is 5 seconds
    executor.scheduleAtFixedRate(() -> {
      long startTime = System.currentTimeMillis();
      try {
        log.info("Start performing automated tasks");
      } catch (Exception e) {
        log.error("The scheduled task of automatic start/end share plan is abnormal. Time: {}, exception information: {}", LocalDateTime.now(), e.getMessage());
      } finally {
        log.info("End automatic task execution, time: {} ms;", System.currentTimeMillis() - startTime); }},0.5000, TimeUnit.MILLISECONDS);
  }
Copy the code

AfterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor: afterExecutor Futuretask.get () is a line of code that causes the thread to stop running. Let’s take a closer look

Problem resolution

First of all, why is get() blocked

public V get(a) throws InterruptedException, ExecutionException {
    int s = state;
    // If the task is already in progress, wait
    if (s <= COMPLETING)
        s = awaitDone(false.0L);
    return report(s);
}

// Wait for the task to complete
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // Calculate the end time of the wait. If you have been waiting, the end time is 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    / / don't line up
    boolean queued = false;
    // Infinite loop
    for (;;) {
        // If the thread has been interrupted, delete it and throw an exception
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // Current task status
        int s = state;
        // The current task has been completed
        if (s > COMPLETING) {
            // The thread of the current task is empty
            if(q ! =null)
                q.thread = null;
            return s;
        }
        // If executing, the current thread cedes the CPU to recompete, preventing the CPU from soaring
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
            // If it is run for the first time and a new waitNode is created, the current thread is the property of waitNode
        else if (q == null)
            q = new WaitNode();
            Queued is true and will not be executed again
            // Treat the current waitNode as the first waiters list
        else if(! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
            // Remove the current Wait from the waiters table if a timeout is set and runs out
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // The thread enters TIMED_WAITING before the timeout period expires
            LockSupport.parkNanos(this, nanos);
        }
        // No timeout is set. The system is in WAITING state
        else
            LockSupport.park(this); }}Copy the code

If this condition is not met, you will continue to block. The state of the scheduled task that I submitted is never changed. We continue to get Next we see ScheduledThreadPoolExecutor# scheduleAtFixedRate methods

    publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
Copy the code

The core logic here is that theRunnablePackaged into aScheduledFutureTaskObject, this wrapper is inFutureTaskOn this basis, some data needed for timing scheduling are added. (FutureTaskIs one of the core classes of the thread pool.decorateTaskIs a hook method used for extensions where the default implementation is returnScheduledFutureTaskItself. And then the master logic just passesdelayedExecutePut it in the queue.So why hasn’t our mission state changedScheduledFutureTasktherunMethods.

/** * Overrides FutureTask version so as to reset/requeue if periodic. */

public void run(a) {
    // Whether it is a periodic task
    boolean periodic = isPeriodic();
    // If the task cannot be CANCELLED at its current state, cancel it (set the status of the task to CANCELLED).
    if(! canRunInCurrentRunState(periodic)) cancel(false);
    else if(! periodic)// If it is not a periodic task, call the FutureTask # run method
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        // If it is periodic, set the next execution time
        setNextRunTime();
        // Add the task to the queue againreExecutePeriodic(outerTask); }}Copy the code

We focus on ScheduledFutureTask. Super. RunAndReset () method, which is actually called his father class FutureTask runAndReset () method, this method can be carried on after successful reset thread state, the reset is the semantics. We can also see that when the method execution returns false, the task will not be added to the queue again, which is consistent with the exception we originally assumed. The final answer lies in the difference between runAndReset and run:

    public void run(a) {
        /** omit other code **/
        try {
            // Execute the task
            result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result);
        /** omit other code **/
    }

    protected boolean runAndReset(a) {
        /** omit other code **/
        try {
            // Execute the task
            c.call(); // don't set result
            ran = true;
        } catch (Throwable ex) {
            setException(ex);
        }
        /** omit other code **/
    }
Copy the code

C. Call () is where the task was executed. There is an RAN variable that defaults to false. When the task was successfully executed, RAN is set to true, meaning the task was executed. But that’s not the point, the point is that we find that the run method goes back and calls a set method after it succeeds

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion(); }}Copy the code

The state of state is changed in the set method, which proves our logic. The periodic task call runAndReset does not change the state at all, so the get method blocks and has no other choice.

Small wonder

If according to the specification of alibaba development manual, ScheduledThreadPoolExecutor also allows you to create a thread data exist for Integer. MAX_VALUE problem, so how to solve it, that I’m a little confused.

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
Copy the code