sequence

This paper mainly studies the backpressure of Reactive Streams

Differences between Reactive Streams and traditional Streams

    @Test
    public void testShowReactiveStreams() throws InterruptedException {
        Flux.interval(Duration.ofMillis(1000))
                .take(500)
                .subscribe(e -> LOGGER.info("get {}",e));

        Thread.sleep(5*60*1000);
    }
Copy the code

An example output is as follows:

18:52:34. 118. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 18:52:35. 157 INFO com. [the parallel - 2] example. The demo. FluxTest - get 0 18:52:36. 156 [the parallel - 2] INFO com. Example. Demo. FluxTest - get 1 18:52:37. 156 INFO [the parallel - 2] com. Example. Demo. FluxTest - get 2 18:52:38. 159 INFO com. [the parallel - 2] example. Demo. FluxTest - get 3 18:52:39. [the parallel - 2) 157 INFO Com. Example. Demo. FluxTest - get 4 18:52:40. 155 INFO [the parallel - 2] com. Example. Demo. FluxTest - get 5 18:52:41. 154 [the parallel - 2] INFO com. Example. Demo. FluxTest - get 6 18:52:42. 158 INFO [the parallel - 2] com. Example. Demo. FluxTest - get 7 18:52:43. 157 INFO com. [the parallel - 2] example. Demo. FluxTest - get 8 18:52:44. 156 [the parallel - 2] INFO Com. Example. Demo. FluxTest - get 9 18:52:45. 154 INFO [the parallel - 2] com. Example. Demo. FluxTest - get 10Copy the code

Traditional list Streams are not asynchronous. For example, A batch of 500 semi-finished products must be processed in link A before they can be processed in link B. Reactive Streams become reactive because, for example, this batch of 500 semi-finished products can be processed in link B. After processing one piece in link A, it can be immediately pushed to the next link B for continuous processing, rather than waiting for all semi-finished products to be processed in link A and then pushed to link B. A typical living example of an assembly line.

backpressure

In such a production line, there is a requirement that the processing of each link should be coordinated, just like in the movie starting line, the hero goes to the factory to work, and the production line pushes goods to him. He can’t keep up with the speed, so the goods fall to the ground, and finally the production line has to be shut down manually. In the application, if the publisher is too fast and the subscriber is slow, then the data will accumulate, and it is easy to run out of memory if it is not controlled properly. Backpressure is designed to solve this problem.

Backpressure of the pull model

@Test
    public void testPullBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}}); try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); }}Copy the code

Backpressure of push model

Use thread-specific operators such as timeout(),delayElements(),buffer(),skip(),take() to control data generation speed.

delayElements

@Test
    public void testPushBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .delayElements(Duration.ofMillis(200))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Thread.sleep(100*1000); }Copy the code

Output instance

19:37:00. 870. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 19:37:01. 117 INFO com [the parallel - 1]. The example. The demo. FluxTest - subscribe: 1 19:37:03. 326 [the parallel - 2] INFO com. Example. Demo. FluxTest - subscribe: 2 19:37:05. 535 [the parallel - 3] INFO com. Example. Demo. FluxTest - The subscribe: 3 19:37:07. 743 [the parallel - 4] INFO com. Example. Demo. FluxTest - subscribe: 4 19:37:09. [the parallel - 5] 953 INFO Com. Example. Demo. FluxTest - subscribe: 5 19:37:12. 156. [the parallel - 6] INFO com example. Demo. FluxTest - subscribe: 6 19:37:14. 363 / parallel - 7 INFO com. Example. Demo. FluxTest - subscribe: 7 19:37:16. [the parallel - 8] 568 INFO Com. Example. Demo. FluxTest - subscribe: 8 19:37:18. 775 INFO com [the parallel - 1]. Example. Demo. FluxTest - subscribe: 9Copy the code

This is an example of delayElements. You can see that the data is not lost, but the delay is production delay + consumption delay

sample

@Test
    public void testSampleBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .sample(Duration.ofMillis(1000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Thread.sleep(100*1000); }Copy the code

Output instance

19:48:40. 516. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 19:48:40. 544. [the main] INFO reactor. The Flux. Range. 1 - | onSubscribe ([Synchronous Fuseable] FluxRange. RangeSubscription) 19:48:40. [the main] 546 INFO reactor. The Flux. Range. 1 - | onNext 19:48:40. (1) 770 [the parallel - 2] INFO Reactor. Flux. Range. 1 - | onNext 19:48:40. (2), 974 [the parallel - 3] INFO reactor. The Flux. Range. 1 - | onNext 19:48:41. (3), 175 [the parallel - 4] INFO reactor. Flux. Range. 1 - | onNext 19:48:41. (4), 378 [the parallel - 5] INFO reactor. The Flux. Range. 1 - | onNext (5) 19:48:41. 543 INFO com [the parallel - 1]. The example. The demo. FluxTest - subscribe: 4 19:48:41. [the parallel - 6] 583 INFO Reactor. Flux. Range. 1 - | onNext 19:48:41. (6), 785 [the parallel - 7] INFO reactor. The Flux. Range. 1 - | onNext (7) 19:48:41. 989 [the parallel - 8] INFO reactor. Flux. Range. 1 - | onNext (8) 19:48:43. 547] [the parallel - 1 the INFO reactor. The Flux. Range. 1 - | onNext (9) 19:48:43. 548 INFO com [the parallel - 1]. The example. The demo. FluxTest - subscribe: 8 19:48:43. 751 [the parallel - 2] INFO Reactor. Flux. Range. 1 - | onNext (10) 19:48:43. [the parallel - 3] 952 INFO reactor. The Flux. Range. 1 - | onNext (11)Copy the code

As you can see, some of the data is discarded because the subscriber is slow

buffer

@Test
    public void testBufferBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
//                .log()
                .delayElements(Duration.ofMillis(200))
                .buffer(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Thread.sleep(100*1000); }Copy the code

Output instance

19:55:06. 680. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 19:55:06. 712. [the main] INFO reactor. The Flux. Range. 1 - | onSubscribe ([Synchronous Fuseable] FluxRange. RangeSubscription) 19:55:06. [the main] 714 INFO reactor. The Flux. Range. 1 - | onNext 19:55:06. (1) 940 [the parallel - 2] INFO Reactor. Flux. Range. 1 - | onNext 19:55:07. (2), 141 [the parallel - 3] INFO reactor. The Flux. Range. 1 - | onNext 19:55:07. (3), 343 [the parallel - 4] INFO reactor. Flux. Range. 1 - | onNext 19:55:07. (4) 509 [the parallel - 1] INFO com. Example. Demo. FluxTest - subscribe:[1, 2, 545 [3] 19:55:07. The parallel - 5] INFO reactor. The Flux. Range. 1 - | onNext 19:55:07. (5), 748 [the parallel - 6] INFO reactor. The Flux. Range. 1 - | onNext (6) 19:55:07. 951 / parallel - 7 INFO reactor. The Flux. Range. 1 - | onNext 19:55:08. (7), 156 [the parallel - 8] INFO Reactor. Flux. Range. 1 - | onNext (8) 19:55:09. 512 INFO com [the parallel - 1]. Example. Demo. FluxTest - subscribe: [4, 5, 6, 515 [7] 19:55:11. The parallel - 1] INFO reactor. The Flux. Range. 1 - | onNext (9) 19:55:11. 516 INFO [the parallel - 1] Com. Example. Demo. FluxTest - subscribe: [8] 19:55:11, 719 [the parallel - 2] INFO reactor. The Flux. Range. 1 - | onNext (10) 19:55:11. 923 [the parallel - 3] INFO reactor. The Flux. Range. 1 - | onNext 19:55:12 (11), 127 [the parallel - 4] INFO reactor. The Flux. Range. 1 | onNext (12) 19:55:12. 330 [the parallel - 5] INFO reactor. The Flux. Range. 1 - | onNext (13) 19:55:12. [the parallel - 6] 533 INFO Reactor. Flux. Range. 1 - | onNext (14) 19:55:12. [the parallel - 7] 735 INFO reactor. The Flux. Range. 1 - | onNext (15) 19:55:12. 941 [the parallel - 8] INFO reactor. Flux. Range. 1 - | onNext (16) 19:55:13. 516 INFO com [the parallel - 1]. Example. Demo. FluxTest - subscribe:[9, 10, 11, 12, 13, 14, 517 [15] 19:55:15. The parallel - 1] INFO reactor. The Flux. Range. 1 - | onNext (17) 19:55:15. 517 INFO [the parallel - 1] Com. Example. Demo. FluxTest - subscribe: [16] 19:55:15, 721 [the parallel - 2] INFO reactor. The Flux. Range. 1 - | onNext (18) 19:55:15. 925 [the parallel - 3] INFO reactor. The Flux. Range. 1 - | onNext (19) 19:55:16. 127] [the parallel - 4 INFO reactor. The Flux. Range. 1 | onNext (20) 19:55:16. 331 / parallel - 5 INFO reactor. The Flux. Range. 1 - | onNext (21) 19:55:16. [the parallel - 6] 537 INFO Reactor. Flux. Range. 1 - | onNext (22) 19:55:16. [the parallel - 7] 738 INFO reactor. The Flux. Range. 1 - | onNext (23) 19:55:16. 942 [the parallel - 8] INFO reactor. Flux. Range. 1 - | onNext (24) 19:55:17. 519 INFO com [the parallel - 1]. Example. Demo. FluxTest - subscribe:[17, 18, 19, 20, 21, 22, 522 [23] 19:55:19. The parallel - 1] INFO reactor. The Flux. Range. 1 - | onNext (25) 19:55:19. 522 INFO [the parallel - 1] com.example.demo.FluxTest - subscribe:[24]Copy the code

The data generated in each 800ms is piled up and pushed to subscribers in batches

skip

@Test
    public void testSkip() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .skip(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Thread.sleep(100*1000); }Copy the code

Output instance

20:02:07. 558. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 20:02:07. 606. [the main] INFO reactor. The Flux. Range. 1 - | onSubscribe ([Synchronous Fuseable] FluxRange. RangeSubscription) 20:02:07. [the main] 608 INFO reactor. The Flux. Range. 1 - | onNext 20:02:07. (1) 815 [the parallel - 2] INFO Reactor. Flux. Range. 1 - | onNext 20:02:08. (2), 016 [the parallel - 3] INFO reactor. The Flux. Range. 1 - | onNext 20:02:08. (3), 218 [the parallel - 4] INFO reactor. Flux. Range. 1 - | onNext 20:02:08. (4) 421 [the parallel - 5] INFO com. Example. Demo. FluxTest - The subscribe: 4 20:02:10. [the parallel - 425 5] INFO reactor. The Flux. Range. 1 - | onNext 20:02:10. (5), 631 [the parallel - 6] INFO Com. Example. Demo. FluxTest - subscribe: 5 20:02:12. [the parallel - 6] 635 INFO reactor. The Flux. Range. 1 - | onNext (6) 20:02:12. 840 [the parallel - 7] INFO com. Example. Demo. FluxTest - subscribe: 6 20:02:14. [the parallel - 7] 843 INFO reactor. The Flux. Range. 1 - | OnNext (7) 20:02:15. 049 INFO com. [the parallel - 8] example. Demo. FluxTest - subscribe: 7Copy the code

Skip specifies that data generated during the first time period is skipped

take

@Test
    public void testTakeBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .take(Duration.ofMillis(4000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); }}); Thread.sleep(100*1000); }Copy the code

Output instance

20:05:08. 366. [the main] the DEBUG reactor. The util. Loggers$LoggerFactory- Using Slf4j logging framework 20:05:08. 419. [the main] INFO reactor. The Flux. Range. 1 - | onSubscribe ([Synchronous Fuseable] FluxRange. RangeSubscription) 20:05:08. [the main] 422 INFO reactor. The Flux. Range. 1 - | onNext 20:05:08. (1) 629 [the parallel - 2] INFO Com. Example. Demo. FluxTest - subscribe: 1 20:05:10. [the parallel - 2] 633 INFO reactor. The Flux. Range. 1 - | onNext 20:05:10. (2), 835 [the parallel - 3] INFO com. Example. Demo. FluxTest - subscribe: 2 20:05:12. 418] [the parallel - 1 the INFO reactor. The Flux. Range. 1 - | cancel()Copy the code

Take indicates that only the data generated in the previous few days or a period of time will be pushed to subscribers

summary

Reactive Streams is very useful for multi-stage data processing, saving a lot of time, as well as backpressure to control slow subscriber speeds.

doc

  • The most friendly article on RxJava — Backpressure
  • Flux for the Java Project Actor framework