Let’s start with a little chestnut

Observable.just(1)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                / / 1
            }

            @Override
            public void onNext(Integer integer) {
                / / 2
            }

            @Override
            public void onError(Throwable e) {
                / / 3
            }

            @Override
            public void onComplete(a) {
                / / 4}});Copy the code

The just method of Observale returns an ObservableJust object

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
Copy the code

The onAssembly, onObservableAssembly of RxJavaPlugins is empty by default and simply returns the source we passed in

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if(f ! =null) {
        return apply(f, source);
    }
    return source;
}
Copy the code

When we call the Subscribe method, we call the abstract method that is defined by the subscribeActual, Observable

public final void subscribe(Observer<? super T> observer) {... subscribeActual(observer); . }Copy the code

Let’s look at what ObservableJust implements

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {...@Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = newScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); }... }Copy the code

The subscribeActual method creates a ScalarDisposable object, from which we can see that onSubscribe is executed first.

public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {...@Override
    public void run(a) {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if(get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); }}}... }Copy the code

The run method calls onNext first, then onComplete, onError is not executed, the execution is finished.

From this little chestnut, there is no thread switching, and the code is executed synchronously.