When we need to switch threads in the RxJava event stream to execute code, that is, asynchronous event stream programming using RxJava, we need to switch threads using subscribeOn and observeOn.

So what does the source code do when we switch threads using subscribeOn and observeOn?

subscribeOn

Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> emitter) throws Exception { } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<Object>() { @Override public void  onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { }  @Override public void onComplete() { } });Copy the code

This is plain RxJava code that uses the subscribeOn operator to specify the IO policy in Schedulers. The policy creates an IoScheduler class that encapsulates a thread pool. That is, our IO policy using the subscribeOn operator and specifying Schedulers essentially switches threads using a thread pool whose scheduling RxJava has wrapped for us.

From the last blog post, we learned that RxJava code execution is wrapped in a package using the decorator model, which then calls SUBSCRIBE

For this code, our first package looks like this

(Source is the ObservableOnSubscribe object we pass in the create method, using a shorthand here.)

When the package executes the subscribe method, the logic of the source code actually looks like this

(Executer is a thread pool and SubscribeTask is essentially a Runnable that encapsulates an Observer)

When the SubscribeTask Runnable is committed to the thread pool, the Run method in the SubscribeTask is executed. At this point, a new thread is switched to execute the following code.

As we know from our last blog post, the last package passed in source is package 2. In this code, the source logic for package 2 is as follows when it calls the onNext method for event distribution

At this point, the source code interpretation of this code is over. Here’s a look at the source code of the observeOn operator.

observeOn

So let’s go straight to the code

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {

            }
        })
        .observeOn(AndroidSchedulers.mainThread())
		.subscribe(new Observer<Object>() {
			@Override
			public void onSubscribe(Disposable d) {

			}

			@Override
			public void onNext(Object o) {

			}

			@Override
			public void onError(Throwable e) {

			}

			@Override
			public void onComplete() {

			}
		});
Copy the code

No more nonsense, directly put the whole process of the source code interpretation are posted

(HandlerScheduler is essentially a class that encapsulates the Handler that specifies the main thread, and ObserveOnObserver is essentially a Runnable that encapsulates the Observer.)

You can see the same source flow as the subscribeOn operator above, except that switching threads using the observeOn operator is done during the onNext event distribution phase. Observer.onNext is executed on the main thread.

conclusion

  • Using the subscribeOn operator switches thread execution starting with a package at a layer; The observeOn operator will switch threads from one onNext.
  • Directly from the code point of view, the subscribeOn operator specifies that the previous operator and all subsequent statements will be executed on the new thread (this is when the observeOn operator is not used later, and then the observeOn operator will be switched to the new thread). The observeOn operator specifies that the next onNext, and all subsequent onNext, be executed on a new thread.
  • The subscribeOn operator only takes the first one specified; The observeOn operator can be used many times. Because we can know from the above interpretation that the subscribeOn operator switches threads starting from the package of a certain layer, this thread switch will be based on the thread that switches the ObservableSubscribeOn wrapped in the innermost layer. The observeOn operator switches threads from one onNext, so different onNext can be executed on different threads.