By default, RxJava does no thread switching: an Observable and its Observer run on the same thread. To switch threads, use subscribeOn() and observeOn().

1. subscribeOn

subscribeOn() accepts a Scheduler parameter and specifies the Scheduler on which the upstream source is subscribed, and therefore the thread on which it starts emitting data. If subscribeOn() is called multiple times in one chain, only the first call (the one closest to the source) takes effect.

Open the source of subscribeOn() and you can see that a new ObservableSubscribeOn object is created on every call.

    /**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see #observeOn
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

The actual subscription logic of ObservableSubscribeOn is in subscribeActual(Observer<? super T> s).

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

First, the SubscribeOnObserver is passed to the onSubscribe(Disposable) method of the downstream Observer:

    s.onSubscribe(parent);

Next, the task that will run on the Scheduler's thread is registered with the parent's Disposable, so that upstream and downstream can be managed (and cancelled) together. This is done by calling the Scheduler's scheduleDirect() method, which takes a Runnable.

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));

From this point on, the following call runs on the Scheduler's thread:

    source.subscribe(parent);
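Keeping the scheduled task in the parent's Disposable matters because a dispose() call can then reach work that has not started yet. The sketch below is a plain-Java analogy, not RxJava code: it assumes a java.util.concurrent Future can stand in for the Disposable returned by scheduleDirect(), and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelBeforeSubscribeSketch {

    /** Returns true if the queued "subscribe" task still ran after it was cancelled. */
    public static boolean ranAfterCancel() {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicBoolean subscribed = new AtomicBoolean(false);
        try {
            // Keep the only thread busy so that the next task stays queued.
            scheduler.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stand-in for scheduleDirect(() -> source.subscribe(parent)).
            Future<?> pending = scheduler.submit(() -> subscribed.set(true));
            // Stand-in for disposing the observer before the task has started.
            pending.cancel(false);
            gate.countDown();
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return subscribed.get();
    }

    public static void main(String[] args) {
        System.out.println(ranAfterCancel()); // prints false
    }
}
```

Because the handle to the pending task was kept, cancelling it prevents the upstream subscription from ever happening in this model.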

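In an RxJava chain, subscription travels from the bottom up, the opposite direction to the data, which flows from the top down. If subscribeOn() is called several times, the call nearest the source performs its thread switch last, so the source ends up being subscribed on that call's Scheduler. This is why only the first subscribeOn() in the chain appears to take effect.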
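The bottom-up order can be illustrated without RxJava. The following is a minimal sketch, not the library's implementation: Source, subscribeOn, and the thread names "first" and "second" are hypothetical stand-ins, and a single-thread ExecutorService plays the role of a Scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class SubscribeOnOrderSketch {

    /** Hypothetical stand-in for an Observable: something that can be subscribed to. */
    interface Source {
        void subscribe(Consumer<String> downstream);
    }

    /** Wraps an upstream so that subscribing to it happens on the given executor. */
    static Source subscribeOn(Source upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Builds source -> subscribeOn(first) -> subscribeOn(second); returns the emitting thread's name. */
    public static String emittingThread() {
        ExecutorService first = namedExecutor("first");
        ExecutorService second = namedExecutor("second");
        try {
            // The source emits the name of whichever thread it was subscribed on.
            Source source = downstream -> downstream.accept(Thread.currentThread().getName());
            Source chain = subscribeOn(subscribeOn(source, first), second);

            AtomicReference<String> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            chain.subscribe(name -> {
                seen.set(name);
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // prints first
    }
}
```

Subscribing hops to "second" and then to "first", so the source runs on the thread chosen by the subscribeOn nearest to it.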

2. observeOn

observeOn() also accepts a Scheduler parameter; it specifies the Scheduler on which the downstream operations run.

If observeOn() is called multiple times, every call takes effect, and the thread switches each time.

Open the source of observeOn() and you can see that a new ObservableObserveOn object is created on every call.

    /**
     * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
     * asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
     *
     * <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
     * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
     *
     * @param scheduler
     *            the {@link Scheduler} to notify {@link Observer}s on
     * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
     *         {@link Scheduler}
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

The actual subscription logic of ObservableObserveOn is in subscribeActual(Observer<? super T> observer).

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
    
        // With a TrampolineScheduler, subscribe the downstream Observer to the source directly; no thread switch is needed.
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // Otherwise, create a Worker from the Scheduler and subscribe to the source
            // with an ObserveOnObserver that wraps the real downstream Observer.
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

If the scheduler is a TrampolineScheduler, the downstream Observer is subscribed to the source directly, with no thread switch.

Otherwise, the Scheduler creates a Worker, and the source is subscribed to with an ObserveOnObserver that wraps the real downstream Observer.

ObserveOnObserver is a nested class of ObservableObserveOn that implements the Observer and Runnable interfaces. SubscribeOnObserver, by contrast, implements the Observer and Disposable interfaces.

In ObserveOnObserver.onNext(), the call to schedule() performs the actual scheduling.

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                // worker is the member field: final Scheduler.Worker worker;
                // "this" is the current ObserveOnObserver, which implements Runnable
                worker.schedule(this);
            }
        }

Next, look at run(), the implementation of the Runnable interface, which executes on the Worker's thread. drainNormal() takes items out of the ObserveOnObserver's queue and emits them to the downstream Observer.

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
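The getAndIncrement() == 0 check in schedule() and the drain performed in run() together form a hand-off pattern: producers enqueue items, and at most one drain task is in flight on the worker at a time. The sketch below is a simplified plain-Java analogue under stated assumptions, not RxJava's code: HandOff and deliver are hypothetical names, a single-thread ExecutorService stands in for the Worker, and fusion, error handling, disposal, and delayError are all omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainLoopSketch {

    /** Hypothetical, simplified analogue of ObserveOnObserver. */
    static final class HandOff extends AtomicInteger implements Runnable {
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();
        private final ExecutorService worker;
        private final List<String> received;   // written only on the worker thread
        private final CountDownLatch remaining;

        HandOff(ExecutorService worker, List<String> received, CountDownLatch remaining) {
            this.worker = worker;
            this.received = received;
            this.remaining = remaining;
        }

        void onNext(String item) {
            queue.offer(item);
            schedule();
        }

        void schedule() {
            // Only the caller that moves the counter from 0 to 1 submits a drain task.
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                String item;
                while ((item = queue.poll()) != null) {
                    received.add(item + "@" + Thread.currentThread().getName());
                    remaining.countDown();
                }
                // Subtract the signals handled so far; stop when no new ones arrived meanwhile.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    /** Sends the items from the calling thread and returns what the worker thread received. */
    public static List<String> deliver(String... items) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        List<String> received = new ArrayList<>();
        CountDownLatch remaining = new CountDownLatch(items.length);
        HandOff handOff = new HandOff(worker, received, remaining);
        try {
            for (String item : items) {
                handOff.onNext(item);
            }
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver("a", "b", "c")); // prints [a@worker, b@worker, c@worker]
    }
}
```

Every item is produced on the calling thread but consumed on the worker thread, in the original order, which is the behavior a single observeOn() stage provides.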

If observeOn() is called several times downstream, the thread switches each time. After each switch, the methods of the corresponding Observer run on the specified thread.

Note: this differs from subscribeOn(), where only the first of several calls takes effect.

A simple example:

    public void multipleUsingSubscribeOnAndObserveOnTest() {
        Observable.just("HELLO WORLD")
                .subscribeOn(Schedulers.single())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        s = s.toLowerCase();
                        print("map1:");
                        System.out.println(s);
                        return s;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        s = s + " tony.";
                        print("map2");
                        System.out.println(s);
                        return s;
                    }
                })
                .subscribeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        s = s + " it is a test.";
                        print("map3");
                        System.out.println(s);
                        return s;
                    }
                })
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        print("subscribe");
                        System.out.println(s);
                    }
                });
    }

    public void print(String label) {
        System.out.println(label);
        System.out.println(Thread.currentThread().getName());
    }

Output result:

    2020-06-23 14:51:31.699 2627-2646/? I/System.out: map1:
    2020-06-23 14:51:31.699 2627-2646/? I/System.out: RxSingleScheduler-1
    2020-06-23 14:51:31.699 2627-2646/? I/System.out: hello world
    2020-06-23 14:51:31.701 2627-2647/? I/System.out: map2
    2020-06-23 14:51:31.701 2627-2647/? I/System.out: RxCachedThreadScheduler-1
    2020-06-23 14:51:31.701 2627-2647/? I/System.out: hello world tony.
    2020-06-23 14:51:31.702 2627-2647/? I/System.out: map3
    2020-06-23 14:51:31.702 2627-2647/? I/System.out: RxCachedThreadScheduler-1
    2020-06-23 14:51:31.702 2627-2647/? I/System.out: hello world tony. it is a test.
    2020-06-23 14:51:31.703 2627-2648/? I/System.out: subscribe
    2020-06-23 14:51:31.703 2627-2648/? I/System.out: RxNewThreadScheduler-1
    2020-06-23 14:51:31.703 2627-2648/? I/System.out: hello world tony. it is a test.

Note: when subscribeOn() and observeOn() are each used several times, as in the example above, the stages run on the following threads:

  • After the first subscribeOn(): Schedulers.single() (map1)
  • After the first observeOn(): Schedulers.io() (map2)
  • After the second subscribeOn(): Schedulers.io() (map3)

Note: map3 runs on neither Schedulers.computation() nor Schedulers.single(). Because subscribeOn() has already been called once, the second call does not change where data is emitted; the stage keeps running on the thread set by the preceding observeOn(), not on the thread of the earlier subscribeOn().

  • After the second observeOn(): Schedulers.newThread() (the subscriber)