Introduction

As the Internet has grown and user bases have expanded, application architecture has evolved from the monolith to microservices. Under a microservices architecture, we extract core business capabilities into separate services. Services communicate with each other via RPC or HTTP, which means microservices are usually I/O intensive. This, in turn, determines the performance bottleneck of modern Internet architecture: I/O.

Blocking architecture

Blocking systems are built on the Servlet framework (prior to Servlet 3.0). Such systems are blocking and multithreaded: each connection uses one thread to process its request. I/O operations are performed by selecting a worker thread from the thread pool to perform the I/O, blocking the requesting thread until the worker thread completes.

Specific processing steps:

  1. The client initiates a request over HTTP.
  2. Tomcat creates a thread to process the request.
  3. When application A calls application B, the thread blocks and gives up the CPU.
  4. When application B returns, the thread is woken up and competes for a CPU time slice again.

In step 3 above, the CPU sits idle for a period of time because of network I/O, so in high-concurrency scenarios the obvious move is to raise CPU utilization through concurrency (using more threads).
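
To make the blocking model concrete, here is a minimal sketch assuming Spring MVC with a blocking RestTemplate; OrderController, the app-b host, and the /detail path are hypothetical names:

import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

@RestController
public class OrderController {

  private final RestTemplate restTemplate = new RestTemplate();

  @GetMapping("/orders/{id}")
  public String getOrder(@PathVariable String id) {
    // Step 3: the Tomcat worker thread parks here on network I/O until
    // application B responds; the CPU is released, but the thread is not.
    return restTemplate.getForObject("http://app-b/detail/" + id, String.class);
  }
}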

Multithreading now seems to be the standard way to solve concurrency problems in blocking systems. We usually write blocking code and, when a performance bottleneck appears, introduce additional threads to run it. But parallelization is not a panacea; it has the following problems:

  1. Threads themselves are expensive resources, and creating too many threads takes up more memory.
  2. Thread context switching is time-consuming and increases CPU load.
  3. Synchronization between threads needs to be addressed.

Blocking systems also lack elasticity. In high-concurrency scenarios, if application B's latency increases or application A retries on errors, the number of connections and threads in application A grows. A surge in application A's thread count can then drive up server load and overwhelm the cluster. To contain this, we need to cap the number of threads that can be created, and circuit breakers and rate limiters (Hystrix, Sentinel) help keep a blocking system stable through such events.
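
A minimal sketch of capping thread creation: a bounded ThreadPoolExecutor whose queue and rejection policy make overload fail fast instead of spawning ever more threads (the sizes here are illustrative, not recommendations):

import java.util.concurrent.*;

ExecutorService workers = new ThreadPoolExecutor(
    10,                              // core pool size
    200,                             // hard cap on the number of threads
    60, TimeUnit.SECONDS,            // reclaim idle threads after 60s
    new ArrayBlockingQueue<>(1000),  // bounded backlog of pending tasks
    new ThreadPoolExecutor.AbortPolicy()); // reject work when saturated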

Asynchronous architecture

Another way to improve CPU utilization in high-concurrency scenarios is to go asynchronous: we write asynchronous, non-blocking code. When an I/O operation starts, the thread keeps its CPU time slice and switches to another task; when the asynchronous operation completes, execution resumes where it left off. Asynchronous systems typically use one thread per CPU core to handle all requests and responses. Since a thread no longer has to be created for every request, connections become cheap: their cost is just a file descriptor and a listener registration (with epoll, for example). In the blocking model the cost of a connection is a thread, with all of its memory and system overhead. And because a request's data stays on the same CPU, CPU-level caches are better utilized and there are fewer context switches, which improves efficiency.
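
A sketch of why connections become cheap, using plain Java NIO: a single Selector (backed by epoll on Linux) watches every connection from one thread, so each connection costs a file descriptor rather than a thread. The port is arbitrary:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;

public class EventLoopSketch {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress(8080));
    server.configureBlocking(false);
    server.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      selector.select(); // one thread waits on all registered connections
      for (SelectionKey key : selector.selectedKeys()) {
        // accept, read, or write here without creating a thread per connection
      }
      selector.selectedKeys().clear();
    }
  }
}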

For example, suppose network I/O takes 3s and the other work takes 2s. In a blocking system the whole request takes 5s, and 1,000 requests within those 5s would create 1,000 threads if no thread pool were used. In an asynchronous system the whole request completes in 3s. In other words, an asynchronous system can achieve the same effect as a blocking system with far fewer threads.
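
The arithmetic can be sketched with CompletableFuture; networkCall() and otherTask() are hypothetical stand-ins for the 3s of network I/O and the 2s of other work:

import java.util.concurrent.CompletableFuture;

CompletableFuture<String> io = CompletableFuture.supplyAsync(() -> networkCall()); // ~3s
CompletableFuture<String> rest = CompletableFuture.supplyAsync(() -> otherTask()); // ~2s

// The two run concurrently, so the combined result is ready after ~3s
// (the longer of the two legs) instead of 3s + 2s = 5s.
String response = io.thenCombine(rest, (a, b) -> a + b).join();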

Why reactive programming

Two asynchronous programming models are provided on the JVM:

Callbacks: asynchronous methods return no value but take an extra callback argument (a lambda or anonymous class) that is invoked when the result is available. A well-known example is Swing's EventListener hierarchy.

Futures: an asynchronous method returns a Future<T> immediately. The asynchronous process computes the T value, and the Future object wraps access to it. The value is not immediately available, and the object can be polled until it is. For example, an ExecutorService runs Callable<T> tasks and returns Future objects.
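
A minimal example of the Future model (the computed value is arbitrary):

import java.util.concurrent.*;

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> 40 + 2); // a Callable<Integer>

while (!future.isDone()) {
  // poll, or do other work in the meantime
}
Integer value = future.get(); // the value is ready, so get() no longer blocks
executor.shutdown();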

But both techniques have their limitations. Callbacks are hard to compose, and they quickly lead to code that is difficult to read and maintain (a condition known as "Callback Hell").

An example of Callback Hell:

Requirement: get a user's favorites by user ID and display the details of the first five on the user's UI. If the user has no favorites, show five suggestions instead.


userService.getFavorites(userId, new Callback<List<String>>() { // 1
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { // 4
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6
          });
        }

        public void onError(Throwable error) { // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() // 8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
  1. We have callback-based services: a Callback interface with an onSuccess method invoked when the asynchronous process succeeds and an onError method invoked when an error occurs.
  2. The first service passes its list of favorite IDs to a callback.
  3. If the list is empty, we must call suggestionService.
  4. suggestionService provides a List<Favorite> to a second callback.
  5. Switch to the UI thread.
  6. We use a Java 8 stream to limit the number of suggestions processed to five and show them in the UI's graphical list.
  7. At each level, we handle errors the same way: display them in a pop-up window.
  8. If the service does return a list of favorite IDs, we limit it to five and go to favoriteService to fetch the detailed Favorite objects.
  9. Again, a callback submits each Favorite to the UI thread for display.

We wrote a lot of callbacks and a lot of repetitive code. Next, we refactor it with **reactive** code:

userService.getFavorites(userId) // 1
           .flatMap(favoriteService::getDetails) // 2
           .switchIfEmpty(suggestionService.getSuggestions()) // 3
           .take(5) // 4
           .publishOn(UiUtils.uiThreadScheduler()) // 5
           .subscribe(uiList::show, UiUtils::errorPopup); // 6
  1. Gets the stream for the favorites IDs.
  2. By asynchronously converting them to Favorite objects via flatMap, we now have a stream of Favorite objects.
  3. If the stream of Favorite objects is empty, we switch to suggestionService.
  4. We only take five elements from the result stream.
  5. Switch to the UI thread.
  6. Finally, we subscribe: the data is shown in the UI list, and any error is shown in a pop-up.

What if you want to ensure that the favorite IDs are retrieved within 800 milliseconds, falling back to a cache if it takes longer? In callback-based code this is a complex task. In reactive programming it is as simple as adding a timeout operator to the chain:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) // 1
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) // 2
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  1. If the part above emits nothing within 800 ms, propagate an error.
  2. If an error occurs, fall back to cacheService.

Future objects are a bit better than callbacks, but they are still hard to compose, despite the improvements Java 8 brought with CompletableFuture. Orchestrating multiple Future objects is possible, but not easy. And Future has other problems:

  • It is easy to end up in another blocking situation by calling the get() method.
  • They do not support lazy computation.
  • They lack support for multiple values and advanced error handling.
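
A sketch of the first two points; getFavoriteIds and getDetails are hypothetical async helpers returning CompletableFuture. Fanning out over a list of futures pushes you straight back into blocking via join():

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

CompletableFuture<List<String>> ids = getFavoriteIds(userId);
CompletableFuture<List<Favorite>> details = ids.thenApply(list ->
    list.stream()
        .map(id -> getDetails(id))    // one CompletableFuture per id...
        .map(CompletableFuture::join) // ...collapsed by blocking on each
        .collect(Collectors.toList()));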

Reactive libraries such as Reactor and RxJava aim to address these shortcomings of "classic" asynchronous approaches on the JVM, while also focusing on a few additional aspects:

  • Composability and readability
  • Data operates as a flow using a rich set of operators
  • Nothing happens until you subscribe
  • Backpressure, or the ability of consumers to signal to producers that the rate of emissions is too high
  • High level but high value abstraction that is concurrency-agnostic
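
The "nothing happens until you subscribe" point can be seen in a few lines of Reactor: assembling the pipeline runs no code, and the side effect in map fires only once subscribe() is called:

import reactor.core.publisher.Flux;

Flux<Integer> doubled = Flux.range(1, 3)
    .map(i -> {
      System.out.println("mapping " + i); // silent until a subscription arrives
      return i * 2;
    });

doubled.subscribe(System.out::println); // only now does the pipeline execute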

Reactive programming is not a silver bullet

Based on the analysis above, an asynchronous architecture reduces thread context switching and makes more efficient use of the CPU cache. We might expect efficiency to increase by an order of magnitude, but it does not.

According to Netflix's experience, an asynchronous architecture does not bring much improvement for CPU-intensive applications. For I/O-intensive applications, it reduces the cost of network connections, has a clear elasticity advantage over blocking systems, and yields a significant performance improvement of roughly 25%.

The benefits of an asynchronous architecture look great, but they come at an operational cost. Blocking systems (imperative programming) are easy to understand and debug.

A blocking thread is always performing a single task, so its stack trace is easy to capture and read. An asynchronous architecture (reactive programming), by contrast, is based on callbacks and driven by an event loop, and the event loop's stack trace is meaningless. It is difficult to track a request across events and callbacks, and debugging tools in this area are scarce. Edge cases, unhandled exceptions, and improperly handled state changes leave dangling resources, leading to ByteBuf leaks, file descriptor leaks, lost responses, and more. These problems are hard to debug because it is difficult to tell which event was not handled or cleaned up correctly.

Many libraries rely on ThreadLocal to build and store per-request context. ThreadLocal, however, does not work in an asynchronous, non-blocking environment, because multiple requests are processed on the same thread.
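
Reactor, for example, answers this with a Context that travels with the subscription rather than the thread, so it survives thread hops; a minimal sketch:

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

Mono<String> greeting = Mono.deferContextual(ctx ->
        Mono.just("request for user " + ctx.get("userId")))
    .contextWrite(Context.of("userId", "42"));

greeting.subscribe(System.out::println); // prints: request for user 42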

Converting blocking network logic into non-blocking code is not an easy task either, and there is no one-size-fits-all strategy for doing so.
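
One common stopgap, rather than a full conversion, is to isolate the blocking call on a scheduler meant for it; here blockingClient.fetch is a hypothetical blocking call moved onto Reactor's boundedElastic scheduler so it cannot stall the event-loop threads:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

Mono<String> result = Mono.fromCallable(() -> blockingClient.fetch(id))
    .subscribeOn(Schedulers.boundedElastic()); // blocking work off the event loop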

In addition to reactive programming, you can also consider the Go language's coroutines (goroutines).

[^]: Zuul 2: The Netflix Journey to Asynchronous, Non-Blocking Systems

[^]: projectreactor.io/docs/core/r…