Preface

This article looks at how context variables are passed across asynchronous threads in Reactor.

The problem with ThreadLocal

Using ThreadLocal to pass context variables, such as the current logged-in user, is very convenient in the traditional synchronous request/response model, because it removes the need to add common variables to every method's parameters. However, a business method may use @Async or otherwise run in a different thread pool, in which case the ThreadLocal value is no longer available.

The solution in this case is propagation: copying the variable from the calling (synchronous) thread to the asynchronous thread that runs the task.
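The failure and the propagation idea can be sketched with nothing but the JDK. This is a minimal illustration rather than how any framework does it; the class name, the CURRENT_USER variable and the propagating helper are all hypothetical names made up for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; none of these names come from a real library.
final class ThreadLocalPropagationDemo {

    // Stands in for a per-request value such as the current logged-in user.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Captures the caller's value now and restores it around the task later.
    static Runnable propagating(Runnable task) {
        String captured = CURRENT_USER.get();   // runs on the submitting thread
        return () -> {
            CURRENT_USER.set(captured);         // runs on the worker thread
            try {
                task.run();
            } finally {
                CURRENT_USER.remove();          // do not leak into the next task
            }
        };
    }

    // Returns {value seen without propagation, value seen with propagation}.
    static String[] runDemo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] seen = new String[2];
        CURRENT_USER.set("alice");
        try {
            pool.submit(() -> { seen[0] = CURRENT_USER.get(); }).get();
            pool.submit(propagating(() -> { seen[1] = CURRENT_USER.get(); })).get();
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_USER.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runDemo();
        System.out.println("without propagation: " + seen[0]); // null
        System.out.println("with propagation: " + seen[1]);    // alice
    }
}
```

The value is captured when the task is wrapped, on the submitting thread, and restored when the task runs, on the worker thread.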

TaskDecorator

Spring, for example, provides the TaskDecorator interface, which you can implement to control which variables are propagated. For example:

import java.util.Map;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

class MdcTaskDecorator implements TaskDecorator {

  @Override
  public Runnable decorate(Runnable runnable) {
    // Right now: Web thread context!
    // (Grab the current thread's MDC data)
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Right now: @Async thread context!
        // (Restore the Web thread context's MDC data; the copy may be null if nothing was set)
        if (contextMap != null) {
          MDC.setContextMap(contextMap);
        }
        runnable.run();
      } finally {
        MDC.clear();
      }
    };
  }
}

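Note the MDC.clear() call in the finally block: pooled threads are reused, so the copied MDC data has to be removed once the task finishes, otherwise it leaks into the next task that runs on the same thread.

Configuring the TaskDecorator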
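Why the cleanup in finally matters can be shown with a plain single-thread pool. The sketch below uses only the JDK; the class name, REQUEST_ID and the "request-1" value are hypothetical and exist only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names are illustrative only.
final class StaleThreadLocalDemo {

    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread and returns what the second one sees.
    static String valueSeenBySecondTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] seen = new String[1];
        try {
            pool.submit(() -> {
                REQUEST_ID.set("request-1");
                try {
                    // ... work for the first request would happen here ...
                } finally {
                    if (cleanUp) {
                        REQUEST_ID.remove();
                    }
                }
            }).get();
            // The second task never sets the variable itself.
            pool.submit(() -> { seen[0] = REQUEST_ID.get(); }).get();
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenBySecondTask(false)); // request-1 (stale value leaked)
        System.out.println(valueSeenBySecondTask(true));  // null
    }
}
```

Without the cleanup, the second task inherits data that belonged to the first one, which is the kind of leak the finally block in the decorator prevents.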

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
 
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }
}

See "Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads" for a complete example.

Reactor Context

Spring 5 introduced WebFlux, which is built on Reactor. How does Reactor propagate context variables? It officially provides the Context object as a replacement for ThreadLocal.

Its characteristics are as follows:

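  • Map-like key/value operations, such as put(Object key, Object value), putAll(Context) and hasKey(Object key)
  • Immutable: put returns a new Context instance and leaves the original unchanged
  • Provides the getOrDefault and getOrEmpty methods
  • A Context is bound to each Subscriber in the chain
  • Written with the subscriberContext(...) operator and read with Mono.subscriberContext() (both are deprecated since Reactor 3.4 in favor of contextWrite and Mono.deferContextual)
  • The Context propagates from the bottom of the chain upward, from the subscriber toward the source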

Examples

Setting and reading

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

Here the subscriberContext operator at the bottom of the chain sets message to World, and the Mono.subscriberContext() inside the flatMap reads it.

From the bottom up

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }

Because subscriberContext is applied too high in the chain in this example (above the flatMap), the value it sets is not visible to the Mono.subscriberContext() inside the flatMap, so the default is used.

Immutability

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key, "Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

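put always returns a new Context rather than modifying the existing one. The Context produced inside map is never attached to the Subscriber, so the following Mono.subscriberContext() still sees the original Context without the key, and the result is Default.

Multiple consecutive subscriberContext calls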
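The copy-on-write behaviour behind this result can be modelled with a small JDK-only class. ToyContext below is a hypothetical stand-in written for this illustration and is not Reactor's Context implementation; it only mirrors the idea that put hands back a new object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for an immutable key/value context; NOT Reactor's implementation.
final class ToyContext {
    private final Map<String, String> entries;

    private ToyContext(Map<String, String> entries) {
        this.entries = Collections.unmodifiableMap(entries);
    }

    static ToyContext empty() {
        return new ToyContext(new HashMap<>());
    }

    // Copy-on-write: the receiver is left untouched and a new instance is returned.
    ToyContext put(String key, String value) {
        Map<String, String> copy = new HashMap<>(entries);
        copy.put(key, value);
        return new ToyContext(copy);
    }

    String getOrDefault(String key, String fallback) {
        return entries.getOrDefault(key, fallback);
    }

    public static void main(String[] args) {
        ToyContext original = ToyContext.empty();
        ToyContext updated = original.put("message", "Hello");
        System.out.println(original.getOrDefault("message", "Default")); // Default
        System.out.println(updated.getOrDefault("message", "Default"));  // Hello
    }
}
```

Anyone who keeps reading from the original instance, as the test above does by asking for the subscriber's Context again, never sees the added key.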

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

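An operator reads the Context written by the subscriberContext call nearest below it. The Context is built from the bottom up, so the Reactor value written closer to the flatMap replaces World.

subscriberContext between flatMaps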

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

Each flatMap reads the Context nearest below it: the first sees Reactor and the second sees World.

subscriberContext inside a flatMap

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

Here the first flatMap cannot read the Context written inside the second flatMap, because that Context is visible only to the inner sequence.

Summary

By providing Context, Reactor offers functionality similar to what ThreadLocal provides in synchronous code. It is very powerful and worth studying carefully.

References

  • TaskDecorator
  • Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
  • How to pass context in standard way – without ThreadLocal
  • Spring Security Context Propagation with @Async
  • How do I access the RequestContextHolder in an async thread
  • Context Aware Java Executor and Spring’s @Async
  • Reactor reference guide, 8.8.1. The Context API