This is the second post in the “RxJava Meditations” series. All posts in this series:

  • RxJava Meditations (I): Do you think RxJava is really good?
  • RxJava Meditations (II): Spatial Dimensions
  • RxJava Meditations (III): The Time Dimension
  • RxJava Meditations (IV): Summary

In the last post, we cleared up some of the most common misconceptions about RxJava: “Chaining is what RxJava is all about,” “RxJava is asynchrony plus conciseness,” and “RxJava exists to solve Callback Hell.” At the end of that post, we saw that the most basic thing RxJava gives us is a unified interface for all asynchronous callbacks. But RxJava doesn’t stop there. In this post, we first look at the ability of Observables to reorganize events in the spatial dimension.

Let’s start with a simple example

Scenario: a photo album app fetches the current user’s photo list from the network and displays it in a RecyclerView:

public interface NetworkApi {
    @GET("/path/to/api")
    Call<List<Photo>> getAllPhotos();
}

Above is the API, defined with Retrofit, for fetching photos from the network. As you know, if we use Retrofit’s RxJavaCallAdapter, we can change the return type of the interface from Call<List<Photo>> to Observable<List<Photo>>:

public interface NetworkApi {
    @GET("/path/to/api")
    Observable<List<Photo>> getAllPhotos();
}

So our code for displaying photos using this interface should look like this:

NetworkApi networkApi = ...
networkApi.getAllPhotos()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(photos -> {
        adapter.setData(photos);
        adapter.notifyDataSetChanged();
    });

Now a new requirement arrives: the request for the current user’s photo list must be cached. (Only the image URLs in the network response are cached here; the Bitmap cache is left to a dedicated image loading framework such as Glide.) That is, when the user opens the photo list, we first read the photo list from the cache and display it (if the cache holds data from the last call to this interface), and then issue the network request. When the network request returns, we update the cache and refresh the photo list with the latest data. If we choose JakeWharton’s DiskLruCache as the cache medium, the code above becomes:

DiskLruCache cache = ... 
DiskLruCache.Snapshot snapshot = cache.get("getAllPhotos");
if (snapshot != null) {
    // Read the cached data and deserialize it
    List<Photo> cachedPhotos = new Gson().fromJson(
        snapshot.getString(VALUE_INDEX),
        new TypeToken<List<Photo>>(){}.getType()
    );
    // Refresh the photo list
    adapter.setData(cachedPhotos);
    adapter.notifyDataSetChanged();
}
NetworkApi networkApi = ...
networkApi.getAllPhotos()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(photos -> {
        adapter.setData(photos);
        adapter.notifyDataSetChanged();

        // Update the cache
        DiskLruCache.Editor editor = cache.edit("getAllPhotos");
        // Editor.set returns void, so commit() is a separate call
        editor.set(VALUE_INDEX, new Gson().toJson(photos));
        editor.commit();
    });

The code above is the most intuitive way to meet the requirement. Let’s think a little further: reading a file cache is also a time-consuming operation, so it should not run on the main thread either. We can wrap it in an Observable as well:

Observable<List<Photo>> cachedObservable = Observable.create(emitter -> {
    DiskLruCache.Snapshot snapshot = cache.get("getAllPhotos");
    if (snapshot != null) {
        List<Photo> cachedPhotos = new Gson().fromJson(
            snapshot.getString(VALUE_INDEX),
            new TypeToken<List<Photo>>(){}.getType()
        );
        emitter.onNext(cachedPhotos);
    } 
    emitter.onComplete();
});

So far, both asynchronous operations, making the network request and reading the cache, have been wrapped as Observables. The first benefit of turning the original asynchronous callbacks into Observables is that Observables can be reorganized in the spatial dimension:

networkApi.getAllPhotos()
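    .doOnNext(photos -> {
        // Update the cache (Editor.set returns void, so commit() is a separate call)
        DiskLruCache.Editor editor = cache.edit("getAllPhotos");
        editor.set(VALUE_INDEX, new Gson().toJson(photos));
        editor.commit();
    })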
    // Read the existing cache
    .startWith(cachedObservable)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(photos -> {
        adapter.setData(photos);
        adapter.notifyDataSetChanged();
    });

Calling the startWith operator produces a new Observable. The new Observable first emits the elements of the Observable passed in, and then emits the elements of the original Observable. For example, if Observable a contains elements a1 and a2, and Observable b contains elements b1 and b2, then b.startWith(a) returns a new Observable that emits a1, a2, b1, b2. Reference: StartWith
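
To make the ordering concrete, here is a minimal sketch that models it with plain lists. It is not RxJava: the class name StartWithSketch and its startWith helper are hypothetical names invented for illustration, and the lists stand in for the sequences of emitted elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the startWith emission order; not RxJava itself.
class StartWithSketch {
    // Returns the elements of `prefix` followed by the elements of `source`,
    // mirroring the order in which source.startWith(prefix) would emit them.
    static <T> List<T> startWith(List<T> source, List<T> prefix) {
        List<T> result = new ArrayList<>(prefix);
        result.addAll(source);
        return result;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("a1", "a2");
        List<String> b = Arrays.asList("b1", "b2");
        System.out.println(startWith(b, a)); // prints [a1, a2, b1, b2]
    }
}
```

In the photo example, the cached list plays the role of a and the network response plays the role of b, so the cache result always reaches the observer first.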

In the example above, we joined two Observables, the network request and the cache read. Originally these were two asynchronous tasks whose results had to be handled separately; now they are combined into one, and a single observer satisfies the requirement. This observer is called back twice, first with the result from the cache and then with the result from the network, which shows up on screen as the list being refreshed twice.

This leads to a more general thought: with n asynchronous tasks in callback style, we need n callbacks. If the n asynchronous tasks have been wrapped as Observables, we can classify, combine, and transform the Observables. After such processing, the number of observers shrinks and their responsibility becomes simple and direct: an observer only needs to respond to the data type it cares about, and need not care where the data came from or how it was transformed.

Taking this one step further, suppose the example gains another requirement: if the data returned by the network request is identical to the cached data that was already displayed, the list should not be refreshed again.

If we used the traditional Callback form, with two callbacks handling the cache read and the network request, we would need a variable outside the two callbacks to hold the cached data. Then, in the network callback, we would compare the two values to decide whether the UI needs to be refreshed.

But if we use RxJava, this requirement can be written as:

networkApi.getAllPhotos()
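    .doOnNext(photos -> {
        // Update the cache (Editor.set returns void, so commit() is a separate call)
        DiskLruCache.Editor editor = cache.edit("getAllPhotos");
        editor.set(VALUE_INDEX, new Gson().toJson(photos));
        editor.commit();
    })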
    .startWith(cachedObservable)
    // Guarantee that the same data will not appear
    .distinctUntilChanged()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(photos -> {
        adapter.setData(photos);
        adapter.notifyDataSetChanged();
    });

The distinctUntilChanged operator ensures that any two adjacent elements emitted by an Observable are unequal. Reference: Distinct

With just one more line than before, .distinctUntilChanged() (assuming the equals method used to compare two objects has been implemented), the observer is called back only once when the network data and the cached data are identical.
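
The equals assumption matters, so here is a minimal sketch of what it could look like. The Photo class below is hypothetical (the article never shows the real one, and the single url field is my assumption), and distinctUntilChanged is modeled over a plain list of emissions rather than a real Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Plain-Java model of distinctUntilChanged; not RxJava itself.
class DistinctUntilChangedSketch {
    // Drops every element that equals() the element kept immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> emissions) {
        List<T> result = new ArrayList<>();
        for (T item : emissions) {
            if (result.isEmpty() || !Objects.equals(result.get(result.size() - 1), item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Photo> cached = Arrays.asList(new Photo("/1.jpg"), new Photo("/2.jpg"));
        List<Photo> network = Arrays.asList(new Photo("/1.jpg"), new Photo("/2.jpg"));
        // Cache and network hold equal lists, so only one refresh survives.
        System.out.println(distinctUntilChanged(Arrays.asList(cached, network)).size()); // prints 1
    }
}

// Hypothetical Photo model with a single assumed field.
final class Photo {
    final String url;

    Photo(String url) {
        this.url = url;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof Photo)) return false;
        return Objects.equals(url, ((Photo) other).url);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(url);
    }
}
```

Because List.equals compares element by element, implementing equals on Photo is enough for two separately deserialized lists to count as the same emission. Without it, the default identity comparison would treat the network list as new every time.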

Comparing the Callback style with the style of assembling Observables, we can see that with callbacks, a change in requirements often forces a change in the internal logic of the callbacks. With RxJava, the core logic of the observer (in this case, refreshing the list) is more stable and rarely changes. An Observable reorganizes the elements it emits in the spatial dimension through built-in operators, or is reorganized together with other Observables, so that the observer’s logic stays simple and direct, and need not care where the data comes from.

A complicated example

Scenario: Implement a RecyclerView with multiple types, as shown in the figure:

Let’s assume that there are three types of data in the list that together populate a RecyclerView. For simplicity, let’s define the Retrofit interface as follows:

public interface NetworkApi {
    @GET("/path/to/api")
    Observable<List<ItemA>> getItemListOfTypeA();
    @GET("/path/to/api")
    Observable<List<ItemB>> getItemListOfTypeB();
    @GET("/path/to/api")
    Observable<List<ItemC>> getItemListOfTypeC();
}

So far the situation is simple: we just maintain three RecyclerViews and update each one separately. However, we now receive a new requirement: the order in which the three types of data appear in the list is configurable, and not all three types need to be displayed; that is, the list may show all three types or only two of them. We define the corresponding interface:

public interface NetworkApi {
    @GET("/path/to/api")
    Observable<List<ItemA>> getItemListOfTypeA();
    @GET("/path/to/api")
    Observable<List<ItemB>> getItemListOfTypeB();
    @GET("/path/to/api")
    Observable<List<ItemC>> getItemListOfTypeC();
    // The order of the data to be displayed
    @GET("/path/to/api")
    Observable<List<String>> getColumns();
}

The newly added getColumns interface returns data like:

  • ["a", "b", "c"]
  • ["b", "a"]
  • ["b", "c"]

First consider how to implement this requirement in the ordinary Callback form. Because the order of the three types of data is now variable and their number is not fixed, keeping multiple RecyclerViews would mean calling addView and removeView on the layout to add and remove them, which performs poorly. So instead we fill all the data into a single RecyclerView, with different types of data distinguished by different ItemTypes. In the following code I still use Observable, but only as an ordinary callback:

private NetworkApi networkApi = ...
// The order in which different types of data appear
private List<String> resultTypes;
// The collection of data corresponding to these types
private LinkedList<List<? extends Item>> responseList;

public void refresh() {
    networkApi.getColumns().subscribe(columns -> {
        // Save the configuration column order
        resultTypes = columns;
        responseList = new LinkedList<>(Collections.nCopies(columns.size(), new ArrayList<>()));
        for (String type : columns) {
            switch (type) {
                case "a":
                    networkApi.getItemListOfTypeA().subscribe(data -> onOk("a", data));
                    break;
                case "b":
                    networkApi.getItemListOfTypeB().subscribe(data -> onOk("b", data));
                    break;
                case "c":
                    networkApi.getItemListOfTypeC().subscribe(data -> onOk("c", data));
                    break;
            }
        }
    });
}

private void onOk(String type, List<? extends Item> response) {
    // Update the data in the corresponding location according to the configuration order
    responseList.set(resultTypes.indexOf(type), response);
    // Fill a List with the currently returned data
    List<Item> data = new ArrayList<>();
    for (List<? extends Item> itemList: responseList) {
        data.addAll(itemList);
    }
    // Update the list
    adapter.setData(data);
    adapter.notifyDataSetChanged();
}

In the code above, to avoid Callback Hell, I pulled onOk out to the outer level in advance so that the code reads from top to bottom. But if you feel the same way I do, code like this is not very “cohesive”: flattening the callbacks leaves some intermediate variables exposed.

With that in mind, let’s analyze the data flow:

  1. The refresh method makes the first request to obtain the types and order of the n kinds of data to display.
  2. Based on the result of the first request, n requests are issued, one for each kind of data.
  3. The onOk method, acting as the observer, is called back n times. Each time, it arranges the results from step 2 in the order returned by the first interface and notifies the UI to update.

It’s a bit like writing an essay with an overview-details-summary structure.

Observable reorganizes events in a spatial dimension

Next, we implement this requirement using RxJava. We’ll use some of RxJava’s operators to reorganize the Observable:

NetworkApi networkApi = ...

networkApi.getColumns()
    .map(types -> {
        List<Observable<? extends List<? extends Item>>> requestObservableList = new ArrayList<>();
        for (String type : types) {
            switch (type) {
                case "a":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeA().startWith(new ArrayList<ItemA>())
                    );
                    break;
                case "b":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeB().startWith(new ArrayList<ItemB>())
                    );
                    break;
                case "c":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeC().startWith(new ArrayList<ItemC>())
                    );
                    break;
            }
        }
        return requestObservableList;
    })
    .flatMap(requestObservables -> Observable.combineLatest(requestObservables, objects -> {
        List<Item> items = new ArrayList<>();
        for (Object response : objects) {
            items.addAll((List<? extends Item>) response);
        }
        return items;
    }))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        adapter.setData(data);
        adapter.notifyDataSetChanged();
    });

Let’s walk through the RxJava processing step by step. First, networkApi.getColumns() returns an Observable that emits only one element: the list of columns to display. For the sake of discussion, suppose the column order is [“a”, “b”, “c”], as shown in the figure below:

Next comes the map operator, which transforms the original Observable into a new Observable that again emits only one element, of type List. The element type inside the List changes from String (representing a data type) to Observable. Can an element emitted by an Observable itself be a “List of Observables”? Yes, there is no reason why not :)

The map operator converts every element emitted by an Observable, creating an Observable that emits the new elements. The type of the elements changes, but the number of emitted elements does not. Reference: Map

Each entry of [“a”, “b”, “c”] is mapped to the request that fetches the data list of the corresponding column. For example, column type a corresponds to the networkApi.getItemListOfTypeA() request. Appending startWith(new ArrayList<>()) makes the resulting Observable emit an empty list before it emits the actual list. The reason for this is explained in the next step.

The next step is probably the hardest to understand. The map operator is followed by the flatMap operator, and inside the lambda expression passed to flatMap, the Observable.combineLatest operator is called. Let’s start with combineLatest, as shown below:

The first argument to the combineLatest operator is requestObservables, which is a List of Observables. It is the element emitted by the new Observable produced by the map operator in the previous step, that is, a List of three Observables:

  • networkApi.getItemListOfTypeA().startWith(...)
  • networkApi.getItemListOfTypeB().startWith(...)
  • networkApi.getItemListOfTypeC().startWith(...)

The second argument to the combineLatest operator is a lambda expression whose parameter is of type Object[]. The length of the array equals the length of requestObservables, and each element of the Object[] array is the element emitted by the corresponding Observable in requestObservables, i.e.

  • Object[0] corresponds to the element emitted by requestObservables[0]
  • Object[1] corresponds to the element emitted by requestObservables[1]
  • Object[2] corresponds to the element emitted by requestObservables[2]

So when is this lambda expression called? Whenever any Observable in requestObservables emits an element, that element is combined with the most recently emitted elements of all the other Observables in requestObservables, and the lambda expression is called with the combined array as its argument.

So what the combineLatest operator does as a whole is return a new Observable: it takes the set of Observables in the first argument, calls the lambda expression in the second argument with their latest elements, and uses the lambda’s return value as the element the new Observable emits. The new Observable emits as many elements as the number of times the lambda is called.

Reference: CombineLatest

The logic inside the lambda expression here is simple: it concatenates the data returned by the three interfaces into a new List. Looking back at the diagram above, we can see that the new Observable returned by Observable.combineLatest emits four elements. They are:

  • []
  • [{ItemB}, {ItemB}, ...]
  • [{ItemA}, {ItemA}, ..., {ItemB}, {ItemB}, ...]
  • [{ItemA}, {ItemA}, ..., {ItemB}, {ItemB}, ..., {ItemC}, {ItemC}, ...]
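
The four emissions listed above can be reproduced with a small plain-Java model. This is only a sketch of the bookkeeping, not RxJava’s implementation: the class CombineLatestSketch, the method onSourceEmits, and the string items are hypothetical, and the response order (b, then a, then c) is assumed to match the diagram.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of combineLatest over sources that each start with an empty list.
class CombineLatestSketch {
    private final List<List<String>> latest = new ArrayList<>();    // latest value per source
    private final List<List<String>> emissions = new ArrayList<>(); // everything emitted so far

    CombineLatestSketch(int sourceCount) {
        for (int i = 0; i < sourceCount; i++) {
            latest.add(Collections.<String>emptyList()); // models startWith(empty list)
        }
        emit(); // every source now has a value, so the first combined value goes out
    }

    // Called when the source at `index` emits `items`.
    void onSourceEmits(int index, List<String> items) {
        latest.set(index, items);
        emit();
    }

    // Concatenates the latest value of every source, in source order.
    private void emit() {
        List<String> combined = new ArrayList<>();
        for (List<String> items : latest) {
            combined.addAll(items);
        }
        emissions.add(combined);
    }

    List<List<String>> emissions() {
        return emissions;
    }

    public static void main(String[] args) {
        CombineLatestSketch sketch = new CombineLatestSketch(3); // columns a, b, c
        sketch.onSourceEmits(1, Arrays.asList("B1", "B2")); // b responds first
        sketch.onSourceEmits(0, Arrays.asList("A1"));       // then a
        sketch.onSourceEmits(2, Arrays.asList("C1"));       // then c
        System.out.println(sketch.emissions());
        // prints [[], [B1, B2], [A1, B1, B2], [A1, B1, B2, C1]]
    }
}
```

Note that the position of each source in the list, not the order in which responses arrive, decides where its items land in the combined result.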

Earlier we left one question unanswered: why do the three interfaces that fetch the column data need the startWith operator to emit an empty list first, as in networkApi.getItemListOfTypeA().startWith(...)? If startWith were not called, the new Observable produced by combineLatest would emit only one element, the last of the four above, because combineLatest emits nothing until every source Observable has emitted at least one element. From the user’s point of view, the list would stay empty until all column requests had succeeded, instead of filling in incrementally.

Having covered the inner combineLatest operator, it’s time for the outer flatMap operator. The flatMap operator also produces a new Observable. It takes a lambda expression that maps each element emitted by the old Observable to an Observable, and then emits all the elements emitted by those Observables as the elements of the new Observable.

Reference: FlatMap

In our case, the Observable before the flatMap call emits only one element, so the elements emitted by the new Observable produced by flatMap are exactly the elements emitted by the Observable that the lambda passed to flatMap returns, that is, the Observable produced by the combineLatest operator.

So far, we’ve explained every step of the RxJava implementation. Let’s go back and recap how RxJava transforms the Observables, as shown below:

Using RxJava operators, we reorganized the four Observables returned by the four networkApi interfaces into a single Observable. The element type emitted by this Observable is List<Item>, which is exactly the data type that our observer, the Adapter, cares about. The observer only needs to listen to this Observable and update the data.

Before presenting the RxJava implementation, we said that the Callback implementation was not cohesive enough, and the comparison does show that the RxJava version is more cohesive. But that doesn’t mean the Callback version cannot be made more cohesive. We could encapsulate the onOk and refresh methods and the resultTypes and responseList fields of the Callback version in one object that exposes only the refresh method and a method for setting the observer. That achieves the same cohesion, but it takes extra work. With RxJava it is different: RxJava provides a set of ready-made operators, so we can write cohesive code directly by transforming and reorganizing Observables.
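
As a rough illustration of that extra work, the encapsulated Callback version might look like the sketch below. Everything beyond the four names taken from the Callback code (refresh, onOk, resultTypes, responseList) is an assumption: the class name ColumnLoader, the setObserver method, the synchronous Supplier and Function that stand in for the network calls, and the use of String items instead of Item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical wrapper that hides the intermediate state of the Callback version.
class ColumnLoader {
    private final Supplier<List<String>> columnSource;           // stands in for getColumns()
    private final Function<String, List<String>> itemSource;     // stands in for the item requests
    private List<String> resultTypes = Collections.emptyList();  // configured column order
    private final List<List<String>> responseList = new ArrayList<>();
    private Consumer<List<String>> observer = data -> { };

    ColumnLoader(Supplier<List<String>> columnSource, Function<String, List<String>> itemSource) {
        this.columnSource = columnSource;
        this.itemSource = itemSource;
    }

    // Only this method and refresh() are meant to be called from outside.
    void setObserver(Consumer<List<String>> observer) {
        this.observer = observer;
    }

    void refresh() {
        resultTypes = columnSource.get();
        responseList.clear();
        for (int i = 0; i < resultTypes.size(); i++) {
            responseList.add(Collections.<String>emptyList());
        }
        for (String type : resultTypes) {
            onOk(type, itemSource.apply(type));
        }
    }

    // Stores the response at its configured position and notifies the observer.
    private void onOk(String type, List<String> response) {
        responseList.set(resultTypes.indexOf(type), response);
        List<String> data = new ArrayList<>();
        for (List<String> items : responseList) {
            data.addAll(items);
        }
        observer.accept(data);
    }

    public static void main(String[] args) {
        ColumnLoader loader = new ColumnLoader(
            () -> Arrays.asList("b", "a"),
            type -> Arrays.asList(type + "1", type + "2"));
        loader.setObserver(System.out::println);
        loader.refresh();
    }
}
```

The intermediate fields are now private, but the class itself is exactly the bookkeeping that combineLatest provides for free in the RxJava version.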

Of all the operators in the code above, the most central one is combineLatest. A careful comparison of the RxJava version and the Callback version reveals that combineLatest does the same job as the first half of the onOk method together with resultTypes and responseList in the Callback version: it collects the data from multiple interfaces, and it also ensures that the collected data is arranged in the order returned by the earlier interface.

A more functional way of writing it

The RxJava version has about the same amount of code as the Callback version. For those fond of functional programming, the for loop in the RxJava version is not “functional” enough. We can rewrite it in a more compact, more functional form:

NetworkApi networkApi = ...

networkApi.getColumns()
    .flatMap(types -> Observable.fromIterable(types)
        .map(type -> {
            switch (type) {
                case "a": return networkApi.getItemListOfTypeA().startWith(new ArrayList<ItemA>());
                case "b": return networkApi.getItemListOfTypeB().startWith(new ArrayList<ItemB>());
                case "c": return networkApi.getItemListOfTypeC().startWith(new ArrayList<ItemC>());
                default: throw new IllegalArgumentException();
            }
        })
        .<List<Observable<? extends List<? extends Item>>>>collectInto(new ArrayList<>(), List::add)
        .toObservable()
    )
    .flatMap(requestObservables -> Observable.combineLatest(requestObservables, objects -> objects))
    .flatMap(objects -> Observable.fromArray(objects)
        .<List<Item>>collectInto(new ArrayList<>(), (items, o) -> items.addAll((List<Item>) o))
        .toObservable()
    )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        adapter.setData(data);
        adapter.notifyDataSetChanged();
    });

A new operator, collectInto, is introduced here; it collects the elements emitted by an Observable into a mutable container. In this example it replaces the for loop logic. The details are not expanded here. Reference: CollectInto
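
Without going into the operator itself, a rough mental model in plain Java may help. The sketch below is not RxJava: the class CollectIntoSketch and its collectInto helper are hypothetical, and an Iterable stands in for the elements an Observable would emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java model of collecting emitted elements into one mutable container.
class CollectIntoSketch {
    // Feeds every element into `collector` together with the same container,
    // then returns that container once the elements are exhausted.
    static <T, U> U collectInto(Iterable<T> elements, U container, BiConsumer<U, T> collector) {
        for (T element : elements) {
            collector.accept(container, element);
        }
        return container;
    }

    public static void main(String[] args) {
        List<List<String>> responses = Arrays.asList(
            Arrays.asList("A1"),
            Arrays.asList("B1", "B2"));
        List<String> items = collectInto(
            responses, new ArrayList<String>(), (list, part) -> list.addAll(part));
        System.out.println(items); // prints [A1, B1, B2]
    }
}
```

The loop has not disappeared; it has moved inside the helper, which is the sense in which the functional version replaces the explicit for loop.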

Summary

The second example took much more space than I originally expected, and it’s obvious that RxJava’s learning curve is steep. But I think it captures the point of this post well: Observables reorganize events in the spatial dimension, which makes event-driven programming more imaginative. In the original programming style, we write as many callbacks as there are asynchronous tasks, and if the tasks depend on each other, we modify the observers’ (callbacks’) logic and add data structures to maintain the dependencies. RxJava brings a new idea: before an Observable’s events reach the observer, operators can apply a series of transformations (the transformation rules depend, of course, on the specific business logic), hiding the complexity of how the data is produced from the observer and giving it only a simple data interface.

In this case, is the RxJava version better? My personal opinion is that while the RxJava version shows a more imaginative approach to programming, in this particular case, there is not much difference. RxJava can write shorter, more cohesive code, but it’s harder to write and understand. The Callback version, while unsophisticated, is easier to write, easier to understand, and more maintainable. Let’s not jump to conclusions about the pros and cons of either, but let’s look at what other benefits RxJava has.

(To be continued)

If you are interested in my technical posts, please follow my public account, Muggle Diary, where I publish original technical posts from time to time. Thank you! 🙂