Observable is an abstraction that combines the observer pattern, the iterator pattern, and functional programming to provide powerful asynchronous processing based on event streams. It is already a Stage 1 proposal. RxJS is an implementation of Observable: the JavaScript member of the ReactiveX family, which has implementations in many languages.

In JavaScript, we can use T | null to represent a single value, an Iterator to handle multiple values, and a Promise to handle a single asynchronous value. Observable fills the remaining gap: multiple asynchronous values.
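The difference between pulling multiple values and having them pushed can be shown in a small sketch. It is plain JavaScript without RxJS; `numbers` and `pushNumbers` are hypothetical names, and the push-based function is only a stand-in for an Observable. A real Observable may deliver values at any later time, while this one delivers them immediately so the outcome is easy to follow.

```javascript
// Multiple values, pulled: the consumer asks the iterator for each value.
function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}
const pulled = [...numbers()];

// Multiple values, pushed: the producer decides when to call the observer.
// A real Observable could call `next` later (timers, DOM events, ...);
// here the calls happen immediately so the example stays deterministic.
function pushNumbers(observer) {
  let active = true;
  [1, 2, 3].forEach(n => {
    if (active) observer.next(n);
  });
  // The returned function plays the role of `unsubscribe`.
  return () => {
    active = false;
  };
}

const pushed = [];
const unsubscribe = pushNumbers({ next: n => pushed.push(n) });
unsubscribe();
```

In both cases the consumer ends up with the same three values; what changes is who controls the timing.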

Using RxJS

As mentioned above, an EventEmitter can be used for reactive processing. The RxJS version is only slightly different:

/*
const change$ = new Subject();
<Input change$={change$} />
<Search change$={change$} />
*/
class Input extends Component {
  state = {
    value: ''
  };

  onChange = e => {
    this.props.change$.next(e.target.value);
  };

  componentDidMount() {
    this.subscription = this.props.change$.subscribe(value => {
      this.setState({ value });
    });
  }

  componentWillUnmount() {
    this.subscription.unsubscribe();
  }

  render() {
    const { value } = this.state;
    return <input value={value} onChange={this.onChange} />;
  }
}

class Search extends Component {
  // ...
  componentDidMount() {
    this.subscription = this.props.change$.subscribe(value => {
      ajax(/* ... */).then(list => this.setState({ list }));
    });
  }

  componentWillUnmount() {
    this.subscription.unsubscribe();
  }

  render() {
    const { list } = this.state;
    return <ul>{list.map(item => <li key={item.id}>{item.value}</li>)}</ul>;
  }
}

Here we still have to release the subscription manually. But thanks to the design of RxJS, we do not need to keep a reference to the callback function in order to unsubscribe, as we do with EventEmitter. That makes the problem easy to solve with a higher-order component, for example:

const withObservables = observables => ChildComponent => {
  return class extends Component {
    constructor(props) {
      super(props);

      this.subscriptions = {};
      this.state = {};

      Object.keys(observables).forEach(key => {
        this.subscriptions[key] = observables[key].subscribe(value => {
          this.setState({ [key]: value });
        });
      });
    }

    onNext = (key, value) => {
      observables[key].next(value);
    };

    componentWillUnmount() {
      Object.keys(this.subscriptions).forEach(key => {
        this.subscriptions[key].unsubscribe();
      });
    }

    render() {
      return (
        <ChildComponent {...this.props} {...this.state} onNext={this.onNext} />
      );
    }
  };
};
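Why no callback reference has to be kept can be seen in a minimal subject sketch. This is not the RxJS implementation; `createSubject` is a hypothetical helper written only to show that the handle returned by `subscribe` already knows which callback to remove.

```javascript
// Minimal subject: NOT the RxJS implementation, only an illustration.
function createSubject() {
  const observers = new Set();
  return {
    next(value) {
      // Copy first so that unsubscribing during delivery is safe.
      [...observers].forEach(observer => observer(value));
    },
    subscribe(observer) {
      observers.add(observer);
      // The handle closes over `observer`, so the caller never has to
      // keep the callback around in order to remove it later.
      return {
        unsubscribe() {
          observers.delete(observer);
        }
      };
    }
  };
}

const change$ = createSubject();
const received = [];

// An inline arrow function is fine: only the handle is stored.
const subscription = change$.subscribe(value => received.push(value));

change$.next('a');
subscription.unsubscribe();
change$.next('b'); // not delivered, the subscription was released
```

Because every subscription exposes the same `unsubscribe` method, a generic wrapper such as a higher-order component can release all of them without knowing anything about the callbacks.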

This avoids the hassle of releasing resources one by one, as EventEmitter requires, when multiple data sources need to be aggregated. RxJS also has methods dedicated to aggregating data sources:

Observable.combineLatest(foo$, bar$)
  .pipe(
    // ...
  );

This is obviously far more efficient than the EventEmitter approach, and it also has big advantages over MobX. When we discussed MobX, we noted that when multiple data sources need to be aggregated, autorun tends to collect unnecessary dependencies and observe is inefficient. RxJS has neither problem. combineLatest declares the data sources to aggregate very concisely, and thanks to the design of RxJS we do not have to call the disposer returned by each observe call one by one, as in MobX. We only need to handle the one subscription returned by subscribe:

class Foo extends Component {
  constructor(props) {
    super(props);

    this.subscription = Observable.combineLatest(foo$, bar$)
      .pipe(
        // ...
      )
      .subscribe(() => {
        // ...
      });
  }

  componentWillUnmount() {
    this.subscription.unsubscribe();
  }
}
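What combineLatest does, and why a single subscription is enough to release every source, can be sketched without RxJS. `createSubject` and `combineLatestSketch` are hypothetical helpers that only approximate the behaviour: nothing is emitted until every source has produced a value, and after that each new value emits the latest value of all sources.

```javascript
// Tiny subject used as a data source (illustration only, not RxJS).
function createSubject() {
  const observers = new Set();
  return {
    next: value => [...observers].forEach(fn => fn(value)),
    subscribe(fn) {
      observers.add(fn);
      return { unsubscribe: () => observers.delete(fn) };
    }
  };
}

// Hypothetical helper with combineLatest-like behaviour.
function combineLatestSketch(sources) {
  return {
    subscribe(fn) {
      const latest = new Array(sources.length);
      const seen = new Set();
      const inner = sources.map((source, index) =>
        source.subscribe(value => {
          latest[index] = value;
          seen.add(index);
          // Emit only once every source has produced a value.
          if (seen.size === sources.length) fn([...latest]);
        })
      );
      // One handle releases every inner subscription.
      return { unsubscribe: () => inner.forEach(s => s.unsubscribe()) };
    }
  };
}

const foo$ = createSubject();
const bar$ = createSubject();
const seenPairs = [];
const subscription = combineLatestSketch([foo$, bar$]).subscribe(pair =>
  seenPairs.push(pair)
);

foo$.next(1);   // nothing yet, bar$ has not emitted
bar$.next('a'); // emits [1, 'a']
foo$.next(2);   // emits [2, 'a']
subscription.unsubscribe();
bar$.next('b'); // ignored after unsubscribe
```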

Asynchronous processing

RxJS uses operators to describe various behaviors. Each operator returns a new Observable on which we can perform further operations. For example, we can transform data with the map operator:

foo$.map(event => event.target.value);

Since RxJS 5.5, every Observable has a pipe method that takes any number of operators and returns an Observable. This works well with tree shaking, so operators can be imported on demand instead of pulling in all of RxJS:

import { map } from 'rxjs/operators';

foo$.pipe(map(event => event.target.value));

This is the recommended style. When discussing object-oriented reactivity, we mentioned that asynchronous problems are hard to handle with the object-oriented approach. With Observable, we can handle them with the switchMap operator. An asynchronous search looks something like this:

input$.pipe(switchMap(keyword => Observable.ajax(/* ... */)));

We can use promises to handle asynchronous single values, while observables handle asynchronous multiple values. We can easily turn a Promise into an Observable, and reuse existing asynchronous code:

input$.pipe(switchMap(keyword => fromPromise(search(/* ... */))));

switchMap takes a function that returns an Observable, and the downstream is switched over to the returned Observable. To aggregate multiple data sources and then do asynchronous processing:

combineLatest(foo$, bar$).pipe(
  switchMap(keyword => fromPromise(someAsyncOperation(/* ... */)))
);

Also, because standard Promises have no cancel method, cancelling an asynchronous operation (mainly to avoid concurrency problems) can be troublesome. When a new value arrives from upstream, switchMap unsubscribes from the inner Observable that is still in flight, so its result is ignored, and calls the function again to obtain a new Observable. A single operator thus solves the concurrency-safety problem. Of course, we can choose among switchMap, mergeMap, concatMap, exhaustMap, and so on according to actual needs.
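The "latest request wins" behaviour can be illustrated with a small dependency-free sketch. `switchMapSketch`, `search`, and `pending` are hypothetical names, and the helper only approximates the switching part of the RxJS operator. The fake requests are completed by hand so that a stale response can arrive after a newer one.

```javascript
// Hypothetical helper with switchMap-like behaviour (not the RxJS operator).
// `project` maps an outer value to an inner source: an object whose
// subscribe(fn) returns { unsubscribe }.
function switchMapSketch(project, onResult) {
  let current = null;
  return function next(outerValue) {
    // A new outer value arrived: drop the inner subscription in flight.
    if (current) current.unsubscribe();
    current = project(outerValue).subscribe(onResult);
  };
}

// Fake request whose response we trigger by hand, so that responses
// can be made to arrive out of order.
const pending = {};
function search(keyword) {
  return {
    subscribe(fn) {
      let active = true;
      pending[keyword] = () => {
        if (active) fn(`results for ${keyword}`);
      };
      return {
        unsubscribe() {
          active = false;
        }
      };
    }
  };
}

const results = [];
const onInput = switchMapSketch(search, result => results.push(result));

onInput('r');    // first request starts
onInput('rx');   // second request starts, the first is unsubscribed
pending['rx'](); // the newer response arrives first
pending['r']();  // the stale response arrives late and is dropped
```

Only the result for the most recent keyword reaches the consumer, without any explicit cancellation logic in the calling code.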

RxJS also has a huge advantage when it comes to manipulating the timeline. As mentioned in my previous blog post, when we need to delay an operation by 5 seconds, both EventEmitter and the object-oriented approach fall short. In RxJS, a single delay operator solves the problem:

input$.pipe(
  delay(5000) // Downstream receives data 5 seconds after input$
);

Using RxJS to process data

Observable was designed to handle events, so much of its design follows the intuitions of events.

Observable is lazy by design: a stream does not execute while it has no subscribers. For events this is fine, because if nobody consumes an event, nothing needs to run. In a GUI, however, the subscriber may be a View:

class View extends Component {
  state = {
    input: ''
  };

  componentDidMount() {
    this.subscription = input$.subscribe(input => {
      this.setState({ input });
    });
  }

  componentWillUnmount() {
    this.subscription.unsubscribe();
  }

  render() {
    // ...
  }
}

Because the View may not exist, for example after a route change, the event source would then have no subscribers and would not run. Yet we expect the data to keep being processed in the background after the route has changed.

With events, a subscriber that arrives after an event has fired is not affected by anything that happened before it subscribed. For example, with EventEmitter:

eventEmitter.emit('hello', 1);
// ...
eventEmitter.on('hello', function listener() {});

Because the listener is registered after the hello event is emitted, it never receives the event with the value 1. This causes problems when dealing with data: the data is lost while the View is unmounted (for example, after a route change).

At the same time, because an Observable provides no method for reading its internal state directly, it is inconvenient to get the data at an arbitrary moment when we use an Observable to process data. Is there a way around these problems that lets the Observable abstraction empower the data layer?

Back to the topic. Redux events (actions) are in fact a stream of events, so it is natural to feed Redux's event stream into an RxJS stream:

() => next => {
  const action$ = new Subject();

  return action => {
    action$.next(action);
    // ...
  };
};

With this encapsulation, redux-observable lets us combine the powerful event description and processing capabilities of Observable with Redux. We can easily handle side effects in response to actions:

action$.pipe(
  ofType('ACTION_1'),
  switchMap(() => {
    // ...
  }),
  map(res => ({
    type: 'ACTION_2',
    payload: res
  }))
);

action$.pipe(
  ofType('ACTION_3'),
  mergeMap(() => {
    // ...
  }),
  map(res => ({
    type: 'ACTION_4',
    payload: res
  }))
);

redux-observable lets us combine Redux and Observable. Here, actions are treated as a stream, and ofType is equivalent to filter(action => action.type === 'SOME_ACTION'), which selects the actions to listen for. Thanks to Redux's design, we can listen for actions either to handle side effects or to observe data changes. The stream returns a new stream of actions, which redux-observable dispatches. This gives us the power of RxJS for handling asynchronous events on top of Redux for storing data.
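The flow described above can be sketched without Redux or RxJS. Everything here is a hypothetical stand-in rather than redux-observable's actual code: `createSubject` is a tiny subject, `ofTypeSketch` is ofType written as a plain filter, `pingHandler` plays the role of a stream that turns one action into another, and `createMiddlewareSketch` assumes that actions reach the reducer step first and are then fed into the action stream.

```javascript
// Tiny subject (illustration only, not RxJS).
function createSubject() {
  const observers = new Set();
  return {
    next: value => [...observers].forEach(fn => fn(value)),
    subscribe(fn) {
      observers.add(fn);
      return { unsubscribe: () => observers.delete(fn) };
    }
  };
}

// ofType written as a plain filter on action.type.
function ofTypeSketch(source, type) {
  return {
    subscribe(fn) {
      return source.subscribe(action => {
        if (action.type === type) fn(action);
      });
    }
  };
}

// Hypothetical handler: answers every PING with a PONG action.
const pingHandler = action$ => ({
  subscribe(fn) {
    return ofTypeSketch(action$, 'PING').subscribe(() => fn({ type: 'PONG' }));
  }
});

// Hypothetical middleware: actions pass to `next` first, then enter action$;
// every action produced by the handler is dispatched again.
const createMiddlewareSketch = handler => store => next => {
  const action$ = createSubject();
  handler(action$).subscribe(action => store.dispatch(action));
  return action => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};

// Stand-ins for a store and its reducer step.
const log = [];
const store = { dispatch: action => dispatch(action) };
const dispatch = createMiddlewareSketch(pingHandler)(store)(action => {
  log.push(action.type);
  return action;
});

dispatch({ type: 'PING' });  // logged, then answered with PONG
dispatch({ type: 'OTHER' }); // logged, ignored by the handler
```

The handler never touches the stored data directly: it only reads actions and emits new ones, which go through the same dispatch path as any other action.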