When reprinting, please cite the source. QuincySx: [www.jianshu.com/p/a9ebf730c]…


What you will gain from reading this article

  • The basic running flow of RxJava2 (in brief)
  • The thread switching principle of RxJava2
  • Why only the first subscribeOn() takes effect
  • A brief analysis of RxAndroid

Background reading: "RxJava for Android developers" (IO/POST /560e15…)

Here is the sample code analyzed in this article:

CompositeDisposable comDisposable = new CompositeDisposable();

protected void test() {
        Observable<String> observable = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws
                            Exception {
                        emitter.onNext("hello");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                comDisposable.add(d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });
}

RxJava2 basic running flow

The flow chart was drawn from a reading of the source code; elements of the same color represent the same object. Following the chart is enough to understand the basic flow through the source.

RxJava2 thread switching principle

This article does not cover how to use thread switching in RxJava. For that, please refer to my other article, "Android: Essay – RxJava thread switching".

I. Thread switching principle of observeOn()

Calling observeOn() creates an ObservableObserveOn object. When an Observer subscribes to that ObservableObserveOn, its subscribe() method runs:

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // call subscribeActual() to subscribe the observer
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

Let’s look at the subscribeActual() method

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        // scheduler is the thread-scheduling object passed in, such as Schedulers.io()
        // or AndroidSchedulers.mainThread(). createWorker() is only noted here; it is
        // explained later in the RxAndroid analysis.
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

The upstream source is now observed by the ObserveOnObserver, so it is the ObserveOnObserver that receives the notifications. Suppose RxJava emits an onNext notification: the onNext() method of ObserveOnObserver is called (the same logic applies to onComplete(), onError(), and so on). Here is that onNext() method:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    // switch threads
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        // hand this ObserveOnObserver to the worker
        worker.schedule(this);
    }
}

Here is the worker's schedule(Runnable run) method:

public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

  1. The argument it receives is a Runnable. Where does that come from? Looking at ObserveOnObserver, it implements not only the Observer interface but also the Runnable interface.
  2. It then calls schedule(Runnable action, long delayTime, TimeUnit unit), but that method is abstract. Let's assume the worker here belongs to the IO scheduler, so here is the code from IoScheduler:

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

Next, the scheduleActual() method:

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        // check the delay, then run the Runnable on the thread pool
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
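
The delay check in scheduleActual() can be tried out with the JDK alone. The following is a minimal sketch, not RxJava's code: the class MiniScheduleActual, its method names, and the thread name "mini-pool" are made up for illustration, while submit() and schedule() are the standard ScheduledExecutorService calls.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the delay check in scheduleActual(); not RxJava's class.
final class MiniScheduleActual {

    /** Submits immediately when there is no delay, otherwise schedules with the delay. */
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delayTime, unit);
    }

    /** Runs one task with zero delay and returns the name of the thread that ran it. */
    static String demo() {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "mini-pool"));
        AtomicReference<String> threadName = new AtomicReference<>();
        try {
            Runnable task = () -> threadName.set(Thread.currentThread().getName());
            // Same call shape as schedule(run, 0L, TimeUnit.NANOSECONDS) in the article.
            scheduleActual(executor, task, 0L, TimeUnit.NANOSECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "mini-pool"
    }
}
```

The task body runs on the pool thread rather than on the caller, which is the whole point of handing the Runnable to the executor.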

As a result, the run() method of ObserveOnObserver executes on the target thread:

public void run() {
    // onNext() is called from inside these methods, so the thread of every
    // observer after the observeOn() operator is changed
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
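
To make the observeOn() steps concrete, here is a stripped-down sketch that uses only the JDK. It is an assumption-laden imitation, not RxJava's ObserveOnObserver: the class MiniObserveOn, the Consumer used as the downstream observer, and the thread name "mini-worker" are all hypothetical. What it keeps from the source is the shape: onNext() queues the value, schedule() uses getAndIncrement() == 0 so that only one drain is in flight, and run() delivers the queued values on the worker thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified imitation of ObserveOnObserver; not RxJava's class.
final class MiniObserveOn<T> extends AtomicInteger implements Runnable {
    private final Consumer<T> downstream;
    private final ExecutorService worker;
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();

    MiniObserveOn(Consumer<T> downstream, ExecutorService worker) {
        this.downstream = downstream;
        this.worker = worker;
    }

    /** Called on the upstream thread: queue the value, then request a drain. */
    void onNext(T t) {
        queue.offer(t);
        schedule();
    }

    /** Only the caller that moves the counter from 0 to 1 submits the drain. */
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    /** Runs on the worker thread and delivers everything that was queued. */
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T t;
            while ((t = queue.poll()) != null) {
                downstream.accept(t);
            }
            // Subtract the requests handled so far; loop again if more arrived meanwhile.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /** Emits two values from the calling thread and records where they were delivered. */
    static List<String> demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-worker"));
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(2);
        MiniObserveOn<String> observer = new MiniObserveOn<>(s -> {
            seen.add(s + "@" + Thread.currentThread().getName());
            latch.countDown();
        }, worker);
        observer.onNext("a");
        observer.onNext("b");
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a@mini-worker, b@mini-worker]"
    }
}
```

The real class additionally handles fusion, errors, completion, and disposal, all of which are left out here.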

II. Thread switching principle of subscribeOn()

PS: This switching principle is actually very similar to observeOn()

The steps are the same as for observeOn(), except that the object is an ObservableSubscribeOn. The code logic is also the same: subscribe() runs and then calls subscribeActual(), so here is the subscribeActual() code directly:

public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    // 1. Create a Runnable (SubscribeTask)
    // 2. Call scheduler.scheduleDirect()
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

Let’s look at the scheduleDirect method

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // DisposeTask is itself a Runnable
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // run the task through the worker
    w.schedule(task, delay, unit);
    return task;
}

Let’s look at run() for DisposeTask

public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run();
    } finally {
        dispose();
        runner = null;
    }
}

After this chain of calls we end up in the run() of SubscribeTask:

public void run() {
    source.subscribe(parent);
}

This changes the thread on which the subscribe() method of the anonymous new ObservableOnSubscribe(){} runs, so the onNext() and other calls made from inside it run on that thread as well.
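
The scheduleDirect(), DisposeTask, and SubscribeTask steps above can be imitated with a plain executor. This is only a sketch under assumptions: MiniScheduleDirect and the thread name "mini-io" are hypothetical, and a single-thread ExecutorService stands in for the Worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Scheduler.scheduleDirect() and DisposeTask; not RxJava's classes.
final class MiniScheduleDirect {

    /** Creates a one-off worker, runs the task on it, then disposes the worker. */
    static void scheduleDirect(Runnable task, String threadName) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        worker.execute(() -> {
            try {
                task.run();        // plays the role of decoratedRun.run()
            } finally {
                worker.shutdown(); // plays the role of dispose()
            }
        });
    }

    /** Returns the name of the thread on which the "source" was subscribed. */
    static String demo() {
        AtomicReference<String> subscribeThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // Plays the role of SubscribeTask, whose run() is just source.subscribe(parent).
        Runnable subscribeTask = () -> {
            subscribeThread.set(Thread.currentThread().getName());
            done.countDown();
        };
        scheduleDirect(subscribeTask, "mini-io");
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscribeThread.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "mini-io"
    }
}
```

Everything the source does inside its subscribe callback therefore happens on the scheduler's thread, not on the thread that called subscribe().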

Why only the first subscribeOn() takes effect

subscribeOn() only affects the thread on which the anonymous implementation of the ObservableOnSubscribe interface executes, and that implementation is the last thing to run, because RxJava subscribes from the bottom of the chain upward. Each subscribeOn() moves the subscription of everything above it onto its own thread, so the first subscribeOn() counted from the top is the last one to run, and its thread is the one the source actually uses. That is why writing several subscribeOn() calls achieves nothing more: the later ones are overridden.
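
The bottom-up subscription order is easy to reproduce in miniature. The sketch below is hypothetical throughout: the Source interface, the subscribeOn() helper, and the thread names "first" and "second" are invented for illustration, and each "scheduler" is just a new named thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical miniature of an Observable chain; the names are illustrative, not RxJava's API.
final class MiniSubscribeOnChain {

    /** A source is anything that can be subscribed to with a callback. */
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    /** Wraps upstream so that upstream.subscribe() runs on a new thread with the given name. */
    static Source subscribeOn(Source upstream, String threadName) {
        return observer -> new Thread(() -> upstream.subscribe(observer), threadName).start();
    }

    /** Builds source.subscribeOn("first").subscribeOn("second") and reports the emitting thread. */
    static String demo() {
        // The source emits the name of the thread it was subscribed on.
        Source source = observer -> observer.accept(Thread.currentThread().getName());
        Source chain = subscribeOn(subscribeOn(source, "first"), "second");

        AtomicReference<String> emittedOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // Subscription travels upward: "second" runs first, then hands over to "first".
        chain.subscribe(name -> {
            emittedOn.set(name);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emittedOn.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "first"
    }
}
```

The wrapper written second in the chain does switch threads, but only long enough to call the wrapper above it, which switches again before the source ever runs.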


Take a look at RxAndroid

RxAndroid actually works on the same principle as the thread schedulers built into RxJava. If you want to understand RxJava's IO thread pool and the like, you can read that source yourself. I analyze RxAndroid here mainly for the following reasons:

  1. Find out exactly what the RxAndroid library does
  2. Figure out how it switches to the main thread (how it provides the main thread environment)
  3. Find out how a thread scheduler works
  4. Most importantly, it is simpler and easier to analyze than the schedulers that come with RxJava

Main text

First we look for the entry point. AndroidSchedulers.mainThread() should be it, so let's look at the source of the AndroidSchedulers class, which is only a few lines in total:

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable<Scheduler>() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

public static Scheduler from(Looper looper) {
    if (looper == null) throw new NullPointerException("looper == null");
    return new HandlerScheduler(new Handler(looper));
}

This needs little explanation: RxAndroid reaches the main thread through a Handler created on the main Looper.

Let's reuse part of the subscribeOn() flow:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

First, createWorker() is called. It is an abstract method, so we look at the implementing class, HandlerScheduler:

public Worker createWorker() {
    return new HandlerWorker(handler);
}

It simply creates a Worker and passes in the main-thread Handler. Then the Worker's schedule() method is called:

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    /** Some code omitted **/
    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

This is how RxJava switches to the main thread through RxAndroid: the Runnable is wrapped in a Message and sent to the main-thread Handler. That is the core of RxAndroid.
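
Android's Handler and Looper are not available outside the platform, so the following sketch imitates the idea with the JDK only. Everything in it is an assumption made for illustration: MiniLooperScheduler, its methods, and the thread name "mini-main" are hypothetical, a BlockingQueue stands in for the message queue, and delays are not modeled.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only imitation of the Looper/Handler idea; these are not Android or RxAndroid classes.
final class MiniLooperScheduler {
    private final BlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();
    private final Thread loopThread;

    MiniLooperScheduler(String threadName) {
        // One dedicated thread takes messages off the queue and runs them in order.
        loopThread = new Thread(() -> {
            try {
                while (true) {
                    messages.take().run();
                }
            } catch (InterruptedException e) {
                // quit() was called: leave the loop
            }
        }, threadName);
        loopThread.setDaemon(true);
        loopThread.start();
    }

    /** Plays the role of handler.sendMessageDelayed(...), without the delay. */
    void schedule(Runnable run) {
        messages.offer(run);
    }

    /** Stops the loop thread. */
    void quit() {
        loopThread.interrupt();
    }

    /** Schedules one task from the calling thread and returns the thread that ran it. */
    static String demo() {
        MiniLooperScheduler scheduler = new MiniLooperScheduler("mini-main");
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.schedule(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.quit();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "mini-main"
    }
}
```

Whichever thread calls schedule(), the task always runs on the loop thread, which is the property the main-thread Handler gives RxAndroid.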


Conclusion

This article is based on the source code of RxJava 2.1.12 and RxAndroid 2.0.2. It has to be said that the Handler holds a truly important place in Android. My understanding is limited, so corrections are welcome.