preface

In RxJava, the line between upstream and downstream is not always clear. The upstream and downstream of RxJava needs to be compared with the observer. Before the observer, all can look upstream.

The map() operator below, the just() and map() operators can both look upstream because the observer subscription event takes place in the Consumer anonymous object.

Conversion operator combat

# map()
private void learnRxMap(a) {

    Observable.just(1.2.3.4)// Production events
            // Change the event
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer pInteger) throws Throwable {
                    return "[" + pInteger + "]"; }})// Observers subscribe to events
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String pS) throws Throwable {
                    Log.d("Transform operator map"."accept: ----> "+ pS); }});/ / the 2020-04-04 16:11:18. 227, 9596-9596 / com. Example. Learnrxjava type D/a conversion operator map: accept: -- -- -- -- > [1]
/ / the 2020-04-04 16:11:18. 227, 9596-9596 / com. Example. Learnrxjava type D/a conversion operator map: accept: -- -- -- -- > [2]
/ / the 2020-04-04 16:11:18. 228, 9596-9596 / com. Example. Learnrxjava type D/a conversion operator map: accept: -- -- -- -- > [3]
/ / the 2020-04-04 16:11:18. 228, 9596-9596 / com. Example. Learnrxjava type D/a conversion operator map: accept: -- -- -- -- > [4]

}
Copy the code

The map() operator looks at the upstream and downstream nodes and transforms the upstream Integer variable into a String, while the map() operator acts as the middle node and transforms the upstream event properties.

# flatMap()

private void learnRxFlatMap(a) {

    String[] lStrings = {"learn "."RxJava's "."Op"};

    Observable.fromArray(lStrings)
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(final String pS) throws Throwable {

                    return new Observable<String>() {
                        @Override
                        protected void subscribeActual(@NonNull Observer<? super String> observer) {
                            observer.onNext(pS + "// ");
                            observer.onNext(pS + "== "); }}; } }) .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {}@Override
                public void onNext(@NonNull String pS) {
                    Log.d("Transform operator onNext"."onNext: ----> " + pS);
                }

                @Override
                public void onError(@NonNull Throwable e) {}@Override
                public void onComplete(a) {}});/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> learn //
/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> learn ==
/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> RxJava's //
/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> RxJava's ==
/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> Op//
/ / the 2020-04-04 16:49:36. 806, 26443-26443 /? D/ transform operator onNext: onNext: ----> Op==
}

Copy the code

The flatMap operator, acting as an RxJava intermediary node, can transform the nature of upstream events as the map() operation does, and can also be used with ObservableSource
>() simultaneously sends multiple events downstream while changing the nature of the event.

private void learnRxFlatMap(a) {

        String[] lStrings = {"learn "."RxJava's "."Op"};

        Observable.fromArray(lStrings)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(final String pS) throws Throwable {

                        List<String> lStringList = new ArrayList<>();

                        for (int i = 0; i < 5; i++) {
                            lStringList.add(pS + "The subscript" + "[" + i + "]");
                        }
                        // Simulate network latency
                        return Observable.fromIterable(lStringList).delay(5, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {}@Override
                    public void onNext(@NonNull String pS) {
                        Log.d("Transform operator onNext"."onNext: ----> " + pS);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {}@Override
                    public void onComplete(a) {}});/ / the 2020-04-04 17:08:55. 488, 27958-27989 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > learn subscript [0]
/ / the 2020-04-04 17:08:55. 489, 27958-27989 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > learn subscript [1]
/ / the 2020-04-04 17:08:55. 490, 27958-27989 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > learn subscript [2]
/ / the 2020-04-04 17:08:55. 491, 27958-27989 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > learn subscript [3]
/ / the 2020-04-04 17:08:55. 492, 27958-27989 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > learn subscript [4]
/ / the 2020-04-04 17:08:55. 493, 27958-27990 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > RxJava 's (subscript [0]
/ / the 2020-04-04 17:08:55. 494, 27958-27990 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > RxJava 's (subscript [1]
/ / the 2020-04-04 17:08:55. 494, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > Op subscript [0]
/ / the 2020-04-04 17:08:55. 495, 27958-27990 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > RxJava 's (subscript [2]
/ / the 2020-04-04 17:08:55. 495, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > Op subscript [1]
/ / the 2020-04-04 17:08:55. 496, 27958-27990 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > RxJava 's (subscript [3]
/ / the 2020-04-04 17:08:55. 497, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > Op subscript [2]
/ / the 2020-04-04 17:08:55. 497, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > RxJava 's (subscript [4]
/ / the 2020-04-04 17:08:55. 498, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > Op subscript [3]
/ / the 2020-04-04 17:08:55. 499, 27958-27991 / com. Example. Learnrxjava type D/a conversion operator onNext: onNext: -- -- -- -- > Op subscript [4]
    }
Copy the code

The flatMap() operator does not sequentially shoot downstream while converting upstream events into multiple events that continue downstream.

# concatMap()
private void learnRxConcatMap(a) {
    Observable.just("AA"."BBBB"."CVD")
            .concatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String pS) throws Throwable {
                    List<String> lStringList = new ArrayList<>();

                    for (int i = 0; i < 5; i++) {
                        lStringList.add(pS + "The subscript" + "[" + i + "]");
                    }
                    return Observable.fromIterable(lStringList).delay(5, TimeUnit.SECONDS);
                }
            })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String pS) throws Throwable {
                    Log.d("Transform operator concatMap"."accept: ----> "+ pS); }});/ / the 2020-04-04 17:18:22. 629, 29058-29091 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > AA subscript [0]
/ / the 2020-04-04 17:18:22. 630, 29058-29091 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > AA subscript [1]
/ / the 2020-04-04 17:18:22. 630, 29058-29091 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > AA subscript [2]
/ / the 2020-04-04 17:18:22. 630, 29058-29091 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > AA subscript [3]
/ / the 2020-04-04 17:18:22. 630, 29058-29091 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > AA subscript [4]
/ / the 2020-04-04 17:18:27. 665, 29058-29095 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > BBBB subscript [0]
/ / the 2020-04-04 17:18:27. 667, 29058-29095 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > BBBB subscript [1]
/ / the 2020-04-04 17:18:27. 667, 29058-29095 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > BBBB subscript [2]
/ / the 2020-04-04 17:18:27. 668, 29058-29095 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > BBBB subscript [3]
/ / the 2020-04-04 17:18:27. 669, 29058-29095 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > BBBB subscript [4]
/ / the 2020-04-04 17:18:32. 706, 29058-29105 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > CVD subscript [0]
/ / the 2020-04-04 17:18:32. 707, 29058-29105 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > CVD subscript [1]
/ / the 2020-04-04 17:18:32. 708, 29058-29105 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > CVD subscript [2]
/ / the 2020-04-04 17:18:32. 709, 29058-29105 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > CVD subscript [3]
/ / the 2020-04-04 17:18:32. 709, 29058-29105 / com. Example. Learnrxjava type D/a conversion operator concatMap: accept: -- -- -- -- > CVD subscript [4]
}
Copy the code

The concatMap() operator, like the flatMap() operator, can transform upstream events into multiple events and send them downstream, but concatMap() sends them downstream in order of multiple events.

# groupBy()
private void learnRxGroupBy(a) {

    Observable.just(10.20.30.40.50)
            .groupBy(new Function<Integer, String>() {
                @Override
                public String apply(Integer pInteger) throws Throwable {
                    return pInteger > 30 ? "Middle-aged programmer" : "Young Programmers";
                }
            })
            .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                @Override
                public void accept(final GroupedObservable<String, Integer> pGroupedObservable) throws Throwable {

                    Log.d("Classified Key"."accept: " + pGroupedObservable.getKey());

                    pGroupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer pInteger) throws Throwable {
                            Log.d("Operator groupBy"."accept: ----> " + pGroupedObservable.getKey() + "年龄 :"+ pInteger); }}); }});/ / the 2020-04-04 17:32:21. 656, 3023-3023 / com. Example. Learnrxjava D/classification Key: accept: young programmer
/ / the 2020-04-04 17:32:21. 656, 3023-3023 / com. Example. Learnrxjava D/a conversion operator groupBy: accept: -- -- -- -- > young programmer age: 10
/ / the 2020-04-04 17:32:21. 657, 3023-3023 / com. Example. Learnrxjava D/a conversion operator groupBy: accept: -- -- -- -- > young programmer age: 20
/ / the 2020-04-04 17:32:21. 657, 3023-3023 / com. Example. Learnrxjava D/a conversion operator groupBy: accept: -- -- -- -- > young programmer age: 30
/ / the 2020-04-04 17:32:21. 657, 3023-3023 / com. Example. Learnrxjava D/classification Key: accept: middle-aged programmer
17:32:21 / / 2020-04-04. 657. 3023-3023 / com example. Learnrxjava D/a conversion operator groupBy: accept: -- -- -- -- > middle-aged programmers age: 40
17:32:21 / / 2020-04-04. 658. 3023-3023 / com example. Learnrxjava D/a conversion operator groupBy: accept: -- -- -- -- > middle-aged programmers age: 50
}
Copy the code

The main function of the groupBy() operator is to classify upstream events into groups according to certain criteria in its nodes and then send them downstream. However, downstream groups can be classified according to certain templates.

# buffer()
  private void learnRxBuffer(a) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            for (int lI = 0; lI < 100; lI++) {
                emitter.onNext(lI);
            }
            emitter.onComplete();
        }
    })
            .buffer(20)
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> pIntegers) throws Throwable {
                    Log.d("Transform operator buffer"."accept: ----> "+ pIntegers); }});/ / the 2020-04-04 17:49:38. 264, 13267-13267 / com. Example. Learnrxjava D/a conversion operator buffer: the accept: -- -- -- - > [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
/ / the 2020-04-04 17:49:38. 264, 13267-13267 / com. Example. Learnrxjava D/a conversion operator buffer: the accept: -- -- -- - > [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
/ / the 2020-04-04 17:49:38. 265, 13267-13267 / com. Example. Learnrxjava D/a conversion operator buffer: the accept: -- -- -- - > [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
/ / the 2020-04-04 17:49:38. 265, 13267-13267 / com. Example. Learnrxjava D/a conversion operator buffer: the accept: -- -- -- - > [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
/ / the 2020-04-04 17:49:38. 266, 13267-13267 / com. Example. Learnrxjava D/a conversion operator buffer: the accept: -- -- -- - > [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
}
Copy the code

When a large number of events are sent downstream, if you need to cache the events to a specified number before sending them downstream, the transformation operator buffer() can cache the events to a specified number before sending them downstream.