This article was first published on the WeChat official account "Practice". Feel free to follow it and get in touch!

RxJava is the open-source Java implementation of ReactiveX, a library for composing asynchronous and event-based programs using observable sequences. RxJava centers on asynchronous programming, chained calls, and event sequences.

  1. Adding the RxJava dependency
  2. Concepts
  3. Basic implementation
  4. The just operator
  5. The from operators
  6. The defer operator
  7. The empty operator
  8. The never operator
  9. The timer operator
  10. The interval operator
  11. The range operator
  12. Conclusion

Adding the RxJava dependency

implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

Concepts

The key concepts in RxJava are the Observer, the Observable (the observed), and the event sequence. The event sequence is controlled entirely by the Observable. For the Observable to notify the Observer when needed, a subscription must be established between the two. Once it is established, the Observer receives the Observable's events as soon as they are emitted.

The Observer in RxJava 2 has four callback methods:

  • onSubscribe: called once the subscription is established; it receives a Disposable that can be used to unsubscribe
  • onNext: called each time the Observable emits an event, to receive that event
  • onError: called when the event sequence terminates abnormally; no further events are delivered afterwards
  • onComplete: called when the event sequence has finished normally; no further events are delivered afterwards

Note:

  1. The Observable may keep calling onNext after onError or onComplete, but the Observer receives nothing further once either terminal event has been delivered;
  2. onError and onComplete are mutually exclusive. Calling onError after onComplete can crash the app, because the error can no longer be delivered to the Observer and is handed to RxJava's global error handler instead, whereas calling onComplete after onError is simply ignored;
  3. Of the four callbacks, onSubscribe is invoked as soon as the Observer and the Observable establish a subscription, while onNext, onError, and onComplete are invoked only when the Observable triggers them. This is easy to misunderstand.
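
The role of onSubscribe can be sketched as follows. This is a minimal example, not the article's original code: the class name DisposeSketch and the recorded strings are made up for illustration, and a plain list stands in for Log.i so the code can run outside Android. It keeps the Disposable handed to onSubscribe and uses it to unsubscribe after the second event.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration; not part of RxJava
class DisposeSketch {

    // Subscribes to three events and unsubscribes after receiving the second one
    static List<String> run() {
        final List<String> received = new ArrayList<>();
        Observable.just("Event1", "Event2", "Event3").subscribe(new Observer<String>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                // Keep the Disposable so the subscription can be cancelled later
                disposable = d;
                received.add("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                received.add("onNext:" + s);
                if ("Event2".equals(s)) {
                    // Unsubscribe: no further callbacks reach this Observer
                    disposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                received.add("onError");
            }

            @Override
            public void onComplete() {
                received.add("onComplete");
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After dispose is called, neither Event3 nor onComplete reaches the Observer, so the list holds only onSubscribe and the first two onNext entries.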

Basic implementation

  1. Create an Observer. The Observer decides what to do when an event occurs.
// Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // Called once subscribed; d can be used to unsubscribe
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        // The observer is called back when the event is sent
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        // Observer callback when sending events (event sequence is abnormal)
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        // Called when the event sequence has completed normally
        Log.i(TAG, "onComplete--->");
    }
};
  2. Create an Observable. The Observable decides when events are triggered and which events they are:
// Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
});
  3. Establish the subscription between the Observer and the Observable:
// Establish the subscription between the Observer and the Observable
observable.subscribe(observer);

The output of the above code is as follows:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->

Obviously, since the onComplete method is called after Event2 is sent, Event3 sent after that will not be received by the observer.

The code above can also be written as a single chain, with the same result:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The code above uses Observable.create to create an Observable and emit events.

[Figure: official marble diagram of the create operator]

There are also a number of static methods available to create an Observable. These common methods are described below.

The just operator

The just operator creates an Observable that emits the specified items. It accepts at most 10 items, with one overload per item count. Compared with create, it removes much of the boilerplate.

public static <T> Observable<T> just(T item) 
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
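
Because just stops at ten parameters, a longer fixed list needs a different creation operator. One option is fromArray, which is covered later in this article. The sketch below assumes a hypothetical class name, MoreThanTenSketch, and uses toList and blockingGet only to collect the result for printing.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical helper class for illustration; not part of RxJava
class MoreThanTenSketch {

    // Emits twelve items, more than any just overload accepts
    static List<Integer> run() {
        return Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
                .toList()       // collect every emitted item into one list
                .blockingGet(); // wait for completion and return the list
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The blocking call is there only to keep the example short; in an Android app you would normally subscribe with an Observer as in the other examples.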

Here’s a simple use of the just operator:

// Simple use of the just operator
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

The output of the above code is as follows:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

[Figure: official marble diagram of the just operator, shown emitting four items]

The from operators

The from operators create Observables from arrays, Iterables, and asynchronous tasks (Futures). They come in the following variants:

// Array
public static <T> Observable<T> fromArray(T... items)
// Collection (Iterable)
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
// Asynchronous tasks
public static <T> Observable<T> fromFuture(Future<? extends T> future)
// Asynchronous task + timeout
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
// Asynchronous task + timeout + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
// Asynchronous task + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
// Reactive Streams Publisher: used much like the create operator; the publisher (the observed side) decides which events are emitted
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

fromArray/fromIterable

Here’s how to use fromArray:

// Simple use of the fromArray operator
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

[Figure: official marble diagram of fromArray]

Here's how to use fromIterable:

// Simple use of the fromIterable operator
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

[Figure: official marble diagram of fromIterable]

The output of the code above is as follows:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

fromCallable

Callable lives in the java.util.concurrent package. It is similar to Runnable but returns a value. fromCallable does not use a particular scheduler by default, so the Callable runs, and its result is emitted, on the thread that subscribes (the main thread in the example below). Keep the following in mind when using fromCallable:

  1. When the Callable performs a time-consuming task, use subscribeOn to move the subscription to a background thread;
  2. Use observeOn to switch back to the main thread to receive the values emitted by the Observable;
  3. To avoid problems such as memory leaks, unsubscribe in the corresponding onDestroy method.
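
The first point can be sketched as follows. This is an illustrative example under a few assumptions: the class name FromCallableThreadSketch and the helper callerThread are hypothetical, blockingFirst is used only to fetch the result synchronously, and the observeOn step is left out because AndroidSchedulers.mainThread() from RxAndroid needs an Android runtime.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Callable;

// Hypothetical helper class for illustration; not part of RxJava
class FromCallableThreadSketch {

    // Returns the name of the thread on which the Callable actually ran
    static String callerThread(boolean useIoScheduler) {
        Observable<String> source = Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() {
                return Thread.currentThread().getName();
            }
        });
        if (useIoScheduler) {
            // Move the subscription, and therefore the Callable, to an IO thread
            source = source.subscribeOn(Schedulers.io());
        }
        // Block until the single value arrives (for demonstration only)
        return source.blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("without subscribeOn: " + callerThread(false));
        System.out.println("with subscribeOn:    " + callerThread(true));
    }
}
```

Without subscribeOn the Callable runs on the calling thread; with Schedulers.io() it runs on one of RxJava's cached IO threads, whose names start with RxCachedThreadScheduler.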

Here’s a simple way to use fromCallable:

// Simple use of the fromCallable operator
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        // Other operations...
        return "call";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s+Thread.currentThread());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The above results are as follows:

onSubscribe--->
onNext--->call
onComplete--->

[Figure: official marble diagram of fromCallable]

fromFuture

fromFuture has four overloads, which take the asynchronous task and, optionally, a timeout and a thread scheduler. The Future interface lives in the java.util.concurrent package. It is used to check whether an asynchronous task run from a Runnable or Callable has finished, to obtain its result, and to cancel it. The task itself runs on whichever thread executes it, but fromFuture blocks the subscribing thread until the result is available and does not use a scheduler by default, so remember to switch threads with subscribeOn when the task is time-consuming. The following uses FutureTask as an example to illustrate how to use fromFuture.

Create a Callable to execute an asynchronous task as follows:

// Asynchronous tasks
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "Mission execution begins -->");
        Thread.sleep(5000);
        Log.i(TAG, "Mission completed -->");
        return "MCallable";
    }
}

Then, create a FutureTask as follows:

// Create the FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);

Then, execute the Future created above with Thread as follows:

// Run the FutureTask
new Thread(mFutureTask).start();

Finally, use fromFuture to create the corresponding Observable and subscribe to it as follows:

//fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) // Switch the subscription thread
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s+Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

The result of the above code is as follows:

Mission execution begins -->
onSubscribe--->
Mission completed -->
onNext--->MCallable
onComplete--->

[Figure: official marble diagram of fromFuture taking a Future parameter]

The asynchronous task above takes 5 seconds to finish. A fromFuture overload can specify a timeout, for example 4 seconds:

// Specify a timeout period of 4s
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
// ...

Because the task cannot finish within 4 seconds, the Observer's onError is called with a TimeoutException:

Mission execution begins -->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
Mission completed -->

A Future also lets you cancel the asynchronous task at any time, which is one of its advantages:

// Cancel the asynchronous task
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "Mission accomplished -->");
    } else {
        Log.i(TAG, "Mission in progress -->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "Did the task cancel successfully --cancel->" + cancel);
        Log.i(TAG, "Mission cancelled successfully --isCancelled->" + mFutureTask.isCancelled());
    }
}

The following is the result of canceling a task:

Mission execution begins -->
onSubscribe--->
Mission in progress -->
Did the task cancel successfully --cancel->true
Mission cancelled successfully --isCancelled->true
onError--->java.util.concurrent.CancellationException

This cancels an asynchronous task while it is running. This part is really about Java's Future rather than RxJava itself.
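
The timeout and cancellation behaviour comes from the JDK's Future itself, so it can be reproduced without RxJava. The sketch below is a plain-Java illustration. The class FutureSketch and its outcome method are hypothetical names, and the durations are shortened so it runs quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class for illustration
class FutureSketch {

    // Runs a task that sleeps for taskMillis, optionally cancels it, then waits
    // up to timeoutMillis for the result. Returns the result or the name of the
    // exception that Future.get threw.
    static String outcome(final long taskMillis, long timeoutMillis, boolean cancel) {
        Callable<String> work = () -> {
            Thread.sleep(taskMillis);
            return "done";
        };
        FutureTask<String> task = new FutureTask<>(work);
        Thread worker = new Thread(task);
        worker.setDaemon(true); // do not keep the JVM alive for an unfinished task
        worker.start();
        if (cancel) {
            task.cancel(true); // true: interrupt the worker if it is already running
        }
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | CancellationException
                | ExecutionException | InterruptedException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(10, 2000, false));   // task finishes in time
        System.out.println(outcome(2000, 100, false));  // wait times out first
        System.out.println(outcome(2000, 1000, true));  // task is cancelled
    }
}
```

These are the same two exceptions that fromFuture passes to onError in the logs above: TimeoutException when the wait expires and CancellationException when the task is cancelled.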

The defer operator

An Observable created with defer is not built up front. The factory passed to defer is invoked only when an Observer subscribes, and it creates a fresh Observable that then emits its events. Here's how the defer operator is used:

//defer
// Assumes defer is a String field of the enclosing class
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});
defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The result of the above code is as follows:

onSubscribe--->
onNext--->new
onComplete--->

Because the factory runs at subscription time, the Observable is created only then, so onNext receives the latest value, "new".

[Figure: official marble diagram of the defer operator]
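
To make the contrast concrete, here is a minimal sketch comparing just, which captures the value when the Observable is created, with defer, which reads it at subscription time. The class name DeferSketch is hypothetical, an AtomicReference stands in for the field used above, and blockingFirst is used only to read the single emitted value.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class for illustration; not part of RxJava
class DeferSketch {

    // Returns { value seen by just, value seen by defer }
    static String[] run() {
        final AtomicReference<String> value = new AtomicReference<>("old");

        // just reads the value right now, while the Observable is being created
        Observable<String> eager = Observable.just(value.get());

        // defer postpones the read until someone subscribes
        Observable<String> lazy = Observable.defer(new Callable<ObservableSource<String>>() {
            @Override
            public ObservableSource<String> call() {
                return Observable.just(value.get());
            }
        });

        value.set("new");
        return new String[] { eager.blockingFirst(), lazy.blockingFirst() };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("just:  " + result[0]);
        System.out.println("defer: " + result[1]);
    }
}
```

The Observable built with just still emits "old", while the one built with defer emits "new".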

The empty operator

The empty operator creates an Observable that emits no data and completes normally, as follows:

//empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The output of the above code is as follows:

onSubscribe--->
onComplete--->

[Figure: official marble diagram of the empty operator]

The never operator

The never operator creates an Observable that emits no data and never terminates:

//never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The output of the above code is as follows:

onSubscribe--->

[Figure: official marble diagram of the never operator]

The timer operator

The timer operator creates an Observable that waits for the specified delay, emits a single value, 0, and then completes. A thread scheduler can also be specified:

// Delay
public static Observable<Long> timer(long delay, TimeUnit unit)
// Delay + thread scheduler
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

Here is how the timer is used:

//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->"+s);
        Log.i(TAG, "Current thread -->"+Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The result of the above code is as follows:

onSubscribe--->
// The data is received after 3 seconds
onNext--->0
Current thread -->RxCachedThreadScheduler-1
onComplete--->

[Figure: official marble diagram of the timer operator with a delay and a thread scheduler]

The interval operator

The interval operator creates an Observable that emits increasing Long values, starting from 0, at a fixed interval. You can specify the initial delay, the interval, and the thread scheduler:

// Initial delay + interval
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
// Initial delay + interval + thread scheduler
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// Time interval
public static Observable<Long> interval(long period, TimeUnit unit)
// Interval + thread scheduler
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)

Here’s how interval is used:

//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->"+aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The above code will continue to send integer events every 3 seconds with the following result:

onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
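
Since interval never completes on its own, a bounded variant is handy for experiments. The sketch below uses the take operator, which this article does not otherwise cover, to stop after three values. The class name IntervalSketch is hypothetical, and the period is shortened to 100 milliseconds so the example finishes quickly.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for illustration; not part of RxJava
class IntervalSketch {

    // Collects the first three values emitted by interval
    static List<Long> firstThree() {
        return Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)        // complete after three values instead of running forever
                .toList()       // gather the values into one list
                .blockingGet(); // wait for completion (about 300 ms here)
    }

    public static void main(String[] args) {
        System.out.println(firstThree());
    }
}
```

The values are 0, 1, and 2, matching the start of the log above. In an Android app, disposing the subscription (for example in onDestroy) is the usual way to stop an interval.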

[Figure: official marble diagram of the interval operator with an interval and a time unit]

The range operator

The range operator creates an Observable that emits a sequence of consecutive integers: it starts at the given start value and emits count values in total. range emits Integer values, and rangeLong is its Long counterpart:

// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)

The usage of range is as follows:

//range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->"+integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

The result of the above code is as follows:

onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
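
With a start of 1, the output above does not reveal whether the second argument is an end value or a count. The sketch below uses a different start to show that it is a count. The class name RangeSketch is hypothetical, and toList with blockingGet is used only to collect the values.

```java
import io.reactivex.Observable;

import java.util.List;

// Hypothetical helper class for illustration; not part of RxJava
class RangeSketch {

    // Four Integer values starting at 3
    static List<Integer> ints() {
        return Observable.range(3, 4).toList().blockingGet();
    }

    // Two Long values starting at 10
    static List<Long> longs() {
        return Observable.rangeLong(10, 2).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(ints());
        System.out.println(longs());
    }
}
```

range(3, 4) emits 3, 4, 5, 6 rather than 3, 4, and rangeLong(10, 2) emits 10 and 11.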

[Figure: official marble diagram of the range operator]

Conclusion

This article introduced the basics of RxJava 2 and how to understand and use its creation operators. Follow the WeChat official account "Practice" for the latest updates, and let's exchange ideas and learn together!