
Implement asynchrony in Spring applications

Spring provides annotation support for task scheduling and asynchronous method execution. A method is invoked asynchronously when it, or its class, is annotated with @Async. The caller returns immediately, and the actual execution of the method is handed to Spring's TaskExecutor. The annotated method therefore runs on a separate thread while the caller continues on its original thread, so the caller is not blocked while the task runs.
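
The hand-off described above can be sketched in plain Java, without Spring. This only illustrates the idea and is not how Spring implements @Async; the class name AsyncHandOffSketch and the single-thread executor are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: the caller submits work to an executor and gets control back at once,
// which is roughly what happens when an @Async method is invoked.
class AsyncHandOffSketch {

    // Runs a task on an executor thread and returns the name of that thread.
    static String runAsync() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() returns immediately; the lambda runs on the executor thread
            Future<String> worker = executor.submit(() -> Thread.currentThread().getName());
            return worker.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("worker thread: " + runAsync());
    }
}
```

The two printed names differ, which is the whole point: the work runs somewhere other than the calling thread.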

Dependency configuration

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Add @EnableAsync to a configuration class

The @EnableAsync annotation is added to the entry class (or to any configuration class). It switches on Spring's processing of the @Async annotations found on beans within the scanned packages.

An asynchronous call runs the method on another thread and does not block the main thread. The actual execution of asynchronous methods is left to Spring's TaskExecutor.
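
Put together, a minimal setup might look like the fragment below. It is a sketch rather than the code of this project: the class names AsyncConfig and TaskService and the method doTask are hypothetical, and in a real project each class would live in its own file.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

// Hypothetical configuration class: switches on processing of @Async
@Configuration
@EnableAsync
class AsyncConfig {
}

// Hypothetical service: doTask() is handed to the TaskExecutor
// when it is called through the Spring-managed bean
@Service
class TaskService {

    @Async
    public void doTask() {
        System.out.println("running on " + Thread.currentThread().getName());
    }
}
```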

Getting the result of asynchronous execution with Future

  • The caller does not wait for the asynchronous methods to finish. If you need to do something once all three methods have finished, have them return a Future and use it as an asynchronous callback.

  • AsyncResult is the class used to wrap the return value of an asynchronous method:

public class AsyncResult<V> implements ListenableFuture<V> {
    private final V value;
    private final ExecutionException executionException;
    // ...
}

AsyncResult implements the ListenableFuture interface and holds two fields: the return value and the exception information.

public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> var1);
    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}

The ListenableFuture interface extends Future and adds methods for registering callbacks. The Future interface is defined as follows:

public interface Future<V> {
    // Attempts to cancel the task; the argument says whether a running task may be interrupted
    boolean cancel(boolean mayInterruptIfRunning);
    // Whether the task was cancelled before it completed normally
    boolean isCancelled();
    // Blocks until the task completes, then returns the value returned by the asynchronous method
    V get() throws InterruptedException, ExecutionException;
    // Whether the task has completed (normally, with an exception, or by cancellation)
    boolean isDone();
    // Same as get(), but waits at most the given timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  • get() blocks the calling thread until the result is available. Without a timeout it blocks until the task completes. With a timeout, the call throws TimeoutException once the time is up, so the caller is not tied up indefinitely; the task itself keeps running unless it is also cancelled.

  • cancel(boolean) takes a flag that says whether a task that is already running may be interrupted, and returns whether the cancellation took effect:

    • If the task has not started yet, it is cancelled and never runs; the return value is true regardless of the argument.
    • If the task has already completed, the return value is false regardless of the argument.
    • If the task is running and the argument is true, the thread running it is interrupted and the return value is true.
    • If the task is running and the argument is false, the task is left to run without interruption. Standard implementations such as FutureTask still mark the Future as cancelled and return true, so its result can no longer be retrieved.

This means:

  • A task that has not started can always be cancelled, so the return value is true regardless of the argument.
  • A completed task cannot be cancelled, so the return value is false regardless of the argument.
  • For a task in progress, the argument decides whether the running thread is interrupted (true) or left to finish (false).
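
The behaviour of get() with a timeout and of cancel() can be tried out with a plain ExecutorService. This is a small sketch under assumptions: it uses the JDK's FutureTask-based futures rather than Spring's AsyncResult, and the class name FutureSemanticsSketch is made up for the example.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSemanticsSketch {

    // Returns true if get(timeout) gave up waiting on a slow task.
    static boolean getTimesOut() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(2000);
                return "done";
            });
            try {
                slow.get(100, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true; // the caller stops waiting; the task is still running
            }
        } finally {
            executor.shutdownNow(); // interrupt the sleeping task so the JVM can exit
        }
    }

    // Returns the results of cancel() for a queued, a running and a completed task.
    static boolean[] cancelResults() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> done = executor.submit(() -> "done");
            done.get(); // wait until it has completed

            CountDownLatch started = new CountDownLatch(1);
            Future<?> running = executor.submit(() -> {
                started.countDown();
                Thread.sleep(2000);
                return null;
            });
            started.await(); // the single worker thread is now busy
            Future<String> queued = executor.submit(() -> "never runs");

            boolean queuedCancelled = queued.cancel(false);  // not started yet
            boolean runningCancelled = running.cancel(true); // in progress, interruption allowed
            boolean doneCancelled = done.cancel(true);       // already completed
            return new boolean[] { queuedCancelled, runningCancelled, doneCancelled };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("get timed out: " + getTimesOut());
        System.out.println("cancel results: " + Arrays.toString(cancelResults()));
    }
}
```

The queued task is cancelled before the running one on purpose: once the running task is interrupted, the worker thread would be free to pick up the queued task.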

An implementation that gets the return value of an asynchronous method

@Async
public Future<String> test() throws Exception {
    log.info("Get to work.");
    long start = System.currentTimeMillis();
    Thread.sleep(1000);
    long end = System.currentTimeMillis();
    log.info("Completed task, time: " + (end - start) + " ms");
    // Wrap the result so the caller can retrieve it through the Future
    return new AsyncResult<>("Task completed, time: " + (end - start) + " ms");
}

We change the return type of the task method to Future<String> and return the execution time as part of the result string.

    @GetMapping("/task")
    public String taskExecute() {
        try {
            // Each call returns immediately with a Future
            Future<String> r1 = taskService.test();
            Future<String> r2 = taskService.test();
            Future<String> r3 = taskService.test();
            // Poll until all three tasks are done
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("all tasks executed");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
            log.error("error executing task: {}", e.getMessage());
        }
        return "ok";
    }

Another way to get the result of an asynchronous call

Suppose we want to measure the total time taken when all three tasks run concurrently. This requires recording the start time and computing the elapsed time after all three functions have completed.

You can also use CompletableFuture to return the result of an asynchronous call:

@Async
public CompletableFuture<String> doTaskOne() throws Exception {
    log.info("Start on task one.");
    long start = System.currentTimeMillis();
    Thread.sleep(random.nextInt(10000));
    long end = System.currentTimeMillis();
    log.info("Complete task one, time: " + (end - start) + " ms");
    return CompletableFuture.completedFuture("Task one complete.");
}

After modifying the other two asynchronous functions in the same way, let's change the test case so that it waits for the three asynchronous calls to finish before doing anything else.

@Test
public void test() throws Exception {
    long start = System.currentTimeMillis();
    CompletableFuture<String> task1 = asyncTasks.doTaskOne();
    CompletableFuture<String> task2 = asyncTasks.doTaskTwo();
    CompletableFuture<String> task3 = asyncTasks.doTaskThree();
    // Block until all three tasks have completed
    CompletableFuture.allOf(task1, task2, task3).join();
    long end = System.currentTimeMillis();
    log.info("All tasks completed, total time: " + (end - start) + " ms");
}

  • Calling each of the three asynchronous functions returns a result object of type CompletableFuture.
  • CompletableFuture.allOf(task1, task2, task3).join() blocks until all three asynchronous tasks have completed.
  • Once all three tasks are complete, the total time for running them concurrently is computed from the start time and the end time.
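
The same wait-for-all pattern can be tried without Spring. In the sketch below, CompletableFuture.supplyAsync stands in for the @Async methods; the class name AllOfSketch, the task names and the sleep times are all assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AllOfSketch {

    // Hypothetical stand-in for an @Async method: the work runs on another thread.
    static CompletableFuture<String> task(String name, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return name + " complete";
        });
    }

    // Starts three tasks, waits for all of them, and collects their results in order.
    static List<String> runAll() {
        CompletableFuture<String> t1 = task("one", 300);
        CompletableFuture<String> t2 = task("two", 200);
        CompletableFuture<String> t3 = task("three", 100);
        // Blocks until every task has completed
        CompletableFuture.allOf(t1, t2, t3).join();
        // All futures are done here, so join() on each returns immediately
        return Stream.of(t1, t2, t3)
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> results = runAll();
        long end = System.currentTimeMillis();
        System.out.println(results + ", total time: " + (end - start) + " ms");
    }
}
```

Because the tasks overlap, the total time is close to that of the slowest task rather than the sum of all three.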

Configuring a thread pool

The easiest option is to use the default TaskExecutor. To use a custom executor, define it in a class annotated with @Configuration. Spring provides five commonly used TaskExecutor implementations:

  • SimpleAsyncTaskExecutor: not a real thread pool. It does not reuse threads and creates a new thread for every invocation.

  • SyncTaskExecutor: does not execute asynchronously; each task runs synchronously in the calling thread. Only suitable where multithreading is not required.

  • ConcurrentTaskExecutor: an adapter for Executor. Not recommended; consider it only if ThreadPoolTaskExecutor does not meet your requirements.

  • SimpleThreadPoolTaskExecutor: a subclass of Quartz's SimpleThreadPool. Needed only when the same thread pool is used by both Quartz and non-Quartz components.

  • ThreadPoolTaskExecutor: the most commonly used and the recommended choice. It is essentially a wrapper around java.util.concurrent.ThreadPoolExecutor.

    @Bean
    public ThreadPoolTaskExecutor FebsShiroThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Set the number of core threads
        executor.setCorePoolSize(5);
        // Set the maximum number of threads
        executor.setMaxPoolSize(20);
        // Configure the queue capacity
        executor.setQueueCapacity(200);
        // How long (in seconds) idle threads above the core size are kept alive
        executor.setKeepAliveSeconds(30);
        // Configure the thread name prefix in the thread pool
        executor.setThreadNamePrefix(ConstantFiledUtil.KMALL_THREAD_NAME_PREFIX);
        // On shutdown, wait for all tasks to complete before destroying other beans
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Maximum time (in seconds) to wait for tasks on shutdown; after that, shutdown proceeds
        // anyway so that the application is eventually shut down rather than blocked
        executor.setAwaitTerminationSeconds(60);
        // Rejection policy: how to handle a new task when the queue is full and the pool is at max size
        // CallerRunsPolicy: the task is not executed in a pool thread, but by the caller's thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Perform initialization
        executor.initialize();
        return executor;
    }

The thread pool configuration is flexible, with properties such as the core pool size and the maximum pool size. The rejection policy decides how a new task is handled when the queue is full and the pool has reached its maximum number of threads. The JDK ships four policies: AbortPolicy (the default, which throws RejectedExecutionException), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy. With CallerRunsPolicy, the task is not executed in a pool thread but in the caller's thread. In TaskService, print the current thread name:

    @Async
    public Future<String> doExecute() throws Exception {
        log.info("Start on task one.");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("Complete task one, time: " + (end - start) + " ms");
        // Shows whether a pool thread or the caller's thread ran the task
        log.info("Current thread is {}", Thread.currentThread().getName());
        return new AsyncResult<>("Task completed, time: " + (end - start) + " ms");
    }
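
To see which thread CallerRunsPolicy ends up using, here is a small sketch that saturates a deliberately tiny java.util.concurrent.ThreadPoolExecutor (the class ThreadPoolTaskExecutor wraps). The pool sizes and the class name CallerRunsSketch are assumptions chosen so that the third task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsSketch {

    // Saturates a tiny pool and returns the name of the thread that ran the rejected task.
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker thread
            pool.execute(() -> { });                   // fills the queue
            // Pool and queue are full: CallerRunsPolicy runs this task on the calling thread
            pool.execute(() -> { ranOn[0] = Thread.currentThread().getName(); });
            return ranOn[0];
        } finally {
            release.countDown(); // let the blocked worker finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:       " + Thread.currentThread().getName());
        System.out.println("rejected task ran on: " + threadOfRejectedTask());
    }
}
```

Both lines print the same thread name. The side effect worth remembering is that the caller is slowed down while it runs the rejected task, which acts as a simple form of back-pressure.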

When using Spring's @Async, note that the following usages make @Async ineffective:

  • The asynchronous method is declared static.
  • The class containing the asynchronous method is not picked up by Spring's scanning, for example because it lacks the @Component annotation (or another stereotype annotation).
  • The caller and the asynchronous method are in the same class: such a call does not go through the Spring proxy.
  • The object is created manually with new instead of being injected with the @Autowired or @Resource annotation.
  • The @EnableAsync annotation is missing; with Spring Boot it must be added to the boot class (or to a configuration class).
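
The same-class case is the one that surprises people most. Spring applies @Async through a proxy around the bean, and a call made on this never reaches that proxy. The sketch below imitates this with a JDK dynamic proxy; it is an analogy, not Spring's actual proxy code, and the names SelfInvocationSketch, Tasks and TasksImpl are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationSketch {

    interface Tasks {
        void outer();
        void inner();
    }

    static class TasksImpl implements Tasks {
        @Override
        public void outer() {
            inner(); // plain call on "this": does not go through the proxy
        }

        @Override
        public void inner() {
        }
    }

    // Returns the names of the methods that the proxy intercepted.
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Tasks target = new TasksImpl();
        Tasks proxy = (Tasks) Proxy.newProxyInstance(
                Tasks.class.getClassLoader(),
                new Class<?>[] { Tasks.class },
                (p, method, args) -> {
                    // This is where an async proxy would hand the call to an executor
                    intercepted.add(method.getName());
                    return method.invoke(target, args);
                });
        proxy.outer(); // intercepted, but the nested inner() call is not
        proxy.inner(); // intercepted
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

The list contains outer and inner once each: the inner() call made from within outer() never shows up, just as an @Async method called from its own class runs synchronously.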

Thread context information passing

A single request in a microservice architecture may pass through several microservices, and a single service may process it in several methods, some of which are asynchronous. Some thread context information, such as the request path and the user's unique userId, travels with the request. Let's see whether we can still get this information in an asynchronous method if we do nothing special.

If a null pointer exception is reported when reading request information from the RequestContextHolder inside the asynchronous method, the request context was not passed to the thread that runs the asynchronous method. The reason lies in the implementation of RequestContextHolder, which uses two ThreadLocals to hold the request bound to the current thread:

    // Holds the request attributes bound to the current thread
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
            new NamedThreadLocal<RequestAttributes>("Request attributes");
    // Holds request attributes that can be inherited by child threads
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
            new NamedInheritableThreadLocal<RequestAttributes>("Request context");

How do you pass context information to an asynchronous thread?

ThreadPoolTaskExecutor in Spring has a configuration property of type TaskDecorator, a callback interface that follows the decorator pattern.

The decorator pattern adds extra functionality to an object dynamically and is more flexible than subclassing. TaskDecorator is mainly used to set up execution context around a task's invocation, or to provide monitoring and statistics for task execution.

public interface TaskDecorator {
    Runnable decorate(Runnable runnable);
}

The decorate method takes the given Runnable and returns a wrapped Runnable, which is the one actually executed.
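
Before wiring this into Spring, the capture-and-restore idea can be tried with a plain ThreadLocal. The sketch below is an illustration under assumptions: USER_ID is a hypothetical stand-in for the request context, and the static decorate method only mirrors the shape of TaskDecorator.decorate.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextCopySketch {

    // Hypothetical stand-in for the request context held in a ThreadLocal
    static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

    // Same shape as TaskDecorator.decorate: capture on the submitting thread,
    // restore on the worker thread, clean up afterwards
    static Runnable decorate(Runnable task) {
        String captured = USER_ID.get();
        return () -> {
            try {
                USER_ID.set(captured);
                task.run();
            } finally {
                USER_ID.remove();
            }
        };
    }

    // Returns what the worker thread sees: [without decorator, with decorator]
    static String[] seenByWorker() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        String[] seen = new String[2];
        try {
            USER_ID.set("user-42");
            executor.submit(() -> { seen[0] = USER_ID.get(); }).get();
            executor.submit(decorate(() -> { seen[1] = USER_ID.get(); })).get();
            return seen;
        } finally {
            USER_ID.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = seenByWorker();
        System.out.println("without decorator: " + seen[0]);
        System.out.println("with decorator:    " + seen[1]);
    }
}
```

Without the decorator the worker thread sees null, because a ThreadLocal value belongs to the thread that set it. Cleaning up in finally matters with pooled threads, which would otherwise carry the value over to the next task.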

Let's define a TaskDecorator that copies the request context to the worker thread.

public class ThreadLocalDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // Runs on the submitting thread: capture the current request context
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                // Runs on the worker thread: bind the captured context
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

In the thread pool configuration, set the TaskDecorator property to the new ThreadLocalDecorator:

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        // Add the configuration of the TaskDecorator property
        executor.setTaskDecorator(new ThreadLocalDecorator());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

An asynchronous method can have one of two return types, and they differ in how exceptions are handled:

When the return type is void, an exception thrown during the method invocation is not propagated to the caller. Such exceptions can be handled by registering an AsyncUncaughtExceptionHandler.

When the return type is Future, an exception thrown during the method invocation is propagated to the caller: it is rethrown, wrapped in an ExecutionException, when the caller calls get().
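
The Future case can be observed with a plain ExecutorService. This is a sketch of the JDK behaviour that Spring's Future-returning methods rely on; the class name FutureExceptionSketch and the exception message are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureExceptionSketch {

    // Returns the message of the exception thrown inside the task, as seen by the caller.
    static String causeSeenByCaller() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalStateException("task failed");
            };
            Future<String> result = executor.submit(failing);
            try {
                result.get();
                return null; // not reached: the task always fails
            } catch (ExecutionException e) {
                // The original exception is available as the cause
                return e.getCause().getMessage();
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller saw: " + causeSeenByCaller());
    }
}
```

Note that nothing is thrown at submission time. If the caller never calls get(), the failure goes unnoticed.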

Note: if no custom executor is configured for asynchronous methods, Spring uses SimpleAsyncTaskExecutor by default. (Spring Boot 2.1 and later auto-configure a ThreadPoolTaskExecutor instead.)

SimpleAsyncTaskExecutor: not a real thread pool. It does not reuse threads and creates a new thread for every invocation. Serious performance problems can occur under high concurrency.
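
The difference between a thread per task and a real pool is easy to measure. The sketch below does not use SimpleAsyncTaskExecutor itself; a one-line Executor that starts a new thread for each task stands in for it, and the class name ThreadPerTaskSketch and the task counts are assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadPerTaskSketch {

    // Runs the given number of tasks and counts how many distinct threads executed them.
    static int distinctThreads(Executor executor, int tasks) throws InterruptedException {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        done.await();
        return threads.size();
    }

    // Stand-in for a non-pooling executor: a new thread for every task
    static int withThreadPerTask(int tasks) throws InterruptedException {
        Executor perTask = runnable -> new Thread(runnable).start();
        return distinctThreads(perTask, tasks);
    }

    // A fixed pool reuses at most poolSize threads
    static int withPool(int tasks, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            return distinctThreads(pool, tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("thread per task: " + withThreadPerTask(20) + " threads");
        System.out.println("fixed pool:      " + withPool(20, 4) + " threads");
    }
}
```

With 20 tasks, the first variant creates 20 threads while the pool never uses more than 4. Under thousands of concurrent requests, that gap is where the performance problems come from.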