preface

Due to the replanning of the company’s department, I was transferred to another department, so the project I was in charge of was also changed. I took a closer look at the overall project, RxJava + Retrofit. A whole set. As we all know, RxJava + Retrofit is currently the most popular web resolution framework on the Internet. Most of the articles on the web are still about RxJavA1. Little is known about RxJava2, hence this article.

The purpose of this article is threefold: 1. To give people who are interested in RxJava2 some introduction. 2. Some deeper parsing 3 for those who are using RxJava2 but are still wondering. Give a direct comparison to those who want to replace RxJava1 with RxJava2.

Introduction to RxJava concepts

RxJava = reactive + extension. So I’m going to talk about these two points in general. Reactive is also called reactive programming. Reactive programming. In short, RxJava can handle thread switching very easily. When we think about this, we think about asynchronous operations. Handler? AsyncTask? But you should know that as the number of requests increases, the code logic will become more complex. Rxjava, on the other hand, still maintains clear logic. What it does is it creates an Observable to do things. It then uses various operators to create a series of chain operations through the builder pattern. It’s like an assembly line, getting things done. It is then transmitted to the Observer for processing.

Observer mode

Rxjava is implemented primarily through the observer pattern. So what is the observer mode? Let me just give you a quick introduction.

Chestnut: The observer does a simple thing to the observed. When the observed is changed, react immediately. For example, if you think that Lao Wang next door is having an affair with your wife, but you have no proof, then as soon as Lao Wang next door enters your wife’s door, you will go and capture him. In this example, you are the observer and Wang is the observed. (REMEMBER when I used to get it backwards). So is the observer pattern one to one? Obviously not. In the above example, you can have 3,000 chengguan listening to Lao Wang. As long as he’s up to something. I’ll break his third leg. That is, multiple observers correspond to one observed. Word tired to see the picture:



In fact, there are many built-in observer modes in Android. The most obvious is the click event. For the simplest example, click a button and play a toast. So, when we click the button, we tell the system, at this point, I need to play a toast. So it just pops out. Well, here’s the problem. Do I need to listen to this button in real time? The answer is no. This is different from the previous examples. In other words. I just listen when this button is clicked. This operation is called a subscription. That is, the Button listens to the onClick method by subscribing to the OnclickListener via setOnClickListener.

extension

  • Support not only for event sequences but also for data flows. Events -> dynamic, unpredictable, e.g. event clicks, server pushes, etc. Data streams -> static, predictable, e.g. reading local files, playing audio/video, etc.

  • The handling of intermediate events by operators.

  • Convenience of thread operation. About these specific implementations. I’ll give you some examples later.

RxJava1 and RxJava2

Speaking of differences, some of you might ask, I haven’t seen RxJava1. You can see rxJavA2 directly. Rxjava2.x is completely rewritten according to the Reactive Streams Specification. It is completely independent of rxJavA1.x, which changes the use of rxJava1. In other words, do I need to learn C language before learning Java?

So what’s the difference? Mainly in the following aspects:

  • This should be a big change, as anyone who has used RxJava1 knows that we can pass NULL when firing events. This does not exist in RxJavA2. Don’t believe you try? I’ll give you a NullPointerExpection every minute.

  • In RxJava1, we have various Func1, Func2…… , but in RxJavA2 there is only Function. Still remember to read the kai elder brother’s article of the time to make me whole. I didn’t find it until I noticed it had been replaced. Also, they all add throw exceptions.

  • About backpressure, this is great. I don’t even know what it is. All right, just kidding. Let’s move on. We know that Observable supports backpressure in Rxjava1. But In Rxjava2, Observable removes support for backpressure. And it introduced something called Flowable to support backpressure.

The upstream production speed is greater than the downstream processing speed, resulting in the downstream processing is not urgent, this operation is called backpressure.

This may seem like a common situation, but in fact, it’s not very common, or very rare. So what to do? If it does, throw it away. What the fuck? Are you fucking kidding me? But that’s just the way it is. If we get backpressure in development, we should get rid of it.

Backpressure occurs when the amount of events that can be discarded exceeds the upper limit of the buffer.

What is the solution: 1. Discard the new event. Don’t throw away, continue to pile up. (Ignore backpressure, Observable).

Situations suitable for backpressure: Live streaming online: For example, when live streaming, suddenly the network has a lag, the page is stuck. So when the network is ready, it will not continue on the previous page, which is equivalent to how long your network card, he discarded the data for how long.

What is the key point of backpressure: it is not controllable and can be discarded.

The basic use

A lot of ideas, and then it’s time to get to work. So the basic implementation of Rxjava2 is three things: create an Observable, create an Observer, and bind. So let’s look at them one by one.

Create observables

What is an Observable? Observer or observed? I forgot again. Ha ha. I’m kidding. The latter, of course. Why create an Observable first instead of an Observer? Of course, the order doesn’t matter. But consider the chained calls that follow. So I’m just going to create an Observable first.

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("Rxjava2");
                emitter.onNext("My name is Silence");
                emitter.onNext("What's your name"); // Once onComplete is called, the following will no longer be accepted emitter. OnComplete (); }});Copy the code

Now let me explain what ObservableEmitter is. It literally means observable emitter. Yes, this is what the observer uses to send the event. It can emit three types of events, next, Error, and complete, respectively, by calling Emitter onNext(T Value), onError(Throwable Error), and onComplete(). As to what these three events mean. No hurry. We’ll talk later.

Create the Observer

Now let’s create an observer, which determines what kind of behavior should take place in the observation.

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: " + d);
                result += "onSubscribe: " + d + "\n";
            }
​
            @Override
            public void onNext(String string) {
                Log.i(TAG, "onNext: " + string);
                result += "onNext: " + string + "\n";
            }
​
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: " + e);
                result += "onError: " + e + "\n";
            }
​
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
                result += "onComplete: " + "\n"; }};Copy the code

OnSubscribe, onNext, onError, and onComplete are necessary implementations.

  • OnSubscribe: This is called before the event is sent and can be used to prepare for it. And then the Disposable inside is used to cut off the relationship between the upstream and downstream.

  • OnNext: Normal event. Events to be processed are added to the queue.

  • OnError: Event queue exception. This method is called when an exception occurs during event processing. At the same time, the queue will terminate, that is, no event will be emitted.

  • OnComplete: Event queue completion. Rxjava not only handles each event individually. And I think of them as a queue. When no more onNext events are emitted, the onComplete method needs to be fired as a completion flag.

To Subscribe

Subscribing is really just a line of code:

observerable.subscribe(Observer);Copy the code

Run one to see what happens first:

Just like we did before, we call onSubscribe, we go onNext, and we end up with onComplete.

Amazing operator

For RxJava, there’s a saying that I think is true: If you study an operator every day, at least a month and a half, if you want to understand how it works. At least half a year. In other words, you could write a book about RxJava. This article will certainly not go into that detail. I’m going to show you some of the common operators on this side. Ensure that the daily development process is adequate.

Create operator

The general create operator is what you call when you first create the observer. I’ve covered the create operator in basic usage, so here we go to just, fromarray, and Interval.

just

This operator emits the passed arguments in turn.

Observable observable = Observable.just("Hello"."Rxjava2"."My name is Silence"."What's your name"); // will be called in turn: // onNext("Hello");
// onNext("Rxjava2");
// onNext("My name is Silence");
// onNext("What's your name");
// onCompleted();Copy the code
fromarray

Sends the incoming array through the coordinates at once.

String[] words = {"Hello"."Rxjava2"."My name is Silence"."What's your name"}; Observable observable = Observable.from(words); // will be called in turn: // onNext("Hello");
// onNext("Rxjava2");
// onNext("My name is Silence");
// onNext("What's your name");
// onCompleted();Copy the code
interval

This is actually a timer, and with this you can get rid of CountDownTimer. Now let’s see how to use it:

 Observable.interval(2, TimeUnit.SECONDS).subscribe(
                new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.i(TAG, "accept: "+aLong.intValue()); }});Copy the code

Let’s look at the results:



So that’s what we print the value of long every 2 seconds.

Transformation operator

The transform operator performs some transformation operations on the data emitted by the Observable according to certain rules, and then transmits the transformed data. Transform operators include map, flatMap, concatMap, switchMap, Buffer, groupBy, and so on. Here we’ll look at some of the most common maps: flatMap, concatMap, and compose.

map

The Map operator converts an Observable into a new Observable by specifying a Function object and emits it. The observer receives the new Observable. Go directly to the code:

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer + "\n";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                Log.i("- >"."accept: "+str); string += str; }}); tv_first.setText(string);Copy the code

The input result is as follows:



In the map() method, we convert an INTEGER object to a String. Then, when the map() call ends, the event parameter type is also converted from INTEGER to String. This is the most common transformation operation.

flatMap

The FlatMap operator turns the set of data emitted by an Observable into an Observable set. That is, it can say that one observation can be transformed into multiple observations, but it does not guarantee the order of events. Want to ensure the order of events? You’ll see the concatMap below.

So what does a set of data become an Observable set? Again, in the example above, I have a set of integers. What if I want to convert it to a set of strings? Let’s move on to the code:

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("- >"."accept: "+s); string += s; }}); tv_first.setText(string);Copy the code

Let’s look at the results:



Hold on, hold on. Is there a problem? WTF? What’s the problem? Remember I said that flatMap does not guarantee the order in which events are executed. So why are the events here executed in order? No hurry, let’s put a delay on the launch event and see what happens:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list).delay(100,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("- >"."accept: "+s); string += s; }}); tv_first.setText(string);Copy the code

Let’s add a 100ms delay when he launches the event and see what happens:



See? What did I say? Execution order is not guaranteed. So let me tell you all about it. Have a cup of tea first. Let’s move on.

concatMap

I also introduced concatMap above. It’s the same as concatMap except for the order of execution. You said you were gonna make a promise. You have a cup of tea, and then read on:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer + "\n");
                }
                return Observable.fromIterable(list).delay(1000,TimeUnit.MILLISECONDS);
//                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("- >"."accept: "+s); string += s; }}); tv_first.setText(string);Copy the code

So just to make it a little bit more obvious, I’m just going to set a delay of one second here. Here’s a look at the renderings:



As you can see from the execution sequence and the print time, there is indeed a delay of one second.

compose

So this operator is pretty impressive. How does his transformation work? We know that RxJava is called through the builder pattern through the chain. Multiple chains require multiple Observables. The operator converts multiple Observables into one Observable. Doesn’t that sound great? How to operate, we continue to see:

​
    public  <T> ObservableTransformer<T, T> applyObservableAsync() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                returnupstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }}; }Copy the code

As you can see from the code above, I’ve wrapped the child thread and the main thread together and returned an ObservableTransformer object. So we just have to do it this way:

    Observable.just(1, 2, 3, 4, 5, 6)
                .compose(this.<Integer>applyObservableAsync())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-- >"."accept: "+ strings); string += strings; }}); tv_first.setText(string);Copy the code

Filter operator

The filter operator is used to filter and select the sequence of data emitted by the Observable. Let the Observable only return data that satisfies our condition. Filter operators include buffer, filter, skip, take, skipLast, takeLast and so on. I will introduce filter, buffer, skip, take and distinct here.

filter

The filter operator is a regular filter of the results produced by the source Observable. Only results that satisfy the rules are delivered to the observer. Such as:

Observable. Just (1,2,3).filter(new Predicate<Integer>() {@override public Booleantest(Integer integer) throws Exception {
                return integer < 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer s) throws Exception {
                Log.i("- >"."accept: "+ s); string += s; }}); tv_first.setText(string); }Copy the code

The code is simple. We send 1,2,3; But let’s add a filter operator that returns only things less than 3. So let’s look at the results:



distinct

This operator is actually much simpler. For example, if I want to remove duplicates from a set of data, I use it. Which is to de-weight. It allows only data items that have not yet been launched. The data items that have been transmitted pass directly.

Observable. Just (1,2,3,4,2,3,5,6,1,3). Distinct ().subscribe(new Consumer<Integer>() {@override public void accept(Integer s) throws Exception { Log.i("- >"."accept: "+ s); string += s; }}); tv_first.setText(string);Copy the code

The output is simple:



buffer

This isn’t really that hard. It’s basically caching, transforming the source Observable into a new Observable. Instead of sending a single data source, this new Observable emits a List of items at a time.

  Observable.just(1,2,3,4,5,6)
                .buffer(2).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> strings) throws Exception {
                for (Integer integer : strings) {
                    Log.i("-- >"."accept: "+integer);
                    string+=strings;
                }
                Log.i("-- >"."accept: ----------------------->"); }}); tv_first.setText(string);Copy the code

Let’s ask him to cache two at a time. Here’s the result:



The skip, take

The skip operator filters out the first N items of the data emitted by the source Observable, while the take operation takes only the first N items. SkipLast and takeLast filter from the back to the front. Take a look at the SKIP operator first.

 Observable.just(1, 2, 3, 4, 5, 6)
                .skip(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-- >"."accept: "+ strings); string += strings; }}); tv_first.setText(string);Copy the code

Here are the results:



Let’s change skip to take.

 Observable.just(1, 2, 3, 4, 5, 6)
                .take(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer strings) throws Exception {
                Log.i("-- >"."accept: "+ strings); string += strings; }}); tv_first.setText(string);Copy the code

Here are the results:



Combination operator

merge

Merge is the process of matching multiple operations into an Observable and launching them into an Observable. Merge can disrupt data merged into an Observable. (Parallel disorder)

Observables < Integer > observable1 = observables. Just (1, 2, 3); Observables < Integer > observable2 = observables. Just (1, 2, 3); Observable.merge(observable1,observable2).subscribe(new Consumer<Integer>() { @Override public void accept(Integerinteger) throws Exception {
                Log.i(TAG, "accept: "+integer); }});Copy the code

Here are the results:



concat

Merge and transmit data from multiple Observables. Unlike merge, merge is unordered, while Concat is ordered. (serial order) It must not send the last one until it has finished firing the first one.

Observables < Integer > observable1 = observables. Just (1, 2, 3); Observables < Integer > observable2 = observables. Just (4 and 6); Observable.concat(observable1,observable2).subscribe(new Consumer<Integer>() { @Override public void accept(Integerinteger) throws Exception {
                Log.i(TAG, "accept: "+integer); }});Copy the code

Here are the results:



zip

This operator combines the data items sent by multiple Observables, transforms them according to their type, and emits a new value.

Observables < Integer > observable1 = observables. Just (1, 2, 3); Observable<String> observable2=Observable.just("a"."b"."c");
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
​
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "apply: "+s); }});Copy the code

Here are the results:



concatEager

ConcatEager is parallel and ordered. Let’s see if we modify:

Observables < Integer > observable1 = observables. Just (1, 2, 3); Observable<String> observable2=Observable.just("a"."b"."c"); Observable.concatEager(Observable.fromArray(observable1,observable2)).subscribe(new Consumer<Serializable>() { @Override  public void accept(Serializable serializable) throws Exception { Log.i(TAG,"accept: "+serializable); }});Copy the code

Here are the results:



The thread of control

Thread control is also an operator. But it does not belong to create, transform, filter. So let me just pull it out on its own.

SubscribeOn indicates the upstream thread that sends events. In other words, child threads. Only the first time that an upstream thread is specified is valid. That is, only the first time that a call to subscribeOn() is specified is valid, and the rest is ignored.

ObserverOn is the downstream thread that receives events. That’s the main thread. It is possible to specify the downstream thread more than once, meaning that each time observeOn() is called, the downstream thread switches once.

Here’s an example:

Observable. Just (1, 2, 3, 4) SubscribeOn (schedulers.io ()).observeon (schedulers.newthread ()).map(mapOperator) // newThread, ObserveOn (Schedulers. IO ()).map(mapOperator2) // The IO thread, By observeOn () to specify. ObserveOn (AndroidSchedulers. MainThread). The subscribe (the subscriber); // Android main thread, specified by observeOn()Copy the code

In RxJava, there are already a number of built-in threading options for us to choose from

  • Schedulers.io() : The Scheduler used for I/O operations (reading and writing files, databases, network interactions, etc.). The behavior pattern is similar to newThread(). The difference is that the internal implementation of IO () uses an unlimited pool of threads. Idle threads can be reused. So in most cases IO () is more efficient than newThread().

  • Schedulers. Immediate () : Runs directly on the current thread.

  • Schedulers.computation() : Scheduler used for computing, such as graphical calculations. This Scheduler uses a fixed thread pool, the size of which is CPU cores. Do not put I/O operations in computation. Otherwise, waiting for I/O operations wastes CPU.

  • Schedulers.newthread () : represents a regular newThread

  • Schedulers.trampoline() : When we want a task to be executed on a thread (not immediately), we can use this method to queue it. The scheduler will process its queue and execute each task in the queue in order.

  • AndroidSchedulers. MainThread () : on behalf of the main thread of Android

These built-in schedulers are sufficient for our development needs, so we should use the built-in options, and use the thread pool inside RxJava to maintain these threads, all of which is more efficient.

With the Retrofit

Retrofit is arguably the most popular web framework from a development perspective right now. I think there are two reasons for this. First, it can be combined with OKHTTP. Second: can be combined with RxJava. In addition to the traditional Callback API, Retrofit also provides an RxJava version of the Observable API.

If we need to use Retrofit, we need to add this sentence to gradle’s configuration:

compile 'com. Squareup. Retrofit2: retrofit: 2.0.1'Copy the code

Without further ado, let’s go to the following example:

​
    private static OkHttpClient mOkHttpClient;
    private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
    private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJavaCallAdapterFactory.create();
     public static BaseHttpApi getObserve() {
​
        if (baseHttpApi == null) {
            Retrofit retrofit = new Retrofit.Builder()
                    .addConverterFactory(gsonConverterFactory)
                    .addCallAdapterFactory(rxJavaCallAdapterFactory)
                    .client(mOkHttpClient)
                    .baseUrl(BaseUrl.WEB_BASE)
                    .build();
            baseHttpApi = retrofit.create(BaseHttpApi.class);
       }
        return baseHttpApi;
​
    }Copy the code

As shown in the above code, it is clear that it creates gSON and RxJava through two factory patterns. And bind them through chained calls. So how do you implement network requests through chained calls? No hurry. Let’s have a cup of tea and keep looking.

For example, for a post request, we could write:

public interface BaseHttpApi{  
    @FormUrlEncoded
    @POST("seller/cash_flow_log_detail.json")
    Observable<ServiceReward> serviceReward(@Field("requestmodel") String model);
}Copy the code

Knock on the blackboard. Notice that I have an interface here instead of a class. Next is the daily call, the code is as follows:

  Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<ServiceReward>() {
                    @Override
                    public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(ServiceReward serviceReward) { parseOrderDetail(serviceReward); }});Copy the code

Looking at the second line, this is why you created gSON in the factory pattern in the first place. Now we just need to handle the normal logic in the parseOrderDetail method. Does that look like a lot of code? So we can do this:

 Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(serviceReward ->{
                        parseOrderDetail(serviceReward);
                 });Copy the code

A Lamada expression, doesn’t it feel like a lot of instant code, but some people say, when I load it, it’s a popover, and if I fail to load it, won’t my popover be hidden? It doesn’t exist. What if it does? Here we go:

 Network.getObserve()
                .serviceReward(new Gson().toJson(map))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(serviceReward ->{
                        parseOrderDetail(serviceReward);
                 },throwable ->{dosomething when net error... });Copy the code

What a way to deal with it. For Lamada, I may not get used to it at first, but once I get used to it, I will find that the code is very concise (I am also getting used to it recently).

The last

Rxjava is actually quite difficult for us to get started with. Or rather, the rxJava stuff is so deep that it’s hard to get a handle on it. So as I said before if you study an operator every day, at least a month and a half, if you want to understand the principle. At least half a year. In other words, you could write a book about RxJava. But on a daily basis, the content in this article should cover most of your daily needs. Of course, you can try to understand the underlying implementation of RxJava if you want to.