In order to deal with
High concurrency environmentServer side programming, Microsoft has proposed an implementation
Asynchronous programmingThe solution –
Reactive Programming, Chinese name
Reactive programming. Then other technologies quickly followed, like
ES6through
PromiseA similar approach to asynchronous programming was introduced.
JavaThe community is not far behind,
Netflix
TypeSafeThe company provided
RxJava
Akka StreamTechnology,
JavaThe platform also has a framework that enables reactive programming.

In the traditional programming paradigm, we iterate over a sequence through an Iterator pattern. This traversal mode is controlled by the caller, using a pull mode. Each time the caller retrieves the next value in the sequence through the next() method. Reactive flows use a push approach, the common publisher-subscriber pattern. As new data is generated by publishers, it is pushed to subscribers for processing. A variety of different operations can be added to the reactive flow to process the data, forming a data processing chain. The chain of processing that is added declaratively is only executed when the subscriber makes a subscription.

The first important concept in reactive flow is negative backpressure. In the basic message push mode, when the message publisher generates data too fast, the processing speed of the message subscribers can not keep up with the speed of the generation, thus causing great pressure to the subscribers. When the pressure gets too high, it can cause subscribers themselves to collapse, creating a cascade effect that can even bring down the entire system. Negative pressure serves to provide a feedback channel from subscribers to producers. The subscriber can declare the number of messages it can process at a time through the Request () method, and the producer will only produce a corresponding number of messages until the next request() method call. This actually becomes a push-pull pattern.

Introduction of Reactor

The Reactor framework is implemented by Pivotal based on Reactive Programming. It’s a technology in line with the Reactive Streams specification (created by Netflix, TypeSafe, Pivotal, and others). Its name means reactor, reflecting the powerful capabilities behind it.



Flux and Mono

Flux and Mono are two basic concepts in Reactor. Flux represents an asynchronous sequence of 0 to N elements. There are three different types of message notifications that can be contained in this sequence: normal messages containing elements, end-of-sequence messages, and sequence error messages. The corresponding subscriber methods onNext(), onComplete(), and onError() are called when message notifications are generated. Mono represents an asynchronous sequence of 0 or 1 elements. This sequence can also contain the same three types of message notifications as Flux. There can be transitions between Flux and Mono. The result of counting a Flux sequence is a Mono<Long> object. When you combine two Mono sequences together, you get a Flux object.

Create a Flux

There are several different ways to create Flux sequences.

Static method of the Flux class

The first is through static methods in the Flux class.

  • Just () : You can specify all the elements contained in the sequence. The created Flux sequence ends automatically after these elements are published.
  • FromArray (), fromIterable(), and fromStream() : You can create a Flux object from an array, an Iterable, or a Stream.
  • Empty () : Creates a sequence that contains no elements and publishes only the closing message.
  • Error (Throwable error) : Creates a sequence containing only error messages.
  • Never () : Creates a sequence that does not contain any message notifications.
  • Range (int start, int count) : Creates a sequence of Integer objects containing the number of counts starting from start.
  • Interval (Duration period) and interval(Duration delay, Duration period) : Create a sequence of Long objects incrementing from 0. The elements contained therein are published at specified intervals. In addition to the interval, you can specify a delay before the initial element is published.
  • IntervalMillis (long period) and intervalMillis(long delay, long period) : The same function as interval() except that the interval and delay are specified by the number of milliseconds.

Examples of these methods are shown in Listing 1.

Listing 1. Create a Flux sequence through a static method of the Flux class
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);

The static methods above are suitable for simple sequence generation, and generate() or create() methods should be used when sequence generation requires complex logic.

The generate () method

The generate() method generates Flux sequences synchronously and one-by-one. The sequence is generated by calling the next(), complete(), and error(Throwable) methods of the provided SynchronousSink object. Build-by-build means that the next() method can only be called once at most in the specific build logic. In some cases, sequence generation may be stateful, requiring some state object. Use the alternative form of generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator) StateSupplier is used to provide the initial state object. During sequence generation, the state object is passed in as the first argument to be used by the Generator and can be modified in the corresponding logic for the next generation.

In Listing 2, a simple value is generated by the next() method in the generation logic of the first sequence, which is then terminated by the complete() method. If the complete() method is not called, an infinite sequence is produced. The state object in the generation logic of the second sequence is an ArrayList object. The actual value generated is a random number. The generated random numbers are added to the ArrayList. When 10 numbers are produced, the sequence is terminated with the complete() method.

Listing 2. Generate the Flux sequence using the generate() method
Flux.generate(sink -> {

sink.next("Hello");

sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {

int value = random.nextInt(100);

list.add(value);

sink.next(value);

if (list.size() == 10) {

sink.complete();

}

return list;
}).subscribe(System.out::println);

The create () method

The create() method differs from the generate() method in that it uses a FluxSink object. FluxSink supports both synchronous and asynchronous message generation and can produce multiple elements in a single call. In Listing 3, all 10 elements are generated in a single call.

Listing 3. Generate the Flux sequence using the create() method
Flux.create(sink -> {

for (int i = 0; i < 10; i++) {

sink.next(i);

}

sink.complete();
}).subscribe(System.out::println);

Create Mono

Mono is created in a similar way to Flux described earlier. The Mono class also contains some of the same static methods as the Flux class. These methods include just(), empty(), error(), and never(). In addition to these methods, Mono also has some unique static methods.

  • FromCallable (), fromCompletionStage(), fromFuture(), fromRunnable(), and fromSupplier() : Create Mono from Callable, CompletionStage, CompletableFuture, Runnable, and Supplier, respectively.
  • Delay (Duration Duration) and delayMillis(long Duration) : Creates a Mono sequence that produces the number 0 as a unique value after the specified delay time.
  • IgnoreElements (Publisher

    source) : Creates a Mono sequence that ignores all elements in Publisher as the source, producing only the end message.
  • justOrEmpty(Optional
    data) and justOrEmpty(T data) : Creates a Mono from an Optional object or possibly null. The Mono sequence produces the corresponding element only if the Optional object contains a value or if the object is not null.

Mono can also be created using MonoSink through the create() method. An example of creating a Mono sequence is shown in Listing 4.

Listing 4. Creating the Mono sequence
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

The operator

As with RxJava, Reactor’s power lies in its ability to add a variety of different operators declaratively to the reaction flow. The important operators are classified below.

Buffer and bufferTimeout

These two operators collect elements from the current flow into the collection and treat the collection object as a new element in the flow. You can specify different conditions when you do a collection: the maximum number of elements to include or the time interval for a collection. The method buffer() uses only one condition, whereas bufferTimeout() can specify both conditions. You can specify the Duration object or the number of milliseconds, using either the bufferMillis() or bufferTimeoutMillis() methods.

In addition to the number of elements and the time interval, collection can be done with the bufferUntil and bufferWhile operators. The parameters of these two operators are Predicate objects that represent conditions to be satisfied by the elements in each set. BufferUntil will collect until the Predicate returns true. The element that makes Predicate return true can optionally be added to the current collection or the next collection; BufferWhile is only collected if Predicate returns true. Once the value is false, the next collection begins immediately.

Listing 5 shows an example of the use of the buffer related operator. The first line prints five arrays of 20 elements; The second line prints two arrays of 10 elements; The third line outputs five arrays of two elements. An even number ends the current collection; The fourth line prints five one-element arrays that contain only even numbers.

Note that in Listing 5, the Flux sequence is first converted into a Stream object in Java 8 through the toStream() method and then output through the forEach() method. This is because the generation of the sequence is asynchronous, and converting to a Stream ensures that the main thread does not exit until the generation of the sequence is complete, thus correctly printing all the elements in the sequence.

Listing 5. Example use of the buffer related operator
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

filter

The elements contained in the stream are filtered, leaving only those elements that meet the conditions specified by Predicate. The statement in Listing 6 prints all even numbers from 1 to 10.

Listing 6. Example use of the filter operator

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

window

The Window operator acts like a buffer, except that the Window operator collects elements from the current stream into another Flux sequence, so the return type is Flux<Flux<T>>. In Listing 7, the output of two lines of statements is five and two UnicastProcessor characters, respectively. This is because the stream produced by the Window operator contains objects of the UnicastProcessor class, and the toString method of the UnicastProcessor class outputs UnicastProcessor characters.

Listing 7. Example use of the window operator
Flux.range(1, 100).window(20).subscribe(System.out::println);
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

zipWith

The zipWith operator merges elements in the current stream with elements in another stream on a one-to-one basis. The result is a stream of element type Tuple2. The combined elements can also be processed through a BiFunction function, and the resulting stream has the element type returned by the function.

In Listing 8, the elements contained in the two flows are A, B, and C, D. The first zipWith operator does not use a merge function, so the element type in the result flow is Tuple2; The second zipWith operation changes the element type to String through the merge function.

Listing 8. Example use of the zipWith operator

Flux.just("a", "b")

.zipWith(Flux.just("c", "d"))

.subscribe(System.out::println);
Flux.just("a", "b")

.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

.subscribe(System.out::println);

take

The take series of operators are used to extract elements from the current stream. There are many ways to extract it.

  • Take (long n), take(Duration timespan) and takeMillis(long Timespan) : Extract at specified quantities or intervals.
  • TakeLast (Long N) : Extracts the last n elements in the stream.
  • takeUntil(Predicate
    predicate) : Extracts elements until the predicate returns true.
  • takeWhile(Predicate
    continuePredicate) : The Predicate is extracted when it returns true.
  • takeUntilOther(Publisher
    other) : Extract elements until another stream starts producing them.

In Listing 9, the first line prints the numbers 1 through 10; The second line prints the numbers 991 to 1000; The third line prints the numbers 1 through 9; The fourth line outputs the numbers 1 through 10, including the elements that make the Predicate return true.

Listing 9. Example use of the take family of operators
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i <
10
).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

Reduce and reduceWith

The reduce and reduceWith operators accumulate all the elements contained in the flow to obtain a Mono sequence containing the calculated results. The cumulative operation is represented by a BiFunction. You can specify an initial value at operation time. If there is no initial value, the first element of the sequence is used as the initial value.

In Listing 10, the first statement adds the elements in the stream, resulting in 5050; The second line also adds, but gives an initial value of 100 through a Supplier, so the result is 5150.

Listing 10. Example use of the Reduce and reduceWith operators
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

The merge and mergeSequential

The merge and mergeSequential operators are used to merge multiple streams into a single Flux sequence. The difference is that mergeSequential mergeSequential merges on a stream basis in the order in which all streams are subscribed.

Listing 11 uses the merge and mergeSequential operators, respectively. The merged streams all produce an element every 100 milliseconds, but each element in the second stream is produced 50 milliseconds later than the first. In the result flow of a merge, the elements from the two streams are intertwined chronologically; The result flow using mergeSequential produces all elements in the first flow and then all elements in the second flow.

Listing 11. Example use of the Merge and mergeSequential operators
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

.toStream()

.forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

.toStream()

.forEach(System.out::println);

FlatMap and flatMapSequential

The flatMap and flatMapSequential operators convert each element in a flow into a flow and combine the elements of all streams. The difference between flatMapSequential and flatMap is the same as the difference between mergeSequential and merge.

In Listing 12, the elements in the stream are converted to a different number of streams every 100 milliseconds and then merged. Because the first stream contains a small number of elements, the result stream starts with elements from the two streams interwoven, and then only elements from the second stream.

Listing 12. Example use of the flatMap operator
Flux.just(5, 10)

.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

.toStream()

.forEach(System.out::println);

concatMap

The concatMap operator also converts each element in a stream into a stream and then merges all streams. Unlike flatMap, concatMap merges transformed streams according to the order of elements in the original stream. Unlike flatMapSequential, which subscribes to all streams before merging, concatMap subscribes to transformed streams dynamically.

Listing 13 is similar to Listing 12, except that flatMap is replaced with concatMap, and the resulting flow contains all the elements in the first flow and the second flow in turn.

Listing 13. Example use of the concatMap operator
Flux.just(5, 10)

.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

.toStream()

.forEach(System.out::println);

combineLatest

The combineLatest operator merges the newly generated elements from all streams into a new element that is returned as an element in the result stream. As soon as a new element is created in any of the streams, the merge operation is performed once, and the new element is created in the result stream. In Listing 14, the newly generated elements of the stream are collected into an array that is converted to a String using the arrays.toString method.

Listing 14. Example use of the combineLatest operator
Flux.combineLatest(

Arrays::toString,

Flux.intervalMillis(100).take(5),

Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);

Message processing

When you need to process messages in Flux or Mono, you can add the corresponding subscription logic through the SUBSCRIBE method, as shown in the previous code listing. You can specify the type of message to process when you call the SUBSCRIBE method. You can process only the normal messages contained therein, or you can process both the error message and the completion message. The code in Listing 15 processes both the normal message and the error message through the subscribe() method.


Listing 15. Normal and error messages are processed through the subscribe() method
Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.subscribe(System.out::println, System.err::println);

Normal message processing is relatively simple. When errors occur, there are several different strategies for handling them. The first strategy is to return a default value via the onErrorReturn() method. In Listing 16, the stream produces a default value of 0 when an error occurs.

Listing 16. Return the default value in case of an error
Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.onErrorReturn(0)

.subscribe(System.out::println);

The second strategy is to use another stream to produce elements through the switchOnError() method. In Listing 17, when an error occurs, the stream corresponding to mono.just (0), the number 0, is generated.

Listing 17. Use a different stream when an error occurs
Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.switchOnError(Mono.just(0))

.subscribe(System.out::println);

The third strategy is to use the onErrorResumeWith() method to select the flow that produces the element to use based on the different exception types. In Listing 18, different streams are returned based on the exception type as data sources in the event of an error. Because the exception is of type IllegalArgumentException, the resulting element is -1.

Listing 18. Select the flow based on the exception type when an error occurs
Flux.just(1, 2)

.concatWith(Mono.error(new IllegalArgumentException()))

.onErrorResumeWith(e -> {

if (e instanceof IllegalStateException) {

return Mono.just(0);

} else if (e instanceof IllegalArgumentException) {

return Mono.just(-1);

}

return Mono.empty();

})

.subscribe(System.out::println);

The Retry operator can also be used to retry errors. The action of retrying is achieved by re-subscribing to the sequence. You can specify the number of retries when using the retry operator. Listing 19 specifies 1 retries, and the output is 1, 2, 1, 2, and an error message.

Listing 19. Retry using the Retry operator

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.retry(1)

.subscribe(System.out::println);

The scheduler

The previous section described reactive flows and the various operations that can be performed on them. Schedulers specify how and on which threads these operations are performed. There are several different scheduler implementations.

  • The current thread is created using the schedulers.immediate () method.
  • A single reusable thread is created using the schedulers.single () method.
  • Use an elastic thread pool, created using the schedulers.elastic () method. Threads in a thread pool are reusable. New threads are created as needed. If a thread is idle for too long, it is destroyed. The scheduler is suitable for processing streams related to I/O operations.
  • Using a thread pool optimized for parallel operations, created using the schedulers.parallel () method. The number of threads depends on the number of cores in the CPU. The scheduler is suitable for processing computationally intensive flows.
  • Created using a scheduler that supports task scheduling through the schedulers.timer () method.
  • From an existing ExecutorService object is created in the scheduler, through Schedulers. FromExecutorService () method to create.

Some operators already use a particular type of scheduler by default. For example, flows created by the intervalMillis() method use a scheduler created by schedulers.timer (). The scheduler that performs the operation can be switched through the publishOn() and subscribeOn() methods. The publishOn() method switches how the operator is executed, and the subscribeOn() method switches how the elements in the stream are executed when they are generated.

In Listing 20, you use the create() method to create a new Flux object with the only element being the name of the current thread. This is followed by two pairs of publishOn() and map() methods, which switch the scheduler at execution time and prefix it with the current thread name. Finally, the execution mode when the stream is generated is changed by subscribeOn() method. The result is [elastice-2] [single-1] parallel-1. The innermost thread name parallel-1 comes from the schedulers.parallel () scheduler used to generate elements in the flow, and the middle thread name single-1 comes from the schedulers.single () scheduler used before the first map operation, The outermost thread name, elastic-2, comes from the schedulers.elastic () scheduler that precedes the second map operation.

Listing 20. Switch operator execution using the scheduler

Flux.create(sink -> {

sink.next(Thread.currentThread().getName());

sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);

test

When testing using Reactor code, need to use the IO. Projectreactor. Addons: Reactor – test library.

Using StepVerifier

A typical scenario for testing is to verify that the elements contained in a sequence are as expected. StepVerifier is used to verify elements contained in a sequence one by one. In Listing 21, the flow to be validated contains elements A and B. A stream is wrapped with the StepVerifier.create() method before validation. The expectNext() method is used to declare the value of the next element in the stream expected during testing, while the verifyComplete() method verifies that the stream ends normally. A similar method is verifyError() to verify that the stream was terminated due to an error.

Listing 21. Validate elements in the flow using StepVerifier
StepVerifier.create(Flux.just("a", "b"))

.expectNext("a")

.expectNext("b")

.verifyComplete();

Operational test time

Some sequences are time-dependent, such as generating a new element every minute. It is not possible to spend real time waiting for each element to be generated during testing. The virtual time function provided by StepVerifier is needed. Through StepVerifier. WithVirtualTime () method can be used to create virtual clock StepVerifier. The thenAwait(Duration) method is used to advance the virtual clock.

In Listing 22, the flow to be validated contains two elements that are generated one day apart, and the first element has a four hour generation delay. Through StepVerifier. WithVirtualTime () method after packing flow, expectNoEvent () method is used to verify did not produce any message within 4 hours, and then verify the first element 0; ThenAwait () is then used to advance the virtual clock one day and verify that the second element 1 is generated; Finally verify that the flow ends normally.

Listing 22. Operational test time
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

.expectSubscription()

.expectNoEvent(Duration.ofHours(4))

.expectNext(0L)

.thenAwait(Duration.ofDays(1))

.expectNext(1L)

.verifyComplete();

Using TestPublisher

TestPublisher is useful for controlling the production of elements in a flow, even if they violate the reaction flow specification. In Listing 23, you create a new TestPublisher object with the create() method, then use the next() method to generate the elements, and use the complete() method to end the flow. TestPublisher is primarily used to test operators created by developers themselves.

Listing 23. Use TestPublisher to create the flow for the test
final TestPublisher<
String
> testPublisher = TestPublisher.create();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)

.expectNext("a")

.expectNext("b")

.expectComplete();

debugging

Because of the difference between reactive programming paradigm and traditional programming paradigm, code written by Reactor is difficult to debug when problems occur. To better assist developers in debugging, Reactor provides some ancillary features.

Enable debug mode

When you need more flow-related execution information, you can enable debug mode by adding the code in Listing 24 at the beginning of the program. After debug mode is enabled, all operators save additional information about the execution chain at execution time. When an error occurs, this information is output as part of the exception stack. This information can be used to analyze the specific operator in which the problem occurred.

Listing 24. Enabling debug mode

Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

However, when debug mode is enabled, logging this extra information comes at a cost. Debugging mode is usually only considered after an error has occurred. But when debug mode is enabled to find problems, previous errors are not always easy to reproduce. To reduce possible overhead, you can limit debugging mode to only certain types of operators.

Using checkpoints

Another option is to enable debug mode for a particular stream processing chain through the checkpoint operator. In Listing 25, you add a checkpoint named test after the map operator. The checkpoint name appears in the exception stack information when an error occurs. For important or complex flow processing chains in the program, checkpoints can be enabled at key locations to help locate possible problems.

Listing 25. Using the checkpoint operator

Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

logging

Another utility in development and debugging is logging flow-related events. This can be done by adding the log operator. In Listing 26, you add the log operator and specify the name of the log class.

Listing 26. Logging events using the log operator

Flux.range(1, 2).log("Range").subscribe(System.out::println);

At actual run time, the resulting output is shown in Listing 27.

Listing 27. log generated by the log operator
13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:07:56. 751. [the main] INFO Range - | onSubscribe ([Synchronous Fuseable] FluxRange. RangeSubscription)
13:07:56. 753. [the main] INFO Range - | request (unbounded)
13:07:56. 754. [the main] INFO Range - | onNext (1)
1
13:07:56. 754. [the main] INFO Range - | onNext (2)
2
13:07:56. 754. [the main] INFO Range - | the onComplete ()

“Cold” and “hot” sequences

All that was created in the previous code listing were cold sequences. The implication of a cold sequence is that whenever a subscriber subscribes to the sequence, all messages generated in the sequence are always received. The hot sequence, on the other hand, is constantly generating messages, and subscribers can only get messages generated after their subscription.

In Listing 28, the original sequence contains 10 elements spaced one second apart. Publish () converts a Flux object to a ConnectableFlux object. The purpose of the autoConnect() method is to start generating messages when the ConnectableFlux object has a subscriber. The code source.subscribe() simply subscribes to the ConnectableFlux object to start generating data. The current thread then sleeps for five seconds, and the second subscriber can only get the last five elements of the sequence, so the output is the numbers 5 through 9.

Listing 28. Hot sequence
final Flux<
Long
> source = Flux.intervalMillis(1000)

.take(10)

.publish()

.autoConnect();
source.subscribe();
Thread.sleep(5000);
source

.toStream()

.forEach(System.out::println);

conclusion

The reactive programming paradigm is both a challenge that requires a paradigm shift and an opportunity that is full of possibilities for developers accustomed to traditional programming paradigms. Reactor, as a new Java library based on the Reactor flow specification, can be used as the basis of the Reactor application. This article introduces the Reactor database in detail, including the creation of Flux and Mono sequences, the use of common operators, schedulers, error handling, testing and debugging techniques, etc.

The resources

  • See the Reactor website for more information.