This article is the third in a series of RxJava Meditations. All shared in this series:

  • RxJava Meditation (1) : Do you think RxJava really works?
  • RxJava Meditation (ii) : Spatial dimensions
  • RxJava Meditation (iii) : Time dimension
  • RxJava Meditation (iv) : Summary

In the last post, we should have been impressed by the ability of Observables to reorganize events in the spatial dimension. Naturally, we tend to think of the temporal dimension. In fact, personally, I think Observable’s ability to reorganize events in the temporal dimension is more prominent than its ability in the spatial dimension. Similar to the previous article, the rest of this article will illustrate this argument with real life examples.

Click event anti-jitter

This is a common situation. When the user is stuck on the mobile phone, he clicks a button, which should normally start a page. However, the mobile phone is stuck and does not start immediately, so the user clicks several times.

This requirement is a bit more difficult to handle with Callback, but developers who have used RxJava already know how to handle it:

RxView.clicks(btn)
    .debounce(500, TimeUnit.MILLISECONDS)
    .observerOn(AndroidSchedulers.mainThread())
    .subscribe(o -> {
        // handle clicks
    })
Copy the code

The debounce operator generates a new Observable that emits only the last element of the largest subsequence of the original Observable whose interval is less than the specified threshold. Reference: Debounce

Although this example is relatively simple, it shows that an Observable can reorganize the events it emits in the temporal dimension, which is difficult to do in the previous Callback form.

Likes and unlikes of messages on social software

Liking and unliking is a common requirement on social software. Suppose we have the following code for liking and unliking:

boolean like;

likeBtn.setOnClickListener(v -> {
    if (like) {
        // Unlike
        sendCancelLikeRequest(postId);
    } else {
        / / thumb upsendLikeRequest(postId); } like = ! like; });Copy the code

The following images are from Dribbble

If you happen to be implemented a very cool thumb up animation, the user may play, this time is likely to cause a certain pressure to the backend server, because each thumb up and cancel the thumb up a network request, if many users at the same time playing with the thumb up animation, the server may be under strain.

Similar to the previous example, we need to prevent jitter first:

booleanlike; PublishSubject<Boolean> likeAction = PublishSubject.create(); likeBtn.setOnClickListener(v -> { likeAction.onNext(like); like = ! like; }); likeAction.debounce(1000, TimeUnit.MILLISECONDS)
    .observerOn(AndroidSchedulers.mainThread())
    .subscribe(like -> {
        if (like) {
            sendCancelLikeRequest(postId);
        } else{ sendLikeRequest(postId); }});Copy the code

Wrote this, actually already can solve the problem of server stress, but there are still optimizing space, assuming that the current is already state, users quickly click under 2, according to the above code, will send a thumb up request, because the current is already state, to send a thumb up request is meaningless, So the goal of our optimization is to filter out such events:

Observable<Boolean> debounced = likeAction.debounce(1000, TimeUnit.MILLISECONDS);
debounced.zipWith(
    debounced.startWith(like),
    (last, current) -> last == current ? new Pair<>(false.false) : new Pair<>(true, current)
)
    .flatMap(pair -> pair.first ? Observable.just(pair.second) : Observable.empty())
    .subscribe(like -> {
        if (like) {
            sendCancelLikeRequest(postId);
        } else{ sendLikeRequest(postId); }});Copy the code

The zipWith operator can transform the element emitted by two Observables with the same serial number (both the XTH), and the new element is used as the element emitted by the corresponding serial number of the new Observable. Resources: ZipWith

In the above code, we will see that first we will do a Debounce operation for the event stream and get the Debounce event stream, Then we will do a zipWith operation for the Debmentioning event stream and debmentioning. StartWith (like) event stream. Corresponding to the NTH element (n >= 2) of the new Observable will be calculated by the NTH and n-1st element of the debmentioning event stream (the first element of the new Observable will be by debmentioning The first element in the event stream is derived from the original like state.

The result of this operation is a Pair object, which is a double Boolean tuple. The first element of the tuple is true, which means that the event should not be ignored and should be observed by the observer. If false, it should be ignored. The second element of a tuple is meaningful only if the first element is true, which means that a like operation should be initiated, and false, which means that an unlike operation should be initiated. The rule of the “operation” mentioned above is to compare two elements, if they are equal, set the first element of the bivariate to false, if not, set the first element of the bivariate to true, and at the same time set the second element of the bivariate to the element of debmentioning event flow.

The subsequent flatMap operator does two pieces of logic, filtering out tuples whose first element is false, and converting the tuple back to the original Boolean event stream. This logic can also be done by combining the filter and map operators, but one operator is used here for simplicity.

Although a lot of space has been devoted to explaining what each operator means, the idea is simple: based on the original debounce operator, compare each element in the resulting event stream to its previous element, If the element is the same as the previous one (for example, if a “like” is initiated again while the “like” status is already in place), the element is filtered out so that the final observer will only initiate a network request when it really needs to change the “like” status.

We considered using Callback to implement the same logic, although we could have done the same with Callback, but the debounce operator would have been more complicated to implement using Callback and we would have needed to define a timer. Also responsible for starting and turning off the timer, our Callback had a lot of logic embedded in it that was independent of the observer itself, far less pure than the RxJava version.

Detecting double click events

First, we need to define a double click event. Let’s define a double click event if two clicks are less than 500 milliseconds. Callback (Callback);

long lastClickTimeStamp;

btn.setOnClickListener(v -> {
    if (System.currentTimeMillis() - lastClickTimeStamp < 500) {
        // handle double click}});Copy the code

The above code is easy to understand. We introduce an intermediate variable, lastClickTimeStamp, to confirm that a double click event has occurred by comparing the time difference between the click event and the last click event to be less than 500 milliseconds. So how do you do that with RxJava? As in the previous example, we can reorganize events emitted by the Observable in the time dimension and filter out only click events less than 500 milliseconds after the last click event. The code is as follows:

Observable<Long> clicks = RxView.clicks(btn)
    .map(o -> System.currentTimeMillis())
    .share();
    
clicks.zipWith(clicks.skip(1), (t1, t2) -> t2 - t1)
    .filter(interval -> interval < 500)
    .subscribe(o -> {
        // handle double click
    });
Copy the code

Once again, we use the zipWith operator to compare two adjacent elements of the event stream itself, and this time we use the share operator to ensure that the Observable clicking on the event is turned into a Hot Observable.

In RxJava, observables can be divided into Hot Observables and Cold Observables, citing an image in Learning Reactive Programming with Java 8: We can say that the Cold Observable sends all the elements available to each Subscriber individually each time it is subscribed, while the Hot Observable is always in the running state. During the running process, it sends elements (broadcast and events) to its subscribers. A Hot Observable can be likened to a radio station. Listeners can listen to the current program and the subsequent program from a certain moment, but can’t listen to the previous program. A Cold Observable is like a music CD. People may buy CDS at different times, but they listen to them from the first track. That means everyone hears the same thing on a CD, whether they listen early or late.

While the above double-click detection example doesn’t demonstrate the advantages of RxJava, let’s make the requirement a little more complicated: if the user clicks multiple times in a “short” period of time, only one double click is counted. This requirement is reasonable, because if Callback is written in the above way, although double-clicking can be detected, n-1 double-clicking event will be triggered if the user clicks it quickly n times (interval is less than 500 ms, n >= 2). Assume that network request needs to be initiated in double-clicking processing function. It stresses the server. Implementing this requirement is also simple, as in the previous example, we use the debounce operator:

Observable<Object> clicks = RxView.clicks(btn).share()

clicks.buffer(clicks.debounce(500, TimeUnit.MILLISECONDS))
    .filter(events -> events.size >= 2)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(o -> {
        // handle double click
    });
Copy the code

The buffer operator takes an Observable as a parameter. What matters is not the elements emitted by the Observable, but the time at which these elements are emitted. These time points divide the elements emitted by the original Observable into groups of elements in the time dimension. The new Observable emitted elements returned by the buffer operator are those groups. Reference: Buffer

The above code uses the buffer and debounce operators to neatly convert a stream of clicks into a stream of events that we care about “more than 2 clicks in a short period of time,” and any two adjacent events in the new stream must be more than 500 milliseconds apart.

In this case, if we wanted to use Callback to implement similar logic, the amount of code would be huge and the robustness would not be guaranteed.

Search tips

In the search box we usually use, when the user enters part of the content, the corresponding search prompt will be displayed below. Take Alipay as an example, when the keyword “ant” is entered in the search box, the result related to the keyword will be automatically refreshed below:

To simplify the example, let’s define the interface for searching by keyword as follows:

public interface Api {
    @GET("path/to/api")
    Observable<List<String>> queryKeyword(String keyword);
}
Copy the code

Now that the query interface has been identified, let’s consider what needs to be considered in implementing this requirement:

  1. To prevent too many network requests triggered by too fast user input, you need to perform anti-jitter function for input events.
  2. Multiple requests may be triggered when a user enters a keyword. In this case, if the result of the previous request is returned first and the result of the previous request is returned later, ensure that the interface displays the result of the previous request.
  3. The user may trigger multiple requests during the keyword input process. Therefore, if the result of the previous request has not been returned when the result of the previous request is returned, the previous request should be cancelled.

After considering the above factors, we use RxJava to implement the corresponding code as follows:

RxTextView.textChanges(input)
    .debounce(300. TimeUnit.MILLISECONDS) .switchMap(text -> api.queryKeyword(text.toString())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(results -> {// handle results
    });
Copy the code

The switchMap operator is similar to the flatMap operator, but the difference is that if two elements in the original Observable are converted into Observables through the switchMap operator, if the corresponding Observable emits an element, The previous Observable is automatically unsubscribed if it has not launched all elements, and elements that have not been launched are not included in the new Observable launched elements generated after the switchMap operator is called. Resources: SwitchMap

Analyzing the code above, we can see that the debounce operator solves problems 1 and the switchMap operator solves problems 2 and 3. An RxJava Observable can simplify observer logic by reorganizing events in time through a series of operators. This example would have been very complicated to implement using Callback, requiring a timer and a bunch of intermediate variables, as well as a lot of additional logic in the observer to keep the event dependent on the event.

(To be continued)

This article is part of the “RxJava Meditations “series. Please read more in this series:

  • RxJava Meditation (1) : Do you think RxJava really works?
  • RxJava Meditation (ii) : Spatial dimensions
  • RxJava Meditation (iii) : Time dimension
  • RxJava Meditation (iv) : Summary

If you are interested in my technology sharing, please follow me on my personal account: Muggle Diary, and update my original technology sharing from time to time. Thank you! 🙂