One: What is RxJava?

GitHub about RxJava

a library for composing asynchronous and event-based programs by using observable sequences

What he means is an asynchronous and event-based library composed of observable sequences.

The advent of RxJava eliminates synchronization issues, thread safety, and more

In general, it is convenient for us to program asynchronously.

Two: advantages and disadvantages of RxJava

advantages

asynchronous

Chain call structure

Simplicity can be maintained when using complex asynchronous invocation methods

disadvantages

The cost of learning is relatively high, the threshold of entry is relatively high

Difficult to understand API, you need to look at the source code to understand the specific effects of the API

Three: basic use of RxJava

First understand the basic steps of its use:

  1. Create an Observable
  2. Create an Observer
  3. Subscribe

1. Create the observed

Normal creation of observed:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE"); emitter.onComplete(); }});Copy the code

There are four events: One, Two, Three, and the end.

PS:

Normal create first bullet:

Observable observable = Observable.just("ONE","TWO","THREE");

Normal create second bullet:

String[] values = {"ONE"."TWO"."THREE"};
Observable observable = Observable.fromArray(values);
Copy the code

In fact, this abnormal creation is an internal wrapper that sends this information to the observer as an event like onNext().

2. Create an observer

Normal creation:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("z"."onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.i("z"."onNext: s = " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("z"."onError: ");
            }

            @Override
            public void onComplete() {
                Log.i("z"."onComplete: "); }};Copy the code

Abnormal creation:

Consumer<String> observer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("z"."accept: s = "+ s); }};Copy the code

3. To subscribe to

observable.subscribe(observer);

You have noticed something different. Why did the observer subscribe to the observer?

This happens because RxJava keeps chained calls flowing.

4. Asynchronous invocation

Since RxJava is an asynchronous library, of course it is better for asynchronous processing

Before we look at RxJava’s asynchronous invocation, let’s take a look at two of the more important points

  • subscribeOn()
  • observeOn()
subscribeOn

An Observable is created in a specific environment. It can only be used once.

observeOn

Indicates the context in which event delivery and final processing took place. It can be called multiple times, and after each invocation, the next step takes effect.

Such as:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE"); emitter.onComplete(); }}) // The observed creates.subscribeon (schedulers.newthread ()) in a new thread. ObserveOn (schedulers.io ()).map(new) Function<String, String>() { @Override public String apply(String s) throws Exception {returns.toLowerCase(); }}) // The observer is in the main thread. ObserveOn (AndroidSchedulers. MainThread ()). The subscribe (new Consumer < String > () {@ Override public void accept(String s) throws Exception { } });Copy the code

Four: RxJava basic subscription process

Let’s take a look at the basic invocation:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: ");
                emitter.onNext("ONE");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }


            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: s = " + s);
            }


            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: e = " + e.getMessage());
            }


            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: "); }}); }});Copy the code

Results:

onSubscribe:
subscribe:
onNext: s = ONE
Copy the code

Let’s start with the subscribe method, which is the subscribe method

public final void subscribe(Observer<? super T> observer) { ... // Ignore part of the source code subscribeActual(observer); . // Ignore some source code}Copy the code

Go directly to the main method subscribeActual(Observer), which is an abstract method that is implemented in a subclass.

So let’s look at subclasses of Observable:

Let’s go to the create method:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source."source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code

The object that is returned is the ObservableCreate object,

ObservableCreate is a subclass of ObservableCreate, an ObservableCreate subclass of ObservableCreate, an ObservableCreate subclass of Observable.

The subscribeActual method is implemented in ObservableCreate

@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); // Trigger observer here#onSubscribe()observer.onSubscribe(parent); Try {// call source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

Here the method can see that the onSubscribe of the observer occurs before the callback.

We then call the ObservableOnSubscribe method subscribe

It’s up to the developer to do that. As you can see in this example CreateEmitter is called, go inside CreateEmitter and see how onNext() is implemented

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
Copy the code

You can see that onNext() of the Observer is called in CreateEmitter’s onNext().

You can then see the call in the case:

@Override
public void onNext(String s) {
   Log.i(TAG, "onNext: s = " + s);
 }
Copy the code