This article first appeared on my CSDN blog: blog.csdn.net/sinat_23092…

RxJava global viewing

RxJava has been around for years and is an old framework for the ever-changing mobile Internet industry. Although the cost of learning is not low, but after skilled really call a person from the heart said cool. What’s cool about it? I personally sum it up as follows:

1. No matter the complex business can also be linked to the end with a chain, each logical operation step is divided into each operator 2. Thread switching is done by one operator, completely eliminating tedious callbacks. Multiple thread switching is still very intuitive from the point of view of code 3. Multiple business processes can be spliced together and multiple threads can merge operations.

A simple sentence is: cut to simplify sanqiu tree. Of course, there are also some people who reflect that it is not good to use, which should be made according to different business and technical scenes, such as people know how to drink cold and warm, different opinions, here will not be repeated.

RxJava principle flow overview

While marveling at RxJava’s ability to simplify things, we can’t help but wonder how it works. “To go all the way through the chain operator? Why can threads be chain-switched? Wait…”

I recently Spring Festival at home, finally have the opportunity to explore the RxJava system under the principle of source code, download the RXJava1. x source code carefully consider some, away from the heavy fog of the code, slowly seized its simple and not simple design principle.

Why is it simple and not simple? Simple because its principle is not complex, not sophisticated, not simple because it can hide all the complexity in the smooth chain.

Without further ado, let’s get to the point.

After analysis, I think it is appropriate to describe the total workflow of RxJava with mobile phone packaging assembly line. First set up the production line; 2. Start the packaging of the production line; 3.

Here are the code objects for each metaphor:

Observable: a process in the pipeline OnSubscribe: the call method of the worker in the process OnSubscribe: package Subscriber subscribe: start pipeline Subscriber: OnNext of Subscriber: The user unpacks the package

To elaborate on the meaning of the above metaphor:

1. First set up the production line

Most operators create an Observable, wrap the upstream Observable, and pass a new OnSubscribe, such as:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
        return unsafeCreate(new OnSubscribeFilter<T>(this, predicate));
    }

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }


 public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }
Copy the code

Finally, the create method is called to create an Observable, passing the current Observable to the new Observable to keep the chain (similar to a linked list holding the pointer to the previous node). The reason for this is that RxJava’s chain is based on the proxy pattern, which is based on layer upon layer of Observable wrappers.

What does it come in? It’s OnSubscribe, so what’s the meaning of an OnSubscribe wrapper? In fact, it is how to package Subscriber logic.

For example, map, which is called OnSubscribeMap (the base class of OnSubscribe), has the following call code:

 @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
Copy the code

When an Observable created by the map operator is called subscribe, the OnSubscribeMap call method is called. You can see in the code that a MapSubscriber object is created. The unsafeSubscribe method of the upstream Observable is then called, passing in the MapSubscriber object as a parameter.

So when you’re happy to write long chains of RxJava operators, the logic is to wrap up one Observable after another, each with its own OnSubscribe, the type of which is determined by the operator.

This is the first flow building pipeline I mentioned. In general, it is constantly creating observables from top to bottom and connecting them into chains. In other words, the latter One holds the reference of the upstream Observables.

Observable is a process in the pipeline because it is the most basic concatenation element in the chain, and OnSubscribe is a worker in the process because it determines how Subscriber is packaged.

2. Start assembly line packaging

The switch that initiates is the subscribe method at the end of the chain. Look at the Subscribe method of an Observable:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
Copy the code

subscribe(Subscriber<? Super T> subscriber, Observable Observable

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
Copy the code

RxJava1. X, RxJavaHooks. OnObservableStart actually didn’t do what operation, return is the original observables onSubscribe object, So it’s calling the onSubscribe call of the Observable, passing in the subscriber object.

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
Copy the code

OnSubscribe is an interface that inherits Action1, and Action1 is an interface that only has a call method, so the call logic is determined by the specific OnSubscribe object.

Remember the call method logic for the OnSubscribeMap object generated by the MAP operation? It creates a MapSubscriber object in its call method, then calls the unsafeSubscribe method of the upstream Observable and passes the MapSubscriber object as a parameter.

When a MapSubscriber object is created, the Subscriber object of the subscribe method called by the current Observable is passed in, and the reference to actual is saved to preserve the chain:

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
Copy the code

So if the Observable at the end of the chain is created by the Map operator, subscribe executes the call method of the OnSubscribeMap in the Observable. Generate a MapSubscriber object and hold the last incoming Subscriber object in the chain in our code, then have the upstream Observable call subscribe and pass in the MapSubscriber object. So we recursively call the Subscribe method of the Observable from the bottom up, generating a chain of Subscriber objects (or layers of wrapping).

Here, after the start of subscribe method, we have started processing and packaging, and finally produced a chain of Subscriber object, that is, our mobile phone packaging box.

3. The user unpacks the mobile phone

This process can be described in a classic lyric from The Onion by Yang Zongwei: “Peel my heart layer by layer ~~”

The previous operation generates a chain of Subscriber objects from bottom to top, and the end of the chain is in the upstream Observable:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
Copy the code

Here t: Subscriber in line 2, now t. next (1) is called in line 4, again using the MapSubscriber object generated by the previous map operator:

public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }
Copy the code

Mapper.call (t) is first used to transform the Subscriber object passed in, that is, the transformation method specified in the map operation, which will be discussed in the next section. Notice that we call actual. OnNext (result) at the end; , and actual is the next Subscriber object of the Subscriber chain. Besides map, the onNext method of most Subscriber objects also has such logic, so we can know that the Subscriber chain is called recursively. It can also be viewed as layer by layer, like unpacking a cell phone.

The process to summarize

Look at the preceding narration, you may still be a little confused, to sum up: The first three sections correspond to a process. In terms of RxJava calling code, it is to link each transform Observable into a chain from top to bottom (assembly line), and then subscribe at the end. From bottom to top, the chain is connected from the bottom Subscriber object through OnSubscribe of each Observable (the pipeline starts to work and package the Subscriber) until the top. When the top Subscriber object calls the onNext method, The onNext of Subscriber chain is also lowered from the top (the user opens the packaging box layer by layer), and the transformation logic of each operation is executed inside.

An example to further illustrate the above process:

 Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    return t.toString()
                }
            })
            .map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

Copy the code

For simplicity, only the map operator is used.

Here is a simple flow chart:

RxJava operator principle parsing

If the overall flow analysis above makes sense, the following operators should be easy to understand.

The ordinary transformation operator

Here’s an example of a map. The transformation here is in the stage of executing the Subscriber chain onNext recursively from top to bottom (the user unpacks the mobile phone box). The onNext method of the MapSubscriber object generated in map has been mentioned above:

public void onNext(T t) {
            R result;
		    try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return; } // call downstream Subscriber's onNext method actual.onNext(result); }Copy the code

Note the fourth line of code result = mapper.call(t); Mapper = mapper = mapper = mapper

.map(object : Func1<String, Book> {

                override fun call(t: String): Book {
                    Log.d(TAG, Thread.currentThread().name)
                    return Book(t)
                }

            })

Copy the code

The Func1 callback is used to perform the map operation, and then actually. onNext(result); , the result of the transformation will be handed to the onNext method of the downstream Subscriber.

If you understand the above flowchart, is it a piece of cake to understand the Map?

Thread-switch operator

Thread switching has two main operators: subscribeOn and observeOn

Thread switching is the most awesome part of RxJava in my opinion, but after understanding the principle, I think it is not complicated and sophisticated. The main thing is that the corresponding nodes in the general process above use the common thread switching mode.

subscribeOn

Action: Puts the subscribe Observer execution on the corresponding thread.

SubscribeOn ultimately executes to:

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }

Copy the code

Note that the final execution is:

return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));

Copy the code

From the previous analysis, you create a new Observable and pass in an OnSubscribe instance object, and an OperatorSubscribeOn object.

Following the instructions above, here is the call method:

public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }

Copy the code

SubscribeOnSubscriber parent = new SubscribeOnSubscriber(subscriber, requestOn, inner, source) , so it creates Subscriber and the SubscribeOnSubscriber object is SubscribeOnSubscriber. Notice the second line here: Final Worker inner = scheduler.createWorker(); And the last line inner. Schedule (parent); .

There is a long method call stack here. Let me just say that thread switching is performed in worker, which encapsulates thread pool or Handler. Through schedule method, SubscribeOnSubscriber can be packaged into a Runnable and put into thread pool for execution. The method executed is the Call method for SubscribeOnSubscriber.

And the call method for SubscribeOnSubscriber:

public void call() {
            Observable<T> src = source;
            source = null;
            t = Thread.currentThread();
            src.unsafeSubscribe(this);
        }

Copy the code

It is also in the Subscribe method of the upstream Observable, like any other Subscriber.

Recall the general process mentioned above, when the second process of ** packed Subscriber chain from bottom up (processing and packaging) **, SubscribeOn means that starting from its current node, the chain of the following series of Subscriber and the onNext of each Subscriber object executed from top to bottom are put into the specified thread for execution.

A common way to describe the role of subscribeOn: “Put the upstream of the subscribeOn statement in the corresponding thread” is not accurate, because if subscribeOn is only used and observeOn is not used, the transformation process of the whole chain will be executed in the thread specified by subscribeOn. RxJava’s official explanation is accurate:

Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.

Copy the code

Add subscribeOn to the sample code from earlier:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    returnSchedulers.io()). Map (object: Func1<String, Book> {override fun call(t: String): Book { Log.d(TAG, Thread.currentThread().name)return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

Copy the code

As shown in the previous flow chart, subscribeOn switching threads looks something like this:

The red part is the logic to put into the specified thread

observeOn

ObserveOn will eventually go to:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

Copy the code

Here we use the lift method:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

Copy the code

Create a new Observable and pass in a new OnSubscribe object (OnSubscribe Ift). That depends on what the OnSubscribe ift call does:

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);

Copy the code

The most important part of call is these lines, which are basically similar to map. That is, operator is used to convert the call that is passed in from downstream to the Subscribeder. Therefore, it depends on what operator observeon does to convert the call:

 ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;

Copy the code

Basically, OperatorObserveOn creates an ObserveOnSubscriber and returns.

Notice the difference with OperatorSubscribeOn, which executes the new Subscriber object wrapped as Runnbale in the thread pool in the call method. Puts the subscribe call from the upstream Observable on the specified thread.

OperatorObserveOn is a call method that takes an ObserveOnSubscriber object as an argument to the upstream OnSubscribe, and then wraps the Subscribe from bottom up in the same thread, So the key here is to see what ObserveOnSubscriber’s onNext does:

if(! queue.offer(NotificationLite.next(t))) { onError(new MissingBackpressureException());return;
            }
            schedule();

Copy the code

These are the key lines. T in the first line is the Subscriber object returned by onNext, notificationLite.next normally returns T, while queue is a queue, and t is entered. Schedule () is then executed, which wraps the current ObserveOnSubscriber object as Runnable into the thread pool, and then executes its call method on the specified thread as follows:

. final Queue<Object> q = this.queue; final Subscriber<? super T>localChild = this.child; .for(;;) {... Object v = q.poll(); boolean empty = v == null;if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    localChild.onNext(NotificationLite.<T>getValue(v)); . }Copy the code

As you can see, this is the onNext method that loops out the element from the queue and passes in the downstream Subscriber.

To summarize the observeOn operation:

When the Subscriber chain is packed from bottom up (the user opens the packing box of the mobile phone), an ObserveOnSubscriber object is inserted where observeOn is called, which can recursively call the Subscriber chain from top down later. Put all onNext methods after the ObserveOnSubscriber object into the specified thread for execution.

Now add observeOn to the instance:

Observable.create(object : OnSubscribe<Int> {
            override fun call(t: Subscriber<in Int>) {
                t.onStart()
                t.onNext(1)
            }
        })
            .map(object : Func1<Int, String> {

                override fun call(t: Int): String {
                    Log.d(TAG, Thread.currentThread().name)
                    return// Add onNext after the current call to the specified thread. ObserveOn (schedulers.main ()). Map (object: Func1<String, Book> { override fun call(t: String): Book { Log.d(TAG, Thread.currentThread().name)return Book(t)
                }

            })

            .subscribe(object : Subscriber<Book>() {
                override fun onStart() {

                }

                override fun onNext(t: Book) {
                    Log.d(TAG, Thread.currentThread().name)
                    Log.d(TAG, t.toString())
                }

                override fun onComplete() {
                }

                override fun onError(t: Throwable) {
                    Log.d(TAG, t.message)
                }
            })

Copy the code

Expressed as a flow chart:

To combat

The paper come zhongjue shallow, and must know this to practice. In order to strengthen the understanding of RxJava source code, I wrote a demo level RxJava source code, using Kotlin write, referred to as RxKotlin (of course now does have a well-known RxKotlin open source library ~), process and RxJava consistent, There are simple operators (details vary) and class names are consistent with RxJava. If you feel that RxJava source is not easy to understand, you can also refer to my demo.

Why read the source code for RxJava? In addition to satisfy their desire to explore, through learning RxJava source code, you can learn how to use RxJava design ideas, through the packaging code to write their own responsive programming framework, we can write their own RxPay, RxLogin and so on.

Github address: github.com/yishuinanfe…

Eggs:

Finally, leave a small exercise, through the study of this article, should be able to answer this question:

If subscribeOn is used multiple times in a single RxJava chain call, why does only the first subscribeOn take effect?