Introduction to the

What is RxJava? RxJava is a Java VM implementation of ReactiveX (Reactive Extensions) : a library for writing asynchronous and event-based programs using observable sequences.

In my understanding, RxJava can mainly realize the functions of asynchronous tasks and event bus, which is also the strength of RxJava.

GitHub address for RxJava

Github.com/ReactiveX/R…

use

This article will go into more detail about the use of RxJava, and will focus on the use of RxJava.

create

There are two ways to create an Observer: create Observer or Subscriber.

The Observer:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onError(Throwable e) {
                Log.i("test"."onError");
            }

            @Override
            public void onComplete() {
                Log.i("test"."onComplete");
            }

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i("test"."onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.i("test"."onNext----->"+ s); }};Copy the code

The Subscriber:

Subscriber subscriber = new Subscriber() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.i("test"."onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                Log.i("test"."onNext");
            }

            @Override
            public void onError(Throwable t) {
                Log.i("test"."onError");
            }

            @Override
            public void onComplete() {
                Log.i("test"."onComplete"); }}Copy the code

Create an Observable and subscribe to it. In RxJava, the Observable subscribes using the Subscribe method.

The observed can be created in three ways:

1) Use Observable.create

        Observable.create(new ObservableOnSubscribe<String>(){

            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello"); } }).subscribe(observer); / / subscribeCopy the code

In the subscribe method, the Observer onNext, onError, and onComplete methods are called manually, while the onSubscribe method is called automatically.

2) Observable. Just You can create an Observable using the Observable. Just method

Observable.just("Hello"."hello world").subscribe(observer);
Copy the code

Create a subscribe with Observable.just, which automatically calls onSubscribe, onNext, onError, and onComplete.

3) observables. FromArray

Create an Observable using observable. fromArray.

String[] quotations = {"Love my country."."Love the people"};
Observable.fromArray(quotations).subscribe(observer);
Copy the code

Create with Observables. FromArray, subscribe, and, like Observables. Just, automatically invoke the observer’s methods.

Observer method

Above we have created an observation, creating an observer with four methods: onError, onComplete, onSubscribe, and onNext.

So what do these methods mean?

OnSubscribe: This method is triggered when the observed subscribes to an observer.

OnCompleted: The event queue is completed. RxJava not only treats each event individually, but also treats them as a queue. RxJava specifies that the onCompleted method needs to be fired as a signal when no new onNext will be issued.

OnError: The event queue is abnormal. When an exception occurs during event processing, the onError method is raised and the queue terminates automatically. No more events are allowed to be emitted.

OnNext: represents a common event in which you can do some business logic.

The operator

Operators include plain operators, transform operators, filter operators, combinative operators, auxiliary operators, error handlers, conditional operators, Boolean operators, and conversion operators.

Common operators include interval, repeat, intervalRange, and so on.

Use, such as:

Observable. IntervalRange (0,6,0,3, timeunit.seconds). Create (new ObservableOnSubscribe<String>() {@override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("666");
            }
        }).subscribe(observer);
Copy the code

IntervalRange This operator is used to delay execution and execute periodically.

Transformation operators include map, flatMap, cast, concatMap, and so on.

Map: Specifies a Function object that converts an Observable into a new Observable and fires it.

        Observable.just("Hello"."hello world").map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s;
            }
        }).subscribe(observer);
Copy the code

FlatMap, cast:

        Observable.just("Hello"."hello world").flatMap(new Function<String, ObservableSource<? >>() { @Override public ObservableSource<? > apply(@NonNull String s) throws Exception {return Observable.just(s);
            }
        }).cast(String.class).subscribe(observer);
Copy the code

FlatMap transforms the data set emitted by an Observable into a collection of Observables, and then flatly puts the data emitted by these Observables into a single Observable, while CAST forces all data emitted by an Observable to be converted to a specified type.

Buffer operator functions:

1, can set multiple results to the list at one time, after subscription automatically empty the corresponding results, until completely cleared

2. You can also periodically collect multiple results to the list, and automatically empty the corresponding results after subscription until they are completely cleared

        Observable
                .range(0,5)
                .buffer(2)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull List<Integer> integers) {
                        Log.i("test"."----------------->onNext:" + integers);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {}});Copy the code

Observable
                .just("Hello"."hello world"."I love my family.")
                .buffer(3)
                .subscribe(new Observer<List<String>>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull List<String> strings) {
                        Log.i("test".""+strings);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {}});Copy the code

In addition to the above transformation operators, there is also the groupBy operator for grouping operations.

Filter operator

Filter operators include filter, skip, take, Element, and so on.

Filter: Filters the custom result rules generated by an Observable. Only results that meet the conditions are submitted to subscribers.

Observable
                .just("Hello"."hello world"."I love my family.")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(@NonNull String s) throws Exception {
                        Log.i("test".""+s);
                        return s.equals("Hello");
                    }
                }).subscribe(observer);
Copy the code

Distinct: go to heavy

        Observable
                .just("Hello"."hello world"."I love my family."."I love my family.")
                .distinct()
                .subscribe(observer);
Copy the code

Skip: Filter out the first n items

        Observable
                .just("Hello"."hello world"."I love my family."."I love my family.")
                .skip(2)
                .subscribe(observer);
Copy the code

Take: take the first n terms

        Observable
                .just("Hello"."hello world"."I love my family."."I love my family.")
                .take(2)
                .subscribe(observer);
Copy the code

ThrottleWithTimeout: If the source Observable emits new data within a certain amount of time, the data is discarded and the throttleWithTimeout restarts the timer. If it emits data before the timer ends every time, the limit goes to an extreme (only emits the last data).

Observable
                .just("Hello"."hello world"."I love my family."."I love my family.")
                .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
                .subscribe(observer);
Copy the code

Combinatorial operator

Combined operators include merge, startWidth, concat, jion, Switch, zip, and more.

Merge: Merges multiple Observables into one Observable for launching.

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i("test"."onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i("test"."onNext--->" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.i("test"."onError");
            }

            @Override
            public void onComplete() {
                Log.i("test"."onComplete"); }}; Observable<String> observable1 = Observable.just("Hello"."hello World");
        Observable<String> observable2 = Observable.just("new obj"."mergeobj");
        Observable.merge(observable1, observable2).subscribe(observer);
Copy the code

Concat: merges and transmits data emitted by multiple Observables in a strict order, which has the characteristics of queues. The first data will not be launched until the first data is launched.

Observable<String> observable1 = Observable.just("Hello"."hello World");
Observable<String> observable2 = Observable.just("new obj"."mergeobj");
Observable.concat(observable1, observable2).subscribe(observer);
Copy the code

In addition to the above combined operators, there are zip, combineLastest, etc.

Zip: combines two or more data items emitted by Obserable, transforms them according to the specified function, and emits a new value.

Auxiliary operator

Auxiliary operators include DO, Delay, observeOn, timeout, timeInterval, timestamp, subscribeOn, meterialize and TO.

Delay: delays transmitting data

Observable<String> observable1 = Observable.just("Hello"."hello World");
Observable<String> observable2 = Observable.just("new obj"."mergeobj");
Observable.concat(observable1, observable2).delay(5, TimeUnit.SECONDS).subscribe(observer);
Copy the code

SubscribeOn: Specifies that the Obserable itself runs on that thread.

ObserveOn: Specifies the thread in which the data emitted by Obserable is run.

Other operators are left to the reader.

Error operator

In RXJava, error operators include Catch and retry.

Catch intercepts the original Observable’s onError notification and replaces it with other data items or data sequences, allowing the resulting Observable to terminate normally or not at all. The catch implementation is divided into three distinct operators:

1, onErrorReturn: The standby Observable returns the behavior of the original Observable. The standby Observable ignores the onError call of the original Observable, that is, does not pass the error to the observer, but emits a special item and calls the observer’s onCompleted.

OnErrorResumeNext: Returns the standby Observable like onErrorReturn. It will not call onError of the original Observable. It will send the standby Observable data.

OnExceptionResumeNext: If onError receives a Throwable that is not an Exception, it will pass the error to the observer’s onError method without using the alternate Observable.

Retry: Instead of passing onError notifications from the original Observable to the observer, it subscribs to the Observable, giving it another chance to complete its data sequence error-free, and always passes onNext notifications to the observer. This operator has the potential to duplicate data because of re-subscription. If the number of re-subscriptions is exceeded, it will not be re-subscribed and the latest onError notification will be passed to the observer.

Conditional operator

Conditional operators include defaultEmpty, skipUntil, amb, skipWhile, takeUtil, takeWhile

DefaultEmpty: Emits default data if the original Observable does not emit data.

SkipUntil: Subscribes to the original Observable, but ignores its emitters, until the second Observable emits a data item and starts emitting the original Observable.

Boolean operator

Boolean operators include: all, isEmpty, contains, EXISTS, and sequenceEqual.

For more information on conditional and Boolean operators, please refer to RxJava Operators (08- Conditional and Boolean operators).

Blog.csdn.net/xmxkf/artic…

Conversion operator

Transform operators convert an Observable to another object or data structure. Transform operators include toMap, toMultiMap, toList, toSortedList, Nest, and getIterator.

ToMap: Collects all the data items emitted by the original Observable into a Map and emits the Map.

String s1 = "Hello";
        String s2 = "hello world";
        String s3 = "lalala";

        Observable.just(s1,s2,s3).toMap(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s;
            }
        }).subscribe(new SingleObserver<Map<String, String>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onSuccess(@NonNull Map<String, String> stringStringMap) {
                Log.i("test".""+stringStringMap);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }
        });
Copy the code

ToMultiMap: Similar to toMap, except that the value of a map is a collection.

ToList: Compose a List of transmitted data.

String s1 = "Hello";
        String s2 = "hello world";
        String s3 = "lalala";

        Observable.just(s1,s2,s3).toList().subscribe(new SingleObserver<List<String>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onSuccess(@NonNull List<String> strings) {
                Log.i("test".""+strings);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }
        });
Copy the code

For other operators, you can refer to the RxJava Operators Guide.

RxJava thread control

Earlier in the tutorial, we learned that using subscribeOn you can specify which thread Obserable itself runs on. Using observeOn, you can specify which thread to run data emitted from Obserable. The RxJava default thread does callbacks on the thread calling the subcribe method, but if you want to switch threads, you need to use Scheduler.

The following schedulers are built into RxJava:

Scheduler.immediate() : runs on the current thread and is the default Scheduler for the timeout, timestamp, and timeInterval operators.

IO () : The Scheduler used for I/O operations.

Scheduler.newthread () : starts a newThread to perform operations.

The difference between 2 and 3 is that 2 internally implements an unlimited number of thread pools and reuses idle threads, so 2 is more efficient.

Scheduler.trampoline() : Tasks can be queued using the trampoline method. This Scheduler processes queued tasks sequentially and is the default Scheduler for repeat and retry operators.

5. Scheduler.computation() : the scheduler used for calculation, which has a fixed thread pool and a number of CPU cores. It is important not to try to computation IO operations, or WAIT time for IO operations will waste CPU. This scheduler is the default scheduler for Buffer, Delay, SAMPLE, Debounce, Interval and Skip.

6, AndroidSchedulers. MainThread () : it means in the main thread, the scheduler is RxAndroid.

Observable.just("Hello"."hello world")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);
Copy the code

RxJava, Retrofix, OkHttp, RxJava, Retrofix, OkHttp