Introduction to Rx

The history of ReactiveX

ReactiveX is an acronym for Reactive Extensions, commonly known as Rx. Originally developed as an extension to LINQ and opened in November 2012 by a team led by Microsoft architect Erik Meijer, ReactiveX is a programming model that aims to provide a consistent programming interface. To help developers more easily handle asynchronous data flow, Rx library support. NET, JavaScript, and C++, Rx has become more and more popular in recent years, and now supports almost all popular programming languages. Most of Rx’s language libraries are maintained by the ReactiveX organization. The community site is reactivex.io.

What is a ReactiveX

Rx is a library that enables developers to write asynchronous and event-based programs using Observables and LINQ style query operators. With Rx, developers can use Observables to represent asynchronous data streams and LINQ operators to query asynchronous data streams. Schedulers are used to parameterize concurrent processing of asynchronous data streams. Rx can be defined as Rx = Observables + LINQ + Schedulers.

IO defines Rx as a programming interface for asynchronous programming using observable data streams. ReactiveX combines the best of observer mode, iterator mode, and functional programming.

The application of ReactiveX

ReactiveX is being used by companies such as Microsoft, Netflix, Github, Trello, and SoundCloud.

The declaration of ReactiveX

More than just a programming interface, ReactiveX was a breakthrough in programming thought that influenced many other libraries and frameworks as well as programming languages.

Rx mode

Using observer mode

  • Create: Rx makes it easy to create event streams and data streams
  • Composition: Rx combines and transforms data streams using query-like operators
  • Listen: Rx can subscribe to any observable data stream and perform operations

Simplify the code

  • Functional style: The use of side-effect-free input/output functions for observable data streams avoids the intricacies of state in the program
  • Simplified code: Rx often works to simplify complex problems into a few lines of code
  • Asynchronous error handling: Traditional try/catch cannot handle asynchronous computations, and Rx provides an appropriate error handling mechanism
  • Easy concurrency: Rx’s Observables and Schedulers allow developers to get rid of underlying thread synchronization and concurrency issues

Advantages of using Observable

Rx extends the observer pattern to support sequences of data and events, adding operators that allow you to declaratively combine these sequences without concern for underlying implementations such as threading, synchronization, thread-safety, concurrent data structures, and non-blocking IO.

Observables fill this gap by optimally accessing asynchronous data sequences

A single data Multiple data
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

Rx’s Observable model lets you manipulate asynchronous event streams as if they were collections of data, with a variety of simple, composable operations.

Observables can be combined

The way Future objects are handled in Java is simple and efficient for single-layer asynchronous operations, but when it comes to nesting, they start to get tedious and complicated. It’s hard to compose conditional asynchronous execution flows well with futures-even impossible, given all the potential run-time problems. Sure, it’s possible, but very difficult. Perhaps you could use future.get (), but that would eliminate the benefits of asynchronous execution altogether. Rx Observables, on the other hand, were originally designed to combine asynchronous data streams.

Observables more flexible

Rx observables support handling not only individual scalar values (as Future can do), but also data sequences, and even infinite data streams. Observable is an abstract concept that can be applied to any scenario. Observable has all the elegance and flexibility of its cousin Iterable.

An Observable is an asynchronous two-way push and an Iterable is a synchronous one-way pull.

The event Iterable(pull) Observable(push)
To get the data T next() onNext(T)
Exception handling throws Exception onError(Exception)
Task to complete ! hasNext() onCompleted()

Observables without prejudice

Rx doesn’t have any particular preference for concurrency or asynchronicity, and observables can be implemented in any way, thread pools, event loops, non-blocking IO, Actor modes, anything that meets your needs that you’re good at or prefer. No matter how you choose to implement it, the client code treats all Observable interactions as asynchronous, regardless of whether the underlying implementation is blocking or non-blocking.

How does An Observable work?

public Observable<data> getData(a);
Copy the code
  • Can it be executed synchronously on the same thread as the caller?
  • Can it be executed asynchronously on a separate thread?
  • Does it distribute work to multiple threads and return data in any order?
  • Does it use Actor patterns instead of thread pools?
  • Does it use NIO and event loops to perform asynchronous network access?
  • Does it use an event loop to separate the worker thread from the callback thread?

From an Observer perspective, none of this matters. With Rx, you can change your mind. You can completely change the underlying Observable implementation without affecting the users of the Observable library.

There are many problems with using callbacks

The callback solves the problem of future.get () prematurely blocking without blocking anything. Because Callback is called as soon as the response result is ready, they are inherently efficient. However, just as with futures, callbacks are easy to use for single-layer asynchronous execution, and unwieldy for nested asynchronous combinations.

Rx is a multilingual implementation

Rx is implemented in a large number of programming languages and respects the implementation language style, and more implementations are growing rapidly.

Responsive programming

Rx provides a series of operators that you can use to filter, select, transform, combine, and compose multiple Observables that make execution and composition very efficient.

You can think of observables as the equivalent of Iterable’s push method, where the consumer pulls data from the producer and the thread blocks until the data is ready. With an Observable, the producer pushes the data to the consumer when it’s ready. Data can arrive synchronously or asynchronously, which is more flexible.

The following examples show similar higher-order functions used on Iterable and Observable

// Iterable
getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })

// Observable
getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })
Copy the code

Observable adds two missing semantics to GOF’s observer schema, making it consistent with operations available in Iterable:

  1. The producer can signal the consumer that no more data is available. (For Iterable, a for loop that completes normally means no data is available; In the case of an Observable, it calls an observeronCompletedMethods)
  2. The producer can signal the consumer that it has encountered an error (for Iterable, an error during iteration throws an exception; In the case of an Observable, it calls an ObserveronErrorMethods)

With these two capabilities, Rx keeps an Observable and Iterable in line, the only difference being the direction of the data flow. You can do anything with an Iterable that you can do with an Observable.

Definitions of nouns

Here are some translations of the nouns

  • A. Reactive B. Reactive C. Reactive D. Reactive
  • Iterable: an Iterable object that can be iterated over as an iterator. This concept exists in many languages
  • An Observable, defined as a more powerful Iterable in Rx, is an Observable in observer mode that somehow notifies an observer or subscriber of data production or changes
  • Observer An Observer object that listens to data emitted by an Observable and responds. Subscriber is a special implementation of this object
  • Emit literally translates to emit, publish, and issue. It means that an Observable sends a notification to an Observer when data is generated or changed, and calls the corresponding methods of the Observer
  • Items are literally translated as items and items. In Rx, items refer to the data items emitted by an Observable. In the article, items are all translated as data and data items

Observable

An overview of the

In ReactiveX, an Observer subscribes to an Observable. An observer responds to data, or a sequence of data, emitted by an Observable. This mode greatly simplifies concurrency because it creates an observer sentinel that is on standby to respond to an Observable’s notification at some point in the future, without blocking and waiting for the Observable to emit data.

This article explains reactive patterns, Observables and observers, and other articles show how to combine and change Observable behavior with operators.

Related references:

  • Single – a special Observable that emits only a Single data.

Background knowledge

In many software programming tasks, you more or less expect the code you write to be executed and completed in the order you write it, one at a time. In ReactiveX, however, many instructions may be executed in parallel before their results are captured by an observer in an indeterminate order. To do this, you define a mechanism for getting and transforming data, rather than calling a method. In this mechanism, an Observable exists, and observers Subscribe to it. When data is ready, the previously defined mechanism distributes data to the Observer sentinel who is in a waiting state.

The advantage of this approach is that if you have a large number of tasks to deal with, they are not dependent on each other. You can start executing them at the same time without waiting for one to complete before starting the next (in this way, your entire task queue will take no longer than the one that takes the most time).

There are a number of terms that can be used to describe this asynchronous programming and design pattern, and we’ll use them in this article: An Observer subscribes to An Observable. An Observable emits data or notifies its observers by calling the observer’s methods.

In other documents and scenarios, Observer is sometimes called Subscriber, Watcher, Reactor. This model is often referred to as the Reactor pattern.

Create observer

This article uses examples of pseudo-code similar to Groovy, but ReactiveX is implemented in multiple languages.

For a normal method call (not some kind of asynchronous method, nor a parallel call in Rx), the flow usually looks like this:

  1. Call a method
  2. Use a variable to hold the result returned by the method
  3. Do something useful with this variable and its new value

The code description is:

// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
Copy the code

In the asynchronous model the process looks more like this:

  1. Define a method that does something useful with the return value of an asynchronous call. This method is part of the observer.
  2. Define the asynchronous call itself as an Observable
  3. The observer is associated with the Observable through a Subscribe operation
  4. Continue your business logic, and when the method returns, the Observable emits the result, and the observer’s method starts processing the result or result set

The code description is:

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
Copy the code

Callback methods (onNext, onCompleted, onError)

The Subscribe method is used to connect an observer to an Observable. Your observer needs to implement a subset of the following methods:

  • onNext(T item)

    The Observable calls this method to emit data, and its parameters are the data that the Observable emits. This method can be called multiple times, depending on your implementation.

  • onError(Exception ex)

    This method is called when an Observable encounters an error or fails to return the expected data. The call terminates the Observable, and onNext and onCompleted are not called. The arguments to the onError method are thrown exceptions.

  • onComplete

    Observable calls this method after the last onNext call if no errors are encountered.

According to the Observable protocol, onNext may be called zero or many times, and there is an onCompleted or onError call (not at the same time). Passing data to onNext is usually called an emission. OnCompleted and onError are called notifications.

Here’s a more complete example:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
Copy the code

Unsubscribe

In some ReactiveX implementations, there is a special observer interface Subscriber, which has an unsubscribe method. Calling this method means that you don’t care about the currently subscribed Observable, so the Observable can choose to stop emitting new entries if no other observer subscribes.

The result of unsubscribing is passed to the Observable operator chain and causes every link in the chain to stop emitting data items. This is not guaranteed to happen immediately, however, for an Observable to continue generating and attempting to emit items in a while loop even when there are no observers left.

About naming conventions

Each language-specific implementation of ReactiveX has its own naming preferences, and while there is a lot in common between different implementations, there is no universal naming standard.

Also, in some scenarios, some names have different implications, or seem weird in some languages.

For example, there is an onEvent naming scheme (onNext, onCompleted, onError), and in some scenarios, these names might mean that the event handler has been registered. In ReactiveX, however, they are the name of the event handler.

“Hot” and “cold” of Observables

When does an Observable start emitting data sequences? Depending on the Observable implementation, a “hot” Observable might start emitting data as soon as it is created, so all observers that follow up to subscribe to it might start receiving data somewhere in the middle of the sequence (some data is missing). A “cold” Observable waits until an observer subscribes before transmitting data, so the observer can be sure to receive the entire data sequence.

In some ReactiveX implementations, there is also an Observable called Connectable that does not start emitting data unless the Connect method is called, regardless of whether or not an observer subscrires to it.

Observables are combined with operators

Observables and Observers are just the beginning for ReactiveX, which themselves are lightweight extensions to the standard Observer schema to better handle event sequences.

The real power of ReactiveX lies in its operators, which allow you to transform, combine, manipulate, and manipulate the data emitted by observables.

Rx’s operators, which allow you to combine sequences of asynchronous operations in a declarative style, have all the efficiency advantages of callbacks while avoiding the disadvantages of nested callbacks in typical asynchronous systems.

Here is a list of common operators:

  1. Create operations Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. Transform operations Buffer, FlatMap, GroupBy, Map, Scan, and Window
  3. Filter operations Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. Combine operations And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. Error handling of Catch and Retry
  6. Auxiliary operation Delay, Do Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. Conditional and Boolean operations All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. Arithmetic and set operations Average, Concat, Count, Max, Min, Reduce, Sum
  9. Conversion operation To
  10. Connect operations Connect, Publish, RefCount, Replay
  11. Backpressure operation, used to add a special process control policy operator

Not all of these operators are core components of ReactiveX; some are language-specific implementations or optional modules.

Four RxJava.

In RxJava, an object implementing the Observer interface can subscribe to an Observable instance. The subscriber responds to any data or data sequence emitted by the Observable. This simplifies concurrency because instead of blocking while an Observable emits data, it creates a sentinel observer on standby that responds to an Observable’s notification at some point in the future.

Five Single.

introduce

RxJava (and its derivatives RxGroovy and RxScala) has a variant called Single Observable.

A Single is like an Observable, except that it always emits a Single value, or an error notification, rather than a list of values.

So instead of an Observable requiring three onNext, onError, and onCompleted methods, subscribing to Single requires only two methods:

  • Onsuccess-single emits a Single value to this method
  • OnError – If unable to emit the required value, Single emits a Throwable object into this method

Single only calls one of these two methods, and only once, and the subscription terminates after either method is called.

Operator of Single

Single can also combine operations. A few operators allow you to mix Observable and Single:

The operator The return value instructions
compose Single Create a custom operator
concat and concatWith Observable Joins data emitted by multiple Single and Observables
create Single Call the observer’s create method to create a Single
error Single Returns a Single that immediately sends an error notification to the subscriber
flatMap Single Returns a Single that emits the result of a flatMap operation on the data of the original Single
flatMapObservable Observable Returns an Observable that emits the result of a flatMap operation on the original Single’s data
from Single Convert Future to Single
just Single Returns a Single that emits a specified value
map Single Returns a Single that emits the result of a map operation on the original Single’s data
merge Single Convert one Single(which emits data from another Single, let’s say B) to another Single(which emits data from another Single(B))
merge and mergeWith Observable Merge emits data from multiple singles
observeOn Single Instructs Single to invoke the subscriber’s methods on the specified scheduler
onErrorReturn Single Converts a Single that emits an error notification to a Single that emits the specified data item
subscribeOn Single Instructs Single to perform an operation on the specified scheduler
timeout Single It adds a timeout control to the original Single and emits an error notification if it times out
toSingle Single Converts an Observable that emits a Single value to a Single
zip and zipWith Single Multiple Single’s are converted into one, and the data emitted by the latter is the result of applying a function to the former

Operator icon

A detailed illustration can be found in the English document: Single

6. The Subject

Subject can be thought of as a bridge or proxy, and in some ReactiveX implementations, such as RxJava, it acts as both an Observer and an Observable. Because it is an Observer, it can subscribe to one or more Observables; Since it is an Observable, it can forward data it receives and transmit new data.

Since a Subject subscribs to an Observable, it can trigger the Observable to start emitting data (if the Observable is “cold” — that is, it waits for a subscription to start emitting data). So the effect is that the Subject can turn a “cold” Observable into a “hot” one.

The types of the Subject

There are four types of Subjects for different scenarios. They are not present in all implementations, and some implementations use other naming conventions (for example, Subject is called PublishSubject in RxScala).

AsyncSubject

An AsyncSubject emits only the last value from the original Observable after the original Observable completes. (If the original Observable does not emit any value, AsyncObject does not emit any value.) It emits the last value to any subsequent observer.

However, if the original Observable terminates because of an error, the AsyncSubject will not emit any data and will simply pass the error notification forward.

BehaviorSubject

When the observer subscrires to the BehaviorSubject, it starts emitting the most recent emitted data from the original Observable (it emits a default value if it hasn’t received any yet), and then continues emitting any other emitted data from the original Observable.

However, if the original Observable terminates because of an error, the BehaviorSubject simply transmits the error notification forward without emitting any data.

PublishSubject

The PublishSubject only emits data from the original Observable to the observer after the point in time the subscription occurred. Note that the PublishSubject may start emitting data as soon as it is created (unless you can prevent it from happening), so there is a risk that one or more data may be lost between the time the Subject is created and the time an observer subscribing to it. To ensure that all data from the original Observable is distributed, you need to do this: Either Create that Observable with Create to manually introduce “cold “Observable behavior (starting to emit data when all observers have subscribed), or use ReplaySubject instead.

If the original Observable terminates due to an error, the PublishSubject does not emit any data, but simply forwards the error notification.

ReplaySubject

The ReplaySubject emits all data from the original Observable to the observer, regardless of when they subscribed. There are other versions of ReplaySubject that discard old data (emitted by the original Observables) when reslowed down to a certain size or after a period of time.

If you use ReplaySubject as an observer, be careful not to call its onNext methods (including other ON methods) from multiple threads. This can result in simultaneous (non-sequential) calls, which violate the Observable protocol and add uncertainty to the Subject’s results.

RxJava equivalent class

Suppose you have a Subject that you want to pass on to another proxy or expose its Subscriber interface. You can call its asObservable method, which returns an Observable. Refer to the Javadoc documentation for details.

serialization

If you use Subject as a Subscriber, be careful not to call its onNext method (including other ON family methods) from multiple threads, as this can result in simultaneous (non-sequential) calls, which violate the Observable protocol and add uncertainty to the Subject’s result.

To avoid such problems, you can convert Subject to a SerializedSubject, something like this:

mySafeSubject = new SerializedSubject( myUnsafeSubject );
Copy the code

7. Scheduler

If you want to add multithreading capabilities to the Observable operator chain, you can specify that the operator (or a particular Observable) is executed on a particular Scheduler.

Some ReactiveX Observable operators have variations that accept a Scheduler parameter. This parameter specifies that operators execute some or all of their tasks on a particular scheduler.

Using the ObserveOn and SubscribeOn operators, you can have an Observable execute on a particular scheduler, ObserveOn tells an Observable to call the observer’s onNext, onError, and onCompleted methods on a particular scheduler. It instructs an Observable to perform all processing, including emission data and notifications, on a specific scheduler.

RxJava sample

Type of scheduler

The following table shows the types of schedulers available in RxJava:

Scheduler type The effect
Schedulers.com putation ( ) Use schedulers.io () for computing tasks, such as event loops or callback processing, not IO operations (IO operations use schedulers.io ()); The default thread count is equal to the number of processors
Schedulers.from(executor) Uses the specified Executor as the scheduler
Schedulers. Immediate ( ) Start the task immediately on the current thread
Schedulers. IO ( ) For IO intensive tasks, such as asynchronously blocking IO operations, the scheduler’s thread pool grows as needed; For normal computing tasks, use Schedulers.computation(); Schedulers.io( ) defaults to a CachedThreadScheduler, much like a new thread scheduler with a thread cache
Schedulers. NewThread ( ) Create a new thread for each task
Schedulers. Trampoline ( ) When other queued tasks complete, the current thread queues to start execution

Default scheduler

In RxJava, some variations of the Observable operator allow you to set which scheduler an operation executes on, while others do not execute on any particular scheduler, or on a specified default scheduler. The following table lists the default schedulers for some operators:

The operator The scheduler
buffer(timespan) computation
Buffer (timespan,   count) computation
Buffer (timespan,   timeshift) computation
Debounce (timeout,   unit) computation
Delay, delay,   unit) computation
DelaySubscription (delay,   unit) computation
interval computation
repeat trampoline
Replay (time,   unit) computation
Replay (buffersize,   time,   unit) computation
Replay (selector,   time,   unit) computation
Replay (selector,   buffersize,   time,   unit) computation
retry trampoline
The sample (period,   unit) computation
The skip (time,   unit) computation
SkipLast (time,   unit) computation
Take (time,   unit) computation
TakeLast (time,   unit) computation
TakeLast (count,   time,   unit) computation
TakeLastBuffer (time,   unit) computation
TakeLastBuffer (count,   time,   unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
The timeout (firstTimeoutSelector,   timeoutSelector) immediate
The timeout (timeoutSelector,   other) immediate
The timeout (timeout,   timeUnit) computation
The timeout (firstTimeoutSelector,   timeoutSelector,   other) immediate
The timeout (timeout,   timeUnit,   other) computation
timer computation
timestamp immediate
window(timespan) computation
The window (timespan,   count) computation
The window (timespan,   timeshift) computation

Use a scheduler

In addition to passing these schedulers to the RxJava Observable operator, you can also use them to schedule your own tasks. The following example shows scheduler.worker in use:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call(a) { yourWork(); }});// some time later...
worker.unsubscribe();
Copy the code

Recursive scheduler

To schedule recursive method calls, you can use schedule and then schedule(this) as an example:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call(a) {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this); }});// some time later...
worker.unsubscribe();
Copy the code

Check or set unsubscribe status

The object of the Worker class implements the Subscription interface, using its isUnsubscribed and unsubscribe methods, so you can either stop a task when the Subscription is cancelled, or unsubscribe from within the task being scheduled, as in:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call(a) {
        while(! worker.isUnsubscribed()) { status = yourWork();if(QUIT == status) { worker.unsubscribe(); }}}});Copy the code

Worker is also Subscription, so you can (and usually should) call its unsubscribe method to notify you that you can suspend tasks and release resources.

Delay and period scheduler

You can use the schedule (action, delayTime, timeUnit) on the specified scheduler delay your mission, in the example below task will commence after 500 milliseconds:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
Copy the code

Use another version of the schedule, schedulePeriodically (action, initialDelay, period, timeUnit) method allows you to schedule a regular task, the task will be executed in 500 milliseconds after this example, Then every 250 milliseconds:

someScheduler.schedulePeriodically(someAction, 500.250, TimeUnit.MILLISECONDS);
Copy the code

Test the scheduler

TestScheduler allows you to manually fine-tune the scheduler’s clock performance. This is useful for testing tasks that rely on precise timing. The scheduler has three additional methods:

  • AdvanceTimeTo (time,unit) Moves forward the scheduler’s clock to a specified point in time
  • AdvanceTimeBy (time,unit) moves the scheduler’s clock forward by a specified time period
  • TriggerActions ( ) starts executing any scheduled but not started tasks if their scheduled time is equal to or earlier than the current time of the scheduler clock

The Bug that can’t be changed, the pretense that can’t be written. The official account now focuses on audio and video and APM fields, covering various fields of knowledge; Only do the whole network of the most than the heart of the public number, welcome your attention!