
In the previous article, "SpringCloudGateway upgrade to version 3.1.1: have you encountered any of these pitfalls?", I described some bugs I ran into while upgrading to SpringCloudGateway 3.1.1. I was glad to receive feedback from readers, who also raised some questions. After analyzing them, I found that the problem really was in how I was using the API.

Problem description

Earlier, I showed how to replace FeignClient calls with WebClient in SpringCloudGateway 3.1.1. Some readers tried the approach I provided.

They found the following problem: with a small number of requests everything works, but as the request volume grows the whole service blocks and no further requests can be made.

Reproducing the problem

I use a login interface to reproduce the problem as follows:

/**
 * Login
 *
 * @param userDTO
 * @return com.wjbgn.bsolver.gateway.util.dto.Result
 * @author weirx
 * @date: 2022/3/14
 */
@PostMapping("/login")
public Result login(@RequestBody UserDTO userDTO) {
    // Password md5 encryption
    userDTO.setPassword(MD5.create().digestHex(userDTO.getPassword()));
    // 1. Webclient calls the interface
    Mono<Boolean> monoInfo = webClientBuilder
            .build().post().uri(USER_VALIDATE_PATH)
            .body(BodyInserters.fromValue(userDTO)).header(HttpHeaders.CONTENT_TYPE, "application/json")
            .retrieve().bodyToMono(Boolean.class);

    // 2. Call block() asynchronously; calling it directly on the request thread raises an error, because blockingGet is a synchronous method.
    CompletableFuture<Boolean> voidCompletableFuture = CompletableFuture.supplyAsync(() ->
            monoInfo.block());
    try {
        // 3. Get the result
        Boolean result = voidCompletableFuture.get();
        if (result) {
            // The user exists: generate the token and report a successful login
            String token = JwtUtil.generateToken(userDTO.getUsername());
            // Put the token into redis
            redisUtil.setObjectExpire(JwtUtil.REDIS_TOKEN_PREFIX + userDTO.getUsername(), token, Duration.ofMinutes(JwtUtil.REDIS_TOKEN_EXPIRE_MINUTE));
            return Result.success("Login successful", new UserDTO(userDTO.getUsername(), token));
        } else {
            return Result.failed("User name does not exist or password is incorrect");
        }
    } catch (Exception e) {
        log.info("Login failed, MSG = {}" ,e.getMessage());
    }
    return Result.failed("Login failed");
}

As shown above, there are three key points to note:

  • The WebClient call returns a Mono.
  • monoInfo.block(), the method that retrieves the result, must be invoked asynchronously.
  • To get the result, the CompletableFuture is typed with the interface's return type, Boolean.

This is my test code. After I logged in repeatedly through this interface, it stopped responding.

So the problem is real and reproducible. Let's analyze its cause.

Problem analysis

Note why we run the Mono call asynchronously: as its name indicates, block() is a blocking method, and the request is only sent once we call it.

Imagine we called it synchronously instead. Every incoming request would then wait synchronously, the service's concurrent processing would degrade to serial processing, and as soon as one call hung, all the requests behind it would hang too.

That is why we execute block() asynchronously.

Yet even with the asynchronous approach, we saw above that the service still blocks under load. Why is that?

The login interface calls the user service's validation interface. Suppose that validation interface can only handle 10 concurrent requests. When a large number of users log in at once, the validation interface inevitably backs up and cannot keep up, so each block() call sits waiting for its response, and in the end a large number of requests get no response.

So even an asynchronous call cannot cope with a blocked downstream service; we have to handle that case ourselves.
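The starvation described above can be reproduced with the JDK alone. The following is a minimal sketch, not the gateway code: the class name PoolExhaustionDemo is hypothetical, a fixed pool of 2 workers stands in for the executor behind supplyAsync, and a latch that is never released stands in for a user service that never answers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a pool of 2 workers stands in for the async executor,
// and a latch that is never released stands in for a hung downstream call.
class PoolExhaustionDemo {

    /** Returns true if a quick task is starved while all workers are stuck. */
    static boolean quickTaskStarves() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch downstream = new CountDownLatch(1); // only released during cleanup
        try {
            // Two "login" calls that wait on the downstream service without a timeout.
            for (int i = 0; i < 2; i++) {
                CompletableFuture.runAsync(() -> {
                    try {
                        downstream.await(); // the worker is parked indefinitely
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool);
            }
            // A third request cannot even start, because no worker is free.
            CompletableFuture<String> quick = CompletableFuture.supplyAsync(() -> "ok", pool);
            try {
                quick.get(300, TimeUnit.MILLISECONDS);
                return false; // the quick task ran, so the pool was not exhausted
            } catch (TimeoutException e) {
                return true;  // starved: both workers are still blocked
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            downstream.countDown(); // release the workers so the demo can exit
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("quick task starved: " + quickTaskStarves());
    }
}
```

Moving the blocking call to another thread only moves the wait; once every worker is parked on a call that never returns, new requests queue up behind them.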

Source code analysis

block() source

@Nullable
public T block() {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber();
    this.subscribe((Subscriber)subscriber);
    return subscriber.blockingGet();
}

Focus on the last line, the call to blockingGet, which, as the name indicates, is a blocking get method:

@Nullable
final T blockingGet() {
    if (Schedulers.isInNonBlockingThread()) {
        throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
    } else {
        if (this.getCount() != 0L) {
            try {
                this.await();
            } catch (InterruptedException var3) {
                this.dispose();
                throw Exceptions.propagate(var3);
            }
        }

        Throwable e = this.error;
        if (e != null) {
            RuntimeException re = Exceptions.propagate(e);
            re.addSuppressed(new Exception("#block terminated with an error"));
            throw re;
        } else {
            return this.value;
        }
    }
}

Let's see what isInNonBlockingThread does. As shown below, it checks whether the current thread is NonBlocking:

public static boolean isInNonBlockingThread() {
    return Thread.currentThread() instanceof NonBlocking;
}

What exactly is NonBlocking? It is a marker interface with the following two implementations:

  • The Netty EventLoop thread
  • Reactor's non-blocking thread

If this method returns true, it will throw an exception:

2022-03-20 13:29:34 ERROR reactor-http-nio-2 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler [84630d14-1] 500 Server Error for HTTP POST "/user/login?username=weirx1"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    * __checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
    * __checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    * __checkpoint ⇢ HTTP POST "/user/login?username=weirx1" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.wjbgn.bsolver.gateway.controller.LoginController.login(LoginController.java:59)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Why is that? The gateway runs on Netty as its runtime container, and the exception output above shows that the request was handled on thread reactor-http-nio-2. Is that thread an instance of a NonBlocking implementation? The answer is yes.
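The check itself is nothing more than an instanceof test against a marker interface. Here is a minimal JDK-only sketch of the idea; NonBlockingMarker, EventLoopLikeThread and BlockingGuardDemo are hypothetical stand-ins for illustration, not Reactor or Netty types.

```java
// Hypothetical stand-ins: NonBlockingMarker mimics Reactor's NonBlocking marker interface,
// and EventLoopLikeThread mimics an event-loop thread that implements it.
interface NonBlockingMarker { }

class EventLoopLikeThread extends Thread implements NonBlockingMarker {
    EventLoopLikeThread(Runnable task, String name) {
        super(task, name);
    }
}

class BlockingGuardDemo {

    // Same shape as the check shown above: a plain instanceof on the current thread.
    static boolean isInNonBlockingThread() {
        return Thread.currentThread() instanceof NonBlockingMarker;
    }

    // Refuses to block on a marked thread, in the same spirit as blockingGet().
    static String guardedBlock() {
        if (isInNonBlockingThread()) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + Thread.currentThread().getName());
        }
        return "blocked safely";
    }

    /** Runs guardedBlock() on a marked or an ordinary thread and reports the outcome. */
    static String runOn(boolean eventLoop) {
        String[] outcome = new String[1];
        Runnable task = () -> {
            try {
                outcome[0] = guardedBlock();
            } catch (IllegalStateException e) {
                outcome[0] = "rejected: " + e.getMessage();
            }
        };
        Thread t = eventLoop
                ? new EventLoopLikeThread(task, "demo-nio-1")
                : new Thread(task, "demo-worker-1");
        t.start();
        try {
            t.join(); // join() makes the worker's write to outcome visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(runOn(true));
        System.out.println(runOn(false));
    }
}
```

The same call is rejected on the marked thread and allowed on the ordinary one, which is why handing block() to a different thread makes the exception disappear.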

We can confirm this in the debugger by calling block() directly in the interface rather than asynchronously.

There, the thread's type is EventLoop and its name is reactor-http-nio-3, so it satisfies the check above. This is why Mono.block() cannot be called directly in the interface; doing so raises the exception shown earlier.

From this analysis it is clear that the Gateway's logic effectively forces us to call block() from another thread, that is, asynchronously.

On any other thread, the else branch runs:

if (this.getCount() != 0L) {
    try {
        this.await();
    } catch (InterruptedException var3) {
        this.dispose();
        throw Exceptions.propagate(var3);
    }
}

What is getCount? Tracing it shows that it is a CountDownLatch method, because this class extends CountDownLatch. The await method of CountDownLatch blocks: execution only continues once the latch's count reaches 0.

In the code above, if getCount is not zero, the thread blocks. As we analyzed earlier, if the user service never returns, the thread stays blocked forever, and the more requests arrive, the more threads are blocked.

The no-argument block() has no way to give up waiting or release the thread on its own.
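To make the latch mechanics concrete, here is a simplified JDK-only sketch of a subscriber that extends CountDownLatch. LatchSubscriber and LatchSubscriberDemo are hypothetical names, and the class is far smaller than Reactor's real subscriber; it only illustrates how getCount, await and countDown interact.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for a latch-based blocking subscriber.
class LatchSubscriber<T> extends CountDownLatch {
    private volatile T value;

    LatchSubscriber() {
        super(1); // count starts at 1: no result yet
    }

    // Called by the producer when the result arrives: count drops to 0.
    void onNext(T v) {
        value = v;
        countDown();
    }

    T blockingGet() {
        if (getCount() != 0L) {
            try {
                await(); // parks the caller until countDown() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return value;
    }
}

class LatchSubscriberDemo {
    static String demo() {
        LatchSubscriber<String> subscriber = new LatchSubscriber<>();
        long before = subscriber.getCount();
        // The producer plays the role of the downstream response arriving later.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            subscriber.onNext("true");
        });
        producer.start();
        String result = subscriber.blockingGet();
        return before + " -> " + subscriber.getCount() + ", value=" + result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the producer never called onNext, blockingGet would never return, which is exactly the situation of a downstream service that does not answer.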

How do we usually deal with a blocked interface request?

Quite simply: specify a timeout.

The solution

Now that we know what causes the blockage, let’s introduce the solution.

Use block(Duration timeout)

Take a look at the source code:

@Nullable
final T blockingGet(long timeout, TimeUnit unit) {
    if (Schedulers.isInNonBlockingThread()) {
        throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
    } else {
        RuntimeException re;
        if (this.getCount() != 0L) {
            try {
                if (!this.await(timeout, unit)) {
                    this.dispose();
                    throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
                }
            } catch (InterruptedException var6) {
                this.dispose();
                re = Exceptions.propagate(var6);
                re.addSuppressed(new Exception("#block has been interrupted"));
                throw re;
            }
        }

        Throwable e = this.error;
        if (e != null) {
            re = Exceptions.propagate(e);
            re.addSuppressed(new Exception("#block terminated with an error"));
            throw re;
        } else {
            return this.value;
        }
    }
}

this.await(timeout, unit) waits at most for the given timeout. If the latch has still not been released when the timeout elapses, await returns false, the subscription is disposed, and an exception is thrown explicitly:

throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);

So we can make the following change, with a timeout of 1s:

monoInfo.block(Duration.ofSeconds(1))

When the request blocks again, the wait ends once 1s has elapsed and the thread is released with the timeout exception (the log reports the timeout in nanoseconds):

2022-03-20 14:15:21 INFO reactor-http-nio-4 com.wjbgn.bsolver.gateway.controller.LoginController Login failed, MSG = java.lang.IllegalStateException: Timeout on blocking read for 1000000000 NANOSECONDS
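The timeout path can be reproduced with a plain CountDownLatch. This is a minimal sketch under stated assumptions: TimedLatchDemo and its blockingGet helper are hypothetical and only mirror the shape of the source shown earlier. It also shows that the timeout is reported through the return value of await, not through a thread interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedLatchDemo {

    // Hypothetical helper mirroring the shape of blockingGet(timeout, unit).
    static String blockingGet(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) { // false means the timeout elapsed first
                throw new IllegalStateException(
                        "Timeout on blocking read for " + timeout + " " + unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "value";
    }

    static String demo() {
        // A latch nobody counts down plays the role of a downstream call that never returns.
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            return blockingGet(neverReleased, 100, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            // The waiting thread was not interrupted; await simply returned false.
            return e.getMessage() + " (interrupted=" + Thread.currentThread().isInterrupted() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller gets its thread back after the timeout and can turn the exception into a failed login instead of waiting forever.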

Use custom thread pools

When we run tasks asynchronously through CompletableFuture, the default thread pool is ForkJoinPool.commonPool(), a shared pool sized from the number of CPU cores. Using it here is not recommended: I/O-bound tasks tie up its threads and stall every other task that shares the pool. A custom thread pool is recommended instead.

CompletableFuture<Boolean> voidCompletableFuture = CompletableFuture.supplyAsync(() ->
        monoInfo.block(Duration.ofSeconds(1)), GlobalThreadPool.getExecutor());
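The article does not show GlobalThreadPool, so the following is only one possible sketch of such a holder built on the JDK's ThreadPoolExecutor. The class name BlockingCallPool, the thread name prefix, the pool sizes and the queue capacity are all illustrative assumptions, not the author's settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for GlobalThreadPool; all sizes and names are assumptions.
class BlockingCallPool {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Named daemon threads make the pool easy to spot in logs and thread dumps.
    private static final ThreadFactory FACTORY = r -> {
        Thread t = new Thread(r, "blocking-call-" + COUNTER.incrementAndGet());
        t.setDaemon(true);
        return t;
    };

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            8, 16,                                  // core and maximum pool size
            60, TimeUnit.SECONDS,                   // idle time before extra threads are reclaimed
            new ArrayBlockingQueue<>(100),          // bounded queue so a backlog cannot grow without limit
            FACTORY,
            new ThreadPoolExecutor.AbortPolicy());  // reject instead of queueing forever

    static ThreadPoolExecutor getExecutor() {
        return EXECUTOR;
    }

    /** Runs a task on the dedicated pool and returns the name of the worker that ran it. */
    static String workerName() {
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), getExecutor())
                    .get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

With AbortPolicy, supplyAsync throws RejectedExecutionException once both the workers and the queue are full, so the caller can fail fast rather than pile up more waiting requests. The right sizes depend on how many concurrent calls the downstream service can take.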

Conclusion

This summary is mostly self-reflection. I was not rigorous enough during the upgrade and did not think the problem through, which is what led to these issues. Had they only surfaced after going live, the consequences would have been very serious.

I also want to thank the readers for their feedback, which helped me find this problem. I will be more careful with future articles so that they cause less trouble for everyone.


Source: gitee.com/wei_rong_xi…