The introduction

‘Read The Fucking Source Code’ is a well-known meme in programmer circles. Everyone knows that reading source code is boring, but it is painful to have to do it. My goal with this series is to make reading the source code enjoyable. In the first chapter, I decided to start with rxjava2 source code reading. Because the framework is used daily and is often asked in interviews, it has become an Essential skill for Android. But it’s not enough to know how to use it. It’s not enough to be confused when an interviewer asks you how it works. So there is the first volume of RTFSC, RXJavA2 source code reading. I will try to read the source code this boring thing, to everyone said a bit more interesting, popular. Rxjava2 source code analysis (a) basic process analysis RXJAVA2 source code analysis (two) thread switching analysis RXJAVA2 source code analysis (three) thread pool principle analysis

Start with the basics

First, write a basic usage of rxJava2. Let’s use this simple example to see what the entire rxjava2 process looks like.

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,"s = "+ s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {}});Copy the code

This top part, it looks too long, but we can simplify it.

Observable.create(ObservableOnSubscribe).subscribe(Observer);
Copy the code

Without further ado, let’s get to the point:

  • 1. You can see that there are three classes with very similar names:Observable.ObservableOnSubscribe.Observer. That’s what we call the observed, the observer.
  • 2. Let’s visualize it a little better.ObservableWe call them decorators,ObservableOnSubscribeWe also call it the emission source,ObserverWe call it the processor. Why so called, we can look at the source side of the side.
  • 3. We can visualize the above content as: decoratorObservableThrough acreateMethods and asubscribeMethod to connect the transmitter to the processor.

Let’s look at how this connection is implemented in the source code.

A decoratorObservable

Start with Observable Create.

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source."source is null"); // Void effectreturn RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
Copy the code

To highlight:

  • 1.createMethod, you need to pass in an emission sourceObservableOnSubscribe<T>Object that returns oneObservable<T>Object.
  • 2. Ignore null code,onAssemblyWe’ll leave the method aside for now, just to know that it returns the passed parameter. thecreateThe method is to return oneObservableCreateObject.

So let’s look at the ObservableCreate class.

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source; }... }Copy the code

To highlight:

  • 1.ObservableCreateThis class inherits fromObservable.
  • 2.ObservableCreateIn the construction method, the emission source in the parameters is directlyObservableOnSubscribeStored locally as source.

OK, I’m done with the create method. Simple, one-sentence summary, creates a decorator object that stores the emitter locally for backup. (Do you feel like watching Wang Gang cook?)

Why do we call an Observable an decorator? Because RXJava uses the decorator pattern here, Observable is the base class in the decorator pattern. The decorator mode is not obvious here, but will be at the back.

Source ObservableOnSubscribe

The ObservableOnSubscribe parameter is passed to the create method.

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
Copy the code

To highlight:

  • 1. SourceObservableOnSubscribeIs an interface that we’ll override when we use itsubscribeMethods.
  • 2. We’ll be theresubscribeMethod defines the sequence of events to be performed next, so we callObservableOnSubscribeIs the event emission source.
  • 3.subscribeThe method has an argument that is the emitterObservableEmitter(More on that later).

Subscribe (Connect)

Now, the next step: Subscribe. Observable create returns an ObservableCreate object. The ObservableCreate subscribe method is not rewritten, so let’s just look at the Subscribe method in an Observable.

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null"); Try {/ / to empty and hock mechanism, ignored the observer. = RxJavaPlugins onSubscribe (this, the observer). ObjectHelper.requireNonNull(observer,"The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); // focus on the subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

Let’s cut through the unimportant code and get to the point. After simplifying the key code, it can become:

public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
}
Copy the code

RxJavaPlugins are also left aside for the moment, as is the case with the onAssembly above, and all we need to know is that this is a return to the passed observer. Then only the subscribeActual(Observer) key code is left. SubscribeActual in Observable is an abstract method that is implemented in subclasses.

In fact, we can see here that this is a decorator pattern.ObservableIs the base class for the decorator pattern, and its subclasses do virtually all the work. So we call them decorators. Don’t justcreateMethod, some other operators, for examplemap.flatMapSame thing. You’ll get a better sense of this later when we talk about operators and thread switching.

So when we analyze the Subscribe method of an Observable, we simply look at the subscribeActual(Observer) in the subclass.

@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

To highlight:

  • 1. Create oneCreateEmitterobjectparentAnd then invokes the processorobservertheonSubscribeMethod holds it.
  • 2. Call againsource.subscribe(parent)Pass it in tosourceIn the middle. thissourceIt’s the backup source that we talked about earlierObservableOnSubscribe, in which thesubscribeThe method just needs a transmitterCreateEmitter.

The entire subscription line is clear:

  • 1.ObservablecallcreateMethod, parameter is an emission sourceObservableOnSubscribe(We are concerned about itsubscribeMethod to override) to generate aObservableCreateObject.
  • 2.ObservableCreatecallsubscribeMethod, and the argument is a processorObserver.
  • In 3.subscribeIn the method we takeObserverAn emitter is generated for the parameterCreateEmitter, and calls the emitter with the emitter as an argumentObservableOnSubscribethesubscribeMethods.

What is this CreateEmitter? Let’s take a look at its source code.

The emitterCreateEmitter

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if(! isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) {if(! tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) {if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if(! isDisposed()) { try { observer.onError(t); } finally { dispose(); }return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if(! isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public voiddispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            returnDisposableHelper.isDisposed(get()); }... }Copy the code

To highlight:

  • 1.CreateEmitterisObservableCreateClass, inherited fromAtomicReference<Disposable>.ObservableEmitter<T>, DisposableWe call it a transmitter.
  • 2. We learn fromonNextAs you can see from the method, the emitter is directly connected to an external processor.
  • 3. The emitter inherits fromDisposableDisposed() interface, which has only two methods, Dispose () and isDisposed(), to cut off the firing process.
  • 4. On topsubscribeActualAnd what we see in the method is,ObserverHave a callonSubscribeMethod holds thisCreateEmitterEmitter object. So we can pass through the processordispose() The interface interrupts the transmitting process at any time.
  • 5. Also, we can see in the code,onErrorandonCompleteThe two are mutually exclusive. Only one will be executed, because if one of them is executed, it will immediately cut off the launch process.

conclusion

To summarize some of the classes that have emerged:

  • Observable-> Base class for the decorator pattern, which we call decorators. There is acreateMethod, parameter is oneObservableOnSubscribeThe source will return oneObservableCreateObject.
  • ObservableCreate-> Decorator implementation class. There is asubscribeMethod, the argument isObserverThe processor. insubscribeMethod inside, we takeObserverOne is generated for the parameterCreateEmitterEmitter, and with this emitter as an argument, calls the emitter’ssubscribeMethods.
  • ObservableOnSubscribe-> The transmitter itself is just an interface, we rewrote itsubscribeMethod, which defines the event to be processed next, is called the emitter.
  • CreateEmitter -> emitter, the constructor contains a processor. The processor holds this emitter object and can interrupt the firing process at any time. In the emitteronErrorandonCompleteBoth are mutually exclusive and only one will be executed.
  • Observer-> Processor. Used to process data sent by the transmitter.

To summarize the whole operation process is as follows:

  • 1.ObservablecallcreateMethod, parameter is an emission sourceObservableOnSubscribe(We are concerned about itsubscribeMethod to override) to generate aObservableCreateObject.
  • 2.ObservableCreatecallsubscribeMethod, and the argument is a processorObserver.
  • In 3.subscribeIn the method we takeObserverOne is generated for the parameterCreateEmitterEmitter, and calls the emitter with this emitter as an argumentObservableOnSubscribethesubscribeMethods.
  • 4. The sourceObservableOnSubscribethesubscribeMethod defines the event we want to process and passes the result to the emitterCreateEmitter.CreateEmitterFirst determine whether the event stream is disconnected, continuously open to pass the result to the processorObserver.
  • 5. The processorObserverProcessing results.

expand

This time we’ll look back at what we throw away in front of the, RxJavaPlugins. OnAssembly and RxJavaPlugins onSubscribe. Let’s go straight to the source code.

    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static 
      
        Observable
       
         onAssembly(@NonNull Observable
        
          source) { Function
          f = onObservableAssembly; if (f ! = null) { return apply(f, source); } return source; }
        
       
      Copy the code

Method description: Calls the associated hook function. This is equivalent to using Java reflection mechanism to wrap source interception. Rxjava provides us with a hook injection method that we can use to call the interceptor function we set up before calling source. All we need to know now is that we have this thing, and we need to use this thing later.

The last

This article is mainly about the basic use of RXJava source process, the next article I said that the thread switch.

Updated weekly, stay tuned ~