RxJava is currently 3.x version, but our company has been using 1.x version; So I’m going to start with RxJava 1.x, look at its source code structure, and understand some concepts and implementation principles in RxJava from the source code. In fact, I can only use RxJava, no system to understand; I hope I can understand his design ideas from the source code;

Rxjava mainly uses the idea of observer mode, so it mainly has two roles of observer and observed, although it is very easy to use, but in fact there are many important roles. First I’ll list the classes or actions that I think are important;

  • Rx.Observable: subscriptable content (observed)

  • Rx.Observer: Observer. Although the Observer and the observed are a pair, Observable and rx.Subscriber are used together in practice.

  • Rx. Observables. OnSubscribe: this is an inner class, is very important in the observables of a class, each observables object must be OnSubscribe this object;

  • Rx. Subscriber: a class that combines Observer and Subscription. An important method, setProducer(), sets an emitter to send data.

  • Rx. Subscription: Maintains subscriptions, which are used to release resources

  • Subscribe (): This is a subscribe action, a trigger where data flow begins;

From the above list, I can find that the words in the list are somewhat similar to the class. At the beginning, I kept switching back and forth among these words, and there were many Action and Function related things in the list, which made me feel dizzy. So let’s start with a simple example:

 Observable observable = Observable.just(1.2.3); // Generate an observer
 Observer observer =  new Observer<Integer>() { // Generate an observer
            @Override
            public void onCompleted(a) {
                Log.d("Observable"."onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("Observable"."OnError:" + e);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("Observable"."onNext: "+ integer); }}; Subscription subscription = observable.subscribe(observer);// Generate a subscription
Copy the code

This code contains three types: Observable, Observer, and Subscription. That’s the easiest way to use it. These are three classes that most people should be able to understand. So let’s take a look at the internal structure of these classes, starting with Obserable;

Just (1,2,3) generates the observed

This is the process of creating an Observable. The trace code will find that an OnSubscriber instance needs to be instantiated:

public static <T> Observable<T> from(T[] array) {
    int n = array.length;
    if (n == 0) {
        return empty();
    } else
    if (n == 1) {
        return just(array[0]);
    }
    return create(new OnSubscribeFromArray<T>(array)); // This is where the instance is created
}
Copy the code

So each Observable must have an OnSubscriber with it; Let’s keep track of source discovery;

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }
    
    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array)); / / look here
    }
Copy the code

The OnSubscribeFromArray will create a Producer and associate it with a child(Subscriber) (if you look at the implementation here, you’ll see that they are two-way associations). Let’s look at the subscriber.setproducer () method;

public void setProducer(Producer p) {
    long toRequest;
    boolean passToSubscriber = false;
    synchronized (this) {
        toRequest = requested;
        producer = p;
        if(subscriber ! =null) {
            // middle operator ... we pass through unless a request has been made
            if (toRequest == NOT_SET) {
                // we pass through to the next producer as nothing has been requested
                passToSubscriber = true; }}}// do after releasing lock
    if (passToSubscriber) {
        subscriber.setProducer(producer);
    } else {
        // we execute the request with whatever has been requested (or Long.MAX_VALUE)
        if (toRequest == NOT_SET) {
            producer.request(Long.MAX_VALUE); // Look here, I am here
        } else{ producer.request(toRequest); }}}Copy the code

If you look closely, there is the producer.request() method, which triggers onNext; I drew a diagram; You can look at it together;

Just – observerable – figure. PNG

The diagram above shows that four key roles are created using observable. just(1,2,3). The one without color on the right is the main base class; In addition to being a Producer, the other three are familiar. In fact, being a Producer is not mandatory when creating an Observable in other ways. As follows:

Observable observable1 = Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
			subscriber.onNext(111); }});Copy the code

So what is the use of the Producer? In fact, it is mainly used to control the data flow between Obserable and Subscriber according to the notes. For a more detailed understanding, please refer to the article of Producer, which is quite good.

Trigger the subscription

The previous section explains the four main roles in the generation of observed: Observable, OnSubscriber, Producer, Subscriber; Observable generates an Observable that triggers a subscription as follows:

Subscription subscription = observable.subscribe(observer); // Generate a subscription
Copy the code

If you dig deeper, you’ll see that the main trigger execution is

static <T> Subscription subscribe(Subscriber<? super T> subscriber /* Mark a */,
                                  Observable<T> observable/* Mark two */) {
  // Get rid of useless code
        // new Subscriber so onStart it
        subscriber.onStart(); 
 
        try {
            // Allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
           / /... Exception code removal
            returnSubscriptions.unsubscribed(); }}Copy the code

Tag 1: It was wrapped up in the previous Observer. That was the original Observer that we wrote ourselves; You’ll notice that the Observer we pass is wrapped as Subscriber, which means Observable and Subscriber are a couple.

Tag 2: This is the Observable itself, handed down from above as this.

This hook is used to intercept or decorate onSubscribe and Observable. But this place is not realized, so leave it alone for the time being; Hook type is rx. Plugins. RxJavaObservableExecutionHook this on the name of it, I always thought this class on the back of the process will affect many, in fact, this class is an empty, The purpose of this class is to observe the life cycle of an Observable as it executes. For example, you can write some listening log, or do some manipulation in the corresponding place of the key life cycle, but it is not implemented at present. If you want to use it, you need to implement it by yourself and configure it in system.properties. It has little immediate significance;

From tracking subscribe(), you can find that Observer, Subscriber, Observable, Producer and other objects use decorative mode more often.

After the Subscription is triggered, one product is the Subscription, which can be unsubscribed at any time;

The main class diagram

This article focuses on Observable, OnSubscribe, Observer, Subscriber, Producer, and Subscription. I drew a class diagram; For future reference;

Rxjava – Main class diagram. PNG

feeling

Originally I wanted to summarize and record my reading of the rxJava1.x source code with an article, but I found that there were a lot of things to write, not even one. It is not easy to write, my idea is that each article should not record too many knowledge points, or I would not like to read in the future, so I plan to separate the article and write a few more. In the process of looking at the source code will find, RxJava with a lot of decoration mode, an object layer embedded a layer, there are a lot of Action, Function and other things, not patient enough, not careful enough is easy to be dizzy, which fully shows my ability to see the source code is still very weak. Analysis of the source code for my biggest harvest is, from the source CODE I saw the decoration mode, but also see AtomicXX related things to use, CAS (compareAndSwap) industry called spin lock, this in Android write application is not to touch. When you see a good framework, learn not just the idea, but how it’s coded; I wish myself progress;