Preface

The previous section explained thread scheduling and gave two practical examples, one of which was a login request. Now consider a related problem: what if a new user must register first and then be logged in automatically once registration succeeds?

Clearly, this is a nested network request: we first send the registration request, and in its success callback we send the login request.

Of course, we could write it the obvious way:

    private void login() {
        api.login(new LoginRequest())
                .subscribeOn(Schedulers.io())               // Make the network request on the IO thread
                .observeOn(AndroidSchedulers.mainThread())  // Return to the main thread to handle the result
                .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse loginResponse) throws Exception {
                        Toast.makeText(MainActivity.this, "Login successful", Toast.LENGTH_SHORT).show();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Toast.makeText(MainActivity.this, "Login failed", Toast.LENGTH_SHORT).show();
                    }
                });
    }

    private void register() {
        api.register(new RegisterRequest())
                .subscribeOn(Schedulers.io())               // Make the network request on the IO thread
                .observeOn(AndroidSchedulers.mainThread())  // Return to the main thread to handle the result
                .subscribe(new Consumer<RegisterResponse>() {
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        Toast.makeText(MainActivity.this, "Registration successful", Toast.LENGTH_SHORT).show();
                        login();   // Registration succeeded, so call the login method
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Toast.makeText(MainActivity.this, "Registration failed", Toast.LENGTH_SHORT).show();
                    }
                });
    }

(Actually, being able to write code like this is already pretty good; at least the two requests are not crammed into a single method…)

Such code works, but it is not elegant. In this section we will solve the problem in a more elegant way.

The main topic

Let's start with the simplest transformation operator, map.

Map

map is one of the simplest transformation operators in RxJava. It applies a function to each event sent from upstream, so that every event is changed according to that function. As shown in the event diagram:

map.png

In the diagram, the map function converts each circular event into a rectangular event, so downstream receives rectangular events. In code, the example looks like this:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

Upstream sends numbers, downstream receives strings, and the map operator in the middle performs the conversion. The result is:

    D/TAG: This is result 1
    D/TAG: This is result 2
    D/TAG: This is result 3

With map, you can convert the events sent from upstream to any type, whether a single object or a collection. Wouldn't you like to try such a powerful operator?
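To make "apply a function to each event" concrete, here is a toy model of a water pipe written with the JDK only. It is a sketch for intuition, not how RxJava implements `map`; the names `Pipe`, `just` and `MapSketch` are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapSketch {
    /** Toy "water pipe": pushes its events to whichever downstream consumer subscribes. */
    interface Pipe<T> {
        void subscribe(Consumer<T> downstream);

        /** map: every upstream event passes through fn before it reaches downstream. */
        default <R> Pipe<R> map(Function<T, R> fn) {
            return downstream -> this.subscribe(event -> downstream.accept(fn.apply(event)));
        }

        /** Hypothetical helper: a pipe that emits the given values in order. */
        @SafeVarargs
        static <T> Pipe<T> just(T... values) {
            return downstream -> {
                for (T value : values) {
                    downstream.accept(value);
                }
            };
        }
    }

    /** Sends 1, 2, 3 upstream and collects what downstream receives after map. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Pipe.just(1, 2, 3)
                .map(i -> "This is result " + i)
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The function passed to `map` may return any type, which is why the downstream consumer here is typed as `String` even though upstream emits `Integer`.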

Next, let’s look at another well-known operator, flatMap.

FlatMap

flatMap is a very powerful operator. Let's start with a rather abstract definition:

flatMap transforms one upstream Observable that sends events into multiple Observables that send events, and then merges the events they emit into a single Observable.

That sentence is hard to digest, so let's explain it in detail with pictures. First, look at the overall picture:

flatmap.png

Let's look at upstream first. Upstream sends three events, 1, 2 and 3. Pay attention to their colors.

The flatMap in the middle converts each circular event into a new upstream Observable that sends a rectangular event and a triangular event.

Still can’t understand? Don’t worry, let’s look at the breakdown:

flatmap1.png

Now it is much easier to understand!

For each event sent upstream, flatMap creates a new water pipe, which sends the new events produced by the transformation, and downstream receives the data sent by all of these new pipes. Note that flatMap does not guarantee the order of events: as the figure shows, the events produced from event 1 do not necessarily arrive before those produced from event 2. If you need the order to be preserved, use concatMap.
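The "one new pipe per upstream event, all feeding the same downstream" idea can be sketched with a toy model that uses only the JDK. This is an illustration under simplifying assumptions, not RxJava's implementation: the names `Pipe`, `fromList` and `FlatMapSketch` are invented, and everything runs synchronously, so the toy always delivers events in upstream order. The reordering described above only appears when the inner pipes emit asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class FlatMapSketch {
    /** Toy "water pipe": pushes its events to whichever downstream consumer subscribes. */
    interface Pipe<T> {
        void subscribe(Consumer<T> downstream);

        /**
         * flatMap: for every upstream event, build a new inner pipe with fn
         * and connect that inner pipe to the one shared downstream.
         */
        default <R> Pipe<R> flatMap(Function<T, Pipe<R>> fn) {
            return downstream ->
                    this.subscribe(event -> fn.apply(event).subscribe(downstream));
        }

        /** Hypothetical helper: a pipe that emits the list's elements in order. */
        static <T> Pipe<T> fromList(List<T> values) {
            return downstream -> values.forEach(downstream);
        }
    }

    /** Each circle (1, 2, 3) becomes an inner pipe emitting a square and a triangle. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Pipe.fromList(Arrays.asList(1, 2, 3))
                .flatMap(i -> Pipe.fromList(Arrays.asList("square " + i, "triangle " + i)))
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Downstream sees one flat sequence of six events and never learns that three separate inner pipes produced them.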

Having said that, let's look at how the code is written in practice:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

As the code shows, inside flatMap we convert each event sent upstream into a new pipe that sends three String events. To make it visible that flatMap's results are unordered, we add a delay of 10 milliseconds. Let's see the result:

    D/TAG: I am value 1
    D/TAG: I am value 1
    D/TAG: I am value 1
    D/TAG: I am value 3
    D/TAG: I am value 3
    D/TAG: I am value 3
    D/TAG: I am value 2
    D/TAG: I am value 2
    D/TAG: I am value 2

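The result confirms what we said before: the values derived from event 3 arrive before those derived from event 2. The exact interleaving can differ from run to run.

concatMap works almost exactly like flatMap, except that its results are sent strictly in the order in which the upstream events were sent.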

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

Just change the previous flatMap to concatMap and leave the rest unchanged. The result is as follows:

    D/TAG: I am value 1
    D/TAG: I am value 1
    D/TAG: I am value 1
    D/TAG: I am value 2
    D/TAG: I am value 2
    D/TAG: I am value 2
    D/TAG: I am value 3
    D/TAG: I am value 3
    D/TAG: I am value 3

As you can see, this time the results arrive in order.
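The difference in ordering can be reproduced without RxJava. The sketch below is an analogy built on the JDK's `CompletableFuture`, not RxJava code: each future stands in for one inner pipe, `mergeStyle` records a result the moment its future completes (as flatMap does), and `concatStyle` only listens to the next future after the previous one has delivered (as concatMap does). All method and class names are hypothetical. One simplification to keep in mind: the real concatMap also delays subscribing to the next inner Observable, whereas these futures already exist up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OrderingSketch {
    /** flatMap-like: listen to every inner source at once, record results as they arrive. */
    static List<String> mergeStyle(List<CompletableFuture<String>> inner) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : inner) {
            f.thenAccept(out::add);
        }
        return out;
    }

    /** concatMap-like: start listening to the next source only after the previous one delivered. */
    static List<String> concatStyle(List<CompletableFuture<String>> inner) {
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> f : inner) {
            previous = previous.thenCompose(ignored -> f.thenAccept(out::add));
        }
        return out;
    }

    static String run() {
        CompletableFuture<String> one = new CompletableFuture<>();
        CompletableFuture<String> two = new CompletableFuture<>();
        CompletableFuture<String> three = new CompletableFuture<>();
        List<CompletableFuture<String>> inner = Arrays.asList(one, two, three);

        List<String> merged = mergeStyle(inner);
        List<String> concatenated = concatStyle(inner);

        // Simulate the second "request" being the slowest: 1 and 3 finish before 2.
        one.complete("1");
        three.complete("3");
        two.complete("2");

        return "merge=" + merged + " concat=" + concatenated;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: merge=[1, 3, 2] concat=[1, 2, 3]
    }
}
```

The futures are completed by hand on a single thread, so the outcome is deterministic: the merge-style list follows completion order, while the concat-style list follows the order in which the sources were supplied.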

RxJava has a large number of built-in operators; map and flatMap are only an introduction. If you follow the approach of this article and then read the documentation carefully, the other operators should pose no problem. If there are particular operators you would like explained, list them and I can cover them as needed.

Practice

Having learned the flatMap operator, we can now answer the question posed at the beginning of this article.

The elegant way to handle nested requests is to transform them with flatMap.

Let’s review the request interface from the previous section:

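    import io.reactivex.Observable;
    import retrofit2.http.Body;
    import retrofit2.http.POST;

    public interface Api {
        // @Body needs a body-carrying method such as POST; the paths are placeholders.
        @POST("login")
        Observable<LoginResponse> login(@Body LoginRequest request);

        @POST("register")
        Observable<RegisterResponse> register(@Body RegisterRequest request);
    }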

As you can see, both login and registration return an upstream Observable, and what flatMap does is convert one Observable into another, so the solution is obvious:

    api.register(new RegisterRequest())                 // Send the registration request
            .subscribeOn(Schedulers.io())               // Make the network request on the IO thread
            .observeOn(AndroidSchedulers.mainThread())  // Return to the main thread to handle the registration result
            .doOnNext(new Consumer<RegisterResponse>() {
                @Override
                public void accept(RegisterResponse registerResponse) throws Exception {
                    // Do something based on the registration response
                }
            })
            .observeOn(Schedulers.io())                 // Go back to the IO thread to send the login request
            .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                @Override
                public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                    return api.login(new LoginRequest());
                }
            })
            .observeOn(AndroidSchedulers.mainThread())  // Return to the main thread to handle the login result
            .subscribe(new Consumer<LoginResponse>() {
                @Override
                public void accept(LoginResponse loginResponse) throws Exception {
                    Toast.makeText(MainActivity.this, "Login successful", Toast.LENGTH_SHORT).show();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Toast.makeText(MainActivity.this, "Login failed", Toast.LENGTH_SHORT).show();
                }
            });

This example also shows how easy it is to switch threads.
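For readers who know the JDK's `CompletableFuture`, the same "second request starts from the first request's result" shape can be sketched with `thenCompose`, which plays a role comparable to flatMap for a single asynchronous value. This is only an analogy: `register`, `login` and `ChainedRequestsSketch` are hypothetical stand-ins that fabricate strings instead of calling a server.

```java
import java.util.concurrent.CompletableFuture;

class ChainedRequestsSketch {
    // Hypothetical stand-in for the registration request: completes asynchronously.
    static CompletableFuture<String> register(String user) {
        return CompletableFuture.supplyAsync(() -> "registered:" + user);
    }

    // Hypothetical stand-in for the login request, started from the registration result.
    static CompletableFuture<String> login(String registerResult) {
        return CompletableFuture.supplyAsync(() -> "logged-in-after(" + registerResult + ")");
    }

    /** Chains the two calls without nesting callbacks, then blocks for the final value. */
    static String registerThenLogin(String user) {
        return register(user)
                .thenCompose(ChainedRequestsSketch::login) // runs only if registration succeeded
                .join();
    }

    public static void main(String[] args) {
        System.out.println(registerThenLogin("alice"));
    }
}
```

As in the RxJava chain, a failure in the first step skips the second step and surfaces at the end of the chain, so error handling stays in one place.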

That’s it for this tutorial. In the next section we will learn about Flowable and understand the concept of Backpressure. Stay tuned.