Preface

Starting with this article, we step into the RxJava2 source code and explore the ideas behind Rx. Before reading on, you should have a basic understanding of Rx, or at least have used it. Specific use cases are posted only where they are needed to support a point. The material is split into several articles to keep each one clear and well structured.

Overview

This article will explain:

  1. A brief understanding of Rx
  2. Basic flow of RxJava2

About Rx

1. What is Rx

Rx is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators (quoted from the official documentation). In short, the idea behind Rx is to process a series of events without breaking the call chain. The main flow, the branch flows, and the exception flows are all handled within the chain, so users can focus on the main flow of events while branches and exceptions are still covered by the same chain. The benefit is that the relevant scenario can be located quickly when an exception occurs, when requirements change often, or when the logic is complex, especially where asynchronous processing is involved. Everything on the chain is an observable object that a designated observer can capture and respond to, so each part does its own job and the parts work together.

The following figure illustrates this:

Rx aims to solve the flow problem shown in the figure above with a single call chain. That is, in a linear flow, it focuses on the main flow A, B, C, D that leads to the normal exit, while also covering the exception flow and the branch flow and the exits they reach. Note that any of the nodes A, B, C, and D may fail and need to go straight to the exception exit, or may have a branch that must be processed before returning to the normal exit. This calls for sound rules and trade-offs governing how nodes hand over, cooperate, and interrupt, so that the chain keeps running and never breaks. For the most part, Rx achieves this, and does so elegantly.

2. What are the characteristics of Rx

  • Unbiased
  • Pluggable
  • Clear logic

First, Rx is unbiased. Rx is not biased toward any particular source of concurrency or asynchrony; to it, every interaction is treated as if it were asynchronous. Interaction is reactive: one side notifies and the other reacts. When an asynchronous event arrives, it is pushed to a destination for processing, rather than a specific target having to wait for it.

Second, Rx is pluggable. Returning to the earlier figure, A, B, C, and D each do their part and then pass the event on. Suppose that one day the branch case at C no longer occurs. C can be pulled out, the flow becomes A, B, D, and the whole still runs normally. Or suppose extra validation is needed between B and C, so a new node G is added. G is inserted between B and C, and the whole still runs normally. The advantage of this pluggability is that a change can be located quickly and precisely, with the scenario as the starting point.

Third, the logic is clear. Clear logic means that the backbone of a series of events is exposed, so the normal path of an event from occurrence to completion can be understood quickly, while the handling of special cases at each node is moved elsewhere. When a scenario changes, you only need to locate the target node. As you can see, the second and third points complement each other.

3. Main roles in Rx

  • Observable: the observed
  • Observer: the observer
  • Emitter: the emitter
  • Disposable: hard to give a good translated name

For the user, these roles are closely related and their responsibilities are clear. An Observable describes the events that will happen, an Observer declares which events it is interested in and how it will respond, and an Emitter determines how the events are produced. A Disposable breaks the relationship between the Observable and the Observer so that no further responses are produced.

4. Flows

  • Data flow
  • Event flow

In the Rx mechanism, there are two kinds of flow: the data flow and the event flow.

The data flow describes how data travels from upstream to downstream (for simplicity, think of the Observable as upstream and the Observer as downstream). Along the way the data is processed in various ways, as shown in the figure below:

The event flow refers to the additional events that occur while data is flowing, such as thread switching, combining, and filtering. (Events in which data plays the leading role can also be regarded as part of the data flow; the two overlap without conflicting.) See the figure below:

(The two figures above are taken from the marble diagrams in the official documentation.)

As the two figures show, on its way from upstream to downstream the data passes through the transformations of the data flow and the changes of the event flow, and the downstream finally receives a suitable result and responds to it. Whatever specific transformations and changes occur along the way, the chain as a whole stays connected. In other words, from start to finish the data goes through a series of changes (in data state, event state, context, and so on).
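To make the upstream-to-downstream idea concrete, here is a minimal plain-Java sketch. It does not use RxJava: MiniFlow, its map() and filter() methods, and FlowDemo are hypothetical names invented for this illustration, and real RxJava operators are considerably more involved. The only point is that each step wraps the downstream consumer, so the chain stays connected while values are transformed or held back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for an upstream: it pushes values into whoever subscribes. */
interface MiniFlow<T> {
    void subscribe(Consumer<? super T> downstream);

    /** Data flow: transform every value on its way downstream. */
    default <R> MiniFlow<R> map(Function<? super T, ? extends R> mapper) {
        return downstream -> subscribe(value -> downstream.accept(mapper.apply(value)));
    }

    /** Event flow: decide whether a value travels any further. */
    default MiniFlow<T> filter(Predicate<? super T> keep) {
        return downstream -> subscribe(value -> {
            if (keep.test(value)) {
                downstream.accept(value);
            }
        });
    }
}

final class FlowDemo {
    /** Builds a three-value upstream, chains two steps, and collects what arrives. */
    static List<String> run() {
        MiniFlow<Integer> upstream = downstream -> {
            downstream.accept(1);
            downstream.accept(2);
            downstream.accept(3);
        };
        List<String> received = new ArrayList<>();
        upstream.filter(n -> n % 2 == 1)     // only odd values continue
                .map(n -> "item-" + n)       // each value changes shape
                .subscribe(received::add);   // the downstream finally responds
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());           // prints [item-1, item-3]
    }
}
```

Nothing happens until subscribe() is called at the end of the chain; the earlier calls only describe how values will be handled once they start to flow.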

That concludes the brief introduction to Rx.

The basic flow

It has to be said that Rx itself is very large, and grasping the whole framework in one go is very difficult. As an appetizer, here is a look at the basic flow of Rx, with all the extras set aside.

The RxJava version is 2.x

The example is as follows:

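import android.util.Log;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// TAG is assumed to be a String constant defined in the enclosing class.

// Describe the events: emit two values, then signal completion.
Observable<Integer> ob = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onComplete();
    }
});

// Subscribe an observer that declares how to respond to each event.
ob.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: ");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});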

The code above establishes the subscription between the observer and the observed, including the description and production of the events and the concrete responses. The emitter emits two values and then a completion signal, and the downstream responds to each.

Observable

First, look at Observable. Every Observable has the following declaration:

public abstract class Observable<T> implements ObservableSource<T>

Clearly, ObservableSource specifies what an Observable does. ObservableSource declares only one method.

 void subscribe(@NonNull Observer<? super T> observer);

What subscribe() does is sign a contract with the observer. Observable provides a default implementation. Apart from null checks and exception handling, its core is to hand an Observer to the current Observable and trigger subscribeActual(Observer).

 protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual() is important: it is the medium that connects the Observable to the observer. RxJava contains a large number of Observables, each of which needs different handling because its responsibility differs, and subscribeActual() is the fuse that sets off the part that differs.

subscribe() and subscribeActual() work together: the former is responsible for notification and handover, the latter for executing what differs. This article does not go into the full story of that collaboration and stays focused on the basic flow.
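The split between subscribe() and subscribeActual() is a template-method arrangement, which can be sketched in plain Java as follows. This is not RxJava's code: SketchObservable, SketchObserver, SingleValue, NoValue, and TemplateDemo are hypothetical names, and the observer is trimmed to two callbacks. The shared subscribe() only validates and hands over, while each subclass supplies its own subscribeActual().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, trimmed-down observer: only the callbacks this sketch needs. */
interface SketchObserver<T> {
    void onNext(T value);
    void onComplete();
}

/** Hypothetical stand-in for Observable: subscribe() is shared, subscribeActual() varies. */
abstract class SketchObservable<T> {
    /** Shared part: validate the observer, then hand it over. */
    final void subscribe(SketchObserver<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        subscribeActual(observer);
    }

    /** Varying part: each concrete source decides what to do with the observer. */
    protected abstract void subscribeActual(SketchObserver<? super T> observer);
}

/** Emits one fixed value, then completes. */
final class SingleValue<T> extends SketchObservable<T> {
    private final T value;

    SingleValue(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SketchObserver<? super T> observer) {
        observer.onNext(value);
        observer.onComplete();
    }
}

/** Emits nothing and completes immediately. */
final class NoValue<T> extends SketchObservable<T> {
    @Override
    protected void subscribeActual(SketchObserver<? super T> observer) {
        observer.onComplete();
    }
}

final class TemplateDemo {
    /** Subscribes a logging observer and returns what it saw. */
    static List<String> observe(SketchObservable<Integer> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new SketchObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext: " + value); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(observe(new SingleValue<>(7)));  // prints [onNext: 7, onComplete]
        System.out.println(observe(new NoValue<>()));       // prints [onComplete]
    }
}
```

The caller always goes through the same subscribe() entry point, so adding a new kind of source means writing one more subscribeActual() and nothing else.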

Observer

For the observer, Rx provides a set of callbacks: the observer waits for the target event to arrive and then responds in some way. In Rx, the observer has four core behaviors:

  • onSubscribe()
  • onNext()
  • onError()
  • onComplete()

onSubscribe() is generally triggered inside subscribeActual(). It carries the Disposable from the Observable to the observer, which responds there. The state at this point: the contract is signed and no data has arrived. Disposable is an important role and is discussed later.

onNext() responds to the arrival of a new event or new data. After onError() or onComplete() has responded, onNext() no longer responds to subsequent events or data.

onComplete() being triggered indicates that the flow completed normally. It is mutually exclusive with onError().

onError() responds to an error. It is the general exception exit provided by Rx: exceptions arrive here without any special handling of exception events (excluding exceptions that Rx deliberately does not pass on). In other words, a consumer of the API always receives exceptions from the Rx chain here. Is there a problem with every exception ending up here? The answer is: possibly, because every exception is received as a Throwable, which is a coarse way to treat them. This is part of the sacrifice Rx makes to keep the call chain intact. Fortunately, in most scenarios handling exceptions here is sufficient. When you need to deal with trickier, more specific exceptions, there are solutions, which will be explained in a future article. Note that onError() and onComplete() are mutually exclusive.

In summary, the observer responds to each of the events that may occur. The responsibilities of the Observer do not stop there; it takes on more once the Rx mechanism itself is involved, which the next article covers.

Disposable

Disposable is important and necessary. Why? In a word: to cut ties. When the Observable signs with the observer, the connection is established and interaction follows. But what if the observer changes its mind and no longer wants to interact with the Observable? Disposable solves this problem: the observer uses it as a medium to tell the Observable, “You are a good person, but we are not a good fit, goodbye.” After that, the observer no longer responds to events. Scenarios such as avoiding memory leaks come to mind easily and need no elaboration.

Disposable has only two core behaviors:

  • dispose(): tells the Observable that the observer is no longer interested in events.
  • isDisposed(): reports whether the connection has already been disposed, that is, whether the observer is no longer interested in events.
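The two behaviors can be pictured with a small plain-Java sketch. LinkHandle, FlagHandle, and DisposeDemo are hypothetical names, not RxJava types, and a single thread-safe flag is just one simple way to implement the idea: the side producing values checks the flag before each delivery, and the receiving side sets it when it loses interest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical mirror of the two core Disposable behaviors. */
interface LinkHandle {
    void dispose();
    boolean isDisposed();
}

/** Simplest possible implementation: a thread-safe flag. */
final class FlagHandle implements LinkHandle {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    @Override public void dispose() { disposed.set(true); }
    @Override public boolean isDisposed() { return disposed.get(); }
}

final class DisposeDemo {
    /** A source offers 1..5; the receiver cuts ties after the second value. */
    static List<Integer> run() {
        LinkHandle handle = new FlagHandle();
        List<Integer> received = new ArrayList<>();
        for (int value = 1; value <= 5; value++) {
            if (handle.isDisposed()) {
                break;                 // source side: stop delivering once disposed
            }
            received.add(value);       // observer side: respond to the value
            if (value == 2) {
                handle.dispose();      // observer side: no longer interested
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());     // prints [1, 2]
    }
}
```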

Emitter

I was reluctant to cover the emitter, because it is not used much in day-to-day work. Emitters exist mainly because API users need a way to send data downstream dynamically. The logic of sending data is controlled by the emitter, but the data is ultimately received through the Observer.

For other scenarios, the Observer handles the logic of the data and events itself, and the data generation does not depend on the emitter, as you’ll see in the next article.

In short, the emitter declares behaviors similar to those of the Observer to keep the timing of responses consistent. The Emitter responds first and the Observer responds afterwards, if at all: the Emitter knows beforehand whether the Disposable has been disposed, and if it has, the event is not passed on.

Process

Back to the example. Here an Observable is created through Observable.create(). In Rx, each link along the call chain performs the necessary checks, such as null checks, and produces an appropriate Observable for the operations that follow. In this case, the result is an ObservableCreate.

Next, the ObservableCreate signs with the observer via Observable.subscribe(), and the mechanism starts to work.

Once the Observable has been connected to the observer, subscribeActual() executes its specific logic, as shown below:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // The observer responds to the signing event
    observer.onSubscribe(parent);

    try {
        // Connect the emitter to the source
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

The source in the code above is the ObservableOnSubscribe passed to Observable.create(). In this subscribeActual(), the emitter receives the observer, and the observer responds to the signing event through onSubscribe(). Finally, source.subscribe() hands the emitter to the outside, where the data and events are produced, which completes the signing process.

With an emitter, it is the emitter that produces the data and pushes it. As we know, the behaviors the emitter declares match those of the Observer: onNext(), onError(), and onComplete(). When the emitter fires one of them, it is delivered to the corresponding callback of the observer, but the Disposable is checked before delivery. The emitter here is the CreateEmitter of ObservableCreate. Its onNext() is listed below; the rest of the code is left for you to read.

@Override
public void onNext(T t) {
    // Null values are not supported
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // If the link is still active, the downstream responds
        observer.onNext(t);
    }
}

This completes the simple process of creating an Observable, signing an observer, pushing data and events from the emitter, and the observer responding. The summary is as follows:

  1. Create an Observable
  2. Observable.subscribe() signs with the observer
  3. subscribeActual() gets its turn to run, executes the specific logic, and notifies the observer that the contract has been signed
  4. The data is produced and the observer responds
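The four steps can be traced end to end in a plain-Java model. This is a simplified sketch, not RxJava's implementation: every type here (MiniCreate, MiniObserver, MiniEmitter, MiniOnSubscribe, MiniDisposable, Bridge, MiniCreateDemo) is a hypothetical stand-in, a volatile boolean replaces the real disposal machinery, and the fatal-exception check is left out.

```java
import java.util.ArrayList;
import java.util.List;

interface MiniDisposable {
    void dispose();
    boolean isDisposed();
}

interface MiniObserver<T> {
    void onSubscribe(MiniDisposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

interface MiniEmitter<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

/** Plays the part of ObservableOnSubscribe: user code that drives the emitter. */
interface MiniOnSubscribe<T> {
    void subscribe(MiniEmitter<T> emitter) throws Exception;
}

/** Plays the part of ObservableCreate. */
final class MiniCreate<T> {
    private final MiniOnSubscribe<T> source;

    MiniCreate(MiniOnSubscribe<T> source) {                           // step 1: create
        this.source = source;
    }

    void subscribe(MiniObserver<? super T> observer) {                // step 2: sign
        subscribeActual(observer);
    }

    private void subscribeActual(MiniObserver<? super T> observer) {  // step 3: connect
        Bridge<T> parent = new Bridge<>(observer);
        observer.onSubscribe(parent);    // tell the observer the contract is signed
        try {
            source.subscribe(parent);    // step 4: user code produces the data
        } catch (Exception ex) {
            parent.onError(ex);          // simplification: no fatal-exception check
        }
    }

    /** Emitter and disposable in one object, as CreateEmitter is in the code above. */
    private static final class Bridge<T> implements MiniEmitter<T>, MiniDisposable {
        private final MiniObserver<? super T> observer;
        private volatile boolean disposed;

        Bridge(MiniObserver<? super T> observer) {
            this.observer = observer;
        }

        @Override public void onNext(T value) {
            if (!disposed) { observer.onNext(value); }
        }

        @Override public void onError(Throwable e) {
            if (!disposed) { disposed = true; observer.onError(e); }
        }

        @Override public void onComplete() {
            if (!disposed) { disposed = true; observer.onComplete(); }
        }

        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }
}

final class MiniCreateDemo {
    /** Mirrors the article's example and returns the callbacks in arrival order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniCreate<Integer> ob = new MiniCreate<Integer>(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
            emitter.onNext(3);           // dropped: onComplete() already closed the link
        });
        ob.subscribe(new MiniObserver<Integer>() {
            @Override public void onSubscribe(MiniDisposable d) { log.add("onSubscribe"); }
            @Override public void onNext(Integer value) { log.add("onNext: " + value); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [onSubscribe, onNext: 1, onNext: 2, onComplete]
    }
}
```

The extra onNext(3) after onComplete() is there to show the check before delivery: the bridge sees that the link is closed and the observer never hears about the value.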

The process is as follows:

For now, the basic flow is simple: nothing more than the observer pattern responding to events. However, the next article gets to the heart of the call chain, so keep this flow in mind as the basis for what follows.


Next: The Chain of Secrets