AsyncFramework: implementing functionality similar to the Spring framework’s @Async annotation

AsyncFramework is an asynchronous framework that the author implemented in 2019. It works like the @Async annotation of the Spring framework: a method is executed asynchronously simply by adding the @AsyncFunction annotation to it on the interface. The framework is published on the author’s GitHub.

The @AsyncFunction annotation of AsyncFramework supports not only methods that have no return value but also methods that return a value, as the Spring framework does.

But unlike the Spring implementation, AsyncFramework is implemented entirely with dynamic bytecode technology and can be used in non-Spring projects, which is why I wrote it in the first place.

If you are also interested in bytecode, I highly recommend reading the framework’s source code. It is compact: just over a dozen classes that cover the use of design patterns, bytecode, and framework design ideas. It also helps in understanding how Spring’s @Async annotation is implemented.

Let me first show how to use AsyncFramework, and then explain how it is implemented.

How to use AsyncFramework

Step 1: Add dependencies to your Java project

<dependency>
    <groupId>com.github.wujiuye</groupId>
    <artifactId>asyncframework</artifactId>
    <version>1.2.0-RELEASE</version>
</dependency>

Step 2: Define the interface and write the implementation class of the interface

/**
 * @author wujiuye
 * @version 1.0 on 2019/11/24
 */
public interface AsyncMessageSubscribe {
    /**
     * Asynchronous, no return value
     *
     * @param queue
     */
    @AsyncFunction
    void pullMessage(String queue);

    /**
     * Asynchronous, with return value
     *
     * @param s1
     * @param s2
     * @return
     */
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);
}

Write the implementation class:

public class AsyncMessageSubscribeImpl implements AsyncMessageSubscribe {
    @Override
    public void pullMessage(String queue) {
        System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
    }

    @Override
    public AsyncResult<String> doAction(String s1, String s2) {
        return new AsyncResult<>("hello wujiuye! current thread name:" + Thread.currentThread().getName());
    }
}

Step 3: Configure the global thread pool and create proxy objects using AsyncProxyFactory

When calling the getInterfaceImplSupporAsync method of AsyncProxyFactory to create the proxy instance, you need to specify the thread pool used for asynchronous execution as well as the interface implementation class.

public class AsmProxyTest {
    // Configure a global thread pool
    static ExecutorService executorService = Executors.newFixedThreadPool(2);

    // Instance of the implementation class from step 2
    private AsyncMessageSubscribe impl = new AsyncMessageSubscribeImpl();

    @Test
    public void testAutoProxyAsync() throws Exception {
        AsyncMessageSubscribe proxy = AsmProxyFactory.getInterfaceImplSupporAsync(
                AsyncMessageSubscribe.class, impl, executorService);
        // Async with no return value
        proxy.pullMessage("wujiuye");
        // Async with return value
        AsyncResult<String> asyncResult = proxy.doAction("sssss", "ddd");
        System.out.println(asyncResult.get());
    }
}

You might ask: if I have to create a proxy object in order to make the call, wouldn’t it be easier to just create a Runnable and submit it to the thread pool?

That is true when the proxy is created by hand, but not when proxy objects are created automatically through package scanning, which is what Spring does through BeanPostProcessor. In addition, to change asynchronous execution to synchronous we only need to remove the annotation, and to change synchronous execution to asynchronous we only need to add it; no other code has to change.

Implementation principle of asynchronous methods with no return value

We take asynchronous message subscription as an example and show how to switch the message subscription method from synchronous to asynchronous with a static proxy, without using any framework. This is also the implementation principle of AsyncFramework; AsyncFramework just replaces the static proxy with a dynamic proxy.

Define the message subscription interface:

public interface MessageSubscribeTemplate {
    <T> void subscribeMessage(MessageQueueEnum messageQueue,
                              OnReceiveMessageCallback<T> onReceiveMessageCallback,
                              Class<T> tagClass);
}

Message subscription interface implementation class:

public class AwsSqsMessageConsumer implements MessageSubscribeTemplate {
    @Override
    public <T> void subscribeMessage(MessageQueueEnum messageQueue,
                                     OnReceiveMessageCallback<T> onReceiveMessageCallback,
                                     Class<T> tagClass) {
        // Write the implementation logic
    }
}

Tip: Why is message subscription abstracted as an interface? At the time, we often switched MQ frameworks, starting with RocketMQ and then switching to AWS SQS due to cost.

The following code can be used to implement synchronous and asynchronous message subscription through a static proxy.

public class MessageSubscribeTemplateProxy implements MessageSubscribeTemplate {
    private ExecutorService executorService;
    private MessageSubscribeTemplate target;

    public MessageSubscribeTemplateProxy(ExecutorService executorService, MessageSubscribeTemplate target) {
        this.target = target;
        this.executorService = executorService;
    }

    @Override
    public <T> void subscribeMessage(MessageQueueEnum messageQueue,
                                     OnReceiveMessageCallback<T> onReceiveMessageCallback,
                                     Class<T> tagClass) {
        // Asynchronous call logic: hand the real call over to the thread pool
        executorService.execute(() -> this.target.subscribeMessage(messageQueue, onReceiveMessageCallback, tagClass));
    }
}

What AsyncFramework does is write the MessageSubscribeTemplateProxy proxy class dynamically, so that switching from synchronous to asynchronous, or from asynchronous back to synchronous, no longer requires modifying the proxy class by hand.

With AsyncFramework, we just write the implementation class of the message subscription template and do not have to care whether it runs synchronously or asynchronously. When we want the subscribe method to execute asynchronously, we add the @AsyncFunction annotation to the method. An interface may also have multiple methods; adding the annotation to some of them enables asynchronous execution for those methods only.
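The per-method switch described above can be illustrated with a minimal sketch. It is not how AsyncFramework works internally (the framework generates bytecode); for brevity it swaps in the JDK’s java.lang.reflect.Proxy. The names RunAsync, Subscriber, wrap and demo are hypothetical and exist only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AnnotationSwitchSketch {

    // Hypothetical marker annotation standing in for the framework's own annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunAsync {}

    // Hypothetical interface: only the annotated method should run in the pool.
    interface Subscriber {
        @RunAsync
        void pull(String queue);

        void pullNow(String queue);
    }

    // Wraps target in a JDK dynamic proxy that sends annotated void methods to the pool.
    static <T> T wrap(Class<T> iface, T target, ExecutorService pool) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (p, method, args) -> {
                    if (method.isAnnotationPresent(RunAsync.class) && method.getReturnType() == void.class) {
                        pool.execute(() -> {
                            try {
                                method.invoke(target, args);
                            } catch (ReflectiveOperationException e) {
                                throw new IllegalStateException(e);
                            }
                        });
                        return null;
                    }
                    try {
                        return method.invoke(target, args); // no annotation: call on the caller's thread
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    }
                });
        return iface.cast(proxy);
    }

    // Calls both methods through the proxy and reports which thread executed each one.
    static String[] demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-pool"));
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        Subscriber target = new Subscriber() {
            @Override
            public void pull(String queue) {
                seen[0] = Thread.currentThread().getName();
                done.countDown();
            }

            @Override
            public void pullNow(String queue) {
                seen[1] = Thread.currentThread().getName();
            }
        };
        Subscriber proxy = wrap(Subscriber.class, target, pool);
        proxy.pull("q");
        proxy.pullNow("q");
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        String[] seen = demo();
        System.out.println("pull ran on: " + seen[0]);
        System.out.println("pullNow ran on: " + seen[1]);
    }
}
```

Removing the annotation from pull would make it run on the caller’s thread again, without touching the implementation class.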

Implementation principle of asynchronous methods with a return value

The author encountered two major challenges in implementing this feature to support asynchronous execution of methods with return values:

  • Difficulty 1: How can a method with a return value implement asynchrony?

  • Difficulty 2: How to write bytecode to implement the proxy class of the generic interface?

In a Spring project, if you want to add the @Async annotation to a method that returns a value, the method’s return type needs to be AsyncResult. I also looked at the Spring source code and found that AsyncResult is a Future.

The idea is there, but it can’t be realized just by relying on Future.

As we know, the submit method of ExecutorService accepts a Callable, that is, a task with a return value, and returns a Future whose get method blocks until the task completes.

So if we call the Future’s get method in the proxy class method to wait for the result, and then wrap the result as AsyncResult to return, we are not executing asynchronously, but synchronously.
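A minimal sketch of that pitfall, using only java.util.concurrent: the method below submits a task and then calls get in the same method, as a naive proxy would. The class name BlockingGetSketch and the method callAndWait are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingGetSketch {

    // Submits a task and waits for it inside the same method, recording the order of events.
    static List<String> callAndWait() throws Exception {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(100);          // simulate slow work
                events.add("task finished");
                return "value";
            });
            future.get();                   // blocks the caller until the task is done
            events.add("caller resumed");
            return new ArrayList<>(events);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callAndWait());
    }
}
```

The caller always resumes after the task has finished, so from the caller’s point of view the call is synchronous even though the work ran on a pool thread.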

The problem is that the proxy class must return an AsyncResult as soon as the asynchronous method has been submitted to the thread pool, and must ensure that when the caller later invokes get on that AsyncResult, the value obtained is the one returned by the real method once it has finished executing.

The author came up with the following approach: once the proxy class has submitted the asynchronous method to the thread pool, it returns an AsyncResult proxy object that delegates to the Future’s get method. When the get method of the AsyncResult proxy object is called, the Future’s get method is called.

First implement AsyncResult. It is a non-blocking Future, because it already holds the result and never needs to block.

public class AsyncResult<T> implements Future<T> {
    private T result;

    public AsyncResult(T result) {
        this.result = result;
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return get();
    }

    // The remaining Future methods (cancel, isCancelled, isDone) are omitted here

    /**
     * Called by the generated bytecode
     *
     * @param future the Future returned when the task is submitted to the thread pool
     * @param <T>
     * @return
     */
    public static <T> AsyncResult<T> newAsyncResultProxy(final Future<AsyncResult<T>> future) {
        return new AsyncResult<T>(null) {
            @Override
            public T get() throws InterruptedException, ExecutionException {
                AsyncResult<T> asyncResult = future.get();
                return asyncResult.get();
            }
        };
    }
}

The newAsyncResultProxy method is the most critical step in the whole asynchronous implementation. It is called by the proxy object generated from bytecode, and the proxy method returns the AsyncResult produced by newAsyncResultProxy. When the caller invokes get on that AsyncResult, it actually invokes get on the Future returned by the ExecutorService submit method. The blocking wait for the result is thus hidden from the caller.
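The effect of this wrapping can be checked with a small self-contained sketch. It assumes a simplified stand-in for AsyncResult: the nested class Result and its deferred method are hypothetical names, and Result does not implement Future, which keeps the example short. The task is parked on a latch, so the sketch can show that the wrapper is handed back while the task is still pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DeferredResultSketch {

    // Simplified, hypothetical stand-in for AsyncResult: holds a value that is already available.
    static class Result<T> {
        private final T value;

        Result(T value) {
            this.value = value;
        }

        T get() throws InterruptedException, ExecutionException {
            return value;
        }

        // Returns a wrapper whose get() waits on the pool's Future only when it is called.
        static <T> Result<T> deferred(Future<Result<T>> future) {
            return new Result<T>(null) {
                @Override
                T get() throws InterruptedException, ExecutionException {
                    return future.get().get();
                }
            };
        }
    }

    // Reports whether the task was still pending when the wrapper was returned, plus the final value.
    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<Result<String>> future = pool.submit(() -> {
                release.await();                        // park until the caller lets the task finish
                return new Result<String>("hello");
            });
            Result<String> lazy = Result.deferred(future); // no blocking here
            boolean pendingWhenReturned = !future.isDone();
            release.countDown();
            return pendingWhenReturned + ":" + lazy.get(); // blocking happens only in get()
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

If deferred called future.get() eagerly, this sketch would deadlock, because the task is only released after the wrapper has been returned.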

Again, take the message subscription example:

// Interface
public interface AsyncMessageSubscribe {
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);
}

// Interface implementation class
private AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
    @Override
    public AsyncResult<String> doAction(String s1, String s2) {
        return new AsyncResult<>("current thread name:" + Thread.currentThread().getName());
    }
};

AsyncFramework uses dynamic bytecode technology to generate a Callable class that wraps the call to AsyncMessageSubscribe#doAction, so that the call can be submitted to the thread pool for execution. The generated class is equivalent to the following code.

public static class AsyncMessageSubscribe_doActionCallable implements Callable<AsyncResult<String>> {
    private AsyncMessageSubscribe target;
    private String param1;
    private String param2;

    public AsyncMessageSubscribe_doActionCallable(AsyncMessageSubscribe var1, String var2, String var3) {
        this.target = var1;
        this.param1 = var2;
        this.param2 = var3;
    }

    public AsyncResult<String> call() throws Exception {
        return this.target.doAction(this.param1, this.param2);
    }
}

AsyncFramework also uses dynamic bytecode technology to generate a dynamic proxy class for AsyncMessageSubscribe, equivalent to the following code.

public class AsyncMessageSubscribeProxy implements AsyncMessageSubscribe {
    private ExecutorService executorService;
    private AsyncMessageSubscribe target;

    public AsyncMessageSubscribeProxy(ExecutorService executorService, AsyncMessageSubscribe target) {
        this.executorService = executorService;
        this.target = target;
    }

    public AsyncResult<String> doAction(String s1, String s2) {
        // Wrap the real call in the generated Callable and submit it to the thread pool
        AsyncMessageSubscribe_doActionCallable callable = new AsyncMessageSubscribe_doActionCallable(target, s1, s2);
        Future<AsyncResult<String>> result = executorService.submit(callable);
        // Return immediately; blocking is deferred until the caller invokes get()
        AsyncResult<String> asyncResult = AsyncResult.newAsyncResultProxy(result);
        return asyncResult;
    }
}

A pitfall when implementing a generic interface with dynamic bytecode in AsyncFramework

The source code with which AsyncFramework dynamically implements the asynchronous methods of the proxy class is in the class FutureFunctionHandler.

public class FutureFunctionHandler implements AsyncFunctionHandler {
    /**
     * Handles an asyncMethod whose return value is of type Future
     *
     * @param classWriter          class writer
     * @param interfaceClass       interface
     * @param asyncMethod          asynchronous method
     * @param proxyObjClass        interface implementation class
     * @param executorServiceClass type of the thread pool
     */
    @Override
    public void doOverrideAsyncFunc(ClassWriter classWriter, Class<?> interfaceClass, Method asyncMethod,
                                    Class<?> proxyObjClass, Class<?> executorServiceClass) {
        ...
        // invoke submit callable
        methodVisitor.visitVarInsn(ALOAD, 0);
        methodVisitor.visitFieldInsn(GETFIELD, ByteCodeUtils.getProxyClassName(proxyObjClass), "executorService", Type.getDescriptor(executorServiceClass));
        methodVisitor.visitVarInsn(ALOAD, index);
        if (!executorServiceClass.isInterface()) {
            methodVisitor.visitMethodInsn(INVOKEVIRTUAL, executorServiceClass.getName().replace(".", "/"),
                    "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), false);
        } else {
            methodVisitor.visitMethodInsn(INVOKEINTERFACE, executorServiceClass.getName().replace(".", "/"),
                    "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), true);
        }
        // Pop the returned Future off the operand stack and store it in a local variable
        methodVisitor.visitVarInsn(ASTORE, ++index);

        // Add another proxy layer so that the blocking wait is hidden from the caller
        methodVisitor.visitVarInsn(ALOAD, index);
        methodVisitor.visitMethodInsn(INVOKESTATIC, AsyncResult.class.getName().replace(".", "/"),
                "newAsyncResultProxy", ByteCodeUtils.getFuncDesc(AsyncResult.class, Future.class),
                false);
        methodVisitor.visitInsn(ARETURN);
        ...
    }
}

When the thread pool calls AsyncMessageSubscribe_doActionCallable, it looks for a call method whose method descriptor is ()Ljava/lang/Object;, because Callable is a generic interface.

It does not look for a method matching a class signature and call method signature that have been changed to the following.

Class signature: Ljava/lang/Object;Ljava/util/concurrent/Callable<Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;>;
call method signature: ()Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;

This is because, after compilation, the descriptor of the generic type T is Ljava/lang/Object;.

Take the generic class AsyncResult as an example (excerpt):

public class AsyncResult<T> implements Future<T> {

    private T result;

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }
}

Bytecode information of the compiled AsyncResult generic class (excerpt):

public class com.wujiuye.asyncframework.handler.async.AsyncResult<T> implements java.util.concurrent.Future<T> {
  private T result;
    descriptor: Ljava/lang/Object;

  public T get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException;
    descriptor: ()Ljava/lang/Object;
    Code:
       0: aload_0
       1: getfield      #2      // Field result:Ljava/lang/Object;
       4: areturn

The descriptor of type T is Ljava/lang/Object;, and in the get method the type descriptor specified by the getfield instruction is also Ljava/lang/Object;.

The Callable interface is also generic. The method descriptor of its compiled call method is ()Ljava/lang/Object;.

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Therefore, when the Callable interface is implemented through bytecode, the call method should not be given a method signature. Setting a method signature here means changing the method descriptor as well, which causes the invocation of call in the thread pool to throw an abstract method error (AbstractMethodError). The reason is that the thread pool looks up call using the method descriptor declared by the Callable interface, ()Ljava/lang/Object;, and no call method with that descriptor can be found in the class of the Callable object.
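The erased descriptor can be observed from plain Java with reflection. The sketch below is an illustration only and is not part of AsyncFramework; the class names ErasedCallSketch and StringTask are hypothetical. It also shows why hand-written Java does not run into this problem: javac adds a bridge method call() returning Object, which is the method the thread pool actually invokes. Generated bytecode has no compiler to add that bridge, so the call method must carry the erased descriptor itself.

```java
import java.lang.reflect.Method;
import java.util.concurrent.Callable;

class ErasedCallSketch {

    // Hypothetical Callable implementation written in plain Java.
    static class StringTask implements Callable<String> {
        @Override
        public String call() {
            return "ok";
        }
    }

    // Return type of Callable#call as the JVM sees it after type erasure.
    static Class<?> interfaceCallReturnType() throws NoSuchMethodException {
        return Callable.class.getMethod("call").getReturnType();
    }

    // True if the compiler generated a bridge method call() returning Object for StringTask.
    static boolean hasObjectBridge() {
        for (Method m : StringTask.class.getDeclaredMethods()) {
            if (m.getName().equals("call") && m.isBridge() && m.getReturnType() == Object.class) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Callable#call returns: " + interfaceCallReturnType().getName());
        System.out.println("StringTask has an Object-returning bridge: " + hasObjectBridge());
    }
}
```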


Q: Why would you want to implement such a framework when Spring already provides this functionality?

A: Because I needed it when I was writing components, and I did not want those components to depend on Spring just to be used in a project, which would make them bloated. Secondly, I like to tinker and wanted to turn my ideas into something real.

AsyncFramework can replace the use of Spring’s @Async by packaging a starter that relies on Spring’s BeanPostProcessor for seamless integration. However, I don’t want to reinvent the wheel, so AsyncFramework is recommended for non-Spring projects.