java spring boot webmvc webflux reactive aop reflect

[Java] An asynchronous callback to a synchronous return

Introduction

Downloading is a fairly common feature. A single project may not have many download endpoints, but almost every project has some, and a few of them turn out to be complicated: not difficult, just tedious.

So, drawing on download requirements I had run into before, I wrote a library to simplify implementing downloads.

What if I told you that you can now download any object with a single annotation? Wouldn’t that be convenient?

@Download(source = "classpath:/download/README.txt")
@GetMapping("/classpath")
public void classpath() {
}

@Download
@GetMapping("/file")
public File file() {
    return new File("/Users/Shared/README.txt");
}

@Download
@GetMapping("/http")
public String http() {
    return "http://127.0.0.1:8080/concept-download/image.jpg";
}

Doesn’t look like much of a difference? Then let me describe a download requirement I once ran into.

We have a platform that manages devices, and each device has a QR code image whose HTTP address is stored in a field.

Now we need to export a compressed package containing the QR code images of all devices, and each image must be named after its device with a .png suffix. The requirement is not difficult, but it really is a bit of a hassle:

  • First I need to query the list of devices
  • Then use each QR code address to download the image and write it to a local cache file
  • Before downloading, check whether the cache file already exists
  • Download concurrently to improve performance
  • Once all the images are downloaded, pack them into a compressed file
  • Finally, write the compressed file to the response through input and output streams

By the time I was done I had written nearly 200 lines of code, long and smelly, for one download feature. That much hassle made me wonder whether there was an easier way.

My needs were simple: I wanted to just provide the data to download, such as a file path, a file object, a piece of text, an HTTP address, a collection mixing all of those types, or even an instance of one of our own classes, and not care about anything else.

Is the file path a file or a directory? Does the text need to be written to a text file first? How do HTTP resources get downloaded locally? How are multiple files compressed? How does it all end up in the response? I don’t want to spend time on any of that.

For example, in my current requirement I only need to return the list of devices and don’t have to worry about anything else.

@Download(filename = "qr code.zip")
@GetMapping("/download")
public List<Device> download() {
    return deviceService.all();
}

// Marks the class as one whose annotated members should be parsed
@SourceModel
public class Device {

    // Device name
    private String name;

    // Device QR code
    // The annotation marks this HTTP address as the data to download
    @SourceObject
    private String qrCodeUrl;

    // The annotation marks this method as the provider of the file name
    @SourceName
    public String getQrCodeName() {
        return name + ".png";
    }
    // other fields and methods omitted
}

By adding a few annotations to the Device class (or implementing a few interfaces) to specify the file name and the file address, I save time, effort, and 199 lines of code.

If you’re interested, the GitHub repository has a more detailed introduction, including advanced usage and the overall architecture.

Design

Below are the main design ideas behind the library, along with the pitfalls I hit along the way. Read on if you’re interested.

Going by my initial idea, I figured the feature wouldn’t be very complex, so I decided to just grind it out.

I never expected the implementation to be more complicated than I had imagined (more on that later).

Basics

First of all, the whole library is built on reactive programming, but it is not fully reactive: it only uses Mono.

… A strange combination?

Why is it like this? A big reason is that the library has to be compatible with both WebMVC and WebFlux, so I refactored the original InputStream-based code into a reactive style, and this combination is the result.

This was also the biggest pitfall I ran into: I had already gotten the entire servlet-based download flow working before I thought about supporting WebFlux.

As you probably know, in WebMVC we can get the request and response objects through RequestContextHolder, while in WebFlux we have to inject them as method parameters.

@Download(source = "classpath:/download/README.txt")
@GetMapping("/classpath")
public void classpath(ServerHttpResponse response) {
}

With Spring’s built-in parameter injection we could pick the response object out of the method arguments using AOP, but that extra parameter always felt redundant.

Is there a way to get rid of the unneeded parameter and still obtain the response object?


/**
 * Sets the current request and response.
 *
 * @see ReactiveDownloadHolder
 */
public class ReactiveDownloadFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        return chain.filter(exchange)
                // Lower versions use subscriberContext
                .contextWrite(ctx -> ctx.put(ServerHttpRequest.class, request))
                .contextWrite(ctx -> ctx.put(ServerHttpResponse.class, response));
    }
}

/**
 * Gets the current request and response.
 *
 * @see ReactiveDownloadFilter
 */
public class ReactiveDownloadHolder {

    public static Mono<ServerHttpRequest> getRequest() {
        // Lower versions use subscriberContext
        return Mono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpRequest.class)));
    }

    public static Mono<ServerHttpResponse> getResponse() {
        // Lower versions use subscriberContext
        return Mono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpResponse.class)));
    }
}

Adding a WebFilter lets us get the response object, but what comes back is a Mono<ServerHttpResponse>.

Can we call Mono.block() to get the object out? No. WebFlux runs on Netty’s non-blocking threads, and calling block() on one of them throws an exception.

So there was nothing for it but to refactor the existing code in a reactive style.

Architecture

Next, the overall architecture.

A download request can be broken down into several steps. Take downloading a compressed package of multiple files as an example:

  • First we usually obtain multiple file paths or the corresponding File objects

  • Then these files are compressed into a single compressed file

  • Finally, the compressed file is written to the response

For the requirement I described above, though, what we start with is not a file path or a File object but an HTTP address, so one more step is needed before compression: downloading the images.

Different requirements may call for extra steps at any point in this flow, so I modeled the design on the filter chain in Spring Cloud Gateway.

/**
 * Download handler.
 */
public interface DownloadHandler extends OrderProvider {

    /**
     * Performs the handling.
     *
     * @param context {@link DownloadContext}
     * @param chain   {@link DownloadHandlerChain}
     */
    Mono<Void> handle(DownloadContext context, DownloadHandlerChain chain);
}

/**
 * Download handler chain.
 */
public interface DownloadHandlerChain {

    /**
     * Dispatches to the next download handler.
     *
     * @param context {@link DownloadContext}
     */
    Mono<Void> next(DownloadContext context);
}

Each step can then be implemented as a separate DownloadHandler, and steps can be added and combined freely.
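To make the chain idea concrete, here is a minimal sketch of the same pattern in plain, synchronous Java. It is not the library's code: Handler, Chain, IndexedChain and NamedStep are hypothetical names, the context is reduced to a List<String>, and Mono is left out so the example runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified stand-ins for DownloadHandler / DownloadHandlerChain.
interface Handler {
    int getOrder();

    void handle(List<String> context, Chain chain);
}

interface Chain {
    void next(List<String> context);
}

// Walks the sorted handlers one by one; each handler decides whether to call next().
class IndexedChain implements Chain {
    private final List<Handler> handlers;
    private final int index;

    IndexedChain(List<Handler> handlers, int index) {
        this.handlers = handlers;
        this.index = index;
    }

    // Sorts a copy of the handlers by order and starts at the first one.
    static IndexedChain of(List<Handler> handlers) {
        List<Handler> sorted = new ArrayList<>(handlers);
        sorted.sort(Comparator.comparingInt(Handler::getOrder));
        return new IndexedChain(sorted, 0);
    }

    @Override
    public void next(List<String> context) {
        if (index < handlers.size()) {
            handlers.get(index).handle(context, new IndexedChain(handlers, index + 1));
        }
    }
}

// A step that records its name in the shared context, then passes control on.
class NamedStep implements Handler {
    private final String name;
    private final int order;

    NamedStep(String name, int order) {
        this.name = name;
        this.order = order;
    }

    @Override
    public int getOrder() {
        return order;
    }

    @Override
    public void handle(List<String> context, Chain chain) {
        context.add(name);
        chain.next(context);
    }
}

class ChainDemo {
    public static void main(String[] args) {
        List<String> context = new ArrayList<>();
        IndexedChain.of(List.of(
                new NamedStep("write-response", 30),
                new NamedStep("load", 10),
                new NamedStep("compress", 20))).next(context);
        System.out.println(context); // prints [load, compress, write-response]
    }
}
```

Because the order lives on each handler, an extra step such as "download the images" can be slotted in between existing ones by giving it an order value between theirs.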

Download context

On top of this, a DownloadContext runs through the whole flow so that steps can share and pass along intermediate results.

A DownloadContextFactory is also provided so that the context itself can be customized.

In addition, DownloadContextInitializer and DownloadContextDestroyer let you hook your own logic into context initialization and destruction.

Download type support

The types of data to download are not fixed: files, HTTP addresses, and, as I wanted earlier, instances of custom classes.

So I abstracted every downloadable object as a Source, which represents one download source. A file becomes a FileSource, an HTTP address becomes an HttpSource, and each is created by whichever SourceFactory matches it.

For example, FileSourceFactory matches File and creates a FileSource, while HttpSourceFactory matches the http:// prefix and creates an HttpSource.

/**
 * Factory for {@link Source}.
 */
public interface SourceFactory extends OrderProvider {

    /**
     * Whether the raw data object to download is supported.
     *
     * @param source  the raw data object to download
     * @param context {@link DownloadContext}
     * @return true if supported
     */
    boolean support(Object source, DownloadContext context);

    /**
     * Creates a {@link Source}.
     *
     * @param source  the raw data object to download
     * @param context {@link DownloadContext}
     * @return the created {@link Source}
     */
    Source create(Object source, DownloadContext context);
}
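The matching step can be pictured roughly like this. The sketch below is an assumption about the general technique, not the library's implementation: SimpleSourceFactory, FileFactory, HttpFactory and SourceResolver are hypothetical names, the context parameter is dropped, and a "source" is reduced to a descriptive string.

```java
import java.io.File;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for SourceFactory, without the context parameter.
interface SimpleSourceFactory {
    boolean support(Object raw);

    String create(Object raw);
}

// Matches File objects.
class FileFactory implements SimpleSourceFactory {
    @Override
    public boolean support(Object raw) {
        return raw instanceof File;
    }

    @Override
    public String create(Object raw) {
        return "FileSource(" + ((File) raw).getName() + ")";
    }
}

// Matches strings with the http:// prefix.
class HttpFactory implements SimpleSourceFactory {
    @Override
    public boolean support(Object raw) {
        return raw instanceof String && ((String) raw).startsWith("http://");
    }

    @Override
    public String create(Object raw) {
        return "HttpSource(" + raw + ")";
    }
}

// Asks each factory in turn; the first one that supports the object creates the source.
class SourceResolver {
    private final List<SimpleSourceFactory> factories;

    SourceResolver(List<SimpleSourceFactory> factories) {
        this.factories = factories;
    }

    Optional<String> resolve(Object raw) {
        return factories.stream()
                .filter(factory -> factory.support(raw))
                .findFirst()
                .map(factory -> factory.create(raw));
    }
}
```

Supporting a new type then only means registering one more factory; the resolver itself does not change.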

So how are our custom classes supported? As mentioned earlier, we can either annotate the class or implement specific interfaces. I’ll use the annotation approach to explain.

The logic is actually simple; as long as you are comfortable with reflection there is no problem at all. Let’s look at the usage first.

@Download(filename = "qr code.zip")
@GetMapping("/download")
public List<Device> download() {
    return deviceService.all();
}

// Marks the class as one whose annotated members should be parsed
@SourceModel
public class Device {

    // Device name
    private String name;

    // Device QR code
    // The annotation marks this HTTP address as the data to download
    @SourceObject
    private String qrCodeUrl;

    // The annotation marks this method as the provider of the file name
    @SourceName
    public String getQrCodeName() {
        return name + ".png";
    }
    // other fields and methods omitted
}

First I define an annotation @SourceModel on the class to mark it as needing to be parsed. Then I put @SourceObject on the field (or method) that holds the data to download, so that its value can be obtained via reflection.

From that value, the matching SourceFactory creates the corresponding Source. @SourceName marks the method (or field) that supplies the name; its value is also read via reflection and set, again via reflection, on the Source that was just created.

This makes it flexible enough to support any object type.
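The reflection part can be sketched as follows. This is only an illustration of the technique under my own assumptions: DemoSourceObject, DemoSourceName, DemoDevice and AnnotationReader are hypothetical stand-ins, not the library's annotations or classes.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical annotations; RUNTIME retention is required for reflection to see them.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface DemoSourceObject {
}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface DemoSourceName {
}

class DemoDevice {
    private final String name;

    @DemoSourceObject
    private final String qrCodeUrl;

    DemoDevice(String name, String qrCodeUrl) {
        this.name = name;
        this.qrCodeUrl = qrCodeUrl;
    }

    @DemoSourceName
    public String getQrCodeName() {
        return name + ".png";
    }
}

class AnnotationReader {

    // Returns the value of the first declared field, or else the first declared
    // no-arg method, that carries the annotation; returns null if none does.
    static Object read(Object target, Class<? extends Annotation> annotation) {
        try {
            for (Field field : target.getClass().getDeclaredFields()) {
                if (field.isAnnotationPresent(annotation)) {
                    field.setAccessible(true);
                    return field.get(target);
                }
            }
            for (Method method : target.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(annotation) && method.getParameterCount() == 0) {
                    method.setAccessible(true);
                    return method.invoke(target);
                }
            }
            return null;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read @" + annotation.getSimpleName(), e);
        }
    }
}
```

The value read for the object annotation would then be handed to the matching factory, and the value read for the name annotation applied to the source that factory created.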

Concurrent loading

Network resources such as HTTP need to be loaded concurrently into memory or into local cache files to make processing more efficient.

I could simply have hard-coded a thread pool, but concurrency requirements and resource allocation differ from machine to machine, from project to project, and even from requirement to requirement.

So I provide SourceLoader to support custom loading logic. You could even load some sources with a thread pool, some with coroutines, and the rest in some other way.

/**
 * Loader for {@link Source}.
 *
 * @see DefaultSourceLoader
 * @see SchedulerSourceLoader
 */
public interface SourceLoader {

    /**
     * Performs the load.
     *
     * @param source  {@link Source}
     * @param context {@link DownloadContext}
     * @return the loaded {@link Source}
     */
    Mono<Source> load(Source source, DownloadContext context);
}
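One possible loading strategy, a thread pool combined with a cache check, might look like the sketch below. It uses CompletableFuture from the JDK in place of Mono, and ConcurrentLoader and its fetcher function are hypothetical names standing in for real HTTP downloads.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical loader: fetches each URL at most once and runs fetches on the given pool.
class ConcurrentLoader {
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();
    private final Function<String, byte[]> fetcher;
    private final ExecutorService executor;

    ConcurrentLoader(Function<String, byte[]> fetcher, ExecutorService executor) {
        this.fetcher = fetcher;
        this.executor = executor;
    }

    CompletableFuture<byte[]> load(String url) {
        byte[] cached = cache.get(url);
        if (cached != null) {
            // Already cached: no need to touch the thread pool.
            return CompletableFuture.completedFuture(cached);
        }
        // computeIfAbsent applies the fetcher at most once per key, even under contention.
        return CompletableFuture.supplyAsync(() -> cache.computeIfAbsent(url, fetcher), executor);
    }

    // Starts all loads first, then waits for them; results keep the input order.
    List<byte[]> loadAll(List<String> urls) {
        List<CompletableFuture<byte[]>> futures = urls.stream()
                .map(this::load)
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

Passing the executor in from outside is the point of the design: the caller decides how much concurrency a given machine or project can afford.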

Compression

Once loading is finished, we can compress. As before, I define a Compression class as the abstraction for a compressed object.

Typically we would create a cache file locally and write the compressed data into it.

However, I hate having to configure all sorts of paths in the configuration file every time, so compression in memory is supported as well. Of course, if the files are large, it is better to just generate a cache file.

The compression format is fully customizable through the SourceCompressor interface, so implementing a compression format of your own is no problem.

/**
 * Compressor for {@link Source}.
 *
 * @see ZipSourceCompressor
 */
public interface SourceCompressor extends OrderProvider {

    /**
     * Gets the compression format.
     *
     * @return the compression format
     */
    String getFormat();

    /**
     * Checks whether the compression format is supported.
     *
     * @param format  the compression format
     * @param context {@link DownloadContext}
     * @return true if supported
     */
    default boolean support(String format, DownloadContext context) {
        return format.equalsIgnoreCase(getFormat());
    }

    /**
     * Called to perform compression when the corresponding format is supported.
     *
     * @param source  {@link Source}
     * @param writer  {@link DownloadWriter}
     * @param context {@link DownloadContext}
     * @return {@link Compression}
     */
    Compression compress(Source source, DownloadWriter writer, DownloadContext context);
}
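For the in-memory variant, the JDK's ZipOutputStream can write into a ByteArrayOutputStream instead of a file. The following is a minimal sketch of that idea; InMemoryZip is a hypothetical helper, not the library's ZipSourceCompressor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class InMemoryZip {

    // Compresses the given name -> content entries into a ZIP held entirely in memory.
    static byte[] zip(Map<String, byte[]> entries) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(buffer, StandardCharsets.UTF_8)) {
            for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
                zos.putNextEntry(new ZipEntry(entry.getKey()));
                zos.write(entry.getValue());
                zos.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the bytes only after the stream is closed, so the ZIP directory is complete.
        return buffer.toByteArray();
    }

    // Lists the entry names of a ZIP held in memory, in stored order.
    static List<String> entryNames(byte[] zipped) {
        List<String> names = new ArrayList<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipped), StandardCharsets.UTF_8)) {
            for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
                names.add(entry.getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }
}
```

The whole archive lives on the heap here, which is why this approach only suits small downloads; for large files a cache file is the safer choice.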

Writing the response

I abstracted the response as DownloadResponse, mainly to be compatible with both HttpServletResponse and ServerHttpResponse.

Here is how WebMVC and WebFlux each write a response:

// HttpServletResponse
response.getOutputStream().write(byte[] b, int off, int len);

// ServerHttpResponse
response.writeWith(Publisher<? extends DataBuffer> body);

Making the two compatible gave me a headache, but I got it working in the end.

/**
 * A {@link DownloadResponse} that holds a {@link ServerHttpResponse}, for WebFlux.
 */
@Getter
public class ReactiveDownloadResponse implements DownloadResponse {

    private final ServerHttpResponse response;

    private OutputStream os;

    private Mono<Void> mono;

    public ReactiveDownloadResponse(ServerHttpResponse response) {
        this.response = response;
    }

    @Override
    public Mono<Void> write(Consumer<OutputStream> consumer) {
        if (os == null) {
            // No stream yet: expose the FluxSink of the response body as an OutputStream
            mono = response.writeWith(Flux.create(fluxSink -> {
                try {
                    os = new FluxSinkOutputStream(fluxSink, response);
                    consumer.accept(os);
                } catch (Throwable e) {
                    fluxSink.error(e);
                }
            }));
        } else {
            consumer.accept(os);
        }
        return mono;
    }

    @SneakyThrows
    @Override
    public void flush() {
        if (os != null) {
            os.flush();
        }
    }

    @AllArgsConstructor
    public static class FluxSinkOutputStream extends OutputStream {

        private FluxSink<DataBuffer> fluxSink;

        private ServerHttpResponse response;

        @Override
        public void write(byte[] b) throws IOException {
            writeSink(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            // Copy the requested range, because the caller may reuse its buffer
            byte[] bytes = new byte[len];
            System.arraycopy(b, off, bytes, 0, len);
            writeSink(bytes);
        }

        @Override
        public void write(int b) throws IOException {
            writeSink((byte) b);
        }

        @Override
        public void flush() {
            // Flushing completes the sink, so it must only happen after all data is written
            fluxSink.complete();
        }

        public void writeSink(byte... bytes) {
            DataBuffer buffer = response.bufferFactory().wrap(bytes);
            fluxSink.next(buffer);
            // There may be a problem here, but so far no data has needed to be released
            DataBufferUtils.release(buffer);
        }
    }
}

As long as both sides ultimately write byte[], they can be converted into each other. It is just a bit more troublesome, because it requires an interface callback.

Wrap the FluxSink in an OutputStream, convert each byte[] to a DataBuffer when writing, call next, and call complete on flush. Perfect.
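Stripped of Reactor, the adapter idea is just an OutputStream that forwards every write to a callback. Here is a minimal sketch with JDK types only; ChunkOutputStream is a hypothetical name, a Consumer<byte[]> plays the role of FluxSink.next, and a Runnable plays the role of complete. Unlike the class above, this sketch signals completion on close() rather than flush(), since flush() may legitimately be called more than once.

```java
import java.io.OutputStream;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical adapter: turns stream writes into callback invocations.
class ChunkOutputStream extends OutputStream {
    private final Consumer<byte[]> onChunk;
    private final Runnable onComplete;
    private boolean closed;

    ChunkOutputStream(Consumer<byte[]> onChunk, Runnable onComplete) {
        this.onChunk = onChunk;
        this.onComplete = onComplete;
    }

    @Override
    public void write(int b) {
        onChunk.accept(new byte[]{(byte) b});
    }

    @Override
    public void write(byte[] b) {
        write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        // Copy the range, because callers commonly reuse their buffer after write() returns.
        onChunk.accept(Arrays.copyOfRange(b, off, off + len));
    }

    @Override
    public void close() {
        // Signal completion exactly once, even if close() is called repeatedly.
        if (!closed) {
            closed = true;
            onComplete.run();
        }
    }
}
```

Any code that knows how to write to an OutputStream, such as a ZIP writer, can then feed a push-style consumer without being aware of it.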

Writing the response is really just handling input and output streams. Normally we would define a byte[] to buffer the data being read, but I don’t fix the size of that buffer. Instead, DownloadWriter lets you customize how the streams are handled, including support for a specified charset or a Range header.

/**
 * Writer for {@link InputStream} and {@link OutputStream}.
 */
public interface DownloadWriter extends OrderProvider {

    /**
     * Whether this writer supports the write.
     *
     * @param resource {@link Resource}
     * @param range    {@link Range}
     * @param context  {@link DownloadContext}
     * @return true if supported
     */
    boolean support(Resource resource, Range range, DownloadContext context);

    /**
     * Performs the write.
     *
     * @param is      {@link InputStream}
     * @param os      {@link OutputStream}
     * @param range   {@link Range}
     * @param charset {@link Charset}
     * @param length  the total size, may be null
     */
    default void write(InputStream is, OutputStream os, Range range, Charset charset, Long length) {
        write(is, os, range, charset, length, null);
    }

    /**
     * Performs the write.
     *
     * @param is       {@link InputStream}
     * @param os       {@link OutputStream}
     * @param range    {@link Range}
     * @param charset  {@link Charset}
     * @param length   the total size, may be null
     * @param callback called back with the current progress and the size of the increase
     */
    void write(InputStream is, OutputStream os, Range range, Charset charset, Long length, Callback callback);

    /**
     * Progress callback.
     */
    interface Callback {

        /**
         * Reports progress.
         *
         * @param current  the current value
         * @param increase the size of the increase
         */
        void onWrite(long current, long increase);
    }
}
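The simplest writer is a buffered copy loop that reports progress after each chunk. Below is a sketch of that loop under my own assumptions: BufferedCopy and ProgressCallback are hypothetical names, and Range and charset handling are left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical counterpart of the progress callback: total written so far, and the last increment.
interface ProgressCallback {
    void onWrite(long current, long increase);
}

class BufferedCopy {

    // Copies is to os with a caller-chosen buffer size and returns the number of bytes copied.
    static long copy(InputStream is, OutputStream os, int bufferSize, ProgressCallback callback) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        try {
            int read;
            while ((read = is.read(buffer)) != -1) {
                os.write(buffer, 0, read);
                total += read;
                if (callback != null) {
                    callback.onWrite(total, read);
                }
            }
            os.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```

Keeping the buffer size a parameter mirrors the point made above: the caller, not the library, picks the trade-off between memory use and the number of write calls.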

Events

Once the whole download flow was implemented, I found the overall logic somewhat complicated, so I needed a way to monitor the entire flow.

At first I defined a few listeners as callbacks, but they were not pleasant to use. The architecture is designed to be very flexible and extensible, whereas the listener types were few and hard to extend.

Whenever we added another flow or step later, we had to add a new listener type or add methods to an existing listener, which was cumbersome.

So I switched to events, which extend much more flexibly. I defined DownloadEventPublisher for publishing events and DownloadEventListener for listening to them, and Spring’s event listening mechanism is supported as well.
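The difference from fixed listener methods is easiest to see in code. In this sketch (hypothetical names throughout: DemoEventPublisher, DemoEventListener, LoadStarted, LoadFinished, LoggingListener) a listener has a single entry point and picks out the event types it cares about, so adding a new event type never changes the listener interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener with a single entry point for every kind of event.
interface DemoEventListener {
    void onEvent(Object event);
}

// Hypothetical publisher that fans each event out to all registered listeners.
class DemoEventPublisher {
    private final List<DemoEventListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(DemoEventListener listener) {
        listeners.add(listener);
    }

    void publish(Object event) {
        for (DemoEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

// Two illustrative event types; a new step would simply add another class like these.
class LoadStarted {
    final String name;

    LoadStarted(String name) {
        this.name = name;
    }
}

class LoadFinished {
    final String name;
    final long millis;

    LoadFinished(String name, long millis) {
        this.name = name;
        this.millis = millis;
    }
}

// Turns the events it knows into log lines and silently ignores everything else.
class LoggingListener implements DemoEventListener {
    final List<String> lines = new ArrayList<>();

    @Override
    public void onEvent(Object event) {
        if (event instanceof LoadStarted) {
            lines.add("load started: " + ((LoadStarted) event).name);
        } else if (event instanceof LoadFinished) {
            LoadFinished finished = (LoadFinished) event;
            lines.add("load finished: " + finished.name + " in " + finished.millis + "ms");
        }
    }
}
```

A step only needs a reference to the publisher; who listens, and what they do with the events, is decided elsewhere.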

Logs

Building on the event mechanism above, I implemented several kinds of download logs:

  • Logs for each step of the flow
  • Logs for progress updates during loading, compression, and response writing
  • Logs for time spent

These logs print detailed information about the whole download flow and also helped me find a lot of bugs.

Other pitfalls

Originally, context initialization and destruction were each a step in the chain, one at the very beginning and one at the very end. But after writing the response in WebFlux, I found that context destruction never ran.

So I stepped through the Spring source code and found that the write method returns Mono.empty(). An empty Mono completes without ever emitting a value, so nothing chained on that value gets invoked, which means the steps after the response write are never called.

In the end, context initialization and destruction were pulled out of the chain, and the destruction method is now called in doAfterTerminate.

Conclusion

That’s basically everything. My understanding of the reactive parts is still not very thorough, and I’m not yet fluent with some of the operators, but I did pick up quite a few advanced techniques along the way.

If you’re interested, please show some support. I will gradually publish other libraries as well.

