Recently, Rxjava framework was used in the project, which felt very powerful, so I searched a lot of related articles on the Internet, and found a very good article. Today, I share this article with you, thanks to the original author’s dedication, so let’s have a look at this article:

preface

I started using RxJava last year, more than a year now. After joining Flipboard this year, I saw that Flipboard’s Android projects are also using RxJava in more and more scenarios. In recent months, I’ve noticed more and more people in China talking about RxJava. Some people say “RxJava is really good”, some people say “RxJava is really difficult to use”, others say: I really Baidu and Google, but I still want to ask: what is RxJava?

Given RxJava’s current hot and mysterious status, and what I’ve learned from a year of using it, I decided to write this article to provide a relatively detailed introduction to RxJava for Android developers.

The purpose of this article is twofold: 1. To give those interested in RxJava some guidelines for getting started 2. For those of you who are using RxJava but still wondering, a little more in-depth parsing

Directory:

  • What exactly is RxJava
  • Where is good RxJava
  • API introduction and principle analysis
    • 1. Concept: Extended observer mode
      • Observer model
      • RxJava observer mode
    • 2. Basic implementation
      • 1) create the Observer
      • 2) create observables
      • 3) Subscribe
      • 4) Scenario examples
        • A. Print an array of strings
        • B. Obtain the picture by id and display it
    • 3. Thread Control — Scheduler (1)
      • 1) Scheduler API (I)
      • 2) Principle of Scheduler (I)
    • 4. The transformation
      • 1) API
      • 2) Transformation principle: lift()
      • 3) Compose: transform the whole Observable
    • 5. Thread Control: Scheduler
      • 1) Scheduler API (ii)
      • 2) Principle of Scheduler (II)
      • 3) Extension: doOnSubscribe()
  • Application scenarios and usage of RxJava
    • 1. Integration with Retrofit
    • 2. RxBinding
    • 3. Various asynchronous operations
    • 4. RxBus
  • The last
    • About the author:
    • Why do I write this?

At the end of the text before you start, put the GitHub link and gradle code to introduce dependencies: https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid introduced depend on: The compile ‘IO. Reactivex: rxjava: 1.0.14’ compile ‘IO. Reactivex: rxandroid: 1.0.1’ (version number is the latest stable version of the post)

In addition, Thank RxJava Core members of the fire Feng Lin technical support and internal test reader code home, Bao Yongzhang, Drakeet, Ma Lin, sometimes indulge, the program is not ape, big head ghost, XZoomEye, Xideyu, TCahead, Tiiime, Ailurus, nerd, demon, greatly greatly minister brother, NicodeLee help, And zhou Botong recruitment sponsorship.

What exactly is RxJava

In a word: asynchronous.

RxJava – a library for composing Asynchronous and event-based programs using Observable sequences for composing The Java VM” (a library that uses observable sequences on the Java VM to compose asynchronous, event-based programs). This is RxJava, and it’s summed up very precisely.

However, for beginners, this is too hard to understand. Because it is a “summary”, and beginners need an “introduction”.

In fact, the essence of RxJava can be reduced to the word asynchronous. At its root, it is a library that implements asynchronous operations on which all other attributes are based.

Where is good RxJava

In other words, “Why do people use AsyncTask/Handler/XXX /… when it’s done asynchronously?”

In one word: simplicity.

A key aspect of asynchronous operations is program brevity, because asynchronous code is often both difficult to write and difficult to read in complex scheduling situations. Android created AsyncTask and Handler to make asynchronous code simpler. RxJava also benefits from brevity, but its brevity is distinguished by its ability to keep it simple as the program logic becomes more complex.

Suppose you have a need for a custom view, imageCollectorView, that displays multiple images and can arbitrarily add images using addImage(Bitmap). Now we need the program to load the PNG images in each directory in a given directory array File[] folders and display them in imageCollectorView. Note that since the process of reading images is time-consuming, it needs to be performed in the background, whereas the display of images must be performed in the UI thread. There are several common implementations, one of which I have posted here:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() { imageCollectorView.addImage(bitmap); }}); } } } } }.start();Copy the code

With RxJava, the implementation looks like this:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            returngetBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); }});Copy the code

That talk: “you this code obviously changed much! Concise hair!” I’m talking about logical brevity, not simple code. (Logical brevity is the key to fast reading and writing code, right?) . If you look at this implementation of RxJava, it is a chain call from top to bottom, with no nesting, which has the advantage of logical simplicity. This advantage becomes even more apparent when the requirements become more complex (imagine the normal approach if only the top 10 images are required? What if there were more requirements? What if, two months after you’ve implemented a bunch of requirements, you need to change a feature, and when you go back and see the indentation of the puzzle you wrote, you can be sure that you’ll understand it quickly, rather than going through the code all over again?) .

In addition, if your IDE is Android Studio, you will actually see an automatic lambdazed preview every time you open a Java file, which will give you a clearer view of the application logic:

Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> {  file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });Copy the code

If you’re comfortable with Retrolambda, you can just write the code in the terse form above. And if you see here and don’t know what Retrolambda is, I don’t recommend learning about it right now. Lambda is a double-edged sword that makes your code less readable while keeping it clean, so learning RxJava and Retrolambda at the same time may cause you to overlook some of the technical details of RxJava; 2. Retrolambda is an unofficial Java 6/7 compatibility scheme for Lambda expressions, and its backward compatibility and stability are not guaranteed, so using Retrolambda is risky for enterprise projects. So, unlike many RxJava promoters, I don’t recommend learning Retrolambda along with RxJava. In fact, as much as I personally admire Retrolambda, I’ve never used it.

In Flipboard’s Android code, there is a piece of logic that is very complex, consisting of multiple memory operations, local file operations and network operations, objects merging and merging, threads collaborating and waiting for each other, and lines up words, and lines up words. It would have been a hell of a lot of writing to do in the normal way, but with RxJava, it’s still just a chain call. It’s long, but clear.

So what’s so good about RxJava? It’s the simplicity, the simplicity that threads any complex logic into a single line.

API introduction and principle analysis

I can’t say that in one word… Because the main content of this section is to explain step by step how RxJava is really asynchronous and concise.

1. Concept: Extended observer mode

The asynchronous implementation of RxJava is implemented through an extended observer pattern.

Observer model

Let’s start with a brief description of the observer pattern. You can skip this section if you’re already familiar with it.

The observer mode is oriented to the requirement that object A (observer) is highly sensitive to A certain change of object B (observed) and needs to respond at the moment when B changes. For example, in the news, the police catch a thief. The police need to catch the thief when he reaches for his hand. In this example, the policeman is the observer and the thief is the observed. The policeman needs to keep an eye on the thief’s every move to ensure that no moment is missed. The observer mode of the program is slightly different from this kind of real “observation”. The observer does not need to stare at the observed all the time (for example, A does not need to check the status of B every 2ms), but registers, or subscribes, to tell the observed: I need your status, and you need to let me know when it changes. A typical example of this in Android development is the OnClickListener. For OnClickListener, the View is the observed and OnClickListener is the observer, and the subscription relationship is achieved through the setOnClickListener() method. The moment the user clicks a button after subscribes, the Android Framework sends the click event to the registered OnClickListener. Adopting this passive observation mode not only saves the resource consumption of repeatedly retrieving the state, but also can get the highest feedback speed. Of course, this also benefits from the fact that we can customize the observer and observed in our own program, while the police obviously cannot ask the thief to “inform me when you commit a crime”.

The OnClickListener mode looks like this:

As shown, Button holds a reference to OnClickListener through the setOnClickListener() method (this process is not drawn); Button automatically calls the onClick() method of OnClickListener when the user clicks. In addition, if the concepts in this diagram are abstracted (Button -> Observed, OnClickListener -> Observer, setOnClickListener() -> subscription, onClick() -> event), From a dedicated observer mode (for example, only listening for clicks on controls) to a general observer mode. The diagram below:

RxJava, as a tool library, uses a generic form of the Observer pattern.

RxJava observer mode

RxJava has four basic concepts: Observable, Observer, Subscribe, and event. Observables and observers subscribe through the subscribe() method, which allows an Observable to notify the Observer of events when needed.

Unlike traditional observer mode, RxJava’s event callback method defines two special events: onCompleted() and onError(), in addition to the normal onNext() event (equivalent to onClick()/onEvent()).

  • onCompleted(): The event queue ends. RxJava not only treats each event individually, but also treats them as a queue. RxJava stipulates that there will be no new onesonNext()When emitted, it needs to be firedonCompleted()Method as a flag.
  • onError(): The event queue is abnormal. When an exception occurs during event processing,onError()Is emitted, and the queue terminates automatically. No more events are allowed to be emitted.
  • In a correctly running sequence of events,onCompleted()onError()There is one and only one, and it is the last one in the sequence of events. It’s important to note that,onCompleted()onError()They are also mutually exclusive, meaning that if one is called in the queue, the other should not be called.

The observer mode for RxJava looks like this:

2. Basic implementation

Based on the above concepts, the basic implementation of RxJava has three main points:

1) create the Observer

An Observer is an Observer that determines how events will behave when triggered. The Observer interface in RxJava is implemented as follows:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!"); }};Copy the code

In addition to the Observer interface, RxJava also comes with an abstract class that implements the Observer: Subscriber. Subscriber makes some extensions to the Observer interface, but their basic usage is exactly the same:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!"); }};Copy the code

In essence, in the subscribe process of RxJava, Observer will always be converted to a Subscriber and then used. So if you only want to use basic functions, it is exactly the same to select Observer and Subscriber. There are two main differences between them for users:

  1. onStart(): this isSubscriberWays to increase. It is called at the beginning of the subscribe but before the event is sent, and can be used to do some preparatory work, such as clearing or resetting data. This is an optional method whose implementation is null by default. It is important to note that if there is a requirement for the thread preparing work (such as a pop-up dialog showing progress, which must be executed on the main thread),onStart()Does not apply, because it is always called in the thread where subscribe occurs, and cannot specify the thread. To do preparatory work in the specified thread, you can usedoOnSubscribe()Method, which can be seen in a later article.
  2. unsubscribe(): this isSubscriberAnother interface implementedSubscriptionTo unsubscribe. After this method is called,SubscriberEvents will no longer be received. It can be used before this method is calledisUnsubscribed()Let’s judge the state.unsubscribe()This method is important because insubscribe()After that,ObservableWill holdSubscriberThis reference, if not released in time, risks a memory leak. So it’s best to keep the principle that it should be in place as soon as it’s no longer in use (e.gonPause() onStop()And so onunsubscribe()Dereference to avoid memory leaks.
2) create observables

An Observable, or Observable, decides when and what events are triggered. RxJava uses the create() method to create an Observable and define event-triggering rules for it:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<?  super String> subscriber) { subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha"); subscriber.onCompleted(); }});Copy the code

As you can see, an OnSubscribe object is passed in as an argument. OnSubscribe is stored in the returned Observable, which acts as a schedule. When an Observable is subscribed, the OnSubscribe call() method is automatically called. The sequence of events will be fired accordingly (for the above code, observer Subscriber will be called onNext() three times and onCompleted() once). In this way, the observer callback method is invoked by the observed, and the event passing from the observed to the observer is realized, that is, the observer pattern.

The example is simple: the content of the event is a string, not some complex object; The content of the event is already determined, unlike some observer patterns that are to be determined (e.g. the result of a network request is unknown until the request is returned); All events are sent at once, rather than interspersed with certain or uncertain intervals of time or triggered by some trigger. In short, this example seems to be of no practical value. But just for the sake of illustration, you can essentially write all sorts of event sending rules yourself if you want. We’ll talk about how to do that later, but not right now. Only the basic principle of the first said clearly, the application of the upper can be easier to say.

The create() method is RxJava’s most basic method for creating a sequence of events. Based on this approach, RxJava also provides methods to quickly create event queues, such as:

  • just(T...): Sends the parameters passed in sequence.
Observable observable = Observable.just("Hello"."Hi"."Aloha"); // will call: // onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();Copy the code
  • from(T[]) / from(Iterable<? extends T>): will pass an array or可迭代Break them down into specific objects and send them out in turn.
String[] words = {"Hello"."Hi"."Aloha"}; Observable observable = Observable.from(words); // will call: // onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();Copy the code

Just above the (T)… The create(OnSubscribe) example and the from(T[]) example are both equivalent to the previous create(OnSubscribe) example.

3) Subscribe

Once you’ve created observables and observers and joined them together with the Subscribe () method, the chain works. The code form is simple:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);Copy the code

One might notice that the subscribe() method is a bit odd: It looks like “Observalbe subscribed to observer/subscriber” instead of “Observer/subscriber subscribed to observalbe”, which looks like “magazine subscribed to reader” inverting the object relationship. Subscribe (Observable)/subscriber.subscribe(Observable) is more logical, But it has an impact on the design of the streaming API, which is obviously not worth the cost.

The internal implementation of Observable.subscribe(Subscriber) looks like this (core code only) :

Subscribe (); subscribe(); subscribe(); // If you need to see the source code, you can go to RxJava GitHub repository to download. public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber);return subscriber;
}Copy the code

As you can see, subscriber() does three things:

  1. callSubscriber.onStart(). This method, described earlier, is an optional preparation method.
  2. callObservableIn theOnSubscribe.call(Subscriber). At this point, the logic for event sending starts running. This also shows that in RxJava,ObservableIt does not start sending events as soon as they are created, but as soon as they are subscribed, i.e. whensubscribe()Method is executed.
  3. Will the incomingSubscriberAs aSubscriptionTo return. It’s for convenienceunsubscribe().

The relationship between objects in the whole process is shown as follows:

Or watch the GIF:

In addition to SUBSCRIBE (Observer) and subscribe(Subscriber), subscribe() supports incomplete defined callbacks, and RxJava automatically creates Subscriber based on the definition. The form is as follows:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed"); }}; // Automatically create Subscriber and define onNext() observable.subscribe(onNextAction) with onNextAction; // Automatically create Subscriber, And use onNextAction and onErrorAction to define onNext() and onError() Observable. Subscribe (onNextAction, onErrorAction); // Automatically create Subscriber, And define onNext(), onError(), and onCompleted() using onNextAction, onErrorAction, and onCompletedAction observable.subscribe(onNextAction, onErrorAction, onCompletedAction);Copy the code

Briefly explain the Action1 and Action0 that appear in this code. Action0 is an RxJava interface that has only one method, call(), which takes no arguments and returns no values; Since the onCompleted() method also takes no arguments and returns no value, Action0 can be treated as a wrapper object, wrapping up the contents of onCompleted() and passing subscribe() itself as an argument to implement an incompletely defined callback. This can also be seen as passing the onCompleted() method as an argument to subscribe(), the equivalent of a “closure” in some other languages. Action1 is also an interface. It also has only one method, call(T param), which also returns no value but takes one argument; As with Action0, onNext(T obj) and onError(Throwable error) are also single arguments with no return value, Action1 can therefore wrap onNext(obj) and onError(error) and pass subscribe() to implement an incompletely defined callback. In fact, although Action0 and Action1 are the most widely used apis, RxJava provides multiple ActionX-style interfaces (such as Action2 and Action3) that can be used to wrap different methods with no return value.

Note: As mentioned earlier, Observer and Subscriber have the same role, and Observer is eventually converted to Subscriber object during subscribe(), so from here, In the following description, I will use Subscriber instead of Observer, which is more rigorous.

4) Scenario examples

Here are two examples:

In order to express the principles in a clearer way, the examples selected in this article are as simple as possible, so that some of the sample code looks like “gild the lily” and “could have solved the problem more easily without RxJava”. When you see this, it’s not because RxJava is too long-winded, but because giving examples of real scenarios early in the day is not good for the principle, so I deliberately chose simple scenarios.

A. Print an array of strings

Print all the strings in the string array NAMES in turn:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });Copy the code
B. Obtain the picture by id and display it

Drawable (drawableRes, drawableRes, drawableRes, drawableRes, drawableRes);

int drawableRes = ... ; ImageView imageView = ... ; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public voidonCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); }});Copy the code

As in the two examples above, create an Observable and Subscriber, and then connect them with subscribe() to complete the basic use of RxJava in one go. Very simple.

However,

By default in RxJava, events are emitted and consumed in the same thread. In other words, if the above method is used only, the result is a synchronous observer pattern. The observer pattern itself is intended to be a “background processing, foreground callback” asynchrony mechanism, so asynchrony is essential to RxJava. To implement asynchrony, we use another CONCEPT of RxJava: Scheduler.

3. Thread Control — Scheduler (1)

Without specifying a thread, RxJava follows the thread-invariant principle: the thread on which subscribe() is called produces events; Events are consumed in the thread where they are produced. If threads need to be switched, a Scheduler is used.

1) Scheduler API (I)

In RxJava, Scheduler — the equivalent of a thread controller — specifies which thread each piece of code should run on. RxJava already has several schedulers built in that are suitable for most usage scenarios:

  • Schedulers.immediate(): Runs directly on the current thread. This is the defaultScheduler.
  • Schedulers.newThread(): Always enable a new thread and perform operations on the new thread.
  • Schedulers.io(): Used for I/O operations, such as reading and writing files, reading and writing databases, and network information interactionScheduler. Behavioral patterns andnewThread()Almost. The difference isio()The internal implementation is to use an unlimited number of thread pool, can reuse idle threads, so most of the timeio()newThread()More efficient. Don’t put the calculation work onio()To avoid creating unnecessary threads.
  • Schedulers.computation(): used in calculationsScheduler. This calculation refers to CPU intensive computing, that is, operations that do not limit performance by operations such as I/O, such as graphics computation. thisSchedulerFixed thread pool used, number of CPU cores. Do not place I/O operations incomputation()Otherwise, the CPU will be wasted waiting time for I/O operations.
  • In addition, Android has a dedicated oneAndroidSchedulers.mainThread(), which specifies that the operation will run on the Android main thread.

With these schedulers in place, threads can be controlled using the subscribeOn() and observeOn() methods. SubscribeOn (): specifies the thread where subscribe() occurs, that is, the thread where Observable.OnSubscribe is activated. Or the thread of event generation. * observeOn(): specifies the thread on which Subscriber is running. Otherwise known as event-consuming threads.

Text narration is always difficult to understand, above code:

Observable.just(1, 2, 3, 4). SubscribeOn (Schedulers. IO ()) / / specifies the subscribe () in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / specifies the Subscriber Subscribe (new Action1<Integer>() {@override public void Call (Integer number) {log.d (tag,"number:"+ number); }});Copy the code

In the above code, contents 1, 2, 3 and 4 of the created event will be issued in the IO thread due to subscribeOn(schedulers.io ()). As observeOn (AndroidScheculers mainThread ()) of the specified, thus the subscriber digital printing will take place in the main thread. Before this, in fact, the subscribe () write two sentences subscribeOn (Scheduler. IO ()) and observeOn (AndroidSchedulers. MainThread (), the use of the method is very common, It applies to most “background thread fetch, main thread display” program strategy.

Get the image from the image id and display the image.

int drawableRes = ... ; ImageView imageView = ... ; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); }}). SubscribeOn (Schedulers. IO ()) / / specify the subscribe () in IO thread. ObserveOn (AndroidSchedulers. MainThread ()) / / specified Subscribe (new Observer<Drawable>() {@override public void onNext(Drawable Drawable) { imageView.setImageDrawable(drawable); } @Override public voidonCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); }});Copy the code

So, loading images will take place on the IO thread, and setting images will take place on the main thread. This means that loading images can take tens or even hundreds of milliseconds without causing any interface lag.

2) Principle of Scheduler (I)

The Scheduler API of RxJava is handy and magical. And isn’t subscribe() the outermost method called directly, even if it can be assigned a thread? . The principle of Scheduler needs to be left behind, however, as it is based on the principle of The next section, Transformations.

Well, I didn’t say anything in this video, just to reassure you that I didn’t forget the principle, but put it in a more appropriate place.

4. The transformation

We’re finally going somewhere cool, and whether you’re excited or not, I am.

RxJava provides support for transforming sequence of events, which is one of its core features and the biggest reason most people say “RxJava is awesome”. The so-called transformation is to process the object or the whole sequence of events into different events or event sequences. Concepts are always vague and hard to understand. Look at the API.

1) API

Let’s start with an example of map() :

Observable.just("images/logo.png"Map (new Func1<String, Bitmap>() {@override public Bitmap Call (String filePath) {// Parameter type StringreturngetBitmapFromPath(filePath); Subscribe (new Action1<Bitmap>() {@override public void call(Bitmap Bitmap) {// Argument type Bitmap showBitmap(bitmap); }});Copy the code

There is a class called Func1. Much like Action1, it is also an RxJava interface for wrapping methods that take one argument. Func1 differs from Action in that Func1 wraps a method that returns a value. Also, FuncX, like ActionX, has multiple methods for different numbers of arguments. The difference between FuncX and ActionX is that FuncX wraps a method that returns a value.

As you can see, the map() method converts the String in the argument to a Bitmap and returns it, and after the map() method, the parameter type of the event is changed from String to Bitmap. This direct transformation of the object and return is the most common and easily understood transformation. However, RxJava transforms much more than that. It can be applied not only to event objects, but to entire event queues, which makes RxJava very flexible. Let me list a few common transformations:

  • map(): The direct transformation of the event object, the specific function has been introduced above. It is the most commonly used RxJava transformation.map()Schematic diagram of:

  • FlatMap (): This is a useful but very difficult transformation to understand, so I decided to cover it in more detail. Let’s start with the requirement that we have a data structure “students” and now print out the names of a group of students. The implementation is simple:

Student[] students = ... ; Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name);  }... }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) {return student.getName();
        }
    })
    .subscribe(subscriber);Copy the code

Very simple. And suppose: what if you were to print out the names of all the courses each student needs to take? (The difference in requirements is that each student has only one name but multiple courses.) First, it can be implemented like this:

Student[] students = ... ; Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses();for(int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); }}... }; Observable.from(students) .subscribe(subscriber);Copy the code

It’s still very simple. So what if I don’t want to use a for loop in Subscriber, but rather want a single Course object passed directly into Subscriber (which is important for code reuse)? This is obviously not possible with map(), because map() is a one-to-one conversion, and I’m now asking for a one-to-many conversion. So how do you convert one Student into multiple courses?

At this point, flatMap() is needed:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);Copy the code

As you can see from the above code, flatMap() has one thing in common with map() : it also converts an passed parameter and returns another object. However, it should be noted that, unlike map(), flatMap() returns an Observable, which is not directly sent to the Subscriber callback method. FlatMap () works like this: 1. Create an Observable using the incoming event object. 2. Instead of sending the Observable, it activates it and it starts sending events. 3. Events sent by each created Observable are merged into the same Observable, which is responsible for transferring these events to the Subscriber callback method. These three steps split the event into two levels, “flattening” the original object through a set of newly created Observables and distributing it down a unified path. This “flat” is what a flatMap() calls a flat.

FlatMap () :

Extension: flatMap() is also used for nested asynchronous operations, such as nested network requests, since asynchronous code can be added to nested Observables. Sample code (Retrofit + RxJava) :

Networkclient.token () // Returns Observable<String>, which requests a token when subscribing. FlatMap (new Func1<String, Observable<Messages> () {@override public Observable<Messages> call(String token) {// Return Observable<Messages>, The list of messages is requested at subscription time and the list of messages requested is sent in responsereturnnetworkClient.messages(); Subscribe (new Action1<Messages>() {@override public void call(Messages) {// Process display message list showMessages(messages); }});Copy the code

Traditional nested requests require nested Callback to be implemented. FlatMap (), on the other hand, allows you to write nested requests in a chain to keep your program logic clean.

  • throttleFirst(): Discards new events within a certain interval after each event is triggered. Commonly used for jitter filtering, such as button click listeners:Rxview.clickevents (button) // RxBinding, ThrottleFirst (500, timeunit.milliseconds) // Subscribe (subscriber);Mom is no longer afraid of my user’s hand shaking open two duplicate screens.

In addition, RxJava also provides many convenient methods to implement the transformation of the sequence of events, here are not one example.

2) Transformation principle: lift()

Although these transformations have different functions, they are essentially for the processing and retransmission of event sequences. Inside RxJava, they are based on the same basic transformation method: Lift (Operator). First take a look at the internal implementation of Lift () (core code only) :

// Note: this is not the source code for Lift (), but the core code after removing the code related to performance, compatibility, and extensibility. // If you need to see the source code, you can go to RxJava GitHub repository to download. public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {returnObservable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); }}); }Copy the code

This code is interesting: It generates a new Observable and returns it, and the implementation in call() of the OnSubscribe callback used to create the new Observable looks exactly like the observable.subscribe () one! The key difference is that onSubscribe in the second line onSubscribe. Call (subscriber) refers to a different object (warning: the next few sentences may cause serious discomfort)

  • subscribe()In this sentenceonSubscribeRefers to theObservableIn theonSubscribeObject, no problem with that, butlift()After that, things got a little more complicated.
  • When containinglift()When:

    1.lift()Created aObservablePlus the originalObservableThere are already twoObservableA;

    1. 2. Smart refrigeratorObservableIn the newOnSubscribePlus the originalObservableThe originalOnSubscribeSo there are two of themOnSubscribe;

    3. After the call is made by the userlift()After theObservablesubscribe()“Is usedlift()The new one returnedObservableSo what it triggeredonSubscribe.call(subscriber)It’s new, tooObservableIn the newOnSubscribeIn which thelift()The one generated inOnSubscribe;

    4. And this new oneOnSubscribecall()In the methodonSubscribe“, refers to the originalObservableThe originalOnSubscribeAnd, in thecall()In the method, newOnSubscribeusingoperator.call(subscriber)It generates a new oneSubscriber(OperatorRight here, through his owncall()Method will be the newSubscriberAnd the originalSubscriberAssociation, and insert your own “transform” code to implement the transform), and then take advantage of this newSubscriberTo the originalObservableSubscribe.

    So that’s itlift()Process, sort ofLike a proxy mechanism, event sequence transformation is implemented through event interception and processing.

If you cut down the details, you can also say: After an Observable executes the Lift (Operator) method, it returns a new Observable that acts as a proxy for events emitted by the original Observable. And send to Subscriber after processing.

If you prefer concrete thinking, here’s a picture:

Or watch the GIF:

The same goes for double and multiple lift(), as shown below:

Give a concrete Operator implementation. Here is an example of converting an Integer object from an event to a String, just for reference:

observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? Super String> subscriber) {// Convert the Integer object in the event sequence to a String objectreturn new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); }}; }});Copy the code

The rationale behind Lift () is simply to give you a better understanding of RxJava so you can use it better. However, regardless of whether you understand lift() or not, RxJava does not recommend that developers customize operators to use lift() directly. Instead, RxJava recommends that existing lift() wrapper methods (such as map() flatMap(), etc.) be combined to implement the requirements. Using lift() directly is very easy to make hard-to-spot errors.

3) Compose: transform the whole Observable

In addition to lift(), an Observable has a transform called compose(Transformer). It differs from lift() in that lift() is for event items and event sequences, while compose() transforms for the Observable itself. For example, suppose you have multiple Observables in your application, and they all need to apply the same set of Lift () transforms. You can write it like this:

observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);Copy the code

You think this is too unsoftware engineering, so you change it to this:

private Observable liftAll(Observable observable) {
    returnobservable .lift1() .lift2() .lift3() .lift4(); }... liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4);Copy the code

Readability and maintainability have been improved. Observable, however, is wrapped in a method that seems to limit Observale’s flexibility a bit. How to do? At this point, you should compose() :

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        returnobservable .lift1() .lift2() .lift3() .lift4(); }}... Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4);Copy the code

With the compose() method, an Observable can use the call method of the incoming Transformer object to process itself directly without being wrapped in the method itself.

For compose(), the principle is simple.

5. Thread Control: Scheduler

In addition to flexible transformations, RxJava’s other great feature is free control of threads.

1) Scheduler API (ii)

As mentioned earlier, you can implement thread control by subscribeOn() and observeOn() so that events are generated and consumed on different threads. However, after learning about transformation methods like map() flatMap(), some good people (in fact, when I was first introduced to RxJava) asked: Can I switch threads a few more times?

The answer is: yes. Because observeOn() specifies the thread of Subscriber, which is not in the subscribe() argument, It is the Subscriber corresponding to the current Observable when observeOn() executes, that is, its direct subordinate Subscriber. In other words, observeOn() specifies the thread on which the operations that follow it occur. So if you need to switch threads multiple times, simply call observeOn() once for each location where you want to switch threads. The code:

Observable. Just (1, 2, 3, 4) // IO thread, Schedulers.io().observeon (schedulers.newthread ()).map(mapOperator) // newThread, Schedulers.io(). Map (mapOperator2) // the IO thread is schedulers.io (). By observeOn () to specify. ObserveOn (AndroidSchedulers. MainThread). The subscribe (the subscriber); // Android main thread, specified by observeOn()Copy the code

As shown above, the program implements multiple thread switches through multiple calls to observeOn().

However, unlike observeOn(), the subscribeOn() location can be placed anywhere, but it can only be called once.

Again, it’s a good thing (and it’s me) to ask: What if I have to call multiple subscribeOn()? What effect will it have?

Let’s start with the principle of RxJava thread control.

2) Principle of Scheduler (II)

Lift () is also used for the internal implementations of subscribeOn() and observeOn(). Look at the picture (different colored arrows indicate different threads) :

SubscribeOn () Schematic diagram:

ObserveOn () schematic diagram:

As you can see from the figure, both subscribeOn() and observeOn() do the work of thread switching (“schedule…” in the figure). Parts). The difference is that the thread switch of subscribeOn() occurs in OnSubscribe, that is, when it notifies the next level of OnSubscribe, the event has not been sent yet, so the thread control of subscribeOn() can be affected from the beginning of sending the event. The thread switch for observeOn() happens in its built-in Subscriber, that is, when it is about to send an event to the next Subscriber, so observeOn() controls the thread behind it.

Finally, I use a diagram to explain how thread scheduling occurs when multiple subscribeOn() and observeOn() are mixed (with some simplified structure adjustments compared to the diagram above due to the large number of objects in the diagram) :

There are five actions on events in the diagram. As can be seen from the figure, ① and ② are affected by the first subscribeOn() and run in the red thread; ③ and ④ are affected by the first observeOn() and run on the green thread; ⑤ is affected by the second onserveOn() and runs on the purple thread; While for the second subscribeOn(), the thread is truncated by the first subscribeOn() in the notification process, so it has no impact on the whole process. This answers the previous question: When multiple subscribeOn() are used, only the first subscribeOn() plays a role.

3) Extension: doOnSubscribe()

However, although more than one subscribeOn() has no effect on the process of event processing, it is available before the process.

When talking about Subscriber, it was mentioned that onStart() of Subscriber can be used as initialization before the process starts. However, since onStart() is called when SUBSCRIBE () occurs, it cannot specify a thread, but can only execute the thread when subscribe() is called. As a result, if onStart() contains code that requires a thread (such as displaying a ProgressBar on the interface, which must be executed on the main thread), there is a risk of thread ilvalidation, because sometimes you can’t predict which thread subscribe() will execute on.

Corresponding to subscriber.onstart (), there is a method observable.doonsubscribe (). Like subscriber.onstart (), it is executed after subscribe() and before the event is sent, but the difference is that it can specify a thread. By default, doOnSubscribe() is executed on the thread where subscribe() occurs; If doOnSubscribe() is followed by subscribeOn(), it will execute the thread specified by the closest subscribeOn() to it.

Sample code:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() { progressBar.setVisibility(View.VISIBLE); / / need in the main thread execution}}) subscribeOn (AndroidSchedulers. MainThread ()) / / specify the main thread. ObserveOn (AndroidSchedulers. MainThread ()) .subscribe(subscriber);Copy the code

As you see, you specify the thread to prepare by subscribeOn() after doOnSubscribe().

Application scenarios and usage of RxJava

1. Integration with Retrofit

Retrofit is a well-known web request library for Square. If you haven’t used Retrofit before you can skip this section, it doesn’t matter. Each of the scenarios I’ve mentioned are just examples, and they’re not connected, they’re just examples, so you can skip here and look at other scenarios.

Retrofit provides the traditional Callback API as well as the RxJava version of the Observable API. Let me show you how Retrofit’s RxJava version OF the API differs from traditional versions by way of comparison.

Take the interface to get a User object as an example. Using Retrofit’s traditional API, you can define requests like this:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);Copy the code

During the process of building the application, Retrofit automatically implements the methods and generates code so that the developer can get specific users and handle the responses using the following methods:

getUser(userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }};Copy the code

Using the RXJava-style API, the same request is defined like this:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);Copy the code

Here’s how it works:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... }});Copy the code

See the difference?

In RxJava form, Retrofit encapsulates the request into an Observable that calls onNext() when the request ends or onError() when the request fails.

By contrast, the Callback form looks different from the Observable form, but the essence is similar, and the Observable form seems to be inferior to the Callback form in details. So why does Retrofit support RxJava?

Because it works! You can’t tell from this example because this is just the simplest case. As soon as the situation gets complicated, the Callback form becomes a headache. Such as:

Imagine a situation where the User fetched by your application should not be displayed directly, but should be compared and corrected against data in the database. Callback: Callback Callback

getUser(userId, new Callback<User>() { @Override public void success(User user) { processUser(user); // Try to modify User data userview.setuser (User); } @Override public void failure(RetrofitError error) { // Error handling ... }};Copy the code

Any questions?

It’s easy, but don’t do it. Why is that? Because doing so can affect performance. The operation of the database is very heavy, and it is common for a read and write operation to take 10~20ms, which is easy to cause interface lag. So in general, you should avoid processing databases in the main thread if you can. So to improve performance, this code can be optimized:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() { processUser(user); // Try to correct User data runOnUiThread(new)Runnable() {// Override public voidrun() { userView.setUser(user); }}); }).start(); } @Override public void failure(RetrofitError error) { // Error handling ... }};Copy the code

Performance issues resolved, but… This code is really too messy, the indented mystery ah! Messy code is often more than just a matter of aesthetics, because the more messy the code, the harder it is to read, and if a project is filled with messy code, it will undoubtedly reduce the readability of the code, resulting in a decrease in team development efficiency and an increase in error rate.

At this point, if you use RxJava form, much easier to do. The code in RxJava form looks like this:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... }});Copy the code

The background code and the foreground code are all in one chain, which is much clearer.

Here’s another example: Suppose the /user interface is not directly accessible, but you need to fill in a token obtained online.

Callback method, which can be nested with Callback:

@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback); . getToken(new Callback<String>() { @Override public void success(String token) { getUser(token, userId, new Callback<User>() { @Override public void success(User user) { userView.setUser(user); } @Override public void failure(RetrofitError error) { // Error handling ... }}; } @Override public void failure(RetrofitError error) { // Error handling ... }});Copy the code

There’s no performance problem, but you know it, I know it, you know it better if you’re working on a big project.

With RxJava, the code looks like this:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId); . getToken() .flatMap(new Func1<String, Observable<User>>() { @Override public Observable<User> onNext(String token) {return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() { } @Override public void onError(Throwable error) { // Error handling ... }});Copy the code

The logic is done with a flatMap(), which is still a chain. It just looks good, doesn’t it?

Update 03/31, 2016, plus a Sample project I wrote: Rengwuxian RxJava Samples

Okay, so that’s it for Retrofit.

2. RxBinding

RxBinding is an open source library by Jake Wharton that provides an RxJava-based Binding API for the Android platform. Binding is an API for registering Binding objects such as OnClickListener and TextWatcher.

Take an example of setting up click-listening. With RxBinding, event listeners can be set like this:

Button button = ... ; Subscribe (new Action1<ViewClickEvent>() {@override public void Override call(ViewClickEvent event) { // Click handling } });Copy the code

It doesn’t seem to make much difference except in form, and it does in substance. Even if you look at the source code, you’ll see that the implementation isn’t surprising: it’s implemented directly inside with a wrapped setOnClickListener(). However, this one form change is exactly what RxBinding is all about: extensibility. Once the click listener is converted into an Observable with the RxBinding, it becomes possible to extend it. There are many ways to extend, depending on your needs. An example is the aforementioned throttleFirst(), which is used to eliminate jitter, that is, the rapid chain of clicks caused by hand shaking:

RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);Copy the code

If you want to learn more about RxBinding, check out its GitHub project.

3. Various asynchronous operations

The previous examples of Retrofit and RxBinding are two libraries that provide ready-made Observables. If you have an asynchronous operation that doesn’t automatically generate an Observable using these libraries, you can write it yourself. Database reading and writing, loading large images, file compression/decompression, and other time-consuming operations that need to be done in the background can all be implemented in RxJava. The examples in previous chapters should not be used here.

4. RxBus

RxBus’s name looks like a library, but it’s not a library, it’s a pattern, and the idea is to use RxJava to implement EventBus without having to use Otto or GreenRobot’s EventBus. For details on what RxBus is, see this article. By the way, Flipboard has replaced Otto with RxBus with no adverse reaction so far.

The last

RxJava is a difficult library for Android developers to get started with because it is so unfamiliar to Android developers. But it’s really cool. That’s why I’ve written RxJava For Android Developers, hoping to give those who still don’t understand what RxJava is, or those who are using RxJava but still have doubts, some insight. In any case, as long as I can give you fellow Android engineers some help, this article has served its purpose.

The author blog: http://gank.io/post/560e15be2dca930e00da1083