Source code analysis

Creating an Observable

Observable
    .fromArray(1, 2, 3, 4)
    .subscribe(object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {}
        override fun onNext(t: Int) {}
        override fun onError(e: Throwable) {}
        override fun onComplete() {}
    })

First, look at Observable.fromArray(1, 2, 3, 4) by stepping into the Observable class.

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> fromArray(T... items) {
        // Null check.
        ObjectHelper.requireNonNull(items, "items is null");
        // An obvious optimization: zero elements and one element are much simpler cases.
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        // 1. RxJavaPlugins.onAssembly() calls the associated hook function.
        //    It lets you attach external logging and monitoring and does not affect the main flow.
        //    RxJavaPlugins is omitted for the rest of the analysis.
        // 2. An ObservableFromArray is created; this is the core to look at next.
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }

    @Override
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }

    // Implemented by each operator subclass.
    protected abstract void subscribeActual(Observer<? super T> observer);
}

Now look at the ObservableFromArray code, which is plain and simple. ObservableFromArray extends Observable, and the subclass's subscribeActual() is called by Observable.subscribe(). Most of the real logic happens in subscribeActual(), so that is usually where the analysis should start.

The code related to fusion mode has been removed. It is not needed to follow the logic here and will be analyzed in a later chapter.

class ObservableFromArray<T> extends Observable<T> {
    final T[] array;

    public ObservableFromArray(T[] array) { this.array = array; }

    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> implements Disposable {
        final Observer<? super T> actual;
        final T[] array;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        void run() {
            T[] a = array;
            int n = a.length;
            // Stop emitting as soon as the downstream disposes.
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

As for Disposable, it is the switch RxJava gives us to unsubscribe. It is passed around the code like a tracker, and following it makes the source code hard to recognize. So let’s first look only at the forward path, that is, how values flow downstream.
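To make the "switch" idea concrete, here is a minimal sketch that does not use RxJava at all. MiniDisposable, MiniObserver, ArrayEmitter and DisposeDemo are hypothetical names invented for this illustration; they only mimic the shape of FromArrayDisposable shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for RxJava's Disposable and Observer.
interface MiniDisposable {
    void dispose();
    boolean isDisposed();
}

interface MiniObserver<T> {
    void onSubscribe(MiniDisposable d);
    void onNext(T value);
    void onComplete();
}

// Mirrors the shape of FromArrayDisposable: a flag checked before every emission.
final class ArrayEmitter<T> implements MiniDisposable {
    private final MiniObserver<? super T> downstream;
    private final T[] items;
    private volatile boolean disposed;

    ArrayEmitter(MiniObserver<? super T> downstream, T[] items) {
        this.downstream = downstream;
        this.items = items;
    }

    void run() {
        for (int i = 0; i < items.length && !disposed; i++) {
            downstream.onNext(items[i]);
        }
        if (!disposed) {
            downstream.onComplete();
        }
    }

    @Override public void dispose() { disposed = true; }
    @Override public boolean isDisposed() { return disposed; }
}

final class DisposeDemo {
    // Subscribes to 1..4 and flips the switch once the value 2 has been received.
    static List<String> collect() {
        List<String> log = new ArrayList<>();
        MiniObserver<Integer> observer = new MiniObserver<Integer>() {
            MiniDisposable upstream;
            @Override public void onSubscribe(MiniDisposable d) { upstream = d; }
            @Override public void onNext(Integer value) {
                log.add("next " + value);
                if (value == 2) upstream.dispose();
            }
            @Override public void onComplete() { log.add("complete"); }
        };
        ArrayEmitter<Integer> emitter =
                new ArrayEmitter<>(observer, new Integer[] {1, 2, 3, 4});
        observer.onSubscribe(emitter); // hand the switch to the observer first
        emitter.run();                 // then start emitting
        return log;
    }

    public static void main(String[] args) {
        System.out.println(collect()); // prints [next 1, next 2]
    }
}
```

Because the flag is checked before each emission and again before onComplete, disposing inside onNext(2) suppresses both the remaining values and the completion signal.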

map

Observable.fromArray(1, 2, 3, 4)
    .map { it * 5 }
    .subscribe { println(it) }

From here, the code starts to look complicated, because RxJava has many operators and reuses their logic through a lot of abstraction and encapsulation. ObservableMap.subscribeActual() calls source.subscribe(MapObserver(yourObserver)), so MapObserver sits upstream of your Observer. The mapper function, it * 5, is also passed to MapObserver. Call order: FromArrayDisposable.run() --> MapObserver.onNext(value) --> yourObserver.onNext(mapper.apply(value))

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // Wrap the downstream observer and subscribe the wrapper to the upstream.
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            U v = mapper.apply(t);
            actual.onNext(v);
        }
    }
}

With the fusion-mode parts removed, BasicFuseableObserver is still simple. MapObserver (that is, a BasicFuseableObserver) holds the upstream Disposable and the downstream Observer.

/**
 * Base class for a fuseable intermediate observer.
 * @param <T> the upstream value type
 * @param <R> the downstream value type
 */
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

    /** The downstream subscriber. */
    protected final Observer<? super R> actual;

    /** The upstream subscription. */
    protected Disposable s;

    public BasicFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }

    @Override
    public final void onSubscribe(Disposable s) {
        this.s = s;
        actual.onSubscribe(this);
    }

    @Override
    public void onError(Throwable t) { actual.onError(t); }

    @Override
    public void onComplete() { actual.onComplete(); }

    @Override
    public void dispose() { s.dispose(); }

    @Override
    public boolean isDisposed() { return s.isDisposed(); }
}

You can see that map’s own MapObserver subscribes to the upstream Observable.
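The wrapping described above can be sketched in a few lines of plain Java. This is only a toy model under simplifying assumptions (no Disposable, no error channel); Source, Sink and MapDemo are hypothetical names for this illustration and are not RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of the Observer/Observable pair; not the RxJava types.
interface Sink<T> {
    void onNext(T value);
    void onComplete();
}

abstract class Source<T> {
    // Each operator overrides this, like subscribeActual() in RxJava.
    abstract void subscribeActual(Sink<? super T> sink);

    final void subscribe(Sink<? super T> sink) { subscribeActual(sink); }

    // map() only builds a new Source that remembers its upstream and the mapper.
    final <R> Source<R> map(Function<? super T, ? extends R> mapper) {
        Source<T> upstream = this;
        return new Source<R>() {
            @Override void subscribeActual(Sink<? super R> downstream) {
                // Subscribe to the upstream with a wrapping sink (the MapObserver role).
                upstream.subscribe(new Sink<T>() {
                    @Override public void onNext(T value) {
                        downstream.onNext(mapper.apply(value));
                    }
                    @Override public void onComplete() { downstream.onComplete(); }
                });
            }
        };
    }

    @SafeVarargs
    static <T> Source<T> fromArray(T... items) {
        return new Source<T>() {
            @Override void subscribeActual(Sink<? super T> sink) {
                for (T item : items) sink.onNext(item);
                sink.onComplete();
            }
        };
    }
}

final class MapDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source.fromArray(1, 2, 3, 4)
              .map(x -> x * 5)
              .subscribe(new Sink<Integer>() {
                  @Override public void onNext(Integer v) { log.add("next " + v); }
                  @Override public void onComplete() { log.add("complete"); }
              });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next 5, next 10, next 15, next 20, complete]
    }
}
```

Nothing runs when map() is called; the chain of wrappers is only built when subscribe() travels from the bottom of the chain up to the source.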

subscribeOn

Recall the code mentioned in the previous section

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    .subscribeOn(Schedulers.newThread())
    .subscribe { println("in next  :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 1

Schedulers.newThread() creates a NewThreadScheduler. The internals of Scheduler are not the focus of this section. NewThreadScheduler.scheduleDirect(Runnable) ultimately calls ExecutorService.submit(Runnable), so the runnable is thrown into a thread pool for execution. The runnable here is SubscribeTask, which executes source.subscribe(SubscribeOnObserver) on the new thread. If there is no other thread-switching operator upstream, the entire execution moves from the main thread to the new thread. The whole chain is equivalent to executorService.submit { source.subscribe(SubscribeOnObserver) }. Now look at SubscribeOnObserver, which wraps yourObserver.

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // Hand the subscription itself to the scheduler.
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        // Holds the upstream Disposable; this object itself holds the scheduled task.
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }

        @Override
        public void onNext(T t) { actual.onNext(t); }

        @Override
        public void onError(Throwable t) { actual.onError(t); }

        @Override
        public void onComplete() { actual.onComplete(); }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() { return DisposableHelper.isDisposed(get()); }

        void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; }

        @Override
        public void run() {
            // Runs on the scheduler's thread.
            source.subscribe(parent);
        }
    }
}
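The equivalence executorService.submit { source.subscribe(observer) } can be tried out without RxJava. The sketch below is a toy model: the nested Source interface, the subscribeOn helper and the thread name "worker-1" are all assumptions made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SubscribeOnSketch {
    // Hypothetical source: calling subscribe runs the producer on the caller's thread.
    interface Source<T> {
        void subscribe(Consumer<? super T> observer);
    }

    // The subscribe call itself is submitted to the executor (the SubscribeTask role).
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.submit(() -> source.subscribe(observer));
    }

    // Returns { thread of the producer, thread of the consumer }.
    static String[] demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        Source<Integer> create = observer -> {
            threads[0] = Thread.currentThread().getName();
            observer.accept(1);
        };
        subscribeOn(create, executor).subscribe(value -> {
            threads[1] = Thread.currentThread().getName();
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [worker-1, worker-1]
    }
}
```

Both the producer and the consumer report the worker thread, which matches the "in create" and "in next" lines of the output above: nothing downstream switches threads again, so everything stays where the subscription was started.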

That is how subscribeOn can be understood: the entire source.subscribe(...) call is moved onto the scheduler's thread.

observeOn

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    .observeOn(Schedulers.newThread())
    .subscribe { println("in next  :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 1

Let’s look at the ObservableObserveOn code, which I have trimmed heavily from the original source. Leaving aside external cancellation and internal exceptions, this is how it works. The fusion code has been deleted as well; with ObservableCreate as the source it is not used anyway.

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    // The thread that actually runs the downstream depends on this scheduler.
    final Scheduler scheduler;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        // source.subscribe() still runs on the calling thread; subscribeOn, by contrast,
        // throws the entire source.subscribe call into the new thread.
        // The worker and yourObserver are passed to the new ObserveOnObserver.
        source.subscribe(new ObserveOnObserver<T>(observer, w));
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        SimpleQueue<T> queue;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public void onSubscribe(Disposable s) {
            // SpscLinkedArrayQueue is a single-producer, single-consumer queue shared by the two threads.
            // bufferSize is a constructor parameter that was trimmed from this listing.
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            actual.onSubscribe(this);
        }

        // run(), that is, drainNormal(), executes on the Scheduler thread,
        // so upstream and downstream are not on the same thread.
        // We assume that the rate of production is higher than the rate of consumption.
        @Override
        public void onNext(T t) {
            // Put the value into the queue shared between threads: the producer side.
            queue.offer(t);
            schedule();
        }

        void schedule() {
            // On the first call the counter is 0 and becomes 1; think of it as the number of pending requests.
            // Only when the previous value is 0 is the consumer triggered; otherwise it is not.
            if (getAndIncrement() == 0) worker.schedule(this);
        }

        @Override
        public void run() { drainNormal(); }

        // "Drain": let the water run out of the pipe.
        void drainNormal() {
            // We can only get here with at least one pending request.
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
            // This nested for-for looks confusing at first. It is a common pattern for producer-consumer problems.
            for (;;) {
                for (;;) {
                    // 1. Look at the inner loop first. q is an SpscLinkedArrayQueue and does not block.
                    //    Empty the queue.
                    T v = q.poll();
                    if (v == null) break;
                    a.onNext(v); // yourObserver.onNext
                }
                // 2. Remember the assumption above: production rate > consumption rate.
                //    While this loop runs, producers see getAndIncrement() != 0 and cannot start another consumer run.
                //    missed records the number of requests accounted for by the current pass.
                //    Update missed: current number of requests minus the number handled in the last round.
                missed = addAndGet(-missed);
                if (missed == 0) break;
                // 3. Here the number of requests equals the number of items, but this model does not require that.
            }
        }
        // ...
    }
}

For observeOn, the key point is that only yourObserver ends up running on the new thread.



This is the first time we meet the drain code. The problem could obviously be solved with a lock, but here the WIP (work-in-progress) technique is used instead, relying on an atomic counter built on CAS (compare-and-set). In RxJava 2, hand-offs between threads are usually done this way.
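The WIP counter can be isolated from RxJava and tested on its own. The following is a minimal sketch under stated assumptions: WipDrain and WipDemo are hypothetical names, a ConcurrentLinkedQueue replaces SpscLinkedArrayQueue because several producers are used here, and the drain runs on whichever producer thread wins the counter instead of being scheduled on a Worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical serializer: many threads may call emit(), but only one at a time drains.
final class WipDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    // Not thread-safe on purpose: only the current drainer ever touches it.
    private final List<T> consumed = new ArrayList<>();

    void emit(T value) {
        queue.offer(value);
        // A previous value of 0 means nobody is draining, so this caller becomes the drainer.
        if (wip.getAndIncrement() == 0) {
            drain();
        }
    }

    private void drain() {
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                consumed.add(v);
            }
            // Subtract the requests accounted for so far.
            // A non-zero result means more emit() calls arrived in the meantime.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    List<T> consumed() { return consumed; }
}

final class WipDemo {
    // Four producers emit 1000 values each; returns how many values were consumed.
    static int run() {
        WipDrain<Integer> drain = new WipDrain<>();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) drain.emit(i);
            });
            producers[t].start();
        }
        for (Thread p : producers) {
            try {
                p.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return drain.consumed().size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 4000
    }
}
```

No value is lost because each producer enqueues before it increments the counter: either it becomes the drainer itself, or the active drainer sees a non-zero missed value and loops once more.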

Scheduler

From the above, we know that a Scheduler basically throws a piece of code into another thread to run. Schedulers manage their threads through Java thread pools.

Core code

Here is a piece of pseudocode

public abstract class Scheduler {

    public abstract Worker createWorker();

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return createWorker().schedule(run);
    }

    public abstract static class Worker implements Disposable {
        ExecutorService executor;

        public Disposable schedule(@NonNull Runnable run) {
            Task task = new Task(run);
            executor.submit(task);
            return task;
        }
    }

    // In RxJava, a Task might be called ScheduledRunnable or ScheduledDirectTask.
    public static class Task implements Callable<Void>, Disposable {
        private final Runnable run;

        Task(Runnable run) { this.run = run; }

        @Override
        public Void call() {
            run.run();
            return null;
        }

        @Override
        public void dispose() { ... }

        @Override
        public boolean isDisposed() { ... }
    }
}

The actual call looks like this.

scheduler.worker.schedule(Runnable { /* code to be thrown into the new thread */ })
executor.submit(Task(Runnable))

About Task

In RxJava, a Task might be called ScheduledRunnable, ScheduledDirectTask, and so on. A Task wraps our runnable and provides control over it. In ObserveOnObserver, each drain scheduled from onNext produces a task. In ObservableSubscribeOn, source.subscribe produces only one task because it is a one-off operation. Tasks within a Worker are executed sequentially.
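What "control over the runnable" means can be shown with a small stand-alone sketch. CancellableTask and TaskDemo are hypothetical names; the real ScheduledRunnable and ScheduledDirectTask are more involved (they also deal with interrupting a running thread, for example), so treat this only as an illustration of the idea.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical Task: wraps a Runnable and lets the caller cancel it before it runs.
final class CancellableTask implements Callable<Void> {
    private final Runnable body;
    private final AtomicBoolean disposed = new AtomicBoolean();

    CancellableTask(Runnable body) { this.body = body; }

    @Override
    public Void call() {
        // Skipped if the task was disposed before the worker thread picked it up.
        if (!disposed.get()) {
            body.run();
        }
        return null;
    }

    void dispose() { disposed.set(true); }

    boolean isDisposed() { return disposed.get(); }
}

final class TaskDemo {
    // Submits two tasks to a single-threaded executor; the second one is disposed first.
    static int run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger executed = new AtomicInteger();

        CancellableTask first = new CancellableTask(executed::incrementAndGet);
        CancellableTask second = new CancellableTask(executed::incrementAndGet);
        second.dispose(); // cancelled before submission

        // A single thread runs the submitted tasks one after another, in order.
        executor.submit(first);
        executor.submit(second);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1
    }
}
```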

About Schedulers

The Schedulers class provides schedulers for different scenarios:

  • Schedulers.immediate()

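The default: no thread is specified, so work runs on the current thread. (This scheduler exists in RxJava 1; RxJava 2 no longer provides it.)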

  • Schedulers.newThread():

Runs on a new thread. Each worker has its own thread pool, and that pool really contains only one thread.

  • Schedulers.computation():

Intended for CPU-bound computation. The maximum number of threads is the number of processors available to the JVM. A fixed number of PoolWorkers is created in advance, and different callers share them.

  • Schedulers.io():

Intended for I/O operations. There is no upper limit on the number of threads. It keeps a CachedWorkerPool so that workers can be reused.

  • Schedulers.trampoline():

Runs on the current thread, but not immediately: the task is queued and runs after the previous task completes.

  • Schedulers.single():

In contrast to Schedulers.newThread(), all workers created by single() share one thread pool.

  • AndroidSchedulers.mainThread()

Runs on the Android main thread. Internally it is implemented with a Handler and nothing else. It is part of RxAndroid.