1, the introduction

Java asynchronous programming greatly saves the execution time of the main program and improves the utilization efficiency of computing resources. It is one of the necessary skills for senior Java engineers. This article focuses on what asynchrony is, what problems asynchrony solves, and how to program asynchrony.

1.1 What is Asynchronous programming

Before explaining asynchronous programming, let’s look at the definition of synchronous programming. Synchronous programming is a typical request-response model in which a request calls a function or method, waits for its response to return, and then executes subsequent code. The most important characteristic of synchronization is “orderliness”, when each process is completed and results are returned at the end. As shown in figure

In asynchronous programming, only the call is sent, and the caller does not wait for the called method to complete execution, but continues to execute the following process. In a multi-processor or multi-core environment, asynchronous calls are truly executed in parallel. As shown in figure

1.1 What problems does asynchronous programming solve

The goal of Java asynchronous programming is to optimize the execution time of the main program by taking full advantage of the computer’s CPU resources and preventing the main program from blocking on some long-running task. Such time-consuming tasks can be IO operations, remote calls, and high-density computing tasks. Without multithreaded asynchronous programming, our system would block on time-consuming subtasks, resulting in significantly longer times to complete main function tasks.

In real business development, this is usually done synchronously. However, there are many scenarios that are very suitable for asynchronous processing, such as: registering new users and giving 100 credits; Or successful order, send push message and so on. Take the use case of registering a new user. Why do you do it asynchronously?

  • The first reason is fault tolerance and robustness. If there is an anomaly in sending points, users cannot fail to register because of sending points. Because user registration is the main function, and sending points is the secondary function, even if the points are abnormal, the user should be reminded of the success of registration, and then compensation will be made for the abnormal points.
  • The second reason is to improve performance. For example, it takes 20 milliseconds to register a user and 50 milliseconds to send credits. If you use synchronization, the total time is 70 milliseconds.

Therefore, asynchrony can solve two problems, performance and fault tolerance. So what are the ideas for implementing asynchrony?

2. Asynchronous realization based on message-oriented middleware

The most common solution is to introduce message-oriented middleware. Synchronous interface calls lead to long response times, and with MQ, changing synchronous calls to asynchronous can significantly reduce system response times.

System A, as the producer of the message, can return the results directly after it has done its job. Instead of waiting for message consumers to return, they eventually do all the business functions independently.

In this way, it can avoid the problem of taking a long time and affecting the user experience. The downside of using message-oriented middleware is that the introduction of third-party messaging components for relatively simple businesses can be a bit heavy and cause some additional maintenance.

Instead of introducing third party middleware, we can also consider asynchronous non-blocking observer patterns, such as EventBus from Google’s open source package Guava, which elegantly implements asynchronous queues.

3. Realize asynchrony based on Servlet

Prior to Servlet 3.0, servlets handled requests in a thread-per-request manner, meaning that each Http Request was processed from beginning to end by a single Thread. Performance issues are obvious when it comes to time-consuming operations. Servlet 3.0 provides asynchronous processing of requests. You can increase the throughput of the service by releasing the threads and related resources that the container allocates to requests, reducing the burden on the system. Servlet 3.0 asynchrony is accomplished through the AsyncContext object, which can pass from the current thread to another thread and return the original thread. The new thread can directly return the result to the client after processing the business.

The AsyncContext object can be obtained from HttpServletRequest:

    @RequestMapping("/async")
    public void async(HttpServletRequest request) {
        AsyncContext asyncContext = request.getAsyncContext();
    }
Copy the code

AsyncContext provides functions such as retrieving a ServletRequest, ServletResponse, and addListener:

public interface AsyncContext {
    ServletRequest getRequest(a);
    ServletResponse getResponse(a);
    void addListener(AsyncListener var1);
    void setTimeout(long var1);
    // omit other methods
}
Copy the code

You can not only obtain Request and Response information through AsyncContext, but also set the timeout time for asynchronous processing. In general, the timeout in milliseconds needs to be set, otherwise waiting indefinitely would be the same as synchronous processing. AddListener of AsyncContext also allows you to addListener events that handle callbacks to asynchronous threads such as start, finish, exception, timeout, etc.

AddListener AsyncListener AsyncListener

public interface AsyncListener extends EventListener {
    // called when asynchronous execution is complete
    void onComplete(AsyncEvent var1) throws IOException;
    // The asynchronous thread makes the timeout call
    void onTimeout(AsyncEvent var1) throws IOException;
    // called when the asynchronous thread fails
    void onError(AsyncEvent var1) throws IOException;
    // called when the asynchronous thread starts
    void onStartAsync(AsyncEvent var1) throws IOException;
}
Copy the code

Typically, an exception or timeout returns an error message to the caller, while an exception handles some cleanup and shutdown operations or logs the exception.

Let’s look directly at an example of an asynchronous servlet-based request:

@GetMapping(value = "/email/send")
public void servletReq(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // Set listener: can set its start, finish, exception, timeout events callback processing
    asyncContext.addListener(new AsyncListener() {
        @Override
        public void onTimeout(AsyncEvent event) {
            System.out.println("Processing timed out...");
        }

        @Override
        public void onStartAsync(AsyncEvent event) {
            System.out.println("Thread starts executing");
        }

        @Override
        public void onError(AsyncEvent event) {
            System.out.println("Error occurred during execution:" + event.getThrowable().getMessage());
        }
        
        @Override
        public void onComplete(AsyncEvent event) {
            System.out.println("Execute complete, release resource"); }});// Set the timeout period
    asyncContext.setTimeout(6000);
    asyncContext.start(new Runnable() {
        @Override
        public void run(a) {
            try {
                Thread.sleep(5000);
                System.out.println("Internal thread:" + Thread.currentThread().getName());
                asyncContext.getResponse().getWriter().println("async processing");
            } catch (Exception e) {
                System.out.println("Asynchronous processing exception:" + e.getMessage());
            }
            // Asynchronous request complete notification, complete requestasyncContext.complete(); }});// The request thread connection has been released
    System.out.println("Main thread:" + Thread.currentThread().getName());
}
Copy the code

Start the project, access the corresponding URL, and print the following log:

Main thread: HTTP-nio-8080-exec-4 Internal thread: http-nio-8080-exec-5 When the execution is complete, the resource copy code is releasedCopy the code

As you can see, the above code completes the main thread first, which is the log printing of the last line of the program, and then the execution of the internal thread. The inner thread completes execution and AsyncContext’s onComplete method is called. If you access the URL through a browser, you can also see the return value of async Processing. The result from the internal thread is also returned to the client normally.

4. Asynchronous based on Spring

Spring-based asynchronous requests can be implemented through Callable, DeferredResult, or WebAsyncTask.

4.1 Callable based implementation

For a request (/email), the Callable processing flow is as follows:

1. Spring MVC enables sub-threads to handle business (submitting Callable to TaskExecutor);

2. DispatcherServlet and all filters exit the Web container thread, but Response remains open;

Callable returns the result, SpringMVC sends the original request back to the container (request again /email), resuming the previous processing;

4. DispatcherServlet is called again and the result is returned to the user;

Examples of code implementation are as follows:

@GetMapping("/email")
public Callable<String> order(a) {
    System.out.println("Main thread start:" + Thread.currentThread().getName());
    Callable<String> result = () -> {
        System.out.println("Secondary thread start:" + Thread.currentThread().getName());
        Thread.sleep(1000);
        System.out.println("Secondary thread returns:" + Thread.currentThread().getName());
        return "success";
    };

    System.out.println("Main thread returns:" + Thread.currentThread().getName());
    return result;
}
Copy the code

To access the corresponding URL, the console enters the log as follows:

Main thread start: HTTP-nio-8080-exec-1 Main thread Return: HTTP-nio-8080-exec-1 secondary thread Start: task-1 Secondary thread return: task-1Copy the code

As you can see from the log, the main thread completes before the secondary thread executes. At the same time, the URL returns the result “Success”. This also illustrates the problem that asynchronous processing on the server side is invisible to the client.

Callable is executed by default using the SimpleAsyncTaskExecutor class. This thread pool is not really a thread pool. It is worth noting that thread reuse cannot be achieved with this thread pool, as each call creates a new thread. If threads are constantly created in the system, the system memory usage is too high and OutOfMemoryError is raised. The key codes are as follows:

public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = this.taskDecorator ! =null ? this.taskDecorator.decorate(task) : task;
  // Determine whether to enable traffic limiting. The default value is no
  if (this.isThrottleActive() && startTimeout > 0L) {
    // Perform the preceding operation to limit the current
    this.concurrencyThrottle.beforeAccess();
    this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
  } else {
    // Execute thread tasks in the case of unrestricted flow
    this.doExecute(taskToUse); }}protected void doExecute(Runnable task) {
  // keep creating threads
  Thread thread = this.threadFactory ! =null ? this.threadFactory.newThread(task) : this.createThread(task);
  thread.start();
}

// Create a thread
public Thread createThread(Runnable runnable) {
  // Specify thread name, task-1, task-2...
  Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
  thread.setPriority(this.getThreadPriority());
  thread.setDaemon(this.isDaemon());
  return thread;
}
Copy the code

The thread name is [task-1], [task-2], [task-3], [task-4]….. The increment.

This is why it is important to customize the thread pool when using the asynchronous framework instead of the default SimpleAsyncTaskExecutor. The thread pool is configured by implementing the WebMvcConfigurer interface.

@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Resource
    private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

    /** * configure thread pool */
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor(a) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setKeepAliveSeconds(200);
        taskExecutor.setThreadNamePrefix("thread-pool-");
        AbortPolicy and CallerRunsPolicy are supported in the thread pool. Default to the latter
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Override
    public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
        // Processing callable timed out
        configurer.setDefaultTimeout(60 * 1000);
        configurer.setTaskExecutor(myThreadPoolTaskExecutor);
        configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
    }

    @Bean
    public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor(a) {
        return newTimeoutCallableProcessingInterceptor(); }}Copy the code

To verify the printed thread, we replace the system.out.println in the example code with log output, and see that the log is printed as follows before using the thread pool:

The 2021-02-21 09:45:37. 8312-144 the INFO [nio - 8080 - exec - 1] C.S.L earn. Controller. AsynController: Main thread start: HTTP - nio - 8080-2021-02-21 09:45:37 exec - 1, 8312-144 the INFO [nio - 8080 - exec - 1] C.S.L earn. Controller. AsynController: Main thread returns: HTTP - nio - 8080-2021-02-21 09:45:37 exec - 1, 8312-148 the INFO] [task - 1 C.S.L earn. Controller. AsynController: Vice thread begins: the 2021-02-21 09:45:38 task - 1. 8312-153 the INFO] [task - 1 C.S.L earn. Controller. AsynController: vice thread returns: task 1Copy the code

The thread name is task-1. After the thread pool is enabled, the following log is printed:

The 2021-02-21 09:50:28. 8339-950 the INFO [nio - 8080 - exec - 1] C.S.L earn. Controller. AsynController: Main thread start: HTTP - nio - 8080-2021-02-21 09:50:28 exec - 1, 8339-951 the INFO [nio - 8080 - exec - 1] C.S.L earn. Controller. AsynController: The main thread to return: HTTP - nio - 8080-2021-02-21 09:50:28 exec - 1. 8339-955 the INFO [thread pool - 1] C.S.L earn. Controller. AsynController: Vice thread begins: thread pool - 1 09:50:29 2021-02-21. 8339-956 the INFO [thread pool - 1] C.S.L earn. Controller. AsynController: The secondary thread returns thread-pool-1Copy the code

The thread name is thread-pool-1, where the thread pool prefix is specified.

In addition to the thread pool configuration, you can also configure unified exception handling, which is not demonstrated here.

4.2 Asynchronous Implementation based on WebAsyncTask

Spring provides WebAsyncTask as a wrapper around Callable, providing more powerful functions such as handling timeout callbacks, error callbacks, completion callbacks, and so on.

@GetMapping("/webAsyncTask")
public WebAsyncTask<String> webAsyncTask(a) {
    log.info("External thread:" + Thread.currentThread().getName());
    WebAsyncTask<String> result = new WebAsyncTask<>(60 * 1000L.new Callable<String>() {
        @Override
        public String call(a) {
            log.info("Internal thread:" + Thread.currentThread().getName());
            return "success"; }}); result.onTimeout(new Callable<String>() {
        @Override
        public String call(a) {
            log.info("timeout callback");
            return "timeout callback"; }}); result.onCompletion(new Runnable() {
        @Override
        public void run(a) {
            log.info("finish callback"); }});return result;
}
Copy the code

Access the corresponding request and print the log:

The 2021-02-21 10:22:33. 8547-028 the INFO [nio - 8080 - exec - 1] C.S.L earn. Controller. AsynController: External thread: HTTP - nio - 8080-2021-02-21 10:22:33 exec - 1, 8547-033 the INFO [thread pool - 1] C.S.L earn. Controller. AsynController: Internal threads, thread pool - 1 10:22:33 2021-02-21. 8547-055 the INFO [nio - 8080 - exec - 2] C.S.L earn. Controller. AsynController: finish callbackCopy the code

4.3 Asynchronous based on DeferredResult

DeferredResult is used in a similar way to Callable, but not in the same way as A DeferredResult. The actual result of DeferredResult may not have been generated when it is returned. The actual result of DeferredResult may have been set in another thread. This feature of DeferredResult is important for advanced applications such as server-side pushing, order expiration handling, long polling, and the ability to emulate MQ. Common usage scenarios:

If you are familiar with how Apollo works, you may know that Real-time hot updates to Apollo configurations are based on DeferredResult to implement configuration change notifications

2. If the front-end polls the order payment status, it can be changed to the form of long links and implement asynchronous result callback based on DeferredResult to reduce the server pressure

In DeferredResult, let’s take a look at the official examples and instructions:

@RequestMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes(a) {
  DeferredResult<String> deferredResult = new DeferredResult<String>();
  // Save the deferredResult in in-memory queue ...
  return deferredResult;
}

// In some other thread...
deferredResult.setResult(data);
Copy the code

As you can see from the example above, the call to DeferredResult doesn’t have to be in Spring MVC, it could be another thread. So does the official explanation:

In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.

That is, DeferredResult can also return results triggered by MQ, scheduled tasks, or other threads. Here’s an example:

@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {

    private List<DeferredResult<String>> deferredResultList = new ArrayList<>();

    @ResponseBody
    @GetMapping("/hello")
    public DeferredResult<String> helloGet(a) throws Exception {
        DeferredResult<String> deferredResult = new DeferredResult<>();

        // Save it and wait for the trigger
        deferredResultList.add(deferredResult);
        return deferredResult;
    }

    @ResponseBody
    @GetMapping("/setHelloToAll")
    public void helloSet(a) throws Exception {
        // Let all held requests be responded to
        deferredResultList.forEach(d -> d.setResult("say hello to all")); }}Copy the code

The first request, / Hello, will store deferredResult, and the front end of the page will wait. All relevant pages will not respond until the second request, setHelloToAll, is sent.

The entire execution process is as follows:

  • The controller returns a DeferredResult and stores it in memory or in a List (for subsequent access);
  • Spring MVC calls Request.startAsync () to enable asynchronous processing; At the same time, the interceptor, Filter and so on in DispatcherServlet will immediately exit the main thread, but the state of Response remains open;
  • The application gives the DeferredResult#setResult value through another thread (possibly MQ messages, scheduled tasks, and so on). SpringMVC then sends the request to the servlet container again;
  • The DispatcherServlet is invoked again and then processes the subsequent standard flow;

It can be found through the above process: DeferredResult can be used to implement long connections. For example, when an operation is asynchronous, you can save the corresponding DeferredResult object, and when the asynchronous notification comes back, you can find the DeferredResult object and process the result in setResult. Thus improving performance.

5. Asynchronous implementation based on SpringBoot

Declaring a method as an asynchronous method in SpringBoot is as simple as two annotations @enableAsync and @async. In the command, @enableAsync is used to enable SpringBoot asynchrony and the SpringBoot startup class. @async is used on a method to mark it as an asynchronous processing method.

Note that @Async is not supported on methods of classes annotated by @Configuration. If a method in the same class calls another method with @async, the annotation will not take effect.

An example of how to use @enableAsync:

@SpringBootApplication
@EnableAsync
public class App {

    public static void main(String[] args) { SpringApplication.run(App.class, args); }}Copy the code

Examples of using @async:

@Service
public class SyncService {
    
    @Async
    public void asyncEvent(a) {
        // Business processing}}Copy the code

The @async annotation is similar to Callable in that by default the SimpleAsyncTaskExecutor thread pool is used. You can customize the thread pool as described in Callable.

To verify this, use @enableAsync on the startup class and define the Controller class:

@RestController
public class IndexController {
    
    @Resource
    private UserService userService;
    
    @RequestMapping("/async")
    public String async(a){
        System.out.println("--IndexController--1");
        userService.sendSms();
        System.out.println("--IndexController--4");
        return "success"; }}Copy the code

Define Service and asynchronous methods:

@Service
public class UserService {

    @Async
    public void sendSms(a){
        System.out.println("--sendSms--2");
        IntStream.range(0.5).forEach(d -> {
            try {
                Thread.sleep(1000);
            } catch(InterruptedException e) { e.printStackTrace(); }}); System.out.println("--sendSms--3"); }}Copy the code

If @enableAsync and @async annotations are first commented out, that is, service requests under normal circumstances, the following logs are generated:

--IndexController--1
--sendSms--2
--sendSms--3
--IndexController--4
Copy the code

When @enableAsync and @async annotations are used, the following logs are printed:

--IndexController--1
--IndexController--4
--sendSms--2
--sendSms--3
Copy the code

We can see from the log comparison that @async is treated as a child thread. Therefore, the entire sendSms method is executed after the main thread has finished executing.

Today we’ll look at how Spring does this: When scanning a bean, spring scans for @async annotations on the method. If so, Spring dynamically generates a subclass of the bean called a proxy class (?). The proxy class inherits the bean we wrote and then injects the proxy class into it. At this point, when executing this method, the proxy class decides that this method needs to be executed asynchronously and does not call the corresponding method of the parent class (the bean we originally wrote). Spring maintains a queue of its own, and it puts the method that needs to be executed into a queue and waits for the thread pool to read the queue and complete the execution of the method, thus completing the asynchronous function. Note that when reconfiguring tasks, there are parameters that let us configure the number of thread pools. Because of this implementation, the @async annotation is invalid for method calls in the same class! The reason is that when you are in the same class, the method call is made inside the class body and Spring cannot intercept the method call.

That takes it a step further, and Spring provides us with AOP, section-oriented functionality. The principle is similar to asynchronous annotations in that Spring starts the container by scanning for classes defined by the aspect. When these classes are injected, they’re also injected as proxy classes, and when you call these methods, you’re essentially calling the proxy class. Using a proxy class to execute a method corresponding to the parent class, Spring only needs to execute some code before and after the invocation to complete AOP implementation!

@Async, WebAsyncTask, Callable, DeferredResult

Different packages:

  • @ Async: org. Springframework. Scheduling. The annotation;
  • WebAsyncTask: org. Springframework. Web. Context. Request. Async;
  • Callable: Java. Util. Concurrent;
  • DeferredResult: org. Springframework. Web. Context. Request. Async;

We should feel a little bit different from the package we are in, for example @Async is in the Scheduling package, while WebAsyncTask and DeferredResult are for the Web (Spring MVC), Callable is used for concurrent processing.

For Callable, this is usually used for asynchronous requests with the Controller method, but it can also be used as a Runable replacement. Differs from normal methods in the way they return:

// Common method
public String aMethod(a){}// Compare the Callable method
public Callable<String>  aMethod(a){}Copy the code

WebAsyncTask is a wrapper around Callable that provides some handling of event callbacks, not much different in nature.

DeferredResult is used in a similar way to Callable, with an emphasis on communication across threads.

@async is also a way to replace Runable instead of creating our own thread. And it applies more broadly, not just to the Controller layer, but to any layer’s methods.

Of course, you can also analyze it in terms of return results, exception handling and so on, but I won’t go into that here.

6. Build your own wheels

We can also build our own wheels and implement asynchronous business execution through custom thread pool utility classes.

public abstract class ExecutorKit {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorKit.class);
    private static ThreadPoolExecutor threadPoolExecutor;

    public static void sleep(int timeout, TimeUnit timeUnit) {
        try {
            timeUnit.sleep((long)timeout);
        } catch (Exception var3) {
        }

    }

    public static void execute(Runnable cmd) {
        execute("default".true, cmd);
    }

    public static void execute(String name, Runnable cmd) {
        execute(name, true, cmd);
    }

    public static void execute(String name, boolean ignoreException, Runnable cmd) {
        threadPoolExecutor.execute(() -> {
            Transaction transaction = Cat.newTransaction("Executor-Tasks", name);

            try {
                cmd.run();
                transaction.setStatus("0");
            } catch (Exception var8) {
                logger.error("Asynchronously running [{}] task exception", name, var8);
                if (ignoreException) {
                    transaction.setStatus("0");
                } else{ transaction.setStatus(var8); }}finally{ transaction.complete(); }}); }private ExecutorKit(a) {}static {
        AtomicInteger threadIndex = new AtomicInteger(0);
        LinkedBlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque(51200);
        ThreadFactory threadFactory = (r) -> {
            Thread thread = new Thread(r);
            thread.setName(ExecutorKit.class.getSimpleName() + "-" + threadIndex.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        threadPoolExecutor = new ThreadPoolExecutor(9.9.0L, TimeUnit.SECONDS, blockingDeque, threadFactory, (r, executor) -> {
            logger.error("Too many thread pool tasks, discard process"); }); }}Copy the code

Example:

ExecutorKit.execute("Log-Save-Decide", () -> logAndSaveDecide(userId, result, userVars));
Copy the code

7, summary

Through the above analysis, it should be for common asynchronous processing, when the familiar with the basic theory and practical examples, use method and matters needing attention, in the subsequent practice, still need to explore the implementation details, particular case is particular analysis, research and development process to avoid “takes doctrine”, to avoid stepping on pit, fear online every line of code.

Attached: Reference link:

1, zhuanlan.zhihu.com/p/146336940

2, zhuanlan.zhihu.com/p/99268715

3, juejin. Cn/post / 694312…

4, blog.csdn.net/wangdong567…