preface

RxJava version from the release to now, has gone through a number of versions, although the source code is constantly modified, but I do not know if you found that the main framework of RxJava is still not changed, why? It’s the RxJava architecture that defines its features, such as the simplicity of its code logic and the ability of its operators to extend, that have been enhanced, not diminished, over the course of many iterations of RxJava. This feature is responsive programming. And then, Let’s talk about why RxJava has this feature, and what is the nature that makes it unchangeable!

This article mainly explains the RxJava architecture idea, will not involve a lot of source code analysis, please rest assured to eat, the article length is long, suggested collection, slowly taste!

Implementation ‘IO. Reactivex. Rxjava2: rxjava: 2.1.4’ implementation ‘IO. Reactivex. Rxjava2: rxandroid: 2.0.2’

1. What does RxJava not change?

I do not know if you will have such anxiety, the framework of the update sometimes let people sigh!

Although the version of RxJava update is not so frequent, but every update, always let a person feel, just before the source code, is not wasted 😭, source code update, I have to continue to learn;

But I don’t know if you have this thought, the framework has been updated, there is no constant things at the bottom, today I am happy to tell you, RxJava framework has, so what is this constant thing?

architecture

Yes, RxJava has gone through several iterations, but the underlying architecture has not changed much. Why? Because of its nature, its architecture will not change much;

What are its characteristics? Simple code logic, powerful operators, and these characteristics are the idea of responsive programming;

Just like a house, its characteristics have seismic, windproof, then its bottom structure is necessarily according to the characteristics of seismic and windproof to build, and once built successfully, its characteristics, will not follow the house decoration and change, then its bottom structure is the same reason;

So do you want to know what architecture implements this feature?

Now let’s talk about design patterns in RxJava. Why are design patterns so important?

Because design patterns are the foundation of architecture, how we design to make the architecture have certain characteristics, this is inseparable from design patterns;

RxJava observer mode

2.1 Observer mode

The observer pattern is perhaps the most familiar design pattern. Why? Because we use it all the time in our code;

It’s the click event of the View;

Why is the View click event observer mode? Let’s first look at the definition of the observer pattern;

Defines a one-to-many dependency between objects so that all dependent objects are notified when an object’s state changes.

Simply put, there is a one-to-many relationship between the observed and the observer. One observed can be dependent on by multiple observers. When the observed changes, the observer will be notified.

The View click event is just a simple one-to-one relationship, but it can be a one-to-many relationship;

The essence of the observer mode is that when the observed changes, the observer will be notified. When the View is clicked, the observer will be notified through the callback listener. When there is a subscription relationship between the observer and the observed, the observer will be notified.

The click event of the View is subscribed by setting the listener method, which is the most familiar observer pattern;

2.2 what is the observer pattern for RxJava?

The Observer pattern of RxJava is the observer pattern of extensions. Why is it called extensions? Because it is not quite the same as the normal observer mode, the extended observer mode has several methods to notify the observer, so let’s see how it works.

Let’s first look at the classes involved in the RxJava Observer pattern:

  • Observable: an Observable
  • -Blair: I’m an Observer.
  • Event: Indicates the Event notified to the observer by the observer
  • Subscribe: to Subscribe to

After looking at the figure below, you already have a clear idea of the observer pattern in RxJava;

The Event type is used by the observer to notify the observer. The Event type can be divided into the following types.

  • Next: regular events, which can pass all kinds of data;
  • Complete: The end event. When the observer receives the end event, it will not receive any subsequent events sent by the observer.
  • Error: Exception event, when the observer sends an exception event, then other events will not be sent;

Let me use example code to illustrate RxJava’s observer pattern once;

First define an Observer Observer:

public abstract class Observer<T> {

    // This method is called after the observer has subscribed to it;
    public abstract void onSubscribe(Emitter emitter);

    // Pass regular events for passing data
    public abstract void onNext(T t);

    // Pass an exception event
    public abstract void onError(Throwable e);

    // Pass the end event
    public abstract void onComplete(a);
}
Copy the code

The methods of the Observer class are very simple, they’re all callbacks, and here’s a new interface class Called Emitter. What does this Emitter do?

It’s called an Emitter, and it’s called an Emitter.

public interface Emitter<T> {

    void onNext(T value);

    void onError(Throwable error);

    void onComplete(a);
}
Copy the code

The implementation logic is to see what methods the class has by wrapping an Observer that will eventually be called through the Observer;

public class CreateEmitter<T> implements Emitter<T> {

    final Observer<T> observer;

    CreateEmitter(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable error) {
        observer.onError(error);
    }

    @Override
    public void onComplete(a) { observer.onComplete(); }}Copy the code

The implementation is called through an Observer object;

Let’s take a look at how the Observable is implemented;

public abstract class Observable<T> {
    
    // Implement the subscription logic
    public void subscribe(Observer<T> observer){
    	// This is done by wrapping the incoming observer as a CreateEmitter for callbacks
        CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
        // Call back the method that subscribed successfully
        observer.onSubscribe(emitter);
		
        // Callback emitter emitter
        subscribe(emitter);
    }

	// When the subscription is successful, call back
    public abstract void subscribe(Emitter<T> emitter);

}
Copy the code

The logic of this class is very simple. The first step is to subscribe. The second step is to call back an Emitter object, which is used to emit events.

So let’s see how to call it;

private void observer(a) {
    // First, create the observed
    Observable<String> observable = new Observable<String>() {
            @Override
            public void subscribe(Emitter<String> emitter) {
                emitter.onNext("The first time.");

                emitter.onNext("The second time");

                emitter.onNext("Third time."); emitter.onComplete(); }};// The second step is to create the observer
    Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Emitter emitter) {
                Log.i("TAG"." onSubscribe ");
            }

            @Override
            public void onNext(String s) {
                Log.i("TAG"." onNext s:" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TAG"." onError e:" + e.toString());
            }

            @Override
            public void onComplete(a) {
                Log.i("TAG"." onComplete "); }};// Step 3: the observer subscribs to the observer
    observable.subscribe(observer);
}
Copy the code

Here is the use of logic very simple, divided into three steps:

  • Step 1: Create an Observable;
  • Step 2: Create an Observer;
  • Step 3: An Observable subscribes to an Observer;

When the subscription is successful, the observer’s subscribe method can transmit various events through the transmitter, and finally in the observer’s method callback;

RxJava is also a subscription process for both the observer and the observed. However, when the observed changes, various events are emitted through the emitter, so that it is not limited to one event.

3. RxJava decorator mode

3.1 decorator mode

What is decorator pattern?

To understand this model is actually not difficult, we can see from the two words “decoration”, this model is used for decoration, as to how to decorate, and listen to me carefully;

For example, I now have a mobile phone, how can I not change the original structure of this mobile phone, and let it have the function of fall resistance, of course, you can also say that my mobile phone is Nokia, from several floors to drop, the mobile phone does not take knock, but the reality is, we use the mobile phone, is not so fall resistance;

So how can I make it more resistant to falls?

I believe the answer is clear to you, that is, the cover of the phone case, film, and these two actions, without changing the original structure of the phone, so that it has the function of anti-fall, and this process can be called decoration, and the principle of decorator mode is the same;

Adding additional functions without changing its original structure is the packaging of its original structure, and this process is called decoration;

So how does that show up in the code?

Similarly, if we want to add new functionality to a class without changing its logic, we can use decorator mode to encapsulate it. Let’s see how to do that.

Using the example above, define an Appearance interface Appearance. There is an abstract method, structure;

public interface Appearance {

    void structure(a);
}
Copy the code

Then define a mobile Phone to implement this interface. The structure of the mobile Phone has glass back cover, metal frame and other properties, as follows:

public class Phone implements Appearance {

   @Override
   public void structure(a) {
       // Phone properties: glass back cover, metal frame
       Log.i("TAG"."Properties of the phone: Glass back, metal frame."); }}Copy the code

Okay, so what we’re going to do is we’re going to make this phone stronger, but we’re not going to change the structure of the phone, so what are we going to do?

If I can’t change the structure of the phone, I can wrap the phone with decorations. I define a PhoneDecorator class to wrap the phone. I call it a PhoneDecorator, and I implement the Appearance interface. The structure method of the appearance class is called to ensure its original function.

In short, the purpose of this class is to implement the function of the original class;

public abstract class PhoneDecorator implements Appearance {
   
   protected Appearance appearance;

   public PhoneDecorator(Appearance appearance) {
       this.appearance = appearance;
   }

   @Override
   public void structure(a) { appearance.structure(); }}Copy the code

Then the next is the specific implementation of the packaging class, the definition of a mobile PhoneShell function PhoneShell class, function implementation is on the basis of the original function, give the mobile phone cover on the mobile PhoneShell, let’s see the specific implementation;

public class PhoneShell extends PhoneDecorator{

   public PhoneShell(Appearance appearance) {
       super(appearance);
   }

   @Override
   public void structure(a) {
       super.structure();

       Log.i("TAG"."Put a case on your phone."); }}Copy the code

The implementation here is very simple, inherits the packing class of mobile phone, and realizes the operation of “covering the mobile phone shell” in the structure;

So the case of the phone shell class, there is still a film class, and the phone shell, we also define a film of the packaging class PhoneCover, look at the concrete implementation;

public class PhoneCover extends PhoneDecorator{

   public PhoneCover(Appearance appearance) {
       super(appearance);
   }

   @Override
   public void structure(a) {
       super.structure();

       Log.i("TAG"."Put toughened film on your phone."); }}Copy the code

So we’re going to do the same thing with the phone case, so we’re going to have both wrapper classes, so let’s see how we can call them;

private void decorator(a) {

       // Create a mobile phone
       Phone phone = new Phone();

       // Put the case on the mobile phone
       PhoneShell phoneShell = new PhoneShell(phone);

       // Put the toughened film on the mobile phone
       PhoneCover phoneCover = new PhoneCover(phoneShell);

       // The final structure of the phone
       phoneCover.structure();

   }
Copy the code

It’s easy to use. Pass the class that needs to be wrapped as a constructor parameter to the wrapper class, so that the class can have the function of wrapping. For example, pass the Phone into the PhoneShell class, so that the Phone has the function of covering the Phone shell.

Similarly, then cover the PhoneShell of the phone PhoneShell class, passed to the film of the class PhoneCover inside, then the phone has the function of the film, and finally call the structure method structure, then you can see that the phone has been covered on the PhoneShell, and paste the film;

The structure after final packaging is as follows:

Have you noticed at this point that the decorator is not using the inherited method to extend the functionality, why?

Because inheritance causes subclasses to swell as functionality increases, the two sides of the decorator pattern can expand at will without coupling each other;

So how is the decorator pattern implemented in RxJava?

Listen to me;

How is RxJava’s decorator pattern implemented?

The decorator for RxJava is primarily a wrapper that implements Observable and Observer Observer. Why wrap it?

As we can see from the above, the decorator mode is an extension of the basic function without modifying its original logic;

So why do RxJava observers need this feature?

Let’s say I want to do something where I want to get data from a child thread and then switch to the main thread to assign the data. Normally, I would do that, first I want to get data from the child thread and then switch to the main thread via the Handler’s POST method.

But what if I want to get the data from the child thread, and then convert it, and then call back to the main thread?

As aspiring engineers, we can’t stand this kind of writing.

So is there a way to be more elegant?

The answer is: yes;

RxJava uses decorator pattern + observer pattern to design the effect of chained calls, so that the code logic is clear, but also easy to maintain;

For example, a chained call logic looks like this:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                // The transmitter emits data
                emitter.onNext("1");
                
                emitter.onNext("2");

                emitter.onNext("3");

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {}@Override
                    public void onNext(@NonNull Integer s) {}@Override
                    public void onError(@NonNull Throwable e) {}@Override
                    public void onComplete(a) {}});Copy the code

This chain call logic is not very clear!

Let’s look at how the Decorator pattern in RxJava is implemented;

Let’s take the mobile phone model as an example to deconstruct the RxJava decorator pattern step by step;

(1) Observable:

  • Step 1: You want an abstract interface that corresponds to the Appearance interface above. RxJava’s interface is ObservableSource, which has a method, subscribe, for subscribing to observers.
public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}
Copy the code
  • The second step: To have a wrapper class that implements the ObservableSource interface, which corresponds to the PhoneDecorator wrapper class above, RxJava’s wrapper class is Observable, which implements the same interface as PhoneDecorator. And in the subscribe method, the abstract method subscribeActual is called to subscribe to the observer.
public abstract class Observable<T> implements ObservableSource<T> {
	@Override
    public final void subscribe(Observer<? super T> observer) {... subscribeActual(observer); . }protected abstract void subscribeActual(Observer<? super T> observer);
}
Copy the code
  • Step 3: The next step is the concrete wrapper class, and the same wrapper function PhoneShell, PhoneCover and so on, and RxJava wrapper class, very powerful, first look at the diagram;

With over a hundred wrapper classes, you can see the power of RxJava and the extensibility that the decorator pattern gives it.

After looking at the logic of the observed, let’s look at the logic of the decorator of the observer.

(2) Observer:

  • The first step is to have an abstract interface that corresponds to the Appearance interface above. RxJava’s interfaces are Emitter and Observer, and there are several methods in it. Basically, onNext, onError, and onComplete are used to call back the observed.

  • Step 2: Have a wrapper class that implements an Emitter or Observer interface. Observers are special. There is no base wrapper class, but a bunch of wrapper classes.

There are also more than 100 wrapper classes;

So at this point, do you wonder, so many observers and observers of the wrapper class, how to use?

From the example above, we can see that there is a process for wrapping a class, some of which are wrapped at creation time, some of which are wrapped at invocation time.

For example, in the example code above, the first step is to create ObservableCreate objects by creating ObservableCreate objects. The first step is to create ObservableCreate objects by creating ObservableCreate objects.

When the subscribeOn method of the second step is called, the subscribeOn method of the second step is wrapped at the second layer, and the structure is as follows:

The observeOn method in step 3 is called with the fourth layer of wrapping, so the structure looks like this:

When the subscription method is finally called, it has been wrapped four times, so you can understand that each time the operator is invoked, there is a layer of wrapped by the observer;

What are the benefits of this packaging?

As we mentioned earlier, decorator mode is designed to add additional functionality without changing the original;

This has been wrapped up several times to add additional functionality, so take a look at the additional functionality added at each layer.

3.3. The subscribe method of the observer

When we finally call the subscribe method, we’re going to call it from the outermost wrapper class, step by step;

We know that the wrapper for the observer is implemented in the subscribeActual method, so let’s take a look at the subscribeActual method logic of these wrapper classes;

Take a look at the outermost wrapper and get a glimpse of the subscribeActual logic:

The source here is an instance of the wrapper class from the previous layer, namely ObservableSubscribeOn;

There’s a wrapper around the observer, which is the ObserveOnObserver, which is the wrapper around the ObserveOnObserver, and it implements the thread switching logic, which is in onNext;

Why do you do that? Because that’s the benefit of the decorator pattern, the observer of onNext tells the observer what method to call back to, and then there’s an extra thread switch in there, by wrapping the class, that’s going to switch to the main thread;

Now, the structure of the observer looks like this:

Let’s look at the logic for the subscribeActual method of the penultimate wrapper class, ObservableSubscribeOn;

The subscribeActual method that wraps this class, and you SubscribeOnObserver class, wraps another layer of the observer, and what does this wrapper do?

There’s some thread release, which we’ll talk about next;

After wrapping, the structure of the observer looks like this:

Going back to the implementation logic of the observed, we call the thread of execution method, scheduleDirect. If you pass in a thread Scheduler, the SubscribeTask will be executed on the child thread, which is what we pass in here.

In the SubscribeTask, the subscribe method is called. This source is the wrapper class of the last layer, namely ObservableCreate. Then the subscribeActual method of the ObservableCreate is executed in the child thread.

Let’s look at the subscribeActual method of the ObservableCreate wrapper class;

This is wrapping the observer, also called CreateEmitter, so what extra functionality does this observer wrapper class implement?

The main implementation of this thread is to determine whether the thread is released, if it is released, then the observer no longer callback;

So in this case, the structure of the observer looks like this:

Then we call the Observer’s onSubscribe method, which will eventually call back to the Observer’s onSubscribe method;

The source is the original ObservableOnSubscribe that we created. This will call back to the subscribe method of ObservableOnSubscribe;

At this point, our wrapper class ObservableSubscribeOn above, after switching to the child thread, then our subscribe method of ObservableOnSubscribe also executes on the child thread;

3.4. How do the events notified to the observer by the observer flow?

Then we call the transmitter in the subscribe method of ObservableOnSubscribe, emitting the string, so what is the call logic for this time?

From the above observer structure, when the emitter sends an event, it calls back to the corresponding observer wrapper class, starting with the outermost layer.

We know that ObservableEmitter is a wrapper around CreateEmitter for observers, so this onNext will take the onNext method of CreateEmitter, and we know that this method just makes the judgment, The end result is to call back to the onNext method of the wrapper class at the previous level;

The wrapper class that goes to the next level is SubscribeOnObserver, and the onNext of this method does nothing to observers;

The wrapper class at the next level up is ObserveOnObserver. The onNext method of this class performs the switch to the main thread.

So when you finally call back here, you’re executing on the main thread;

4. Summary:

When we create the observer, we wrap the observer as many times as we create it, and then when the subscribe method is called by the observer, we call the subscribeActual method of the observer one by one, and in the subscribeActual method of the observer, They put a wrapper around the observer;

That is, the observer is wrapped when it is created, and the subscribeActual method implements the additional functionality;

The observer is wrapped in the subscribeActual method called by the observer, and then implements its own additional functions for the observer.

Let’s take a look at the flowchart:

So here, RxJava underlying architecture is not clear, summed up as the observer pattern + decorator pattern;

Wrap the observer and observed through the decorator pattern, and then implement additional functionality inside the wrapper class;

So the final architecture is as follows:

  • Step 1: When creating the observed, or when using the operator, the observed is wrapped:

  • Step 2: The observer subscribes to the observer. In this case, the subscribeActual method of the wrapper class of the observer is called one layer at a time and the observer is wrapped.

In this case, the implementation of the observer function is in the subscribeActual method, while the implementation of the observer is in the wrapper class.

  • Step 3: The difference between the observed and the observer is that the observed implements the corresponding function of the wrapper class after the successful subscription, while the observer implements the corresponding function in the wrapper class of the observer during the event callback.

Final flow chart:

5. Why is RxJava designed this way?

5.1 Concept of transactions

Before we begin, let’s take a look at the concept of “transactions”;

What is a transaction?

Business, usually means something to be done or done. In code we can think of it as a piece of code logic;

The relationship between transactions, we can understand as the relationship between business logic, may be related, may not be related;

I enter a list of pages, for example, the list of pages of data, to request returns from several interface, and the request returns after I will according to the data and network to display the corresponding page, such as a list of pages or no data, no web pages, then I show logic is according to the list of pages returned by the logic to show;

These requests, I’ll just call transaction A, transaction B, transaction C, transaction D;

Transaction A, transaction B and transaction C respectively correspond to the data of the requested network interface, while transaction D is the logic of processing the display according to the returned data.

So it might be, my normal processing logic, in the child thread to deal with these three transaction, A transaction B, transaction C, finally after all finished processing, processing transactions D again, and the disadvantages of writing is I put this A few interface data on A child thread to perform, then the final result is lead to load slow;

So if we can switch to A different kind of writing, the transaction, A transaction B, transaction C respectively in three child thread to execute, and finally in three child thread callback to judge whether the several interface has been loaded, so that we can solve the above problems, but if later there will be new transaction, then eventually lead to the logic of judgment is more and more bloated;

RxJava offers the idea of responsive programming to solve these problems;

5.2 responsive programming

What is responsive programming?

Reactive programming is a programming model that builds transactional relationships through asynchrony and data flow

We can understand that the transaction is driven by the event, such as I successfully requested network data, send the event that the request was successful to notify the next layer of transaction processing;

RxJava provides A series of features, such as transaction transformation, concatenation, assembly and so on to operate on transactions, such as transaction A, transaction B, transaction C, we can be assembled to handle;

So the final RxJava processing logic is as follows:

Transaction A, transaction B, and transaction C are assembled. After processing, an event is sent to notify transaction D for processing.

What are the benefits of RxJava responsive programming?

  • 1. Significantly reduce the coupling between transactions to facilitate later maintenance and expansion;
  • 2. Simplify complex thread operations, let us focus on business development, avoid many concurrent thread problems, such as thread deadlock;
  • 3, provides a rich operator, let us for the transaction processing more convenient;
  • 4. For complex business, we can build clear code logic to facilitate understanding of business logic;

5.3. RxJava Architecture for responsive programming thinking

RxJava pass through the observer pattern to handle at the bottom of the event, through the operation of the decorator pattern to handle affairs, by the two designers model to build the reactive programming ideas, and decorator pattern also ensures its flexible extensibility, such as I later want to add a new operator, only need to implement the corresponding observer and the observed the wrapper classes;

RxJava is more than just an asynchronous framework. It also provides us with the ability to handle transactions and make complex logic clearer and easier to understand through the idea of responsive programming, which makes it easier for us to handle complex business.

This is a very good source code, also very exclamation author wonderful thinking, is worth us to learn;

When we master the core principles of RxJava, then no matter how the source code update, also can not be separated from this architecture idea, when we take this architecture idea to see the source code details, architecture idea is your lighthouse, so that you will not be lost in the vast sea of code 😊;

other

Android you have to learn about HTTP

Android network programming TCP, UDP detailed explanation

Android network programming HTTPS detailed explanation

Android network framework OkHttp source code parsing

Android Network framework Retrofit source analysis

About me

Brother Dei, if my post is helpful to you, please give me a like ️, and follow me on Github and my blog.