An introduction to


Basic Concepts:

  • Observable: Represents a concept that is a collection of future values or events that can be called.
  • Observer: A collection of callback functions that know how to listen for values provided by an Observable.
  • **Subscription: ** Unsubscribe an Observable from execution.
  • Operator: Pure functions in the functional programming style, using operators like map, filter, concat, flatMap, and so on to work with collections.
  • Subject: Equivalent to EventEmitter and the only way to multiroute values or events to multiple observers.
  • Schdulers: : a centralized dispatcher that controls concurrency and allows us to coordinate when calculations occur, such as setTimeout or requestAnimationFrame or others.

Observable Observable


Observables are lazy push collections of multiple values. It fills in the blanks in the table below:

A single value More than one value
pull Function
push Promise

Pull vs. Push

Pull and push are two different protocols that describe how data producers communicate with data consumers.

Pull? The consumer decides when to receive the data from the producer, who does not know when the data will be delivered to the consumer.

Every Javascript function is a pull system. The producer of functional data that is consumed by the code calling the function by taking a single return value from the function call.

producers consumers
pull Passive: Generates data when requested.
push Active: Generate data at your own pace.

Push? It is up to the producer to decide when to send the data to the consumer. Consumers themselves do not know when to receive data

Promises are the most common type of push architecture. The Promise(producer) passes a parsed value to the registered callback function (consumer), but unlike the function, the Promise decides when to “push” the value to the callback function.

RxJS introduced Observables, a new javascript push architecture. An Observable is a multiple value producer that pushes values to observers (consumers)

  • Function is a lazy evaluation operation that returns a single value synchronously when called
  • The Generator is a lazy evaluation operation that returns zero to an infinite number of values synchronously when called
  • A Promise is the possibility of eventually returning a worthwhile operation
  • An Observable is a lazy evaluation operation that can return zero to an infinite number of values from the time it is invoked or asynchronously.

Observables analysis

  • Create observables
  • Subscribe to observables
  • Perform observables
  • Clean up the observables

Create observables

Rx.observable. Create is an alias for the Observable constructor, which accepts a subscribe function.

Observable = rx.observable. Create (function subscribe(observer){var id = setInterval(()=>{ observer.next('hi') },1000); })Copy the code

Observables can be created using create, but usually we use so-called create operators like of, from, interval, and so on

Subscribe to observables

// Observer observable.subscribe(x=>console.log(x))Copy the code

This indicates that subscribe calls are not shared between multiple observers of the same Observable. Each call to Observable. subscribe triggers a separate setting for a given observer. Observables don’t even maintain an additional list of observers.

Perform observables

Observable.create(function subscribe(observer) {… }) in… The code stands for “Observable execution,” which is lazy and executes only after each observer subscribes. Over time, the execution produces multiple values either synchronously or asynchronously.

Observable execution can pass three types of values:

  • “Next” notification: Send a value, such as a number, string, object, etc.
  • “Error” notification: Sends a JavaScript Error or exception.
  • “Complete” notification: no more values are sent.

Here’s an example of an Observable executing, sending three “Next” notifications followed by a “Complete” notification:

var observable = Rx.Observable.create(function subscribe(observer) {
	observer.next(1);
	observer.next(2);
	observer.next(3);
	observer.complete();
});
Copy the code

Observable strictly obeys its own protocol, so the following code doesn’t send “Next” notification 4:

var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); observer.next(4); });Copy the code

It’s a good idea to wrap arbitrary code with a try/catch block in subscribe, and send an “Error” notification if an exception is caught:

var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch (err) { observer.error(err); // An error is sent if an exception is caught}});Copy the code

Clean up the observables

Because each execution is exclusive to its corresponding observer, once the observer has finished receiving the value, it must have a way to stop execution to avoid wasting computational power or memory resources.

When observable.subscribe is called, the observer is attached to the newly created Observable execution. This call also returns an object, Subscription:

var subscription = observable.subscribe(x => console.log(x));
Copy the code

Subscription represents ongoing execution and has a minimal API that allows you to cancel execution. For more on subscriptions, see Subscription Type. You can cancel execution in progress using subscription. Unsubscribe () :

var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // Later: subscription. Unsubscribe ();Copy the code

When you subscribe to an Observable, you get a Subscription, which represents ongoing execution. Execution can be cancelled by simply calling the unsubscribe() method.

An Observer


What is an observer? – Observers are consumers of values sent by an Observable. An observer is just a collection of callbacks, each corresponding to a notification type that An Observable sends: Next, Error, and complete. The following example is a typical observer object:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);
Copy the code

An observer is just an object with three callbacks, each corresponding to a notification type that an Observable sends.

The Subscription (Subscription)


What is Subscription? – Subscription is an object representing a cleanable resource, usually an Observable execution. Subscription has an important method, unsubscribe, which takes no arguments and is simply used to clean up resources held up by Subscription. In the previous version of RxJS, Subscription was called “Disposable” (cleanable object).

var observable = Rx.Observable.interval(1000); var subscription = observable.subscribe(x => console.log(x)); // This cancels Observable execution in progress // Observable execution is initiated by calling the subscribe method with observer subscription. Unsubscribe (); //Subscription can also be combined in such a way that a single Subscription calls the unsubscribe() method and may have multiple subscriptions unsubscribed. You can do this by adding one Subscription to another: var observable1 = rx.Observable. interval(400); var observable2 = Rx.Observable.interval(300); var subscription = observable1.subscribe(x => console.log('first: ' + x)); var childSubscription = observable2.subscribe(x => console.log('second: ' + x)); subscription.add(childSubscription); SetTimeout (() => {// subscription and childSubscription both unsubscribe subscription. Unsubscribe (); }, 1000);Copy the code

Subscriptions also have a remove(otherSubscription) method, which is used to revoke an added subsubscription.

The Subject (the Subject)


What is Subject? – RxJS Subject is a special type of Observable that allows values to be multicast to multiple observers, so Subject is multicast, whereas normal Observables are unicast (each Observables subscribed has a separate execution of an Observable).

A Subject is like an Observable, but can be multicast to multiple observers. Subject also acts like EventEmitters, maintaining a registry of multiple listeners.

Each Subject is an Observable – for a Subject, you can provide an observer and use the subscribe method to start receiving values normally. From the observer’s point of view, it can’t tell whether Observable execution is from a normal Observable or a Subject.

Within the Subject, SUBSCRIBE does not call a new execution of the sent value. It simply registers a given observer into a list of observers, similar to how addListener works in other libraries or languages.

Each Subject is an observer. -Subject is an object with the following methods: Next (v), error(e), and complete(). To give a Subject a new value, simply call next(theValue), which will multicast theValue to observers registered to listen to the Subject.

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);
Copy the code

Since Subject is an observer, this means you can pass Subject as a parameter to the Subscribe method of any Observable, as shown in the following example:

var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); ObserverA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3Copy the code

Using the above approach, we basically just convert the unicast Observable execution to multicast through Subject. This also shows that Subjects are the only way to share any Observable execution with multiple observers.

There are also some special types of Subjects, such as BehaviorSubject, ReplaySubject, and AsyncSubject.

Multicast Observables

A multicast Observable sends notifications through a Subject, which may have multiple subscribers, whereas a normal unicast Observable sends notifications to a single observer.

The underlying multicast Observable is that multiple observers can see the same Observable executing by using Subject.

At the bottom, this is how the Multicast operator works: An observer subscribes to an underlying Subject, which subscribes to the source Observable. The following example is similar to the previous example using Observable.subscribe (subject) :

var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // Use 'subject.subscribe({... })`: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); // Use 'source. Subscribe (subject)' at the bottom: multicasted. Connect ();Copy the code

The multicast operator returns an Observable that looks like a normal Observable but acts like a Subject when subscribing. Multicast returns a ConnectableObservable, which is just an Observable with a connect() method.

The connect() method is important because it determines when to start the shared Observable execution. Because the connect() method performs source.subscribe(subject) at the bottom, it returns Subscription, which you can unsubscribe to unsubscribe the shared Observable execution.

Reference counting

Calling connect() manually and handling Subscription is often too cumbersome. In general, we want to connect automatically when the first observer arrives, and unsubscribe automatically when the last observer unsubscribes.

Consider the following example. The following list summarizes how Subscriptions occur:

  1. The first observer subscribes to a multicast Observable
  2. The multicast Observable is connected
  3. The next value of 0 is sent to the first observer
  4. The second observer subscribes to a multicast Observable
  5. The next value of 1 is sent to the first observer
  6. The next value of 1 is sent to the second observer
  7. The first observer unsubscribes to the multicast Observable
  8. The next value of 2 is sent to the second observer
  9. The second observer unsubscribes to the multicast Observable
  10. The connection to the multicast Observable is broken (the underlying operation is to unsubscribe)

To do this, you need to explicitly call connect() as follows:

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); // Here we should call 'connect()' because the first of 'multicasted' // subscribers care about the consumption value subscriptionConnect = multicasted. Connect (); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); / / here we should cancel the Shared observables execute subscription, / / since this ` multicasted ` will no longer have the subscriber setTimeout (() = > {subscription2. Unsubscribe (); subscriptionConnect.unsubscribe(); Observable for sharing}, 2000);Copy the code

Instead of explicitly calling connect(), we can use the refCount() method of ConnectableObservable (reference count), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers goes from 0 to 1, it calls connect() to initiate the execution of the share. When the number of subscribers goes from 1 to 0, it unsubscribes completely, stopping further execution.

RefCount is a multicast Observable that automatically starts execution when there is the first subscriber and stops execution when the last subscriber leaves.

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed $subscribed. subscription1 = refCounted.subscribe({ next: (v) => console.log('observerA: ' + v) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); / / Shared observables execution will stop here, because / / since ` refCounted ` will no longer have the subscriber setTimeout (() = > {the console. The log (' observerB unsubscribed, '); subscription2.unsubscribe(); }, 2000); // Result: observerA Subscribed observerA: 0 observerB Subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribedCopy the code

RefCount () exists only in a ConnectableObservable, which returns an Observable, not another ConnectableObservable.

BehaviorSubject

One variation of Subject is BehaviorSubject, which has a concept of “current value”. It saves the most recent value sent to the consumer. And when a new observer subscrires, the BehaviorSubject immediately receives a “current value” from the BehaviorSubject.

The BehaviorSubjects are suitable for representing “values over time.” For example, a birthday flow would be a Subject, but an age flow would be a BehaviorSubject.

In the following example, the BehaviorSubject is initialized with a value of 0, which is 0 when the first observer subscribes. The second observer gets the value 2 when it subscribes, even though it subscribed after the value 2 was sent.

var subject = new Rx.BehaviorSubject(0); Subscribe ({next: (v) => console.log('observerA: '+ v)}); subscribe({next: (v) => console.log('observerA:' + v)}); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3);Copy the code

Output:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
Copy the code

ReplaySubject

ReplaySubject is similar to BehaviorSubject in that it can send old values to new subscribers, but it can also log a portion of Observable execution.

ReplaySubject records multiple values in Observable execution and plays them back to new subscribers.

var subject = new Rx.ReplaySubject(3); Subscribe ({next: (v) => console.log('observerA: '+ v)}); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5);Copy the code

In addition to the number of buffers, you can specify Window time (in milliseconds) to determine how long ago values can be recorded. In the example below, we use a large cache number of 100, but the Window Time parameter is only set to 500 milliseconds.

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 1000); // Output: observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 ...Copy the code

AsyncSubject

AsyncSubject is another variation of Subject that sends the last value executed to the observer only when the Observable completes (complete()).

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();
Copy the code

Output:

observerA: 5
observerB: 5
Copy the code

AsyncSubject is similar to the last() operator in that it also waits for complete notification to send a single value.

Operators

Although RxJS has its roots as an Observable, its operators are the most useful. Operators are basic code units that allow complex asynchronous code to be easily combined in a declarative manner.

What is an operator?

Operators are methods of type Observable, such as.map(…). And the filter (…). And the merge (…). , and so on. When operators are called, they do not change existing Observable instances. Instead, they return a new Observable whose Subscription logic is based on the first Observable.

Operators are functions that create a new Observable based on the current Observable. This is a side effect free operation: the front Observable remains the same.

The operator is essentially a pure function that takes an Observable as input and generates a new Observable as output. Subscribing to the output Observable also subscribes to the input Observable. In the following example, we create a custom operator function that multiplys each value received from the input Observable by 10:

function multiplyByTen(input) { var output = Rx.Observable.create(function subscribe(observer) { input.subscribe({ next:  (v) => observer.next(10 * v), error: (err) => observer.error(err), complete: () => observer.complete() }); }); return output; } var input = Rx.Observable.from([1, 2, 3, 4]); var output = multiplyByTen(input); output.subscribe(x => console.log(x));Copy the code

Output:

10 20 30 to 40Copy the code

Note that subscribing to output causes the Input Observable to be subscribed. We call this the operator subscription chain.

Instance operators vs. static operators

What is an instance operator? – When referring to operators, we usually refer to the instance operator, which is a method on an Observable instance. For example, if multiplyByTen above were the officially provided instance operator, it would look something like this:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}
Copy the code

Instance operators are functions that use the this keyword to refer to the input Observable.

Note that the Input Observable is no longer a function parameter; it is now this object. Here’s how we use such an instance operator:

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();

observable.subscribe(x => console.log(x));
Copy the code

What are static operators? – In addition to instance operators, there are static operators that attach directly to the Observable class. The static operator does not use the this keyword internally, but relies entirely on its arguments.

Static operators are pure functions attached to the Observalbe class, usually used to create Observalbe from scratch.

The most common type of static operator is the so-called create operator. Instead of converting an input Observable into an output Observable, they take only non-Observable parameters, such as numbers, and create a new Observable.

A typical example of a static operator is the interval function. It takes a number (non-Observable) as an argument and produces an Observable as output:

Var Observable = rx.observable. Interval (1000 /* milliseconds */);Copy the code

Another example of the create operator is create, which has been used extensively in previous examples. Click here for a list of all static operators.

However, some static operators may differ from simple creation. Some combinatorial operators may be static, such as merge, combineLatest, concat, and so on. These make sense as static operators because they take multiple Observables as input, not just one, for example:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);
Copy the code

To explain how operators work, literal descriptions are often insufficient. Many operators are time-dependent, and they may delay, sample, throttle, or debonce values in different ways. Charts are often a better tool. The pinball diagram is a visual representation of how the operator works, including the input Obserable(s) (which can be multiple Observables), the operator and its parameters, and the output Observable.

Time flows to the right in the pinball graph, which describes how the median (” pinball “) is emitted in Observable execution.

The dissected marble can be seen in the picture below.

Throughout the documentation site, we use pinball diagrams extensively to explain how operators work. They can also be useful in other environments, such as whiteboards, or even in our unit tests (such as ASCII diagrams).