preface

  • RxjavaBecause of itsChain call based on event flow, simple logic & easy to useThe characteristics of the deepAndroidDeveloper welcome.


Making screenshots

If you are not familiar with RxJava, check out Android: a clear and easy-to-understand introduction to RxJava

  • RxJavaThat’s why it’s so popularProvides rich & powerful operators that can fulfill almost all functional requirements
  • Today, I will give you a detailed introductionRxJavaThe most commonly used of the operatorsFunctional operatorAnd the supplementaryRetrofit combined with RxJava example Demo teachingI hope you like it.
  1. This series of articles is based onRxjava 2.0
  2. In the coming days, I will continue to publish a series of articles on Rxjava 2.0 in Android, including principles, operators, application scenarios, back pressure, etc. If you are interested, please continue to follow Carson_Ho’s Android development notes!!


Schematic diagram


directory


Schematic diagram


1. The role

Assists an Observable to fulfill some functional requirements when sending events

Error handling, thread scheduling, and so on


Type 2.

  • RxJava 2The common functional operators are:


Schematic diagram

  • I’ll look at each operator in detail below

3. Description of application scenarios & Corresponding operators

Note: Before using the RxJava 2 operator, remember to add dependencies to your project Gradle:

Dependencies {the compile 'IO. Reactivex. Rxjava2: rxandroid: 2.0.1' compile 'IO. Reactivex. Rxjava2: rxjava: 2.0.7' / / note: RxJava2 and RxJava1 cannot coexist, i.e. dependencies cannot exist at the same time}Copy the code

3.1 Connect the observed with the observer

  • The requirement scenario even has to be observed by the observer to form a subscription

  • Corresponding operator

The subscribe ()

  • Function subscription, that is, connect observer and observed

  • The specific use

observable.subscribe(observer); // The former = Observable; The latter = observer <-- 1. Full call in steps --> // Step 1: Observable<Integer> Observable = Observable. Create (new ObservableOnSubscribe<Integer>() {ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); }}); // Step 2: Observer<Integer> Observer = new Observer<Integer>() {// Override public Void onSubscribe(Disposable d) {Log. D (TAG, "subscribe "); Override public void onNext(Integer value) {log.d (TAG, "Next event "+ value +" response"); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}; // Step 3: connect the observer and the observed observable by subscribe. subscribe(observer); ObservableOnSubscribe (new ObservableOnSubscribe<Integer>() {// 1. @override Public void subscribe(ObservableEmitter<Integer> Emitter) throws Exception {emitter. OnNext (1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { // 2. By connecting the observer to the observed via subscribe // 3. Override public void onSubscribe(Disposable d) {Log. D (TAG, "subscribe "); Override public void onNext(Integer value) {log.d (TAG, "Next event "+ value +" response"); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); }}Copy the code
  • The test results


Schematic diagram

  • Extension instructions
<-- Observable. Subscribe (Subscriber) --> public Subscription (Subscriber) { subscriber.onStart(); // In the observer subscriber abstract class override method onSubscribe. Call (subscriber), used for initialization work // through this call, Thus the corresponding method in the callback observer responds to the event produced by the observed // Thus realizing the callback method of the observed calling the observer & event passing from the observed to the observer, i.e., the observer mode // Also see: An Observable only produces events, and actually sends events when it subscribes, when the subscribe() method executes}.Copy the code

3.2 Thread Scheduling

  • Requirements scenarios quickly and easily specify & control the observed & observer worker thread
  • Android RxJava: Thread control (switching/scheduling) with Retrofit examples

3.3 Delayed Operation

  • The requirement scenario involves some delayed action before the observed sends the event

  • Use the corresponding operator

Delay ()

  • Function causes the observed to delay sending the event for some time

  • Delay () has multiple overloaded methods, as follows:

// 1. Specify delay time // parameter 1 = time; Parameter 2 = Delay (long delay,TimeUnit unit) // 2. Specify delay time & scheduler // parameter 1 = time; Parameter 2 = time unit; 3 = thread scheduler delay(long delay,TimeUnit unit,mScheduler scheduler) // 3. Specify delay time & Error delay // Error delay, that is: if there is an Error event, the execution as usual, after the execution of the Error exception // parameter 1 = time; Parameter 2 = time unit; Delay (long delay,TimeUnit unit, Boolean delayError) // 4. Specify delay time & scheduler & error delay // parameter 1 = time; Parameter 2 = time unit; Parameter 3 = thread scheduler; Parameter 4 = Error delay parameter Delay (long delay,TimeUnit unit,mScheduler Scheduler, Boolean delayError): Specifies how long to delay and adds a scheduler. Error notification can be set to delay or notCopy the code
  • The specific use
Observable. Just (1, 2, 3). Delay (3, timeunit.seconds) Subscribe(new Observer<Integer>() {@subscribe public void onSubscribe(Disposable d) {} @override public Void onNext(Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


Schematic diagram

3.4 Operate in the event lifecycle

  • Requirements scenarios operate throughout the life cycle of event send & receive

For example, initialization before sending an event or callback request after sending an event

  • Use the corresponding operator

Do ()

  • Function is called during the life cycle of an event
  • type

    Do ()There are many operators, as follows:


Schematic diagram

  • The specific use
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); E.onror (new Throwable(" error ")); DoOnEach (new Consumer<Notification<Integer>>() {@override public void Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.d(TAG, "doOnEach: " + integerNotification.getValue()); Call.doonNext (new Consumer<Integer>() {@override public void accept(Integer Integer) throws Exception { Log.d(TAG, "doOnNext: " + integer); DoAfterNext (new Consumer<Integer>() {@override public void accept(Integer Integer) throws Exception { Log.d(TAG, "doAfterNext: " + integer); }}) // 4. Observable calls.doonComplete (new Action() {@override public void run() throws Exception {log.e (TAG, TAG) "doOnComplete: "); }}).doonError (new Consumer<Throwable>() {@override public void accept(Throwable); throws Exception { Log.d(TAG, "doOnError: " + throwable.getMessage()); <Disposable>() {@override public void Accept (@nonnull Disposable) disposable) throws Exception { Log.e(TAG, "doOnSubscribe: "); }}) // 7. Observable sends events. DoAfterTerminate (new Action() {@override public void run() throws Exception {log.e (TAG, "doAfterTerminate: "); .dofinally (new Action() {@override public void run() throws Exception {log.e (TAG, "doFinally: "); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


Schematic diagram

3.5 Error Handling

  • The handling mechanism for errors encountered during event sending in requirement scenarios

  • Corresponding operator type


Schematic diagram

  • Use the corresponding operator

OnErrorReturn ()

  • When the function encounters an error, it sends a special event & normal termination

Can catch exceptions that occur before it

  • The specific use
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Throwable(" error ")); } }) .onErrorReturn(new Function<Throwable, Integer>() {@override public Integer apply(@nonnull Throwable Throwable) throws Exception {// Catch error exceptions log.e (TAG, "Error handled in onErrorReturn: "+throwable.toString()); return 666; // After an error event occurs, send a "666" event Subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable d) {} @override public Void onNext(Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


Schematic diagram

OnErrorResumeNext ()

  • role

    When an error is encountered, send a new oneObservable

Note:

  1. OnErrorResumeNext ()Intercepting error =Throwable; If need to interceptExceptionPlease useOnExceptionResumeNext ()
  2. ifOnErrorResumeNext ()Intercepting error =Exception, will pass the error to the observeronErrormethods
  • The specific use
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Throwable(" error ")); } }) .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception { // 1. Catch error exception log.e (TAG, "handled error in onErrorReturn: "+throwable.toString()); // 2. Send a new Observable after an error event & return Observable. Just (11,22); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


Schematic diagram

OnExceptionResumeNext ()

  • role

    When an error is encountered, send a new oneObservable

Note:

  1. OnExceptionResumeNext ()Intercepting error =Exception; If need to interceptThrowablePlease useOnErrorResumeNext ()
  2. ifOnExceptionResumeNext ()Intercepting error =Throwable, will pass the error to the observeronErrormethods
  • The specific use
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); } }) .onExceptionResumeNext(new Observable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> observer) { observer.onNext(11); observer.onNext(22); observer.onComplete(); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


Schematic diagram

Retry ()

  • role

    Retry, that is, when an error occurs, the observed (Observable) retransmit data
  1. Re-subscribe & send the event when onError () is received
  2. ThrowableExceptionCan be intercepted
  • type

There are five overloaded methods

<-- 1. retry () --> <-- 1. retry () --> <-- 1. retry () --> If an error occurs, retry the data (with retry limit = retry times <-- 3. retry (Predicate) --> // Retry (new BiPredicate<Integer, Throwable>) --> After an error occurs, determine whether data needs to be resend (if resend is required & persistent errors are encountered, Retry (long time,Predicate) --> If an error occurs, determine whether to resend data (with retry limit // = Set retry times & determine logicCopy the code
  • The specific use
<-- 1. retry () --> If I keep getting it wrong, Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); }}).retry() // If an error is encountered, retry the data. Subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable d) {} @override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); <-- 2. retry (long time) --> When something goes wrong, Observable. Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); Subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable) {} @override public void onNext(Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); <-- 3. Retry (Predicate) --> After an error occurs, determine whether data needs to be resend (if resend is required & persistent errors are encountered, Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); }}) // After intercepting errors, Determine whether to retry the request. Retry (new Predicate<Throwable>() {@override public Boolean test(@nonnull Throwable Throwable) throws E (TAG, "Retry error: "+ throwable.tostring ())); // Return false = do not resend the data & call observer onError to end // Return true = resend the request (if an error persists) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); <-- 4. Retry (new BiPredicate<Integer, Throwable>) --> After an error occurs, determine whether data needs to be resend (if resend is required & persistent errors are encountered, Observable. Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); }}) // After intercepting errors, Retry (new BiPredicate<Integer, Throwable>() {@override public Boolean test(@nonnull Integer Integer, @nonnull Throwable Throwable) throws Exception {// Trap Exception log. e(TAG, "Exception error = "+ Throwable. ToString ()); Log.e(TAG, "Number of current retries = "+integer); // Return false = do not resend the data & call observer onError to end // Return true = resend the request (if an error persists) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }}); <-- 5. Retry (long time,Predicate) --> After an error occurs, Observable. Create (new ObservableOnSubscribe<Integer>() {@override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); }}) // After intercepting errors, Determine whether to retry the request. Retry (3, New Predicate<Throwable>() {@override public Boolean test(@nonnull Throwable Throwable) throws Exception {// Catch exceptions Log.e(TAG, "retry error: "+ throwable.tostring ()); // Return false = do not resend the data & call observer onError () to end // Return true = resend the request (up to 3 times) return true; } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code

RetryUntil ()

  • Determine whether to resend data after an error occurs
  1. If you need to resend & continue to encounter an error, continue to retry
  2. It works like thisRetry (sometimes called a retry)
  • The specific use

    The specific use is similar toRetry (sometimes called a retry), the only difference: returntrueThe data event is not resend. Not much will be described here

RetryWhen ()

  • role

    When an error is encountered, the error is passed to a new observed (Observable) and decide if the original observed needs to be re-subscribed (Observable) & send events
  • The specific use
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); E.onror (new Exception(" error ")); e.onNext(3); }}). RetryWhen (new Function<Observable<Throwable>, ObservableSource<? >>() { @Override public ObservableSource<? > apply(@nonnull Observable<Throwable> throwableObservable) throws Exception {// Generics = in the parameter Observable<Throwable> An exception thrown by an upstream operator that identifies the type of exception // Returns Observable<? Observable Observable Observable Observable Observable Observable Observable Observable Observable Observable Observable Observable If the event sent by the new Observable = Error, the original Observable does not re-send the event: // 2. If the event sent by the new Observable = the Next event, the original Observable resends the event:  return throwableObservable.flatMap(new Function<Throwable, ObservableSource<? >>() { @Override public ObservableSource<? > apply(@NonNull Throwable throwable) throws Exception { // 1. If the returned Observable sends an event = Error, Return Observable. Error (new Throwable("retryWhen terminates ")); // Return Observable (new Throwable("retryWhen terminates ")); If the event sent by the returned Observable = Next, the original Observable resends the event (retries if errors persist) // Return Observable.just(1); }}); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void OnNext (Integer value) {log. d(TAG, "received event "+ value); } @override public void onError(Throwable e) {log. d(TAG, "response to Error" + e.tostring ()); } @override public void onComplete() {log.d (TAG, "respond to Complete event "); }});Copy the code
  • The test results


New Observable sends error event = original Observable terminates sending

New Observable sends data events = original Observable keeps retrying

3.6 Resending

  • The requirement scenario repeatedly sends observed events

  • Corresponding operator types repeat () & repeatWhen ()

Repeat ()

  • Function to unconditionally and repeatedly send observed events

With overloaded methods, you can set the number of repeated creation

  • The specific use
// Pass no arguments = repeat () = infinite times; RepeatWhen (Integer int); Upon receiving the.onCompleted() event, re-subscribe & send is triggered // 2. The default run on a new thread on the specific use of observables. / / just (1, 2, 3, Subscribe(new Observer<Integer>() {@override public void onSubscribe(Disposable d) { Log.d(TAG, "Subscribe "); } @override public void onNext(Integer value) {log. d(TAG, "received event" + value); } @override public void onError(Throwable e) {log.d (TAG, "response to Error "); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code

  • The test results


Schematic diagram

RepeatWhen ()

  • Function to conditionally and repeatedly send observed events

  • The principle converts the original Observable stop event identifier (Complete ()/Error ()) into an Object type data and passes it to a new Observable (Observable), so as to decide whether to re-subscribe and send the original Observable

  1. If the new observed (Observable) returns 1Complete / ErrorEvent, then do not re-subscribe & send the originalObservable
  2. If the new observed (Observable) returns the remaining events, then re-subscribe & send the originalObservable
  • The specific use
Observables. Just (1 4-trichlorobenzene). RepeatWhen (new Function < observables < Object >, ObservableSource <? >>() {Override // Observable<Object> must be processed in the Function. Here we use the flatMap operator to receive the data from an Observable<? > apply(@nonnull Observable<Object> objectObservable) throws Exception {// Stop the original Observable from sending the event flag (Complete () / Error ()) is converted to an Object and passed to a new Observable to decide whether to re-subscribe & send the original Observable // there are two cases: // 1. If the new Observable returns a Complete ()/Error () event, do not re-subscribe & send the original Observable // 2. If the new Observable returns other events, Observable return objectObservable. FlatMap (new Function<Object, ObservableSource<? >>() { @Override public ObservableSource<? > apply(@nonnull Object throwable) throws Exception { If the new Observable returns a Complete ()/Error () event, do not re-subscribe & send the original Observable return Observable.empty(); // Observable. Empty () = sends the Complete event, but does not call back the observer's onComplete () // return Observable. Error (new Throwable(" do not re-subscribe to events ")); // Return the Error event = callback to the onError () event and receive the Error message passed. // Case 2: if the new Observable returns other events, re-subscribe & send the original Observable // return Observable. Just (1); // Just as a notification that triggers the observer to re-subscribe, it does not matter what data is sent, as long as it is not a Complete ()/Error () event}}); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "Subscribe "); } @override public void onNext(Integer value) {log. d(TAG, "received event" + value); } @override public void onError(Throwable e) {log. d(TAG, "+ e.tostring ()); } @override public void onComplete() {log.d (TAG, "response to Complete event "); }});Copy the code
  • The test results


New Observable sends Complete event = original Observable stops sending & does not resend

New Observable sends Error events = original Observable stops sending & does not resend

The new Observable sends the remaining events = the original Observable sends again

This concludes the explanation of functional operators in RxJava 2.


4. Actual development requirements cases

  • Now, I’m going to combineRetrofit & RxJava, illustrating three practical requirements case scenarios for functional operators:
    1. Thread operations (switching/scheduling/control)
    2. polling
    3. Error retry mechanism for sending network requests

4.1 Thread Control (Switching/scheduling)

  • That is, a new worker thread is opened to perform time-consuming operations; After the execution is complete, switch to the main thread for real-time updateUI
  • Android RxJava: Thread control (switching/scheduling)

4.2 the polling

  • Requirement Scenario Description


Schematic diagram

  • Now, I’m going to combineRetrofitRxJavaImplement the polling requirement with a concrete example
  • Android RxJava (conditional) network request polling

4.3 Error Retry Mechanism for Sending Network Requests

  • Requirement Scenario Description


    Schematic diagram

  • Functional specifications


    Schematic diagram

  • Below I’ll combine Retrofit and RxJava with a concrete example to implement the error retry mechanism requirement for sending network requests

  • Network request error reconnection (Retrofit)


5. The Demo address

All of the above Demo source code is stored at: Github address of Carson_Ho: RxJava2_ functional operator


6. Summary

  • Now, I’ll sum it up with a pictureRxJava2Commonly used functional operators in


Schematic diagram

  • I’ll continue to dig into the other RxJava2 operators, but if you’re interested, stay with Carson_Ho’s Android development notes

Thumb up, please! Because your encouragement is the biggest power that I write!

Related articles reading

  • Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: Android RxJava: A comprehensive tutorial on the functional operators
  • Android RxJava application description: (Unconditional) Network request polling Android RxJava application description: (conditional) Network request polling Android RxJava application description: Network request nested callback Android RxJava: merge data source Android RxJava: merge data from disk/memory cache: merge data from disk/memory cache Android RxJava: Network request error reconnection (Retrofit)

Welcome to attentionCarson_HoJane books!

Share the dry things about Android development from time to time, the pursuit of short, flat, fast, but there is no lack of depth.