Small knowledge, big challenge! This article is participating in the creation activity of “Essential Tips for Programmers”.

This is the fifth installment of my "I'm TM stupid" series [facepalm].

  • After upgrading to Spring 5.3.x, GC times increased dramatically and I was f**king stupid
  • Mysql > alter TABLE select * from SQL where table select * from SQL
  • An exception thrown while getting exception information left no log behind, I'm TM stupid
  • Spring Data Redis connections are leaking, I'm fucking stupid

This article covers the underlying design and principles, how the problem was located, and other possible problem points. It goes deep and is long, so it is split into three parts:

  • Part 1: A brief description of the problem, plus the basic structure, flow, and underlying principles of Spring Cloud Gateway
  • Part 2: How Spring Cloud Sleuth adds link tracing to Spring Cloud Gateway, and why this problem occurs
  • Part 3: Performance issues caused by the non-invasive design of the current Spring Cloud Sleuth, other possible problem points, and how to solve them

Other places where Spring Cloud Gateway may lose link information

From the previous analysis, we can see that Spring Cloud Sleuth's link tracing information can also be lost in other places. Here are some common examples:

1. A GatewayFilter runs some work asynchronously. Because the thread has switched, and the Span may already have ended by then, that work has no link information. For example:

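@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    return chain.filter(exchange)
            // Downstream signals now arrive on a Schedulers.parallel() thread
            .publishOn(Schedulers.parallel())
            // The thread has switched and the Span may have ended, so this log line
            // has no link information (log is assumed to be an SLF4J logger field)
            .doOnSuccess(o -> log.info("success"));
}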

2. The filter chain is continued, i.e. chain.filter(exchange) is called, inside an asynchronous task. AdaptCachedBodyGlobalFilter, discussed above, falls into this category. As a result, the subsequent GatewayFilters have no link information. For example:

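@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	// Mono.delay emits on a Schedulers.parallel() thread, so the rest of the
	// filter chain is subscribed on that thread, without link information
	return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}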

Conflict between the Java concurrent programming model and the Project Reactor programming model

Many Java frameworks rely on ThreadLocal or use the current Thread as an identity. For example:

  • The MDC in logging frameworks is generally implemented with ThreadLocal.
  • Reentrant locks and other AQS-based synchronizers record the owning Thread to identify which thread holds the lock.
  • Distributed locks and similar data structures also identify the lock holder by its Thread, for example Redisson's Redis-based distributed lock.
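The second bullet can be shown with a small stdlib-only sketch. The class and method names here are made up for illustration. A ReentrantLock remembers the thread that acquired it. Suppose the lock is released from a callback that runs on a different thread, which can happen with a Reactor callback after a thread switch. The release is then rejected with IllegalMonitorStateException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

public class LockOwnerDemo {
    static final ReentrantLock LOCK = new ReentrantLock();

    // Acquires the lock on the calling thread, then tries to release it from a
    // "callback" running on another thread, and reports what happened.
    static String unlockFromAnotherThread() throws Exception {
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        LOCK.lock(); // the owner is now the calling thread
        try {
            Future<String> result = callbackThread.submit(() -> {
                try {
                    LOCK.unlock(); // not the owner: AQS compares against the owning Thread
                    return "released";
                } catch (IllegalMonitorStateException e) {
                    return "rejected";
                }
            });
            return result.get();
        } finally {
            LOCK.unlock(); // only the owning thread can release it
            callbackThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(unlockFromAnotherThread()); // rejected
    }
}
```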

However, this does not fit the Project Reactor programming model. In asynchronous reactive programming, there is no guarantee that a submitted task and its callback run on the same thread, so ThreadLocal semantics are hard to preserve. Project Reactor does provide Context as its counterpart to ThreadLocal. Mainstream frameworks are not compatible with this Context, though, which makes it very hard for Spring Cloud Sleuth to glue link tracing onto them. MDC, for example, is a ThreadLocal-based Map, not a Context-based one. Spring Cloud Sleuth therefore has to put the link information into MDC when the subscription starts, and make sure the thread does not switch while the pipeline runs.
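The mismatch can be reproduced without Reactor. In this stdlib-only sketch, a ThreadLocal stands in for an MDC entry. It is set on the caller thread and then read by a task running on another thread, where the value is gone. Passing an immutable context object along with the work survives the hop, which is roughly the idea behind Reactor's Context. The TRACE_ID field and the Map-based context are illustrative assumptions, not Sleuth or Reactor APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextHopDemo {
    // Stand-in for a ThreadLocal-backed MDC entry (hypothetical)
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Sets the ThreadLocal on the caller thread, then reads it after a thread switch
    static String readThreadLocalAfterHop(ExecutorService worker) {
        TRACE_ID.set("trace-123");
        try {
            return CompletableFuture
                    .supplyAsync(() -> String.valueOf(TRACE_ID.get()), worker)
                    .join();
        } finally {
            TRACE_ID.remove();
        }
    }

    // Carries the value explicitly with the work, the way a Reactor Context
    // travels with the subscription instead of with the thread
    static String readPropagatedContextAfterHop(ExecutorService worker) {
        Map<String, String> context = Map.of("traceId", "trace-123");
        return CompletableFuture
                .supplyAsync(() -> context.get("traceId"), worker)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            System.out.println(readThreadLocalAfterHop(worker));       // null
            System.out.println(readPropagatedContextAfterHop(worker)); // trace-123
        } finally {
            worker.shutdown();
        }
    }
}
```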

Forcing the pipeline to run without switching threads limits Project Reactor's flexible scheduling and costs some performance. Ideally, adding link tracing information would not force us to give up thread switching. With Spring Cloud Sleuth's non-invasive design, however, that is hard to achieve. For our own business code, we can instead define programming conventions that ensure the code everyone writes does not lose link information.

Improving our programming conventions

First, we create custom Mono and Flux factories.

A common Subscriber wrapper: every key method of the Reactor Subscriber checks whether the current context already has link information, i.e. a Span. If it does not, the call is wrapped in a scope of the captured Span. If it does, the call runs directly.

import org.reactivestreams.Subscription;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

// Assumes the Spring Cloud Sleuth 3.x API (org.springframework.cloud.sleuth.*)
public class TracedCoreSubscriber<T> implements CoreSubscriber<T> {
    private final CoreSubscriber<? super T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedCoreSubscriber(CoreSubscriber<? super T> delegate, Tracer tracer,
                         CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void onSubscribe(Subscription s) {
        executeWithinScope(() -> delegate.onSubscribe(s));
    }

    @Override
    public void onNext(T o) {
        executeWithinScope(() -> delegate.onNext(o));
    }

    @Override
    public void onError(Throwable t) {
        executeWithinScope(() -> delegate.onError(t));
    }

    @Override
    public void onComplete() {
        executeWithinScope(delegate::onComplete);
    }

    @Override
    public Context currentContext() {
        // Keep the downstream Reactor Context visible to upstream operators
        return delegate.currentContext();
    }

    private void executeWithinScope(Runnable runnable) {
        // Link information already present (or no Span captured): run directly
        if (tracer.currentSpan() != null || span == null) {
            runnable.run();
            return;
        }
        // No link information on this thread: force the captured Span into scope
        try (CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(span.context())) {
            runnable.run();
        }
    }
}
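The "wrap only when missing" decision in executeWithinScope can be tried out in isolation. Below is a stdlib-only imitation of that logic. CURRENT_SPAN and runWithinScope are hypothetical names, a String stands in for the Span, and the sketch does not use Sleuth itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class MaybeScopeDemo {
    // Stand-in for the tracer's "current span" (hypothetical)
    static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    // Runs the action with capturedSpan as current only if no span is current yet,
    // and clears it afterwards so the thread is not left holding a stale span.
    static <T> T runWithinScope(String capturedSpan, Supplier<T> action) {
        if (CURRENT_SPAN.get() != null || capturedSpan == null) {
            return action.get(); // link information already present, or nothing to install
        }
        CURRENT_SPAN.set(capturedSpan);
        try {
            return action.get();
        } finally {
            CURRENT_SPAN.remove(); // we installed it only because it was absent
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // On a bare worker thread, the captured span is installed for the call
            String onWorker = worker.submit(() -> runWithinScope("span-A", CURRENT_SPAN::get)).get();
            // When a span is already current, it is left untouched
            CURRENT_SPAN.set("span-B");
            String onCaller = runWithinScope("span-A", CURRENT_SPAN::get);
            // After the call, the worker thread no longer holds a span
            String leftover = worker.submit(() -> String.valueOf(CURRENT_SPAN.get())).get();
            System.out.println(onWorker + " " + onCaller + " " + leftover); // span-A span-B null
        } finally {
            CURRENT_SPAN.remove();
            worker.shutdown();
        }
    }
}
```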

Next, we define TracedFlux, a proxy for any Flux, and TracedMono, a proxy for any Mono. All they actually do is wrap the incoming CoreSubscriber in a TracedCoreSubscriber at subscription time:

import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// TracedFlux.java (each class goes in its own file, in the same package)
public class TracedFlux<T> extends Flux<T> {
    private final Flux<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        // Wrap the downstream subscriber so every signal runs with link information
        delegate.subscribe(new TracedCoreSubscriber<>(actual, tracer, currentTraceContext, span));
    }
}

// TracedMono.java
public class TracedMono<T> extends Mono<T> {
    private final Mono<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber<>(actual, tracer, currentTraceContext, span));
    }
}
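TracedFlux and TracedMono are plain decorators over the Reactive Streams Publisher contract. The same idea can therefore be tried out with the JDK's java.util.concurrent.Flow interfaces, which mirror Reactive Streams. In this sketch, SubmissionPublisher delivers signals on a pool thread. TracedPublisher wraps every subscriber at subscribe time, so onNext still sees the captured value. The class names are hypothetical, a ThreadLocal String stands in for the Span, and no Reactor or Sleuth code is involved.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class TracedPublisherDemo {
    // Stand-in for the tracer's current span (hypothetical)
    static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

    // Publisher proxy: its only job is to wrap the subscriber on subscribe()
    static final class TracedPublisher<T> implements Flow.Publisher<T> {
        private final Flow.Publisher<T> delegate;
        private final String span;
        TracedPublisher(Flow.Publisher<T> delegate, String span) { this.delegate = delegate; this.span = span; }
        @Override public void subscribe(Flow.Subscriber<? super T> actual) {
            delegate.subscribe(new TracedSubscriber<>(actual, span));
        }
    }

    // Subscriber proxy: installs the captured span around each signal if none is current
    static final class TracedSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> delegate;
        private final String span;
        TracedSubscriber(Flow.Subscriber<? super T> delegate, String span) { this.delegate = delegate; this.span = span; }
        private void withinScope(Runnable r) {
            if (CURRENT_SPAN.get() != null) { r.run(); return; }
            CURRENT_SPAN.set(span);
            try { r.run(); } finally { CURRENT_SPAN.remove(); }
        }
        @Override public void onSubscribe(Flow.Subscription s) { withinScope(() -> delegate.onSubscribe(s)); }
        @Override public void onNext(T item) { withinScope(() -> delegate.onNext(item)); }
        @Override public void onError(Throwable t) { withinScope(() -> delegate.onError(t)); }
        @Override public void onComplete() { withinScope(delegate::onComplete); }
    }

    // Publishes the items through a pool thread and records the span seen in each onNext
    static List<String> spansSeen(boolean traced, List<Integer> items) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
            Flow.Publisher<Integer> publisher = traced ? new TracedPublisher<>(source, "span-A") : source;
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { seen.add(String.valueOf(CURRENT_SPAN.get())); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            items.forEach(source::submit);
        } // close() signals onComplete once the buffered items are delivered
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(spansSeen(false, List.of(1, 2))); // [null, null]
        System.out.println(spansSeen(true, List.of(1, 2)));  // [span-A, span-A]
    }
}
```

The design choice mirrors TracedFlux and TracedMono. The publisher does nothing except wrap the subscriber, so all scope handling lives in one place, the subscriber wrapper.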

Then we define a factory class. It creates a TracedFlux from the request's ServerWebExchange and the original Flux, and a TracedMono from the ServerWebExchange and the original Mono. The Span is read from the exchange Attributes. As the source-code analysis above showed, TraceWebFilter adds this attribute. We only use the factory inside GatewayFilters, which run after TraceWebFilter, so the attribute is guaranteed to be present.

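import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class TracedPublisherFactory {
    // The attribute key under which TraceWebFilter stores the request's Span
    protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

    @Autowired
    private Tracer tracer;
    @Autowired
    private CurrentTraceContext currentTraceContext;

    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
        return new TracedFlux<>(publisher, tracer, currentTraceContext,
                (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }

    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
        return new TracedMono<>(publisher, tracer, currentTraceContext,
                (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }
}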

Then we set the following rules:

1. Every GatewayFilter must extend our custom abstract class. Its only job is to wrap the filter's result in a TracedMono via TracedPublisherFactory's getTracedMono. Taking GlobalFilter as an example:

public abstract class AbstractTracedFilter implements GlobalFilter {
    @Autowired
    protected TracedPublisherFactory tracedPublisherFactory;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
    }

    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

2. Any Flux or Mono newly created inside a GatewayFilter must also be wrapped with TracedPublisherFactory.

3. AdaptCachedBodyGlobalFilter loses link information because it reads the request Body. For this case, I submitted a Pull Request to the community: Fix #2004 Span is not terminated properly in Spring Cloud Gateway. Alternatively, you can wrap the request Body with TracedPublisherFactory in a Filter that runs before that Filter.
