I. Introduction to WebFlux

WebFlux is a new reactive Web framework introduced in Spring Framework5.0. Reactive Streams specification implemented through the Reactor project, a fully asynchronous and non-blocking framework. It will not speed up the execution of the program, but in the case of high concurrency, asynchronous IO can handle higher throughput with a small number of stable threads, avoiding the thread accumulation caused by file IO/ network IO blocking.

1.1 WebFlux Features

WebFlux has the following features:

  • Asynchronous non-blocking – Take an upload example. Compared to Spring MVC, which is a synchronous blocking IO model, Spring WebFlux handles it like this: Thread found file data no good, you do other things first, when the file ready to notify the thread to handle (here is the input non-blocking mode), when the receiver and written to disk (this step can also use asynchronous non-blocking mode) to notify the thread to process the response after (here is the output non-blocking mode).

  • Reactive functional Programming – Spring Flux uses the Reactor Stream asynchronous, non-blocking Push pattern as opposed to the Java8 Stream synchronous, blocking Pull pattern. Written in Java Lambda style, it is close to natural language and easy to understand.

  • Servlet-free – can run in traditional Servlet containers (version 3.1+), as well as Netty, Undertow, and NIO containers.

1.2 Design objectives of WebFlux

  • Suitable for high concurrency

  • High throughput

  • scalability

2. Introduction to Spring WebFlux components

2.1 an HTTPHandler

A simple abstraction for handling requests and responses, adapted to the APIS of different HTTP service containers.

2.2 WebHandler

An abstract interface for processing business requests that defines a set of processing behaviors. The related core implementation classes are as follows;

2.3 DispatcherHandler

The master controller for request processing, where the actual work is handled by multiple configurable components.

WebFlux is compatible with Spring MVC programming and development mode based on annotations such as @Controller and @RequestMapping, which can achieve smooth switching.

2.4 the Functional Endpoints

This is a lightweight functional programming model. Is an alternative to the @Controller, @RequestMapping, etc annotated programming model, providing a functional API for creating Routers, handlers, and filters. The call processing components are as follows:

Simple RouterFuntion route registration and business process:

@Bean
public RouterFunction<ServerResponse> initRouterFunction(a) {
    return RouterFunctions.route()
        .GET("/hello/{name}", serverRequest -> {
            String name = serverRequest.pathVariable("name");
            return ServerResponse.ok().bodyValue(name);
        }).build();
}
Copy the code

Request forwarding process:

2.5 Reactive Stream

This is an important component, WebFlux uses Reactor to rewrite traditional Spring MVC logic. Flux and Mono are two key concepts in Reactor. These two concepts are needed to understand how WebFlux works.

Both Flux and Mono implement the Publisher interface of Reactor, which is a time Publisher and provides a subscription interface for consumers. When an event occurs, Flux or Mono will notify consumers of the event by calling back the corresponding method. This is known as the responsive programming model.

Mono workflow flowchart

Only after a single result is sent.

Flux Flow Chart

Send zero or more, possibly infinite, results before completing.

For streaming media types: application/stream+ JSON or Text /event-stream, you can let the caller get the server scrolling results. For non-streaming types: Application/JSON WebFlux By default the JSON encoder will flush serialized JSON to the network at once, which does not mean blocking, because the resulting Flux<? > is written to the network in a reactive manner, without any obstacles.Copy the code

3. Working principle of WebFlux

3.1 Assembly process of components

Process related source code parsing -WebFluxAutoConfiguration

@Configuration
// Conditional assembly is only loaded when the activated type is REACTIVE
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
// Load only if WebFluxConfigurer instance exists
@ConditionalOnClass(WebFluxConfigurer.class)
/ / load when no WebFluxConfigurationSupport instance
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
// After the assembly
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class, CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
// Automatic assembly sequence
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
   @Configuration
   @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
   / / interface programming Prior to assembly WebFluxConfig EnableWebFluxConfiguration assembly
   @Import({ EnableWebFluxConfiguration.class })
   public static class WebFluxConfig implements WebFluxConfigurer {
      // Hide part of the source code
     /**
     * Configuration equivalent to {@code @EnableWebFlux}. * /
   } 
    @Configuration
    public static class EnableWebFluxConfiguration
            extends DelegatingWebFluxConfiguration {
        // Hide some code
    }
    @Configuration
    @ConditionalOnEnabledResourceChain
    static class ResourceChainCustomizerConfiguration {
        // Hide some code
    }
    private static class ResourceChainResourceHandlerRegistrationCustomizer
            implements ResourceHandlerRegistrationCustomizer {
        // Hide some code
    }
Copy the code

When WebFluxAutoConfiguration automatic assembly automatic assembly EnableWebFluxConfiguration first And EnableWebFluxConfiguration – > DelegatingWebFluxConfiguration – > WebFluxConfigurationSupport.

Eventually WebFluxConfigurationSupport configuration not only DispatcherHandler is also equipped with many other WebFlux core components including Exception handler WebExceptionHandler, mapping handler HandlerMapping, request adapter HandlerAdapter, response handler HandlerResultHandler, etc.

The initialization process of DispatcherHandler creation is as follows:

public class WebFluxConfigurationSupport implements ApplicationContextAware {
   // Hide some code
   @Nullable
   public final ApplicationContext getApplicationContext(a) {
      return this.applicationContext;
   }
	// Hide some code
   @Bean
   public DispatcherHandler webHandler(a) {
      return new DispatcherHandler();
   }
Copy the code
public class DispatcherHandler implements WebHandler.ApplicationContextAware {
   @Nullable
   private List<HandlerMapping> handlerMappings;
   @Nullable
   private List<HandlerAdapter> handlerAdapters;
   @Nullable
   private List<HandlerResultHandler> resultHandlers;
   	@Override
   public void setApplicationContext(ApplicationContext applicationContext) {
		initStrategies(applicationContext);
   }
   protected void initStrategies(ApplicationContext context) {
   	  / / injection handlerMappings
      Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true.false);

      ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
      AnnotationAwareOrderComparator.sort(mappings);
      this.handlerMappings = Collections.unmodifiableList(mappings);
 	  / / injection handlerAdapters
      Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true.false);

      this.handlerAdapters = new ArrayList<>(adapterBeans.values());
      AnnotationAwareOrderComparator.sort(this.handlerAdapters);
      / / injection resultHandlers
      Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true.false);

      this.resultHandlers = new ArrayList<>(beans.values());
      AnnotationAwareOrderComparator.sort(this.resultHandlers);
   }
Copy the code

Process related source code parsing – * * * * HTTPHandlerAutoConfiguration

Since the WebFlux core component loading process was described above, when are these components injected into the corresponding container context? It is actually injected when the container context is refreshed.

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
      implements ConfigurableWebServerApplicationContext {
   @Override
   protected void onRefresh(a) {
      super.onRefresh();
      try {
         createWebServer();
      }
      catch (Throwable ex) {
         throw new ApplicationContextException("Unable to start reactive web server", ex); }}private void createWebServer(a) {
      WebServerManager serverManager = this.serverManager;
      if (serverManager == null) {
         String webServerFactoryBeanName = getWebServerFactoryBeanName();
         ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
         boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
         // Inject httpHandler when creating container management here
         this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
         getBeanFactory().registerSingleton("webServerGracefulShutdown".new WebServerGracefulShutdownLifecycle(this.serverManager));
         // Register a Web container startup service class that inherits SmartLifecycle
         getBeanFactory().registerSingleton("webServerStartStop".new WebServerStartStopLifecycle(this.serverManager));
      }
      initPropertySources();
   }
   protected HttpHandler getHttpHandler(a) {
		String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
		if (beanNames.length == 0) {
			throw new ApplicationContextException(
					"Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
		}
		if (beanNames.length > 1) {
			throw new ApplicationContextException(
					"Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
							+ StringUtils.arrayToCommaDelimitedString(beanNames));
		}
        // The container context gets httpHandler
		return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
	}
Copy the code

And is this an HTTPHandler by HTTPHandlerAutoConfiguration assembly.

@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {
   @Configuration
   public static class AnnotationConfig {
      private ApplicationContext applicationContext;
      public AnnotationConfig(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
      }
      / / build WebHandler
      @Bean
      public HttpHandler httpHandler(a) {
         return WebHttpHandlerBuilder.applicationContext(this.applicationContext) .build(); }}Copy the code

Process-related source code parsing – Web container

Org. Springframework. Boot. Web. Reactive. Context. ReactiveWebServerApplicationContext# createWebServer. When the WebServerManager container Manager is created, the corresponding Web container instance is taken and the HTTPHandler for the response is injected.

class WebServerManager {
   private final ReactiveWebServerApplicationContext applicationContext;
   private final DelayedInitializationHttpHandler handler;
   private final WebServer webServer;
   WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
         Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
      this.applicationContext = applicationContext;
      Assert.notNull(factory, "Factory must not be null");
      this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
      this.webServer = factory.getWebServer(this.handler); }}Copy the code

Using a Tomcat container as an example, the TomcatHTTPHandlerAdapter is used to connect Servlet requests to HTTPHandler components.

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
    // Hide some code
    @Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        if (this.disableMBeanRegistry) {
            Registry.disableRegistry();
        }
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory ! =null)?this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        connector.setThrowOnFailure(true);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
        prepareContext(tomcat.getHost(), servlet);
        returngetTomcatWebServer(tomcat); }}Copy the code

Finally after the Spring container loading by SmartLifecycle class WebServerStartStopLifecycle to start a Web container.

See WebServerStartStopLifecycle registration process: org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 Complete request processing process

(From: blog.csdn.net)

The figure shows an invocation link for HTTP request processing. It is written as a Reactor Stream and only the final call to Subscirbe actually executes the business logic. Avoid blocking logic in controllers when developing based on WebFlux. The following examples show the difference in request handling between Spring MVC and Spring Webflux.

@RestControllerpublic
class TestController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @GetMapping("sync")
    public String sync(a) {
        logger.info("sync method start");
        String result = this.execute();
        logger.info("sync method end");
        return result;
    }
    @GetMapping("async/mono")
    public Mono<String> asyncMono(a) {
        logger.info("async method start");
        Mono<String> result = Mono.fromSupplier(this::execute);
        logger.info("async method end");
        return result;
    }
    private String execute(a) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello"; }}Copy the code

Log output

2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start
2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end
2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start
2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end
Copy the code

As you can see from the example above, sync() blocks the request, while asyncMono() does not block the request and returns immediately. The asyncMono() method’s specific business logic is wrapped in the Supplier in the Mono. After execute processes the service logic, it responds to the browser in callback mode.

Storage support

Once the control layer uses Spring Webflux, both the security authentication layer and the data access layer must use Reactive apis to achieve asynchronous non-blocking.

NOSQL Database

  • Directing (org. Springframework. The boot: spring – the boot – starter – data – directing – reactive).

  • Redis (org. Springframework. The boot: spring – the boot – starter – data – Redis – reactive).

Relational Database

  • H2 (io.r2dbc:r2dbc-h2)

  • MariaDB (org.mariadb:r2dbc-mariadb)

  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)

  • MySQL (dev.miku:r2dbc-mysql)

  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)

  • Postgres (io.r2dbc:r2dbc-postgresql)

  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

Five, the summary

There are a lot of Spring MVC and Spring WebFlux assessments, which are briefly explained in this paper. Refer to Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC.

Basic dependence

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency> <! R2dbc </groupId> <artifactId> r2DBC -pool</artifactId> </dependency> <! -- r2DBC mysql --> <dependency> <groupId>dev.miku</groupId> <artifactId> r2DBC - mysql</artifactId> </dependency> <! > <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency> <! <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>Copy the code

With the same data, the effect is as follows **; **

Spring MVC + JDBC performed best at low concurrency, but WebFlux + R2DBC used the least memory per processing request at high concurrency.

Spring WebFlux + R2DBC delivers excellent throughput performance with high concurrency.

Author: Zhou Changqing, Vivo Internet Server Team