preface

RxJava is a library that often makes people make mistakes. Some colleagues are completely confused when they first touch RxJava. Personally, I feel that I have not grasped the core concept (in fact, I am biased against RxJava).

Of course, RxJava has many, many operators that make it harder to understand, but there are usually only a few operators used in daily development, and it’s not too late to explore unfamiliar operators.

RxJava also comes up a lot in interview questions, and I think interviewers who are looking at operators are being a bully

Here’s a simple example to help those of you who don’t understand RxJava, as well as myself, understand more about RxJava, and answer the following interview questions:

What are the roles and different behaviors of subscribeOn and observeOn?

A simple example

Here’s an extremely simple piece of code that contains some of the most common operations in everyday Android development (only RxJava is introduced here, so Schedulers has nothing to do with Android) :

Observable.just(1)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(integer -> System.out.println(integer));
Copy the code

There’s actually another problem here

Observable. Subscribe () returns a value of Disposable. Do you need to accept it? Why is that?

The code above simply processes an event (sending a number 1) and calls subscribeOn and observeOn to do the thread switch.

Regular users of Retrofit often write this code because Android network requests need to be in child threads and UI-related operations go back to the main thread, so you can’t avoid these two operators when making network requests in Android.

So what happened?

Creation of observables

In fact, just, subscribeOn, and so on all help us recreate an Observable and hold the former one.

Specifically:

  1. Just returns an observable1
  2. Observable1. SubscribeOn returns an observable2(Observable1), and observable1 is held
  3. Observable2. ObserveOn similarly

At this point you need to have this graph in mind (ObservableJust is the source of the event) :

So whatever operator you use, it’s basically the same thing, except that the link gets longer and longer.

Interpretation of subscribe() in code

For a Cold Observable, we need subscribe() to trigger the event stream.

Corresponds to the code above

subscribe(integer -> System.out.println(integer))
Copy the code

So this is very simple, we just pass in one Consumer, so what is Consumer? Well, producer-consumer is a kind of Consumer event.

What types can SUBSCRIBE be passed in? Here will not mention, take a look at the source code. You can pass in one or more consumers, or you can pass in an Observer.

If you subscribe to this event, you pass in an Observer. Observable.subscribe(Observer)

LambdaObserver

If you are a good finder, you will feel something is wrong. Consumer = Observer Consumer = Observer Consumer = Observer Consumer = Observer

If YOU pass in Consumer, you’re going to end up in this code

 public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) {
    Objects.requireNonNull(onNext, "onNext is null");
    Objects.requireNonNull(onError, "onError is null");
    Objects.requireNonNull(onComplete, "onComplete is null");
    LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
    subscribe(ls);
    return ls;
}
Copy the code

So, you’re passing in an Observer called LambdaObserver, which accepts multiple consumers, onNext, onError, and onComplete, using the apply() method for each Consumer. This corresponds to system.out.println (integer)(I just passed in a Consumer(onNext), which in this case is a lambda expression, reverting back to the apply() for Consumer))

Observable.subscribe()

Subscribe (Observer) when observable. subscribe(Observer) is called, the Observable first holds the Observer and calls its own subscribeActual() code as follows:

public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        thrownpe; }}Copy the code

You can see that you end up calling the subscribeActual(Observer) and passing in the Observer.

If you are a good finder, you will feel something is wrong.

observer = RxJavaPlugins.onSubscribe(this, observer);
Copy the code

What does this line of code do? Aren’t you curious? Actually, it was a hook opening, and it had nothing to do with normal procedure.

Keep in mind that the source is held in layers in the section of Observable creation.

SubscribeActual () is overwritten by subclasses, and each operator is different, but the general principle is the same. In this example we have the subscribeOn and observeOn operators, so here we look at the subscribeActual() of both operators

Observable.subscribeActual()

In fact, the subscribeActual() of the two operators are basically similar

// ObservableSubscribeOn.java -> subscribeOn
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ObservableObserveOn.java -> observeOn
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize)); }}Copy the code

Look at the following observeOn. SubscribeActual () is easy to understand.

observeOn.subscribeActual()

First scheduler.createWorker() is associated with the scheduler we pass in, then we call source.subscribe() and pass in a newly created ObserveOnObserver(which is an observer). And pass in the ObserveOnObserver that you subscribeActual(Observer) sent in. Who are the source and the incoming observer here?

In this case, the source is the Observable returned by the subscribeOn operator, and the observer is the lambdaObserver we subscribe to.

Simply bind an Observer to the subscribeOn Observable.

Part of the scheduler problem is that the ObserveOnObserver constructor passes in W (worker) and observer. The drainNormal() is executed when observeonObserver.onNext () is called. The drainNormal() is executed when observeonObserver.onNext (). We then execute downstream.onNext() in a thread of the Scheduler, which is the LambdaObserver we passed in earlier, and get the event we want to receive (i.e., a number 1)

The source code process is as follows:

// ObserveOnObserver.java
// ObserveOnObserver builds the LambdaObserver as downstream
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
  // downstream
  this.downstream = actual;
  this.worker = worker;
  this.delayError = delayError;
  this.bufferSize = bufferSize;
}

// ObserveOnObserver.java
// ObserveOnObserver.onNexr() -> schedule()
public void onNext(T t) {
  if (done) {
      return;
  }

  if(sourceMode ! = QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }// ObserveOnObserver.java
// schedule() -> drainNormal()
public void run(a) {
    if (outputFused) {
        drainFused();
    } else{ drainNormal(); }}// ObserveOnObserver.java
void drainNormal(a) {

  final SimpleQueue<T> q = queue;
  final Observer<? super T> a = downstream;

  for (;;) {
      if (checkTerminated(done, q.isEmpty(), a)) {
          return;
      }
      for (;;) {
        / / a little
        a.onNext(v);
      }
      / / a little}}Copy the code

The only thing we don’t know at this point is how onNext() is called. But don’t worry, look at subscribeOn. SubscribeActual ()

subscribeOn.subscribeActual()

The specific code that subscribeOn subscribeOn is a little strange and doesn’t see any logic of source.subscribe(), which is actually quite unreasonable. How do you get notified if you don’t register an observer?

SubscribeTask, scheduleDirect() executes the task we pass in as follows:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run(a) { source.subscribe(parent); }}Copy the code

Source. Subscribe (), and who is the parent? Is the SubscribeOnObserver newly created when subscribeActual() is SubscribeOnObserver, which holds the downstream observer(observeOn created observer), The source is our event source observable.just ().

Stage summary

Combined with the creation of an Observable, you should now have this map in mind

At this point, we know that observables and observers are layered, and that there are two thread pools because the operators are operators that switch threads and specify different schedulers.

But how do you string them together? How is observeonObserver.onnext () called?

The Observer. OnNext () series

We already know that subscribeOn calls ObservableJust. Subscribe () because in our code we have ObservableJust (1).subscribeon ().

The core processes of Observable.subscribe() are all subscribeActual(), so look at the subscribeActual() of ObservableJust.

protected void subscribeActual(Observer<? super T> observer) {
    ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
    observer.onSubscribe(sd);
    sd.run();
}
Copy the code

You can see that a ScalarDisposable() is created and run() is called, so look at the source code for run()

if (get() == START && compareAndSet(START, ON_NEXT)) {
    observer.onNext(value);
    if(get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); }}Copy the code

Observer.onnext () is called, passing in a value(which is Observable. Just (value), so it’s the number 1).

What thread is currently in?

The thread that subscribeOn’s Scheduler subscribeOn Because we are through SubscribeTask. The run (), then call the ObservableJust. SubscribeActual ().

The observer. OnNext and then call the SubscribeOnObserver. OnNext (). Then call the ObserveOnObserver onNext (), finally appeared ObserveOnObserver. OnNext (), We already know that observeonObserver.onNext () will be called into our final LambdaObserver. So at this point, the whole process is worked out.

If you are a good finder, you will feel something is wrong. Isn’t onCompelete also called? Not at all. With these two thread switching operators, an onCompelete() is sent when the Scheduler can’t get the task, triggering the subsequent process, and the rest of the operators get more complicated.

So this is just the tip of the RxJava iceberg. But trust me, if you didn’t know it before, you should have some idea now.

So this simple example of all the processes strung together is shown in this diagram:

Difference between subscribeOn and observeOn operators

This is just to say that the previous interview question basically asks you how switching threads behaves differently and whose threads are affected.

SubscribeOn () is called many times, the thread where the event source is located is the earliest (closest to the event source), and observeOn() is called many times, and the thread which is closest to the observer finally takes effect. SubscribeOn affects the observed thread, and observeOn affects the observer thread.

Interviewers often ask, why? (Actually there is no reason, the source code is so, and the design is reasonable)

Let’s just force it and see why?

Because subscribe() is executed after subscribeOn is called many times, Just make the ObservableSubscribeOn(which is an Observable) layer hold an Observable in front of it that is closer to the event source and call its Observable.subscribe()(to be exact) SubscribeActual ()). But after each subscribe() call, it first executes run() in its own scheduler thread. So when the next Observable.subscibe() is called many times, the thread is switched (before the event source is reached) until the previous Observable of the event source is passed, its subscribe() triggers the event source event. So the event source executes in the thread that the earliest subscribeOn scheduler executes. ObserveOn, by contrast, occurs during a callback. Subscribe () also causes observables to create an observer, which calls back to the last observer. While our obser.onNext () or other methods are in the thread of the scheduler executed by observeOn, the last observer is our LambdaObserver, so it is the latest observeOn to take effect.

conclusion

RxJava is a layered, nested structure where each operator helps you create a new Observable and holds a reference to the previous Observable all the way to the source of the event.

As a common Cold Observable, the process starts when you call SUBSCRIBE (lambda) and pass in a lambdaObserver. Then there are layers of subscribe() calls that create multiple observers, and the current observer holds the downstream observer. When the event is executed by the event source, the event is passed along, and the observer observes the event and calls its own onNext() or other method (operator implementation) to continue notifying downstream observers until it is called back to the LambdaObserver we originally sent. (This example is the simplest case, with different implementations of the operators, but the principles are basically the same.)