We talk about RxJava2 from the creation of an event to the event is observed process principle, this article we talk about RxJava2 chain call principle. This article does not cover usage; familiarity with the basic usage of Rxjava is still required.

Basic usage of Rxjava2

Rxjava solves the asynchronous problem, and its chained calls make the code look smooth and elegant. Now let’s take a look with thread switches and chain calls. The following code is an example:

        Observable
                .create(new ObservableOnSubscribe<String>() {

                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("a");
                    }
                })
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {}@Override
                    public void onNext(Object o) {}@Override
                    public void onError(Throwable e) {}@Override
                    public void onComplete(a) {}});Copy the code

We create an event (observer) and want to output the string “a”. This event occurs on the IO thread, terminates on the IO thread, and the state callback of the event occurs on the main thread. You should be able to understand the usage of the example, we will mainly discuss the chain of principle flow. Why do you say that? Because this chain doesn’t look like a regular chain.

2. The create method

This method, as we saw before, returns an ObservableCreate object, an ObservableCreate descends from an Observable, and the source in it holds the ObservableOnSubscribe anonymous object we created.

3. SubscribeOn method

Obserbvable method: Obserbvable method

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }


    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if(f ! =null) {
            return apply(f, source);
        }
        return source;
    }
Copy the code

ObservableSubscribeOn<T>(this, Scheduler) returns the ObservableSubscribeOn<T>(this, Scheduler) object we created, and passes in the ObservableCreate object, which is the ObservableCreate object. So let’s look at the code for this class:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler; }}Copy the code

This class inherits from AbstractObservableWithUpstream class, the constructor parameter is ObservableSource, so here we need to introduce two classes:

  • ObservableSource ObservableSource is an interface that all Observables implement.
 void subscribe(@NonNull Observer<? super T> observer);
Copy the code

This is one method. This method is obviously used to get the Observer to subscribe to an Observable, or to pass event state to the Observer.

  • AbstractObservableWithUpstream this class inherits Observbable
abstract class AbstractObservableWithUpstream<T.U> extends Observable<U> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source; }}Copy the code

As you can see from the source, this class has the source variable, which is passed in the constructor to store the ObservableSource object.

So when we call the subscribeOn method of an Observable, we create an ObservableSubscribeOn object, store the current Observable with the variable source, and return the ObservableSubscribeOn object.

4. UnsubscribeOn method

  public final Observable<T> unsubscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if(f ! =null) {
            return apply(f, source);
        }
        return source;
    }
Copy the code

This method is a copy of the previous one. So let’s just focus on the ObservableUnsubscribeOn class.

public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {
    final Scheduler scheduler;
    public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler; }}Copy the code

This class with just ObservableSubscribeOn are almost identical, inherited from AbstractObservableWithUpstream class, use the source to save the current object of observables. The Observbvable object in this case is the ObservableSubscribeOn object created by the previous method.

ObserveOn method and Map method

I’m going to skip the code explanation because the content of these methods is basically the same. The observeOn method creates an ObservableObserveOn object and stores the Observable created by the previous method. The Map method creates an ObservableMap object and stores the ObservableMap object created by the previous ObservableMap method. To sum up, each chain call to these methods creates an associated object and uses the variable source to store the Observable subclass object created by the previous ObservableMap method.

6. The subscribe method

As I described in the last article, this method internally calls an abstract method, the subscribeActual method, as the real subscription. The logic of this method depends on how the subclass implements it. The first object that calls this SUBSCRIBE method is the ObservableMap object. So let’s look at how it’s implemented internally. Implement the subscribeActual method of ObservableMap:

   public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
Copy the code

The subscribe method of source is called internally. The source stored in the ObservableMap object is the ObservableObserveOn object created by the previous ObservableMap method. So let’s look at how ObservableObserveOn implements the subscribeActual method:

 protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(newObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code

It also ends up calling subscribe from the last Observable.

So we know that when we call subscribe, we will recursively call the subscribeActual method of Observable created by the previous method stored in source, all the way to the subscribeActual method of ObsservableCreate. Pass the state of the event to the observer. This was covered in the last article.

7. To summarize

The normal chain calls we see usually return the same object. Instead of regular chain calls, when we call Rxjava2 chain calls, they return their own Observable subclasses, each of which is different, and then recursively call the subscribeActual method for each object. Complete a chain call.

If you like my series of articles, you can give a thumbs up + like oh, I will update more wonderful related series of articles later

Write in the last

Many people always encounter some problems when they just get into this industry or when they meet the bottleneck. For example, after learning for a period of time, they feel directionless and do not know where to start to learn. I have sorted out some materials and can share them with you for free if necessary. The newer and more popular technologies shared by FLUTTER — Performance Optimization — Mobile Architecture — Senior UI Engineer — NDK will also be reviewed later

If you like my articles and want to learn with a group of experienced developers, welcome to join my partner group Android Senior Engineer Technology Exchange Group: 925019412

Claim method: Android Technology Exchange Group 925019412