Small knowledge, big challenge! This article is participating in the creation activity of “Essential Tips for Programmers”.

This is the fifth installment of my "I'm f***ing stupid" series [facepalm].

  • After upgrading to Spring 5.3.x, the number of GCs increased dramatically, and I was f***ing stupid
  • Mysql > alter TABLE select * from SQL where table select * from SQL
  • When getting the exception information throws another exception, the log can't be found, I'm f***ing stupid
  • Spring-data-redis connections are leaking, I'm f***ing stupid

This article covers the underlying design and principles, how the problem was located, and other possible problem points. Because it is long and goes fairly deep, it is split into three parts:

  • Part 1: a brief description of the problem, plus the basic structure, flow, and underlying principles of Spring Cloud Gateway
  • Part 2: how Spring Cloud Sleuth adds link tracing to Spring Cloud Gateway, and why this problem occurs
  • Part 3: the performance issues caused by the current non-invasive design of Spring Cloud Sleuth, other possible problem points, and how to solve them

How Spring Cloud Sleuth adds link (trace) information

From the source code analysis in the previous part, we know that TraceWebFilter, the first filter in the chain, wraps the Mono into a MonoWebFilterTrace, whose core source code is:

```java
@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {
    Context context = contextWithoutInitialSpan(subscriber.currentContext());
    Span span = findOrCreateSpan(context);
    // Open a scope that puts the span into the logging MDC (e.g. the Slf4j MDC).
    // The MDC is usually a ThreadLocal Map. For Log4j2 the implementation class is
    // org.apache.logging.log4j.ThreadContext, whose contextMap is ThreadLocal-based,
    // so each thread accesses its own Map of link information
    try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
        // Wrap the actual subscribe in the span context
        this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
    }
    // After scope.close(), the link information is removed from the MDC
}

@Override
public Object scanUnsafe(Attr key) {
    if (key == Attr.RUN_STYLE) {
        return Attr.RunStyle.SYNC;
    }
    return super.scanUnsafe(key);
}
```

What does WebFilterTraceSubscriber do? When an exception occurs, or when the HTTP request ends, we may want to record the response or the exception in the Span. That is what this class encapsulates.
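The decorating-subscriber idea can be sketched with the JDK's own `java.util.concurrent.Flow` API instead of Reactor's `CoreSubscriber`. This is not Sleuth's implementation. `FakeSpan` and `RecordingSubscriber` are hypothetical names: the wrapper forwards every signal to the real subscriber and records the terminal signal (completion or error) into the span first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RecordingSubscriberDemo {
    // Hypothetical span stand-in: it only records what happened
    static final class FakeSpan {
        final List<String> events = new ArrayList<>();
    }

    // Decorates the real subscriber: terminal signals are recorded into the span, then forwarded
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> actual;
        private final FakeSpan span;

        RecordingSubscriber(Flow.Subscriber<? super T> actual, FakeSpan span) {
            this.actual = actual;
            this.span = span;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            actual.onNext(item);
        }

        @Override
        public void onError(Throwable t) {
            span.events.add("error=" + t.getMessage()); // record the exception
            span.events.add("finished");
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            span.events.add("finished"); // record the normal end of the request
            actual.onComplete();
        }
    }

    // Drives the signals by hand (no real publisher) and returns what the span recorded
    static List<String> simulate(boolean fail) {
        FakeSpan span = new FakeSpan();
        Flow.Subscriber<Void> downstream = new Flow.Subscriber<Void>() {
            public void onSubscribe(Flow.Subscription s) { }
            public void onNext(Void item) { }
            public void onError(Throwable t) { }
            public void onComplete() { }
        };
        RecordingSubscriber<Void> recording = new RecordingSubscriber<>(downstream, span);
        recording.onSubscribe(new Flow.Subscription() {
            public void request(long n) { }
            public void cancel() { }
        });
        if (fail) {
            recording.onError(new IllegalStateException("boom"));
        } else {
            recording.onComplete();
        }
        return span.events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false));
        System.out.println(simulate(true));
    }
}
```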

After the wrapping by MonoWebFilterTrace, spring-webflux processes the request by subscribing to the wrapped Mono obtained above. As a result, the whole inner Mono publishing chain and subscribing chain run inside the scope opened in MonoWebFilterTrace's subscribe method. The link information is not lost as long as no GatewayFilter switches to a Mono or Flux that is forced to run asynchronously and therefore switches threads.
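The thread-switching point can be illustrated with a minimal plain-Java sketch that uses neither Reactor nor Sleuth. Here a hypothetical ThreadLocal-backed map (`MDC`) stands in for the logging MDC, and `openScope` stands in for the tracing scope. Work on the subscribing thread inside the scope sees the trace ID. A task handed to another thread does not, because that thread has its own, empty map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ScopeDemo {
    // Hypothetical stand-in for a logging MDC: every thread sees its own Map
    private static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

    // A scope whose close() throws no checked exception
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    // Like a tracing scope: put the trace ID on entry, remove it on close()
    static Scope openScope(String traceId) {
        MDC.get().put("traceId", traceId);
        return () -> MDC.get().remove("traceId");
    }

    // Returns {trace ID seen on the subscribing thread, trace ID seen after a thread switch}
    static String[] run() {
        ExecutorService otherPool = Executors.newSingleThreadExecutor();
        try (Scope ignored = openScope("51063d6f1fe264d0")) {
            String sameThread = MDC.get().get("traceId");
            Future<String> switched = otherPool.submit(() -> MDC.get().get("traceId"));
            return new String[] {sameThread, switched.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            otherPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("same thread: " + seen[0] + ", other thread: " + seen[1]);
    }
}
```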

Where our application loses link information

By checking the logs, we found that link information was missing wherever request body caching was enabled. For request body caching we use Spring Cloud Gateway's AdaptCachedBodyGlobalFilter, whose core source code is:

```java
private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange, boolean cacheDecoratedRequest,
        Function<ServerHttpRequest, Mono<T>> function) {
    ServerHttpResponse response = exchange.getResponse();
    NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
    // Read the Body first; because TCP may split it into several packets, join all DataBuffers
    return DataBufferUtils.join(exchange.getRequest().getBody())
            // If there is no Body, directly return an empty DataBuffer
            .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
            // decorate() puts the DataBuffer into the exchange Attributes, only to avoid caching the
            // request Body again if this AdaptCachedBodyGlobalFilter is entered repeatedly.
            // It wraps the new body and the original request into a new request, which is passed
            // down the GatewayFilters chain
            .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
            .switchIfEmpty(Mono.just(exchange.getRequest()))
            .flatMap(function);
}
```
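The join-then-default-empty step can be sketched with plain byte arrays. This is an analogy, not Spring's `DataBufferUtils`. `JoinDemo.join` is a hypothetical helper: it concatenates the chunks a body may arrive in, and returns an empty array when there are no chunks, much as `defaultIfEmpty` supplies an empty buffer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class JoinDemo {
    // Concatenates all chunks into one array; with no chunks the result is an empty array
    static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A body split across two "packets"
        List<byte[]> chunks = Arrays.asList(
                "{\"name\":".getBytes(StandardCharsets.UTF_8),
                "\"gateway\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(join(chunks), StandardCharsets.UTF_8));
        System.out.println(join(Collections.<byte[]>emptyList()).length);
    }
}
```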

Why use AdaptCachedBodyGlobalFilter? The request Body is obtained via exchange.getRequest().getBody(), which returns a Flux<DataBuffer>. The request Body can be read only once. If a request needs to be retried after the first call fails, the Body cannot be read again on the second attempt because the Flux has already completed. So in cases that require repeated calls, such as retries or one-to-many routing and forwarding, the request Body has to be cached by this GatewayFilter. However, after passing through this GatewayFilter the link information is lost, which can be reproduced with a simple project (project address).
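Before setting up that project, the one-shot nature of the body, and why caching makes a retry possible, can be sketched in plain Java. `OneShotBody` is a hypothetical stand-in for the body `Flux`, and `sendWithOneRetry` for a call that fails once and is retried. Neither is a Spring API.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class OneShotBodyDemo {
    // Hypothetical body that, like the Flux<DataBuffer> from getBody(), can only be consumed once
    static final class OneShotBody {
        private final byte[] data;
        private final AtomicBoolean consumed = new AtomicBoolean(false);

        OneShotBody(byte[] data) {
            this.data = data.clone();
        }

        byte[] read() {
            if (!consumed.compareAndSet(false, true)) {
                throw new IllegalStateException("request body already consumed");
            }
            return data.clone();
        }
    }

    // Simulates a first attempt that fails downstream, followed by one retry;
    // every attempt has to send the body again
    static String sendWithOneRetry(Supplier<byte[]> body) {
        body.get(); // first attempt: body sent, downstream call "fails"
        return new String(body.get(), StandardCharsets.UTF_8); // retry
    }

    static String withoutCache(byte[] payload) {
        OneShotBody body = new OneShotBody(payload);
        return sendWithOneRetry(body::read); // the second read throws
    }

    static String withCache(byte[] payload) {
        OneShotBody body = new OneShotBody(payload);
        byte[] cached = body.read(); // read once and keep it, as the caching filter does
        return sendWithOneRetry(() -> cached.clone());
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println("with cache: " + withCache(payload));
        try {
            withoutCache(payload);
        } catch (IllegalStateException e) {
            System.out.println("without cache: " + e.getMessage());
        }
    }
}
```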

Add the dependencies:

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.6</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- Required by Log4j2 for asynchronous logging -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>${disruptor.version}</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2020.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```

Enable AdaptCachedBodyGlobalFilter for all routes:

```java
@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
    @Autowired
    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
    @Autowired
    private GatewayProperties gatewayProperties;

    @PostConstruct
    public void init() {
        gatewayProperties.getRoutes().forEach(routeDefinition -> {
            // Enable AdaptCachedBodyGlobalFilter for every route in the Spring Cloud Gateway routing configuration
            EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
        });
    }
}
```
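The effect of publishing one `EnableBodyCachingEvent` per route can be pictured as a per-route on/off registry. The sketch below is a hypothetical simplification that rests on an assumption: that the filter keeps the set of enabled route IDs and caches only for matching routes. It is not the actual source of `AdaptCachedBodyGlobalFilter`, and `BodyCachingRegistry` is an invented name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class BodyCachingRegistry {
    // Route IDs for which request body caching was switched on (assumed bookkeeping)
    private final Set<String> routesToCache = ConcurrentHashMap.newKeySet();

    // Counterpart of receiving an EnableBodyCachingEvent for a route ID
    void enable(String routeId) {
        routesToCache.add(routeId);
    }

    // The filter would cache the body only when the matched route was enabled
    boolean shouldCache(String routeId) {
        return routesToCache.contains(routeId);
    }

    public static void main(String[] args) {
        BodyCachingRegistry registry = new BodyCachingRegistry();
        // e.g. the IDs returned by GatewayProperties.getRoutes()
        for (String routeId : new String[] {"first_route"}) {
            registry.enable(routeId);
        }
        System.out.println(registry.shouldCache("first_route"));
        System.out.println(registry.shouldCache("unknown_route"));
    }
}
```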

Configuration (there is only one route, which forwards requests to httpbin.org, an HTTP request testing site):

```yaml
server:
  port: 8181
spring:
  application:
    name: apiGateway
  cloud:
    gateway:
      httpclient:
        connect-timeout: 500
        response-timeout: 60000
      routes:
        - id: first_route
          uri: http://httpbin.org
          predicates:
              - Path=/httpbin/**
          filters:
              - StripPrefix=1
```
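With this route, a request to `/httpbin/anything` on the gateway is forwarded to `http://httpbin.org/anything`, because `StripPrefix=1` drops the first path segment. The helper below is a simplified, hypothetical sketch of that path rewrite. It ignores query strings, encoding, and trailing slashes, so it is not the gateway's actual filter code.

```java
import java.util.Arrays;

class StripPrefixDemo {
    // Drops the first `parts` path segments, roughly what StripPrefix=1 does before forwarding
    static String stripPrefix(String path, int parts) {
        String[] segments = Arrays.stream(path.split("/"))
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
        if (parts >= segments.length) {
            return "/";
        }
        return "/" + String.join("/", Arrays.copyOfRange(segments, parts, segments.length));
    }

    public static void main(String[] args) {
        // /httpbin/anything on the gateway becomes /anything on http://httpbin.org
        System.out.println("http://httpbin.org" + stripPrefix("/httpbin/anything", 1));
    }
}
```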

Add two global filters, one before AdaptCachedBodyGlobalFilter and one after it. Both are very simple: each just writes one line of log.

```java
// PreLogFilter.java
@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {
    // Runs right before AdaptCachedBodyGlobalFilter
    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("before AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

// PostLogFilter.java
@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {
    // Runs right after AdaptCachedBodyGlobalFilter
    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("after AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}
```
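The two filters rely on Spring's `Ordered` convention, in which a lower order value runs earlier. The sketch below sorts three stand-in filters by order value. The base value `Integer.MIN_VALUE + 1000` (that is, `Ordered.HIGHEST_PRECEDENCE + 1000`) is an assumption about `AdaptCachedBodyGlobalFilter.getOrder()`, so check it against your Gateway version. The resulting sequence does not depend on the exact base value. `NamedFilter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class FilterOrderDemo {
    // Assumption: mirrors Ordered.HIGHEST_PRECEDENCE + 1000
    static final int ADAPT_CACHED_BODY_ORDER = Integer.MIN_VALUE + 1000;

    // Minimal stand-in for a GlobalFilter that implements Ordered
    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Lower order value runs earlier, as with Spring's Ordered
    static List<String> executionOrder() {
        List<NamedFilter> filters = new ArrayList<>(Arrays.asList(
                new NamedFilter("PostLogFilter", ADAPT_CACHED_BODY_ORDER + 1),
                new NamedFilter("AdaptCachedBodyGlobalFilter", ADAPT_CACHED_BODY_ORDER),
                new NamedFilter("PreLogFilter", ADAPT_CACHED_BODY_ORDER - 1)));
        filters.sort(Comparator.comparingInt((NamedFilter f) -> f.order));
        return filters.stream().map(f -> f.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(executionOrder());
    }
}
```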

Finally, configure the Log4j2 output pattern to include link information, as described at the beginning of this series.

After starting the application, visit http://127.0.0.1:8181/httpbin/anything and check the log. The line logged by PostLogFilter has no link information:

```
2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter
```

Why is link information lost

Let's look at what the Mono chain we assembled earlier becomes once AdaptCachedBodyGlobalFilter is added:

```java
// Illustrative: the Mono chain assembled so far, with the filter internals inlined (not compilable as-is)
return Mono.defer(() ->
        new MonoWebFilterTrace(source,
                RoutePredicateHandlerMapping.this.lookupRoute(exchange) // Look up the route for the request
                        .flatMap((Function<Route, Mono<?>>) r -> {
                            // Put the route into the exchange Attributes; it is used later
                            exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                            // Return RoutePredicateHandlerMapping's FilteringWebHandler
                            return Mono.just(RoutePredicateHandlerMapping.this.webHandler);
                        })
                        .switchIfEmpty(
                                // If Mono.empty() is returned, no route was found
                                Mono.empty().then(Mono.fromRunnable(() -> {
                                    // Return Mono.empty() and log at TRACE level
                                    if (logger.isTraceEnabled()) {
                                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                                    }
                                })))
                        // If no HandlerMapping returns a handler (all return Mono.empty()), respond with 404 directly
                        .switchIfEmpty(DispatcherHandler.this.createNotFoundError())
                        // Invoke the corresponding Handler
                        .then(Mono.defer(() -> {
                            // The nested chain before AdaptCachedBodyGlobalFilter is omitted
                            // Read the Body first; because TCP may split it into several packets, join all DataBuffers
                            return DataBufferUtils.join(exchange.getRequest().getBody())
                                    // If there is no Body, directly return an empty DataBuffer
                                    .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
                                    // decorate() puts the DataBuffer into the exchange Attributes (only to avoid caching
                                    // the Body twice) and wraps the new body and the original request into a new
                                    // request, which is passed down the GatewayFilters chain
                                    .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
                                    .switchIfEmpty(Mono.just(exchange.getRequest()))
                                    .flatMap(function);
                        }).then(Mono.empty())),
                TraceWebFilter.this.isTracePresent(), TraceWebFilter.this,
                TraceWebFilter.this.spanFromContextRetriever()
        ).transformDeferred(call -> {
            // MetricsWebFilter-related processing; the code was given earlier and is omitted here
        })
);
```

DataBufferUtils.join(exchange.getRequest().getBody()) essentially submits a task that tries to read the request Body, attaches the rest of the GatewayFilter chain as a callback that runs after the Body has been read, and returns immediately once the task is submitted. This is a little hard to follow in the code above, so let's use a simpler, analogous example:

```java
Span span = tracer.newTrace();

// Declare a MonoOperator similar to the MonoWebFilterTrace wrapped in TraceWebFilter
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {
    protected MonoWebFilterTrace(Mono<? extends T> source) {
        super(source);
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // Wrap the subscription in the span scope
        try (SpanInScope spanInScope = tracer.withSpanInScope(span)) {
            source.subscribe(actual);
            // Logged just before spanInScope is closed (that is, before the link information
            // is removed from the ThreadLocal Map)
            log.info("stopped");
        }
    }
}

Mono.defer(() -> new MonoWebFilterTrace<>(
        Mono.fromRunnable(() -> {
            log.info("first");
        })
        // Simulate FluxReceive
        .then(Mono.delay(Duration.ofSeconds(1)).doOnSuccess(longSignal -> log.info(longSignal)))
)).subscribe(aLong -> log.info(aLong));
```

Mono.delay is similar to FluxReceive here: both emit asynchronously, after subscribe() has already returned (Mono.delay emits on Reactor's parallel scheduler). Running the code above, the log shows:

```
2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0
```
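The ordering in this log can also be reproduced with the JDK's `CompletableFuture`: subscribe returns and the scope is closed before the delayed value arrives. This is only an analogy, not how Reactor implements it. An incomplete future stands in for the pending read, `thenAccept` registers the rest of the chain, and `complete()` plays the part of the value being delivered later. `DeferredCallbackDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DeferredCallbackDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Stands in for the pending read (the role Mono.delay / FluxReceive plays)
        CompletableFuture<String> pending = new CompletableFuture<>();

        // "subscribe": register the rest of the chain as a callback, then return at once
        pending.thenAccept(value -> events.add("callback runs with value=" + value));
        events.add("subscribe returned, scope closed");

        // Later the value arrives; only now does the registered callback run
        pending.complete("payload");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```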

In Spring Cloud Gateway, the FluxReceive that delivers the request Body uses the same thread pool as the one that invoked the GatewayFilter, so the callback may even run on the same thread. By then, however, the span's scope has already been closed and the link information removed from the ThreadLocal Map, so the log contains no link information.
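The same-thread case can be sketched with a single-thread executor standing in for one `reactor-http-nio` event-loop thread, and a plain `ThreadLocal` (hypothetical `TRACE_ID`) standing in for the MDC. The callback is queued on the same thread while the scope is open. It only runs after the current task, and with it the scope, has finished. So it runs on the same thread but sees no trace ID.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SameThreadLossDemo {
    // Hypothetical stand-in for the ThreadLocal map behind the logging MDC
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Returns what the deferred callback observed: whether it ran on the subscribing thread,
    // and which trace ID it could see there
    static String run() {
        // A single thread plays the role of one event-loop thread
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Future<String>> subscribe = eventLoop.submit(() -> {
                Thread subscribingThread = Thread.currentThread();
                TRACE_ID.set("7b2f5c190e1406cb"); // scope opened
                try {
                    // "Body has been read" callback, queued on the same event loop
                    return eventLoop.submit(() -> "sameThread="
                            + (Thread.currentThread() == subscribingThread)
                            + ", traceId=" + TRACE_ID.get());
                } finally {
                    TRACE_ID.remove(); // scope closed before the queued callback can run
                }
            });
            return subscribe.get().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```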
