Each language-specific implementation of ReactiveX implements a set of operators. There is a great deal of overlap between implementations, but some operators exist only in certain implementations. Each implementation also tends to name its operators after similar methods that are already familiar in that language.

This article begins with a list of ReactiveX’s core operators, with links to the corresponding documentation, followed by a decision tree to help you choose the right operator for your scenario. It ends with an alphabetical list of the operators that are specific to particular language implementations.

If you want to implement your own operators, see Implementing custom operators.

Creating operators

Operators that create Observables

  • Create – Creates an Observable from scratch by calling the observer’s methods
  • Defer – Does not create the Observable until an observer subscribes, and creates a fresh Observable for each observer
  • Empty/Never/Throw – Create special Observables with very limited behavior
  • From – Converts other objects or data structures into Observables
  • Interval – Creates an Observable that emits a sequence of integers at a fixed interval
  • Just – Converts an object or a set of objects into an Observable that emits those objects
  • Range – Creates an Observable that emits a range of sequential integers
  • Repeat – Creates an Observable that emits a particular item or sequence of items repeatedly
  • Start – Creates an Observable that emits the return value of a function
  • Timer – Creates an Observable that emits a single item after a specified delay

Create

Create an Observable from scratch using a function

You can create an Observable from scratch with the Create operator. Pass the operator a function that takes an observer as its argument, and write that function so that it behaves as an Observable, calling the observer’s onNext, onError, and onCompleted methods appropriately.

A well-formed finite Observable must attempt to call either the observer’s onCompleted method or its onError method exactly once, and must not call any other observer method afterward.

RxJava implements this operator as the create method.

It is good practice to check the observer’s isUnsubscribed state inside the function you pass to create, so that your Observable stops emitting items or doing expensive computation once no observer is interested.

Sample code:

// Requires rx.Observable and rx.Subscriber from RxJava 1.x.
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            // Emit only while the observer is still subscribed.
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});

Output:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

The create method is not executed on any particular scheduler by default.

  • Javadoc: create(OnSubscribe)

Defer

An Observable is not created until an observer subscribes, and a new Observable is created for each observer

The Defer operator waits until an observer subscribes to it, and then generates an Observable by calling an Observable factory function. It does this afresh for each observer, so although each subscriber may think it is subscribing to the same Observable, each one in fact gets its own separate sequence.

In some cases, waiting until the last minute to generate an Observable (that is, until the subscription occurs) ensures that the Observable contains the most recent data.

RxJava implements this operator as the defer method. This operator takes an Observable factory function of your choice as a single argument. This function takes no arguments and returns an Observable.

The defer method does not execute on any particular scheduler by default.

  • Javadoc: defer(Func0)
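
The effect of deferring creation can be pictured without RxJava at all. The following is a minimal plain-Java sketch, not RxJava’s implementation: a Supplier stands in for the Observable factory, a List stands in for the emitted sequence, and the names DeferSketch, eager, and deferred are hypothetical, introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferSketch {
    /** Eager: the sequence is built once, at the moment this method is called. */
    static Supplier<List<Integer>> eager(AtomicInteger source) {
        List<Integer> snapshot = List.of(source.get());
        return () -> snapshot;
    }

    /** Deferred: the factory runs again each time a "subscriber" asks for the sequence. */
    static Supplier<List<Integer>> deferred(AtomicInteger source) {
        return () -> List.of(source.get());
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(1);
        Supplier<List<Integer>> eager = eager(value);
        Supplier<List<Integer>> deferred = deferred(value);

        value.set(42); // the data changes before anyone subscribes

        System.out.println(eager.get());    // [1]  (stale snapshot)
        System.out.println(deferred.get()); // [42] (built at subscription time)
    }
}
```

The deferred variant sees the most recent data because nothing is built until the sequence is requested, which is the property the Defer description relies on.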

switchCase

A similar operator is available in the optional rxjava-computation-expressions package. The switchCase operator conditionally creates and returns one of a set of possible Observables.

The same optional rxjava-computation-expressions package also contains a simpler operator called ifThen. It checks a condition and, depending on the result, returns either a mirror of the source Observable or an empty Observable.

Empty/Never/Throw

Empty

Create an Observable that emits no data but terminates normally

Never

Create an Observable that doesn’t emit data or terminate

Throw

Create an Observable that emits no data and terminates with an error

The Observables produced by these three operators have very specific and limited behavior. They are useful for testing, and sometimes for combining with other Observables or as arguments to operators that expect other Observables as parameters.

RxJava implements these operators as empty, never, and error. The error operator takes as a parameter the Throwable with which you want the Observable to terminate. These operators do not operate on any particular Scheduler by default, but empty and error accept an optional Scheduler parameter, in which case they send their notifications on that Scheduler.

  • Javadoc: empty()
  • Javadoc: never()
  • Javadoc: error(java.lang.Throwable)

From

Transform other kinds of objects and data types into Observables

When you use Observables, it’s very convenient if all the data you’re processing can be converted to Observables instead of having to mix Observables with other types of data. This allows you to use a uniform set of operators to manage data flows throughout their life cycle.

For example, an Iterable can be thought of as a sort of synchronous Observable, and a Future as a sort of Observable that always emits only a single item. By explicitly converting such objects into Observables, you can work with them in the same way as with any other Observable.

Therefore, most ReactiveX implementations provide a way to transform language-specific objects and data structures into Observables.

In RxJava, the from operator can convert a Future, an Iterable, or an array. For an Iterable or an array, the resulting Observable emits each item contained in the Iterable or array.

Sample code:

// Requires rx.Observable, rx.functions.Action0, and rx.functions.Action1 from RxJava 1.x.
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable<Integer> myObservable = Observable.from(items);

myObservable.subscribe(
    new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            System.out.println(item);
        }
    },
    new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {
        @Override
        public void call() {
            System.out.println("Sequence complete");
        }
    }
);

Output:

0
1
2
3
4
5
Sequence complete

For a Future, the resulting Observable emits the single result of the Future.get() call. There is also a version of from that takes two additional parameters, a timeout length and the time unit in which that length is expressed. If that much time passes before the Future returns a value, the Observable emits an error notification and terminates.

By default, from does not operate on any particular Scheduler. However, you can pass a Scheduler as an optional second parameter to the Future version of from, and the Future will be managed on that Scheduler.

  • Javadoc: from(array)
  • Javadoc: from(Iterable)
  • Javadoc: from(Future)
  • Javadoc: from(Future,Scheduler)
  • Javadoc: from(Future,timeout, timeUnit)
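
The timeout behavior described above mirrors what the standard Future.get(timeout, unit) call does. The following is a minimal sketch using only java.util.concurrent, not RxJava’s implementation; the class FutureTimeoutSketch, the method awaitOrError, and the "Next:"/"Error:" strings are hypothetical stand-ins for the onNext and onError notifications.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    /** Waits for the Future and reports either its single value or the error that ended the wait. */
    static String awaitOrError(Future<String> future, long timeout, TimeUnit unit) {
        try {
            return "Next: " + future.get(timeout, unit);
        } catch (TimeoutException e) {
            // No value arrived in time: modeled here as an error notification.
            return "Error: timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Error: interrupted";
        } catch (ExecutionException e) {
            return "Error: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        Future<String> ready = CompletableFuture.completedFuture("done");
        Future<String> neverCompletes = new CompletableFuture<>();

        System.out.println(awaitOrError(ready, 100, TimeUnit.MILLISECONDS));          // Next: done
        System.out.println(awaitOrError(neverCompletes, 100, TimeUnit.MILLISECONDS)); // Error: timeout
    }
}
```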

RxJavaAsyncUtil

In addition, in the optional RxJavaAsyncUtil package, you can convert actions, callables, functions, and runnables into observables that generate the results of those actions using the following operators:

  • fromAction
  • fromCallable
  • fromFunc0
  • fromRunnable

See more information about these operators on the Start page.

Note: The optional StringObservable class also has a from method, which converts a stream of characters or a Reader into an Observable that emits byte arrays or Strings.

runAsync

Note: This duplicates the description of runAsync under the Start operator.

The separate RxJavaAsyncUtil package (not included in RxJava by default) also provides a runAsync function. Pass runAsync an Action and a Scheduler, and it returns a StoppableObservable that uses the Action to generate the items it emits.

The Action accepts an Observer and a Subscription. It uses the Subscription to check whether it has been unsubscribed, and stops emitting items as soon as that is the case. You can also stop a StoppableObservable manually at any time by calling its unsubscribe method (which also unsubscribes the Subscription associated with the StoppableObservable).

Because runAsync invokes the Action immediately and begins emitting items, some items may be lost between the time you create the StoppableObservable and the time your observer is ready to receive them. If this is a problem, use the variant of runAsync that also accepts a Subject and pass it a ReplaySubject, from which you can retrieve the otherwise-missed items.

decode

The StringObservable class, which is not part of RxJava by default, contains a decode operator that converts a stream of multi-byte characters into an Observable that emits byte arrays that respect character boundaries.

Interval

Create an Observable that emits integer sequences at fixed intervals

The Interval operator returns an Observable that emits an infinitely increasing sequence of integers at fixed intervals.

RxJava implements this operator as the interval method. It takes one parameter giving the length of the interval and one giving the time unit in which that length is expressed.

  • Javadoc: interval(long,TimeUnit)
  • Javadoc: interval(long,TimeUnit,Scheduler)

Another version of interval returns an Observable that emits a single zero after a specified delay and then emits increasing numbers at a specified interval. This version was called timer in RxJava 1.0.0, but that method has since been deprecated in favor of the interval method that does the same thing.

  • Javadoc: interval(long,long,TimeUnit)
  • Javadoc: interval(long,long,TimeUnit,Scheduler)

Interval runs on the Computation scheduler by default. You can also pass an optional Scheduler parameter to specify the Scheduler.

Just

Creates an Observable that emits the specified value

Just converts a single item into an Observable that emits that item.

Just is similar to From, but From takes the items out of an array or Iterable and emits them one by one, whereas Just emits the array or Iterable as is, treating it as a single item.

Note: If you pass null to Just, it returns an Observable that emits null. Don’t mistake it for returning an Empty Observable (one that emits no data at all). If you need an Empty Observable you should use the Empty operator.

RxJava implements this operator as just, which takes one to nine arguments and returns an Observable that emits the data in argument list order.

Sample code:

// Requires rx.Observable and rx.Subscriber from RxJava 1.x.
Observable.just(1, 2, 3)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

Output:

Next: 1
Next: 2
Next: 3
Sequence complete.

  • Javadoc: just(item) (there are also versions that take two to nine arguments)
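
The difference between Just and From when both are handed a collection has a close analogue in the standard Java streams API. The following is a minimal sketch of that analogy, not RxJava code; the class JustVersusFromSketch and the methods justLike and fromLike are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class JustVersusFromSketch {
    /** Like Just: the whole list is treated as one item. */
    static <T> List<List<T>> justLike(List<T> items) {
        return Stream.of(items).collect(Collectors.toList());
    }

    /** Like From: the list is unpacked and each element becomes an item. */
    static <T> List<T> fromLike(List<T> items) {
        return items.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3);
        System.out.println(justLike(numbers)); // [[1, 2, 3]]  (one item)
        System.out.println(fromLike(numbers)); // [1, 2, 3]    (three items)
    }
}
```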

Range

Creates an Observable that emits a specific sequence of integers

The Range operator emits an ordered sequence of integers within a range. You specify the start of the range and its length.

RxJava implements this operator as the range method, which takes two arguments: the start value of the range and the number of items in it. If you set the second argument to 0, the Observable emits no items; if you set it to a negative number, range throws an exception.

Range does not execute on any particular scheduler by default. There is a variation that specifies the Scheduler with optional arguments.

  • Javadoc: range(int,int)
  • Javadoc: range(int,int,Scheduler)
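
Note that the second argument is a count, not an end value. The following is a minimal plain-Java sketch of that convention using IntStream, not RxJava’s implementation; the class RangeSketch is hypothetical, and the choice of IllegalArgumentException for a negative count is an assumption made for this model.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class RangeSketch {
    /** Returns count sequential integers beginning at start. */
    static List<Integer> range(int start, int count) {
        if (count < 0) {
            // Assumed exception type for this sketch.
            throw new IllegalArgumentException("count must not be negative");
        }
        // IntStream.range takes an exclusive end value, so convert the count.
        return IntStream.range(start, start + count).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(5, 3)); // [5, 6, 7]
        System.out.println(range(5, 0)); // []
    }
}
```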

Repeat

Create an Observable that emits a particular data multiple times

The Repeat operator emits an item repeatedly. Some implementations of this operator let you repeat a whole sequence of items, and some let you limit the number of repetitions.

RxJava implements this operator as the repeat method. It does not create an Observable itself; instead it operates on a source Observable and repeats that Observable’s sequence, either indefinitely or, with repeat(n), n times.

The repeat operator operates by default on the trampoline Scheduler. There is also a variant that lets you specify the Scheduler through an optional parameter.

  • Javadoc: repeat()
  • Javadoc: repeat(long)
  • Javadoc: repeat(Scheduler)
  • Javadoc: repeat(long,Scheduler)

repeatWhen

There is also an operator called repeatWhen. Rather than buffering and replaying the sequence from the source Observable, it conditionally resubscribes to the source Observable and mirrors it.

The termination notifications of the source Observable (completion or error) are passed as void items to a notification handler, which decides whether to resubscribe to and mirror the source Observable again. The notification handler acts like an Observable operator: it takes as input an Observable that emits these void notifications, and it returns an Observable that either emits a void item (meaning: resubscribe to and mirror the source Observable) or terminates (meaning: terminate the sequence emitted by repeatWhen).

The repeatWhen operator executes by default on the Trampoline scheduler. There is a variation that specifies the Scheduler with optional arguments.

  • Javadoc: repeatWhen(Func1)
  • Javadoc: repeatWhen(Func1,Scheduler)

doWhile

doWhile is part of the optional rxjava-computation-expressions package and is not one of the standard RxJava operators. It checks a condition after each repetition of the source sequence and repeats the sequence only if the condition is met.

whileDo

whileDo is part of the optional rxjava-computation-expressions package and is not one of the standard RxJava operators. It checks a condition before each repetition of the source sequence and repeats the sequence only if the condition is met.

Start

Returns an Observable that emits the return value of a function-like construct

Programming languages offer a variety of constructs for obtaining a value as the result of a computation, with names such as functions, futures, actions, callables, and runnables. The operators grouped under Start make these constructs behave like Observables so that they can be chained together with other Observables.

The various RxJava implementations of the Start operator are part of the optional rxjava-async module.

The rxjava-async module contains the start operator, which takes a function as its argument, calls that function to obtain a value, and returns an Observable that emits that value to each subsequent observer.

Note: This function is executed only once, even if multiple observers subscribe to the returned Observable.
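
The run-once behavior noted above resembles that of a standard CompletableFuture, which also computes its value a single time no matter how many consumers read it. The following is a minimal sketch of that analogy using only java.util.concurrent; it is not the rxjava-async implementation, and the class StartOnceSketch and its start method are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class StartOnceSketch {
    /** Starts the function asynchronously; its result is computed once and then shared. */
    static <T> CompletableFuture<T> start(Supplier<T> function) {
        return CompletableFuture.supplyAsync(function);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        CompletableFuture<Integer> result = start(() -> {
            calls.incrementAndGet(); // count how often the function really runs
            return 42;
        });

        // Two "observers" read the value; the function is not re-run for the second one.
        System.out.println(result.join());             // 42
        System.out.println(result.join());             // 42
        System.out.println("calls = " + calls.get());  // calls = 1
    }
}
```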

toAsync

The rxjava-async module also contains the operators toAsync, asyncAction, and asyncFunc. They take either a function or an Action as their argument.

For a function, the operator calls the function to obtain a value and, like start, returns an Observable that emits that value to each subsequent observer. For an Action the process is similar, but because there is no return value, the operator emits null before terminating.

Note: This function or action is executed only once, even if multiple observers subscribe to the returned Observable.

startFuture

The rxjava-async module also contains a startFuture operator. You pass it a function that returns a Future. startFuture calls this function immediately to obtain the Future, and then calls the Future’s get() method to try to retrieve its value. It returns an Observable that emits that value to each subsequent observer.

deferFuture

The rxjava-async module also contains a deferFuture operator. You pass it a function that returns a Future, where that Future in turn returns an Observable. deferFuture returns an Observable, but does not call the function you provided until an observer subscribes to the Observable it returns. At that point it immediately calls the Future’s get() method and mirrors the items emitted by the Observable that get() returns.

In this way, you can include a Future object that returns an Observable in the Observables call chain.

fromAction

The rxjava-async module also contains a fromAction operator. It takes an Action as its argument and returns an Observable that emits the item you pass to fromAction once the Action terminates.

fromCallable

The rxjava-async module also contains a fromCallable operator. It takes a Callable as its argument and returns an Observable that emits the result of that Callable.

fromRunnable

The rxjava-async module also contains a fromRunnable operator. It takes a Runnable as its argument and returns an Observable that emits the item you pass to fromRunnable once the Runnable terminates.

forEachFuture

The rxjava-async module also contains a forEachFuture operator. It is not really a variant of the Start operator and has some characteristics of its own. You pass it some subset of the typical observer methods (onNext, onError, and onCompleted), and the Observable invokes them in the usual way. However, forEachFuture itself returns a Future whose get() method blocks until the source Observable terminates, and then returns either the completion or the error, depending on how the Observable terminated.

Use this operator if you want a function to block until Observable completes.

runAsync

The rxjava-async module also contains a runAsync operator. It is unusual in that it returns a special kind of Observable called a StoppableObservable.

Pass runAsync an Action and a Scheduler, and it returns a StoppableObservable that uses the Action to generate the items it emits. The Action accepts an Observer and a Subscription. It uses the Subscription to check whether it has been unsubscribed, and stops emitting items as soon as that is the case. You can also stop a StoppableObservable manually at any time by calling its unsubscribe method (which also unsubscribes the Subscription associated with the StoppableObservable).

Because runAsync invokes the Action immediately and begins emitting items, some items may be lost between the time you create the StoppableObservable and the time your observer is ready to receive them. If this is a problem, use the variant of runAsync that also accepts a Subject and pass it a ReplaySubject, from which you can retrieve the otherwise-missed items.

There is also a version of the From operator in RxJava that converts the Future into an Observable, similar to Start.

Timer

Create an Observable that emits a particular item after a given delay.

The Timer operator creates an Observable that emits one particular item after a span of time that you specify.

RxJava implements this operator as the timer method.

timer returns an Observable that emits the single number 0 after a given delay.

The Timer operator executes by default on the Computation scheduler. There is a variation that specifies the Scheduler with optional arguments.

  • Javadoc: timer(long,TimeUnit)
  • Javadoc: timer(long,TimeUnit,Scheduler)

The following operators transform the items emitted by an Observable. Each is explained in detail in its own documentation.

  • Buffer – Periodically gathers items from an Observable into bundles and emits the bundles rather than emitting the items one at a time
  • FlatMap – Transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable. It can be thought of as unfolding a nested data structure
  • GroupBy – Divides an Observable into a set of Observables, each of which emits a different group of items from the original Observable, organized by key
  • Map – Transforms the items emitted by an Observable by applying a function to each item
  • Scan – Applies a function to each item emitted by an Observable, sequentially, and emits each successive value
  • Window – Periodically subdivides items from an Observable into Observable windows and emits these windows rather than emitting the items one at a time. It is similar to Buffer, but Buffer emits collections whereas Window emits Observables, each of which emits a subset of the original Observable’s items

Transforming operators

This page shows the operators you can use to transform the items emitted by an Observable.

  • map() – transforms the items emitted by an Observable by applying a function to each of them
  • flatMap(), concatMap(), and flatMapIterable() – transform the items emitted by an Observable into Observables (or Iterables), then flatten their emissions into a single Observable
  • switchMap() – transforms the items emitted by an Observable into Observables, and mirrors the items emitted by the most recently transformed Observable
  • scan() – applies a function to each item emitted by an Observable, sequentially, and emits each successive value
  • groupBy() – divides an Observable into a set of Observables, each of which emits a different group of items from the original Observable, organized by key
  • buffer() – periodically gathers items from an Observable into bundles and emits the bundles rather than emitting the items one at a time
  • window() – periodically subdivides items from an Observable into Observable windows and emits these windows rather than emitting the items one at a time
  • cast() – casts all items emitted by an Observable to a specified type before re-emitting them

Buffer

Periodically gathers the items emitted by an Observable into bundles and emits the bundles rather than emitting the items one at a time.

The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items. The language-specific implementations offer many variants of Buffer, which differ in how they decide which items go into which buffer.

Note: If the source Observable issues an onError notification, Buffer passes that notification on immediately without first emitting the buffer it is assembling, even if that buffer already contains items emitted by the source Observable.

The Window operator is similar to Buffer, but it collects items into separate Observables, rather than into data structures, before re-emitting them.

There are many variants of Buffer in RxJava:

buffer(count)

buffer(count) emits non-overlapping buffers in the form of Lists, each containing at most count items from the source Observable (the final List may contain fewer than count items).

  • Javadoc: buffer(int)

buffer(count, skip)

buffer(count, skip) creates a new buffer starting with the first item emitted by the source Observable and every skip items thereafter, and fills each buffer with count items: the initial item and the count-1 items that follow it. It emits these buffers as Lists. Depending on the values of count and skip, the buffers may overlap (when skip < count) or leave gaps (when skip > count).

  • Javadoc: buffer(int,int)
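
The interplay of count and skip is easiest to see on a finite list. The following is a minimal plain-Java model of the rule described above, not RxJava’s implementation; the class BufferSketch is hypothetical, and emitting the trailing partial buffers once the source ends is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;

class BufferSketch {
    /** Opens a buffer at every skip-th item and fills each with up to count items. */
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(buffer(source, 3, 3)); // [[1, 2, 3], [4, 5, 6]]          skip == count: like buffer(count)
        System.out.println(buffer(source, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5, 6]]  skip < count: overlap
        System.out.println(buffer(source, 2, 3)); // [[1, 2], [4, 5]]                skip > count: gaps
    }
}
```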

buffer(bufferClosingSelector)

When it subscribes to the source Observable, buffer(bufferClosingSelector) begins collecting its items into a List, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits a TClosing object, buffer emits the current List and repeats the process: it begins a new List and calls bufferClosingSelector to create a new Observable to monitor. It continues to do this until the source Observable terminates.

  • Javadoc: buffer(Func0)

buffer(boundary)

buffer(boundary) monitors an Observable, boundary. Each time that Observable emits an item, buffer creates a new List to begin collecting items emitted by the source Observable and emits the previous List.

  • Javadoc: buffer(Observable)
  • Javadoc: buffer(Observable,int)

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, bufferOpenings, that emits buffer-opening objects. Each time bufferOpenings emits an item, buffer creates a new List to begin collecting items emitted by the source Observable and passes the emitted item to the bufferClosingSelector function. That function returns an Observable. buffer monitors that Observable, and when it detects an item emitted by it, it closes the List and emits it as its own emission.

  • Javadoc: buffer(Observable,Func1)

buffer(timespan, unit[, scheduler])

buffer(timespan, unit) emits a new List of items periodically, once every timespan, containing all the items emitted by the source Observable since the previous bundle was emitted or, in the case of the first bundle, since an observer subscribed to the source Observable. There is also a version of this variant that accepts a Scheduler parameter; by default this variant uses the computation Scheduler.

  • Javadoc: buffer(long,TimeUnit)
  • Javadoc: buffer(long,TimeUnit,Scheduler)

buffer(timespan, unit, count[, scheduler])

buffer(timespan, unit, count) emits a new List of items for every count items emitted by the source Observable, or once every timespan since its last emission if that comes first, even if the List then contains fewer than count items. There is also a version of this variant that accepts a Scheduler parameter; by default this variant uses the computation Scheduler.

  • Javadoc: buffer(long,TimeUnit,int)
  • Javadoc: buffer(long,TimeUnit,int,Scheduler)

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan, timeshift, unit) creates a new List once every timeshift period and fills it with every item emitted by the source Observable from that moment until timespan has elapsed since the List was created, at which point it emits the List as its own emission. If timespan is longer than timeshift, the emitted bundles cover overlapping periods and so may contain duplicate items.

There is also a version of this variant that accepts a Scheduler parameter; by default this variant uses the computation Scheduler.

  • Javadoc: buffer(long,long,TimeUnit)
  • Javadoc: buffer(long,long,TimeUnit,Scheduler)
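
The overlap that arises when timespan exceeds timeshift can be modeled on a list of emission times. The following is a simplified plain-Java sketch, not RxJava’s implementation: each item is represented by the time in milliseconds at which it was emitted, and the class TimedBufferSketch, the completedAt parameter, and the half-open window [open, open + timespan) are assumptions made for this model.

```java
import java.util.ArrayList;
import java.util.List;

class TimedBufferSketch {
    /**
     * Opens a buffer every timeshift ms until the source completes; each buffer
     * collects the items emitted within timespan ms of its opening.
     */
    static List<List<Long>> buffer(List<Long> emissionTimes, long timespan, long timeshift, long completedAt) {
        if (timespan <= 0 || timeshift <= 0) {
            throw new IllegalArgumentException("timespan and timeshift must be positive");
        }
        List<List<Long>> buffers = new ArrayList<>();
        for (long open = 0; open < completedAt; open += timeshift) {
            List<Long> buffer = new ArrayList<>();
            for (long time : emissionTimes) {
                if (time >= open && time < open + timespan) {
                    buffer.add(time);
                }
            }
            buffers.add(buffer);
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Long> times = List.of(50L, 250L, 350L);
        // timespan (300) > timeshift (200): the item at 250 ms lands in both buffers.
        System.out.println(buffer(times, 300, 200, 400)); // [[50, 250], [250, 350]]
    }
}
```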

buffer-backpressure

You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items faster than its observer can consume them).

Buffer can reduce a long sequence of items to a shorter sequence of buffers of items, which is easier to manage. For example, you could periodically close and emit a buffer of items from a bursty Observable, at regular intervals.

Sample code:

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);

Or, to go a step further, you can collect items into a buffer during a burst and emit the buffer when the burst ends, by using the Debounce operator to send a buffer-closing indicator to the buffer operator.

Sample code:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

FlatMap

FlatMap transforms the items emitted by an Observable into Observables, and then merges the emissions of those Observables into a single Observable

The FlatMap operator transforms each item emitted by the source Observable by applying a function that you specify, which returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables and emits the merged result as its own sequence.

This is useful, for example, when you have an Observable that emits a sequence of items that themselves have Observable members or can otherwise be transformed into Observables: you can create a new Observable that emits the complete collection of items emitted by these sub-Observables.

Note that FlatMap merges the data emitted by these Observables, so they may be interleaved.

In many language-specific implementations, there is also an operator that does not interleave the data emitted by the transformed Observables, but emits them in a strict order. This operator is often called ConcatMap or something similar.

RxJava implements this operator as the flatMap method.

Note: If any of the individual Observables generated by this flatMap operation terminates by calling onError, the Observable produced by flatMap itself immediately calls onError and terminates.

This operator has a variant that takes an additional int parameter, which sets the maximum number of Observables mapped from the source items that flatMap will subscribe to concurrently. When it reaches this maximum, it waits for one of those Observables to terminate before subscribing to another.

  • Javadoc: flatMap(Func1)
  • Javadoc: flatMap(Func1,int)

There is also a version of flatMap that creates (and flattens) a new Observable for each piece of data and each notification from the original Observable.

It also has a variant that takes an extra int argument.

  • Javadoc: flatMap(Func1,Func1,Func0)
  • Javadoc: flatMap(Func1,Func1,Func0,int)

There is also a version of flatMap that combines each item from the source Observable with the items of the Observable triggered by that source item, and emits these combinations. It also has a version that accepts an additional int parameter.

  • Javadoc: flatMap(Func1,Func2)
  • Javadoc: flatMap(Func1,Func2,int)

flatMapIterable

The flatMapIterable variants pair source items with generated Iterables, rather than with generated Observables, but otherwise work in much the same way.

  • Javadoc: flatMapIterable(Func1)
  • Javadoc: flatMapIterable(Func1,Func2)

concatMap

There is also a concatMap operator, which is similar to the simplest version of flatMap, but it concatenates the generated Observables in order rather than merging them, and then generates its own data sequence.

  • Javadoc: concatMap(Func1)
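
Java’s own Stream.flatMap keeps the results of each source element together and in order, so it offers a convenient RxJava-free picture of the ordering that concatMap guarantees. The following is a minimal sketch of that analogy, not RxJava code; the class ConcatMapSketch and the method expand are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatMapSketch {
    /** Maps each number to the pair (n, n * 10) and concatenates the pairs in source order. */
    static List<Integer> expand(List<Integer> source) {
        return source.stream()
                .flatMap(n -> Stream.of(n, n * 10))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Each inner sequence is emitted completely before the next one starts.
        System.out.println(expand(List.of(1, 2, 3))); // [1, 10, 2, 20, 3, 30]
    }
}
```

With the merging behavior of flatMap on asynchronous Observables, by contrast, items from different inner sequences could arrive interleaved.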

switchMap

RxJava also implements the switchMap operator. It behaves much like flatMap, except that whenever the source Observable emits a new item, it unsubscribes from and stops mirroring the Observable generated from the previously emitted item, and mirrors only the current one.

  • Javadoc: switchMap(Func1)

split

The special StringObservable class (not part of RxJava by default) also has a split operator. It converts an Observable of Strings into another Observable of Strings by treating the source sequence as a single stream, splitting that stream on boundaries matched by a regular expression, and emitting the resulting pieces.

GroupBy

An Observable is split into collections of Observables, each of which emits a subsequence of the original Observable

The GroupBy operator splits the original Observable into collections of Observables, each of which emits a subsequence of the original Observable data sequence. Which data item is emitted by which Observable is determined by a function that assigns a Key to each item. Data with the same Key is emitted by the same Observable.

RxJava implements the groupBy operator. It returns an Observable that emits a special subclass of Observable, GroupedObservable. Objects that implement the GroupedObservable interface have an additional method, getKey, which returns the Key under which items were assigned to that particular GroupedObservable.

There is a version of groupBy that lets you pass in a transform function, so that items can be changed before the resulting GroupedObservables emit them.

Note: groupBy breaks the original Observable into an Observable that emits multiple GroupedObservables. Once a subscription is made, each GroupedObservable starts caching the items it will emit. Therefore, if you ignore any of these GroupedObservables, its cache can become a memory leak. So rather than simply ignoring a GroupedObservable you have no interest in, apply an operator such as take(0) to it, which causes it to discard its cache.

If you unsubscribe from a GroupedObservable, that Observable terminates. If the original Observable later emits data that matches the Observable’s Key, groupBy creates a new GroupedObservable for the Key.

GroupBy does not execute on any particular scheduler by default.

  • Javadoc: groupBy(Func1)
  • Javadoc: groupBy(Func1,Func1)
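As a rough illustration of how a Key function and an optional transform function partition a sequence, here is a plain-Java sketch. It is not RxJava code: a Map of Lists stands in for the emitted GroupedObservables (the map key plays the role of getKey), and the names GroupBySketch, keySelector and elementSelector are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (assumption: each map entry stands in for one GroupedObservable).
class GroupBySketch {

    static <T, K, R> Map<K, List<R>> groupBy(List<T> source,
                                             Function<T, K> keySelector,
                                             Function<T, R> elementSelector) {
        // LinkedHashMap keeps groups in the order their keys first appear.
        Map<K, List<R>> groups = new LinkedHashMap<>();
        for (T item : source) {
            K key = keySelector.apply(item);
            // Items with the same key go to the same group, after being transformed.
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(elementSelector.apply(item));
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> groups = groupBy(
                Arrays.asList(1, 2, 3, 4, 5),
                (Integer n) -> n % 2 == 0,   // Key: is the number even?
                (Integer n) -> "#" + n);     // transform applied before emission
        System.out.println(groups); // {false=[#1, #3, #5], true=[#2, #4]}
    }
}
```

Unlike this eager model, the real operator emits each group as soon as its Key is first seen, which is why unobserved groups keep caching items.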

Map

Applies a function to each piece of data emitted by an Observable, performing transformations

The Map operator applies a function of your choice to each item emitted by the original Observable, and returns an Observable that emits the results.

RxJava implements this operator as a map function. This operator does not execute on any particular scheduler by default.

  • Javadoc: map(Func1)

cast

The cast operator is a special version of map that casts every item emitted by the original Observable to a specified type before emitting it.

  • Javadoc: cast(Class)

encode

The encode operator in the StringObservable class, which is not part of standard RxJava, is also a special Map operator. It transforms an Observable that emits strings into an Observable that emits byte arrays that respect the multi-byte character boundaries of the original strings.

byLine

The byLine operator is also in the StringObservable class, is not part of standard RxJava, and is also a special map operator. It transforms an Observable that emits strings into an Observable that emits the same text one line at a time.

Scan

Applies a function to each item emitted by an Observable, in sequence, and emits each successive result

The Scan operator applies a function to the first item emitted by the original Observable and emits the result of that function as its first item. It then feeds that result back into the function along with the second item from the original Observable to produce its own second item. It continues this process to produce the rest of its sequence. This kind of operator is sometimes called an accumulator in other contexts.

RxJava implements the scan operator.

Sample code:

// Uses RxJava 1.x types: rx.Observable, rx.Subscriber, rx.functions.Func2
Observable.just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer sum, Integer item) {
            // sum is the previously emitted value; item is the next source item
            return sum + item;
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

Output:

Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Sequence complete.

  • Javadoc: scan(Func2)

There is a variant of the Scan operator to which you can pass a seed value. The seed is handed to the accumulator function the first time it is called (for the first item emitted by the source Observable), in place of the result of the missing previous call. If you use this version, scan emits the seed value as its first item. Note: passing null as the seed is not the same as not passing a seed; null is a legal seed value.

  • Javadoc: scan(R,Func2)
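The difference between the two variants can be sketched in plain Java. This is a simplified model, not RxJava code: lists stand in for Observables, and the names ScanSketch, scan and scanWithSeed are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Plain-Java model (assumption: lists stand in for Observables).
class ScanSketch {

    // Seedless variant: the first item is emitted as-is, then each later
    // emission is accumulator(previousEmission, nextItem).
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T acc = null;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    // Seeded variant: the seed itself is emitted first, and the accumulator
    // is called for every source item, starting with (seed, firstItem).
    static <T, R> List<R> scanWithSeed(List<T> source, R seed,
                                       BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc);
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(scan(source, (a, b) -> a + b));
        // [1, 3, 6, 10, 15]
        System.out.println(scanWithSeed(source, 0, (Integer a, Integer b) -> a + b));
        // [0, 1, 3, 6, 10, 15]
    }
}
```

Note that the seeded result has one more element than the source, because the seed is emitted before any accumulation takes place.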

This operator does not execute on any particular scheduler by default.

Window

Periodically subdivides items from the original Observable into Observable windows and emits these windows rather than emitting the items one at a time

Window is similar to Buffer, but instead of emitting packets of items from the original Observable, it emits Observables, each of which emits a subset of the original Observable's items and then terminates with an onCompleted notification.

Like Buffer, Window has many variants, each of which subdivides the original Observable into the resulting Observables in its own way, with each resulting Observable containing a window onto the original items. In the jargon of the Window operator, a window "opens" when a new Observable is emitted and begins emitting items from the original Observable; a window "closes" when that Observable stops emitting the original Observable's items and terminates by sending an onCompleted notification to its observers.

There are many variants of the Window operator in RxJava.

window(closingSelector)

This variant of window immediately opens its first window. Whenever it observes the Observable returned by closingSelector emit an object, it closes the currently open window and immediately opens a new one. In this way, this variant emits a series of non-overlapping windows whose combined items correspond one-to-one with those emitted by the original Observable.

  • Javadoc: window(Func0)

window(windowOpenings, closingSelector)

Whenever this variant of window observes the windowOpenings Observable emit an Opening object, it opens a window, and at the same time calls closingSelector to generate a closing Observable associated with that window. When that closing Observable emits an object, window closes the associated window. Because the closing of the current window and the opening of a new one are governed by separate Observables, this variant may create windows that overlap (duplicating some items from the original Observable) or windows with gaps between them (discarding some items from the original Observable).

  • Javadoc: window(Observable,Func1)

window(count)

This variant of window immediately opens its first window. Whenever the current window has emitted count items, it closes that window and opens a new one. It also closes the current window if it receives an onError or onCompleted notification from the original Observable. This variant emits a series of non-overlapping windows whose combined items correspond one-to-one with those emitted by the original Observable.

  • Javadoc: window(int)

window(count, skip)

This variant of window immediately opens its first window. It then opens a new window every time the original Observable has emitted skip items (for example, if skip is 3, it opens a new window on every third item). It closes each window as soon as that window has emitted count items, or when it receives an onError or onCompleted notification from the original Observable. If skip = count, it behaves the same as window(count); if skip < count, consecutive windows overlap by count - skip items; if skip > count, skip - count items are discarded between consecutive windows.

  • Javadoc: window(int,int)
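The three cases (skip = count, skip < count, skip > count) can be checked with a small plain-Java sketch. This is a simplified model, not RxJava code: lists stand in for both the source Observable and the emitted windows, and the names WindowSketch and window are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (assumption: each inner list stands in for one window Observable).
class WindowSketch {

    // Opens a window at every skip-th item and gives each window at most count items.
    static <T> List<List<T>> window(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            // A window closes after count items or when the source ends.
            int end = Math.min(start + count, source.size());
            windows.add(new ArrayList<>(source.subList(start, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(window(source, 3, 3)); // [[1, 2, 3], [4, 5, 6]]
        System.out.println(window(source, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5, 6]]
        System.out.println(window(source, 2, 3)); // [[1, 2], [4, 5]]
    }
}
```

With count 3 and skip 2, items 3 and 5 appear in two windows (an overlap of count - skip = 1); with count 2 and skip 3, items 3 and 6 fall into the gap and are dropped.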

window(timespan, unit[, scheduler])

This variant of window immediately opens its first window. It closes the current window and opens a new one each time a period of timespan has elapsed (measured in unit, and optionally timed on a particular scheduler). It also closes the current window if it receives an onError or onCompleted notification from the original Observable. This variant emits a series of non-overlapping windows whose combined items correspond one-to-one with those emitted by the original Observable.

  • Javadoc: window(long,TimeUnit)
  • Javadoc: window(long,TimeUnit,Scheduler)

window(timespan, unit, count[, scheduler])

This variant of window immediately opens its first window. It is a combination of window(count) and window(timespan, unit[, scheduler]): it closes the current window and opens another whenever timespan has elapsed or the current window has received count items, whichever comes first. It also closes the current window if it receives an onError or onCompleted notification from the original Observable. This variant emits a series of non-overlapping windows whose combined items correspond one-to-one with those emitted by the original Observable.

  • Javadoc: window(long,TimeUnit,int)
  • Javadoc: window(long,TimeUnit,int,Scheduler)

window(timespan, timeshift, unit[, scheduler])

For comparison, buffer(timespan, timeshift, unit) creates a new List every timeshift period and fills it with every item the original Observable emits from that moment until timespan has elapsed, at which point it emits the List as its own item. If timespan is longer than timeshift, the Lists it emits overlap and so may contain duplicate items.

This variant of window immediately opens its first window, and then opens a new window each time a period of timeshift has elapsed (measured in unit, and optionally timed on a particular scheduler). It closes each window once that window has been open for timespan. It also closes any open window if it receives an onError or onCompleted notification from the original Observable. The windows can overlap or leave gaps, depending on the timeshift and timespan values you choose.

This variant of window runs its timer on the computation scheduler by default.

  • Javadoc: window(long,long,TimeUnit)
  • Javadoc: window(long,long,TimeUnit,Scheduler)

window-backpressure

You can use the Window operator to implement backpressure (that is, to cope with an Observable that produces items faster than its observers can consume them).

The Window operator reduces a long sequence of items to a shorter sequence of windows of items, which can be easier to handle. For example, you can close and emit a window of items from a bursty Observable at regular intervals.

Sample code:

Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);

Alternatively, you can emit a new window each time the bursty Observable has emitted N items.

Sample code:

Observable<Observable<Integer>> burstyWindowed = bursty.window(5);