【 Beijing 】 IT technical personnel face interview, job-hopping, promotion and other problems, how to grow quickly, get the entrance qualification of big factory and promotion and salary chip? Face to face with dachang technology masters to answer your questions. My Career Anxiety and Redemption: A Journey from a young professional to a technical director

Congratulations FPX, new king, LPL * B we are the champions

The original link: segmentfault.com/a/119000002…

preface

Only a bald head can be strong. Text has been included to my lot warehouse, welcome Star:https://github.com/ZhongFuCheng3y/3y

The knowledge structure of this article:

If there are students who follow my articles on the official account, they will find that I have forwarded some good WebFlux articles from time to time recently, because I am learning recently.

As I said before, before you learn a technology, you need to understand why you are learning it. In fact, I didn’t have much original motivation to learn WebFlux this time, mainly because I would take turns to share technology in our group, and I didn’t know what was better to share…

I was just learning the knowledge related to big data before, but the time line of this piece was too long to share in the group (and most of the students in the group knew big data, so I was the only one who liked big data, tears eyes). So, thinking, “Why don’t I learn something new?” . So I spent some time learning about WebFlux

This article mainly explains what WebFlux is, lead you into the door, hope to help you (at least after reading this article, know what WebFlux is used for).

What is WebFlux?

We can scroll down a little bit from the Spring website to see where WebFlux is introduced

What can we learn from the introduction on the official website?

  • We programmers often choose different technologies for different application scenarios, some suitable for synchronous blocking and some suitable for asynchronous non-blocking. Spring5 provides a full stack of responsive (non-blocking) technologies that we can use (Web controllers, permission controls, data access layers, and so on).

  • The diagram on the left is a comparison of technology stacks;

    • Reactive typically uses Netty or Servlet 3.1 containers (because of asynchronous non-blocking support), while Servlet technology stacks use Servlet containers
    • On the Web side, reactive uses WebFlux and servlets use SpringMVC. -…. In summary, WebFlux is only one part of responsive programming (on the Web control side), so we generally compare it to SpringMVC.

How to understand responsive programming?

Reactive Programming is mentioned above, and WebFlux is just one stack of Reactive Programming, so let’s start by exploring what Reactive Programming is

Here’s the definition from Wikipedia:

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

Reactive programming is a programming paradigm declarative based on data stream and propagation of change

Here’s a little example from Wikipedia:

It means something like this:

  • In imperative programming (our everyday programming mode), the formulaa=b+cWhich means thataThe value of is madebAnd c. ifborcThere are subsequent changes,Does not affect thetoaThe value of the
  • In reactive programming, the formulaa:=b+cWhich means thataThe value of is madebandcIt’s calculated. But if theborcSubsequent changes in the value of, will affectaThe value of the

In my opinion, the above examples can help us to understand the propagation of change.

What about data stream and declarative? Let’s mention our Stream. Lambda expressions and Stream streams

  • Recently learned basics of Lambda expressions
  • The syntax for a Stream Stream is as follows:

The use of a Stream consists of three steps (creating a Stream, performing an intermediate action, and performing the final action) :

Performing intermediate operations actually gives us a lot of APIS to manipulate the data in the Stream (summing/de-replicating/filtering) and so on

With all that said, what about data flows and declaratives? It works like this:

  • We used to process the data ourselves, but then we abstracted the data to be processed (into a data stream), and then processed the data in the data stream through the API (declarative).

Such as the following code; Turn the data in the array into a data stream, and process the data in the data stream with an explicit declaration calling.sum() to get the final result:

public static void main(String[] args) {
    int[] nums = { 1.2.3 };
    int sum2 = IntStream.of(nums).parallel().sum();
    System.out.println("Results are:" + sum2);
}
Copy the code

As shown below:

2.1 Reactive programming -> Asynchronous non-blocking

Here’s what responsive programming is:

Reactive programming is a programming paradigm declarative based on data stream and propagation of change

I also explained what data flow/chang-pass/declarative means, but when it comes to responsive programming, asynchronous non-blocking is essential.

Asynchronous, nonblocking, async, async, async, async, async

As shown below, the total amount will be affected by other amounts (the update process is asynchronous) :

Our JDK8 Stream is synchronous, so it is not suitable for reactive programming.

Reactive flows are already supported in JDK9, so let’s take a look

Third, JDK9 Reactive

The specification for reactive flows has been proposed: it says:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure —–>www.reactive-streams.org/

Translation add more information:

Reactive Streams provide a standard for implementing asynchronous non-blocking backpressure by defining a set of entities, interfaces, and interoperable methods. Third parties follow this standard to implement specific solutions. Common examples are Reactor, RxJava, Akka Streams, Ratpack, etc.

The specification actually defines four interfaces:

The Java platform does not provide full support for Reactive until JDK9, which also defines the four interfaces mentioned above in the java.util.concurrent package

A general flow architecture would look something like this (producers generate data, mediate data processing, and consumers consume it) :

  • The sources of data are generally known as producers.
  • The destination of the data, commonly referred to as the Consumer
  • In processing, one or more processing phases perform certain operations on data. (Processor)

If we look back at the interface for a reactive flow, we should be able to understand it:

  • Publishers are equivalent to producers.
  • Subscriber is equivalent to Consumer
  • The Processor is used to process data between publishers and subscribers

In response flow, the concept of back pressure is mentioned, which is actually very easy to understand. Implementing asynchronous non-blocking in reactive flows is based on the producer-consumer model, and one of the problems with producer-consumers is that producers overwhelm consumers by producing too much data.

Back pressure, in plain English, is the ability of consumers to tell producers how much data they need. This is what the Subscription interface does.

Let’s take a look at the methods of the JDK9 interface to make sense of the above statement:

// Publisher (producer)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
// Subscriber (consumer)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete(a);
}
// For communication between publishers and subscribers (back pressure: subscribers can tell producers how much data is needed)
public interface Subscription {
    public void request(long n);
    public void cancel(a);
}
// It is used to process a message published by a publisher, which is then processed for consumption by consumers
public interface Processor<T.R> extends Subscriber<T>, Publisher<R> {}Copy the code

3.1 Look at an example

There are a lot of comments in the code, I don’t want to BB, recommend directly copy run to see:

class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer.String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // Save the subscription, which is needed to respond to the publisher
        this.subscription = subscription;

        // Request data
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // A data is received and processed
        System.out.println("Processor receives data:" + item);

        // Filter out those less than 0 and publish them
        if (item > 0) {
            this.submit("Converted data :" + item);
        }

        // Call request and request data
        this.subscription.request(1);

        // Or the goal has been reached and cancel is called to tell the publisher that the data is no longer accepted
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // An exception occurred (e.g. an exception occurred while processing data)
        throwable.printStackTrace();

        // We can tell the publisher that the data will not be accepted later
        this.subscription.cancel();
    }

    @Override
    public void onComplete(a) {
        // All data processed (publisher closed)
        System.out.println("Processor done!");
        // Close the publisher
        this.close(); }}public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. Define the publisher. The published data type is Integer
        // Use the JDK SubmissionPublisher directly
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. Define the handler to filter the data and convert it to String
        MyProcessor processor = new MyProcessor();

        // 3. The publisher establishes a subscription relationship with the processor
        publiser.subscribe(processor);

        // 4. Define the final subscriber and consume String data
        Subscriber<String> subscriber = new Subscriber<String>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // Save the subscription, which is needed to respond to the publisher
                this.subscription = subscription;

                // Request data
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // A data is received and processed
                System.out.println("Received data:" + item);

                // Call request and request data
                this.subscription.request(1);

                // Or the goal has been reached and cancel is called to tell the publisher that the data is no longer accepted
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // An exception occurred (e.g. an exception occurred while processing data)
                throwable.printStackTrace();

                // We can tell the publisher that the data will not be accepted later
                this.subscription.cancel();
            }

            @Override
            public void onComplete(a) {
                // All data processed (publisher closed)
                System.out.println("Done!); }};// 5. The processor establishes a subscription relationship with the final subscriber
        processor.subscribe(subscriber);

        // 6. Production data, and publish
        publiser.submit(-111);
        publiser.submit(111);

        // 7. Close the publisher
        // The formal environment should either put finally or use try-resouce to ensure closure
        publiser.close();

        // The main thread is delayed to stop, otherwise the data is not consumed and exits
        Thread.currentThread().join(1000); }}Copy the code

The output is as follows:

The process is actually quite simple:

References:

  • Yanbin. Blog/Java – 9 – talk…
  • Blog.csdn.net/wudaoshihun…
  • www.spring4all.com/art…
  • www.cnblogs.com/IcanFixIt/p…

Java 8’s Stream focus is on filtering, mapping, and merging streams, while Reactive Stream is more focused on Stream generation and consumption, that is, the coordination between production and consumption of streams

To put it simply: Reactive flows are asynchronous non-blocking + flow controlled (can tell producers how much they want/unsubscribe)

Looking forward to the application of responsive programming scenarios:

For example, in a log monitoring system, our front-end page will no longer need to constantly request data from the server and then update it through “imperative” polling, but after establishing a good channel, the data flow will flow from the system to the page, so as to show the real-time index change curve; Another example is a social platform. Friends’ dynamic information, likes and messages are not manually generated, but automatically displayed on the interface when the background data changes.

Getting started with WebFlux

Back to WebFlux. Based on the above foundation, we can now draw some conclusions:

  • WebFlux is part of Spring’s rollout of responsive programming (Web side)
  • Responsive programming is asynchronous and non-blocking (a declarative programming paradigm based on data stream and propagation of change).

Let’s go back to the picture on the official website:

4.1 Simply Experiencing WebFlux

In order to make it faster/smoother to WebFlux, SpringMVC was previously supported. In other words: We can use WebFlux just like we use SpringMVC.

The responsive flows WebFlux uses are not based on THE JDK9 platform, but a library called Reactor Responsive Flows. So getting started with WebFlux is more about learning how to use the Reactor API

A Reactor is a reactive flow and has a Publisher. The Publisher of a Reactor is represented by two classes:

  • Mono(returns 0 or 1 element)
  • Flux(returns 0-N elements)

The Subscriber is done by the Spring framework

Let’s look at a simple example (built on a WebFlux environment) :

// block for 5 seconds
private String createStr(a) {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
    return "some string";
}

// A normal SpringMVC method
@GetMapping("/ 1")
private String get1(a) {
    log.info("get1 start");
    String result = createStr();
    log.info("get1 end.");
    return result;
}

// WebFlux(Mono)
@GetMapping("/ 2")
private Mono<String> get2(a) {
    log.info("get2 start");
    Mono<String> result = Mono.fromSupplier(() -> createStr());
    log.info("get2 end.");
    return result;
}
Copy the code

First, it’s worth noting that when we built our WebFlux environment to boot, the application server was Netty by default:

Let’s take a look at the SpringMVC interface and the WebFlux interface to see the difference:

For SpringMVC:

WebFlux:

From the point of view of the caller (the browser), there is no sense that anything has changed because you have to wait five seconds for the data to return. However, we can see from the server-side logs that WebFlux returns Mono objects directly (as opposed to SpringMVC blocking synchronously for 5 seconds until the thread returns).

This is the benefit of WebFlux: the ability to handle high concurrency with fixed threads (to maximize machine performance).

WebFlux also supports Server push (SSE – >Server Send Event). Let’s take a look at an example:

/** * Flux: returns 0-N elements * Note: MediaType * needs to be specified@return* /
@GetMapping(value = "/ 3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux(a) {
    Flux<String> result = Flux
        .fromStream(IntStream.range(1.5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            return "flux data--" + i;
        }));
    return result;
}
Copy the code

The effect is to push data to the browser every second:

Thank you very much talent can see here, if this article is written well, feel “three crooked” I have something to ask for praise for attention ️ to share 👥 to leave a message 💬 for warm male I really very useful!!

WebFlux I have not finished writing, this article wrote WebFlux support SpringMVC annotations to develop, the next article wrote how to use WebFlux another mode (Functional Endpoints) to develop and some common problems also need to supplement ~

If you want to learn programming, please searchCircle T community, more industry related information and industry related free video tutorials. It’s totally free!