Introduction: Tu Zipei, often called "China's leading figure in big data", wrote in his well-known book Top of Data that Moore's Law, social media, and data mining are the three drivers of big data. According to an IBM study, 90 percent of all the data ever produced by human civilization was generated in the past two years. Against this background, a number of new technologies have emerged, including NoSQL, Hadoop, Spark, Storm, and Kylin. Among them, reactive programming, represented by RxJava and Reactor, targets Velocity in the classic 4V definition of big data (Volume, Variety, Velocity, Value), that is, the problem of high concurrency. The upcoming Spring 5 release also introduces support for reactive programming. Over the next few weeks, I'll share what I have learned about reactive programming in three installments. This is the second article; it uses the Reactor framework as an example to introduce some key features of reactive programming.

Previously:

  • [Spring 5] A preview of the reactive web framework

1 Overview of reactive programming

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. – Reactive programming – Wikipedia

Besides being asynchronous, this definition of reactive programming (RP) contains two important keywords:

  • Data streams: these can be static (such as arrays and files) or dynamic (such as events and logs). Building on the data stream model, RP provides a unified, Stream-style data processing interface. Compared with the Stream API in Java 8, the RP API supports dynamic as well as static data streams, and a stream can be reused and accessed by multiple subscribers at the same time.
  • Propagation of change: in short, one data stream comes in as input, is converted into another data stream through a series of operations, and is then distributed to all subscribers. This resembles function composition in functional programming, where several functions are chained to turn a set of input data into output data of a very different shape.
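The comparison with function composition and with the Java 8 Stream API can be sketched using only the JDK. This is an illustration rather than Reactor code: the class `ComposeDemo` and its sample data are made up for the example. It chains two functions into one transformation and shows that a Java 8 Stream, unlike a reactive stream, cannot be consumed twice.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative sketch: class and method names are made up for this example.
class ComposeDemo {
    // Two small steps chained into one transformation, much as a reactive
    // pipeline chains operators between a source and its subscribers.
    static final Function<String, String> TRIM = String::trim;
    static final Function<String, Integer> LENGTH = String::length;
    static final Function<String, Integer> TRIMMED_LENGTH = TRIM.andThen(LENGTH);

    static List<Integer> lengths(List<String> words) {
        return words.stream().map(TRIMMED_LENGTH).collect(Collectors.toList());
    }

    // A Java 8 Stream is single-use: a second terminal operation on the
    // same Stream object throws IllegalStateException.
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("a", "b");
        stream.count();
        try {
            stream.count();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of(" flux ", "mono"))); // [4, 4]
        System.out.println(secondUseFails());                   // true
    }
}
```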

One concept that is easy to confuse with RP is responsive design. Despite the similar-sounding name, it is completely different. Responsive design means that a web page automatically adjusts its layout and style to fit screens of different sizes, and it belongs to the field of website design. RP, by contrast, is a programming approach (or programming framework) that is oriented around data streams and focuses on system responsiveness.

Features

RP is essentially an asynchronous programming framework that differs from other frameworks in at least three ways:

  • Describe rather than perform: until you finally call the subscribe() method, nothing happens anywhere between the publishing side and the subscribing side. It is like a water pipe: however long the pipe is, no water flows until the tap is opened. To make pipelines more expressive, RP provides a much richer API than Stream, for example buffer(), merge(), and onErrorMap().
  • Improved throughput: similar to connection reuse in HTTP/2, RP improves throughput through thread reuse. In a traditional Servlet container, a thread is allocated to each incoming request. Because hardware resources are limited, a single server can only support a certain number of threads. Call that limit T; the number of requests the application can handle at the same time (its throughput) cannot exceed T. An RP application developed with Spring 5 and running in an asynchronous container such as Netty, however, uses a relatively constant number of threads to process requests, so its maximum throughput can exceed T.
  • Back pressure support: simply put, back pressure is a feedback mechanism. In the usual push model, the publisher neither knows nor cares how fast the subscriber can process data. When data is published faster than it can be processed, the subscriber has to decide whether to cache or discard it. With RP, that decision is handed back to the publisher, and the subscriber simply asks the publisher for as much data as it is able to process. You might ask: isn't that just the pull model? It is different. In the pull model, the subscriber has to issue a new request every time it finishes processing data, whereas with back pressure the subscriber subscribes once and can then keep asking for more data over that same subscription.
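The "describe rather than perform" point can be made visible with a small sketch. It uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) instead of Reactor so that it runs without extra dependencies. The class `LazyPublisherDemo`, the `range` and `collect` helpers, and the `EVENTS` log are all hypothetical names introduced for illustration, and the publisher is single-threaded and deliberately simplified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative sketch on the JDK Flow interfaces, not Reactor itself.
class LazyPublisherDemo {
    // Records what actually ran, so that laziness is observable.
    static final List<String> EVENTS = new ArrayList<>();

    // Describes a source of 0..count-1. Nothing runs until subscribe().
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> {
            EVENTS.add("source started");
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items, then complete once exhausted.
                    for (long i = 0; i < n && next < count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Subscribes with unbounded demand and collects everything received.
    static List<Integer> collect(Flow.Publisher<Integer> publisher) {
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { EVENTS.add("completed"); }
        });
        return received;
    }

    public static void main(String[] args) {
        Flow.Publisher<Integer> described = range(3);
        System.out.println(EVENTS.isEmpty());   // true: only described so far
        System.out.println(collect(described)); // [0, 1, 2]
        System.out.println(EVENTS);             // [source started, completed]
    }
}
```

Building the publisher leaves the event log empty; the source only starts once a subscriber is attached, which is the tap being opened.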

Applicable scenarios

Knowing these features, you probably already have a sense of what RP can be used for. In general, RP suits scenarios that combine high concurrency with high-latency operations, such as the following:

  • A request involves multiple external service invocations
  • Unreliable network transmission
  • Message processing under high concurrency
  • Elastic computing network

The price

Every coin has two sides.

As with any framework, there are strengths and weaknesses. The two big problems with RP are:

  • Blocking is dangerous. While reusing threads helps improve throughput, once a thread gets stuck in a callback function, every request handled by that thread is blocked and, in the worst case, the entire application can be dragged down.
  • Difficult to debug. Because of RP's descriptive power, most of the code in a typical RP application takes the form of chained expressions, such as flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(). Once something goes wrong, it is hard to pinpoint exactly where. Fortunately, RP frameworks generally provide tools that assist with debugging.
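The first problem can be reproduced in miniature with a single shared worker thread. The sketch below is plain JDK code, not Reactor, and the class `BlockedThreadDemo` is a hypothetical name. It measures how long a trivial task has to wait when it is queued behind a blocking task on the same thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: one shared thread plays the role of a reused worker.
class BlockedThreadDemo {
    // Returns how many milliseconds passed before a trivial task could
    // start, given that a blocking task occupied the only thread first.
    static long waitBehindBlockingTask(long blockMillis) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        try {
            long submittedAt = System.nanoTime();
            Runnable blocking = () -> {
                try {
                    Thread.sleep(blockMillis); // the "stuck callback"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            shared.execute(blocking);
            Callable<Long> quick = System::nanoTime;
            Future<Long> startedAt = shared.submit(quick);
            return TimeUnit.NANOSECONDS.toMillis(startedAt.get() - submittedAt);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("quick task waited about "
                + waitBehindBlockingTask(200) + " ms");
    }
}
```

The quick task does no work of its own, yet it cannot start until the blocking task has finished, which is why blocking calls inside reactive callbacks are so costly.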

2 Reactor in practice

To help you understand some of the concepts described above, let me demonstrate two key features of RP through a few test cases: improved throughput and back pressure. See my sample project on GitHub for the complete code.

Improved throughput

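    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.junit.jupiter.api.Test;

    // Imperative version: one thread per concurrent writer, blocking driver.
    @Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    // Submits the same task nThreads times and waits for all of them to finish.
    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    // Reactive version: each insert returns a publisher; the latch is
    // counted down when that publisher completes or fails.
    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load)
                    .subscribe(s -> { }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }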

What the test cases show:

  • The first test case uses multiple threads with the MongoDB driver. 100 threads are started at the same time, and each thread inserts 10,000 records into MongoDB, for a total of 1 million records. This takes about 15 seconds on average.
  • The second test case uses Reactor with the MongoDB Reactive Streams driver. It also inserts 1 million records, but takes less than 10 seconds on average, which is roughly 50% higher throughput.
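The thread reuse behind that difference can be illustrated separately from MongoDB. The sketch below is plain JDK code with hypothetical names (`ThreadReuseDemo`, `distinctThreads`); it says nothing about the timings above. It runs many short tasks on a small fixed pool and counts how many distinct threads actually did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: many tasks share a small, constant set of threads.
class ThreadReuseDemo {
    // Runs `tasks` short jobs on a pool of `poolSize` threads and returns
    // the number of distinct threads that executed them.
    static int distinctThreads(int tasks, int poolSize) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("100 tasks ran on "
                + distinctThreads(100, 4) + " thread(s)");
    }
}
```

As long as no task blocks, the number of threads stays bounded by the pool size no matter how many tasks are submitted.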

Back pressure

Before demonstrating the test case, take a look at two diagrams to help you visualize what back pressure is.

Image credit: Dataflow and simplified reactive programming

At first glance the two diagrams look alike, but they show two quite different back pressure situations. In the first, the publisher can produce data much faster (100/s) than the subscriber can consume it (1/s), but thanks to back pressure the publisher sends exactly as many items as the subscriber has requested. In the second, the publisher produces data more slowly (1/s) than the subscriber can consume it (100/s); when the subscriber requests 100 items, the publisher accumulates that many and then starts sending. As you can see, with back pressure a publisher can dynamically adjust its publishing rate to the capacity of each individual subscriber.
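The rule that a publisher never sends more than was requested can be sketched with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+). This is a simplified, single-threaded illustration and not how Reactor implements it; `DemandDemo`, `Range`, and `Collector` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative, single-threaded sketch on the JDK Flow interfaces.
class DemandDemo {
    // Emits 0..count-1, but never more than the subscriber has requested.
    static final class Range implements Flow.Publisher<Integer> {
        private final int count;

        Range(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) {
                        return; // re-entrant call: the outer loop picks it up
                    }
                    draining = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Stores items and leaves the requesting to the caller.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed = false;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        new Range(10).subscribe(collector);
        System.out.println(collector.received); // []
        collector.subscription.request(3);
        System.out.println(collector.received); // [0, 1, 2]
        collector.subscription.request(2);
        System.out.println(collector.received); // [0, 1, 2, 3, 4]
    }
}
```

The publisher could emit all ten values immediately, yet each `request(n)` releases only `n` of them, which is the behaviour described for the first diagram.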

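    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;

    private Flux<Integer> timerPublisher;

    @BeforeEach
    public void beforeEach() {
        // initialize publisher: emit 0..9, one value every 100 ms, then complete
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            s.complete();
                            cancel(); // stop this timer task once the stream is complete
                        }
                    }
                }, 100, 100));
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // unbounded demand: the subscriber keeps up with the publisher
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // keep the subscription but request nothing yet
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        // drop any value that arrives while the subscriber has no outstanding demand
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        // slow subscriber: request a single value every 200 ms
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }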

What the test cases show:

  • The first test case demonstrates the ideal situation in which the subscriber can keep up with the publisher (which produces 10 numbers at 100 ms intervals). The console prints all 10 numbers, 0 to 9, exactly as published.
  • The second test case deliberately slows the subscriber down (it requests one number every 200 ms), while the publisher applies the Drop back pressure strategy. The console prints only half of the numbers (0, 2, 4, 6, 8); the other half are dropped by the publisher because of back pressure and are never sent to the subscriber.
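Why exactly the even numbers survive can be checked with a deterministic simulation that replaces the two timers with a tick counter. This is a model of the test, not Reactor code. `DropSimulation` is a hypothetical name, and the sketch assumes that when a request and an emission fall on the same tick, the request is handled first; in the real test those are two independent timers, so that ordering is not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic model of the drop strategy; no timers and no Reactor.
class DropSimulation {
    // One tick stands for 100 ms. The publisher emits the tick number on
    // every tick; the subscriber requests one item every `requestEvery`
    // ticks. Items that arrive while demand is zero are dropped.
    static List<Integer> run(int items, int requestEvery) {
        List<Integer> consumed = new ArrayList<>();
        long demand = 0;
        for (int tick = 0; tick < items; tick++) {
            if (tick % requestEvery == 0) {
                demand++; // assumption: the request is seen before the emission
            }
            if (demand > 0) {
                demand--;
                consumed.add(tick);
            }
            // otherwise the item is dropped
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 1)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(run(10, 2)); // [0, 2, 4, 6, 8]
    }
}
```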

3 Summary

As the introduction above shows, RP is essentially an asynchronous programming framework with a built-in publisher-subscriber model. It includes advanced features such as thread reuse and back pressure, and it is especially suitable for scenarios with high concurrency and high latency.

That concludes my brief introduction to reactive programming. Feel free to leave a comment and share your thoughts. In the next article, I will combine the previous two articles and walk through a complete Spring 5 sample application.

4 References

  • Understanding Reactive types
  • Designing, Implementing, and Using Reactive APIs
  • Imperative to Reactive Web Applications