RxJava operators

Reference Documents:

  • RxJava JavaDoc
  • Chinese translation of ReactiveX documents

1 Creating an Observable

1.1 from( )

Converts an array, an Iterable, or a Future into an Observable that emits its items.
List of methods:

  • public static <T> Observable<T> from(Future<? extends T> future)
  • public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit)
  • public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler)
  • public static <T> Observable<T> from(Iterable<? extends T> iterable)
  • public static <T> Observable<T> from(T[] array)

Example:

// 1. Create an Observable from an array.
Observable<String> observable = Observable.from(new String[]{"hello", "hi"});

// 2. Create an Observable from a Future, which represents the result of an asynchronous computation.
final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        // TODO: perform the asynchronous operation and return its result
        return "hihi";
    }
});
// Run the task on an I/O worker thread.
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        futureTask.run();
    }
});
Observable<String> futureObservable = Observable.from(futureTask);

1.2 just( )

Creates an Observable that emits the given items, one to ten of them, in order.
List of methods:

  • public static <T> Observable<T> just(final T value)
  • public static <T> Observable<T> just(T t1, T t2)
  • public static <T> Observable<T> just(T t1, T t2, T t3)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9)
  • public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10)

Example:

// Emit a single item.
Observable<String> single = Observable.just("hello");
// Emit several items in order.
Observable<String> several = Observable.just("hello", "hi", "...");
// from() applied to an array has the same effect as just() with the same items.
String[] stringArrs = new String[]{"hello", "hi", "..."};
Observable<String> fromArray = Observable.from(stringArrs);

just() accepts at most ten arguments. To emit more items than that, use from() with an array or an Iterable.

1.3 create( )

Creates an Observable from scratch: the supplied OnSubscribe function calls the subscriber's methods directly.
List of methods:

  • public static <T> Observable<T> create(OnSubscribe<T> f)
  • @Beta public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe)
  • @Experimental public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe)

Example:

Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // onNext() can be called multiple times.
        subscriber.onNext("hello");
        subscriber.onCompleted();
    }
};
Observable<String> observable = Observable.create(onSubscribe);

This example has the same effect as using just( ) or from( ):

Observable<String> observable = Observable.just("hello");

1.4 interval( )

Creates an Observable that emits an increasing sequence of Long values, starting at 0, at a fixed time interval.
List of methods:

  • public static Observable<Long> interval(long interval, TimeUnit unit)
  • public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler)
  • public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
  • public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

Example:

// Emit a sequence number every second, starting at 0 and increasing by 1 each time.
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);

1.5 timer( )

Creates an Observable that emits a single item, 0L, after the specified delay and then completes.
List of methods:

  • public static Observable<Long> timer(long delay, TimeUnit unit)
  • public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

Example:

// Emit 0L after a delay of 3 seconds.
Observable<Long> observable = Observable.timer(3, TimeUnit.SECONDS);

1.6 range( )

Creates an Observable that emits a sequence of consecutive integers: count values, beginning with start.
List of methods:

  • public static Observable<Integer> range(int start, int count)
  • public static Observable<Integer> range(int start, int count, Scheduler scheduler)

Example:

// Emit 5, 6, 7.
Observable<Integer> observable = Observable.range(5, 3);

1.7 empty( )

Creates an Observable that emits no items and immediately calls onCompleted().
List of methods:

  • public static <T> Observable<T> empty()

Example:

// Immediately sends an onCompleted() notification to the subscriber.
Observable<Object> observable = Observable.empty();

1.8 error( )

Creates an Observable that emits no items and immediately calls onError() with the given exception.
List of methods:

  • public static <T> Observable<T> error(Throwable exception)

Example:

Observable<Object> observable = Observable.error(new Throwable("message"));

1.9 never( )

Creates an Observable that emits no items and never terminates: it calls neither onCompleted() nor onError().
List of methods:

  • public static <T> Observable<T> never()

Example:

Observable<Object> observable = Observable.never();

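1.10 defer( )

Does not create the Observable until a subscriber subscribes, and calls the factory function again to create a fresh Observable for each subscriber.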
List of methods:

  • public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory)

Example:

Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
        // Called once per subscriber, at subscription time.
        return Observable.just("string");
    }
});
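The difference between creating a value eagerly and deferring its creation can be sketched in plain Java, without any RxJava dependency. In this rough model a Supplier stands in for the Func0 factory and each call to get() stands in for one subscription; the class and method names are hypothetical and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model of defer(): the factory runs once per subscriber, at subscription time.
class DeferModel {
    // Simulates two subscribers; each one asks the factory for its value.
    static List<String> subscribeTwice(Supplier<String> factory) {
        List<String> received = new ArrayList<>();
        received.add(factory.get()); // first subscriber
        received.add(factory.get()); // second subscriber
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        // Eager: the value is computed once, before anyone subscribes (comparable to just()).
        String eager = "value-" + counter.incrementAndGet();
        System.out.println(subscribeTwice(() -> eager)); // [value-1, value-1]

        // Deferred: the value is computed anew for every subscriber (comparable to defer()).
        System.out.println(subscribeTwice(() -> "value-" + counter.incrementAndGet())); // [value-2, value-3]
    }
}
```

This is why defer( ) suits values that must be fresh at subscription time, such as a timestamp or the current state of an object.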

2 Repeat

2.1 repeat( )

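Makes the Observable emit its onNext() items repeatedly by resubscribing to it each time it completes. onCompleted() is delivered only after the last repetition; if an error occurs during a repetition, the sequence stops with onError().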
List of methods:

  • public final Observable<T> repeat()
  • public final Observable<T> repeat(final long count)
  • public final Observable<T> repeat(Scheduler scheduler)
  • public final Observable<T> repeat(final long count, Scheduler scheduler)

Example:

Observable<String> observable = Observable.just("string");
// Repeat indefinitely.
observable.repeat();
// Emit the sequence 5 times in total.
observable.repeat(5);

2.2 repeatWhen( )

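Like repeat( ), but a notification handler decides whether and when to resubscribe. The handler receives an Observable that emits a Void item each time the source calls onCompleted(). Every onNext() from the Observable the handler returns triggers a resubscription; its onCompleted() or onError() ends the sequence.
List of methods:

  • public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler)
  • public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler)

Example:

observable.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Void> observable) {
        // Resubscribe at most 3 times, 1 second after each completion.
        return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
            @Override
            public Integer call(Void aVoid, Integer integer) {
                return integer;
            }
        }).flatMap(integer -> Observable.timer(1, TimeUnit.SECONDS));
    }
});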

3 Retry

3.1 retry( )

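When the source Observable calls onError(), resubscribes to it instead of passing the error on, in the hope that the next attempt completes without error. Items from every attempt are delivered through onNext(), so items may be emitted more than once. The sequence ends with onCompleted() if an attempt succeeds, or with onError() once the retries are exhausted.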
List of methods:

  • public final Observable<T> retry()
  • public final Observable<T> retry(final long count)
  • public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate)

Example:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        System.out.println(".......");
        int a = 1 / 0; // throws ArithmeticException, which is delivered as onError()
        subscriber.onNext(a);
        subscriber.onCompleted();
    }
});
// Retry indefinitely.
observable.retry();
// Retry up to 3 times.
observable.retry(3);
// Use a predicate to decide whether to retry.
observable.retry(new Func2<Integer, Throwable, Boolean>() {
    @Override
    public Boolean call(Integer integer, Throwable throwable) {
        // integer is the number of attempts so far; throwable is the exception that was thrown.
        // Return true to retry, or false to stop and pass the error on.
        return true;
    }
});
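The predicate form of retry( ) can be pictured as a loop that re-runs a task until it succeeds or the predicate declines. The following is a minimal plain-Java sketch of that idea with no RxJava dependency; RetryModel, its retry method, and the failing task are hypothetical names chosen for illustration, and the sketch models only the control flow, not RxJava's threading or subscription handling.

```java
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Plain-Java model of retry(predicate): re-run the task while the predicate allows it.
class RetryModel {
    static <T> T retry(Supplier<T> task, BiPredicate<Integer, RuntimeException> shouldRetry) {
        int failures = 0;
        while (true) {
            try {
                return task.get(); // success: the result is passed on
            } catch (RuntimeException e) {
                failures++;
                // The predicate sees the number of failures so far and the latest error.
                if (!shouldRetry.test(failures, e)) {
                    throw e; // the predicate declined: pass the error on
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical task that fails on its first two attempts.
        String result = retry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
            return "succeeded on attempt " + calls[0];
        }, (failures, error) -> failures <= 3);
        System.out.println(result); // succeeded on attempt 3
    }
}
```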

3.2 retryWhen( )

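Like retry( ), but a notification handler decides whether and when to resubscribe. The handler receives an Observable that emits the Throwable of each failure. Every onNext() from the Observable the handler returns triggers a resubscription; its onCompleted() or onError() ends the sequence.
List of methods:

  • public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler)
  • public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler)

Example:

observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        // Retry at most 3 times, waiting 1 second before each retry.
        return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Object>() {
            @Override
            public Object call(Throwable throwable, Integer integer) {
                return integer;
            }
        }).flatMap(new Func1<Object, Observable<?>>() {
            @Override
            public Observable<?> call(Object o) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
});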

4 Transform

4.1 map( )

Applies a function to each item emitted by the source Observable and emits the results.
List of methods:

  • public final <R> Observable<R> map(Func1<? super T, ? extends R> func)

Example:

Observable.just(2)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.format("Double the original value is: %s", integer * 2);
            }
        });

4.2 flatMap( )

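Maps each item emitted by the source Observable to an Observable, then merges the items emitted by those Observables into a single Observable.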
List of methods:

  • public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func)
  • @Beta public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent)
  • public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> onNext, Func1<? super Throwable, ? extends Observable<? extends R>> onError, Func0<? extends Observable<? extends R>> onCompleted)
  • @Beta public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> onNext, Func1<? super Throwable, ? extends Observable<? extends R>> onError, Func0<? extends Observable<? extends R>> onCompleted, int maxConcurrent)
  • public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector, final Func2<? super T, ? super U, ? extends R> resultSelector)
  • @Beta public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector, final Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent)

Example:

Observable.just(2)
        .flatMap(new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer integer) {
                // Convert the item into an Observable that fires after that many seconds.
                return Observable.timer(integer, TimeUnit.SECONDS);
            }
        });

5 Filter

5.1 filter( )

Only elements that satisfy the specified predicate are emitted.

List of methods:

  • public final Observable<T> filter(Func1<? super T, Boolean> predicate)

Example:

// Emit only the positive items: 1, 2.
Observable.just(-1, -2, 0, 1, 2)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 0;
            }
        });

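5.2 first( )

Emits only the first item emitted by the source Observable, or the first item that satisfies a predicate. If the source Observable emits no such item, the resulting Observable signals a NoSuchElementException.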
List of methods:

  • public final Observable<T> first()
  • public final Observable<T> first(Func1<? super T, Boolean> predicate)

Example:

// Emit only the first item: -1.
Observable.just(-1, -2, 0, 1, 2).first();
// Emit only the first item that satisfies the predicate: 1.
Observable.just(-1, -2, 0, 1, 2).first(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        return integer > 0;
    }
});
// Signals NoSuchElementException.
Observable.empty().first();

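5.3 last( )

Emits only the last item emitted by the source Observable, or the last item that satisfies a predicate. If the source Observable emits no such item, the resulting Observable signals a NoSuchElementException.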
List of methods:

  • public final Observable<T> last()
  • public final Observable<T> last(Func1<? super T, Boolean> predicate)

Example:

// Emit only the last item: 2.
Observable.just(-1, -2, 0, 1, 2).last();
// Emit only the last item that satisfies the predicate: -2.
Observable.just(-1, -2, 0, 1, 2).last(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        return integer < 0;
    }
});
// Signals NoSuchElementException.
Observable.empty().last();

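5.4 skip( )

Skips the specified number of leading items, or the items emitted during the specified initial period, and emits only the items that follow.

List of methods:

  • public final Observable<T> skip(int count)
  • public final Observable<T> skip(long time, TimeUnit unit)
  • public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)

Example:

Observable.just(-1, -2, 0, 1, 2).skip(2); // Skip the first two items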

5.5 skipLast( )

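Skips the specified number of trailing items, or the items emitted during the specified period before the source Observable completes.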
List of methods:

  • public final Observable<T> skipLast(int count)
  • public final Observable<T> skipLast(long time, TimeUnit unit)
  • public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler)

Example:

Observable.just(-1, -2, 0, 1, 2).skipLast(2); // Skip the last two items

5.6 take( )

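Emits only the specified number of leading items, or only the items emitted during the specified initial period.

List of methods:

  • public final Observable<T> take(final int count)
  • public final Observable<T> take(long time, TimeUnit unit)
  • public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler)

Example:

Observable.just(-1, -2, 0, 1, 2).take(3); // Emit only the first three items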

5.7 takeLast( )

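Emits only the specified number of trailing items, or only the items emitted during the specified period before the source Observable completes.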
List of methods:

  • public final Observable<T> takeLast(final int count)
  • public final Observable<T> takeLast(int count, long time, TimeUnit unit)
  • public final Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler)
  • public final Observable<T> takeLast(long time, TimeUnit unit)
  • public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler)

Example:

Observable.just(-1, -2, 0, 1, 2).takeLast(3); // Emit only the last three items

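5.8 sample( )

Samples the source Observable at a fixed period and, at each sampling point, emits the most recent item emitted since the previous one.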
List of methods:

  • public final Observable<T> sample(long period, TimeUnit unit)
  • public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler)
  • public final <U> Observable<T> sample(Observable<U> sampler)

Example:

// The source emits every 300 ms; sample it every 2 seconds.
Observable.interval(300, TimeUnit.MILLISECONDS)
        .sample(2, TimeUnit.SECONDS);
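To reason about which items a periodic sample picks up, the timing can be modelled with simple arithmetic. The sketch below is plain Java with no RxJava dependency and idealised timing: it assumes the source emits item n at exactly (n + 1) * emitPeriodMs and that an item landing exactly on a sampling tick is included. SampleModel and its parameters are hypothetical names; real schedulers add jitter, so actual runs may differ near the boundaries.

```java
import java.util.ArrayList;
import java.util.List;

// Idealised model of interval(emitPeriod).sample(samplePeriod) up to a time horizon.
class SampleModel {
    static List<Long> sample(long emitPeriodMs, long samplePeriodMs, long horizonMs) {
        List<Long> sampled = new ArrayList<>();
        long lastSampled = -1;
        for (long tick = samplePeriodMs; tick <= horizonMs; tick += samplePeriodMs) {
            // Item n is emitted at (n + 1) * emitPeriodMs, so this is the newest item at the tick.
            long latest = tick / emitPeriodMs - 1;
            // A tick emits only if something new arrived since the previous tick.
            if (latest > lastSampled) {
                sampled.add(latest);
                lastSampled = latest;
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        // Source every 300 ms, sampled every 2000 ms, observed for 5 seconds.
        System.out.println(sample(300, 2000, 5000)); // [5, 12]
    }
}
```

Under these assumptions the first sample at 2000 ms sees item 5 (emitted at 1800 ms) and the second at 4000 ms sees item 12 (emitted at 3900 ms); everything in between is dropped.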

5.9 elementAt( )

Emits only the item at the specified index.

List of methods:

  • public final Observable<T> elementAt(int index)

Example:

Observable.just(-1, -2, 0, 1, 2).elementAt(2); // Emit the item at index 2

5.10 elementAtOrDefault( )

Emits only the item at the specified index, or a default value if there is no item at that index.

List of methods:

  • public final Observable<T> elementAtOrDefault(int index, T defaultValue)

Example:

Observable.just(-1, -2, 0, 1, 2).elementAtOrDefault(9, -5); // Emit the item at index 9, or -5 if there is none

5.11 ignoreElements( )

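Discards every item emitted by the source Observable and passes on only its termination notification, onCompleted() or onError().
List of methods:

  • public final Observable<T> ignoreElements()

Example:

Observable.just(-1, -2, 0, 1, 2).ignoreElements();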

5.12 distinct( )

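Filters out duplicates: an item is emitted only if it has not been emitted before.

List of methods:

  • public final Observable<T> distinct()
  • public final <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector)

Example:

// Emit each distinct item once: -1, -2, 0, 1, 2.
Observable.just(-1, -2, 0, 1, 2, 1).distinct();
// Compare items by a generated key instead of by the items themselves.
Observable.just(-1, -2, 0, 1, 2, 1).distinct(new Func1<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) {
        // Generate the key. This key is random, so which items count as duplicates varies from run to run.
        return integer * (int) (Math.random() * 10);
    }
});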
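How a key selector changes the result is easier to see with a deterministic key. The following plain-Java sketch, which does not depend on RxJava, models the filtering with a HashSet of keys already seen; DistinctModel and its distinct method are hypothetical names, and the absolute-value key is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java model of distinct(keySelector): keep an item only if its key is new.
class DistinctModel {
    static <T, K> List<T> distinct(List<T> items, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> result = new ArrayList<>();
        for (T item : items) {
            // Set.add returns false when the key was already present.
            if (seenKeys.add(keySelector.apply(item))) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(-1, -2, 0, 1, 2, 1);
        // Key is the item itself: only the repeated 1 is dropped.
        System.out.println(distinct(items, i -> i)); // [-1, -2, 0, 1, 2]
        // Key is the absolute value: 1 and 2 collide with -1 and -2.
        System.out.println(distinct(items, i -> Math.abs(i))); // [-1, -2, 0]
    }
}
```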

5.13 debounce( )

Each time the source Observable emits an item, debounce emits it only if no newer item arrives within the specified timeout; otherwise the item is dropped. This filters out items that are emitted in rapid succession.

List of methods:

  • public final Observable<T> debounce(long timeout, TimeUnit unit)
  • public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler)
  • public final <U> Observable<T> debounce(Func1<? super T, ? extends Observable<U>> debounceSelector)

Example:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            // The gap after each item grows: 100 ms, 200 ms, 300 ms, ...
            for (int i = 1; i < 10; i++) {
                subscriber.onNext(i);
                Thread.sleep(i * 100);
            }
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
});
observable.debounce(400, TimeUnit.MILLISECONDS); // Timeout of 400 ms

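When a subscriber prints each item, this example prints 5, 6, 7, 8 and 9 in sequence: each of these items is followed by a quiet period longer than 400 ms. Item 4 is followed by a gap of exactly 400 ms, so whether it is also printed depends on timing.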
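The outcome of the debounce example can be checked by hand with a small timing model. The sketch below is plain Java with no RxJava dependency; DebounceModel and its method are hypothetical names. It assumes an item survives only when the quiet period after it is strictly longer than the timeout, and that the final item always survives because nothing newer can replace it. Under those assumptions item 9 is emitted as well, since it is followed by a 900 ms sleep before completion.

```java
import java.util.ArrayList;
import java.util.List;

// Idealised model of debounce: item k+1 is followed by a quiet period of gapsAfterMs[k].
class DebounceModel {
    static List<Integer> debounce(long[] gapsAfterMs, long timeoutMs) {
        List<Integer> emitted = new ArrayList<>();
        for (int k = 0; k < gapsAfterMs.length; k++) {
            int item = k + 1;
            boolean isLast = k == gapsAfterMs.length - 1;
            // Assumption: a gap exactly equal to the timeout counts as too short.
            if (isLast || gapsAfterMs[k] > timeoutMs) {
                emitted.add(item);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Items 1..9, each followed by a sleep of item * 100 ms, as in the example above.
        long[] gaps = new long[9];
        for (int k = 0; k < gaps.length; k++) {
            gaps[k] = (k + 1) * 100L;
        }
        System.out.println(debounce(gaps, 400)); // [5, 6, 7, 8, 9]
    }
}
```

In a real run the boundary case (item 4, gap of exactly 400 ms) is a race between the timer and the next emission, so it may or may not appear.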

Appendix: practical examples

Delayed traversal

// The items to traverse.
Observable<Integer> traverseObservable = Observable.just(3, 4, 5, 6);
// A tick every second; zipping with it releases one item per tick.
Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
Func2<Long, Integer, Integer> func2 = new Func2<Long, Integer, Integer>() {
    @Override
    public Integer call(Long aLong, Integer integer) {
        return integer;
    }
};
intervalObservable.zipWith(traverseObservable, func2)
        .toBlocking()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

Countdown

final int startTime = 10;
Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(startTime + 1) // Receive startTime + 1 items
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long time) {
                // Convert 0, 1, 2, ... into startTime, startTime - 1, ..., 0.
                return startTime - time;
            }
        })
        .toBlocking()
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("countdown finished");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(String.format("countdown: %s", aLong));
            }
        });