The previous article introduced RxJava and showed what RxJava code looks like through an example, so you should now have a first impression of it. In this article we take a closer look at the fundamentals of RxJava and how to use it.

The core of RxJava is an extended Observer pattern, so let's start with the Observer pattern itself.

Event: Xiao Ming presses the power button on the remote control, and the air conditioner starts blowing. He presses the cooling button, and the air conditioner switches to cool air. He then presses the third-speed button, and the air conditioner blows harder. In this event, the air conditioner acts as the observer and the remote control acts as the observed. The air conditioner detects the infrared signal from the remote control and responds to it.

From the figure above, we can see that the remote control is the producer of the event: it actively initiates the event and is its starting point. The air conditioner is the handler of the event: it passively receives the event, responds to it, and is its end point. While an event is being transmitted it can be filtered, converted, combined, and so on, just like the cooling and fan-speed changes in the example. This idea is the core of RxJava, which builds its program logic on the Observer pattern. The remote control corresponds to an Observable in RxJava, the air conditioner corresponds to an Observer, and the operations applied to events along the way correspond to the various operators. The difference is that in RxJava the observer must subscribe to the observed before it can receive and process the events the observed emits. Without a subscription, the observed does not emit any events.
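The remote-control story maps onto the ordinary Observer pattern roughly as follows. This is a minimal plain-Java sketch, not RxJava code: the names `RemoteControlSketch`, `SignalObserver`, `RemoteControl` and `AirConditioner` are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the ordinary Observer pattern; all names here are
// hypothetical and none of them come from RxJava.
class RemoteControlSketch {

    /** The observer: reacts to each signal it receives. */
    interface SignalObserver {
        void update(String signal);
    }

    /** The observed: keeps a list of observers and pushes signals to them. */
    static class RemoteControl {
        private final List<SignalObserver> observers = new ArrayList<>();

        void register(SignalObserver observer) {
            observers.add(observer);
        }

        void press(String button) {
            for (SignalObserver observer : observers) {
                observer.update(button);
            }
        }
    }

    /** An observer that records how it responded to each signal. */
    static class AirConditioner implements SignalObserver {
        final List<String> responses = new ArrayList<>();

        @Override
        public void update(String signal) {
            responses.add("air conditioner handles: " + signal);
        }
    }

    /** Replays the three button presses from the story and returns the responses. */
    static List<String> run() {
        RemoteControl remote = new RemoteControl();
        AirConditioner airConditioner = new AirConditioner();
        remote.register(airConditioner);

        remote.press("power");
        remote.press("cool");
        remote.press("speed 3");
        return airConditioner.responses;
    }

    public static void main(String[] args) {
        for (String response : run()) {
            System.out.println(response);
        }
    }
}
```

Note that this plain version has only a single `update()` callback, and pressing a button with no registered observer simply does nothing.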

Usage

Now that you know the Observer pattern, let's get hands-on with RxJava.

Create the observed: Observable

Step 1: create the observed, an Observable.

Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // Emit the observed data: five events, then the completion signal
        for (int i = 0; i < 5; i++) {
            subscriber.onNext("xulei" + i);
        }
        subscriber.onCompleted();
    }
});

An Observable is created and returned by the Observable.create() method, which takes an OnSubscribe object as its argument. The call() method of that object runs when an observer subscribes. OnSubscribe extends the Action1 interface. Inside call(), events are sent with subscriber.onNext(), subscriber.onCompleted() signals that all events have been sent, and subscriber.onError() signals that an exception occurred while sending events. Besides create(), you can also create an Observable with just() and from(), which will be explained in a later article on creation operators.

Create an Observer

Step 2: create the observer, an Observer.

Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onCompleted() { // Called once all events have been sent
        Log.e("rx_test", "onCompleted");
    }
    @Override
    public void onError(Throwable e) { // Called when an exception occurs while sending events
        Log.e("rx_test", "onError");
    }
    @Override
    public void onNext(Object o) { // Called for each event received
        Log.e("rx_test", "onNext:" + o.toString());
    }
};

The Observer provides three methods, onNext(Object o), onCompleted(), and onError(Throwable e), which correspond to the subscriber.onNext(), subscriber.onCompleted(), and subscriber.onError() calls made in the Observable's call() method. onNext() corresponds to the update() method in the ordinary Observer pattern. RxJava's extended Observer pattern makes up for the shortcomings of the ordinary one:
1. onCompleted() is called to signal that all events have been sent.
2. onError() is called automatically when an exception occurs while sending events, and can also be called manually.
3. Observable supports chained calls, which avoids nested callbacks and simplifies the code.
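The relationship between the three callbacks can be modelled without RxJava. The sketch below is a simplified plain-Java illustration, not RxJava's implementation: `CallbackContractSketch`, `MiniObserver`, `RecordingObserver` and the `emit(count, failAt)` helper are hypothetical names. It assumes the usual contract that onCompleted() and onError() are both terminal, so only one of them is delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the three-callback contract; the names are hypothetical
// and this is not RxJava's implementation.
class CallbackContractSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Records every callback as a line of text. */
    static class RecordingObserver implements MiniObserver<String> {
        final List<String> log = new ArrayList<>();

        @Override public void onNext(String value) { log.add("onNext:" + value); }
        @Override public void onCompleted() { log.add("onCompleted"); }
        @Override public void onError(Throwable e) { log.add("onError:" + e.getMessage()); }
    }

    /**
     * Emits "xulei0" up to "xulei(count-1)". If failAt lies inside that range,
     * an exception is thrown at that index and routed to onError instead.
     */
    static List<String> emit(int count, int failAt) {
        RecordingObserver observer = new RecordingObserver();
        try {
            for (int i = 0; i < count; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("failed at " + i);
                }
                observer.onNext("xulei" + i);
            }
            observer.onCompleted(); // reached only when every event was sent
        } catch (RuntimeException e) {
            observer.onError(e); // terminal: onCompleted is not called afterwards
        }
        return observer.log;
    }

    public static void main(String[] args) {
        System.out.println(emit(3, -1)); // no failure
        System.out.println(emit(3, 1));  // failure at index 1
    }
}
```

With no failure the log ends in onCompleted; with a failure at index 1 the log contains the first onNext followed by onError, and nothing after it.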

Subscribe

Last step: the Observable subscribes the Observer. This reads differently from the ordinary Observer pattern. In RxJava's extended Observer pattern, the observed does not emit any events if there is no observer, so the call is written as the observed subscribing the observer.

observable.subscribe(observer);

Subscribing takes a single line of code: call the Observable's subscribe() method and pass in an Observer as the argument, which binds the Observable to the Observer.
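The point that nothing is emitted before subscribing can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions, not RxJava's source: `LazySubscribeSketch`, `MiniObservable`, `MiniOnSubscribe` and `MiniObserver` are hypothetical stand-ins, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "no subscription, no events"; every Mini* name is
// hypothetical and error handling is omitted.
class LazySubscribeSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Stand-in for OnSubscribe: the emission logic, stored but not yet run. */
    interface MiniOnSubscribe<T> {
        void call(MiniObserver<T> observer);
    }

    static class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<T>(onSubscribe); // nothing is emitted here
        }

        void subscribe(MiniObserver<T> observer) {
            onSubscribe.call(observer); // emission starts only now
        }
    }

    /** Returns the order in which things happened. */
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        MiniObservable<String> observable = MiniObservable.create(
                (MiniObserver<String> observer) -> {
                    log.add("call() invoked");
                    observer.onNext("xulei0");
                    observer.onCompleted();
                });
        log.add("created"); // call() has not run at this point

        observable.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this model, "created" is logged before "call() invoked", which shows that creating the observable only stores the emission logic and subscribing is what runs it.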

Shorthand

RxJava supports chained calls, so the three steps can be written as one expression:

Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // The observed data operation is updated
        for (int i = 0; i < 5; i++) {
            subscriber.onNext("xulei" + i);
        }
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        Log.e("rx_test", "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e("rx_test", "onError");
    }
    @Override
    public void onNext(Object o) {
        Log.e("rx_test", "onNext:" + o.toString());
    }
});

Walking through the logic

That completes a simple RxJava call. Many readers may still be confused at this point, so let's walk through the whole process. An Observable is created with Observable.create(), passing in an OnSubscribe object whose call() method we override. When the Observable is subscribed to, call() is triggered automatically and sends the events. Note that the subscriber argument of call() is actually the Observer we passed in when calling subscribe(). So after five onNext() calls and one onCompleted() call in call(), the Observer outputs the following log:

onNext:xulei0
onNext:xulei1
onNext:xulei2
onNext:xulei3
onNext:xulei4
onCompleted

Why is the subscriber argument of call() the Observer passed in when subscribing? Let's look at the source code. First, the Subscriber type used in call():

public abstract class Subscriber<T> implements Observer<T>, Subscription {... }

Subscriber is an abstract class that implements Observer, so a Subscriber is also an Observer. Next, look at the subscribe(observer) method called when subscribing:

public final Subscription subscribe(final Observer<? super T> observer) {
    // (omitted)
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }
    });
}

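Inside subscribe(observer), the Observer is used directly if it is already a Subscriber; otherwise it is wrapped in a new Subscriber that forwards every call to it. Either way, the overloaded subscribe(subscriber) method is then called: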

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

This in turn calls the static subscribe() method, passing in the Subscriber converted from the Observer and the current Observable as arguments.

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ...
    subscriber.onStart();
    ...
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        ......
        return Subscriptions.unsubscribed();
    }
}

In this simplified code, subscriber.onStart() is called first, to notify the subscriber that events are about to be sent. Then comes hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber). The hook.onSubscribeStart(observable, observable.onSubscribe) method returns its second parameter, observable.onSubscribe, which is the OnSubscribe object passed into create() when the Observable was created. Its call(subscriber) method is then invoked with the subscriber as the argument. Tracing this subscriber back to its origin, it is the Subscriber that the Observer passed to subscribe(observer) was converted into. Reading the source code therefore explains why the subscriber argument in the call() method of an Observable is actually the Observer passed in when subscribing.
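The chain traced above can be condensed into a small plain-Java model. This is a sketch under simplifying assumptions, not RxJava's actual source: `SubscribeChainSketch` and every `Mini*` type are hypothetical, and the hook, error handling and unsubscription are left out. It only shows the three steps: wrap the observer if needed, call onStart(), then pass the subscriber to call().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified plain-Java model of the subscribe chain. Every Mini* name is
// hypothetical; hooks, error handling and unsubscription are left out.
class SubscribeChainSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** An observer with an extra onStart() callback, loosely like Subscriber. */
    abstract static class MiniSubscriber<T> implements MiniObserver<T> {
        void onStart() { }
    }

    /** Stand-in for OnSubscribe: the emission logic handed to the observable. */
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    static class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Reuses the observer if it already is a subscriber, otherwise wraps it. */
        void subscribe(final MiniObserver<T> observer) {
            if (observer instanceof MiniSubscriber) {
                subscribe((MiniSubscriber<T>) observer);
                return;
            }
            subscribe(new MiniSubscriber<T>() {
                @Override public void onNext(T value) { observer.onNext(value); }
                @Override public void onCompleted() { observer.onCompleted(); }
            });
        }

        /** Announces the start, then passes the subscriber to call(). */
        void subscribe(MiniSubscriber<T> subscriber) {
            subscriber.onStart();
            onSubscribe.call(subscriber);
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> observable = new MiniObservable<String>(
                (MiniSubscriber<String> subscriber) -> {
                    subscriber.onNext("xulei0");
                    subscriber.onCompleted();
                });

        // A plain observer: it gets wrapped, and the wrapper forwards each call.
        observable.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("plain onNext:" + value); }
            @Override public void onCompleted() { log.add("plain onCompleted"); }
        });

        // Already a subscriber (held as an observer): passed through unwrapped.
        MiniObserver<String> subscriberAsObserver = new MiniSubscriber<String>() {
            @Override void onStart() { log.add("sub onStart"); }
            @Override public void onNext(String value) { log.add("sub onNext:" + value); }
            @Override public void onCompleted() { log.add("sub onCompleted"); }
        };
        observable.subscribe(subscriberAsObserver);
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In the second subscription the onStart entry appears before any onNext entry, mirroring the order in which the real subscribe() method calls onStart() and then call().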

Conclusion

This article explained the basic principles and usage of RxJava. In the next article, we will look at one of the four types of RxJava operators, the creation operators, and how to use them.

Thanks to Zhang Lei's RxJava series for the help: RxJava series 2 (basic concepts and introduction to usage). I am still learning, so if anything here is wrong, corrections in the comments are welcome. Questions and suggestions can also be raised in the Issues of my RxJavaDemo project on GitHub, and I will reply promptly. Attached is the address of RxJavaDemo: RxJavaDemo