I have read a lot of articles on RxJava. Some are based on the first version, and others cover only the more common APIs and basic concepts. Yet every time I look at the dozens of methods in RxJava’s classes, I feel uncertain. So I’m going to write an article of my own that systematically goes through the methods and uses of RxJava from an API perspective.

1. RxJava basics

1.1 Introduction to RxJava

RxJava is a library that uses observable sequences on the Java VM to compose asynchronous, event-based programs.

In Android, we can use AsyncTask to run asynchronous tasks, but when there are many tasks, defining an AsyncTask for each one becomes very tedious. RxJava helps us keep the code clean while still executing work asynchronously. Its principle is to create an Observable that performs an asynchronous task, chain operators to build up complex processing, and finally deliver the results to an Observer for handling. Of course, RxJava is not only suitable for Android, but also for server-side scenarios.

Let’s summarize the following uses of RxJava:

  1. Simplify the flow of asynchronous programs;
  2. Program with operators similar to Java 8 streams: because using Java 8 stream programming on Android comes with many limitations, we can use RxJava for this purpose instead.

Before using RxJava, we need to add the following dependencies to our projects:

compile 'io.reactivex.rxjava2:rxjava:2.2.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'

Here we use RxJava2, which is slightly different from the first version of RxJava. In this article, all of our examples of RxJava will be based on RxJava2.

Note: if you want to learn more about Java 8 stream programming, you can refer to my previous article, "Learning Java 8 Stream Programming in 5 Minutes".

1.2 Overview

Here’s a basic use case for RxJava. We define an Observable and use an emitter inside it to emit some data (in effect, calling methods inside the observed object to notify all observers). We then pass an instance of the Consumer interface to the subscribe() method to observe what is emitted. (The interface implementations here are written as lambdas, so get used to it.)

Observable<Integer> observable = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
});
observable.subscribe(System.out::println);

This completes a basic example of RxJava. You might not be able to see the concept of a stream hidden in an Observable from the example above. Look at the following example:

Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);

Here we generate a sequence using the Observable.range() method, map the sequence of integers to a sequence of strings using the map method, and print the resulting sequence. As you can see, this is similar to Stream programming in Java 8. But there are differences:

  1. The so-called “push” versus “pull” difference: with a Stream, the consumer pulls data from the stream to drive the chain of operations, while RxJava, in addition to offering Stream-like operators, pushes (“emits”) data to notify its observers. In other words, RxJava adds the observer pattern on top of stream-style processing.
  2. Streams in Java 8 can use parallel() to achieve parallelism, that is, decompose the task in divide-and-conquer fashion, compute partial results, and then combine them; with an Observable, subscribeOn() only switches all operations to another thread.
  3. A Stream can only be consumed once, whereas an Observable can be subscribed to multiple times.
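
The third difference can be checked with a small sketch. The class and method names below are made up for illustration; it assumes RxJava 2 is on the classpath.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class StreamVsObservableDemo {
    /** Returns true if running a second terminal operation on the same Stream fails. */
    static boolean streamFailsOnSecondUse() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.forEach(i -> { });      // first terminal operation: fine
        try {
            stream.forEach(i -> { });  // second one: the stream is already consumed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Subscribes twice to the same Observable and records everything received. */
    static List<Integer> subscribeTwice() {
        List<Integer> received = new ArrayList<>();
        Observable<Integer> observable = Observable.range(1, 3);
        observable.subscribe(received::add);
        observable.subscribe(received::add); // the sequence is replayed for the second subscriber
        return received;
    }

    public static void main(String[] args) {
        System.out.println(streamFailsOnSecondUse());
        System.out.println(subscribeTwice());
    }
}
```

Each subscription to a cold Observable such as range() runs the sequence again from the start, which is why the second subscriber receives the same three numbers.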

In addition to providing us with Observable, the new RxJava provides base classes for other scenarios. The functionality and main differences between them are as follows:

  1. Flowable: a stream of multiple elements, with Reactive Streams support and backpressure
  2. Observable: a stream of multiple elements, without backpressure
  3. Single: a stream with exactly one element or an error
  4. Completable: a stream with no elements, only a completion or error signal
  5. Maybe: a stream with no elements, exactly one element, or an error

In addition to the base classes above, there is also Disposable. When we subscribe to a stream, we get back a Disposable object. It provides two methods: isDisposed(), which tells whether observation of the stream has already been stopped, and dispose(), which stops observing the stream and can be called at any time.
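
As a rough sketch of how a Disposable behaves, the example below uses a PublishSubject (a class not covered in this article, used here only because it lets us emit items by hand) and stops observing halfway through. The class and method names are made up for illustration.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

class DisposableDemo {
    /** Emits 1, 2, 3 by hand but disposes the subscription before 3 is sent. */
    static List<Integer> receivedBeforeDispose() {
        List<Integer> received = new ArrayList<>();
        PublishSubject<Integer> source = PublishSubject.create();

        Disposable disposable = source.subscribe(received::add);
        source.onNext(1);
        source.onNext(2);

        disposable.dispose();   // stop observing
        source.onNext(3);       // no longer delivered to our consumer
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedBeforeDispose());
    }
}
```

Calling isDisposed() on the returned Disposable would report false before dispose() and true afterwards.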

1.3 Summary

The basic concepts and usage of RxJava were introduced above. In the following sections, we will go through these classes from an API perspective, in the order listed above.

2. Use of RxJava

2.1 Observable

As we saw above, Observable is similar to the last three classes (Single, Completable, and Maybe). Flowable differs in that it adds the concept of backpressure, and most Observable methods also apply to the other three classes and to Flowable. So, let’s start with Observable, and then we’ll focus on Flowable and backpressure.

Observable provides a number of static factory methods for creating an Observable, as well as a number of chained methods for building complex behavior. Here we categorize its methods by function and describe them in turn.

2.1.1 Create operations

1.interval & intervalRange

The following operation can send an integer every 3 seconds, starting from 0:

Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);

interval always starts at 0, but it provides several overloads for controlling the timing. We also list the intervalRange method, which has similar functionality and lets you start from a given number:

  1. public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  2. public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
  3. public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

The initialDelay parameter is the time to wait before emitting the first integer; its unit is the same as that of period and is given by the unit parameter. The period parameter is the interval between successive emissions. The unit parameter is the unit of time, of type TimeUnit. The scheduler parameter specifies the scheduler on which the waiting and emitting take place.

The intervalRange method is used to limit the sequence of integers to a range, where start represents the starting value of the data to be emitted, and count represents the total number of numbers to be emitted. The other parameters are the same as the above interval method.
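
A minimal sketch of intervalRange, assuming RxJava 2 on the classpath (the class and method names are made up for illustration). It uses the overload without a scheduler and blocks until the sequence completes so the result can be inspected:

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

class IntervalRangeDemo {
    /** Emits 3 numbers starting at 10, one every 50 ms, and collects them. */
    static List<Long> collect() {
        // start = 10, count = 3, initialDelay = 0, period = 50 ms
        return Observable.intervalRange(10, 3, 0, 50, TimeUnit.MILLISECONDS)
                .toList()        // Single<List<Long>>
                .blockingGet();  // wait for completion (for demonstration only)
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [10, 11, 12]
    }
}
```

Unlike interval, the sequence completes after count items, which is what makes toList() usable here.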

2.range & rangeLong

The following operation produces a sequence of 10 consecutive integers starting at 5:

Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));

This method takes two arguments and works the same way as the rangeLong method:

  1. public static Observable<Integer> range(final int start, final int count)
  2. public static Observable<Long> rangeLong(long start, long count)

The start argument specifies the first value of the sequence, and count specifies how many numbers the sequence contains. The main difference between the two methods is that range generates Integer values, while rangeLong generates Long values.

3.create

The create method is used to create an Observable from scratch. As shown below, you pass create an ObservableOnSubscribe whose subscribe method receives an emitter. Calling the emitter’s onNext, onComplete, and onError methods sends data and notifications to the observer.

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onNext(1);
    observableEmitter.onNext(2);
    observableEmitter.onComplete();
}).subscribe(System.out::println);

4.defer

With defer, the Observable is not created until an observer subscribes, and a new Observable is created for each observer. The defer operator waits until an observer subscribes to it and then generates an Observable using the supplied factory. For example, in the following code, the two subscriptions may print different values:

Observable<Long> observable = Observable.defer((Callable<ObservableSource<Long>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);

Here’s the definition of this method. It takes a Callable object from which to return an Observable instance:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

5.empty & never & error

  1. public static <T> Observable<T> empty(): Creates an Observable that terminates normally without transmitting any data
  2. public static <T> Observable<T> never(): Creates an Observable that does not emit data and does not terminate
  3. public static <T> Observable<T> error(Throwable exception): Creates an Observable that terminates with an error without emitting any data. There are several overloaded versions of this method, one of which is shown here.

Test code:

Observable.empty().subscribe(i -> System.out.print("next"), i -> System.out.print("error"), () -> System.out.print("complete"));
Observable.never().subscribe(i -> System.out.print("next"), i -> System.out.print("error"), () -> System.out.print("complete"));
Observable.error(new Exception()).subscribe(i -> System.out.print("next"), i -> System.out.print("error"), () -> System.out.print("complete"));

The output is completeerror: empty() completes immediately, never() triggers no callback at all, and error() triggers the error callback.

6. The from series

The from series of methods get an Observable from the specified data source:

  1. public static <T> Observable<T> fromArray(T... items): gets from an array;
  2. public static <T> Observable<T> fromCallable(Callable<? extends T> supplier): obtained from the Callable;
  3. public static <T> Observable<T> fromFuture(Future<? extends T> future): Obtained from Future. There are multiple overloaded versions that can be used to specify information such as threads and timeouts.
  4. public static <T> Observable<T> fromIterable(Iterable<? extends T> source): from Iterable;
  5. public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher): Obtained from Publisher.

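7. The just series

The just methods create an Observable that emits the given items. The single-argument version is defined as follows; the other overloads differ only in the number of items they accept (up to ten):

  1. public static <T> Observable<T> just(T item)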

8.repeat

This method is used to indicate how many times the specified sequence is sent. The following method will send the sequence an infinite number of times:

Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));

The repeat method has the following similar methods:

  1. public final Observable<T> repeat()
  2. public final Observable<T> repeat(long times)
  3. public final Observable<T> repeatUntil(BooleanSupplier stop)
  4. public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

The first method, which takes no arguments, repeats the sequence indefinitely (internally it calls the second method with Long.MAX_VALUE). The second method repeats the sequence the specified number of times. The third method stops repeating once the given condition returns true; otherwise it keeps resubscribing.
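
The bounded variants can be sketched as follows, assuming RxJava 2 (the class, method, and counter names are made up for illustration). For repeatUntil, the stop condition is evaluated each time the source completes:

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RepeatDemo {
    /** repeat(2): the sequence 1, 2, 3 is emitted twice in total. */
    static List<Integer> repeatTwice() {
        return Observable.range(1, 3).repeat(2).toList().blockingGet();
    }

    /** repeatUntil: resubscribe until the supplier returns true (here, after the 3rd run). */
    static List<Integer> repeatUntilThreeRuns() {
        AtomicInteger completedRuns = new AtomicInteger();
        return Observable.range(1, 2)
                .repeatUntil(() -> completedRuns.incrementAndGet() >= 3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(repeatTwice());          // [1, 2, 3, 1, 2, 3]
        System.out.println(repeatUntilThreeRuns()); // [1, 2, 1, 2, 1, 2]
    }
}
```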

9.timer

The timer operator creates an Observable that emits a single value, the number 0, after a given delay. For example, the following program outputs the number 0 after 500 milliseconds.

Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);

Here is the definition of this method and its overloaded method, which can also specify a scheduler:

  1. public static Observable<Long> timer(long delay, TimeUnit unit)
  2. public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

2.1.2 Transform operations

1.map & cast

  1. map: applies a function of your choice to each item emitted by the source Observable and returns an Observable that emits the results. By default it does not operate on any particular scheduler.
  2. cast: casts every item emitted by the source Observable to a specified type and then emits it; it is a special case of map.

The first line of code below converts the generated sequence of integers into a sequence of strings and prints it. The second line casts a Date to Object and prints it; an exception is raised if an item cannot be cast to the target class:

Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);

These two methods are defined as follows:

  1. public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
  2. public final <U> Observable<U> cast(Class<U> clazz)

Here the mapper function has two type parameters: the first is the type of the source data and the second is the type it is converted to. The conversion logic is written in the method that implements the interface.

2.flatMap & concatMap

flatMap transforms each event from the upstream Observable into an Observable of its own and then merges the events those Observables emit into a single Observable. Note that flatMap does not guarantee the order of events: items from the inner Observables may be interleaved, so the output order does not have to match the order of the source sequence. For example, the following code converts each integer into its own Observable; these are then merged into one Observable, which we subscribe to. The output is still a sequence of numeric strings. Because the inner Observables here emit synchronously, they appear in ascending order, but in general the order is not guaranteed.

Observable.range(1, 5)
        .flatMap((Function<Integer, ObservableSource<String>>) i -> Observable.just(String.valueOf(i)))
        .subscribe(System.out::println);

The counterpart of flatMap is concatMap, which ensures that the final output is emitted in the same order as the upstream sequence. Here are the definitions of the two methods:

  1. public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
  2. public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

flatMap has a great many overloads. They differ slightly in the kind of source they work with, and some accept optional parameters such as delayErrors.
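
The ordering difference only becomes visible when the inner Observables are asynchronous. The sketch below assumes RxJava 2; the helper delayed() is hypothetical and simply makes smaller numbers arrive later:

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

class FlatMapVsConcatMapDemo {
    /** Hypothetical helper: emits i after a delay that is longer for smaller i. */
    static Observable<Integer> delayed(int i) {
        return Observable.just(i).delay((4 - i) * 100L, TimeUnit.MILLISECONDS);
    }

    /** flatMap subscribes to all inner Observables at once and merges by arrival time. */
    static List<Integer> withFlatMap() {
        return Observable.range(1, 3)
                .flatMap(FlatMapVsConcatMapDemo::delayed)
                .toList()
                .blockingGet();
    }

    /** concatMap subscribes to one inner Observable at a time, preserving source order. */
    static List<Integer> withConcatMap() {
        return Observable.range(1, 3)
                .concatMap(FlatMapVsConcatMapDemo::delayed)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());   // typically [3, 2, 1], since 3 has the shortest delay
        System.out.println(withConcatMap()); // [1, 2, 3]
    }
}
```

The price of concatMap's ordering is latency: the inner delays add up instead of overlapping.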

3.flatMapIterable

flatMapIterable transforms each element of the upstream into an Iterable whose items are then emitted one by one. In the following code, we take a sequence of integers, map each integer to an Iterable<String>, and finally subscribe to and consume the result:

Observable.range(1, 5)
        .flatMapIterable((Function<Integer, Iterable<String>>) integer -> Collections.singletonList(String.valueOf(integer)))
        .subscribe(s -> System.out.println("flatMapIterable : " + s));

Here is the definition of the method and its overloaded methods:

  1. public final <U> Observable<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper)
  2. public final <U, V> Observable<V> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends V> resultSelector)

4.buffer

This method groups the items of the stream into batches. In the following example, we create a stream of 7 integers and call buffer(3), so the integers are collected into lists of up to 3 items. What we subscribe to after the buffer transformation is therefore an Observable of lists:

Observable.range(1, 7).buffer(3)
        .subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));

The following are the definitions of this method and its overloads. There are too many overloads to list, so we only show two of them here; you can refer to the RxJava source code for the rest. Think of buffer as a buffer that is emitted when it is full, or when the source completes and fewer items than a full buffer remain.

  1. public final Observable<List<T>> buffer(int count)
  2. public final Observable<List<T>> buffer(int count, int skip)
  3. ...
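
To illustrate the two overloads listed above, here is a small sketch assuming RxJava 2 (the class and method names are made up for illustration). With buffer(count, skip), a new buffer is started every skip items and holds at most count items:

```java
import io.reactivex.Observable;
import java.util.List;

class BufferDemo {
    /** buffer(3): consecutive, non-overlapping lists of up to 3 items. */
    static List<List<Integer>> byCount() {
        return Observable.range(1, 7).buffer(3).toList().blockingGet();
    }

    /** buffer(2, 3): start a buffer every 3 items, keep 2 items in each, drop the rest. */
    static List<List<Integer>> byCountAndSkip() {
        return Observable.range(1, 7).buffer(2, 3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(byCount());        // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(byCountAndSkip()); // [[1, 2], [4, 5], [7]]
    }
}
```

In the second case the items 3 and 6 fall between buffers and are never emitted.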

5.groupBy

groupBy groups elements according to a specified condition. It produces an Observable of type Observable<GroupedObservable<K, T>>. In the following program, we use the concat method to concatenate two Observables into one and then group its elements. Here we group by the Integer value itself, so we get an Observable<GroupedObservable<Integer, Integer>>. We then concatenate the resulting groups into one sequence and subscribe to print it:

Observable<GroupedObservable<Integer, Integer>> observable = Observable.concat(
        Observable.range(1, 4), Observable.range(1, 6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));

There are multiple overloaded versions of this method. Here we use the definition of one:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

6.scan

The scan operator applies a function to the first item emitted by the source Observable and emits the result as its own first item. It then feeds that result, together with the second source item, back into the function to produce its second item, and continues this way for the rest of the sequence. In other contexts this operator is sometimes called an accumulator.

Take the following program as an example; its output is 2, 6, 24, 120, 720. If the function passed to scan is f, the source sequence is x, and the generated sequence is y, then the rule is y(0) = x(0); y(i) = f(y(i-1), x(i)), i > 0:

Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));

In addition to the above form, scan has an overload that takes an initial value for the generated sequence. For the following program the output is 3, 6, 18, 72, 360, 2160. The output has one more item than before, because when an initial value is specified, the first item emitted is the initial value itself and the rest follow the rule above. In the same notation, with x still indexed from 0: y(0) = initialValue; y(i) = f(y(i-1), x(i-1)), i > 0.

Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));

The above method is defined as:

  1. public final Observable<T> scan(BiFunction<T, T, T> accumulator)
  2. public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator)
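
As a further illustration of the two overloads, here is a running sum, sketched under the assumption of RxJava 2 (the class and method names are made up for illustration):

```java
import io.reactivex.Observable;
import java.util.List;

class ScanDemo {
    /** Without a seed: the first output is the first source item. */
    static List<Integer> runningSum() {
        return Observable.range(1, 5)
                .scan((sum, x) -> sum + x)
                .toList()
                .blockingGet();
    }

    /** With a seed: the seed itself is emitted first, so there is one extra item. */
    static List<Integer> runningSumFrom100() {
        return Observable.range(1, 5)
                .scan(100, (sum, x) -> sum + x)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(runningSum());        // [1, 3, 6, 10, 15]
        System.out.println(runningSumFrom100()); // [100, 101, 103, 106, 110, 115]
    }
}
```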

7.window

window is similar to buffer, but instead of emitting lists of items from the source Observable, it emits Observables, each of which emits a subset of the source’s items and then terminates with an onComplete notification.

In the following program, we first generate a sequence of 10 numbers and then use window to split them into groups of three, each group being delivered as an Observable. We subscribe to and consume each returned Observable. Because there are 10 numbers, they are divided into 4 groups, each corresponding to one Observable:

Observable.range(1, 10).window(3).subscribe(
        observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));

Besides grouping by count, we can also group the emitted data by time. There are several overloaded versions of this method. Here are some representative ones:

  1. public final Observable<Observable<T>> window(long count)
  2. public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
  3. public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
  4. public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)

2.1.3 Filtering operations

1.filter

filter selects items from the source according to the specified rule. For example, the following program keeps only the numbers greater than 5 among the integers 1 through 10:

Observable.range(1, 10).filter(i -> i > 5).subscribe(System.out::println);

Here is the definition of the method:

  1. public final Observable<T> filter(Predicate<? super T> predicate)

2.elementAt & firstElement & lastElement

elementAt is used to get the item at a specified position in the source. It has several overloads, and here we show the simplest one. The following example takes the element at index 0 from the source and delivers it to the observer, so the program prints 1:

Observable.range(1, 10).elementAt(0).subscribe(System.out::print);

Here we give a definition of elementAt and its related methods, which are used similarly. Notice the return type here:

  1. public final Maybe<T> elementAt(long index)
  2. public final Single<T> elementAt(long index, T defaultItem)
  3. public final Single<T> elementAtOrError(long index)

In addition to retrieving the element at a specified index, RxJava also has methods for retrieving the first and last elements directly. Here are their definitions:

  1. public final Maybe<T> firstElement()
  2. public final Single<T> first(T defaultItem)
  3. public final Single<T> firstOrError()
  4. public final Maybe<T> lastElement()
  5. public final Single<T> last(T defaultItem)
  6. public final Single<T> lastOrError()
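
The different return types matter when the requested element does not exist. A minimal sketch, assuming RxJava 2 (the class and method names are made up for illustration), asks for index 20 of a 10-item sequence in all three ways:

```java
import io.reactivex.Observable;
import java.util.NoSuchElementException;

class ElementAtDemo {
    /** Maybe: completes empty, so blockingGet() returns null. */
    static Integer asMaybe() {
        return Observable.range(1, 10).elementAt(20).blockingGet();
    }

    /** Single with default: falls back to the default item. */
    static Integer withDefault() {
        return Observable.range(1, 10).elementAt(20, -1).blockingGet();
    }

    /** Single or error: signals NoSuchElementException. */
    static boolean orErrorThrows() {
        try {
            Observable.range(1, 10).elementAtOrError(20).blockingGet();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(asMaybe());       // null
        System.out.println(withDefault());   // -1
        System.out.println(orErrorThrows()); // true
    }
}
```

The first/last families listed above follow the same pattern: the ...Element() variant returns a Maybe, the variant with a default item returns a Single, and the ...OrError() variant fails on an empty source.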

3.distinct & distinctUntilChanged

distinct removes duplicate items from the source. For example, the following program filters out the duplicate 7:

Observable.just(1, 2, 3, 4, 5, 6, 7, 7).distinct().subscribe(System.out::print);

distinctUntilChanged works similarly but, unlike distinct, only filters out an element when it is the same as the one immediately before it. For example, the following program filters out the repeated 2 and 5, so the final output is 12345676:

Observable.just(1, 2, 2, 3, 4, 5, 5, 6, 7, 6).distinctUntilChanged().subscribe(System.out::print);

This method also has several similar methods, which are defined as follows:

  1. public final Observable<T> distinct()
  2. public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
  3. public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
  4. public final Observable<T> distinctUntilChanged()
  5. public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
  6. public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)

4.skip & skipLast & skipUntil & skipWhile

The skip method is used to filter out the first n items of the data. For example, the following program will filter out the first two items, so the output is 345:

Observable.range(1, 5).skip(2).subscribe(System.out::print);

The counterpart of skip is take, which selects only the first n items of the source; it is covered in the next subsection. skip also has an overload that takes a time and a unit and skips everything emitted during that initial period, so the subscriber only receives items after the specified time. The following program starts printing numbers after 3 seconds:

Observable.range(1, 5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);

The opposite of skip is skipLast, which drops the last items of the source, or the items emitted during the final stretch of time. In the following program, we record the start time and keep repeating the sequence until 5 seconds have passed. We then use skipLast to drop whatever was emitted during the last 2 seconds. So the program outputs the numbers produced in the first three seconds and discards those produced in the last two:

long current = System.currentTimeMillis();
Observable.range(1, 5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);

There are other methods similar to the above, but we won’t list them all here. Because there are many overloaded methods for these methods, we will give a representative part of them below:

  1. public final Observable<T> skip(long count)
  2. public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> skipLast(int count)
  4. public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  5. public final <U> Observable<T> skipUntil(ObservableSource<U> other)
  6. public final Observable<T> skipWhile(Predicate<? super T> predicate)

5.take & takeLast & takeUntil & takeWhile

The counterpart of the skip method is the take method, which selects items according to some rule. In the following example, only the first two items of the sequence are emitted:

Observable.range(1, 5).take(2).subscribe(System.out::print);

The following program selects only the data emitted in the last 2 seconds:

long current = System.currentTimeMillis();
Observable.range(1, 5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);

Here are the definitions of the related methods mentioned above. Again, we only choose a few representative ones:

  1. public final Observable<T> take(long count)
  2. public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  3. public final <U> Observable<T> takeUntil(ObservableSource<U> other)
  4. public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
  5. public final Observable<T> takeWhile(Predicate<? super T> predicate)
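
The predicate-based variants are easier to compare side by side. The following is only a sketch, assuming RxJava 2 (the class and method names are made up for illustration); note that takeUntil with a predicate still emits the item that satisfies the stop condition:

```java
import io.reactivex.Observable;
import io.reactivex.functions.Predicate;
import java.util.List;

class TakeSkipDemo {
    /** skipWhile: drop items while the predicate holds, then emit everything else. */
    static List<Integer> skipWhileLessThan3() {
        return Observable.range(1, 6).skipWhile(i -> i < 3).toList().blockingGet();
    }

    /** takeWhile: emit items while the predicate holds, then complete. */
    static List<Integer> takeWhileLessThan3() {
        return Observable.range(1, 6).takeWhile(i -> i < 3).toList().blockingGet();
    }

    /** takeUntil(predicate): emit items up to and including the first one that matches. */
    static List<Integer> takeUntil3() {
        return Observable.range(1, 6)
                .takeUntil((Predicate<Integer>) i -> i == 3)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(skipWhileLessThan3()); // [3, 4, 5, 6]
        System.out.println(takeWhileLessThan3()); // [1, 2]
        System.out.println(takeUntil3());         // [1, 2, 3]
    }
}
```

The cast to Predicate in takeUntil3 is there only to make explicit which overload is meant, since takeUntil also accepts an ObservableSource.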

6.ignoreElements

This method filters all results from the source Observable and only notifies subscribers of the Observable’s onComplete and onError events. Here is the definition of the method:

  1. public final Completable ignoreElements()

7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout

These methods are used to limit the rate of emitted data. They work in terms of a time “window”: you can think of time as being divided into slices according to the specified parameters, with each method selecting the first, last, or most recent item of each slice to emit. Here is an example of the throttleLast method, which prints the last number emitted within each 500-millisecond window:

Observable.interval(80, TimeUnit.MILLISECONDS)
        .throttleLast(500, TimeUnit.MILLISECONDS)
        .subscribe(i -> System.out.print(i + " "));

The functions of the other methods are summarized as follows:

  1. throttleFirst: emits only the first item emitted by the source Observable within each time window.
  2. throttleLast: emits only the last item emitted by the source Observable within each time window.
  3. throttleLatest: emits the item closest to the end of each time window, that is, the most recent one;
  4. throttleWithTimeout: emits an item only if no further item follows within the specified timeout. If another item arrives before the timeout elapses, the earlier item is discarded. Underneath, this method uses debounce. If the source keeps emitting faster than the timeout parameter allows, no data is emitted while that continues.

Here are the definitions of these methods and their overloaded methods (select some of them) :

  1. public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler)
  2. public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast)
  4. public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)

8.debounce

debounce is also used to limit emissions that arrive too fast: it emits an item only if no other item has been emitted for a specified period of time after it. Let’s illustrate this by looking at the following diagram:

The red, green, and blue balls are emitted because no other ball is emitted for a certain amount of time after each of them; that amount of time is specified by a parameter.

This method is used in much the same way as throttleWithTimeout, which is itself implemented with debounce underneath, so we will not write test code specifically for it.
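
For readers who still want to see debounce in action, the sketch below drives it with virtual time so the result does not depend on real timing. It assumes RxJava 2 and uses TestScheduler and PublishSubject, which are not covered in this article; the class and method names are made up for illustration.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class DebounceDemo {
    static List<Integer> run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock
        PublishSubject<Integer> source = PublishSubject.create();
        List<Integer> received = new ArrayList<>();

        source.debounce(300, TimeUnit.MILLISECONDS, scheduler).subscribe(received::add);

        source.onNext(1);
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); // only 100 ms of silence
        source.onNext(2);                                     // replaces 1
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // 300 ms of silence: 2 is emitted

        source.onNext(3);
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        source.onNext(4);                                     // replaces 3
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // 4 is emitted
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2, 4]
    }
}
```

Items 1 and 3 are dropped because another item followed them before the 300 ms quiet period elapsed.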

9.sample

In fact, the implementation of throttleLast calls sample internally.

2.1.4 Combined operations

1.startWith & startWithArray

The startWith method inserts data in front of a given source. Similar methods include startWithArray, and there are several overloads. As a basic example, the following program prefixes the original stream of numbers 1 to 5 with 0, so the final output is 012345:

Observable.range(1, 5).startWith(0).subscribe(System.out::print);

Here is the definition of startWith and several of its function-related methods:

  1. public final Observable<T> startWith(Iterable<? extends T> items)
  2. public final Observable<T> startWith(ObservableSource<? extends T> other)
  3. public final Observable<T> startWith(T item)
  4. public final Observable<T> startWithArray(T... items)

2.merge & mergeArray

merge combines data from multiple sources into a single stream, although the merged data may be interleaved. Here’s an example where we merge two Observables using the merge method:

Observable.merge(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);

Because merge has so many overloads and related methods, we pick a few representative ones here; see the RxJava source code for the rest:

  1. public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
  3. public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  4. public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

mergeDelayError behaves the same as merge except in how it handles errors signalled through onError. mergeDelayError waits until all data has been emitted before delivering the error; even if several sources fail, the observer receives only a single error notification. With merge, the error is delivered as soon as it occurs and the whole operation terminates. Here’s an example of how this method can be used: the main thread sleeps for 4 seconds at the end, and one of the merged Observables signals an error after a 2-second sleep; the error is delivered only after all the data has been emitted:

Observable.mergeDelayError(
        Observable.range(1, 5),
        Observable.range(1, 5).repeat(2),
        Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
            Thread.sleep(2000);
            observableEmitter.onError(new Exception("error"));
        })
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);

3.concat & concatArray & concatEager

concat also combines multiple Observables, but it emits strictly in the order in which the Observables are passed in: the next Observable does not start emitting until the previous one has completed. Here's an example that passes in two Observables and prints 12345678910 in order:

Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);

The following are the definitions of concat and its related methods. There are too many overloads to list them all, so we choose a few representative ones:

  1. public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  3. public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
  4. public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
  5. public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
  6. public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)

We have already seen how concat is used, and concatArray does much the same thing. concatEager is different: when an observer subscribes to its result, it immediately subscribes to all of the ObservableSources it concatenates, caches the data they emit, and then emits that data in order. concatDelayError is similar to mergeDelayError in that the error is passed on only after all data has been emitted.

4.zip & zipArray & zipIterable

The zip operation combines the items of multiple sources pairwise. You specify how each pair is combined through a function. For example, the output of the following program is 6 14 24 36 50: the combining rule here is the product of the two items at the same index. A closer look at the output also shows that an item with no counterpart at the same index does not take part: the first source emits six items but the second only five, so the sixth item of the first source is dropped:

Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

Besides combining two streams of data, zip can be used to wait for the results of two independent requests and process them together:

Observable<String> a = // ... request A
Observable<Integer> b = // ... request B
Observable.zip(a, b, new BiFunction<String, Integer, Object>() {
    @Override
    public Object apply(@NonNull String s, @NonNull Integer integer) throws Exception {
        // Receives the n-th result of request A and the n-th result of request B
        return new Object();
    }
}).subscribe();

If A and B are each subscribed on their own thread (for example with subscribeOn), they run in parallel, and their results are brought together in the apply() method. zip does not guarantee that one request runs before the other; it only guarantees that apply() is called once both have produced a result. We can use this to continue with a task only when the tasks it depends on have finished, and to combine their results. So what are the rules for pairing? If A and B each emit several results, onNext() is called several times: the results of A and B are paired in order (first with first, second with second, and so on), and the BiFunction above is called back for each pair.
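
The pairing rule can be sketched without RxJava. The following plain-Java model is only an illustration, not how RxJava implements zip; the class ZipSketch and its zip helper are hypothetical names introduced here. It pairs two lists by index and stops as soon as either list runs out, which is why the earlier range example yields five products even though the first source has six items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of RxJava.
class ZipSketch {

    // Pairs the n-th element of a with the n-th element of b
    // and stops when the shorter list is exhausted.
    static <A, B, R> List<R> zip(List<A> a, List<B> b, BiFunction<A, B, R> zipper) {
        List<R> out = new ArrayList<>();
        int pairs = Math.min(a.size(), b.size());
        for (int i = 0; i < pairs; i++) {
            out.add(zipper.apply(a.get(i), b.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> first = Arrays.asList(1, 2, 3, 4, 5, 6); // stands in for Observable.range(1, 6)
        List<Integer> second = Arrays.asList(6, 7, 8, 9, 10);  // stands in for Observable.range(6, 5)
        System.out.println(zip(first, second, (x, y) -> x * y)); // [6, 14, 24, 36, 50]
    }
}
```

The sixth element of the first list never reaches the zipper function because it has no partner at index 5.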

There are multiple overloaded versions of the zip method, as well as methods that are similar in function. Here we will select a few representative ones for illustration:

  1. public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
  2. public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
  3. public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)

In fact, the usage and functionality of the above methods are similar; they differ only in the form in which the ObservableSource arguments are passed in.

5.combineLatest

Similar to the zip operation, but the output of this operation is very different from that of zip. For example, the following program outputs 36 42 48 54 60:

Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

The following chart makes it easier to illustrate the problem:

The top two lines in the figure above represent the two sources being combined, and the bottom line is the result. What combineLatest does is combine the most recently emitted item of each source. Let's use the figure to illustrate how this works: when the second source emits A, the latest items are 1 and A, so they are combined into 1A. The second source then emits B; the latest items are now 1 and B, so they are combined into 1B. Next the first source emits 2, the latest items are 2 and B, so you get 2B, and so on. Now back to our program: the first source emits all six of its items (1 to 6) before the second source has emitted anything, so there is no output yet. Then the second source starts emitting. When it emits 6, the latest pair is 6 and 6, so the value is 36; when it emits 7, the latest pair is 6 and 7, giving 42, and so on.
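
The walk-through above can be replayed with a small plain-Java model. This is a sketch under simplifying assumptions, not RxJava's implementation: the class CombineLatestSketch is a hypothetical name, and the two sources are flattened into one timeline of events, each tagged with the source (0 or 1) it came from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration class; not part of RxJava.
class CombineLatestSketch {

    // Each event is {source, value}, with source 0 or 1, listed in arrival order.
    // A result is produced on every event once both sources have emitted at least once.
    static List<Integer> combineLatest(List<int[]> events, BinaryOperator<Integer> combiner) {
        Integer[] latest = new Integer[2];
        List<Integer> out = new ArrayList<>();
        for (int[] event : events) {
            latest[event[0]] = event[1];
            if (latest[0] != null && latest[1] != null) {
                out.add(combiner.apply(latest[0], latest[1]));
            }
        }
        return out;
    }

    // Timeline of the example: source 0 emits 1..6 first, then source 1 emits 6..10.
    static List<int[]> sequentialTimeline() {
        List<int[]> events = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            events.add(new int[] {0, i});
        }
        for (int i = 6; i <= 10; i++) {
            events.add(new int[] {1, i});
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(combineLatest(sequentialTimeline(), (x, y) -> x * y)); // [36, 42, 48, 54, 60]
    }
}
```

Reordering the events in the timeline changes the output, which is the key difference from zip: the result depends on arrival order, not on index.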

This method also has a counterpart, combineLatestDelayError, which defers error handling until all data has been emitted.

2.1.5 Auxiliary Operations

1.delay

The delay method is used to pause for a specified amount of time before transmitting data. For example, the following program pauses for 1 second before actually transmitting data:

Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);

The delay method has several overloads, some of which take a Scheduler so that we can specify the thread on which the delayed emissions are delivered. Here are two of them. For the rest, refer to the source code and documentation:

  1. public final Observable<T> delay(long delay, TimeUnit unit)
  2. public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)

2.do series

RxJava also provides a number of methods whose names all start with do. Here's a list of these methods and a brief description of their respective uses:

  1. public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext): triggered after onNext has been called;
  2. public final Observable<T> doAfterTerminate(Action onFinally): triggered after the Observable terminates;
  3. public final Observable<T> doFinally(Action onFinally): triggered on onComplete or onError, and also when the sequence is disposed;
  4. public final Observable<T> doOnDispose(Action onDispose): triggered when the sequence is disposed;
  5. public final Observable<T> doOnComplete(Action onComplete): triggered on completion;
  6. public final Observable<T> doOnEach(final Observer<? super T> observer): triggered for each notification, that is, for every onNext as well as for onError and onComplete;
  7. public final Observable<T> doOnError(Consumer<? super Throwable> onError): triggered when onError is called;
  8. public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
  9. public final Observable<T> doOnNext(Consumer<? super T> onNext): triggered when onNext is called;
  10. public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe): triggered on subscription;
  11. public final Observable<T> doOnTerminate(final Action onTerminate): triggered just before termination.

These methods can be thought of as listeners on the execution of the sequence: each one is triggered when the corresponding event occurs:

Observable.range(1, 5)
        .doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
        .doOnComplete(() -> System.out.println("complete"))
        .doFinally(() -> System.out.println("finally"))
        .doAfterNext(i -> System.out.println("after next : " + i))
        .doOnSubscribe(disposable -> System.out.println("subscribe"))
        .doOnTerminate(() -> System.out.println("terminal"))
        .subscribe(i -> System.out.println("subscribe : " + i));

3.subscribeOn & observeOn

subscribeOn specifies the thread on which the Observable itself runs (where it is subscribed and does its work), and observeOn specifies the thread on which the emitted data is delivered to downstream observers. For example, for an asynchronous task in Android, we use subscribeOn to run the task on a background thread and observeOn to send the results to the main thread once the task has finished. In the following program, we use these two methods to specify the threads:

Observable.create(new ObservableOnSubscribe<T>() {
    @Override
    public void subscribe(ObservableEmitter<T> emitter) throws Exception {
        // do nothing
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

4.timeout

timeout sets a time limit: if no data is emitted within the specified period, the sequence switches to the fallback source passed in (the overloads without a fallback signal an error instead). In the following program, we set up a number generator that emits every 200 milliseconds but pauses for one second before it starts to emit. Because we set a timeout of 500 milliseconds, the fallback source takes over at 500 milliseconds and the program prints 12345:

Observable.interval(1000, 200, TimeUnit.MILLISECONDS)
        .timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(1, 5))
        .subscribe(System.out::print);
Thread.sleep(2000);

The timeout method has multiple overloads, some of which let you specify the Scheduler on which the timeout is measured. Refer to the source code or documentation for details.

2.1.6 Error handling operators

Error handling operators are provided to Observables to uniformly handle error messages. The two commonly used operators are catch and retry.

1.catch

The catch operation intercepts the original Observable's onError notification and replaces it with another data item or sequence, so that the resulting Observable can terminate normally or not terminate at all. In RxJava, this operation is implemented by three operators:

  1. onErrorReturn: when onError is triggered, emits a special item in place of the error and then calls the observer's onComplete method, without passing the error to the observer;
  2. onErrorResumeNext: when onError is triggered, switches to an alternate sequence and emits its data items to the observer;
  3. onExceptionResumeNext: like onErrorResumeNext, except that if the Throwable received by onError is not an Exception, it passes the error to the observer's onError method without using the alternate Observable.

Here's an example program for onErrorReturn and onErrorResumeNext: the first statement prints 666 when the error occurs, and the second emits 12345 when the error occurs:

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onError(null);
    observableEmitter.onNext(0);
}).onErrorReturn(throwable -> 666).subscribe(System.out::print);

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onError(null);
    observableEmitter.onNext(0);
}).onErrorResumeNext(Observable.range(1, 5)).subscribe(System.out::print);

2.retry

retry resubscribes to the source when an error occurs. We can specify the retry conditions through its parameters. For example, in the following program we allow two retries. The first subscription calls onNext and then fails; the first retry calls onNext again and fails; the second retry calls onNext once more and fails, and since no retries are left, onError is finally delivered to the observer. In other words, the following code triggers onNext three times and onError once:

    Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(0);
        emitter.onError(new Throwable("Error1"));
        emitter.onError(new Throwable("Error2"));
    })).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));

retry has several overloads and some related methods. Here are the definitions of a selection of them:

  1. public final Observable<T> retry(): Will retry an infinite number of times;
  2. public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
  3. public final Observable<T> retry(long times): Specifies the retry times;
  4. public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
  5. public final Observable<T> retryUntil(final BooleanSupplier stop)
  6. public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)
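
To make the counting in the retry(2) example concrete, here is a plain-Java model of the resubscription loop. It is a sketch of the idea only and does not mirror RxJava's internals; RetrySketch, its Source interface and the retry helper are hypothetical names, and an error is modelled simply as a thrown exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of RxJava.
class RetrySketch {

    // A source pushes items into the consumer and throws to signal an error.
    interface Source<T> {
        void subscribe(Consumer<T> onNext) throws Exception;
    }

    // Subscribes once, then resubscribes up to `times` more times after a failure.
    // Returns null on normal completion, or the last error when retries run out.
    static <T> Exception retry(Source<T> source, long times, Consumer<T> onNext) {
        for (long attempt = 0; ; attempt++) {
            try {
                source.subscribe(onNext);
                return null;
            } catch (Exception e) {
                if (attempt >= times) {
                    return e;
                }
            }
        }
    }

    // Replays the article's example: emit 0, then fail, with two retries allowed.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Exception error = RetrySketch.<Integer>retry(onNext -> {
            onNext.accept(0);
            throw new Exception("Error1");
        }, 2, i -> log.add("onNext : " + i));
        if (error != null) {
            log.add("onError : " + error.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        // Three onNext entries (one per subscription) followed by a single onError entry.
        run().forEach(System.out::println);
    }
}
```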

2.1.7 Conditional operators and Boolean operators

1.all & any

  1. all: determines whether all of the emitted items meet a given requirement, where the requirement is specified with a function;
  2. any: determines whether the Observable emits at least one item that meets the given requirement.

In the following program, we use these methods to check whether the items are greater than 5. The sequence runs from 5 to 9, so not every item satisfies the condition and all prints false, while at least one item does and any prints true:

Observable.range(5, 5).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(5, 5).any(i -> i>5).subscribe(System.out::println); // true

Here is the definition of the method:

  1. public final Single<Boolean> all(Predicate<? super T> predicate)
  2. public final Single<Boolean> any(Predicate<? super T> predicate)
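
For readers coming from Java 8 streams, all and any behave much like allMatch and anyMatch. The following JDK-only sketch runs the same two checks on the same numbers; the class AllAnySketch and its method names are made up for this illustration, and a stream is of course synchronous, unlike an Observable.

```java
import java.util.stream.IntStream;

// Hypothetical illustration class; uses only the JDK, not RxJava.
class AllAnySketch {

    // Counterpart of Observable.range(start, count).all(i -> i > threshold)
    static boolean allGreaterThan(int start, int count, int threshold) {
        return IntStream.range(start, start + count).allMatch(i -> i > threshold);
    }

    // Counterpart of Observable.range(start, count).any(i -> i > threshold)
    static boolean anyGreaterThan(int start, int count, int threshold) {
        return IntStream.range(start, start + count).anyMatch(i -> i > threshold);
    }

    public static void main(String[] args) {
        System.out.println(allGreaterThan(5, 5, 5)); // false, because 5 itself is not greater than 5
        System.out.println(anyGreaterThan(5, 5, 5)); // true, because 6 is
    }
}
```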

2.contains & isEmpty

These two methods determine, respectively, whether the sequence contains the data item we specify and whether the sequence is empty:

Observable.range(5, 5).contains(4).subscribe(System.out::println); // false
Observable.range(5, 5).isEmpty().subscribe(System.out::println); // false

Here are the definitions of the two methods:

  1. public final Single<Boolean> isEmpty()
  2. public final Single<Boolean> contains(final Object element)

3.sequenceEqual

sequenceEqual determines whether two Observables emit the same sequence of items. For example, the following code compares two identical sequences and prints true:

Observable.sequenceEqual(Observable.range(1, 5), Observable.range(1, 5)).subscribe(System.out::println);

4.amb

amb takes two or more Observables but emits all of the data from only one of them: the one that emits an item or terminates first. The other Observables are ignored:

Observable.amb(Arrays.asList(Observable.range(1, 5), Observable.range(6, 5))).subscribe(System.out::print);

Here are the definitions of this method and its related methods. The first two are static methods, and the third is an instance method:

  1. public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
  3. public final Observable<T> ambWith(ObservableSource<? extends T> other)

5.defaultIfEmpty

DefaultIfEmpty specifies a value to emit when the specified sequence is empty. In the following program, we call the emitter’s onComplete method directly, so the sequence is empty and the result is an integer 6:

Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);

Here is the definition of the method:

  1. public final Observable<T> defaultIfEmpty(T defaultItem)

2.1.8 Conversion operators

1.toList & toSortedList

ToList and toSortedList are used to turn a sequence into a list, the latter adding sort functionality to the former:

Observable.range(1, 5).toList().subscribe(System.out::println);
Observable.range(1, 5).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);

Here are their definitions. There are multiple overloaded versions of them, two of which are selected here:

  1. public final Single<List<T>> toList()
  2. public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)

Note that the return type is Single, but that doesn't stop us from chaining calls, because Single provides methods similar to Observable's. Also note that the type argument of Single here is List<T>, that is, the entire sequence is transformed into one List object. Thus, the output of the two sample programs above is:

[1, 2, 3, 4, 5]
[5, 4, 3, 2, 1]

2.toMap & toMultimap

toMap collects the emitted items into a single Map, using the function we pass in to compute the key of each item. The following code, for example, uses the hexadecimal string of each number in the original sequence as its key. Note that the entries of the resulting Map are not necessarily in the order in which the original sequence was emitted:

Observable.range(8, 10).toMap(Integer::toHexString).subscribe(System.out::print);

toMultimap is similar to toMap, except that each key maps to a collection of all the items that produced that key:

Observable.range(8, 10).toMultimap(Integer::toHexString).subscribe(System.out::print);

The output of the above two programs is:

{11=17, a=10, b=11, c=12, d=13, e=14, f=15, 8=8, 9=9, 10=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}

The above two methods are defined as follows (there are multiple overloads; only a selection is shown):

  1. public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
  2. public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)
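
The shape of the two results may be easier to see next to the Java 8 stream collectors that play a comparable role. This is a JDK-only sketch (ToMapSketch is a hypothetical name): Collectors.toMap builds one value per key and Collectors.groupingBy builds a list per key. One difference to keep in mind is that Collectors.toMap throws on duplicate keys, so the comparison only holds when the keys are unique, as they are here.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration class; uses only the JDK, not RxJava.
class ToMapSketch {

    // One entry per item: key is the hex string, value is the number itself.
    static Map<String, Integer> toMap(int start, int count) {
        return IntStream.range(start, start + count).boxed()
                .collect(Collectors.toMap(Integer::toHexString, i -> i));
    }

    // One entry per key: every number that produced the key is collected into a list.
    static Map<String, List<Integer>> toMultimap(int start, int count) {
        return IntStream.range(start, start + count).boxed()
                .collect(Collectors.groupingBy(Integer::toHexString));
    }

    public static void main(String[] args) {
        // Same input as Observable.range(8, 10): the numbers 8 to 17.
        System.out.println(toMap(8, 10));
        System.out.println(toMultimap(8, 10));
    }
}
```

As with the RxJava output above, the iteration order of the printed maps depends on the hash map, not on the order of the input.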

3.toFlowable

This method is used to convert an Observable to a Flowable type. Here is the definition of this method. Obviously, this method uses the strategy pattern, which involves backpressure, which we will cover in more detail later.

public final Flowable<T> toFlowable(BackpressureStrategy strategy)

4.to

to converts the Observable into any type you want by applying a function to it. Here is an example that converts a sequence of integers into a Single<Long> which emits the total number of items in the original sequence, so it prints 5:

Observable.range(1, 5).to(Observable::count).subscribe(System.out::println);

Here is the definition of the method:

public final <R> R to(Function<? super Observable<T>, R> converter)

2.2 Thread Control

As mentioned before, the thread control of RxJava is completed by the subscribeOn and observeOn methods. Here we will go through several thread schedulers provided by RxJava and the usage scenarios and differences of the scheduler provided by RxAndroid for Android.

  1. Schedulers.io(): the scheduler for IO operations. It is backed by a thread pool that grows and shrinks as needed, and is typically used for IO-intensive work such as network requests and reading and writing files. The important thing to note is that the thread pool is unbounded, so scheduling a large number of I/O operations will create many threads and consume memory.
  2. Schedulers.computation(): the default scheduler for computational work, meant for CPU-intensive operations that do not involve I/O. It is also the default scheduler for a number of RxJava methods, such as buffer(), debounce(), delay(), interval(), sample() and skip().
  3. Schedulers.newThread(): starts a regular new thread for the work.
  4. Schedulers.immediate(): executes the work you specify immediately on the current thread. In RxJava 1 it is the default scheduler for the timeout(), timeInterval() and timestamp() methods; RxJava 2 no longer provides this scheduler.
  5. Schedulers.trampoline(): when we want to run a task on the current thread, but not immediately, we can use trampoline() to add it to a queue. The scheduler works through its queue and runs each queued task in order. It is the default scheduler for the repeat() and retry() methods.
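
The queueing behaviour described for trampoline() can be pictured with a small single-threaded model. This is only a sketch of the idea, not the RxJava scheduler: TrampolineSketch is a hypothetical class, and it ignores threads, delays and disposal. A task scheduled while another task is running is placed in a queue and runs after the current task returns, instead of being nested inside it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration class; not the RxJava trampoline scheduler.
class TrampolineSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean running;

    // Runs the task on the calling thread. If a task is already running,
    // the new task waits in a FIFO queue until the current one has returned.
    void schedule(Runnable task) {
        queue.add(task);
        if (running) {
            return;
        }
        running = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            running = false;
        }
    }

    // Schedules an inner task from inside an outer task and records the order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TrampolineSketch trampoline = new TrampolineSketch();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [outer start, outer end, inner]
    }
}
```

An immediate-style scheduler would instead run the inner task right away, producing the order outer start, inner, outer end.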

And RxAndroid thread scheduler:

AndroidSchedulers.mainThread(): refers to the main thread of Android.

2.3 Summary

The above operations are also generally applicable to Flowable, Single, Completable, and Maybe.

Having spent the time and effort to comb through these methods, we should have no problem using RxJava to implement basic and even advanced operations.

Observable, however, is better suited to handling smaller amounts of data; when the amount of data is large, a MissingBackpressureException may occur. Therefore, we also need to understand backpressure and Flowable to better understand and apply RxJava.

That's all, thanks for reading ~