Basic API supplement

1. Action1: subscribing with only a call() method


        Observable<String> observable = Observable.just("hello", "world!");
        // The way we wrote it before
        // observable.subscribe(new Observer<String>() {
        //
        // @Override
        // public void onCompleted() {
        //
        // }
        //
        // @Override
        // public void onError(Throwable e) {
        //
        // }
        //
        // @Override
        // public void onNext(String t) {
        // Log.i("main", "value: " + t);
        // }
        // });

        observable.subscribe(new Action1<String>() {

            /** Equivalent to onNext */
            @Override
            public void call(String t) {
                Log.e("main", "value: " + t);
            }
        });
        // Other overloads:
        // observable.subscribe(onNext, onError);
        // observable.subscribe(onNext, onError, onCompleted);

Result output:

08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: value: hello
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: value: world!

call() is equivalent to the onNext() method.
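To make the equivalence concrete, here is a minimal plain-Java sketch of how a one-method callback can be wrapped into a full three-method observer. The names MiniObserver, OnNext, fromOnNext and emitAll are hypothetical and are not RxJava classes; the sketch only assumes what the example above shows, namely that every emitted value is routed to call().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ActionAdapterSketch {

    /** Hypothetical stand-in for a full observer with three callbacks. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Hypothetical stand-in for a single-method callback such as Action1. */
    interface OnNext<T> {
        void call(T value);
    }

    /** Wraps a single callback so it can be used where a full observer is expected. */
    static <T> MiniObserver<T> fromOnNext(final OnNext<T> action) {
        return new MiniObserver<T>() {
            @Override
            public void onNext(T value) {
                action.call(value); // every emitted value goes to call()
            }

            @Override
            public void onError(Throwable e) {
                // Assumption of this sketch: an error nobody handles is rethrown.
                throw new IllegalStateException("unhandled error", e);
            }

            @Override
            public void onCompleted() {
                // Nothing to do: the caller supplied no completion callback.
            }
        };
    }

    /** Pushes every item to the observer, then signals completion. */
    static <T> void emitAll(List<T> items, MiniObserver<T> observer) {
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onCompleted();
    }

    /** Collects what a call()-only subscriber receives. */
    static List<String> received(List<String> items) {
        final List<String> seen = new ArrayList<>();
        OnNext<String> collect = value -> seen.add(value);
        emitAll(items, fromOnNext(collect));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(received(Arrays.asList("hello", "world!"))); // prints [hello, world!]
    }
}
```

The point of the design is that the caller writes only the callback it cares about, while the wrapper supplies defaults for the other two.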

2. Filtering operators

(1) filter

filter(Func1) removes the values we do not want from the observed sequence and emits only those that satisfy the given condition. The following Activity keeps only the apps whose name contains "Lucy":


public class FilterActivity extends Activity {
    private Observable<AppInfo> observable;
    private AppInfoAdapter appInfoAdapter;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
        initView();
    }

    private void initView() {
        ListView listView = (ListView) findViewById(R.id.lv_app_name);
        appInfoAdapter = new AppInfoAdapter(this);
        listView.setAdapter(appInfoAdapter);
    }

    /**
     * Create the Observable.
     *
     * @return an Observable that emits only the matching AppInfo items
     */
    private Observable<AppInfo> getApps() {
        AppInfo appInfo1 = new AppInfo("Xiong", 0);
        AppInfo appInfo2 = new AppInfo("Tony", 0);
        AppInfo appInfo3 = new AppInfo("Tomcat", 0);
        AppInfo appInfo4 = new AppInfo("Lucy", 0);
        AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
        return Observable
                .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).filter(
                        new Func1<AppInfo, Boolean>() {

                            @Override
                            public Boolean call(AppInfo t) {
                                return t.getName().contains("Lucy");
                            }
                        });
    }

    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                // Refresh the UI when done
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(AppInfo t) {
                Log.e("main", t.getName());
                // Add data
                appInfoAdapter.addAppInfo(t);
            }
        });
    }
}

Result output:

08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy pioneer
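For readers who want to check the predicate without an Android project, the same selection can be reproduced on a plain list. This is a java.util.stream analogue, not RxJava; the class and method names are made up for illustration, and only the names and the contains("Lucy") test come from the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {

    /** Keeps only the names containing the keyword, preserving their order. */
    static List<String> namesContaining(List<String> names, String keyword) {
        return names.stream()
                .filter(name -> name.contains(keyword)) // same test as the Func1 above
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Xiong", "Tony", "Tomcat", "Lucy", "Lucy pioneer");
        System.out.println(namesContaining(names, "Lucy")); // prints [Lucy, Lucy pioneer]
    }
}
```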

Filter source code:


public final class OnSubscribeFilter<T> implements OnSubscribe<T> {

    final Observable<T> source;

    final Func1<? super T, Boolean> predicate;

    public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    @Override
    public void call(final Subscriber<? super T> child) {
        FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
        child.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class FilterSubscriber<T> extends Subscriber<T> {

        final Subscriber<? super T> actual;

        final Func1<? super T, Boolean> predicate;

        boolean done;

        public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
            this.actual = actual;
            this.predicate = predicate;
            request(0);
        }

        @Override
        public void onNext(T t) {
            boolean result;

            try {
                result = predicate.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            if (result) {
                actual.onNext(t);
            } else {
                request(1);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }
    }
}

(2) take (emit only the first n items)


    /**
     * Create the Observable.
     *
     * @return an Observable that emits only the first two AppInfo items
     */
    private Observable<AppInfo> getApps() {
        AppInfo appInfo1 = new AppInfo("Xiong", 0);
        AppInfo appInfo2 = new AppInfo("Tony", 0);
        AppInfo appInfo3 = new AppInfo("Tomcat", 0);
        AppInfo appInfo4 = new AppInfo("Lucy", 0);
        AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
        // Emit only the first two items
        return Observable
                .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).take(2);
    }

    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                // Refresh the UI when done
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(AppInfo t) {
                Log.e("main", t.getName());
                // Add data
                appInfoAdapter.addAppInfo(t);
            }
        });
    }

Result output:

08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Xiong
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Tony

take source code:


public final class OperatorTake<T> implements Operator<T, T> {

    final int limit;

    public OperatorTake(int limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit >= 0 required but it was " + limit);
        }
        this.limit = limit;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final Subscriber<T> parent = new Subscriber<T>() {

            int count;
            boolean completed;

            @Override
            public void onCompleted() {
                if (!completed) {
                    completed = true;
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!completed) {
                    completed = true;
                    try {
                        child.onError(e);
                    } finally {
                        unsubscribe();
                    }
                }
            }

            @Override
            public void onNext(T i) {
                if (!isUnsubscribed() && count++ < limit) {
                    boolean stop = count == limit;
                    child.onNext(i);
                    if (stop && !completed) {
                        completed = true;
                        try {
                            child.onCompleted();
                        } finally {
                            unsubscribe();
                        }
                    }
                }
            }

            /**
             * We want to adjust the requested values based on the `take` count.
             */
            @Override
            public void setProducer(final Producer producer) {
                child.setProducer(new Producer() {

                    // keeps track of requests up to maximum of `limit`
                    final AtomicLong requested = new AtomicLong(0);

                    @Override
                    public void request(long n) {
                        if (n > 0 && !completed) {
                            // because requests may happen concurrently use a CAS loop to
                            // ensure we only request as much as needed, no more no less
                            while (true) {
                                long r = requested.get();
                                long c = Math.min(n, limit - r);
                                if (c == 0) {
                                    break;
                                } else if (requested.compareAndSet(r, r + c)) {
                                    producer.request(c);
                                    break;
                                }
                            }
                        }
                    }
                });
            }

        };

        if (limit == 0) {
            child.onCompleted();
            parent.unsubscribe();
        }

        /*
         * We decouple the parent and child subscription so there can be multiple take() in a chain such as for
         * the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
         *
         * Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
         *
         * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we
         * register 'parent' with 'child'
         */
        child.add(parent);

        return parent;
    }
}

(3) takeLast (emit only the last n items)


    /**
     * Create the Observable.
     *
     * @return an Observable that emits only the last two AppInfo items
     */
    private Observable<AppInfo> getApps() {
        AppInfo appInfo1 = new AppInfo("Xiong", 0);
        AppInfo appInfo2 = new AppInfo("Tony", 0);
        AppInfo appInfo3 = new AppInfo("Tomcat", 0);
        AppInfo appInfo4 = new AppInfo("Lucy", 0);
        AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
        // Emit only the last two items
        return Observable
                .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).takeLast(2);
    }


    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                // Refresh the UI when done
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(AppInfo t) {
                Log.e("main", t.getName());
                // Add data
                appInfoAdapter.addAppInfo(t);
            }
        });
    }

Result output:

08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy pioneer
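The difference between the two operators is easiest to see on a plain sequence. The sketch below is a simplified model in plain Java, not the RxJava implementation, and its class and method names are hypothetical. It shows that take can stop as soon as n items have passed, while takeLast has to buffer the most recent n items and can only hand them over once the source has ended.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class TakeSketch {

    /** take(n): forward items until n have been seen, then stop reading. */
    static <T> List<T> take(Iterable<T> source, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        List<T> out = new ArrayList<>();
        if (n == 0) {
            return out; // nothing to forward
        }
        for (T item : source) {
            out.add(item);
            if (out.size() == n) {
                break; // corresponds to completing early and unsubscribing
            }
        }
        return out;
    }

    /** takeLast(n): keep a sliding window of the n most recent items until the source ends. */
    static <T> List<T> takeLast(Iterable<T> source, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        Deque<T> window = new ArrayDeque<>();
        if (n == 0) {
            return new ArrayList<>(window);
        }
        for (T item : source) {
            if (window.size() == n) {
                window.removeFirst(); // drop the oldest buffered item
            }
            window.addLast(item);
        }
        return new ArrayList<>(window); // only available after the whole source was read
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Xiong", "Tony", "Tomcat", "Lucy", "Lucy pioneer");
        System.out.println(take(names, 2));     // prints [Xiong, Tony]
        System.out.println(takeLast(names, 2)); // prints [Lucy, Lucy pioneer]
    }
}
```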

(4) distinct (remove all duplicates)


    private Observable<String> getApps() {

        // Drop every value that has already been emitted
        return Observable.just("Tony", "pioneer", "Tomcat", "Tony", "Lucy", "Tomcat", "Tony").distinct();
    }

    public void click(View v) {
        observable.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                // Refresh the UI when done
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String t) {
                Log.e("main", t);
            }
        });
    }

Result output:

08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tony
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: pioneer
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tomcat
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Lucy

(5) distinctUntilChanged (remove adjacent duplicates)



    /**
     * Create the Observable.
     *
     * @return an Observable that drops consecutive repeated values
     */
    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("Michael");
        list.add("pioneer");
        list.add("Michael");
        list.add("Michael");
        list.add("Huni");
        list.add("Huni");
        list.add("Huni");
        list.add("King");
        list.add("Huni");
        return Observable.from(list).distinctUntilChanged();
    }


    public void click(View v) {
        observable.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String t) {
                Log.e("main", "Filtered value: " + t);
            }
        });
    }

Result output:

08-07 07:46:45.444 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: pioneer
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Huni
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: King
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Huni
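The two operators differ only in how much they remember. The following plain-Java sketch (a model with hypothetical names, not the RxJava source) keeps a set of everything seen for distinct, but only the previous item for distinctUntilChanged, which is why "Michael" and "Huni" can reappear in the output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class DistinctSketch {

    /** distinct: remember every value seen so far and drop any repeat. */
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) { // add() returns false when the value was already present
                out.add(item);
            }
        }
        return out;
    }

    /** distinctUntilChanged: remember only the previous value and drop immediate repeats. */
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : source) {
            if (!hasPrevious || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(
                "Tony", "pioneer", "Tomcat", "Tony", "Lucy", "Tomcat", "Tony")));
        // prints [Tony, pioneer, Tomcat, Lucy]
        System.out.println(distinctUntilChanged(Arrays.asList(
                "Michael", "Michael", "pioneer", "Michael", "Michael",
                "Huni", "Huni", "Huni", "King", "Huni")));
        // prints [Michael, pioneer, Michael, Huni, King, Huni]
    }
}
```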

(6) first

As the name suggests, first() emits only the first item of the Observable sequence.


    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("pioneer");
        list.add("Huni");
        list.add("King");
        list.add("Cookie");
        // first: emit only the first value in the sequence (internally take(1).single())
        // last: emit only the last value in the sequence (internally takeLast(1).single())
        return Observable.from(list).first();
       // return Observable.from(list).last();
    }

  public void click(View v) {
        observable.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String t) {
                Log.e("main", "Filtered value: " + t);
            }
        });
    }

Result output:

08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: Filtered value: Michael

(7) last

last() emits only the last item of the observed sequence.


    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("pioneer");
        list.add("Huni");
        list.add("King");
        list.add("Cookie");
        // first: emit only the first value in the sequence (internally take(1).single())
        // last: emit only the last value in the sequence (internally takeLast(1).single())
        return Observable.from(list).last();
    }

Result output:

08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: Filtered value: Cookie

(8) skip

skip(int) ignores the first n items emitted by an Observable and emits the rest.


    /**
     * Create the Observable.
     *
     * @return an Observable that skips the first two values
     */
    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("Pioneer");
        list.add("Huni");
        list.add("King");
        list.add("Cookie");
        list.add("Faker");
        list.add("Gigi");
        // skip: drop the first n items, then emit the rest
        // skipLast: drop the last n items
        return Observable.from(list).skip(2);
    }


    public void click(View v) {
        observable.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String t) {
                Log.i("main", "Filtered value: " + t);
            }
        });
    }

Result output:

08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Huni
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: King
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Cookie
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Faker
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Gigi

(9) skipLast

skipLast(int) ignores the last n items emitted by an Observable.


    /**
     * Create the Observable.
     *
     * @return an Observable that omits the last two values
     */
    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("Pioneer");
        list.add("Huni");
        list.add("King");
        list.add("Cookie");
        list.add("Faker");
        list.add("Gigi");
        // skip: drop the first n items, then emit the rest
        // skipLast: drop the last n items
        return Observable.from(list).skipLast(2);
    }

Result output:

08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Michael
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Pioneer
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Huni
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: King
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Cookie

(10) elementAt

elementAt(int) picks the item at the given index (counting from 0) in the sequence emitted by the source Observable and emits it as the only item.


    private Observable<String> getApps() {
        list = new ArrayList<String>();
        list.add("Michael");
        list.add("Pioneer");
        list.add("Huni");
        list.add("King");
        list.add("Cookie");
        list.add("Faker");
        list.add("Gigi");
        // elementAt: emit only the item at index 2 (counting from 0)
        return Observable.from(list).elementAt(2);
    }

08-07 09:01:17.495 20645-20645/com.haocai.architect.rxjava I/main: Filtered value: Huni
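On a finite list, the three positional operators above boil down to index arithmetic. The sketch below uses plain Java lists rather than RxJava, with hypothetical names, and assumes n is never negative. It reproduces the three results shown for the seven-name list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SkipSketch {

    /** skip(n): everything after the first n items (empty when n exceeds the size). */
    static <T> List<T> skip(List<T> source, int n) {
        int from = Math.min(n, source.size());
        return new ArrayList<>(source.subList(from, source.size()));
    }

    /** skipLast(n): everything except the last n items. */
    static <T> List<T> skipLast(List<T> source, int n) {
        int to = Math.max(0, source.size() - n);
        return new ArrayList<>(source.subList(0, to));
    }

    /** elementAt(index): the single item at a zero-based position. */
    static <T> T elementAt(List<T> source, int index) {
        return source.get(index); // throws IndexOutOfBoundsException when out of range
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "Michael", "Pioneer", "Huni", "King", "Cookie", "Faker", "Gigi");
        System.out.println(skip(names, 2));      // prints [Huni, King, Cookie, Faker, Gigi]
        System.out.println(skipLast(names, 2));  // prints [Michael, Pioneer, Huni, King, Cookie]
        System.out.println(elementAt(names, 2)); // prints Huni
    }
}
```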

(11) sample

The sample operator checks the source Observable at a fixed interval and, at the end of each interval, emits the most recent item the source produced during that interval.

Example 1


Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(
                new Observer<Long>() {

                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long t) {
                        Log.i("main", "Received value: " + t);
                    }
                });

Result output:

08-07 09:36:08.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 0
08-07 09:36:10.477 20117-20195/com.haocai.architect.rxjava I/main: Received value: 2
08-07 09:36:12.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 4
08-07 09:36:14.479 20117-20195/com.haocai.architect.rxjava I/main: Received value: 6
08-07 09:36:16.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 8
...

Example 2


 Observable.create(subscriber -> {
        subscriber.onNext(1);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(2);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }

        subscriber.onNext(3);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(4);
        subscriber.onNext(5);
        subscriber.onCompleted();

    }).sample(999, TimeUnit.MILLISECONDS) // throttleLast(999, TimeUnit.MILLISECONDS) is equivalent
            .subscribe(item -> Log.d("JG", item.toString()));

Result output: 2, 3, 5
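The result 2, 3, 5 can be traced by hand with a small deterministic simulation. This is a plain-Java model with hypothetical names, not RxJava. It assumes that an item arriving exactly on a tick counts towards that tick, and that a value still pending when the source completes is emitted as well, which is what the result above implies.

```java
import java.util.ArrayList;
import java.util.List;

class SampleSketch {

    /**
     * values[i] arrives at timesMs[i] (ascending); the source completes at completeAtMs.
     * At every multiple of periodMs the latest unseen value, if any, is emitted.
     */
    static List<Integer> sample(int[] values, long[] timesMs, long completeAtMs, long periodMs) {
        if (periodMs <= 0) {
            throw new IllegalArgumentException("periodMs must be positive");
        }
        List<Integer> out = new ArrayList<>();
        Integer latest = null; // most recent value not yet emitted
        int next = 0;          // index of the next value to arrive
        for (long tick = periodMs; tick < completeAtMs; tick += periodMs) {
            while (next < values.length && timesMs[next] <= tick) {
                latest = values[next++];
            }
            if (latest != null) {
                out.add(latest);
                latest = null; // each value is emitted at most once
            }
        }
        // Values arriving between the last tick and completion.
        while (next < values.length && timesMs[next] <= completeAtMs) {
            latest = values[next++];
        }
        if (latest != null) {
            out.add(latest); // assumption: the pending value is flushed on completion
        }
        return out;
    }

    public static void main(String[] args) {
        // Timeline of Example 2: 1 at 0 ms, 2 at 500 ms, 3 at 1000 ms, 4 and 5 at 2000 ms.
        int[] values = {1, 2, 3, 4, 5};
        long[] timesMs = {0, 500, 1000, 2000, 2000};
        System.out.println(sample(values, timesMs, 2000, 999)); // prints [2, 3, 5]
    }
}
```

The ticks fall at 999 ms and 1998 ms: the first sees 1 and 2 and keeps 2, the second sees 3, and 5 is the value left over when the source completes.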

(12) timeout

timeout: if the source Observable emits no item within the specified time, timeout either signals an error or switches to a fallback Observable.


    private Observable<String> getApps() {
        observable = Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> observer) {
                        observer.onNext("Kpioneer");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw Exceptions.propagate(e);
                        }
                        observer.onNext("Lucy");
                        observer.onCompleted();
                    }
                });
        return observable;
    }


    public void click(View v) {
    
        observable.timeout(999, TimeUnit.MILLISECONDS, Observable.just("Michel", "QQ")).subscribe(
                new Observer<String>() {

                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String t) {
                        Log.i("main", "Received value: " + t);
                    }
                });
    }

Result output:

08-07 10:02:30.806 11757-11757/com.haocai.architect.rxjava I/main: Received value: Kpioneer
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: Received value: Michel
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: Received value: QQ

If no fallback Observable is specified, the subscriber receives Kpioneer and then onError (with a TimeoutException).
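The switch to the fallback can also be traced without threads. The following is a deliberately simplified plain-Java model with hypothetical names, not RxJava's implementation. It measures only the gap before each item and ignores a timeout that might fire before completion, and it assumes that a gap equal to the limit already counts as a timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TimeoutSketch {

    /**
     * items[i] arrives at arrivalMs[i] (ascending, measured from subscription).
     * If waiting for the next item takes timeoutMs or longer, the fallback
     * replaces the remainder of the source sequence.
     */
    static List<String> withTimeout(String[] items, long[] arrivalMs,
                                    long timeoutMs, List<String> fallback) {
        List<String> out = new ArrayList<>();
        long lastSignalMs = 0; // the first window starts at subscription
        for (int i = 0; i < items.length; i++) {
            if (arrivalMs[i] - lastSignalMs >= timeoutMs) {
                out.addAll(fallback); // switch over and ignore the late source
                return out;
            }
            out.add(items[i]);
            lastSignalMs = arrivalMs[i];
        }
        return out;
    }

    public static void main(String[] args) {
        String[] items = {"Kpioneer", "Lucy"};
        long[] arrivalMs = {0, 1000}; // "Lucy" follows after the 1000 ms sleep
        List<String> fallback = Arrays.asList("Michel", "QQ");
        System.out.println(withTimeout(items, arrivalMs, 999, fallback));  // prints [Kpioneer, Michel, QQ]
        System.out.println(withTimeout(items, arrivalMs, 2000, fallback)); // prints [Kpioneer, Lucy]
    }
}
```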

3. Transformation operators

map() takes a parameter of type Func1 (its signature is map(Func1<? super T, ? extends R> func)) and applies that function to every value emitted by the Observable, converting each one into the desired value. If the definition feels abstract, the example below, which turns each UserModel into its user name, makes it concrete:


    userModelList = new ArrayList<UserModel>();
        for (int i = 0; i < 3; i++) {
            UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
            List<OrderModel> orderList = new ArrayList<OrderModel>();
            for (int j = 0; j < 2; j++) {
                OrderModel orderModel = new OrderModel("userId_" + i
                        + "_orderId_" + j, "user_" + i + "_orderName_" + j);
                orderList.add(orderModel);
            }
            userModel.setOrderList(orderList);
            userModelList.add(userModel);
        }

  Observable.from(userModelList).map(new Func1<UserModel, String>() {
           @Override
           public String call(UserModel userModel) {
               return userModel.getUserName();
           }
       }).subscribe(new Action1<String>() {
           @Override
           public void call(String s) {
               Log.i("main", "Converted value: " + s);
           }
       });

08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_0
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_1
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_2

flatMap() works like this:

1. It uses the incoming event object to create an Observable.
2. It does not send this Observable downstream; instead it activates it, so that it starts emitting events of its own.
3. The events emitted by every Observable created this way are merged into a single Observable, which delivers them to the Subscriber's callback methods.

These three steps split the event handling into two levels: the original object is "flattened" through a set of newly created Observables, and the results are distributed down one unified path. This flattening is the "flat" in flatMap().

flatMap() and map() both transform the value passed in and return another object. Unlike map(), however, the function given to flatMap() returns an Observable, and that Observable itself is not delivered to the Subscriber's callback; the items it emits are.


    userModelList = new ArrayList<UserModel>();
        for (int i = 0; i < 3; i++) {
            UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
            List<OrderModel> orderList = new ArrayList<OrderModel>();
            for (int j = 0; j < 2; j++) {
                OrderModel orderModel = new OrderModel("userId_" + i
                        + "_orderId_" + j, "user_" + i + "_orderName_" + j);
                orderList.add(orderModel);
            }
            userModel.setOrderList(orderList);
            userModelList.add(userModel);
        }

        // flatMap offers a way to flatten this nesting
        // Typical scenario: nested API calls (for example, requests that can only be made after a successful login)
        Observable.from(userModelList).flatMap(new Func1<UserModel, Observable<OrderModel>>() {
            @Override
            public Observable<OrderModel> call(UserModel userModel) {
                return Observable.from(userModel.getOrderList());
            }
        }).subscribe(new Action1<OrderModel>() {
            @Override
            public void call(OrderModel orderModel) {
                Log.i("main", "Converted value: " + orderModel.getOrderId());
            }
        });
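The contrast between the two transformations can be tried on plain collections as well. The sketch below uses java.util.stream, whose map and flatMap are analogous, rather than RxJava. The User class and sampleUsers() helper are hypothetical, trimmed-down stand-ins for the UserModel data built above. One difference to keep in mind: a stream always concatenates the inner sequences in order, whereas RxJava's flatMap merges them, so items from asynchronous inner Observables may interleave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapSketch {

    /** Hypothetical, trimmed-down stand-in for UserModel. */
    static final class User {
        final String name;
        final List<String> orderIds;

        User(String name, List<String> orderIds) {
            this.name = name;
            this.orderIds = orderIds;
        }
    }

    /** Builds three users with two orders each, mirroring the loops above. */
    static List<User> sampleUsers() {
        List<User> users = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            users.add(new User("userName_" + i, Arrays.asList(
                    "userId_" + i + "_orderId_0",
                    "userId_" + i + "_orderId_1")));
        }
        return users;
    }

    /** map: exactly one output per input (user -> name). */
    static List<String> userNames(List<User> users) {
        return users.stream()
                .map(user -> user.name)
                .collect(Collectors.toList());
    }

    /** flatMap: each user becomes a sequence of orders, and all sequences end up in one. */
    static List<String> allOrderIds(List<User> users) {
        return users.stream()
                .flatMap(user -> user.orderIds.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(userNames(sampleUsers()));   // 3 names
        System.out.println(allOrderIds(sampleUsers())); // 6 order ids
    }
}
```

Three users go in either way; map yields three names, while flatMap yields six order ids because each user contributes a whole sequence.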