Introduction to RxJava

RxJava is the Java VM implementation of Reactive Extensions: a library for writing asynchronous and event-based programs using observable sequences. It extends the observer pattern to support data/event sequences and adds operators that allow you to combine sequences declaratively, while abstracting concerns about low-level threading, synchronization, thread safety and concurrent data structures.

Simply put, ReactiveX provides a better API for asynchronous programming. Why better? On Android, asynchronous work has traditionally been done with AsyncTask and Handler. Both can run asynchronous operations without blocking the calling thread. However, they are not abstract enough, they are verbose to implement, and they handle complex business logic poorly. For example, suppose a page makes two requests and should be displayed only when both succeed. Coordinating several asynchronous tasks like this is awkward with AsyncTask and Handler. This is where RxJava comes in: it handles thread switching and the coordination of multiple asynchronous tasks well, and it flattens the business logic so that it reads more clearly.

Simple use of RxJava:

RxJava is based on the Observer pattern, so there are Observables (the observed) and Observers, linked by subscription.

public abstract class Observable<T> implements ObservableSource<T> {
    // Body omitted here; it is examined in Step 2 below.
    public final void subscribe(@NonNull Observer<? super T> observer) { }
}

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

When an Observer subscribes, the Observable obtains a reference to it. The Observable then notifies the Observer whenever it has data, so data flows from the Observable to the Observer.

Simple code implementation

// 1. Create the Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
})
// 2. Subscribe
.subscribe(
    // 3. Create the Observer
    new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.i(TAG, "onSubscribe");
        }
        @Override
        public void onNext(Integer integer) {
            Log.i(TAG, "onNext:" + integer);
        }
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError:" + e.getMessage());
        }
        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
        }
    });

Source code analysis process:

Step 1. Observable.create() creates the observed object (the Observable), and the subscribe method connects it to an observer.

Observable.create():

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

create() returns an ObservableCreate, which is a subclass of Observable. At this point it only constructs the object and stores the ObservableOnSubscribe that was passed in. The subscribeActual() method and the CreateEmitter class are not used yet.
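The point that create() only stores the source, and that nothing runs until subscription, can be seen in a small stand-alone model. This is a sketch in plain Java with no RxJava dependency; LazySource, LazyObservable and LazyDemo are hypothetical names introduced for illustration and are not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableOnSubscribe: the code that produces values.
interface LazySource<T> {
    void subscribe(Consumer<T> downstream);
}

// Hypothetical stand-in for ObservableCreate: it only remembers the source.
final class LazyObservable<T> {
    private final LazySource<T> source;

    private LazyObservable(LazySource<T> source) {
        this.source = source;
    }

    // Mirrors the shape of Observable.create(): validate, wrap, return.
    // The source is NOT run here.
    static <T> LazyObservable<T> create(LazySource<T> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new LazyObservable<>(source);
    }

    // Only here is the source actually invoked.
    void subscribe(Consumer<T> observer) {
        source.subscribe(observer);
    }
}

final class LazyDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazyObservable<Integer> observable = LazyObservable.create(downstream -> {
            log.add("source started");
            downstream.accept(1);
            downstream.accept(2);
        });
        log.add("created"); // recorded before the source has run
        observable.subscribe(value -> log.add("onNext:" + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running LazyDemo prints [created, source started, onNext:1, onNext:2]: the "created" entry comes first because the source body is executed only once subscribe() is called.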

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    // Called only when subscribe(Observer) is invoked
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
    }
}

Step 2. With the ObservableCreate in hand, its subscribe() method is called to subscribe the observer.

public final void subscribe(Observer<? super T> observer) {
    // Check that the observer is not null
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // Hook used for debugging; in a normal environment it returns the same observer, so this step can be ignored
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        // omitted in this excerpt
    }
}

As you can see, subscribe() calls subscribeActual(observer), which is an abstract method of Observable. The call therefore lands in the subscribeActual() method of the ObservableCreate object created in Step 1, and the Observer created in step 3 is passed in.
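This delegation from subscribe() to subscribeActual() is a template-method arrangement: the base class fixes the shared steps and each subclass supplies the hook. A minimal model in plain Java, assuming hypothetical names (TemplateObservable, JustOne, TemplateDemo) that are not part of RxJava, and using java.util.function.Consumer in place of Observer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical model of Observable: subscribe() is final and fixed,
// subscribeActual() is the hook that each concrete subclass fills in.
abstract class TemplateObservable<T> {
    public final void subscribe(Consumer<? super T> observer) {
        // Shared precondition, checked once for every subclass.
        Objects.requireNonNull(observer, "observer is null");
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(Consumer<? super T> observer);
}

// Hypothetical subclass playing the role that ObservableCreate plays.
final class JustOne extends TemplateObservable<Integer> {
    @Override
    protected void subscribeActual(Consumer<? super Integer> observer) {
        observer.accept(1);
    }
}

final class TemplateDemo {
    // Subscribes a collecting observer and returns what it received.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        new JustOne().subscribe(received::add);
        return received;
    }

    // Returns true if the null check in subscribe() rejects a null observer.
    static boolean rejectsNull() {
        try {
            new JustOne().subscribe(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```

Because subscribe() is final, subclasses cannot skip the null check; they can only decide what happens after it.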

Now look at subscribeActual() in the ObservableCreate class from Step 1:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // CreateEmitter is a class that wraps the observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // The onSubscribe callback from step 3 is called here
        observer.onSubscribe(parent);
        try {
            // This calls the subscribe method of the ObservableOnSubscribe created in Step 1.
            // emitter.onNext(1) there invokes parent.onNext(), which finally calls onNext()
            // of the Observer created in step 3, completing the flow.
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable>
            implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                return;
            }
            if (!isDisposed()) {
                // Call onNext of the observer actually created in step 3
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
        }

        // remaining methods omitted in this excerpt
    }
}

ObservableCreate is the observed object. Its inner class CreateEmitter holds the observer. The source sends data by calling the emitter's onNext(T t), which in turn calls onNext on the wrapped observer.
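How the emitter sits between the source and the observer can also be sketched without RxJava. The model below assumes hypothetical names (SketchDisposable, SketchObserver, SketchEmitter, EmitterDemo) and keeps the disposed state in an AtomicBoolean for brevity, whereas the real CreateEmitter extends AtomicReference<Disposable>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, reduced counterpart of Disposable.
interface SketchDisposable {
    void dispose();
    boolean isDisposed();
}

// Hypothetical, reduced observer: only the callbacks this sketch needs.
interface SketchObserver<T> {
    void onSubscribe(SketchDisposable d);
    void onNext(T value);
}

// Hypothetical model of CreateEmitter: wraps the observer and guards each call.
final class SketchEmitter<T> implements SketchDisposable {
    private final SketchObserver<? super T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    SketchEmitter(SketchObserver<? super T> observer) {
        this.observer = observer;
    }

    void onNext(T value) {
        if (value == null) {
            return; // mirrors the null check in the excerpt above
        }
        if (!isDisposed()) {
            observer.onNext(value); // forward to the wrapped observer
        }
    }

    @Override
    public void dispose() {
        disposed.set(true);
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }
}

final class EmitterDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        SketchObserver<Integer> observer = new SketchObserver<Integer>() {
            private SketchDisposable handle;

            @Override
            public void onSubscribe(SketchDisposable d) {
                handle = d;
            }

            @Override
            public void onNext(Integer value) {
                received.add(value);
                if (value == 2) {
                    handle.dispose(); // stop listening after the second value
                }
            }
        };

        // Same order as subscribeActual(): wrap, hand over the handle, then emit.
        SketchEmitter<Integer> parent = new SketchEmitter<>(observer);
        observer.onSubscribe(parent);
        parent.onNext(1);
        parent.onNext(2);
        parent.onNext(3); // ignored: the emitter is already disposed
        return received;
    }
}
```

EmitterDemo.run() returns [1, 2]. The third value never reaches the observer, because the handle passed to onSubscribe() and the emitter used by the source are the same object.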