Rxjava2

The usage is very simple, but the source code is a little complicated. Here, in order to simulate multi-subscribeon and observeOn, two methods and two classes are added to the Observable class for easy analysis and debugging.

  • Q1: How is Rxjava’s chained call implemented?

Rxjava does not. The Observable class alone has 15000+ code. The chain invocation is implemented in the decorator design pattern, with the exception of the last subscription, once for each chain invocation. Our last chain call to the Observable above is ObserveOn2, so the resulting Observable is ObservableObserveOn2. I’ll call ObservableOnSubscribePseudo observablesBecause there’s only one subscribe method, unlike other Observables, The Subscribe method of ObservableOnSubscribe can be used by transmitters to emit data (calling onNext, onComplete, onError, etc.).

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

//from Observable
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // The source is the pseudo-Observable
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code

Ps: all know observables dolls, that it corresponds to the observer the dolls? I got a doll too.

The OnSubscribe method is called by user Observer

  • 2.1, subscription trigger — > ObservableObserveOn2. Subscribe (observer)

Observable is ObservableObserveOn2, and observeOn2 calls to switch to the main thread.Let’s look at the subscribe method.

public final void subscribe(Observer<? super T> observer){... subscribeActual(observer); . } } @Override protectedvoid subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer); // Current thread mode
    } else  / / we are here in a AndroidSchedulers mainThread () - > android main thread model
        Scheduler.Worker w = scheduler.createWorker()
        source.subscribe(newObserveOnObserver2<T>(observer, w, delayError, bufferSize)); }}Copy the code

The parameter observer is the user observer, which is encapsulated by ObserveOnObserver2 and includes the scheduler and buffersize(128). The main thread for Android is from JakeWharton’s RxAndroid library. Let’s take a look at HandlerWorker.

  • 2.1.1 How does Rxjava switch to the main thread?

public static Scheduler mainThread(a) {
    return new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
final class HandlerScheduler extends Scheduler {...public Worker createWorker(a) {
        return newHandlerWorker(handler, async); }}private static final class HandlerWorker extends Worker {
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {... ScheduledRunnable scheduled =new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; 
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
        return scheduled;
    }
    @Override
    public void dispose(a) {
        disposed = true;
        handler.removeCallbacksAndMessages(this /* token */); }... }Copy the code

Guys, this is where the thread switch is using handler. Runnalb, however, must be ObserveOnObserver2, so that its onNext, onComplete, and onError threads will pass through the Handler’s switching thread before being distributed to the user Observer.

static final class ObserveOnObserver2<T> implements Observer<T>, Runnable {
    final Observer<? superT> downstream; . ObserveOnObserver2(Observer<?super T> actual, Scheduler.Worker worker, int bufferSize) {
        this.downstream = actual; . }@Override
    public void onSubscribe(Disposable d) {
       // There is no call to worker.schedule(this); methods
    }
    @Override
    public void onNext(T t) {... worker.schedule(this);
    }
    @Override
    public void onError(Throwable t) {... worker.schedule(this);
    }
    @Override
    public void onComplete(a) {... worker.schedule(this);
    }
     @Override
     public void run(a) {
        if (outputFused) {
            drainFused();
        } else{ drainNormal(); }}void drainNormal(a) {... T v = q.poll(); downstream.onNext(v); }}Copy the code

When onNext is triggered, the worker.schedule(this) performs a thread switch and the handler processes the message using the run method, which runs the drainNormal. All the preceding observeons go drainFused, only the last one drainNormal) and send it to the drainNormal downstream observer. Here is the user observer. So the onNext, onComplete, and onNext methods of the user Observer are related only to the thread set in the nearest observeOn method. But where is the thread on which the onSubscribe method of the user Observer is determined? Inform later.

  • 2.1.2 Continue subscribeActual of ObservableObserveOn2

  • 2.2, ObservableMap. Subscribe (observer)

Subscribe (observer) -> subscribe(observer)

Map Major operations

static final class MapObserver<T.U> extends BasicFuseableObserver<T.U> {
    final Function<? super T, ? extends U> mapper;
    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }
    @Override
    public void onNext(T t){ U v = mapper.apply(t) downstream.onNext(v); }... }Copy the code

The map operation is too simple, passing the value returned by the Apply method down as the new value in the onNext chain.

  • 2.3, ObservableObserveOn. Subscribe (observer)

The Worker passed in by ObserveOnObserver constructs is the EventLoopWorker of IoScheduler. The number of core threads is the same as the number of Max threads, which is equal to the number of CPU. But that’s not how Rxjava works, and it’s a bit more complicated than we’ll discuss here. Just think of it as thread pool submission. Ps:The thread pool handles onNext, onComplete, and onError of its downstream observer, not the current ObserveOnObserver.The onNext of the user Observer is determined by the thread whose observeOn is closest to it.

  • 2.4, ObservableSubscribeOn2. Subscribe (observer)

Now I’m going to subscribeOn, which is not the same as observeOn.Look at observer.onSubscribe(parent) in the subscribeActual methodThe observer is ObserveOnObserverSo soon the downstream onSubscribe is executed, since the Observer onSubscribe is called from the innermost ObservableCreate method’s subscribeActual. In fact, the subscribeOn closest to ObserveOn starts calling onSubscribe downstream.

  • Against 2.4.1, ObserveOnObserver onSubscribe

@Override
public void onSubscribe(Disposable d) {
  / / d is SubscribeOnObserver2
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
          ......
          return 
        }
        queue = new SpscLinkedArrayQueue<T>(bufferSize);
        // Continue calling onSubscribe downstream
        downstream.onSubscribe(this); }}Copy the code

So d here is SubscribeOnObserver2, which is not QueueDisposable, so this is going to call onSubscribe downstream. Downstream of ObserveOnObserver is MapObserver.

static final class MapObserver<T.U> extends BasicFuseableObserver<T.U> {
public abstract class BasicFuseableObserver<T.R> implements Observer<T>, QueueDisposable<R> {
Copy the code
 // from MapObserver
public final void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
            this.qd = (QueueDisposable<T>)d;
        }
       downstream.onSubscribe(this)}}Copy the code

So MapObserver is a QueueDisposable, and it also calls onSusbcribe downstream, which is ObServer on ObServer2.

// from ObserveOnObserver2
@Override
public void onSubscribe(Disposable d) {
  / / d is SubscribeOnObserver2
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
          ......
          return 
        }
        queue = new SpscLinkedArrayQueue<T>(bufferSize);
        // Continue calling onSubscribe downstream
        downstream.onSubscribe(this); }}Copy the code

Here ObserveOnObserver2 calls onSubscribe from the user observer.

Summarize the onSusbcribe call for user Observer The onSubscribe callback (observer) is in which thread?

This is the thread from which Rxjava initiates the subscription.

3, User Observer onNext method call flow

  • 3.1 Continue to subscribeActual of ObservableSubscribeOn2

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver2<T> parent;
    SubscribeTask(SubscribeOnObserver2<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);
    }
}
public Disposable scheduleDirect(@NonNull Runnable run) {
    final Worker w = new EventLoopWorker(pool.get())
    w.schedule(run, 0, TimeUnit.NANOSECONDS);
    return task;
}
Copy the code

Source. Subscribe (parent), At this point that subscribeOn2 (schedulers. IO) determines the ObservableSubscribeOn. Subscribe (subscribeOnObserver2) method thread of execution, is really good oh.

  • 3.2, The ObservableSubscribeOn subscribe(Observer)

From the subscribe (observer) — – > subscribeActual (observer)

Observer. onSubscribe(parent), which indicates that onSubscribe is called for subscribeOnObserver2.You just set the value of subscribeOnObserver2’s upstream field to subscribeOnObserver, so the subscribeOnObserver2’s upstream is subscribeOnObserver and the downstream is passed in from the construct. For observeOnObserver.

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver2<T> parent;
    SubscribeTask(SubscribeOnObserver2<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);
    }
}
public Disposable scheduleDirect(@NonNull Runnable run) {
    final Worker w = new EventLoopWorker(pool.get().getEventLoop());
    w.schedule(run, 0, TimeUnit.NANOSECONDS);
    return task;
}
Copy the code

Algorithm-based Schedulers.computation() is simply an computation thread pool that performs tasks source.subscribe(parent). Show subscribeOn (Schedulers.com putation ()) determines the ObservableCreate. Subscribe thread of execution (subscribeOnObserver) method.

  • 3.3, ObservableCreate. Subscribe (observer)

From the subscribe (observer) — – > subscribeActual (observer)CreateEmitter is created using the subscribeOnObserver.

Call onSubscribe(parent) and set the upstream Observer to createEmitter

Source. Subscribe (parent), the source is the pseudo-Observable (ObservableOnSubscribe).The Schedulers.computation() determines the thread on which the SUBSCRIBE method executes.

  • 3.4 CreateEmitter. OnNext method

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
Copy the code

Observer. onNext is called onNext in the subscribeOnObserver directly

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        @Override
        public void onNext(T t){ downstream.onNext(t); }... }Copy the code

Downstream. onNext is directly the onNext call to subscribeOnObserver2

static final class SubscribeOnObserver2<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable 
    @Override
    public void onNext(T t) { downstream.onNext(t); }... }Copy the code

Downstream. OnNext is a direct onNext call to the observeOnObserver, and onNext is still running on the Computation thread.

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
    @Override
    public void onNext(T t){...if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t);// Put the data into the queue
        }
       if (getAndIncrement() == 0) 
            worker.schedule(this);
        }
    }
      @Override
    public void run() {
        if (outputFused) {
            drainFused()
        } else{ drainNormal(); }}void drainNormal(){... T v = q.poll(); downstream.onNext(v); }}Copy the code

Also distributing data downstream, observeOnObserver downstream mapObserver, its onNext will be on the IO thread.

OnNext in mapObserver does a simple processing of the data and then continues to distribute the data downstream.

@Override
public void onNext(T t) {
   ......
   U v = mapper.apply(t)
   downstream.onNext(v);
}
Copy the code

The onNext method of mapObserver’s downstream observeOnObserver2 continues to distribute data.

static final class ObserveOnObserver2<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
    @Override
    public void onNext(T t){...if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t);// Put the data into the queue
        }
       if (getAndIncrement() == 0) 
            worker.schedule(this);
        }
    }
      @Override
    public void run() {
        if (outputFused) {
            drainFused()
        } else{ drainNormal(); }}void drainNormal(){... T v = q.poll(); downstream.onNext(v); }}Copy the code

ObserveOnObserver2 onNext is executed on the same thread as MapObserver onNext. Downstream of observeOnObserver2 is the user Observer, where the onNext method of the user Observer is executed on the main thread after a mainthread code switch.

Analyze threads

Some people say that multiple subscribeOn, only the first time to effect, in fact, every time.

Each time the subscribeOn thread sets itsource ObservableThe subscribe method (or subscribeActual) has an impact.

Every time observeOn sets the thread to itDownstream of the ObserverOnNext, onComplete, onError of

The observed The subscribe method The observer OnSubscribe method OnNext/onComplete/onError method
Pseudo observables Computation thread CreateEmitter Computation thread
ObservableCreate Computation thread CreateEmitter Computation thread
ObservableSubscribeOn IO thread SubscribeOnObserver Computation thread Computation thread
ObservableSubscribeOn2 User threads SubscribeOnObserver2 IO thread Computation thread
ObservableObserveOn User threads ObserveOnObserver User threads Computation thread
ObservableMap User threads MapObserver User threads IO thread
ObservableObserveOn2 User threads ObserveOnObserver2 User threads IO thread
The user the observer User threads The main thread

Ps: The user thread is the thread called by the Observable. subscribe(Observer) code. CreateEmitter is not an Observer. CreateEmitter does not inherit an Observer, but contains onNext, onComplete, and onError methods.