Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”).

It sounds hard, but it follows the Pareto principle. That means you can do most of your daily tasks with only 20 percent of its functions.

Take my current project as an example. So far, we have not used backpressure. The feature we use a lot is

  • (Flux|Mono).just
  • (Flux|Mono).map
  • (Flux|Mono).flatMap
  • (Flux|Mono).onErrorResume
  • (Flux|Mono).filter

These are some of the simplest features, but also the most commonly used. So don’t worry, after reading this blog I’m sure you can easily use it in your own projects too.

How to install?

Add the following configuration to your pom.xml. We use BOM here so we don’t have to worry about version compatibility.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.16</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Copy the code

How to test?

Reactor provides a library of tests that we can use StepVerifier to do the following

  • Validate elements and their order

    StepVerifier.create(Flux.fromArray(new Integer[] {1.2.3.4.5})).expectNext(1)
        .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
    Copy the code
  • Validation error

    StepVerifier.create(Flux.error(new Exception("some error")))
            .verifyErrorMessage("some error");
    Copy the code
  • Verify that the sequence ends

    StepVerifier.create(Flux.empty()).verifyComplete();
    Copy the code

In the following sections, we will use StepVerifier to validate code.

What are the basic concepts?

Reactor has two basic concepts

  • Flux

    Represents a column of data. The number of data may be 0 or any number of data.

  • Mono

    Represents a column of data with either zero or one number of data.

What are the top 20 percent features?

In this section, I use several concepts to group functionality

  • packing

    The data in the Flux | Mono

  • conversion

    Converts current data to another data

  • review

    View the value of the element and perform some manipulation without changing the value of the element

  • Split open a case

    From Flux | Mono to retrieve data

packing

How do I handle strings?

  • Flux

    @Test
    public void canBeCreatedFromString(a) {
        StepVerifier.create(Flux.just("hello world")).expectNext("hello world").verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canBeCreatedFromString(a) {
        StepVerifier.create(Mono.just("hello world")).expectNext("hello world").verifyComplete();
    }
    Copy the code

How do I deal with Number?

  • Flux

    @Test
    public void canBeCreatedFromNumber(a) {
        StepVerifier.create(Flux.just(1)).expectNext(1).verifyComplete();
        StepVerifier.create(Flux.just(1.0)).expectNext(1.0).verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canBeCreatedFromNumber(a) {
        StepVerifier.create(Mono.just(1)).expectNext(1).verifyComplete();
        StepVerifier.create(Mono.just(1.0)).expectNext(1.0).verifyComplete();
    }
    Copy the code

How to handle Optional or Nullable data?

Only Mono can perform this operation

@Test
public void canBeCreatedFromNullableValue(a) {
    String value = null;
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();

    value = "hello world";
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}

@Test
public void canBeCreatedFromOptionalValue(a) {

    Optional<String> value = Optional.empty();
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();

    value = Optional.of("hello world");
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}
Copy the code

How to deal with a data generator?

Here A data generator is A function that has no arguments and whose function signature is () -> A.

Only Mono can perform this operation.

@Test
public void canBeCreatedFromCallable(a) {
    StepVerifier.create(Mono.fromCallable(() -> "hello world!")).expectNext("hello world!")
            .verifyComplete();
}
Copy the code

How do I deal with Array?

Only Flux can perform this operation.

@Test
public void canBeCreatedFromArray(a) {
    StepVerifier.create(Flux.fromArray(new Integer[] {1.2.3.4.5})).expectNext(1)
            .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
}
Copy the code

How do I deal with Iterable?

Only Flux can perform this operation.

@Test
public void canBeCreatedFromList(a) {
    List<Integer> list = new LinkedList<Integer>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);

    StepVerifier.create(Flux.fromIterable(list)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}

@Test
public void canBeCreatedFromSet(a) {
    Set<Integer> set = new HashSet<Integer>();
    set.add(1);
    set.add(2);
    set.add(2);
    set.add(3);
    set.add(4);

    StepVerifier.create(Flux.fromIterable(set)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}
Copy the code

How do I handle the Stream?

Only Flux can perform this operation.

@Test
public void canBeCreatedFromStream(a) {
    Stream<Integer> stream = Stream.of(1.2.3.4.5);
    StepVerifier.create(Flux.fromStream(stream)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).expectNext(5).verifyComplete();
}
Copy the code

How to handle Throwable?

  • Flux

    @Test
    public void canBeCreatedFromThrowable(a) {
        StepVerifier.create(Flux.error(new Exception("some error")))
                .verifyErrorMessage("some error");
    }
    Copy the code
  • Mono

    @Test
    public void canBeCreatedFromThrowable(a) {
        StepVerifier.create(Mono.error(new Exception("some error")))
                .verifyErrorMessage("some error");
    }
    Copy the code

conversion

How do you filter elements?

  • Flux

    @Test
    public void canDoFilter(a) {
        Flux<Integer> flux = Flux.just(1.2.3.4.5.6);
        StepVerifier.create(flux.filter(x -> x > 5)).expectNext(6).verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canDoFilter(a) {
        StepVerifier.create(Mono.just(6).filter(x -> x > 5)).expectNext(6).verifyComplete();
        StepVerifier.create(Mono.just(2).filter(x -> x > 5)).verifyComplete();
    }
    Copy the code

How do I execute function A -> B on each element?

Suppose you have A function A – > B, which is A Flux | Mono element types, B is not Flux | Mono, so we can use (Flux | Mono). The map to perform this function on each element.

  • Flux

    @Test
    public void canMapTheTypeOfValueInSequenceToAnotherType(a) {
        StepVerifier.create(flux.map(x -> x.toString())).expectNext("1").expectNext("2")
                .expectNext("3").expectNext("4").expectNext("5").expectNext("6").verifyComplete();
    }
    
    @Test
    public void canMapTheValueInSequenceToAnotherValue(a) {
        StepVerifier.create(flux.map(x -> x + 1)).expectNext(2).expectNext(3).expectNext(4)
                .expectNext(5).expectNext(6).expectNext(7).verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canMapTheTypeOfValueInSequenceToAnotherType(a) {
        StepVerifier.create(Mono.just(1).map(x -> x.toString())).expectNext("1").verifyComplete();
    }
    
    @Test
    public void canMapTheValueInSequenceToAnotherValue(a) {
        StepVerifier.create(Mono.just(1).map(x -> x + 1)).expectNext(2).verifyComplete();
    }
    Copy the code

How to perform function on each element A – > (Flux | Mono) < B >?

Suppose you have A function A – > (Flux | Mono) < B >, which is A Flux | Mono element type, then we can use (Flux | Mono). FlatMap or Mono. FlatMapMany to perform this function on each element.

  • Flux

    @Test
    public void canMapTheValueInSequenceToAnotherSequence(a) {
    
        StepVerifier.create(flux.flatMap(x -> Flux.just(x, x))).expectNext(1).expectNext(1)
                .expectNext(2).expectNext(2).expectNext(3).expectNext(3).expectNext(4).expectNext(4)
                .expectNext(5).expectNext(5).expectNext(6).expectNext(6).verifyComplete();
    
        StepVerifier.create(flux.flatMap(x -> Mono.just(x))).expectNext(1).expectNext(2)
                .expectNext(3).expectNext(4).expectNext(5).expectNext(6).verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canMapTheValueInSequenceToAnotherSequence(a) {
    
        StepVerifier.create(Mono.just(1).flatMapMany(x -> Flux.just(x, x))).expectNext(1)
                .expectNext(1).verifyComplete();
    
        StepVerifier.create(Mono.just(1).flatMap(x -> Mono.just(x + 1))).expectNext(2)
                .verifyComplete();
    }
    Copy the code

How to give a default value when there is no data?

  • Flux

    @Test
    public void canRecoverWithSingleDefaultValueFromEmptySequence(a) {
        StepVerifier.<Integer>create(Flux.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                .verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canRecoverWithSingleDefaultValueFromEmptySequence(a) {
        StepVerifier.<Integer>create(Mono.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                .verifyComplete();
    }
    Copy the code

How to when no data will now Flux | Mono replaced with another Flux | Mono?

  • Flux

    @Test
    public void canRecoverWithAnotherSequenceFromEmptySequence(a) {
        StepVerifier.<Integer>create(Flux.<Integer>empty().switchIfEmpty(Flux.just(1)))
                .expectNext(1).verifyComplete();
    }
    Copy the code
  • Mono

    @Test
    public void canRecoverWithAnotherSequenceFromEmptySequence(a) {
        StepVerifier.<Integer>create(Mono.<Integer>empty().switchIfEmpty(Mono.just(1)))
                .expectNext(1).verifyComplete();
    }
    Copy the code

review

How do I print each element?

We can use (Flux | Mono). DoOnNext to view the value of each element and do some operation, but will not change the value of the element.

  • Flux

    @Test
    public void canDoSomethingForEveryElement(a) {
        Flux<Integer> flux = Flux.just(1.2.3.4.5.6);
        flux.doOnNext(x -> System.out.println(x)).collectList().block();
    }
    Copy the code
  • Mono

    @Test
    public void canDoSomethingForEveryElement(a) {
        Mono.just(1).doOnNext(x -> System.out.println(x)).block();
    }
    Copy the code

Split open a case

How do I convert Flux to a List?

@Test
public void canBeConvertedToList(a) {
    List<Integer> list = Flux.just(1.2.3).collectList().block();
    assertThat(list.size()).isEqualTo(3);
    assertThat(list.get(0)).isEqualTo(1);
    assertThat(list.get(1)).isEqualTo(2);
    assertThat(list.get(2)).isEqualTo(3);
}
Copy the code

How do I get data out of Mono?

@Test
public void canBeConvertedToValue(a) {
    assertThat(Mono.just(1).block()).isEqualTo(1);
    assertThat(Mono.empty().block()).isNull();
}
Copy the code

conclusion

I created a Repo Reactor-Examples that contains all of the above examples.

Download the repo and try it out for yourself!

git clone [email protected]:sjmyuan/reactor-examples.git
Copy the code

Feel free to submit PR to add more examples.