preface

Today we start with Rxjava, the amazing, difficult and awesome framework.

First, Rxjava two key words:

  • Asynchronous. Rxjava allows you to switch threads at will via chain calls while keeping your code simple.
  • Observer mode. The core of Rxjava is, in plain English, an observer pattern, which performs the sending of subsequent events by subscribing to the observed layer of subscriptions.

Then began to ask questions, Rxjava involves a lot of content, I will still take three questions as a unit, from easy to difficult, one by one to go on, today’s three questions are:

  • RxJava subscription relationships
  • Will the Observer still be able to onNext after processing onComplete?
  • Operators in RxJava

RxJava subscription relationships

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            emitter.onNext(1);
            emitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
Copy the code

There are three main roles in the code:

  • The subscriber Observable, which is the source of the whole event, can send data to the subscriber.
  • The subscriber Observer, through the subscribe method to create a relationship with the subscriber, that is, to start the subscription, and can accept the message sent by the subscriber.
  • Subscriber/Emitter, which is changed to Emitter after Rxjava2, is mainly used to emit a series of events, such as next event, complete event and so on.

With these three roles, a complete subscription relationship is generated.

Will the Observer still be able to onNext after processing onComplete? To figure this out, look at what the onComplete, onNext method does.

@Override public void onComplete() { if (! isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (! isDisposed()) { observer.onNext(t); } } public static boolean isDisposed(Disposable d) { return d == DISPOSED; } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current ! = d) { current = field.getAndSet(d); if (current ! = d) { if (current ! = null) { current.dispose(); } return true; } } return false; }Copy the code

Now the source code is fairly clear, so whether onComplete or onNext determines whether the current subscription is DISPOSED, whether a reference to a variable of type Disposable is DISPOSED, if it is DISPOSED it means that the subscription is DISPOSED, and the starting point and the ending point are DISPOSED. Now at the end of the onComplete method you call the Dispose method, which sets the Disposable object in the atomic reference class to the DISPOSED enumeration instance within The DisposableHelper, so all onNext after that, OnComplete, isDisposed in the onError method will not pass, and you won’t be able to do subsequent processing such as sending data.

The concatMap flatMap operator in RxJava has the same function. Both operators transform one Observables that emit data into multiple Observables, and then put their emitted data into a single Observable. The difference lies in that concatMap is ordered while flatMap is unordered. The final output sequence of concatMap is consistent with the original sequence, while flatMap is not necessarily interleaved.

For example, sending the numbers 01234, +1 with the operator, and a delay of 2:

Observables. FromArray (1, 2, 3, 4, 5). FlatMap (new Function < Integer, ObservableSource<Integer>>() { @Override public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception { int delay = 0; if(integer == 2){ delay = 500; } return Observable. Just (integer*10). Delay (delay, timeunit.milliseconds); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e("jimu","accept:"+integer); }});Copy the code

If the preceding operations are performed, the final output is 10,20,40,50,30. Because when I send the number 2, there’s a delay.

However, if the flatMap operator is changed to concatMap, the output will be 10,20,30,40,50. This is because concatMap is ordered and the output will be transformed in the same order as the original sequence.

  • Merge, concat, zip, merge

These operators are used to merge projectiles and can join multiple obserables into a single Obserable:

Observables < Integer > odds = observables. Just (1, 2, 3, 4); Observables < Integer > events = observables. Just,6,7,8 (5); Observable.merge(odds,events).subscribe(i->Log.d("TAG","merge->"+i));Copy the code

The difference is that the concat operator executes sequentially after the merge, while the merge operator executes in parallel on a timeline after the merge. If a data is delayed, the result sequence changes.

The zip operator executes in parallel after merging, emitting the same events as the least one. For example, if an Obserable that sends two data and an Obserable that sends four data zip merge, only two data will be sent. Here’s an example:

Observables. Zip (observables. Just (1, 2), observables. Just (3,4,5,6), new BiFunction < Integer, Integer, Integer>() { @Override public Integer apply(@NonNull Integer response, @nonNULL Integer (response2) throws Exception {return response2; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer s) throws Exception { Log.e("lz","accept+"+s); }});Copy the code

There are only two numbers: 4 and 6. The next two bits of data from the second transmitter are discarded.

  • Interval: the interval is executed periodically

This operator is mainly used for periodic tasks, such as I need to send data every 100ms:

Observable.interval(100, TimeUnit.MILLISECONDS)
                  .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                    }
                });
Copy the code
  • Timer, delay Indicates the delay in sending data

Both operators are used to delay sending data. The difference is that timer is the creation operator and delay is the auxiliary operator. The Timer operator creates an Observable and sends data items after subscribing.

Observable
  .timer(1000,TimeUnit.MILLISECONDS)
  .subscribeOn(Schedulers.io())
  .subscribe(disposableObserver);
Copy the code

Delay is when the original Observable sends data, starts a timer, and delays sending the data. Therefore, delay is an auxiliary item between upstream and downstream, and its object of action must be a created Observable:

Observable
  .just(0L)
  .doOnNext(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
            }
        }
  .timer(1000,TimeUnit.MILLISECONDS)
  .subscribeOn(Schedulers.io())
  .subscribe(disposableObserver);
Copy the code

Related Learning videos

The 8 most commonly used Android modules and third-party open source modules (Jetpack/IOC/RxJava/ Plug-in/hotfix design/Network Access Framework/componentized/Image loading box