• An introduction to responsive programming
    • Specific implementation of responsive programming -RxJava
      • The basic concept
        • The four roles of RxJava
        • Hot Observables and cold Observables
        • Observables create operator
        • Subject
        • Create an Observable directly
        • Create an Observable from a list
        • A creator with a special function
      • Filtering Observables
        • Filter sequence
        • Get the data we need
        • Once and only
        • First and last
        • Skip and SkipLast
        • ElementAt
        • Sampling
        • Timeout
        • Debounce
      • Transform Observables
        • The family of the map
        • GroupBy
        • Buffer
        • Window
        • Cast
      • Combination of Observables
        • Merge
        • ZIP
        • Join
        • combineLatest
        • AndThen and the When
        • Switch
        • StartWith
      • Schedulers- Resolves Android main thread issues
        • Schedulers
          • Schedulersio
          • Schedulerscomputation
          • Schedulersimmediate
          • SchedulersnewThread
          • Schedulerstrampoline
        • Non-blocking IO operations
        • SubscribeOn and ObserveOn
        • Deal with time-consuming tasks
      • conclusion

An introduction to responsive programming

  • Responsive programming is a programming pattern based on the concept of asynchronous data flow. A data stream is like a river: it can be observed, filtered, manipulated, or merged with another stream for a new consumer.
  • A key concept of reactive programming is events. Events can be waited on, processes can be triggered, and other events can be triggered. Events are the only way to map our reality to our software in a proper way: if the room gets too hot we open a window. Similarly, when we change some values in the spreadsheet (propagation of change), we need to update the entire table or our robot turns when it hits a wall (in response to an event).
  • Today, one of the most common scenarios for responsive programming is UI: our mobile apps must respond to network calls, user touch input, and system pop-ups. In this world, software is event-driven and responsive because it is in real life.

Specific implementation of responsive programming -RxJava

The basic concept

The four roles of RxJava

  • Observable
  • Observer
  • Subscriber
  • Subject

Observable and Subject are two “producing” entities, Observer and Subscriber are two “consuming” entities.

Hot Observables and cold Observables

From the emitter perspective, there are two different observables: hot and cold. A “hot” Observable typically starts emitting data as soon as it is created, so all observers that follow up to subscribe to it might start receiving data somewhere in the middle of the sequence (some data is missed). A “cold” Observable waits until an observer subscribes before transmitting data, so the observer can be sure to receive the entire data sequence.

Observables create operator

  • Observable.create()
Observable.create(new Observable.OnSubscribe<Object>(){
    @Override
    public void call(Subscriber<? super Object> subscriber{
    }
});Copy the code
  • The from() from() creator can create an Observable from a list/array, emitting each object one by one from the list/array, or from a Java Future class. And emits the value returned by the Future object’s.get() method. When passing the Future as an argument, we can specify a timeout value. An Observable waits for a result from the Future; If no result is returned by the time out, the Observable fires the onError() method to notify the observer of an error.
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);

Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(Integer item) {
    System.out.println("Item is "+ item); }});Copy the code
  • Observable. Just () The just() method can pass in one to nine arguments, and they emit them in the order of the arguments they pass in. The just() method can also accept lists or arrays, like the FROM () method, but instead of iterating over the list to emit each value, it will emit the entire list. Usually, we use it when we want to emit a set of values that are already defined. But if our function is not time-dependent, we can use just to create a more organized and measurable code base.
Observable<String> observableString = Observable.just(helloWorld
());
Subscription subscriptionPrint = observableString.subscribe(new
Observer<String>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(String message) { System.out.println(message); }});Copy the code

The helloWorld() method is simpler, like this:

private String helloWorld() {return "Hello World";
}Copy the code

Subject

A Subject can be either an Observable or Observer. RxJava provides four different subjects:

  • PublishSubject
  • The BehaviorSubject BehaviorSubject first sends the latest data object (or initial value) to its subscriber before the subscription, and then sends the subscribed data flow as normal.

    BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);Copy the code

    In this short example, we create a BehaviorSubject that emits an Integer. Because pax emits the latest data every time it subscribes, it needs an initial value.

  • ReplaySubject ReplaySubject caches all its subscribed data, retransmitting it to any observer that subscribed to it:

    ReplaySubject<Integer> replaySubject = ReplaySubject.create();Copy the code
  • AsyncSubject

    When Observable completes, the AsyncSubject publishes only the last data to each observer that has subscribed.

    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();Copy the code

Create an Observable directly

In our first column, we will retrieve the list of installed applications and fill the RecycleView item to display them. We also envision a pull-down refresh feature and a progress bar to inform the user that the current task is being executed.

First, we create an Observable. We need a function to retrieve the list of installed applications and provide it to our observer. We transmitted the application data one by one, grouping them into a separate list to demonstrate the flexibility of a reactive approach.

private Observable<AppInfo> getApps() {return Observable.create(subscriber -> {
        List<AppInfoRich> apps = new ArrayList<AppInfoRich>();
        final Intent mainIntent = new Intent(Intent.ACTION_MAIN, null);
        mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
        List<ResolveInfo> infos = getActivity().queryIntentActivities(mainIntent, 0);
        for(ResolveInfo info : infos){
            apps.add(new AppInfoRich(getActivity(),info));
        }
        for (AppInfoRich appInfo:apps) {
            Bitmap icon = Utils.drawableToBitmap(appInfo.getIcon());
            String name = appInfo.getName();
            String iconPath = mFilesDir + "/" + name;
            Utils.storeBitmap(App.instance, icon,name);
            if (subscriber.isUnsubscribed()){
                return;
            }
            subscriber.onNext(new AppInfo(name, iconPath, appInfo.getLastUpdateTime()));
        }
        if (!subscriber.isUnsubscribed()){
            subscriber.onCompleted();
        }
    });
}Copy the code

AppInfo is the entity class of App information, including the last update time, icon, and name, which is omitted here.

It is important to detect the observer’s subscription before transmitting new data or completing the sequence. This makes the code more efficient, because we don’t generate unnecessary data items if there are no observers waiting.

Next, let’s define the method of the pull-down refresh:

private void refreshTheList() {
    getApps().toSortedList()
    .subscribe(new Observer<List<AppInfo>>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(List<AppInfo> appInfos) {
        mRecyclerView.setVisibility(View.VISIBLE);
        mAdapter.addApplications(appInfos);
        mSwipeRefreshLayout.setRefreshing(false); }}); }Copy the code

Create an Observable from a list

In this example, we will introduce the from() function. Using this special “create” function, we can create an Observable from a list. Observables emit each element in the list, and we can subscribe to them to respond to emitted elements.

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps).subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo); }}); }Copy the code

One major difference from the first example is that we stopped the progress bar in the onCompleted() function because we emitted elements one by one; The Observable in the first example emits the entire list, so it’s safe to stop the progress bar in onNext().

A creator with a special function

  • just()

    You can pass a function as an argument to the just() method, and you’ll get a raw Observable version of the existing code. This approach can be a useful starting point for migrating existing code based on a new responsive architecture.

  • repeat()

    Suppose you want to emit data three times to an Observable:

    Observable.just(appOne,appTwo,appThree)
        .repeat(3)
        .subscribe();Copy the code

    We append repeat(3) after just() creates Observable, which will create a sequence of 9 elements, each emitted individually.

  • defer()

    There is a scenario where you want to declare an Observable here but you want to delay the creation of the Observable until the observer subscribes. Look at the getInt() function below:

    private Observable<Integer> getInt() {return Observable.create(subscriber -> {
            if(subscriber.isUnsubscribed()){
                return;
            }
            App.L.debug("GETINT");
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
    }Copy the code

    It’s relatively simple, and it doesn’t do much, but it works for us. Now we can create a new Observable and apply defer() :

    Observable<Integer> deferred = Observable.defer(this::getInt);Copy the code

    This time, the deferred exists, but the getInt() create() method has not yet been called: the logcat log does not print “getInt” :

    deferred.subscribe(number -> {
        App.L.debug(String.valueOf(number));
    });Copy the code

    But once we subscribe, the create() method is called and we can also print two values in the logcat log: GETINT and 42.

  • range()

    Emit N numbers starting with a specified number X. The range() function takes two numbers as arguments: the first is the starting point and the second is the number of numbers we want to emit.

  • interval()

    The interval() function is useful when you need to create a polller. Interval () takes two arguments: one to specify the interval between launches and the other to specify the unit of time used.

  • timer()

    If you need an Observable to launch after a while, you can use timer().

Filtering Observables

Filter sequence

RxJava lets us use the filter() method to filter unwanted values from our observation sequence.

We filter out each emitted element that does not begin with a C:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo.getName().startsWith("C"); }})Copy the code

We pass a new Func1 object to the filter() function, which takes only one argument. Func1 has an AppInfo object as its parameter type and returns a Boolean object. The filter() function returns true as long as the criteria are true. At this point, the value is emitted and received by all observers.

One of the most common uses of the filter() function is to filter null objects:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        returnappInfo ! =null; }})Copy the code

It saves us from having to detect null values in the onNext() function call and lets us focus on applying the business logic.

Get the data we need

We can use take() or takeLast() when we don’t need the whole sequence, but just a few elements at the beginning or end.

  • take()

    The take() function takes the integer N as an argument, emits the first N elements from the original sequence, and does:

    Observable.from(apps)
        .take(3) .subscribe(...) ;Copy the code
  • takeLast()

    If we want the last N elements, we just use the takeLast() function:

    Observable.from(apps)
        .takeLast(3) .subscribe(...) ;Copy the code

Once and only

  • distinct()

    Like takeLast(), Distinct () works on a complete sequence and then gets repeated filters, which need to record the value of each emission. Keep an eye on memory usage if you’re dealing with a lot of sequences or large data.

    Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
        .take(3)
        .repeat(3); fullOfDuplicates.distinct() .subscribe(...) ;Copy the code
  • ditinctUntilChanged()

    What if we are notified when an observable sequence emits a new value that is different from the previous one? The ditinctUntilChanged() filter function does this. It can easily ignore all duplicates and only emit new values.

First and last

The first() and last() methods are easy to figure out. They emit only the first or last element from Observable. Both of these can be passed Func1 as an argument. Variables similar to first() and last() are: firstOrDefault() and lastOrDefault(). These functions are useful when no value is emitted when the observable sequence is complete. In this scenario, you can specify an Observable to emit a default value if it doesn’t emit any more.

Skip and SkipLast

The skip() and skipLast() functions correspond to take() and takeLast(). They take an integer N as an argument, and essentially don’t let observables emit the first or last N values.

ElementAt

What if we only want the fifth element emitted from the observable sequence? The elementAt() function emits only the NTH element from a sequence and is done. What if we want to find a fifth element but there are only three elements in the observable sequence to emit? We can use elementAtOrDefault().

Sampling

Adding sample() to an Observable creates a new Observable sequence that emits the most recent value at a specified time interval:

Observable<Integer> sensor = [...]  sensor.sample(30,TimeUnit.SECONDS) .subscribe(...) ;Copy the code

If we want it to be timed to fire the first element instead of the nearest one, we can use throttleFirst().

Timeout

We can use the timeout() function to listen for the source observable sequence, which emits an error if we don’t get a value within the time interval we set. We can think of timeout() as a time-limited copy of an Observable. If an Observable does not emit values within the specified time interval, the onError() function is raised when it listens to the original Observable.

Subscription subscription = getCurrentTemperature()
    .timeout(2,TimeUnit.SECONDS) .subscribe(...) ;Copy the code

Debounce

The debounce() function filters out data that is emitted too quickly by an Observable; If no one is launched after a specified time interval, it will launch the last one.

The following chart shows how often new data is emitted from an Observable. The debounce() function starts an internal timer. If no new data is emitted within this interval, the new Observable emits the last data:

Transform Observables

* map family

RxJava provides several mapping functions: map(), flatMap(), concatMap(), flatMapIterable(), and switchMap(). All of these functions operate on an observable sequence, transform the values it emits, and finally return them in a new form.

  • Map

    RxJava’s map function takes a specified Func object and applies it to each value emitted by an Observable.

    Observable.from(apps)
        .map(new Func1<AppInfo,AppInfo>(){
            @Override
            public Appinfo call(AppInfo appInfo){
                String currentName = appInfo.getName();
                String lowerCaseName = currentName.toLowerCase();
                appInfo.setName(lowerCaseName);
                returnappInfo; } }) .subscribe(...) ;Copy the code

    As you can see, after creating our emitted Observable as usual, we append a map call. We create a simple function to update the AppInfo object and provide a new version with a lowercase name to the observer.

  • FlatMap

    In a complex scenario, we have an Observable that emits a sequence of data that itself emits an Observable. RxJava’s flatMap() function provides a way to smooth out the sequence, merge the Observables emitted data, and use the merged result as the final Observable.

    When dealing with a potentially large number of Observables, it is important to remember that in any case where an error occurs in one Observables, flatMap() will fire its own onError() function and discard the entire chain. One important tip is about the merge section: it allows crossing. As shown in the figure above, this means that flatMap() does not guarantee the exact firing order of the source Observables in the resulting Observable.

  • ConcatMap

    RxJava’s concatMap() function solves the crossover problem of flatMap(), providing a flat function that can concatenate emitted values together, rather than merge them, as shown below:

  • FlatMapIterable

    As a member of the *map family, flatMapInterable() is similar to flatMap(). The only essential difference is that it pairs source data and generates Iterable instead of raw data items and generated Observables.

  • SwitchMap

    SwitchMap () is similar to flatMap() except that every time the source Observable fires a new data item (Observable), it unsubscribes and stops monitoring the previous one, and starts monitoring the current one.

  • Scan

    RxJava’s scan() function can be thought of as a cumulative function. Scan () applies a function to each data emitted by the original Observable, calculates the result value of the function, and fills the value back into the Observable sequence, waiting to be used with the data emitted next time.

    As a general example, give an accumulator:

    Observable.just(1.2.3.4.5)
        .scan((sum,item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("RXJAVA"."Sequence completed.");
            }
            @Override
            public void onError(Throwable e) {
                Log.e("RXJAVA"."Something went south!");
            }
            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA"."item is: "+ item); }});Copy the code

    Here’s what we got:

    RXJAVA: item is: 1

    RXJAVA: item is: 3

    RXJAVA: item is: 6

    RXJAVA: item is: 10

    RXJAVA: item is: 15

    RXJAVA: Sequence completed.

GroupBy

RxJava provides a useful function to group elements from a list according to a specified rule: groupBy(). The example below shows how groupBy() groups the emitted values according to their shape.

This function transforms the source Observable into a new Observable that emits Observables. Each of these new Observables emits a specified set of data.

To create a grouped list of installed applications, we introduce a new element in the loadList() function:

Observable<GroupedObservable<String,AppInfo>> groupedItems = Observable.from(apps)
            .groupBy(new Func1<AppInfo,String>(){
                @Override
                public String call(AppInfo appInfo){
                    SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
                    return formatter.format(newDate(appInfo.getLastUpdateTime())); }});Copy the code

Now we create a new Observable, groupedItems, that will emit a sequence of GroupedObservable. GroupedObservable is a special Observable that derives from a grouping key. In this case, key is String, which stands for the most recent update to the Month/Year format.

Buffer

The buffer() function in RxJava transforms the source Observable into a new Observable that emits a list of values at a time instead of one at a time.

The buffer() function has several variants. One of them allows you to specify a skip value: fill the buffer with count items for each skip item thereafter. The other is buffer(), which takes a Timespan argument and creates an Observable that emits a list every timespan.

Window

RxJava’s window() function is similar to buffer(), but it emits an Observable instead of a list.

Just like buffer(), Window () has a skip variant.

Cast

The cast() function is a special version of the map() operator. It converts every data in the source Observable to a new type, making it a different Class.

Combination of Observables

Merge

In an “asynchronous world” we often create scenarios where we have multiple sources but only one result: multiple inputs, single outputs. RxJava’s merge() method will help you merge two or more Observables into their emitted data items. The following image shows an Observable that merges two sequences into a final emitted one.

As you can see, the emitted data is cross-merged into an Observable. Note that if you synchronize merge Observables, they are linked together and do not cross.

Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps); mergedObserbable.subscribe(...) ;Copy the code

Note the toast message for errors. You can assume that every Observable throws an error that breaks the merge. If you need to avoid this, RxJava provides mergeDelayError(), which continues to emit data from an Observable even if one of them throws an error. MergeDelayError () will emit onError() when all Observables are complete.

ZIP

This comes when working with multiple data sources in a new possible scenario: receive data from multiple Observables, process them, and then combine them into a new observable sequence for use. RxJava has a special way of doing this: zip() merges the data items emitted by two or more Observables, transforms them according to the specified function Func*, and emits a new value. The following figure shows how the zip() method handles emitted “numbers” and “letters” and then merges them into a new data item:

Observable.zip(observableApp, tictoc, (AppInfo appInfo, Long time) -> updateTitle(appInfo, time)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(...) ;Copy the code

The zip() function takes three arguments: two Observables and a Func2.

Join

The first two methods, zip() and merge(), operate in the context of transmitted data, and there are some scenarios where we need to consider time before deciding how to manipulate values. RxJava’s join() function combines the data emitted by two Observables based on a time window.

To properly understand the previous figure, let’s explain the parameters required for join() :

  • The second Observable combines with the source Observable.
  • The Func1 parameter: the returned Observable that coordinates the data emitted by the source Observable and the data emitted by the second Observable during the specified interval defined by the time window.
  • The Func1 parameter: returns an Observable that coordinates the data emitted by the second Observable with the data emitted by the source Observable during the specified interval defined by the time window.
  • Func2 parameter: Defines how transmitted data is combined with newly transmitted data items.

combineLatest

RxJava’s combineLatest() function is a bit like a special form of the zip() function. As we’ve learned, zip() operates on two Observables that were recently unpacked. Instead, combineLatest() works on the most recently launched data item: If Observable1 fires A and Observable2 fires B and C, combineLatest() will process AB and AC in groups, as shown below:

And, Then, And the When

There are scenarios in the future that zip() won’t suffice. For complex architectures, or just for personal preference, you can use And/Then/When solutions. They join together transmitted data sets under the RxJava joins package, using Pattern and Plan as intermediaries.

Switch

Given a source Observable that emits multiple Observables, switch() subscribes to the source Observable and starts emitting the same data as the first emitted Observable. When the source Observable emits a new Observable, switch() immediately unsubscribes to the previous Observable (thus interrupting the data flow transmitted from it) and then subscribes to a new Observable and starts emitting its data.

StartWith

RxJava’s startWith() is the corresponding part of concat(). Just as concat() appends data to the emitting Observables, startWith() emits a sequence of data by passing a parameter before the observables start emitting their data.

Schedulers- Resolves Android main thread issues

Schedulers

The scheduler uses multithreading in your Apps in the simplest way possible. They are an important part of RxJava and work well with Observables. They provide a flexible way to create concurrent programs without having to deal with implementation, synchronization, threading, platform constraints, and platform variations.

RxJava provides five types of schedulers:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()
Schedulers.io()

This scheduler is used for I/O operations. It is based on growing or shrinking the thread pool from adaptation as needed. We’ll use it to fix StrictMode violations we saw earlier. Because it is dedicated to I/O operations, it is not the default method in RxJava; It’s up to the developer to use it properly.

It is important to note that the thread pool is unlimited, and a large number of I/O scheduling operations will create many threads and consume memory. As always, we need to find the right balance between performance and simplicity.

Schedulers.computation()

This is the default scheduler for computing work and is independent of I/O operations. It is also the default scheduler for many RxJava methods: Buffer (), debounce(), delay(), interval(), sample(), skip().

Schedulers.immediate()

This scheduler allows you to immediately execute your specified task on the current thread. It is the default scheduler for timeout(), timeInterval(), and timestamp() methods.

Schedulers.newThread()

The scheduler is exactly what it looks like: it starts a new thread for the specified task.

Schedulers.trampoline()

When we want to execute a task on the current thread, but not immediately, we can queue it with.trampoline(). The scheduler will process its queue and run each task in the queue in order. It is the default scheduler for the repeat() and retry() methods.

Non-blocking I/O operations

Schedulers.io() creates a non-blocking version:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    });
}Copy the code

SubscribeOn and ObserveOn

We learned how to run a task on a scheduler. But how do we use it to work with Observables? RxJava provides a subscribeOn() method for each Observable. The subscribeOn() method takes the Scheduler as an argument and makes Observable calls on that Scheduler.

First, we need a new getApps() method to retrieve the list of installed apps:

private Observable<AppInfo> getApps() {
    return Observable.create(subscriber -> {
        List<AppInfo> apps = new ArrayList<>(a); SharedPreferences sharedPref= getActivity().getPreferences(Context.MODE_PRIVATE);
        Type appInfoType = new TypeToken<List<AppInfo>>() {}.getType();
        String serializedApps = sharedPref.getString("APPS"."");
        if (!"".equals(serializedApps)) {
            apps = new Gson().fromJson(serializedApps,appInfoType);
        }
        for (AppInfo app : apps) {
            subscriber.onNext(app);
        }
        subscriber.onCompleted();
    });
}Copy the code

Then, all we need to do is specify that getApps() needs to be executed on the scheduler:

getApps().subscribeOn(Schedulers.io())
    .subscribe(new Observer<AppInfo>() { [...]Copy the code

Finally, we just need to add a few lines of code to the loadList() function, and everything is ready:

getApps()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [.]Copy the code

The observeOn() method returns the result on the specified scheduler: the UI thread in the example. The onBackpressureBuffer() method tells an Observable that if the emitted data is faster than the data consumed by the observer, it must store it in the cache and give it a proper time.

Deal with time-consuming tasks

A time-consuming task unrelated to I/O:

getObservableApps(apps)
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [.]Copy the code

conclusion

RxJava provides an opportunity to think about data in a time-oriented way: everything is constantly changing, data is updating, events are firing, and then you can create event-responsive, flexible, smooth-running apps.

Remember that observable sequences are like a river: they flow. You can filter a river, you can transform a river, you can combine two rivers into one and still flow as before. In the end, it becomes the river you want.

“Be Water, My friend” – Bruce Lee