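1. The difference between map and flatMap: map transforms each item one-to-one, while flatMap turns each item into a new Observable and merges the results, so a single upstream item can deliver multiple items downstream. See juejin.cn/post/684490…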
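
The one-to-one versus one-to-many distinction can be sketched without RxJava. The example below is only an analogy: it uses `java.util.stream`, whose `map` and `flatMap` have the same shape as the RxJava operators. The class and method names are made up for illustration. One difference to keep in mind is that RxJava's flatMap merges the inner Observables and does not guarantee ordering, whereas a sequential stream does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Analogy only: java.util.stream instead of RxJava (assumption for a dependency-free sketch).
public class MapVsFlatMap {

    // map: each input element produces exactly one output element.
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: each input element produces a sub-stream, and all sub-streams are flattened.
    static List<String> letters(List<String> words) {
        return words.stream()
                .flatMap(word -> Arrays.stream(word.split("")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "cde");
        System.out.println(lengths(words)); // [2, 3]
        System.out.println(letters(words)); // [a, b, c, d, e]
    }
}
```

Two words go in; map yields two results, while flatMap yields five because each word fans out into several items.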

  2. Global hooks in RxJava

    RxJavaPlugins

    RxJavaPlugins provides global hooks, for example to observe operators as they are assembled or to intercept work scheduled on threads. Here is the global error handling (Kotlin):

    RxJavaPlugins.setErrorHandler(Consumer {
        LogUtil.d("RxJavaError:${it.message}")
    })
  3. RxJava thread switching

    On Android, work for the main thread is dispatched through a Handler (the actual operation is wrapped in a Message).

    Work for child threads goes to a thread pool (the actual operation is wrapped in a Runnable).

    Process:

    1. Pass in a Scheduler; different Schedulers run the work on different threads

    2. The Scheduler creates a Worker (an inner class), which is backed by the real thread pool

    3. The specific operation is passed in wrapped as a Runnable

    4. worker.schedule() performs the actual scheduling
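
The four steps above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not RxJava's implementation: `MiniScheduler`, `MiniWorker`, `singleThread`, and `runOn` are hypothetical names, and a single-thread `ExecutorService` stands in for the real thread pool.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

public class MiniSchedulerDemo {

    // Hypothetical stand-in for a Scheduler: its only job is to hand out workers.
    interface MiniScheduler {
        MiniWorker createWorker();
    }

    // Hypothetical stand-in for a Worker: it owns the real thread pool.
    interface MiniWorker {
        Future<?> schedule(Runnable task);
        void dispose();
    }

    // Step 1: choosing a scheduler decides which thread the work runs on.
    static MiniScheduler singleThread(String threadName) {
        return () -> {
            // Step 2: the worker is backed by an actual executor.
            ExecutorService pool =
                    Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
            return new MiniWorker() {
                @Override
                public Future<?> schedule(Runnable task) {
                    return pool.submit(task); // Step 4: the actual scheduling
                }

                @Override
                public void dispose() {
                    pool.shutdown();
                }
            };
        };
    }

    // Runs a task through the scheduler and reports which thread executed it.
    static String runOn(MiniScheduler scheduler) {
        AtomicReference<String> seen = new AtomicReference<>();
        MiniWorker worker = scheduler.createWorker();
        try {
            // Step 3: the operation is wrapped as a Runnable.
            worker.schedule(() -> seen.set(Thread.currentThread().getName())).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.dispose();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runOn(singleThread("mini-io-1"))); // mini-io-1
    }
}
```

Swapping in a different `MiniScheduler` changes the executing thread without touching the task itself, which is the point of the indirection.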

  1. doOnSubscribe

    By default, doOnSubscribe runs on the thread that performs the subscription. A subscribeOn placed after it in the chain moves it to that scheduler's thread. The code is as follows:

.doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        System.out.println("==========jackie===doOnSubscribe======="+Thread.currentThread().getName());
    }
})
.subscribeOn(Schedulers.io())

//System.out: ==========jackie===doOnSubscribe=======RxCachedThreadScheduler-1
  1. Map conversion process

    Calling map() on an Observable returns an ObservableMap. When subscribe() is called, the ObservableMap creates a MapObserver that wraps the downstream Observer. The call chain is MapObserver.onNext() -> apply() -> Observer.onNext().

    With two map transformations, the process is:

    The first map() returns ObservableMap1 and the second map() returns ObservableMap2. Calling subscribe() on ObservableMap2 creates MapObserver2, which wraps the original Observer, and then subscribes upstream, where ObservableMap1 creates MapObserver1, which wraps MapObserver2. The call chain is MapObserver1.onNext() -> apply1() -> MapObserver2.onNext() -> apply2() -> Observer.onNext().

    Each MapObserver's onNext() calls its apply method internally.

    The execution order follows from the wrapping: MapObserver1 is the outermost wrapper, so its onNext() runs first, followed by MapObserver2's onNext(), and finally the onNext() of the original Observer.

    The details of the code are not important. What must be clear is the wrapping process, the order in which onNext() runs across the different observers, and the order in which the apply methods run.
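
The wrapping and the resulting onNext order can be traced with a small plain-Java model. This is a sketch, not RxJava source: `MiniObserver` and `MiniMapObserver` are hypothetical types, and "MapObserver1" here means the observer belonging to the first map() in the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MapChainDemo {

    // Hypothetical one-method observer, just enough to show the call order.
    interface MiniObserver<T> {
        void onNext(T value);
    }

    // Wraps the downstream observer and applies the mapper before forwarding.
    static final class MiniMapObserver<T, R> implements MiniObserver<T> {
        private final MiniObserver<R> downstream;
        private final Function<T, R> mapper;
        private final String name;
        private final List<String> log;

        MiniMapObserver(MiniObserver<R> downstream, Function<T, R> mapper,
                        String name, List<String> log) {
            this.downstream = downstream;
            this.mapper = mapper;
            this.name = name;
            this.log = log;
        }

        @Override
        public void onNext(T value) {
            log.add(name + ".onNext(" + value + ")");
            downstream.onNext(mapper.apply(value)); // apply, then hand over downstream
        }
    }

    // Builds source.map(i -> i * 10).map(i -> "#" + i) by hand and emits one item.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> observer = v -> log.add("Observer.onNext(" + v + ")");

        // Subscription travels upstream, so the second map's observer is created first.
        MiniObserver<Integer> second = new MiniMapObserver<Integer, String>(
                observer, i -> "#" + i, "MapObserver2", log);
        MiniObserver<Integer> first = new MiniMapObserver<Integer, Integer>(
                second, i -> i * 10, "MapObserver1", log);

        first.onNext(1); // the source emits into the outermost wrapper
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // MapObserver1.onNext(1)
        // MapObserver2.onNext(10)
        // Observer.onNext(#10)
    }
}
```

Observers are built from the bottom of the chain upward, but items flow from the top down, which is why the first map's function is applied first.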

  1. Implement an RxJava-like observer pattern yourself

    public interface Action1<T> {

        void call(T t);

    }

    public interface Callee<T> {

        void onNext(T t);

        void onError(Throwable t);

        void onComplete();

    }

    public class Caller<T> {

        final OnCall<T> mOnCall;

        public Caller(OnCall<T> onCall) {
            this.mOnCall = onCall;
        }

        public static <T> Caller<T> create(OnCall<T> onCall) {
            return new Caller<>(onCall);
        }

        // Runs the source logic against the receiver and returns it as a cancellation handle
        public Calling call(Receiver<T> receiver) {
            this.mOnCall.call(receiver);
            return receiver;
        }

        public interface OnCall<T> extends Action1<Receiver<T>> {}
    }

    public interface Calling {

        void unCall();

        boolean isUnCalled();

    }

    public abstract class Receiver<T> implements Callee<T>, Calling {

        @Override
        public boolean isUnCalled() {
            return false;
        }

        @Override
        public void unCall() {}
    }

    The actual call:

    Receiver<String> receiver = new Receiver<String>() {
        @Override
        public void onNext(String o) {}

        @Override
        public void onError(Throwable t) {}

        @Override
        public void onComplete() {}
    };

    Caller.OnCall<String> onCall = new Caller.OnCall<String>() {
        @Override
        public void call(Receiver<String> stringReceiver) {
            stringReceiver.onNext("123");
        }
    };

    // Simply pass in an instance; calling stringReceiver.onNext(...) delivers the value to the Receiver implementation
    Calling calling = Caller.create(onCall).call(receiver);

The class diagram relationship is as follows:

  1. CreateEmitter, the data emitter, is essentially a wrapper around the Observer that is passed to the source's subscribe() method

  2. The compose operator can encapsulate thread-switching operations for reuse

  3. Back pressure with Flowable: it follows a reactive pull model, so the subscriber must request items, e.g. s.request(Long.MAX_VALUE)

  4. Single, Completable, and Maybe in RxJava

    | Type | Description |
    | --- | --- |
    | Observable | Can emit 0 to N items and terminates with a success or error event. |
    | Flowable | Can emit 0 to N items and terminates with a success or error event. Supports backpressure, so the emission rate of the data source can be controlled. |
    | Single | Can only emit a single item or an error event. Its SingleObserver has only onSuccess and onError, with no onComplete; this is the biggest difference between Single and the other four types. Single can be converted to Observable, Flowable, Completable, or Maybe using the toXXX methods. |
    | Completable | Never emits data and only handles onComplete and onError events. It can be viewed as the Rx counterpart of Runnable. |
    | Maybe | Can emit zero or one item and ends in either success or failure. Somewhat like Optional. |

    Completable

    Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("Hello World");
        }
    }).subscribe();

    // Completable is often combined with the andThen operator
    Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(@NonNull CompletableEmitter emitter) throws Exception {
            try {
                TimeUnit.SECONDS.sleep(1);
                emitter.onComplete();
            } catch (InterruptedException e) {
                emitter.onError(e);
            }
        }
    }).andThen(Observable.range(1, 10))
      .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(@NonNull Integer integer) throws Exception {
              System.out.println(integer);
          }
      });
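
Returning to the back-pressure item in the list above: the "reactive pull" idea, where nothing is delivered until the subscriber requests it, can be tried without RxJava. The sketch below assumes Java 9 or later and uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the Reactive Streams types behind Flowable. `RequestDemo` and `collect` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Stand-in sketch: JDK Flow API instead of RxJava's Flowable (assumption).
public class RequestDemo {

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> collect(int count) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    // Pull model: without a request, no item is ever delivered.
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items are delivered

        try {
            done.await(); // wait for the asynchronous delivery to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(3)); // [1, 2, 3]
    }
}
```

Requesting `Long.MAX_VALUE` effectively switches back pressure off; a subscriber that wants to throttle the source would request a small number and ask for more as it finishes processing.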

Reference article:

www.jianshu.com/p/45309538a…

juejin.cn/post/684490…