preface

When I was writing about RxJava, I was asked to talk about the new features of RxJava2.0, and to be honest, I initially turned them down. Because in my opinion, RxJava2.0 is a major upgrade to the version, but it is still RxJava. How can you upgrade a version? Wouldn’t it be nice to have a look at its updated documentation? Is there really a need for a single article on this?

But after a detailed understanding of RxJava2.0 and part of the source code, I think it is necessary to do a description of RxJava2.0, to help you have a better understanding of RxJava.


matting

If you are not familiar with RxJava, or if you are not familiar with the concept of back pressure (which will be covered in the 2.0 update), I encourage you to read the following two introductory articles:

  • The friendliest article about RxJava
  • The most friendly article about RxJava —- back pressure

The article on back pressure was originally part of this article, but has been removed due to its length, so I suggest you read it as well when you have time.


The body of the

There are a lot of updates to RxJava2.0, and some of the changes even impact the content of my previous article, which is one of the reasons I wanted to write this article. But want to write this article is actually difficult, because the relevant information to actually is very few, but also have their own hard scalp…. But as the saying goes, there are difficulties to go up, there are no difficulties to create difficulties to go up.

Here, I’ll look at the updates in the order we covered them in our previous article on RxJava: observer mode, operators, and thread scheduling.


Add the dependent

That’s got to be at the top of the list.

Using RxJava on Android requires a new package name:

    //RxJava dependencies (the latest version I'm using)
    compile 'the IO. Reactivex. Rxjava2: rxjava: 2.0.1'
    //RxAndroid dependency package
    compile 'the IO. Reactivex. Rxjava2: rxandroid: 2.0.1'Copy the code

Observer model

First of all, RxJava is built around the Observer schema, which is still the case in 2.0.

In this update, however, there are two observer modes:

  • Observable /Observer
  • Flowable(observed)/Subscriber(observer)

In rxjava2. X, Observeable is used to subscribe to Observer and does not support Backpressure, while Flowable is used to subscribe to Subscriber and supports Backpressure.

The concept of back pressure and its shortcomings in 1.0 was introduced in my last article, but if you are not clear, I will introduce it here: Back pressure refers to a strategy that tells the upstream observer to slow down the sending speed when the event sending speed of the observed is much faster than the processing speed of the observer in asynchronous scenarios. In 1.0, the biggest regret about back pressure is that it is concentrated in the class Observable, which leads to some Observables supporting back pressure while others do not. To address this deficiency, the new version distinguishes between observables that support back pressure and those that do not.

Observable/Observer

Observable Observable

  Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2); e.onComplete(); }}); Observer mObserver=new Observer<Integer>() {
            // This is a new method, before sending data after subscribing,
            // Call this method first, and Disposable can be used to unsubscribe
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

mObservable.subscribe(mObserver);Copy the code

This observer model does not support back pressure.

What do you mean you don’t support back pressure?

When the observer quickly send a large amount of data, the downstream won’t do other processing, even if a large number of data accumulation, the chain will not quote MissingBackpressureException, consumes too much memory will only OOM

When I tested it, I quickly sent 100,000 bits of integer data, with downstream latency. As a result, all the observed data was sent, and the memory did increase significantly. Unfortunately, there was no OOM.

Therefore, when we use Observable/Observer, we need to consider whether the data volume is very large (the official line is 1000 events, just for your reference).

Flowable/Subscriber

        Flowable.range(0.10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            // When subscribed, this method is called first, which is equivalent to onStart(),
            // The Subscription s parameter passed in can be used to request data or unsubscribe
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG"."onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG"."onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG"."onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG"."onComplete"); }});Copy the code

The output is as follows:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2. onNext--->9
onComplete
onsubscribe endCopy the code

Flowable supports back pressure, that is, in general, the upstream observed responds to the downstream observer’s data request, and the downstream calls Request (n) to tell the upstream how many data to send. This prevents large amounts of data from piling up in the call chain, keeping memory low.

Of course, Flowable can also be created by creat() :

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4); e.onComplete(); }}Backpressure policy needs to be specified
        , BackpressureStrategy.BUFFER);Copy the code

Flowable can be created with create(), but you must specify a backpressure policy to ensure that the Flowable you create supports backpressure (this was difficult to guarantee back in 1.0, so RxJava2.0 tightened the create() permissions).

As you can see in the output from the code above, when we call the subscription. Request (n) method, the onNext method is executed immediately, without waiting for the code following onSubscribe(). Therefore, if you use the class that needs to be initialized in the onNext method, Try to initialize the subscription. Request (n) method before calling it;

Of course, this is not always the case. In my tests, I found that custom Flowable through create () calls to the subscription. Request (n) method wait until the following code in onSubscribe () has been executed before calling onNext.

TIPS: As far as possible, make sure that all initialization is done before request (), otherwise you risk null Pointers.

Other observer model

Of course, in addition to these two kinds of observers, there is another kind of observer

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

In fact, the three are almost the same. Maybe/MaybeObserver can be said to be the complex of the first two. Therefore, taking Maybe/MaybeObserver as an example, I will briefly introduce the usage of this observer mode

// Determine whether to log in
Maybe.just(isLogin())
    // May involve IO operations placed in child threads
    .subscribeOn(Schedulers.newThread())
    // fetch the result to the main thread
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{... } } @Override publicvoid onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });Copy the code

As you can see, the observer mode is not used to send a large amount of data, but to send a single data. That is, you can use the observer mode when you only want the true or false result of an event


This is the upper interface for those being observed above:

/ / interface to observables
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
/ / Single interface
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
/ / Completable interface
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
/ / Maybe interface
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
/ / Flowable interface
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}Copy the code

In fact, we can see that each kind of observer inherits its own interface, which makes them completely separate and independent (especially Observable and Flowable), ensuring that their respective extensions or supporting operators do not affect each other.

For example, the flatMap operator implements:

// The definition of flatMap in Flowable
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

Observable flatMap; //Observable flatMap
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);Copy the code

If you want to write a custom operator for Flowable, just make sure that the type in Function< Publisher > implements the Publisher interface. This may sound abstract, but it doesn’t matter if you don’t understand it, because custom operators are not recommended, and the combination of manipulators in RxJava is sufficient for your needs.

Of course, you’ll also notice that the subscribe () method in those interfaces now returns type void. In 1.x, this method generally returns a Subscription object, which is used to unsubscribe. Now, the object of this function has been placed in the internal implementation method of Observer or subscriber,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}Copy the code

In the example above, onSubscribe(Subscription s) is passed in as an unsubscribe parameter, which can also be used to request data upstream.

In Observable/ Observer, the parameter passed in is another object

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /** * Dispose the resource, the operation should be idempotent. */
    void dispose();
    /** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */
    boolean isDisposed();
}Copy the code

In the Observer interface, onSubscribe(Disposable d) is passed into Disposable, which is also used to unsubscribe.

In fact, this is a logical design, because unsubscribe is an action that only the Observer can do, and now it is a logical integration of the Observer.

One final update is that the observer no longer receives NULL as the data source.


Operator correlation

This is essentially the same area. Most of the operators you’ve used before are the same, and if they are, it’s just the package name or the class name. The ones you probably use a lot are Action and Function.

The Action related

I’ve written about interfaces like Action in this article. In 1.0, these interfaces started with Action0, Action1… To the back (the numbers represent acceptable parameters), the changes have now been made

Rx1.0 — — — — — — — — — — – Rx2.0

Action0——–Action Action1——–Consumer Action2——–BiConsumer Action is removed, only ActionN is retained

The Function related to the

Again, a change in naming

Compared to RxJava1.0, both classes add throws Exception, meaning that try-catches are not required to perform certain operations on these methods.

Such as:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);Copy the code

I/O methods like files.readlines (name) are try-catch methods. Now throw an exception directly, and you can use lambda to keep your code simple and elegant.

doOnCancel/doOnDispose/unsubscribeOn

Take doOnCancel, which is presumably called when a subscription is unsubscribed, for example:

Flowable.just(1.2.3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);Copy the code

The take new operator cancels subsequent events that have not yet been sent, thus triggering doOnCancel

Other operators remain the same, or just have changed their names. I won’t cover all of them here, but most operators have two sets, one for Observables and one for Flowable.


Thread scheduling

It’s fair to say that this part is basically unchanged, if I must say so.

  • The schedulers.immediate () threading environment is removed
  • Also removed is schedulers.test () (I’ve never used this method before)
  • IO. Reactivex.Scheduler is an abstract class that supports scheduling custom threaded tasks directly (I don’t use that much either).

supplement

If you want to migrate your RxJava1.0 to version 2.0, you can use this library, RxJava2Interop, which can convert between RxJava1.0 and 2.0, that is, not only can you migrate the 1.0 code to 2.0, you can migrate the 2.0 code to 1.0, haha.

Add 2

In RxJava1.0, some people used CompositeSubscription to collect subscriptions and unsubscribe. Now in RxJava2.0, subscribe() now returns void.

In fact, in RxJava2.0, Flowable provides the subscribeWith method that returns the current subscribed observer, and provides the Disposable interface through observers such as ResourceSubscriber DisposableSubscriber.

So, if you want to achieve the RxJava1.0 effect, you should now do this:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

This subscriber should be an instance of ResourceSubscriber or DisposableSubscriber.


At the end

In fact, from the analysis of the whole article, the biggest change is the implementation of observer mode, which is split and refined, mainly into Observable and Flowable, as well as other changes associated with it. In general, this version can be said to be the reconstruction of observer and observed.

RxJava2.0 demo: RxJava2.0 demo: RxJava2.0

RxJava2-Android-Samples

Of course, if you have any questions during learning 2.0, please leave a comment here.


Afterword.

I started writing this article half a month ago, but I was not satisfied, so I lay in the draft box for a long time, but in the belief that I would not give up any lagging article, I pulled myself together and finished the introduction of RxJava2.0. It may surprise you, but after writing it, you immediately feel relieved.

The body is hollowed out…


The appendix

Below I take a screenshot to show the details of some changes from 2.0 to 1.0, just for reference.





In fact, these are the official list, screenshots here just for your convenience.