Original link: medium.com/benlesh/le… This article was translated by the RxJS Chinese community. If you reprint it, please credit the source.

On social media and at events, I am often asked about “hot” vs “cold” observables, or whether an observable is “multicast” or “unicast”. People seem mystified by Rx.Observable and regard its inner workings as pure black magic. When asked to describe observables, people say, “They’re streams” or “They’re like Promises.” In fact, I have said those things myself on many occasions, even in public talks.

The comparison with promises is necessary but unfortunate. Given that promises and observables are both async primitives, and that promises are already widely used and familiar to the JavaScript community, it is usually a good place to start. You compare a promise’s then with an observable’s subscribe, show the difference between eager and lazy execution, show that observables are cancelable and reusable, and so on. It is a handy way to introduce observables to beginners.

But there is a problem: observables are more different from promises than they are alike. Promises are always multicast. A promise’s resolution and rejection are always asynchronous. When people approach observables as though they were promises, they expect the same behavior, but it does not always hold. Observables are only sometimes multicast. Observables are usually, but not always, asynchronous. I blame myself a little too, for helping to spread this misunderstanding.
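The timing difference can be seen with a hand-rolled observable. The sketch below is only an illustration: syncObservable and the log array are made-up names, not part of RxJS. It records the order of events to show that a promise's then handler always runs later, while this particular observable does nothing until it is called and then delivers its value synchronously.

```javascript
// Record the order in which things happen.
const log = [];

// A promise: the executor runs eagerly, but `then` handlers always run
// asynchronously, in a later microtask.
const promise = new Promise((resolve) => {
  log.push('promise executor ran');
  resolve(1);
});
promise.then((value) => log.push(`promise value ${value}`));
log.push('after then');

// A hypothetical hand-rolled observable: nothing runs until it is called,
// and this one happens to push its value synchronously.
function syncObservable(observer) {
  log.push('observable producer ran');
  observer.next(1);
  observer.complete();
  return () => {};
}

log.push('before subscribe');
syncObservable({
  next: (value) => log.push(`observable value ${value}`),
  complete: () => log.push('observable complete'),
});
log.push('after subscribe');

// At this point the observable has already delivered its value, but the
// promise handler has not run yet.
console.log(log);
```

Once the current script finishes, the promise handler runs and appends 'promise value 1' to the log.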

An Observable is just a function that takes an observer and returns a function

If you really want to understand an Observable, you can write a simple one yourself. It’s not as difficult as it sounds, honestly. An Observable, in its simplest form, is nothing more than a specific type of function that serves a specific purpose.

Model:

  • A function
  • Accepts an observer: an object with next, error, and complete methods
  • Returns a cancellation function

Objective:

Connect the observer to a producer and return a means of breaking that connection. The observer is essentially a registry of handlers that values can be pushed to over time.

Basic implementation:

function myObservable(observer) {
    // Create the producer and forward its events to the observer.
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    // Return the function that breaks the connection.
    return () => {
        datasource.destroy();
    };
}


As you can see, there’s not much, just a fairly simple contract.
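To try the contract out, a producer is needed. The sketch below assumes a hypothetical DataSource that is driven by hand (its emit and finish methods and the DataSource.latest handle are inventions for this demo), so the result is deterministic. A real producer might wrap a timer, a WebSocket, or DOM events.

```javascript
// Hypothetical producer, driven by hand so the demo is deterministic.
class DataSource {
  constructor() {
    this.destroyed = false;
    DataSource.latest = this; // demo-only handle to the newest instance
  }
  emit(value) {
    if (!this.destroyed && this.ondata) this.ondata(value);
  }
  finish() {
    if (!this.destroyed && this.oncomplete) this.oncomplete();
  }
  destroy() {
    this.destroyed = true;
  }
}

// The function-style observable from the article.
function myObservable(observer) {
  const datasource = new DataSource();
  datasource.ondata = (e) => observer.next(e);
  datasource.onerror = (err) => observer.error(err);
  datasource.oncomplete = () => observer.complete();
  return () => {
    datasource.destroy();
  };
}

// "Subscribing" is just calling the function with an observer.
const received = [];
const unsubscribe = myObservable({
  next: (x) => received.push(x),
  error: (err) => received.push(`error: ${err}`),
  complete: () => received.push('done'),
});

const source = DataSource.latest;
source.emit(1);
source.emit(2);
unsubscribe();   // break the connection
source.emit(3);  // ignored: the producer was destroyed
console.log(received); // [1, 2]
```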

Safe observers: Making observers better

When we talk about RxJS or reactive programming, observables usually get top billing. In reality, however, the observer implementation is the workhorse of this type of reactive programming. Observables are inert. They’re just functions. They sit there until you subscribe to them, they set up your observer, and then they’re done, back to being boring old functions waiting to be called. The observer, on the other hand, stays active and listens for events from the producer.

You can subscribe to an Observable with any plain old JavaScript object (POJO) that has next, error, and complete methods, but the POJO observer you subscribe with is really just the beginning. In RxJS 5, we need to give you some guarantees. Here are some of the important ones:

Observer guarantees

  1. If the observer you pass doesn’t implement all three methods, that’s fine.
  2. You don’t want next to be called after complete or error.
  3. If you have unsubscribed, you don’t want any method to be called.
  4. Calls to complete and error need to invoke the unsubscribe logic.
  5. If your next, complete, or error handler throws an exception, you want the unsubscribe logic to be called so that resources are not leaked.
  6. next, error, and complete are actually all optional. You don’t have to handle every value, error, or completion. You may just want to handle one or two of them.

To accomplish the tasks in this list, we need to wrap the anonymous observer you provide in a SafeObserver to implement the above safeguards. Because of #2 above, we need to keep track of whether complete or error was called. Because of #3, we need to let SafeObserver know when consumers want to unsubscribe. Finally, because of #4, SafeObserver actually needs to understand the unsubscribe logic so that it can be called when complete or error is called.

It gets a little rough if we try to do this with the makeshift Observable function above. The accompanying JSBin snippet gives a feel for how crude it is. I’m not trying to implement a production-grade SafeObserver in this example, because that would take up the entire article. Here’s our Observable, which uses SafeObserver:

function myObservable(observer) {
   // Wrap the raw observer so the guarantees above hold.
   const safeObserver = new SafeObserver(observer);
   const datasource = new DataSource();
   datasource.ondata = (e) => safeObserver.next(e);
   datasource.onerror = (err) => safeObserver.error(err);
   datasource.oncomplete = () => safeObserver.complete();

   // Tell the SafeObserver how to tear down the producer.
   safeObserver.unsub = () => {
       datasource.destroy();
   };

   return safeObserver.unsubscribe.bind(safeObserver);
}
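The SafeObserver itself is not shown above. What follows is a minimal sketch of what such a wrapper could look like, not the RxJS 5 implementation. The destination and isUnsubscribed fields are assumed names, and the comments refer to the numbered guarantees listed earlier.

```javascript
class SafeObserver {
  constructor(destination) {
    this.destination = destination; // the raw observer passed by the consumer
    this.isUnsubscribed = false;
  }

  next(value) {
    // Guarantees 2 and 3: nothing is delivered after complete, error,
    // or unsubscribe. Guarantees 1 and 6: a missing handler is skipped.
    if (!this.isUnsubscribed && this.destination.next) {
      try {
        this.destination.next(value);
      } catch (err) {
        this.unsubscribe(); // guarantee 5: release resources, then rethrow
        throw err;
      }
    }
  }

  error(err) {
    if (this.isUnsubscribed) return;
    try {
      if (this.destination.error) this.destination.error(err);
    } finally {
      this.unsubscribe(); // guarantees 4 and 5
    }
  }

  complete() {
    if (this.isUnsubscribed) return;
    try {
      if (this.destination.complete) this.destination.complete();
    } finally {
      this.unsubscribe(); // guarantees 4 and 5
    }
  }

  unsubscribe() {
    this.isUnsubscribed = true;
    if (this.unsub) {
      const unsub = this.unsub;
      this.unsub = null; // run the teardown at most once
      unsub();
    }
  }
}

// Usage: an observer with only a next handler.
const seen = [];
let tornDown = 0;
const safe = new SafeObserver({ next: (x) => seen.push(x) });
safe.unsub = () => { tornDown += 1; };

safe.next(1);
safe.complete();    // no complete handler, but the teardown still runs
safe.next(2);       // dropped: the observer has already completed
safe.unsubscribe(); // the teardown does not run a second time
console.log(seen, tornDown); // [1] 1
```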

Designing Observable: Keeping the observer safe

Implementing Observable as a class makes it easy to apply SafeObserver to the anonymous observers passed in (and to bare handler functions, if you like the subscribe(fn, fn, fn) signature in RxJS), and gives developers a better experience. Because the SafeObserver is created inside Observable’s subscribe implementation, observables can once again be defined in the simplest way:

const myObservable = new Observable((observer) => {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
});

You’ll notice that the code snippet above looks almost the same as the first example. But it’s easier to read and easier to understand. I extended the JSBin example to show the minimal implementation of Observable.
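For reference, here is one way such an Observable class could be sketched. This is an assumption-laden illustration rather than the RxJS source: the _subscribe field is an invented name, and a small inline wrapper stands in for a full SafeObserver to keep the block short.

```javascript
class Observable {
  constructor(subscribeFn) {
    this._subscribe = subscribeFn; // the plain function from earlier
  }

  subscribe(observer) {
    let closed = false;
    let teardown = null;

    const unsubscribe = () => {
      if (closed) return;
      closed = true;
      if (teardown) teardown();
    };

    // Compact stand-in for SafeObserver: skips missing handlers, drops
    // notifications after completion, and unsubscribes on error/complete.
    const safe = {
      next: (value) => {
        if (!closed && observer.next) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        try {
          if (observer.error) observer.error(err);
        } finally {
          unsubscribe();
        }
      },
      complete: () => {
        if (closed) return;
        try {
          if (observer.complete) observer.complete();
        } finally {
          unsubscribe();
        }
      },
    };

    teardown = this._subscribe(safe);
    // If the producer finished synchronously, the teardown function was not
    // available yet when unsubscribe ran, so run it now.
    if (closed && teardown) teardown();
    return unsubscribe;
  }
}

// Usage: a synchronous producer that misbehaves after completing.
const events = [];
const numbers = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored: the stream has already completed
  return () => events.push('teardown');
});

numbers.subscribe({
  next: (x) => events.push(x),
  complete: () => events.push('complete'),
});
console.log(events); // [1, 2, 'complete', 'teardown']
```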

Operator: Again, just functions

An “operator” in RxJS is simply a function that receives a source Observable and returns a new Observable that subscribes to the source Observable when you subscribe to it. We can implement a basic, stand-alone operator, as shown in this online JSBin example:

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}

The most important thing to notice is what this operator does: when you subscribe to the Observable it returns, it creates a mapObserver to do the work and chains observer and mapObserver together. Building an operator chain is really just creating a template for wiring observers together on subscription.
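The "template" idea can be made visible by logging when things happen. The sketch below uses a bare-bones stand-in for the Observable class (no SafeObserver, so the observer must supply all three handlers), and the calls array is an invented name used for tracing.

```javascript
// Bare-bones stand-in for the article's Observable class.
class Observable {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }
}

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    };
    return source.subscribe(mapObserver);
  });
}

const calls = [];
const source = new Observable((observer) => {
  calls.push('source subscribed');
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {};
});

// Building the chain does not touch the source.
const mapped = map(map(source, (x) => x + 1), (x) => x * 10);
calls.push('chain built');

// Subscribing wires the observers together and starts the producer.
mapped.subscribe({
  next: (x) => calls.push(x),
  error: (err) => calls.push(err),
  complete: () => calls.push('complete'),
});
console.log(calls); // ['chain built', 'source subscribed', 20, 30, 'complete']
```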

Design Observable: More elegant operator chains

If all of our operators were implemented as separate functions, as in the example above, chaining the operators would be a bit ugly:

map(map(myObservable, (x) => x + 1), (x) => x + 2);

Now imagine nesting five or six operators that are more complex and take more arguments. The code becomes completely unreadable.

You could use a simple pipe implementation (as suggested by Texas Toland) that reduces over an array of operators to produce the final Observable, but that means writing more complex operators that return functions (see the JSBin example). This doesn’t make everything perfect either:

pipe(myObservable, map(x => x + 1), map(x => x + 2));
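A minimal sketch of that approach might look as follows. It is an assumption about what such a pipe could be, not the linked implementation: map is rewritten in curried form so it returns a function of the source, and pipe reduces over the operators. The Observable class here is again a bare-bones stand-in.

```javascript
// Bare-bones stand-in for the article's Observable class.
class Observable {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }
}

// Curried operator: takes the projection first, then the source.
const map = (project) => (source) =>
  new Observable((observer) =>
    source.subscribe({
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  );

// Reduce over the operators, feeding each one the observable built so far.
const pipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const myObservable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {};
});

const results = [];
pipe(myObservable, map((x) => x + 1), map((x) => x + 2)).subscribe({
  next: (x) => results.push(x),
  error: (err) => results.push(err),
  complete: () => results.push('done'),
});
console.log(results); // [4, 5, 'done']
```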

Ideally, we could link operators together in a more natural way, such as this:

myObservable.map(x => x + 1).map(x => x + 2);

Fortunately, our Observable class already supports this kind of chaining. It adds no extra complexity to the operator implementation code, but it comes at the cost of going against my own advice about the prototype: once you have added enough operators, there may be too many methods on the prototype. In the JSBin example, we add the map operator to the prototype of our Observable implementation:

Observable.prototype.map = function (project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return this.subscribe(mapObserver);
    });
};

Now we have nicer syntax. This approach has other, more advanced benefits as well. For example, we could subclass Observable into specific types of observable (such as observables that wrap a promise or a set of static values) and optimize our operators by overriding them.
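As a rough illustration of that idea, the sketch below defines a hypothetical ScalarObservable subclass that wraps a single static value and overrides map. The class name and its value field are assumptions made for this example, and the Observable base class is a bare-bones stand-in.

```javascript
// Bare-bones stand-in for the article's Observable class, with map as a method.
class Observable {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }

  map(project) {
    return new Observable((observer) =>
      this.subscribe({
        next: (x) => observer.next(project(x)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      })
    );
  }
}

// Hypothetical subclass wrapping one static value.
class ScalarObservable extends Observable {
  constructor(value) {
    super((observer) => {
      observer.next(value);
      observer.complete();
      return () => {};
    });
    this.value = value;
  }

  // Override: the value is already known, so compute the mapped scalar
  // directly instead of wiring up another observer on every subscription.
  map(project) {
    return new ScalarObservable(project(this.value));
  }
}

const output = [];
const mapped = new ScalarObservable(2).map((x) => x + 1);
mapped.subscribe({
  next: (x) => output.push(x),
  error: (err) => output.push(err),
  complete: () => output.push('done'),
});
console.log(mapped instanceof ScalarObservable, output); // true, [3, 'done']
```

One trade-off of this particular override is that project runs when map is called rather than on each subscription, which matters if the projection has side effects.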

TLDR: An Observable is a function that takes an observer and returns a function

Remember, after reading all of the above, that all of this is designed around a simple function. An Observable is a function that takes an observer and returns a function. Nothing more, nothing less. If you write a function that takes an observer and returns a function, is it asynchronous or synchronous? Neither. It’s just a function. The behavior of any function depends entirely on how it is implemented. So when you deal with an Observable, treat it like a function reference you pass around, not some magical, stateful alien type. When you build operator chains, what you are really doing is composing a function that sets up a chain of observers linked together, passing values through to your observer.

Note that the Observable implementation in the example still returns a function, while the RxJS and ES-Observable specifications return a Subscription object. The Subscription object is a better design, but I’ll have to write an entire article about it. So I just kept its unsubscribe feature to keep all the examples in this article simple.