preface

The last article combs the entire RxJava/RxSwift definition, subscription, event generation -> consumption process and source code analysis, this article is mainly to talk about the RxJava subscribeOn and observeOn source code implementation through the previous summary of ideas, if you have not seen the previous suggestions to move to read. As the previous article involves two languages of Rx version, resulting in a large length, this article first summarizes the RxJava version, followed by a RxSwift version (PS: this article Rx source code based on RxJava 2.1.16).

process

Based on the Observable. Create parsed above, we add subscribeOn and observeOn operators. A flow chart is attached with the code, which is mainly to supplement the code data flow analyzed above and the subscribeOn and observeOn we talk about this time.

Observable.create<String> {	// it == CreateEmitter
            Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
            it.onNext("rx")
            it.onComplete()
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {	// onNext
            Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}")
            Log.d(TAG, it)
        }
Copy the code

As you can see from the previous code parsing, RxJava is inThe subscription period relies on the implementation of the abstract method subscribeActual. Event delivery relies on the implementation of methods such as onNext/onError. Therefore, the following article will also summarize the implementation of subscribeOn and observeOn based on these two aspects.

subscribeOn

The outer layer calls the subscribeOn method, creates an ObservableSubscribeOn object, and uses itself as the source of the ObservableSubscribeOn.

// Obserable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Copy the code
// ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T.T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler; }...Copy the code

Source:

  • SubscribeOn method: Observable. Java line 12359
  • ObservableSubscribeOn.java

subscribeActual

When you call subscribeActual,

  • Create the SubscribeOnObserver object that decorates the S (upper level Observer Observer) that we pass in.
  • Create a SubscribeTask object that SubscribeOnObserver as its member variable, and the code below shows that SubscribeTask is a Runnable object.
  • Here is an operation that uses Scheduler to set the SubscribeTask object (Runnable object). This paper does not involve the code parsing of scheduler thread scheduler, which can be simply understood as setting the thread scheduling.
  • Then look at the run method of the SubscribeTask object (Runnable object), that is, the logic when the thread is scheduled to this Runnable is source.subscribe(parent); The source is the source passed in when the ObservableSubscribeOn is created, and the parent SubscribeOnObserver is sent up after the thread is switched.
// ObservableSubscribeOn.java 
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
Copy the code
// ObservableSubscribeOn.java 
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run(a) { source.subscribe(parent); }}Copy the code

Source:

  • ObservableSubscribeOn.java

onNext

Referring to the flowchart at the beginning, the event flow is generated and passed down onNext through the onNext method of SubscribeOnObserver. Here, the onNext method of the member variable actual (that is, the observer passed in the subscribeActual method, that is, the next level observer) is simply called without thread processing.

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

   private static final long serialVersionUID = 8094547886072529208L;
   final Observer<? super T> actual;

   final AtomicReference<Disposable> s;

   SubscribeOnObserver(Observer<? super T> actual) {
       this.actual = actual;
       this.s = newAtomicReference<Disposable>(); }...@Override
   public void onNext(T t) { actual.onNext(t); }...Copy the code

Source:

  • ObservableSubscribeOn.java

observeOn

The outer layer calls the observeOn method and creates an ObservableObserveOn object, using itself as the source of the ObservableObserveOn.

// Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
   ObjectHelper.requireNonNull(scheduler, "scheduler is null");
   ObjectHelper.verifyPositive(bufferSize, "bufferSize");
   return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Copy the code

Source:

  • ObserveOn methods: Observable. Java lines 9694, 9759
  • ObservableObserveOn.java

It is important to note that there is a default bufferSize, which is used to define the size of the cache queue in observeOn, as discussed later. BufferSize () refers to flowable.buffersize (). A system property, rx2.buffer-size, is read, which defaults to 128 by default and can be modified if desired.

// Observable.java
public static int bufferSize(a) {
    return Flowable.bufferSize();
}

// Flowable.java
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size".128));
}
Copy the code

subscribeActual

When you call subscribeActual,

  • Create a Scheduler.Worker object that encapsulates the logic for scheduling threads in Rx.
  • Create a wrapper class ObserveOnObserver object.
  • The subscribe call to source (the source passed in when ObservableObserveOn was created) takes the ObserveOnObserver object as an argument, passing the subscription up.
  • Note: There is no thread switching logic involved.
// ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
       source.subscribe(observer);
    } else {
       Scheduler.Worker w = scheduler.createWorker();

       source.subscribe(newObserveOnObserver<T>(observer, w, delayError, bufferSize)); }}Copy the code

Source:

  • ObservableObserveOn.java

onNext

Buffersize initializes the SimpleQueue queue in ObserveOnObserver. Trigger time is onSubscribe. This queue is used to cache events to facilitate thread switching when onNext’s events fire.

// ObservableObserveOn.java
@Override
public void onSubscribe(Disposable s) {
     if (DisposableHelper.validate(this.s, s)) {
          this.s = s; . queue =new SpscLinkedArrayQueue<T>(bufferSize);

          actual.onSubscribe(this); }}Copy the code

Following the flowchart at the beginning, the event flow is generated by onNext and passed down through the onNext method of ObserveOnObserver. In onNext, instead of calling onNext down, the data is first cached to the queue mentioned above, and then the schedule() method is called.

// ObservableObserveOn.java
@Override
public void onNext(T t) {
    if (done) {
       return;
    }

    if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }Copy the code

The schedule() method calls worker.schedule to switch the logic executed by the thread. This is passed in because ObserveOnObserver is itself a Runnable object. So we can read its run method to get the logic to run after switching threads.

// ObservableObserveOn.java
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {...void schedule(a) {
    if (getAndIncrement() == 0) {
       worker.schedule(this); }}Copy the code

The run method,

  • In the run method, the flag bit outputFused is usually false. Here we look at the drainNormal method first.
  • The drainNormal method calls the onNext method of the actual member variable (that is, the observer passed in to the subscribeActual method) using an infinite loop of the items previously cached in the queue. Of course, onError handling is called back when poll throws an exception. Note that the logic here is already running on the thread corresponding to the scheduler we originally set up.
  • Another interesting thread synchronization design is that the ObserveOnObserver is itself an AtomicInteger object. There’s getAndIncrement() in the schedule() method and addAndGet at the end of the drainNormal’s first layer loop, which is actually the value of AtomicInteger itself +1, -1. To ensure the above worker.schedule(this); Logic is called only once in concurrent cases to ensure thread-safety.
@Override
public void run(a) {
    if (outputFused) {
        drainFused();
    } else{ drainNormal(); }}...void drainNormal(a) {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break; }}}Copy the code

Source:

  • ObservableObserveOn.java

conclusion

At this point, the subscribeOn and observeOn source code is parsed. The points to understand are:

  • SubscribeOn thread scheduling occurs in subscribeActual, that is, the subscription stage.
  • ObserveOn thread scheduling takes place during event processing/consumption phases such as onNext/onError.

Next, let’s use the aforementioned source code for scientific analysis to solve threading problems that are often encountered in Rx.

Q&A

Sometimes I see some tutorials that mention “the thread generated by the event depends on subscribeOn” and “the thread processed by the event is the same as the thread consumed by the event by default, and is affected by observeOn”. This common problem can be resolved by combining the source code.

Which thread does Rx run on by default?

As you can see in the following code, the code is called on the main thread and the final output is run on the main thread.

It is important to understand that Rx does not default to the main thread, but to the current thread. Because only the subscribeOn and observeOn operators have the ability of thread scheduling, to be precise, the scheduler in subscribeOn and observeOn has the ability of thread scheduling, and the two operators only control the process.

So, Rx code runs on the current thread without a scheduler to switch threads.

Observable.create<String> {	// it == CreateEmitter
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    it.onNext("I'm ")
    it.onComplete()
}.map {
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    return@map it + "rx"
}.subscribe { // onNext
    Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}") log.d (TAG, it)} Output: event generation thread: main Event processing thread: main Event consumption thread: mainCopy the code

Determine which thread executes the problem

Now that you understand that thread switching only occurs when subscribeOn or observeOn has a thread switching operation, let’s look at how to determine what thread each code block executes on. 1. SubscribeOn thread scheduling occurs in subscribeActual, that is, in the subscription stage. 2. ObserveOn thread scheduling occurs during onNext/onError event processing/consumption phase.

Here’s a slightly more complex example where we can try to determine the thread status directly from this code:

Observable. Create The thread of execution generated by the incoming event

In the example of Observable.create, the common events are generated after the subscription, and the subscribeActual of subscribeOn calls the upper subscription (in combination with the code, source is the ObservableCreate object). Call the subscribe method will eventually call ObservableOnSubscribe. The subscribe method, namely observables. Create the incoming the code block. The subscribeActual that is subscribeOn undergoes a thread switch. So this block of code is called in the schedulers.io () (rxcachedThreadScheduler-1) thread. (ps: In most cases, event generation and subscription are not continuous, so event generation does not necessarily occur in the subscribed thread. This can be a confusing point.)

The first map operator executes the thread

The logic that triggers the map operator is called after onNext, and the next onNext goes through the observeOn operator, and there’s a thread switch, This means that this block will be called on the Schedulers.computation() (RxComputationThreadPool-1) thread.

The second map operator executes the thread

On the analysis of reference 2 and a code block in AndroidSchedulers. MainThread () (main) threads to execute.

The final event consumes the thread of execution

Event consumption also takes place in the onNext event stream, where you can look for the nearest observeOn definition above it, since subsequent onNext does not pass through the observeOn operator again, indicating that the thread has not switched again. So the final event consumption in AndroidSchedulers. MainThread () threads (the main).

Observable.create<String> {	// it == CreateEmitter
            Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
            it.onNext("I'm ")
            it.onComplete()
        }
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .map {
                Log.d(TAG, "Event processing thread:${Thread.currentThread().name}")
                return@map it + "rx"
            }
            .observeOn(AndroidSchedulers.mainThread())
            .map {
                Log.d(TAG, "Event processing thread 2:${Thread.currentThread().name}")
                return@map "$it!"
            }
            .subscribe {  // onNext
                Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}") log.d (TAG, it)} Event generation thread: RxCachedThreadScheduler-1Event processing thread: RxComputationThreadPool-1Event processing thread2: main Event consuming thread: mainCopy the code

Conclusion:

Through the above analysis, we can summarize:

  • The subscription phase is a bottom-up process and the observation phase (event processing/event consumption) is a top-down process.
  • Judgment of subscribing threads: Because subscribing is bottom-up and the timing of thread switching is also in the subscribeActual method, we can find the first subscribeOn operator from top to bottom to judge threads.
  • Observe thread judgment: top-down process, can find the code block above the nearest observeOn. This is why many tutorials talk about the effects of operators such as map and the event consuming thread observeOn.

One-time validity issue of subscribeOn

You’ll also often see a lot of people saying that subscribeOn is only valid when you set it the first time. Does that mean that the thread will only be switched once? In fact, the above introduction has been mentioned, let’s use a graphic illustration to illustrate this problem. (The event is processed and ignored for parsing purposes.)

Observable.create<String> {	// it == CreateEmitter
            Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
            it.onNext("I'm ")
            it.onComplete()
        }
            .subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.computation())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe {  // onNext
                Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}")
                Log.d(TAG, it)
            }
Copy the code

The process maps the data flow during the subscription process, and you can see that each call to subscribeActual undergoes a thread switch. That is, “the first set is valid” does not mean that there is only one thread switch, but that it will eventually switch to the first set thread.

The last

This paper analyzes the source code of subscribeOn and observableOn operators, as well as the analysis of some common problems. You can see that the subscribeOn and observableOn operators are related to thread scheduling, but their role is more of process control over Rx. So we can completely separate these two operators from the thread scheduling scheduler. I will continue to write articles summarizing scheduler’s source code parsing.

Series of articles:

  • What does Rx’s Observable.create do?
  • SubscribeOn of RxSwift and source code analysis of observeOn
  • Take a look at the Scheduler of RxJava.
  • How is RxSwift Scheduler implemented