preface

RxJava is a responsive programming extension based on the Java VM implementation – an observer pattern-based asynchronous and event processing framework. RxJava currently maintains two versions of 1.x and 2.x, with different group ids and namespaces.

version group id namespaces
v1.x io.reactivex io.reactivex
v2.x io.reactivex.rxjava2 rx

This series of articles will introduce RxJava 1.x and give Github’s address:

  • RxJava:github.com/ReactiveX/R…
  • RxAndroid:github.com/ReactiveX/R…

Introducing dependencies with Gradle:

compile 'the IO. Reactivex: rxjava: 1.0.14' 
compile 'the IO. Reactivex: rxandroid: 1.0.1' 
Copy the code

The body of the

1. Definition of RxJava

A precise explanation is as follows: RxJava is an asynchronous, event-based library of observable sequences running on the Java VM.

2. Advantages of RxJava

In other words, “Why do people use AsyncTask/Handler/XXX /… when it’s done asynchronously?”

In one word: simplicity.

A key aspect of asynchronous operations is program brevity, because asynchronous code is often both difficult to write and difficult to read in complex scheduling situations. Android created AsyncTask and Handler to make asynchronous code simpler. RxJava also benefits from brevity, but its brevity is distinguished by its ability to keep it simple as the program logic becomes more complex.

In Android development, suppose there is a requirement for a custom view, imageCollectorView, which is used to display multiple images and can use addImage(Bitmap) method to arbitrarily add images to the display. Now we need the program to load the PNG images in each directory in a given directory array File[] folders and display them in imageCollectorView.

Note: Since the process of reading images is time-consuming, it needs to be performed in the background, while the display of images must be performed in the UI thread.

There are many common implementations, one of which is given here:

new Thread() {
    @Override
    public void run(a) {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run(a) { imageCollectorView.addImage(bitmap); }}); } } } } }.start();Copy the code

With RxJava, the implementation looks like this:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); }});Copy the code

As you can see, the amount of code in RxJava is significantly increased. Where does simplicity come from?

And by simplicity I mean logical. If you look at this implementation of RxJava, it is a chain call from top to bottom, with no nesting, which has the advantage of logical simplicity. This advantage becomes even more apparent when the requirements become more complex (imagine the normal approach if only the top 10 images are required? What if there were more requirements? What if, two months after you’ve implemented a bunch of requirements, you need to change a feature, and when you go back and see the indentation of the puzzle you wrote, you can be sure that you’ll understand it quickly, rather than going through the code all over again?) .

In addition, if your IDE is Android Studio, you will actually see an automatic lambdazed preview every time you open a Java file, which will give you a clearer view of the application logic:

Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> {  file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
Copy the code

So, what are the advantages of RxJava? The beauty of simplicity is that you can thread complex logic through a functional programming model.

3. Expansion of observer mode

The asynchronous implementation of RxJava is implemented through an extended observer pattern.

3.1. The Universal observer model

The observer mode is oriented to the requirement that object A (observer) is highly sensitive to A certain change of object B (observed) and needs to respond at the moment when B changes.

For example, in the news, the police catch a thief. The police need to catch the thief when he reaches for his hand. In this example, the policeman is the observer and the thief is the observed. The policeman needs to keep an eye on the thief’s every move to ensure that no moment is missed.

The observer mode of the program is slightly different. The observer does not need to stare at the observed all the time (for example, A does not need to check the status of B every 2ms), but registers or subscribes to tell the observed: I need a certain state of you, and you need to let me know when it changes.

Adopting this passive observation mode not only saves the resource consumption of repeatedly retrieving the state, but also can get the highest feedback speed.

A typical example of this in Android development is the OnClickListener. For OnClickListener, the View is the observed and OnClickListener is the observer, and the subscription relationship is achieved through the setOnClickListener() method. The moment the user clicks a button after subscribes, the Android Framework sends the click event to the registered OnClickListener.

The observer mode for OnClickListener looks like this:

As shown, Button holds a reference to OnClickListener through the setOnClickListener() method (this process is not illustrated). Button automatically calls the onClick() method of OnClickListener when the user clicks.

Concepts abstracted from the observer model:

  • Button: Observed
  • An OnClickListener: observer
  • SetOnClickListener () : subscriptions
  • OnClick (): event handling

From the dedicated observer mode to the general observer mode, as shown below:

3.2. Observer mode in RxJava

RxJava has four basic concepts:

  • Observable Observable Observable Observable Observable
  • The Observer: the Observer
  • Subscribe: to Subscribe to
  • Event: Event processing

Observables and observers subscribe through the Subscribe () method, which allows an Observable to notify the Observer of events when needed.

Unlike the traditional observer mode, RxJava’s event callback method defines two special events, onCompleted() and onError(), in addition to the normal onNext() event (equivalent to onClick()).

  • OnCompleted (): The event queue is completed

RxJava not only treats each event individually, but also treats them as a queue. RxJava specifies that the onCompleted() method needs to be fired as an event completion signal when no new onNext() will be issued.

  • OnError (): The event queue is abnormal

When an exception occurs during event processing, onError() is raised and the queue terminates automatically, no more events are allowed to be emitted.

In a properly executed sequence of events, onCompleted() and onError() have one and only one called and are the last execution in the sequence of events.

The observer mode for RxJava looks like this:

4. Basic use of RxJava

Based on the above concepts, there are 3 steps to basic use of RxJava:

4.1. To create a Obsever

An Observer is an Observer that determines how events will behave when triggered. The Observer interface in RxJava is declared as follows:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted(a) {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error: "+ e.getMessage()); }};Copy the code

In addition to the Observer interface, RxJava also comes with an abstract class that implements the Observer: Subscriber. Subscriber makes some extensions to the Observer interface, but their basic usage is exactly the same:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted(a) {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error: "+ e.getMessage()); }};Copy the code

In essence, the Observer will always be converted to a Subscriber before being used during the SUBSCRIBE process in RxJava. So if you only want to use basic functions, it is exactly the same to select Observer and Subscriber. There are two main differences between them for users:

  • onStart()

This is the method of adding Subscriber. It will be called just before the subscribe starts, but before the event is sent. Can be used to do some preparatory work, such as data clearing or reset. This is an optional method whose implementation is null by default.

Note that onStart() does not apply if there is a requirement for the thread preparing for work (for example, a pop-up dialog showing progress must be executed on the main thread). Because it is always called in the thread where subscribe occurs, the thread cannot be specified. To do the preparatory work on a specified thread, use the doOnSubscribe() method, as you’ll see in a later section.

  • unsubscribe()

This is another interface Subscription method implemented by Subscriber for unsubscribing. After this method is called, Subscriber will no longer receive events. Typically, before invoking this method, you can use isUnsubscribed() to determine the status.

The unsubscribe() method is important because an Observable holds a Subscriber reference after subscribe(). This reference risks a memory leak if it is not released in time.

Note: Call unsubscribe() as soon as possible when it is no longer in use(for example, onPause(), onStop(), etc.) to unsubscribe() to avoid memory leaks.

4.2 create Obsevable

2. Obsevable. The create ()

An Observable, or Observable, decides when and what events are triggered. RxJava uses the create() method to create an Observable and define event-triggering rules for it. The following is an example:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha"); subscriber.onCompleted(); }});Copy the code

As you can see, an OnSubscribe object is passed in as an argument. The OnSubscribe is stored in the returned Observable.

It acts as a scheduler. When an Observable is subscribed, the OnSubscribe call() method is automatically called, and the sequence of events is triggered accordingly. The observer Subscriber will be called onNext() three times and onCompleted() once.

In this way, the observer callback method is invoked by the observed, and the event passing from the observed to the observer is realized, that is, the observer pattern.

4.2.2. Obsevable. Just (T)…

The create() method is RxJava’s most basic method for creating a sequence of events. Based on this approach, RxJava also provides methods for creating event queues quickly, such as the just() method:

Observable observable = Observable.just("Hello"."Hi"."Aloha");
OnNext ("Hello") -> onNext("Hi") -> onCompleted()
Copy the code

Obsevable. From (T[]) and from(Iterable<? Extends T>)

Divide an array or Iterable into concrete objects and send them to observers in turn, as shown in the following example:

String[] words = {"Hello"."Hi"."Aloha"};
Observable observable = Observable.from(words);
OnNext ("Hello") -> onNext("Hi") -> onCompleted()
Copy the code

4.3. The Subscribe link

Once you’ve created an Observable and an Observer, you can associate them with the Subscribe () method, and the chain works. The code is simple:

observable.subscribe(observer);
/ / or
observable.subscribe(subscriber);
Copy the code

You may notice that the subscribe() method is a bit odd: it looks like “Observable subscribes observer/subscriber”, not “Observer/subscriber subscribes Observable”. This seems like a “magazine subscribes to readers” inversion of the object relationship.

This is a bit awkward to read, but it would be more logical to design the API as observer.subscribe(Observable)/subscriber.subscribe(Observable). But it has an impact on the design of the streaming API, which is obviously not worth the cost.

The internal implementation of Observable.subscribe(Subscriber) looks like this (core code):

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
Copy the code

You can see that subscriber() does three things:

(a). Calls to the Subscriber. OnStart ()

This method, described earlier, is an optional preparation method.

(b). Call OnSubscribe. Call (Subscriber) in Observable

The event sending logic starts running. As you can see, in RxJava, an Observable doesn’t start sending events immediately when it’s created, but when it’s subscribed, when the subscribe() method executes.

(c). Return to the Subscription

Incoming Subscriber is returned as Subscription. This is for the convenience of unsubscribe().

The relationship between objects in the whole process is shown as follows:

Or watch the GIF:

In addition to SUBSCRIBE (Observer) and subscribe(Subscriber), subscribe() supports incomplete defined callbacks, and RxJava automatically creates Subscriber based on the definition. The form is as follows:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) { Log.d(tag, s); }}; Action1<Throwable> onErrorAction =new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling}}; Action0 onCompletedAction =new Action0() {
    // onCompleted()
    @Override
    public void call(a) {
        Log.d(tag, "completed"); }};// Automatically create Subscriber and use onNextAction to define onNext()
observable.subscribe(onNextAction);
// Automatically create Subscriber and define onNext() and onError() using onNextAction and onErrorAction
observable.subscribe(onNextAction, onErrorAction);
// Automatically create Subscriber and use onNextAction, onErrorAction and onCompletedAction to define onNext(), onError() and onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Copy the code

Briefly explain the Action1 and Action0 that appear in this code.

  • Action0

Action0 is an RxJava interface that has only one method, call(), which takes no arguments and returns no values. Since the onCompleted() method also takes no arguments and returns no value, Action0 can be treated as a wrapper object, wrapping up the contents of onCompleted() and passing subscribe() itself as an argument to implement an incompletely defined callback.

  • Action1

Action1 is also an interface and again has only one method, call(T param), which also returns no value but takes one argument. As with Action0, onNext(T obj) and onError(Throwable error) are also single arguments with no return value, Action1 can therefore wrap onNext(obj) and onError(error) and pass subscribe() to implement an incompletely defined callback.

In fact, although Action0 and Action1 are the most widely used apis, RxJava provides multiple ActionX-style interfaces (e.g. Action2, Action3) that can be used to wrap different methods with no return value.

4.4. Scenario Example

4.4.1. Print string arrays

Print all the strings in the string array NAMES in turn:

String[] names = ... ; Observable.from(names) .subscribe(new Action1<String>() {
        @Override
        public void call(String name) { Log.d(tag, name); }});Copy the code

4.4.2. Get picture display by ID

intdrawableRes = ... ; ImageView imageView = ... ; Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted(a) {}@Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); }});Copy the code

Create an Observable and Subscriber, and subscribe() to connect them. RxJava is easy to use.

However.

summary

By default in RxJava, events are emitted and consumed in the same thread. In other words, if the above method is used only, the result is a synchronous observer pattern. The observer pattern itself is intended to be a “background processing, foreground callback” asynchrony mechanism, so asynchrony is essential to RxJava. To implement asyncracy, another core concept of RxJava called Scheduler is used, which is described in more detail later.


Welcome to pay attention to the technical public number: Zero one Technology Stack

This account will continue to share learning materials and articles on back-end technologies, including virtual machine basics, multithreaded programming, high-performance frameworks, asynchronous, caching and messaging middleware, distributed and microservices, architecture learning and progression.