Let’s take the following demo as an example to explain the entire workflow of Rxjava.

public void start() { Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, + thread.currentThread ().getName(); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }}); Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, + subscribe + thread.currentThread ().getName()); } @override public void onNext(Integer value) {log.d (TAG, "observer Thread in" + thread.currentThread ().getName()); Log.d(TAG, "respond to Next event" + value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}; observable1 .subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.newThread()) .subscribe(observer);Copy the code

subscribeOperator before

@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code
  1. First operator. SubscribeOn (Observable.java)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
Copy the code

ObservableSubscribeOn AbstractObservableWithUpstream inheritance, is a packaging of observables At this point, the execution of the new ObservableSubscribeOn, Observable has the following structure

  1. The second operator subscribeOn (Observable. Java) has the same basic execution process as above, but Scheduler is different, so it is still correctObservableAfter executing the New ObservableSubscribeOn,ObservableThe structure of

  1. The third operator. ObserveOn (Observable.java)
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
Copy the code

Wrap the above Observable again as ObservableObserveOn

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
Copy the code

After executing the New ObservableObserveOn, the Observable has the following structure

  1. The fourth operator. ObserveOn (Observable.java)

It is the same as the Observable in step 3, but the Observable in step 3 is wrapped as follows

To sum up: all operators before Subscribe wrap around observables, so the innermost Observable is observable1 in the demo.

subscribeprocess

It’s important to understand that each operator corresponds to an Observer of a static inner class. The operator wraps an Observable downward, and the Observer is wrapped with the corresponding static inner class during the upsubscribe process.

.subscribe(observer);
Copy the code

Here the observer is the one actually declared in the application. Let’s look at the subscription process

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null"); Try {subscribeActual is the last wrapped Observable, ObservableObserveOn subscribeActual(observer); }}Copy the code
  1. Now what does the subscribeActual method of ObservableObserveOn do
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } elseWorkerw = scheduler.createWorker(); Source. subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }} You can see a wrapper around the defined observer and worker ObserveOnObserver(observer <? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.downstream = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; }Copy the code
  1. So the current Observer structure looks like this

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
ObservableSubscribeOn

 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
Copy the code

This process wraps the Observer again,

The structure is shown in the red box

  1. observer.onSubscribe(parent);The subsequent incoming downstream (ObserveOnObserver) executes onSubscribe(parent), which is the SubscribeOnObserver
@Override
        public void onSubscribe(Disposable d) {
            if(DisposableHelper.validate(this.upstream, d)) { this.upstream = d; 支那ifCondition does not execute **if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return; } } queue = new SpscLinkedArrayQueue<T>(bufferSize); Execute this downstream, this downstream is also an ObserveOnObserver, downstream.onsubscribe (this); }}Copy the code

If you look at the comment above, you will call the onSubscribe method of ObserveOnObserver again. If (d instanceof QueueDisposable) is satisfied because d is an ObserveOnObserver, ObserveOnObserver inherits BasicIntQueueDisposable, so this if condition is satisfied

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>`
Copy the code

Because the result of this execution is not the same, so send the code again

@Override
        public void onSubscribe(Disposable d) {
            if(DisposableHelper.validate(this.upstream, d)) { this.upstream = d; 1. Enter if the conditions are metifstatementsif (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return; } 3. Execute the onSubscribe method of the downstream downstream after the calculationif (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return; } } queue = new SpscLinkedArrayQueue<T>(bufferSize); downstream.onSubscribe(this); }} 2. In this Observer outputFused is set totrue@override public int requestFusion(int mode) {if((mode & ASYNC) ! = 0) { outputFused =true;
                return ASYNC;
            }
            return NONE;
        }
Copy the code

Look at note 3 above, then this downstream is actually an Observer written in our program, that is

 Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.d(TAG, "Start using subscribe connections." + Thread.currentThread().getName());
            }
Copy the code

Therefore, the conclusion here is that the Observer onSubscire is not subject to thread switching and is only relevant to the thread at which the call was made, since the thread is still in the county at which the call was made. Parent. SetDisposable (Scheduler. ScheduleDirect (new SubscribeTask(parent))); You can guess from the name of the function that this is where the thread switches. Let’s follow up on the code.

1). SubscribeTask implements the Runnable interface and saves the Observer

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() { source.subscribe(parent); }}Copy the code

2)scheduler.scheduleDirec(xx)

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        returnscheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { 1. EventLoopWorker Final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask = new DisposeTask(decoratedRun, w); 3. Execute worker schedule method w.schedule(task, delay, unit);return task;
    }
Copy the code

Follow up on EventLoopWorker.java

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don schedule, we are unsubscribed
                returnEmptyDisposable.INSTANCE; } where threadWorker is NewThreadWorkerreturnthreadWorker.scheduleActual(action, delayTime, unit, tasks); } where threadWorker is NewThreadWorkerCopy the code

Follow up NewThreadWorke. Java

/** * Wraps the given runnable into a ScheduledRunnable and schedules it * on the underlying ScheduledExecutorService. *  <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled willreturn
     * false.
     * @param run the runnable instance
     * @param delayTime the time to delay the execution
     * @param unit the time unit
     * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
     * @return the ScheduledRunnable instance
     */
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if(parent ! = null) {if(! parent.add(sr)) {returnsr; } } Future<? > f; try {if(delayTime <= 0) {ScheduledExecutorService executor; f = executor.submit((Callable<Object>)sr); }else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if(parent ! = null) { parent.remove(sr); } RxJavaPlugins.onError(ex); }return sr;
    }
Copy the code
  1. So this is where the thread switches, and ScheduledExecutorService executes the task, and who is that task? It just came all the waySubsribeTaskExecutes its run method, which issource.subscribe(parent);

To sum up, ObservableSubscribeOn’s approach continues wrapping the downstream Observer, then switches threads, continuing the subscription in the new thread, and passing the wrapped Observer upstream.

  1. The next source, ObservableSubscribeOn, will continue, and the thread switches to schedulers.newthread (). Follow up directly in the Run method of the SubscribeTask.
  2. The ObservableSubscribeOn source is the upstream of the ObservableSubscribeOn, which is the source of the ObservableSubscribeOnObservableCreatethe
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source; } @Override protected void subscribeActual(Observer<? Createcreateemitter <T> parent = new CreateCreateEmitter <T>(observer); observer.onSubscribe(parent); 2. Execute the true subscribe method source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override 3. Public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {log.d (ObservableEmitter<Integer> emitter)"Current thread name"+ Thread.currentThread().getName()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }});Copy the code

To sum up: The thread where the final observed is located is executed in the thread set in the first SubscribeOn. Therefore, although SubscribeOn threads are switched for many times, each time it takes effect, the first thread is the one that sends the data at last.

Take onNext as an example to analyze the ObserveOn process

  1. The CreateEmitter onNext
 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if(! IsDisposed () {1. This observer is the last onesourceThe corresponding static inner class SubscribeOnObserver Observer.onNext (t); }}Copy the code
  1. The SubscribeOnObserver onNext
@override public void onNext(T T) {this downstream is the next SubscribeOnObserver downstream.onnext (T); }Copy the code
  1. According to the above comment, the implementation of 2 will be repeated, which is omitted here, so after the implementation, who is this downstream? It is ObserveOnObserver, so the onNext method of ObserveOnObserver will be implemented.
  @Override
        public void onNext(T t) {
            if (done) {
                return; } 1. Store data to queueif (sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } 2. Schedule (); } voidschedule() {
            if(getAndIncrement() == 0) {increment (); }}Copy the code
  1. This time a thread switch is performed, and the run method of the current Observer is executed

        @Override
        public void run() {1. OutputFused is used fortrue, so go to drainFusedif (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        
      void drainFused() {
            int missed = 1;

            for (;;) {
                if (disposed) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if(! delayError && d && ex ! = null) { disposed =true;
                    downstream.onError(error);
                    worker.dispose();
                    return; } 2. Execute the onNext method to the downstream. Who is the downstream? Or ObserveOnObserver downstream. OnNext (null);if (d) {
                    disposed = true;
                    ex = error;
                    if(ex ! = null) { downstream.onError(ex); }else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break; }}}Copy the code
  1. Following comment 2 above, perform the thread switch again, executing the run method
 @Override
        public void run() {1. The difference in Step 4 is that the outputFused isfalseSo execute the drainNormal() methodif (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    
     void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; 2. Who is this downstream? Final Observer<? super T> a = downstream;for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.d(TAG, "Start using subscribe connections."+ Thread.currentThread().getName()); } @override public void onNext(Integer value) {log. d(TAG, TAG, TAG) {Override public void onNext(Integer value) {Log."Observer thread is in" + Thread.currentThread().getName());
                Log.d(TAG, "Respond to the Next event" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Respond to an Error event");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Respond to the Complete Event"); }};Copy the code

To sum up, thoughobserseOnThe thread can be specified more than once, but the observer thread declared in the final program is the lastobserveOnIs executed in the thread.

To sum up:

  1. Each operator from the top down wraps around the upstream ObServerable, generating a new Observable
  2. After subscribe is executed, it is equivalent to recalling the subscription process of the upstream Observable, which is a dismantling of the upstream Observable and a packaging of the downstream Observer. During this process, the thread in subscribeOn is switched. But the ultimate Observable is in the thread that switches the first time
  3. Observable implements the onNext method by dismantling the Observer and switching threads, and the Observer executes on the thread specified by the last observeOn