Preface

I previously wrote "the most friendly article about RxJava", which was well received. Because that article was meant to be simple and approachable, I did my best to leave out complex concepts and keep only the key ideas, so that everyone could follow it.

After finishing that article, though, I felt it would be good to have a companion piece that explains RxJava in depth, and that is where today’s advanced article comes in. Everyone on a team may be using RxJava, but at least one person has to really understand it, or the team will struggle when problems come up.

At the end of the previous article, we concluded that RxJava handles complex business logic with rich operators and convenient asynchronous operations, all within the framework of the observer pattern. Today we expand on the two parts of that conclusion: the observer pattern and the operators.

Before we get down to business, I recommend reading the most friendly article about RxJava first.

About the observer pattern

The previous article focused first on the observer pattern, which we consider the backbone of RxJava. This section does not overturn that conclusion; it looks at how the pattern is implemented from the inside.

We use the same switch-and-lamp code as in the previous article:

import android.util.Log;
import rx.Observable;
import rx.Subscriber;

// Create the observed object (the switch)
Observable<String> switcher = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("On");
        subscriber.onNext("Off");
        subscriber.onNext("On");
        subscriber.onNext("On");
        subscriber.onCompleted();
    }
});

// Create the observer (the lamp)
Subscriber<String> light = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        // The onCompleted() event sent by the observed object arrives here
        Log.d("DDDDDD", "End of observation... \n");
    }

    @Override
    public void onError(Throwable e) {
        // Called when an error occurs
    }

    @Override
    public void onNext(String s) {
        // Handle the onNext event
        Log.d("DDDDD", "handle this---" + s);
    }
};

// Subscribe
switcher.subscribe(light);

This is RxJava’s observer architecture. Code like this tends to raise a few questions:

  • What is Observable.OnSubscribe?
  • In the call(subscriber) method, where does subscriber come from?
  • Why does the observed object start sending events only after it has been subscribed to?

In fact, all of these questions can be answered by understanding OnSubscribe.

So let’s look at the definition of OnSubscribe:

// The Action1 interface mentioned in the previous article has a single call() method to implement
// Nothing special
public interface Action1<T> extends Action {
    void call(T t);
}

// OnSubscribe extends the Action1 interface
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // OnSubscribe is still just an interface; it inherits call(Subscriber<? super T>)
}

So OnSubscribe is essentially the same interface as Action1, just specialized for use inside Observable.

In the Observable class, onSubscribe is the only field, and it is the one argument the Observable constructor requires. In other words, every Observable that gets created holds an OnSubscribe object.

Of course, we cannot call the Observable constructor directly. We can only create an Observable through methods such as create() and just(), and these methods end up calling new Observable(onSubscribe).

public class Observable<T> {
    // The only field
    final OnSubscribe<T> onSubscribe;

    // The constructor is protected, so from outside we can only go through create() and similar methods
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    // create(onSubscribe) calls the constructor internally
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    ....
}

What happens when you call subscribe(subscriber) after creating the Observable and the Subscriber?

    // An Observer object is passed in
    public final Subscription subscribe(final Observer<? super T> observer) {
        ....
        // Wraps the observer and delegates to the overload below
        return subscribe(new ObserverSubscriber<T>(observer));
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        // Delegates to the static method below
        return Observable.subscribe(subscriber, this);
    }

    // The method that does the actual work
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // new Subscriber so onStart it
        subscriber.onStart();

        try {
            // Let the hook wrap onSubscribe
            OnSubscribe<T> onSubscribe = RxJavaHooks.onObservableStart(observable, observable.onSubscribe);

            // This is the key point!
            // This call() is the one we implemented in Observable.create()
            // when we created the observed object.
            // Events are sent only when call() is invoked.
            onSubscribe.call(subscriber);

            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            ....
        }
        return Subscriptions.unsubscribed();
    }

Having read this code, we can answer all three questions above:

  • OnSubscribe is the only field inside an Observable and the key link between an Observable and a subscriber, just like the wire that connects a lamp to a switch.
  • The subscriber passed to call(Subscriber<? super T> subscriber) is the observer we created ourselves.
  • onSubscribe.call(subscriber) runs only when subscribe() is called, and only then are onNext(), onCompleted(), and so on invoked.
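The three answers can be checked with a small stand-alone sketch. It does not use RxJava: MiniObservable, MiniOnSubscribe, MiniSubscriber, and SubscribeFlowSketch are hypothetical stand-ins written only for illustration, and they leave out onStart(), error handling, hooks, and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeFlowSketch {
    // Hypothetical stand-in for rx.Subscriber (onError omitted for brevity).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Hypothetical stand-in for Observable.OnSubscribe: a single call() method.
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    // Hypothetical stand-in for rx.Observable.
    static final class MiniObservable<T> {
        // The only field, mirroring Observable.onSubscribe.
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        // Subscribing is what finally runs call(subscriber).
        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    // Records every step in order so the sequence can be inspected.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> switcher = MiniObservable.create(subscriber -> {
            log.add("call() invoked");
            subscriber.onNext("On");
            subscriber.onNext("Off");
            subscriber.onCompleted();
        });
        log.add("created"); // nothing has been emitted at this point
        switcher.subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String s) { log.add("onNext " + s); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch "created" is logged before "call() invoked", which matches the third answer: the emitting code sits inside call() and only runs once subscribe() hands it a subscriber.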

Is RxJava’s observer pattern clearer now? Let’s review the process with a flow chart.

Knowing this, we can draw two further conclusions:

  • The subscriber hands itself to the onSubscribe inside the Observable.
  • The job of onSubscribe is to run call(subscriber), which tells the observed object to send events to that subscriber.

These conclusions are very helpful for understanding how operators work below, so make sure they are clear.
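One consequence of these two conclusions can be sketched without RxJava: since the emitting code lives in call(subscriber), it runs once for every subscriber that is handed in. The names below (ResubscribeSketch, MiniObservable) are hypothetical, and the subscriber is reduced to a single onNext-style callback to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ResubscribeSketch {
    // Hypothetical minimal observable. The subscriber is reduced to a single
    // onNext-style callback, and onSubscribe is a callback that receives it.
    static final class MiniObservable<T> {
        private final Consumer<Consumer<? super T>> onSubscribe;

        MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // The subscriber hands itself to onSubscribe, which then emits to it.
        void subscribe(Consumer<? super T> subscriber) {
            onSubscribe.accept(subscriber);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        int[] calls = {0}; // counts how often the emitting code runs
        MiniObservable<String> switcher = new MiniObservable<String>(subscriber -> {
            calls[0]++;
            subscriber.accept("On");
            subscriber.accept("Off");
        });
        switcher.subscribe(s -> log.add("lamp A: " + s));
        switcher.subscribe(s -> log.add("lamp B: " + s));
        log.add("emitting code ran " + calls[0] + " times");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each lamp receives its own "On" and "Off", and the emitting code runs twice, once per subscribe() call.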

That is all for the observer pattern.

About operators

The previous article covered some operators, and there are plenty of usage examples on GitHub, so I won’t cover more of them here. Instead, let’s look at how operators are implemented: how does an operator intercept an event, process it, and pass it on to the observer?

Readers who know RxJava well may recall the lift() operator, which used to be the basis on which other operators performed their transformations, but that was several versions ago. In the current version of RxJava, the work lift() used to do has been moved directly into each operator, which reduces the role of lift() (although the lift() operator itself is kept).

Therefore, we don’t need to discuss lift() here. We can take a single operator as an example to understand how operators work, because they are all implemented in basically the same way.

Take map() as an example, again from the previous article:

Observable.just(getFilePath())
        // Use map() to perform the conversion
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                // The custom createBitmapFromPath(s) method is obviously a very time-consuming operation
                return createBitmapFromPath(s);
            }
        })
        .subscribe(
                // Create the observer that handles events at the end of the chain
                new Subscriber<Bitmap>() {
                    @Override
                    public void onCompleted() {
                        Log.d("DDDDDD", "End of observation... \n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Called when an error occurs
                    }

                    @Override
                    public void onNext(Bitmap s) {
                        // Handle the event
                        showBitmap(s);
                    }
                });

Here is what happens behind map():

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        // Creates a new proxy Observable. Its constructor needs an OnSubscribe,
        // and OnSubscribeMap is an implementation of OnSubscribe,
        // so OnSubscribeMap has to implement the call() method.
        // The OnSubscribeMap constructor receives the real Observable (this)
        // and the Func1 instance implemented by the developer.
        return create(new OnSubscribeMap<T, R>(this, func));
    }

Take a look at the implementation of OnSubscribeMap:

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    // Saves the actual Observable
    final Observable<T> source;
    // And an instance of Func1 that we passed in
    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    // call() is implemented here. As we know, the Subscriber passed to call()
    // is the real observer supplied from outside when subscribing
    @Override
    public void call(final Subscriber<? super R> o) {
        // Wrap the real observer from outside in a MapSubscriber, which acts as the proxy observer
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        // Subscribe the proxy observer to the real (source) Observable
        source.unsafeSubscribe(parent);
    }

    // A Subscriber subclass used to build the proxy observer
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        // The proxy Subscriber keeps a reference to the real observer
        final Subscriber<? super R> actual;
        // The Func1 we defined ourselves outside
        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        // Events such as onNext() sent by an external Observable
        // will be passed to the proxy observer first
        @Override
        public void onNext(T t) {
            R result;

            try {
                // mapper is the Func1 created by the developer;
                // its call() performs the data transformation
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            // Call the real observer's onNext()
            // After the data is transformed, the data is sent to the real observer
            actual.onNext(result);
        }
        // the onError() method is the same
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

That is essentially the whole principle behind the map operator. The other operators work on the same principle.
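The same idea can be condensed into a stand-alone sketch that does not depend on RxJava. MapSketch and its nested Mini* types are hypothetical stand-ins, java.util.function.Function plays the role of Func1, and the error handling, the done flag, and setProducer() from the real MapSubscriber are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapSketch {
    // Hypothetical stand-in for rx.Subscriber (onError omitted for brevity).
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Hypothetical stand-in for Observable.OnSubscribe.
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    // Hypothetical stand-in for rx.Observable.
    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        // map(): return a proxy observable whose call() wraps the real observer
        // in a proxy observer and subscribes that proxy to the real source.
        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> source = this;
            return new MiniObservable<R>(actual -> source.subscribe(new MiniSubscriber<T>() {
                @Override public void onNext(T value) {
                    actual.onNext(mapper.apply(value)); // transform, then forward
                }
                @Override public void onCompleted() {
                    actual.onCompleted();
                }
            }));
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> paths = new MiniObservable<String>(s -> {
            s.onNext("a.png");
            s.onNext("logo.png");
            s.onCompleted();
        });
        // The string length stands in for the expensive path-to-bitmap conversion.
        paths.map(String::length).subscribe(new MiniSubscriber<Integer>() {
            @Override public void onNext(Integer n) { log.add("length " + n); }
            @Override public void onCompleted() { log.add("done"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The real observer never sees the original strings. It only receives what the proxy observer forwards after applying the mapper.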

If you want to create a custom operator (which is not recommended), follow the same steps as above:

  • Create a proxy observed object (Observable).
  • Implement the call() method of its OnSubscribe.
  • In call(), create a proxy observer and subscribe it to the real observed object.
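As a rough illustration of these three steps, here is a stand-alone sketch of a made-up operator that drops a value when it equals the previous one. Everything in it is hypothetical (CustomOperatorSketch, the Mini* types, OnSubscribeSkipRepeats, skipRepeats()), it does not use RxJava, and it ignores errors, backpressure, and unsubscription, which a real operator would have to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class CustomOperatorSketch {
    // Hypothetical stand-ins for rx.Subscriber, Observable.OnSubscribe, rx.Observable.
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;
        MiniObservable(MiniOnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
        void subscribe(MiniSubscriber<? super T> subscriber) { onSubscribe.call(subscriber); }
    }

    // Step 2: the OnSubscribe of the proxy observable; it keeps the real source.
    static final class OnSubscribeSkipRepeats<T> implements MiniOnSubscribe<T> {
        final MiniObservable<T> source;

        OnSubscribeSkipRepeats(MiniObservable<T> source) { this.source = source; }

        @Override
        public void call(MiniSubscriber<? super T> actual) {
            // Step 3: build a proxy observer and subscribe it to the real source.
            source.subscribe(new MiniSubscriber<T>() {
                private boolean hasLast;
                private T last;

                @Override public void onNext(T value) {
                    if (hasLast && Objects.equals(last, value)) {
                        return; // drop a value equal to the previous one
                    }
                    hasLast = true;
                    last = value;
                    actual.onNext(value);
                }

                @Override public void onCompleted() { actual.onCompleted(); }
            });
        }
    }

    // Step 1: create the proxy observable around the real source.
    static <T> MiniObservable<T> skipRepeats(MiniObservable<T> source) {
        return new MiniObservable<T>(new OnSubscribeSkipRepeats<T>(source));
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        MiniObservable<String> switcher = new MiniObservable<String>(s -> {
            s.onNext("On");
            s.onNext("Off");
            s.onNext("On");
            s.onNext("On");
            s.onCompleted();
        });
        skipRepeats(switcher).subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String v) { seen.add(v); }
            @Override public void onCompleted() { seen.add("completed"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the switch sequence from the beginning of the article, the lamp in this sketch sees "On", "Off", "On" and then the completion event, because the second consecutive "On" is swallowed by the proxy observer.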

I know this may leave you a little dizzy. That’s fine: I will write a custom operator later and put it on my GitHub, so keep an eye out for it.

Now, let’s use a flow chart to consolidate what we have learned.

The next time you use an operator, you will know that each operator creates a proxy observer and a proxy observed object, and you will know how they call each other. With this level of understanding, using RxJava should be easier from now on.

Finally, I want to leave you with a question: this is what happens with one operator, so what happens when several operators are chained?
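One way to explore this question is to log the order of calls in a toy chain. The sketch below is only an experiment under simplifying assumptions: ChainSketch and MiniObservable are hypothetical, the subscriber is reduced to a single callback, the name and log parameters of map() exist only for tracing, and none of this is RxJava code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ChainSketch {
    // Hypothetical minimal observable; the subscriber is a single onNext-style callback.
    static final class MiniObservable<T> {
        final Consumer<Consumer<? super T>> onSubscribe;

        MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Consumer<? super T> subscriber) {
            onSubscribe.accept(subscriber);
        }

        // Same proxy idea as map(), plus logging so the call order becomes visible.
        <R> MiniObservable<R> map(String name, Function<? super T, ? extends R> mapper, List<String> log) {
            MiniObservable<T> source = this;
            return new MiniObservable<R>(actual -> {
                log.add("subscribe reaches " + name);
                source.subscribe(value -> {
                    log.add(name + " transforms " + value);
                    actual.accept(mapper.apply(value));
                });
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> source = new MiniObservable<Integer>(s -> {
            log.add("source emits 1");
            s.accept(1);
        });
        source.map("first map", n -> n + 1, log)
              .map("second map", n -> n * 10, log)
              .subscribe(n -> log.add("observer receives " + n));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this toy chain the subscription travels upstream (second map, then first map, then the source), while the value travels downstream through the proxy observers in the opposite direction until it reaches the real observer.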

Errata

None

Afterword

This is the end of the RxJava tutorial.

I believe that after reading both articles, your understanding of RxJava should have reached a fairly high level, which means my goal has been achieved.

What’s next…

Since RxJava is an asynchronous event-processing framework, it can in theory wrap any other library, so…