Introduction

For the basic usage and subscription flow of RxJava, see the previous article, “Android Third-Party Libraries – RxJava: RxJava Usage and Basic Subscription Flow”.

Real projects often fetch data in the background and then update the UI, which means switching threads with RxJava. So let’s look at how thread switching works in RxJava.

1. A thread-switching example

Let’s start with a small example:

Log.i(TAG, "Current thread:" + Thread.currentThread().getName());
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.i(TAG, "subscribe: currentThread : " + Thread.currentThread().getName());
        emitter.onNext("ONE");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe: currentThread : " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: currentThread : " + Thread.currentThread().getName());
        Log.i(TAG, "onNext: s : " +  s);
    }

    @Override
    public void onError(Throwable e) {
    
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete: currentThread : " + Thread.currentThread().getName());
    }
});

Output result:

Current thread:main
onSubscribe: currentThread : main
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main

Next, we run the same code (wrapped in a method named RxObservable3()) from a new thread:

new Thread(new Runnable() {
    @Override
    public void run() {
        RxObservable3();
    }
}).start();

Output result:

Current thread:Thread-5
onSubscribe: currentThread : Thread-5
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main

We can draw several conclusions:

1. onSubscribe() runs on the thread that calls subscribe().
2. The Observable’s subscribe() runs on the thread specified by subscribeOn().
3. The Observer’s onNext() and onComplete() run on the thread specified by observeOn().

2. Source code analysis

Now let’s analyze the source code.

2.1 subscribeOn()

In our example:

subscribeOn(Schedulers.io())

Its source code:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

Here this is the Observable created earlier (an ObservableCreate). It is wrapped in a new ObservableSubscribeOn, and that wrapper is what gets returned.

The scheduler argument is the Schedulers.io() we passed in.

Let’s look at io():

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

IO is a static Scheduler instance whose creation goes through IOTask:

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
--------
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}


As you can see, Schedulers.io() ultimately gives us an IoScheduler object. We will come back to it later.
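The IoHolder and IOTask pair above looks like the lazy holder idiom: the nested class, and with it the DEFAULT instance, is only initialized the first time it is accessed. A minimal sketch of that idea in plain Java follows. FakeScheduler, Holder, Task and CREATED are hypothetical names used only for illustration; they are not RxJava classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyHolderSketch {
    /** Counts constructor calls so that the single construction is observable. */
    public static final AtomicInteger CREATED = new AtomicInteger();

    /** Hypothetical stand-in for IoScheduler. */
    public static final class FakeScheduler {
        FakeScheduler() {
            CREATED.incrementAndGet();
        }
    }

    /** The JVM initializes this class, and therefore DEFAULT, on first access only. */
    static final class Holder {
        static final FakeScheduler DEFAULT = new FakeScheduler();
    }

    /** Same shape as IOTask: a Callable that hands out the shared instance. */
    public static final class Task implements Callable<FakeScheduler> {
        @Override
        public FakeScheduler call() {
            return Holder.DEFAULT;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        FakeScheduler a = task.call();
        FakeScheduler b = task.call();
        System.out.println("same instance: " + (a == b));
        System.out.println("instances created: " + CREATED.get());
    }
}
```

However many times the task is called, every caller receives the same instance, which is why one shared IoScheduler can serve every Schedulers.io() call.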

2.1.1 ObservableSubscribeOn#subscribeActual()

As the previous article showed, subscribeActual() is an abstract method of Observable, and there it was implemented by ObservableCreate.

Now the ObservableCreate object has been wrapped in a new ObservableSubscribeOn object.

So let’s look at the subscribeActual() method in ObservableSubscribeOn:

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);
    
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

The call observer.onSubscribe(parent) happens before any scheduler is involved, so onSubscribe() runs on whichever thread called subscribe(), that is, the current thread.

That is:

onSubscribe: currentThread : Thread-5

The next line of subscribeActual() hands a SubscribeTask to the scheduler through scheduleDirect(). Here is the SubscribeTask class:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    
    @Override
    public void run() {
        source.subscribe(parent);
    }
}

SubscribeTask is a Runnable whose run() method calls source.subscribe(parent).

Here source is the upstream Observable, the ObservableCreate object.
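The two steps of subscribeActual() can be imitated in plain Java: the first step runs on the calling thread and the second is handed to a pool. This is only a sketch of the idea, not RxJava code. The class name, the executor and the thread name "io-1" are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubscribeActualSketch {
    /**
     * Returns two thread names: the thread that ran the onSubscribe step,
     * then the thread that ran the scheduled subscribe task.
     */
    public static List<String> run() {
        // Hypothetical stand-in for the IO scheduler: one thread named "io-1".
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        List<String> threads = new ArrayList<>();
        try {
            // Step 1: like observer.onSubscribe(parent), runs right here on the caller.
            threads.add(Thread.currentThread().getName());

            // Step 2: like scheduleDirect(new SubscribeTask(parent)), runs on the pool.
            Callable<String> subscribeTask = () -> Thread.currentThread().getName();
            Future<String> result = io.submit(subscribeTask);
            threads.add(result.get());
            return threads;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first name is always the caller’s thread and the second is always the pool thread, which mirrors the onSubscribe and subscribe lines in the log output.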

2.1.2 Scheduler#scheduleDirect()

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
-----------------------------------
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // createWorker() is an abstract method implemented by Scheduler subclasses; here that is IoScheduler
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // Wrap the Worker and the Runnable in a DisposeTask
    DisposeTask task = new DisposeTask(decoratedRun, w);

    // Execute
    w.schedule(task, delay, unit);

    return task;
}

Next, let’s look at createWorker() in IoScheduler and at the schedule() method of the Worker it returns.

final AtomicReference<CachedWorkerPool> pool; // AtomicReference provides atomic updates

@NonNull
@Override
public Worker createWorker() {
    // Create an EventLoopWorker and pass in the cached worker pool
    return new EventLoopWorker(pool.get());
}
--------------------------
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    ...

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        // The Runnable is ultimately handed to threadWorker
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

ThreadWorker itself does not implement scheduleActual(),

so we look in its parent class, NewThreadWorker:

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    // run is the DisposeTask that wraps our SubscribeTask
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    ... // irrelevant code omitted

    Future<?> f;
    try {
        // The task is executed by the thread pool here
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

In the end the task is handed to executor, which is a thread pool, so the SubscribeTask runs on a pool thread.

In other words, the Observable’s subscribe() method is called on an IO thread.

This matches the earlier output:

subscribe: currentThread : RxCachedThreadScheduler-1

So subscribe() runs on the thread specified by subscribeOn() (in this case, a thread owned by the IoScheduler).
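The delay check in scheduleActual() maps directly onto the standard ScheduledExecutorService API. The following sketch, written against plain java.util.concurrent, shows the same branch. The class name, method name and the thread name "pool-thread-1" are illustrative assumptions, not RxJava identifiers.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleActualSketch {
    /** Runs a task on a one-thread scheduled pool and returns the name of the thread that ran it. */
    public static String runOnPool(long delayTime, TimeUnit unit) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "pool-thread-1"));
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            Future<String> f;
            if (delayTime <= 0) {
                // No delay requested: run as soon as a pool thread is free.
                f = executor.submit(task);
            } else {
                // Delay requested: run once the delay has elapsed.
                f = executor.schedule(task, delayTime, unit);
            }
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool(0L, TimeUnit.NANOSECONDS));
        System.out.println(runOnPool(20L, TimeUnit.MILLISECONDS));
    }
}
```

In both branches the task body runs on the pool thread, never on the thread that submitted it.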

2.1.3 Calling subscribeOn() multiple times

Example:

// surrounding code omitted
.subscribeOn(Schedulers.io()) // first time
.subscribeOn(Schedulers.newThread()) // second time
.subscribeOn(AndroidSchedulers.mainThread()) // third time

Output result:

subscribe: currentThread : RxCachedThreadScheduler-1 

Why does only the first subscribeOn() take effect?

Because each call to subscribeOn() rewraps the previous Observable.

A diagram of this process can be found in Yu Gang’s article “A Detailed Illustration of RxJava’s Message Subscription and Thread Switching”.

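Subscription starts at the third (outermost) ObservableSubscribeOn. Each wrapper switches to its own scheduler’s thread and then subscribes to the Observable it wraps.

The call is passed upstream in this way until it reaches the first ObservableSubscribeOn, which is the last one to switch threads before the source’s subscribe() runs.

So only the first subscribeOn() you set takes effect.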
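The wrapping argument can be checked with a small model in plain Java. This is a sketch under simplifying assumptions: Source stands in for Observable, a Consumer stands in for the Observer, and subscribeOn() here is a hypothetical helper that only reproduces the "subscribe to the upstream on my executor" behaviour. None of these are RxJava types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class FirstSubscribeOnWins {
    /** Hypothetical stand-in for an Observable: something that can be subscribed to. */
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    /** Wraps the upstream so that subscribing to it happens on the given executor. */
    static Source subscribeOn(Source upstream, Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread on which the source's subscribe logic ran. */
    public static String run() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        ExecutorService third = named("third");
        try {
            // The source reports the thread it was subscribed on.
            Source source = observer -> observer.accept(Thread.currentThread().getName());

            // Equivalent to source.subscribeOn(first).subscribeOn(second).subscribeOn(third).
            Source chain = subscribeOn(subscribeOn(subscribeOn(source, first), second), third);

            CompletableFuture<String> result = new CompletableFuture<>();
            chain.subscribe(result::complete);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            first.shutdown();
            second.shutdown();
            third.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "first"
    }
}
```

The subscription hops from the third executor to the second and then to the first, so the source always runs on the executor of the innermost wrapper, the one added first.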

2.2 observeOn()

In our example:

.observeOn(AndroidSchedulers.mainThread())

Its source code:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

-----------------

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

This also wraps the upstream in a new object, this time an ObservableObserveOn.

Here this is the ObservableSubscribeOn object that was created earlier.

Let’s move on to ObservableObserveOn.

2.2.1 ObservableObserveOn#subscribeActual()

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // If the scheduler is a TrampolineScheduler (stay on the current thread),
    // call subscribe() of the upstream ObservableSubscribeOn directly
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        // scheduler here is AndroidSchedulers.mainThread()
        Scheduler.Worker w = scheduler.createWorker();

        // Wrap the observer in an ObserveOnObserver and subscribe to the upstream
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

Let’s look at the source code in ObserveOnObserver:

2.2.2 ObserveOnObserver#onNext()

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t); // add the item to the queue
    }
    schedule();
}

The item is put into the queue, and the key step is then the call to schedule().

2.2.3 ObserveOnObserver#schedule()

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

ObserveOnObserver itself implements Runnable, so it passes itself to the worker.

The worker here was created from the mainThread() scheduler we passed in.

Then let’s look at the run method:

@Override
public void run() {
    if (outputFused) { // outputFused is false by default
        drainFused();
    } else {
        drainNormal();
    }
}

Now let’s look at drainNormal():

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    // a is our custom Observer
    final Observer<? super T> a = downstream;

    ...
    v = q.poll();
    ... // some code omitted

    // This calls our custom Observer#onNext()
    a.onNext(v);
    ...
}

So drainNormal() ends up calling our custom Observer#onNext().

Because run() is executed by the worker, Observer#onNext() is called on the thread specified by observeOn().
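The combination of queue, counter and drain loop can be reproduced with standard library classes. The sketch below is a simplified model, not the RxJava implementation: an Executor stands in for the Worker, the counter is an explicit AtomicInteger field named wip, and the class name and the thread name "target-thread" are made up for illustration.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainSketch implements Runnable {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // pending drain requests
    private final List<String> received = new CopyOnWriteArrayList<>();
    private final Executor worker;

    DrainSketch(Executor worker) {
        this.worker = worker;
    }

    /** Called on the producer thread: enqueue the item, then ask for a drain. */
    void onNext(String item) {
        queue.offer(item);
        schedule();
    }

    /** Only the call that moves the counter from 0 to 1 submits a drain. */
    void schedule() {
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    /** Runs on the worker thread and delivers queued items downstream. */
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            String v;
            while ((v = queue.poll()) != null) {
                received.add(Thread.currentThread().getName() + " got " + v);
            }
            // Give back the requests handled so far; stop if none arrived meanwhile.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static List<String> demo() {
        ExecutorService target =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "target-thread"));
        DrainSketch sketch = new DrainSketch(target);
        sketch.onNext("ONE");
        sketch.onNext("TWO");
        target.shutdown();
        try {
            target.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter ensures that at most one drain is active at a time, so items are delivered in order and always on the worker’s thread, no matter which thread called onNext().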

Note:

The wrapping order here is:

CreateEmitter wraps SubscribeOnObserver;

SubscribeOnObserver wraps ObserveOnObserver;

ObserveOnObserver wraps the custom Observer.

The onNext() call travels along the same chain:

In subscribe() we call onNext() on the ObservableEmitter;

that calls onNext() on the SubscribeOnObserver;

which in turn calls ObserveOnObserver#onNext();

so the Observer that run() finally calls is our custom Observer.
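The delegation chain can be traced with a tiny model in plain Java. This sketch deliberately leaves out the thread hop and only records the order in which the wrappers are visited. Obs and wrap() are hypothetical helpers, and the strings are labels that borrow the RxJava class names for readability.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperChainSketch {
    /** Hypothetical stand-in for Observer, reduced to onNext(). */
    interface Obs {
        void onNext(String value);
    }

    /** Builds a wrapper that records its own name and then forwards downstream. */
    static Obs wrap(String name, Obs downstream, List<String> trace) {
        return value -> {
            trace.add(name);
            downstream.onNext(value);
        };
    }

    /** Returns the order in which the wrappers saw the item. */
    public static List<String> run() {
        List<String> trace = new ArrayList<>();
        Obs custom = value -> trace.add("custom Observer got " + value);
        Obs observeOn = wrap("ObserveOnObserver", custom, trace);
        Obs subscribeOn = wrap("SubscribeOnObserver", observeOn, trace);
        Obs emitter = wrap("CreateEmitter", subscribeOn, trace);

        emitter.onNext("ONE");
        return trace;
    }

    public static void main(String[] args) {
        for (String step : run()) {
            System.out.println(step);
        }
    }
}
```

The trace lists CreateEmitter, SubscribeOnObserver and ObserveOnObserver before the custom Observer, the same order as the wrapping described above.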

3. Conclusion

That is my analysis of the RxJava thread-switching source code. If you spot any mistakes, corrections are welcome.