What is RxJS

RxJS is a library for programming with asynchronous data streams. Simply put, it treats all data as a stream, whether it comes from HTTP requests, DOM events, or ordinary values, and provides a rich set of operators for processing those streams. By combining operators, you can implement what you need concisely.




A stream in RxJS is represented by an Observable. To obtain its data, you subscribe to the Observable as follows:

const ob = http$.getSomeList(); // getSomeList() returns an HTTP request wrapped in an Observable
ob.subscribe((data) => console.log(data));
// By convention, a $ is appended to a variable name to indicate an Observable object.



Don't use import { Observable } from 'rxjs', because that imports the entire RxJS library. Import only what you need instead:

import { Observable } from 'rxjs/Observable'; // import the class
import 'rxjs/add/operator/map'; // import an instance operator
import 'rxjs/add/observable/forkJoin'; // import a class operator



Quick start



Observable
In simple terms, data flows inside an Observable, and you can process the stream with various operators, for example:

const ob = Observable.interval(1000);
ob.take(3).map(n => n * 2).filter(n => n > 0);

In the first step, we create an Observable sequence with the class method interval. As the source, ob emits an increasing number every 1000 ms: 0 -> 1 -> 2 and so on. In the second step, we use operators to process the stream. take(3) takes only the first three values emitted by the source and stops the source after the third. map transforms each value in the stream; here we double it, giving 0, 2, and 4. filter keeps only the values that satisfy the condition; given the result of the map step, only the second and third values (2 and 4) remain.
Above, we wrote what looks like synchronous code to describe a processing pipeline, but ob, the source, does not start emitting right away. If we logged n inside map, nothing would be printed, because an Observable sequence must be subscribed to before the pipeline runs. This is the subscribe step (as in publish/subscribe).
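The laziness described above can be sketched without RxJS at all. The ToyObservable class below is a hypothetical stand-in written for this illustration, not the RxJS implementation, and it emits from an array synchronously instead of from a timer. It only aims to show that building the pipeline runs nothing until subscribe is called.

```javascript
// Toy stand-in for an Observable (hypothetical, NOT the RxJS implementation):
// it only stores a producer function and runs it when subscribe() is called.
class ToyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  // Emit each array element synchronously to whoever subscribes.
  static from(values) {
    return new ToyObservable(next => values.forEach(v => next(v)));
  }

  // Each operator returns a new ToyObservable that wraps the previous one.
  map(fn) {
    return new ToyObservable(next => this.producer(v => next(fn(v))));
  }

  filter(predicate) {
    return new ToyObservable(next =>
      this.producer(v => { if (predicate(v)) next(v); }));
  }

  subscribe(next) {
    this.producer(next);
  }
}

const mapCalls = []; // records every value that reaches map
const received = []; // records every value delivered to the subscriber

const pipeline = ToyObservable.from([0, 1, 2])
  .map(n => { mapCalls.push(n); return n * 2; })
  .filter(n => n > 0);

const callsBeforeSubscribe = mapCalls.length; // still 0: nothing has run yet

pipeline.subscribe(n => received.push(n));

console.log(callsBeforeSubscribe); // 0
console.log(mapCalls);             // [ 0, 1, 2 ]
console.log(received);             // [ 2, 4 ]
```

Each operator here simply wraps the previous producer, which is one simple way to get lazy behavior; the real library is considerably more involved.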

const ob = Observable.interval(1000); 
ob.take(3).map(n => n * 2).filter(n => n > 0).subscribe(n => console.log(n));



In the code above we pass a single function to subscribe, which is the shorthand form. The full form takes an object:


ob.subscribe({
    next: d => console.log(d),
    error: err => console.error(err),
    complete: () => console.log('end of the stream')
});



A function passed directly to subscribe is treated as the next function. The complete object with its three functions is called an observer, and it describes how the results of a sequence are handled. next is called when data arrives normally and no error has occurred. error is called when something goes wrong in the stream, for example a runtime error or an HTTP error. complete is called when the stream ends and no more data will be emitted. During the lifetime of a stream, only one of error and complete can fire, while next may be called many times (once per emitted value) until complete or error occurs.
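The observer rules above (a bare function counts as next, and nothing is delivered after error or complete) can be sketched in plain JavaScript. The helper name toSafeObserver is hypothetical and is not an RxJS API; it is only a minimal illustration of the contract.

```javascript
// Hypothetical helper (not an RxJS API): turns a plain function or a partial
// observer object into a full observer that ignores everything after the
// stream has terminated.
function toSafeObserver(observerOrNext) {
  const observer = typeof observerOrNext === 'function'
    ? { next: observerOrNext } // shorthand: a bare function is `next`
    : observerOrNext;
  let closed = false;
  return {
    next(value) {
      if (!closed && observer.next) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true; // error terminates the stream
      if (observer.error) observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true; // complete terminates the stream
      if (observer.complete) observer.complete();
    },
  };
}

const events = [];
const safe = toSafeObserver({
  next: d => events.push('next ' + d),
  error: err => events.push('error ' + err.message),
  complete: () => events.push('complete'),
});

safe.next(1);
safe.next(2);
safe.complete();
safe.next(3);                      // ignored: the stream already completed
safe.error(new Error('too late')); // ignored: only one of error/complete fires

const shorthandValues = [];
const shorthand = toSafeObserver(d => shorthandValues.push(d));
shorthand.next('a');
shorthand.complete(); // no complete handler was supplied, so nothing happens

console.log(events);          // [ 'next 1', 'next 2', 'complete' ]
console.log(shorthandValues); // [ 'a' ]
```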


observer.next can be thought of as the first argument to then in a Promise, and observer.error corresponds to the second argument of then, or to the Promise's catch.


RxJS also provides the catch operator. When an error flows into catch, the catch callback must return a new Observable. A stream that has passed through catch does not reach the observer's error function unless the new Observable itself produces an error.


Observable.of(1).map(n => n.undefinedMethod()).catch(err => {
    // Errors that occurred before the catch are handled here.
    return Observable.of(0); // Return a new sequence, which becomes the new stream.
});





  • Observer
  • Operator






Create an observable sequence

There are many ways to create a sequence, but we’ll just list a few commonly used ones:

Observable.of(...args)
Observable.of() converts ordinary JavaScript values into an Observable sequence.



Observable.fromPromise(promise)
Converts a Promise into an Observable.



Observable.fromEvent(element, eventName)
Creates a sequence from a DOM event, for example Observable.fromEvent($input, 'click').



Observable.ajax(url | AjaxRequest)
Sends an HTTP request. For the fields of AjaxRequest, see its reference documentation.



Observable.create(subscribe)

This is a general-purpose creation method, usually used to wrap functions or libraries that only offer a callback-based API. Before using it, consider whether one of the RxJS class methods can already create the sequence you need.
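As a rough sketch of what wrapping a callback-only function looks like, the example below uses a toy createToy factory in place of Observable.create so that it runs without the library. Both createToy and the readConfig function are hypothetical names made up for this illustration. The function handed to the factory receives the observer and translates the callback's (err, result) convention into next, error, and complete calls.

```javascript
// Toy version of a create(subscribe) factory (hypothetical, not RxJS itself):
// the function passed in is stored and only called when someone subscribes.
function createToy(subscribeFn) {
  return {
    subscribe(observer) {
      subscribeFn(observer);
    },
  };
}

// Hypothetical callback-only API standing in for a legacy library function.
function readConfig(callback) {
  callback(null, { retries: 3 });
}

// Wrap the callback API: map its (err, result) convention onto the observer.
const config$ = createToy(observer => {
  readConfig((err, result) => {
    if (err) {
      observer.error(err);
    } else {
      observer.next(result);
      observer.complete();
    }
  });
});

const log = [];
config$.subscribe({
  next: cfg => log.push('retries=' + cfg.retries),
  error: err => log.push('failed: ' + err.message),
  complete: () => log.push('done'),
});

console.log(log); // [ 'retries=3', 'done' ]
```

With the real library, the same subscribe function body would be passed to Observable.create instead of createToy.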




Merging sequences

Merging is also a way of creating a sequence. For example, suppose a page first fetches a list and then has to make one HTTP request per list item to fetch its details. Here we treat each HTTP request as a sequence, and we want to merge them.
There are several ways to merge. The N requests can be sent one after another (each request starts only after the previous one has finished); the N requests can be sent at the same time, with the results combined into an array and a single callback fired once all have arrived; or the N requests can be sent at the same time, with a callback fired for each arrival.
Without RxJS, handling all of these cases is troublesome both to implement and to maintain. Here is how the requirements above are met with RxJS:


const ob1 = Observable.ajax('api/detail/1');
const ob2 = Observable.ajax('api/detail/2');
// ...
const obs = [ob1, ob2 /* , ... */];
// Create one Observable per HTTP request.



N requests are sent sequentially (each request starts only after the previous one has finished)

Observable.concat(...obs).subscribe(detail => console.log('Every request triggers a callback'));



N requests are issued in parallel, triggering a callback for each arrival

Observable.merge(...obs).subscribe(detail => console.log('Every request triggers a callback'));



N requests are issued in parallel and combined into an array once all of them have arrived, triggering a single callback

Observable.forkJoin(...obs).subscribe(detailArray => console.log('Trigger a callback'));
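To make the timing difference between the three strategies concrete, here is a small model that computes when each callback would fire. It does not use RxJS or real HTTP; the request durations and the three timeline functions are assumptions invented for this sketch, and it only models successful requests.

```javascript
// Hypothetical requests: each finishes `ms` milliseconds after it starts.
const requests = [
  { id: 1, ms: 300 },
  { id: 2, ms: 100 },
  { id: 3, ms: 200 },
];

// concat: each request starts only when the previous one has finished.
function concatTimeline(reqs) {
  let clock = 0;
  return reqs.map(r => {
    clock += r.ms;
    return { at: clock, value: r.id };
  });
}

// merge: all requests start at time 0; callbacks fire in order of arrival.
function mergeTimeline(reqs) {
  return reqs
    .map(r => ({ at: r.ms, value: r.id }))
    .sort((a, b) => a.at - b.at);
}

// forkJoin: all requests start at time 0; one callback fires when the slowest
// has arrived, with the results kept in the original request order.
function forkJoinTimeline(reqs) {
  const at = Math.max(...reqs.map(r => r.ms));
  return [{ at, value: reqs.map(r => r.id) }];
}

console.log(concatTimeline(requests));   // at 300 -> 1, at 400 -> 2, at 600 -> 3
console.log(mergeTimeline(requests));    // at 100 -> 2, at 200 -> 3, at 300 -> 1
console.log(forkJoinTimeline(requests)); // at 300 -> [ 1, 2, 3 ]
```

In this model, concat takes the sum of the durations, while merge and forkJoin both finish with the slowest request; they differ only in how many callbacks fire and in what order the results are seen.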





Example: implementing a search feature

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup') // Bind the 'keyup' event of the DOM element
    .debounceTime(250) // Debounce
    .pluck('target', 'value') // Take event.target.value
    .switchMap(url => http.get(url)) // Switch from the input stream to the HTTP request
    .subscribe(data => render(data)); // Receive the data



Subject

An RxJS Subject is a special type of Observable that allows values to be multicast to multiple Observers. While a normal Observable is unicast (each subscribed Observer gets its own independent execution of the Observable), a Subject is multicast.

var subject = new Rx.Subject();
subject.subscribe({next: (v) => console.log('observerA: ' + v)});
subject.subscribe({next: (v) => console.log('observerB: ' + v)});
subject.next(1);
subject.next(2);
// Output:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2


Since a Subject is also an Observer, you can pass a Subject as the argument to any Observable's subscribe:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
})
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject);
// Output:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
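The multicast behavior, and the fact that a Subject can be handed to subscribe as an observer, can be sketched with a toy class. ToySubject and the unicast object below are hypothetical stand-ins written for this illustration, not the RxJS classes. The counter shows that the source runs only once even though two observers receive its values.

```javascript
// Toy Subject (hypothetical, not the RxJS class): it keeps a list of
// observers and forwards every value it receives to all of them.
class ToySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach(o => o.next(value));
  }
}

// Toy unicast source: the producer runs once per subscriber.
let executions = 0;
const unicast = {
  subscribe(observer) {
    executions += 1;
    [1, 2].forEach(v => observer.next(v));
  },
};

const output = [];
const subject = new ToySubject();
subject.subscribe({ next: v => output.push('observerA: ' + v) });
subject.subscribe({ next: v => output.push('observerB: ' + v) });

// The subject has a next method, so it can itself act as the observer.
unicast.subscribe(subject);

console.log(executions); // 1: a single execution is shared by both observers
console.log(output);
// [ 'observerA: 1', 'observerB: 1', 'observerA: 2', 'observerB: 2' ]
```

Had observerA and observerB subscribed to the unicast source directly, the producer would have run twice, once for each of them.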


A personal summary of commonly used operators:

Class operators (typically merging sequences or creating sequences from existing data)

Merging: forkJoin, merge, concat
Creating: of, from, fromPromise, fromEvent, ajax, throw

Instance operators (to process or control the flow of data in the stream)

map, filter, switchMap, toPromise, catch, take, takeUntil, timeout, debounceTime, distinctUntilChanged, pluck