Welcome to pay attention to the wechat public account “Conveniently Record Technical Team”, check out more technical articles of conveniently Record team. In this paper, the author: Zhou Haoyuan The original link: mp.weixin.qq.com/s/OJCEyH1gJ…

Since 2015, RxJava1.0 has been rapidly gaining popularity. In just two years, RxJava has become a well-known tool in Android development, and its perfect integration with popular frameworks such as Retrofit has become a must-have tool for Android project development. As a large project, we have been cautious about introducing a tripartite framework, but since the beginning of this year, we have officially introduced RxJava2.0, and cooperated with Retrofit to reconstruct the network framework and cumbersome asynchronous logic of the project. RxJava although easy to use, but with the inevitable learning costs, in order to let everyone quickly understand the context of RxJava and quickly start to use, specially summarized this article. This article will explain in detail how to quickly understand RxJava operators and analyze the principles of RxJava operators from a source perspective.

The advantages of RxJava

RxJava is simply a library to simplify asynchronous calls, but in fact it is an elegant way of programming and programming ideas, once you are familiar with the use of RxJava, it will be easy to love it. I summarize its advantages mainly in two aspects:

  • Brevity eliminates callback hell in traditional asynchronous code logic
  • Increase readability of business logic code

On the first point, we can all agree, and on the second point, there may be some confusion, because many people feel that RxJava’s large number of unknown operators make code less readable. In fact, this impression is caused by the lack of understanding of the use and mechanism of the RxJava operators. For example, the code of this logic involves three asynchronous interfaces, two for QQ login SDK and one for background. Before RxJava was used for reconstruction, this code used three Asynctasks, namely three nested callbacks. The code was complex and the readability was very poor. After modification, it looks like this

If you’re familiar with a few of these RxJava operators, you’ll quickly see what my code does without having to comb through a bunch of nested callbacks, which is the readability of RxJava. Therefore, learning RxJava, understanding and mastering the operators is an inevitable first step.

RxJava2.0 to RxJava1.0

The basic idea remains the same from RxJava1.0 to RxJava2.0, but RxJava2.0 redesigns the entire architecture in accordance with the reaction-streams specification and changes the Maven repository dependency addresses and package names. So RxJava1.0 and RxJava2.0 are now two separate branches on the Github site that are not compatible with each other and cannot be used at the same time, and RxJava1.0 will no longer be maintained after a while. Therefore, if you are still using RxJava1.0, it is recommended to switch to RxJava2.0 as soon as possible, and if you have not been exposed to RxJava1.0, you can directly use and learn RxJava2.0. For details of the differences between RxJava1.0 and RxJava2.0, please refer to the official documentation. From here on out, Rx is used to represent RxJavA2.x for ease of writing.

What are the operators of Rx

Just contact Rx people face a pile of all kinds of operators will think I don’t know how to learning and memory, in fact you only need to know on the whole Rx operator categories and master some operator is enough high, as for the other operators, you only need to know its usage scenarios and master the methods of how to quickly understand a operator, It can be used quickly when needed. Below is my summary of the classification of Rx operators and the representative operators under each category based on the official documentation

Rx main operator


How to quickly understand an Rx operator

Speaking of Rx operators, many of you will be impressed by the colorful gem diagrams that describe Rx operators.

Rx gem figure





  • io.reactivex.Flowable: Event source (0.. N elements), support reaction-Streams and back pressure
    • io.reactivex.Observable: Event source (0.. N elements), does not support back pressure
    • io.reactivex.Single: emits only one element or an event source that generates an error,
    • io.reactivex.Completable: An event source that emits no elements and produces only completion or error
    • io.reactivex.Maybe: emits no elements, emits only one element, or generates an error event source
    • Subject: is both the event source and the event receiver

      And you can see that the most important concept in Rx isThe event sourceAlmost all operators perform transformations, combinations, and other operations on event sources, and the most commonly used event source isObservable.

In this paper, we take the Observable event source as an example to explain the Rx operator. The event emitted by an Observable is uniformly called item. First we need to look at the meaning of each graphic element in the gem map in detail:

  • -- - >:ObservableFrom left to right
  • u: stars, circles, squares, etcObservableThe launch of the item
  • |: the little vertical line at the end of the timelineObservableThe stream of events has been successfully launched
  • XThe X at the end of the timeline matches for some reasonObservableAn error was generated when the launch was terminated abnormally

Taken together, the above elements represent a completeObservable, can also be called a sourceObservable

The dotted arrows pointing down and the rectangle in the middle indicate that the source Observable above is doing some kind of transformation. The text in the rectangle shows the nature of the transformation. The following Observable is the result of converting the source Observable above.

Knowing what a gem diagram means, we can quickly understand an operator based on its gem diagram. A few examples:

1. map

map



Observable
map
Observable
map



map
Observable


Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"\n"); }});Copy the code

Output result:

2. zip



zip
Observable
Observable
1A
1
A
zip


Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A"."B"."C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "\n"); }});Copy the code

Output result:

3. concat



concat
Observable
concat
Observable
Observable
Observable


Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "\n"); }});Copy the code

Output result:

Most operators come with such a gem map, which can be found either in the official documentation or directly in the Rx source code by looking at JavaDoc without further examples. You can also check out more dynamically interactive gem maps at sites like RxMarbles.

Principles of the Rx operator

To understand the principle of the operator, you must start with the source code. So let’s start by simply stroking through the source code of Rx’s most basic Create operator. The source directory structure of Rx is quite clear. We analyze it from observable. create method first

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // Create an Observer with a callback method onSubscribe to Disposable, which is equivalent to Subscription in rxjava1. x.
  }

  @Override
  public void onNext(@NonNull String s) {}@Override
  public void onError(@NonNull Throwable e) {}@Override
  public void onComplete(a) {}});Copy the code

The create method looks like this

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}Copy the code

RxJavaPlugins (RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins, RxJavaPlugins) ObservableCreate is a subclass of Observable that implements some of Observable’s abstract methods, such as subscribeActual. In fact, each Rx operator corresponds to a subclass of Observable. The create method accepts an ObservableOnSubscribe interface implementation class:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}Copy the code

This interface accepts an instance of an ObservableEmitter type via a SUBSCRIBE method. ObservableOnSubscribe when ObservableCreate executes, we pass in an anonymous inner class of type ObservableOnSubscribe, implements its SUBSCRIBE method, and then passes in the ObservableCreate, the return object of the Create method. Finally, he became the member source of observablecate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source; }...Copy the code

The subscribe method of an Observable takes an Observer as an input.

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }}Copy the code

Eventually it calls the subscribeActual method of its subclass, ObservableCreate:

@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code

CreateEmitter object Parent is first created in subscribeActual to emit events. CreateEmitter implements interfaces Emitter and Disposable and holds an Observer. The key statement in this code is source.subscribe(parent), which triggers the event source to emit the event, i.e. e.onnext (“s”) is called. The observer onSubscribe() method takes an argument of type Disposable, which can be used to unsubscribe. Unsubscribe is possible because observer onSubscribe() was called before the event was emitted, giving us a chance to call CreateEmitter’s unsubscribe method dispose(). Moving on to CreateEmitter’s onNext() method, it ultimately emits events by calling the Observer’s onNext() method

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return; } // CreateEmitter is determined to be unsubscribed before launchingif(! isDisposed()) { observer.onNext(t); }}... }Copy the code

At this point, the process of creating and subscribing to Rx event sources is complete.

Let’s start with the map operator to see how the Rx operator works

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}Copy the code

The Map method takes a Function parameter, mapper, and returns an ObservableMap object, which also inherits from an Observable, The Mapper is passed to the ObservableMap member function, and the current source Observable is passed to the ObservableMap member Source, which goes into the ObservableMap class

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode ! = NONE) { actual.onNext(null);return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            returnt ! = null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),"The mapper function returned a null value.") : null; }}}Copy the code

You can see that the decorator pattern is used here, ObservableMap holds the event source from its upstream, MapObserver holds the event receiver from its downstream and the transformation method function that we implemented, Complete the source subscription of ObservableMap in the subscribeActual() method, trigger the onNext() method of MapObserver, and then send the original data from the source to the downstream event receiver after transformation by function Mapper. To achieve the map function.

Now we can finally summarize the subscription process with multiple operators, using the following code as an example

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {}});Copy the code

ObservableCreate is created and returned when create is executed. When Map is executed, ObservableCreate, ObservableCreate, ObservableCreate, An ObservableMap is created and returned, and each new Observable holds its upstream source Observable(source) and the currently involved operator function. When the subscribe method is executed in the last step, the subscribeActual() method of ObservableMap is triggered and the most downstream Observer is wrapped as MapObserver. At the same time, the method continues to call the subscribe method of the ObservableCreate it owns (that is, execute source.subscribe), which triggers the subscribeActual() method of the ObservableCreate. The CreateEmitter calls its onNext() method to emit the event, which in turn calls the MapObserver operator mapper and onNext(), passing the event to the onNext() method of the most downstream Observer.

I simply represent this logic in the following diagram

The operatorliftandcompose

Lift and compose are two of the more special operators in Rx. Lift allows us to encapsulate the Observer, and in RxJava1.0 most of the transformations were based on lift, the magic operator.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
   ObjectHelper.requireNonNull(lifter, "onLift is null");
   return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}Copy the code

The Lift operator takes an ObservableOperator object

/**
 * Interface to map/wrap a downstream observer to an upstream observer.
 *
 * @param <Downstream> the value type of the downstream
 * @param <Upstream> the value type of the upstream
 */
public interface ObservableOperator<Downstream, Upstream> {
    /**
     * Applies a function to the child Observer and returns a new parent Observer.
     * @param observer the child Observer instance
     * @return the parent Observer instance
     * @throws Exception on failure
     */
    @NonNull
    Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}Copy the code

As you can see from the comments, this is an interface that wraps the downstream subscriber as an upstream subscriber. Similar to MapObserver in the Map operator.

The compose operator lets you wrap an Observable

@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
   return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}Copy the code

The wrap method is as follows, simply following the RxJavaPlugins process

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
   ObjectHelper.requireNonNull(source."source is null");
   if (source instanceof Observable) {
       return RxJavaPlugins.onAssembly((Observable<T>)source);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}Copy the code

The compose method accepts an ObservableTransformer object

/**
 * Interface to compose Observables.
 *
 * @param <Upstream> the upstream value type
 * @param <Downstream> the downstream value type
 */
public interface ObservableTransformer<Upstream, Downstream> {
    /**
     * Applies a function to the upstream Observable and returns an ObservableSource with
     * optionally different element type.
     * @param upstream the upstream Observable instance
     * @return the transformed ObservableSource instance
     */
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}Copy the code

ObservableSource is the only interface that our base Observable inherits. ObservableTransformer is an interface that combines multiple Observables. It receives upstream Observables through an apply() method and returns new Observables after performing some operations. Here combining multiple observables is the meaning of combining multiple operators, for example, we often need to be done in the use of Rx network thread asynchronous requests for change, the operation is generally the same, every time writing will be bored, then we can use the compose several common threads transform operator

private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
   @Override
   public ObservableSource apply(Observable upstream) {
       returnupstream.subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }}; protected voidtestCompose() { getNetObservable() .compose(schedulersObservable) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { mRxOperatorsText.append(s); }}); }Copy the code

For a typical example of compose, you can also take a look at the open source project RxLifecycle, which makes clever use of the Compose operator to address the memory leaks associated with using Rx.

Application scenarios of the Rx operator

Having said that, we are most concerned with the application scenarios of the Rx operator. The Rx operator can be used elegantly wherever asynchrony exists. For example, many popular open source projects around Rx

The Rx operator decision tree (Rx operator decision tree) provides guidance on how to select specific operators for the scenarios you want to implement.

In addition to these, there are various specific business scenarios in which we need to choose the appropriate operators. Here I summarize some common scenarios and the appropriate operators for them

Once we understand the principles of the Rx operator and become familiar with some of the more frequently used operators, we can easily use them in the above scenarios without confusing our code with complex business logic.


The above is all the content of this article. There are still many things worth further study on Rx, and I will share more experience of using Rx with you later when I have the opportunity.

reference

  • RxJava2Examples