Preface:

RxJava provides a large number of operators, which greatly improves our development efficiency. The two most basic transformation operators are map and flatMap. The other operators work in a similar way to Map.

  • Both map and flatMap take a function as parameter (Func1) and return an observedObservable
  • The < I,O >I and O templates of Func1 are input and output values, respectively. The call method of Func1 processes type I and returns type O data, but the method executed in flatMap returns type Observable

role

A Map applies a function to each piece of data emitted by an Observable, performing transformations. Apply a function of your choice to each data emitted by the original Observable, and return an Observable that emits those results.

flatMap

Usage:

See how to use both in code:

map

Observable.just(new User("Rhett Butler"))
                  .map(new Function<User, String>() {

                      @Override
                      public String apply(User user) throws Throwable {

                          returnuser.getName(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println(s); }}); < < < rhettCopy the code

This code takes a User object and finally prints out the name in User.

flatMap

Suppose there is a requirement that the library print the name of each book borrowed by each User: the User structure is as follows:

class User {
    private String       name;
    private List<String> book;
}
Copy the code

Let’s look at the implementation of map:

Observable.fromIterable(userList)
                  .map(new Function<User, List<String>>() {

                      @Override
                      public List<String> apply(User user) throws Throwable {

                          return user.getBook();
                      }
                  })
                  .subscribe(new Consumer<List<String>>() {

                      @Override
                      public void accept(List<String> strings) throws Throwable {

                          for(String s : strings) { System.out.println(s); }}});Copy the code

As you can see, map transformations are always one-to-one and can only be single transformations. We have to print through a loop. Let’s look at the implementation of flatMap:

Observable.fromIterable(userList)
                  .flatMap(new Function<User, ObservableSource<String>>() {

                      @Override
                      public ObservableSource<String> apply(User user) throws Throwable {

                          returnObservable.fromIterable(user.getBook()); } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Throwable { System.out.println(o); }});Copy the code

Flatmap can be either a single conversion or a one-to-many/many-to-many conversion. FlatMap uses a specified function to transform each data row emitted by the original Observable. This function returns an Observable that emits data itself, so events can be distributed internally again. The flatMap then merges the data emitted by these Observables and finally emits the combined results as its own data sequence.

Source code analysis

Let’s look at the two operators in combination with the source code. To reduce code reading, keep only the core code here:

map

public final <R> Observable<R> map(Function<? super T, ? Extends R> mapper) {// Takes an instance of Function and returns an ObservableMapreturn new ObservableMap<T, R>(this, mapper);
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {// Call the parent constructor to initialize downstream super in the parent class.source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { v = mapper.apply(t); downstream.onNext(v); }}}Copy the code

This code is simplified after removing some checksums and other related callbacks from the Map source code. Let’s look at the code flow:

  • When the callmap, the Map accepts an anonymous inner classFunctionAnd return oneObservableMapObject.
  • ObservableMapIt’s essentially oneObservableIs also an observed whose constructor accepts the outermost oneObservableInstance, andFunctionInstance.ObservableMapRewrite thesubscribeActualMethods,subscribeActualCreate a new one using theMapObserverImplemented against the originalObservableObservation.
  • The originalObservableData changes are sent toMapObserver“.
  • MapObserverThe constructor receives the primitiveObservableThe observeractual, andFunctionAn instance of themapper
  • MapObserverIn itsonNextMethod callmappertheapplyThe v apply method is used in map instances:public String apply(User user) throws Throwable { return user.getName(); }
  • calldownstreamTo the onNext method and pass in the argument v. Among themdownstreamisMapObserverA variable defined in the parent class, inMapObserverIn the construction methodsuper(actual);Class is itself passed inactualIt’s essentially primitiveObservable

The whole process can be summarized as follows: There is an original ObservableA and an observer ObservableA. When the original ObservableA calls map and passes in an anonymous inner class instantiation ‘function’, the map creates and returns an observed ObservableB, Subscribe tells the observer ObserverA to subscribe to it. Rewrite the subscribeActual method, create a new observer ObserverB that it receives when it is subscribed, and use the ObserverB to subscribe to the original ObservableA. When the original ObservableA issues an event and calls the onNext method of ObserverB, the observer that subscribeActual accepts is the original observer ObserverA. ObserverB executes the data v via the apply method of ‘function’ instantiated by the anonymous inner class, then calls the onNext method of the original ObservableA, passing in the argument v, and ObserverA observes the event. In a nutshell: an original observer and an observer, but let the original observer subscribe to a new observer, when the new observer is subscribed, create a new observer to subscribe to the original observer, and notify the original observer after performing the specified operation after the listening event.

flatMap

FaltMap and MAP have similar basic principles. The code is as follows:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}


public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source); } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... this.observers = new AtomicReference<InnerObserver<? ,? >[]>(EMPTY); } @Override public void onSubscribe(Disposable d) { downstream.onSubscribe(this); } @Override public void onNext(T t) { ObservableSource<? extends U> p; p = mapper.apply(t); subscribeInner(p); } @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
           
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                p.subscribe(inner);
        }

        void drain() {    
            drainLoop();
        }
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            child.onNext(o);
        }
    }

    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { parent.drain(); }}}Copy the code

The above code is the simplified source code of faltMap, most of which operates in the same way as the map source code in the previous article. We continue to continue the observer and observed in the above explanation. Focus on the differences: FaltMap returns a new ObservableB and overwrites the subscribeActual method of ObservableB to create a new observer ObserverB to subscribe to the original ObservableA when the original observer ObserverA subscribes to it. The new observer ObserverB holds the original ObserverA and faltMap received anonymous object instance function. When ObserverB listens to the original ObservableA event, ObserverB calls function apply to get the new ObservableC, and then creates a new observer ObserverC to subscribe to ObservableC. ObserverC holds the original observer ObserverA, and calls the method of the original observer ObserverA when ObserverC observes the ObservableC time.

conclusion

So far, the basic analysis of Map and flatMap has been completed. The map code is relatively simple and easy to understand, and a lot of auxiliary operations are involved in flatMap. The merging and other operations are not involved in this paper, which makes it difficult to read. Just to see how both work, you can read the code in Single

. The amount of code is much less than the amount of code in an Observable. If you are not familiar with the basic patterns of RxJava, you can read a handwritten minimalist version of RxJava on the God blog