The map operator

(1) Definition

A function is applied to each data emitted by the Observable to perform transformation operations, as shown in the figure below.

The Map operator applies a function of your choice to each data emitted by the original Observable, and returns an Observable that emits those results.

RxJava implements this operator as a map function, which by default is no longer executed on any particular scheduler.

(2) Examples

    public void mapTest(a){
        Observable.just("HELLO")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toLowerCase();
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s + " world";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception { System.out.println(s); }}); }Copy the code

Output result:

hello world
Copy the code

(3) source code analysis

1. Call map()

    /**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
     * emits the results of these function applications.
     *
     * @param <R> the output type
     * @param mapper
     *            a function to apply to each item emitted by the ObservableSource
     * @return an Observable that emits the items from the source ObservableSource, transformed by the specified
     *         function
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
Copy the code

2. The bottom layer is calling the ObservableMap class

Assign a custom Function object to a member variable: Function. This class also has a subscribeActual() method that is overwritten in the

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
Copy the code

A subsequent call to subscribe() first calls the Subscribe () method in the Observable class

    /**
     * Subscribes to an ObservableSource and provides a callback to handle the items it emits.
     *
     * @param onNext
     *             the {@code Consumer<T>} you have designed to accept emissions from the ObservableSource
     * @return a {@link Disposable} reference with which the caller can stop receiving items before
     *         the ObservableSource has finished sending them
     * @throws NullPointerException
     *             if {@code onNext} is null
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
Copy the code

subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyconsumer ()) calls an overloaded method of the Observable class: subscribe().

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        
        // Core method
        subscribe(ls);

        return ls;
    }
Copy the code

This method in turn calls the overloaded SUBSCRIBE () method of its class

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            
            // Core method, which is required by all classes that inherit Observable
            subscribeActual(observer);
            
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            thrownpe; }}Copy the code

The subscribeActual() method implementation in the ObservableMap class is called.

This method in turn calls the above method, but in this case the subscribeActual() method is in the ObservableJust class

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
Copy the code

The method then uses s.subscribe () to the OnSubscribe() method in the BasicFuseableObserver class

    // final: fixed protocol steps to support fuseable and non-fuseable upstream
    @SuppressWarnings("unchecked")
    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {

            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }

            if (beforeDownstream()) {
            
                // The corresponding code
                actual.onSubscribe(this); afterDownstream(); }}}Copy the code

After several calls, the accept() method we overwrote in the main method will be called and the corresponding result will be printed.

The flatmap operator

(1) Definition

FlatMap transforms one Observable emitting data into multiple Observables, and then merges the data emitted by them into a single Observable, as shown in Figure 2:

The flapMap operator transforms each data emitted by the original Observables using a specified function that returns an Observable that also emits data. The flatMap then merges the data emitted by the Observables. Finally, the combined result is transmitted as its own data sequence.

(2) Examples

Data classes

public class User {
    public String userName;
    public List<Address> addresses;

    public static class Address {
        public String street;
        publicString city; }}Copy the code
    public void flatMapTest(a) {
        User user = new User();
        user.userName = "tony";
        user.addresses = new ArrayList<>();
        User.Address address1 = new User.Address();
        address1.street = "ren ming road";
        address1.city = "Su zhou";
        user.addresses.add(address1);

        User.Address address2 = new User.Address();
        address2.street = "dong wu bei road";
        address2.city = "Su zhou";
        user.addresses.add(address2);

        Observable.just(user)
                .flatMap(new Function<User, ObservableSource<User.Address>>() {
                    @Override
                    public ObservableSource<User.Address> apply(User user) throws Exception {
                        return Observable.fromIterable(user.addresses);
                    }
                })
                .subscribe(new Consumer<User.Address>() {
                    @Override
                    public void accept(User.Address address) throws Exception { System.out.println(address.street); }}); }Copy the code

Output result:

ren ming road
dong wu bei road
Copy the code

(3) source code analysis

1.flatMap()

FlatMap () calls multiple overloaded methods underneath, eventually calling the following methods:

    /**
     * Returns an Observable that emits items based on applying a function that you supply to each item emitted
     * by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
     * ObservableSources and emitting the results of this merger, while limiting the maximum number of concurrent
     * subscriptions to these ObservableSources.
     *
     * @param <R> the value type of the inner ObservableSources and the output type
     * @param mapper
     *            a function that, when applied to an item emitted by the source ObservableSource, returns an
     *            ObservableSource
     * @param maxConcurrency
     *         the maximum number of ObservableSources that may be subscribed to concurrently
     * @param delayErrors
     *            if true, exceptions from the current Observable and all inner ObservableSources are delayed until all of them terminate
     *            if false, the first one signalling an exception will terminate the whole sequence immediately
     * @param bufferSize
     *            the number of elements to prefetch from each inner ObservableSource
     * @return an Observable that emits the result of applying the transformation function to each item emitted
     *         by the source ObservableSource and merging the results of the ObservableSources obtained from this
     *         transformation
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
Copy the code

We then return a different object depending on whether this is ScalarCallable:

This sample is the sample, and then call ObservableScalarXMap. ScalarXMap (v, mapper);

    /**
     * Maps a scalar value into an Observable and emits its values.
     *
     * @param <T> the scalar value type
     * @param <U> the output value type
     * @param value the scalar value to map
     * @param mapper the function that gets the scalar value and should return
     * an ObservableSource that gets streamed
     * @return the new Observable instance
     */
    public static <T, U> Observable<U> scalarXMap(T value,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
        return RxJavaPlugins.onAssembly(new ScalarXMapObservable<T, U>(value, mapper));
    }
Copy the code

Create a ScalarXMapObservable object.

2.subscribe()

This method also has multiple overloaded methods that end up calling SUBSCRIBE (Consumer
onNext, Consumer
onError, Action onComplete, Consumer
onSubscribe) and execute the **subscribe(ls); The subscribe(Observer
observer) and execute the subscribeActual(observer); * * method

The **subscribeActual of ObservableScalarXMap (Observer
s)** method

        @SuppressWarnings("unchecked") @Override public void subscribeActual(Observer<? super R> s) { ObservableSource<? extends R> other; Try {/ / / / core code here call custom in the user-defined flatMap method apply () method. Other = ObjectHelper requireNonNull (mapper. Apply (value),"The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                EmptyDisposable.error(e, s);
                return; } // Check the type of other to execute a different methodif (other instanceof Callable) {
                R u;

                try {
                    u = ((Callable<R>)other).call();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    EmptyDisposable.error(ex, s);
                    return;
                }

                if (u == null) {
                    EmptyDisposable.complete(s);
                    return;
                }
                ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
                s.onSubscribe(sd);
                sd.run();
            } else{// Since other is an Observable, we subscribe other.subscribe(s); }}}Copy the code

Mapper.apply (value) is called in subscribeActual() to execute the apply() method in the user-defined flatMap method.

The Subscribe () method of the Observable class is called.

Note that subscribe() is not the same Observable as the one called above!!

Call the subscribeActual() method in this method again, and then call the **subscribeActual()** method in the ObservableFromIterable class:

@Override public void subscribeActual(Observer<? super T> s) { Iterator<? extends T> it; Try {// this is truesourceIt = source.iterator(); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, s);return; } boolean hasNext; Try {/ / determine whether it -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- hasNext. = it hasNext (); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, s);return;
        }
        if(! hasNext) { EmptyDisposable.complete(s);return;
        }

        FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
        s.onSubscribe(d);

        if (!d.fusionMode) {
            d.run();
        }
    }
Copy the code

(4) Summary