Rxjava 2.x source code series – Basic framework analysis

Rxjava 2.x Source code series – Thread Switching (1)

Rxjava 2.x Source code series – Thread Switching (2)

Rxjava 2.x source code series – Transform operator Map (top)

preface

In our last post in the Rxjava source code series – Infrastructure Analysis, we analyzed the infrastructure of Rxjava.

Observables and observers subscribe through the Subscribe () method, which notifies the Observer of events and calls back to Observer methods when needed.

A simple flow chart describes it as follows:


Observable#subscribeOn

In Android, we know that the default execution is on the main thread, so how does Rxjava implement thread switching?

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG"."onSubscribe(): ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG"."onNext(): " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("TAG"."onComplete(): "); }});Copy the code

So let’s look at the subscribeOn method first, and you can see that

@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) {/ / scheduler to empty ObjectHelper. RequireNonNull (scheduler,"scheduler is null"); // Wrap scheduler with ObservableSubscribeOnreturn RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}


Copy the code

As we know from the last blog, when we call Observable. subscibe(Observable), the subscribActual method of the specific Observable instance will be eventually called. The observables subscribeon is an example of an observable.

Next, let’s take a look at ObservableSubscribeOn this class, you can see inheriting AbstractObservableWithUpstream, While AbstractObservableWithUpstream inherit observables, realize HasUpstreamObservableSource this interface.

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    ---
}


abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

public interface HasUpstreamObservableSource<T> {
    /**
     * Returns the upstream source of this Observable.
     * <p>Allows discovering the chain of observables.
     * @return the source ObservableSource
     */
    ObservableSource<T> source(a); }Copy the code

The subscribeActual method of observableSubscribeOn is similar to the subscribeActual method of Observablecate, which is also a subclass of Observable. Just more than ObservableCreate HasUpstreamObservableSource implements an interface, the interface is very interesting, his source is () method returns the type ObservableSource (remember the role of this class? . In other words, ObservableSubscribeOn is an Observable with upstream. He has a key attribute, Source, which represents his upstream.

Let’s look at the implementation of ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }}Copy the code

Let’s start with its constructor, which has two arguments source, scheduler.

  • Source represents an upstream reference and is an instance of an Observable
  • Scheduler can create instances of schedulers.newthread () or schedulers.io ()

Here we have an overview of what Scheduler is. Scheduler encapsulates Worker and DisposeTask, which will be discussed in detail below.

Schedulers.newThread()

@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}


NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}


Copy the code
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
Copy the code

Back to the subscribeActual method of ObservableSubscribeOn, I explained how observables and observers implement subscriptions in my last blog post, but I won’t go into details here.

Next, let’s focus on this line of code

 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
 
Copy the code

Let’s start by looking at the SubscribeTask class, which is a non-static inner class of ObservableSubscribeOn. You can see that it is also relatively simple in that it implements the Runnable interface and holds the parent reference.

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() { source.subscribe(parent); }}Copy the code

Then in the run method, the connection is established by source.subscribe(parent). Thus, when our SubscribeTask run method runs on that thread, the corresponding Observer subscribe method runs on that thread.

You might wonder how the SubscribeTask, which does not have the source attribute, gets access to the ObservableSubscribeOn attribute.

We know that in Java, a non-static inner class holds a reference to an external class by default, so it has normal access to the source property of the external ObservableSubscribeOn class.


Next, take a look at the scheduler.scheduleDirect method

@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); // Run is null final Runnable decoratedRun = rxJavaplugins.onschedule (run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit);return task;
}


Copy the code
  • First, create a Worker W
  • Second, the DisposeTask wraps the decoratedRun
  • Step 3: W to schedule tasks

Here we use NewThreadScheduler as an example to see what the Worker is.

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}



public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    
    --- 
}


public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
        POOLS.put(e, exec);
    }
    return exec;
}



Copy the code

As you can see from the above, worker encapsulates executor (thread pool) in worker. If you can see from the above, you can understand the principle of Rxjava thread switching.

In the ObservableSubscribeOn subscribeActual method, SubscribeTask wraps parent SubscribeTask implements Runnable interface and calls source.subscribe(parent) in the run method, so the thread executed by the run method will be determined by the worker. This is how the downstream determines the thread of the upstream Observable to execute.

Let’s take a look at DisposeTask

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        @Override
        public Runnable getWrappedRunnable() {
            returnthis.decoratedRun; }}}Copy the code
// Set the new Disposable to parent to cancel the subscription, // The Disposable of the original parent cannot represent the latest Disposable. Parent. SetDisposable (Scheduler.scheduleDirect (new SubscribeTask(parent)))Copy the code

DisposeTask implements the Disposable Runnable, SchedulerRunnableIntrospection interface, the Disposable interface is mainly used to unsubscribe relations the Disposable.


Observable#subscribeOn(Scheduler) first valid principle

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: getName=" +Thread.currentThread().getName());
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3"); emitter.onComplete(); SubscribeOn (schedulers.io ()).subscribe(new) Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e("TAG"."onSubscribe(): ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG"."onNext(): " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("TAG"."onComplete(): "); }});Copy the code

subscribe: getName=RxCachedThreadScheduler-1

If the order of subscribeOn described above is replaced

subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.io())
Copy the code

So it will print out

subscribe: getName=RxComputationThreadPool-1

Why is the first Observable#subscribeOn(Scheduler) valid?

Observable#subscribe(Observer) is an Observable#subscribe(Observer) operation that is placed on a specified thread. When subcribe is called, the process is bottom-up. The Observanle is called by the Observanle below.

So for our first example above, his call flow looks like this: The third Observable invokes Observable#subscribe(Observer) to initiate the subscription, and internally activates the Observable#subscribe(Observer) method of the second Observable. However, a Schedulers.computation() thread is wrapped around the method

The subscription process is then run on the thread. The pseudo-code is shown below

Public class Observable {// The "second" Observablesource;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("computation") {
            @Override
            public void run() {// The second Observable subscribes source.subscribe(observer); }}}}Copy the code

Further up, the second Observable subscription internally activates the Observable#subscribe(Observer) method of the first Observable, again wrapped in schedulers.io () thread and demonstrated in pseudocode

Public class Observable {// First Observable Observablesource;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("io") {
            @Override
            public void run() {// First Observable source.subscribe(observer); }}}}Copy the code

After reaching the first Observable, it starts emitting events, and the thread of execution is obviously the IO thread. You can also use Thread pseudocode instead.

new Thread("computation") {
    @Override
    public void run() {// Second observable. subscribe(Observer) {// New Thread() {// New Thread();"io") {
            @Override
            public void run() {// The first observable. subscribe(Observer) is a system.out.println ()"OnNext (T)/onError(Throwable)/onComplete() is executed by: + Thread
                                   .currentThread().getName());
            }
        } .start();
    }
} .start();
Copy the code

conclusion

The flow chart is described as follows:


Reference blog:

Friendly RxJava2.x source code parsing (two) thread switching

Our next article will explain to observeOn (AndroidSchedulers. MainThread ()).

Scan, welcome to follow my public account. If you have good articles, you are also welcome to contribute.