I’m going to do it again:

Original address (English, need to climb the wall)

Learn about observables by building an Observable

Many times people ask me what is the difference between “hot” and “cold” observables? Or is an Observable unicast or multicast? . People seem confused about the inner workings of ‘Rx.Observable’. People often say something like “Streams” or “Promises” when asked to describe an Observable. But in fact, I’ve talked about this stuff on many occasions and in some public speaking engagements.

The promise comparison is necessary, but unfortunately not very useful. Promises are asynchronous primitives, and Promises are already widely accepted and used by the JS community. Overall this is a good start. By comparing the ‘then’ of a promise with the ‘subscribe’ of an Observable, we can see the difference between immediate execution and delayed execution, observable cancellation and reusability, among many other things. Learning by comparison is easy for observables to learn in this way. But there’s a problem: the two are far more different than they are similar. Promises are multicast. Resolve and reject are asynchronous. When you deal with observables in the same way you deal with promises, you find that sometimes the results are not as expected. Observables are sometimes multicast, sometimes not, and often asynchronous. Indeed, sometimes I blame myself for the possibility of perpetuating this misunderstanding.

An Observable is simply a function that accepts an observer and returns a function

If you want to understand an Observable thoroughly, you can implement a simple Observable yourself. Really, it’s not as hard as it sounds. An Observable, when we look at its minimal implementation, is just a function with a specific (specific, specific, specified, specific) purpose that has its own specific type. (A function of a specific type with a specific purpose)

  1. structure
    • function
    • Accept an observer: an object with next, Error, and complete methods
    • Returns a cancelable function
  2. Purpose: Connect an observer to the producer (the object that produces the value) and return a method that unconnects the producer. An observer is essentially a listener handler that can pass in data at any time.
  3. Basic implementation:
    / / class DataSource {constructor() {let I = 0; this._id = setInterval(() => this.emit(i++), 200); } emit(n) { const limit = 10; if (this.ondata) { this.ondata(n); } if (n === limit) { if (this.oncomplete) { this.oncomplete(); } this.destroy(); } } destroy() { clearInterval(this._id); } /** * Observable */ function MyObservable(observer) {const datasource = new DatSource(); datasource.ondata = (e) => observer.next(e); datasource.onerror = (e) => observer.error(err); datasource.oncomplete = () => observer.complete(); return () => { datasource.destroy(); }} /** * Next we can use the above Observable */ const unsub = myObservable({next(x) {console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done'); }});Copy the code

You can try it on jsbin

As you can see, it’s not complicated, it’s a simple contract

Astute: Optimize our Observers

When we talk about Rxjs or responsive programming, we put observables in the first place most of the time, but in fact the Implementation of the Observer is the workhorse at the heart of this type of responsive programming. Observables are inert, they’re just functions, and they sit there until you subscribe to them, and then he creates your observer, and then they’re done, Observers, on the other hand, remain active, monitoring for events from the producer.

You can subscribe to an Observable with a JS object with ‘next’, ‘error’, and ‘complete’ methods, but that’s just the beginning. In RXJS5 we provide some guarantees. Here are some very important ones:

The Observer to ensure

  1. This is fine if the Oberser you pass in doesn’t implement all the methods
  2. You don’t need to call next after complete and error
  3. When you unsubscribe, no events will be raised (Error, Next, or complete)
  4. Unsubsription is called when ‘complete’ and ‘error’ are called
  5. Unsubscription is called when your Next, complete, error handlers are abnormal to ensure no resource leaks
  6. Next, Error, and Complete are optional

To do this, we need to wrap your Observer into a SafeObserver, which enforces the guarantee. To implement 2, we need to keep track of whether complete or error has occurred. To implement 3, we need to let our SafeObserver know when a consumer has called unsubscribe, and so on.

So if we really wanted to implement a full SafeObserver, it would be huge, so I won’t go into the details in this article, but just briefly describe how to use it. The implementation can be seen in jsbin(unfortunately I don’t care about 23333)

/** * A contrived data source to use in our "observable" * NOTE: this will clearly never error */ class DataSource { constructor() { let i = 0; this._id = setInterval(() => this.emit(i++), 200); } emit(n) { const limit = 10; if (this.ondata) { this.ondata(n); } if (n === limit) { if (this.oncomplete) { this.oncomplete(); } this.destroy(); } } destroy() { clearInterval(this._id); } } /** * Safe Observer */ class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { // only try to next if you're subscribed have a handler if (! this.isUnsubscribed && this.destination.next) { try { this.destination.next(value); } catch (err) { // if the provided handler errors, teardown resources, then throw this.unsubscribe(); throw err; } } } error(err) { // only try to emit error if you're subscribed and have a handler if (! this.isUnsubscribed && this.destination.error) { try { this.destination.error(err); } catch (e2) { // if the provided handler errors, teardown resources, then throw this.unsubscribe(); throw e2; } this.unsubscribe(); } } complete() { // only try to emit completion if you're subscribed and have a handler if (! this.isUnsubscribed && this.destination.complete) { try { this.destination.complete(); } catch (err) { // if the provided handler errors, teardown resources, then throw this.unsubscribe(); throw err; } this.unsubscribe(); } } unsubscribe() { this.isUnsubscribed = true; if (this.unsub) { this.unsub(); } } } /** * our observable */ function myObservable(observer) { const safeObserver = new SafeObserver(observer); const datasource = new DataSource(); datasource.ondata = (e) => safeObserver.next(e); datasource.onerror = (err) => safeObserver.error(err); datasource.oncomplete = () => safeObserver.complete(); safeObserver.unsub = () => { datasource.destroy(); }; return safeObserver.unsubscribe.bind(safeObserver); } /** * now let's use it */ const unsub = myObservable({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done')} }); /** * uncomment to try out unsubscription */ // setTimeout(unsub, 500);Copy the code

Observable design: Ergonomic Observer security

If we wrap observables as a class or an object, It would be easy to pass In SafeObserver as anonymous Obserers (or a function like subscribe(fn, fn, fn) in RXJS) and provide it to developers in a more ergonomic way. Observables can be used in a simple way by creating SafeObserver within the ‘subscribe’ of Observable:

const myObservable = new Observable((observer) => {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
});Copy the code

You may have noticed that this example is similar to our first example. But it’s easier to read and understand. The concrete implementation can be seen in Jsbin, but in jsbin, we can see that it is unnecessary to wrap an Observable with a safeObservable when it is new Observable. Because we already wrapped the Observer in safeObservable when we called myObserable subscribe

Class Observable {constructor(_subscribe) {// Subscribe this._subscribe = _subscribe; Subscribe (observer) {const safeObserver = new safeObserver (observer);} subscribe(observer) {const safeObserver = new safeObserver (observer); return this._subscribe(safeObserver); }}Copy the code

Operator: is just a function

An operation in Rxjs is a receiving source Observable that returns a new Observable and subscribes to the source Observable when you subscribe to it. We can implement a simple one as follows: jsbin

function map(source, project) { return new Observable(observer) => { const mapObserver = { next: (x) => observer.next(project(x)), error: (err) => observer.error(err), complete: () => observer.complete() }; return source.subscribe(mapObserver); }}Copy the code

The most important thing here is what the operator does: when you subscribe to the observable it returns, it creates a ‘mapObserver’ to do the work and links the ‘observer’ to the mapObserver. The chain call to the construct operator simply creates a template to use to concatenate pau when subscribing.

Observable design: chain calls to make operators better

If we implemented all the operators as separate functions, our operator chain calls would be ugly

map(map(myObservable,(x) => x + 1), (x => x + 2)Copy the code

So we can imagine if we had a 5,6 operator, what about that? It’s basically unusable.

We can also use Reduce to simplify the implementation by referring to Jsbin

pipe(myObservable, map(x => x + 1), map(x => x + 2));Copy the code

Ideally, we would like to be able to make chain calls as follows

myObservable.map(x => x + 1).map(x => x + 2);Copy the code

Luckily, we’ve wrapped Observable as a class, so we can implement the operator as a class method: jsbin

/** * Observable basic implementation */ class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const safeObserver = new SafeObserver(observer); safeObserver.unsub = this._subscribe(safeObserver); return safeObserver.unsubscribe.bind(safeObserver); Map = function (project) {return new Observable((observer) => {const mapObserver = { next: (x) => observer.next(project(x)), error: (err) => observer.error(err), complete: () => observer.complete() }; return this.subscribe(mapObserver); }); }Copy the code

Now we have the syntax we want. As a bonus, we subclass certain Observables (such as wrapping promises or requiring static values).

Observables is a function that takes an observer as an argument and returns a function

Observables are a function that takes an observer and returns a function. No more, no less. If you write a function that receives an observer and then returns a function, is it synchronous or asynchronous? It’s possible. It’s a function, and the behavior of any function depends on how it’s implemented. So, an Observable can be dealt with as a reference to a function you passed in, which was stateful alien type. When you are using chained calls to operators, all you are doing is composing a function, setting up observers, connecting them together, and passing data to your observers.

In this article, an Observable returns a function, while the Rxjs callback, in that case and in the ES-Observable specification, returns a Subscription object, which has a better design. But writing it this way keeps it simple.

And then contribute a little bit more

Another solution is to use observeOn

const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // magic hereCopy the code

  • A synchronization error thrown downstream from a Subject will kill the entire subject.
  • The great God said he was wrong, and Promise’s mishandling was a good idea.
  • Error trap may be supported in future releases (but I’m currently 5.5.5 and I don’t have it either). There are pictures and facts, a recent RXJS issue discussion error_trap.png
  • It might be all asynchronous like promise and not necessary (shrug).

Article comments section highlights

  • For multicast and unicast, it depends on how you connect the producer. If you create a new producer every time you subscribe, then you are a unicast. Otherwise, you are a multicast.