DevUI is a team with both design and engineering perspectives. It serves Huawei's DevCloud platform and Huawei's internal back-office systems, as well as designers and front-end engineers. Add the DevUI assistant (Devui-Official). DevUIHelper plugin: Devuihelper-lsp (stars welcome)

Preface

ReactiveX is short for Reactive Extensions and is commonly abbreviated as Rx. It was originally developed as an extension to LINQ by a team led by Microsoft architect Erik Meijer and was open-sourced in November 2012. Rx is a programming model that aims to provide a consistent programming interface to help developers handle asynchronous data flows more easily.

First, here is the official definition of Rx:

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

The definition is awkward to translate, but in a nutshell, Rx is a way of writing asynchronous and event-based programs around observable sequences.

RxJS is the JavaScript implementation of Rx. This article briefly examines how the Observable and Subscription source code works.

Observable

Observables are at the heart of Rx. They implement the observer pattern and let callers receive data in a reactive manner.

An Observable is essentially a one-way linked list with the following basic data structure:

class Observable<T> {
  source: Observable<any>;
}

It is constructed much like a Promise: you pass in a function that wraps the operation and decides when to emit data. That function receives a subscriber as its argument.

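import { Observable } from 'rxjs';

const observable = new Observable<number>((subscriber) => {
  subscriber.next(1);
  // error() ends the stream, so the complete() call after it has no effect.
  subscriber.error(Error('error message'));
  subscriber.complete();
});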

The subscriber provides three main methods: next, error, and complete. The Subscriber implementation is clever: internally it is a linked list.

Unlike a Promise, an Observable doesn't run the function immediately. Instead, it waits until it is subscribed to. This lazy evaluation lets an Observable do its work only when it is actually needed.
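The lazy behaviour can be illustrated with a minimal sketch. `LazyObservable` and `SimpleObserver` are hypothetical names used only for illustration, not RxJS classes; the sketch assumes nothing more than a constructor that stores the producer function and a `subscribe` method that finally calls it.

```typescript
// Hypothetical, simplified model of a lazy Observable; not the RxJS source.
interface SimpleObserver<T> {
  next(value: T): void;
  complete(): void;
}

class LazyObservable<T> {
  private readonly producer: (observer: SimpleObserver<T>) => void;

  // The producer is only stored here, never called.
  constructor(producer: (observer: SimpleObserver<T>) => void) {
    this.producer = producer;
  }

  // Each subscribe call runs the producer once for that observer.
  subscribe(observer: SimpleObserver<T>): void {
    this.producer(observer);
  }
}

const timeline: string[] = [];

const numbers = new LazyObservable<number>((observer) => {
  timeline.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
});

timeline.push('observable created');

numbers.subscribe({
  next: (value) => timeline.push(`next ${value}`),
  complete: () => timeline.push('complete'),
});

// timeline is now:
// ['observable created', 'producer started', 'next 1', 'next 2', 'complete']
```

The producer's first entry appears after 'observable created', which shows that constructing the object did not run it.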

lift

The lift method is what makes chaining possible. It takes an operator (a mapping function) and returns a new Observable whose source points to the Observable that lift was called on. In effect, it wraps the operator in an enclosing object, which is itself an Observable. Here is how it works:

lift<R>(operator: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}
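To make the linked-list shape concrete, here is a small sketch of the chain that repeated `lift` calls build. `ChainedObservable`, `operatorName`, and `describeChain` are hypothetical names for illustration; the real class stores an operator object rather than a string.

```typescript
// Hypothetical, simplified model of the source chain built by lift; not the RxJS source.
class ChainedObservable {
  source: ChainedObservable | null = null;
  operatorName: string | null = null;

  // Return a new node whose source points back at the node lift was called on.
  lift(operatorName: string): ChainedObservable {
    const observable = new ChainedObservable();
    observable.source = this;
    observable.operatorName = operatorName;
    return observable;
  }
}

// Walk from the last node back to the root, collecting operator names.
function describeChain(last: ChainedObservable): string[] {
  const names: string[] = [];
  let node: ChainedObservable | null = last;
  while (node !== null) {
    names.push(node.operatorName ?? 'root');
    node = node.source;
  }
  return names;
}

const root = new ChainedObservable();
const lifted = root.lift('map').lift('filter');

const chain = describeChain(lifted); // ['filter', 'map', 'root']
```

Each call leaves the upstream node untouched and only adds a new tail, so the same root can be the start of several independent chains.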

pipe

RxJS differs from other ReactiveX implementations in that its operators are no longer stored as methods on the Observable. Instead, they are passed as arguments to the pipe function, which reduces over the array of operator functions to produce the final Observable. This reduce step is also very clever: each function receives the upstream Observable as its argument and returns the Observable that the downstream will receive, so the Observables are linked together one after another.

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  if (operations.length === 0) {
    return this as any;
  }
  if (operations.length === 1) {
    // A single operator is applied directly to the current Observable.
    return operations[0](this);
  }
  // Each operator receives the upstream Observable and returns the downstream one.
  return operations.reduce((prev, fn) => fn(prev), this);
}

pipe then uses overloads to provide type information for the functions passed in.

export function pipe<T>(): UnaryFunction<T, T>;
export function pipe<T, A>(fn1: UnaryFunction<T, A>): UnaryFunction<T, A>;
export function pipe<T, A, B>(fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>): UnaryFunction<T, B>;
// ...

UnaryFunction represents a function that takes a single argument. Through this chaining, every function on the chain receives the upstream type and passes the converted type on to the downstream.
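The combination of overloads and reduce can be tried in isolation. The following is a standalone sketch that works on plain values rather than Observables; the helper functions `toLength` and `isEven` are made up for the example, and only the first three overloads are written out.

```typescript
// A standalone sketch of a typed pipe helper; a simplification, not the RxJS source.
type UnaryFunction<T, R> = (source: T) => R;

function pipe<T>(): UnaryFunction<T, T>;
function pipe<T, A>(fn1: UnaryFunction<T, A>): UnaryFunction<T, A>;
function pipe<T, A, B>(fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>): UnaryFunction<T, B>;
function pipe(...fns: UnaryFunction<any, any>[]): UnaryFunction<any, any> {
  // Feed the output of each function into the next one, left to right.
  return (input: any) => fns.reduce((prev, fn) => fn(prev), input);
}

const toLength = (text: string): number => text.length;
const isEven = (n: number): boolean => n % 2 === 0;

// The overloads infer UnaryFunction<string, boolean> for the composed function.
const hasEvenLength = pipe(toLength, isEven);

const evenResult: boolean = hasEvenLength('rxjs'); // true, since 'rxjs'.length is 4
const oddResult: boolean = hasEvenLength('rx!'); // false, since 'rx!'.length is 3
```

Swapping the two arguments, as in `pipe(isEven, toLength)`, is rejected by the compiler because the output type of the first function no longer matches the input type of the second.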

subscribe

When subscribe is called on an Observable, all the functions in the chain start executing. subscribe accepts either an object with next, error, and complete properties, or three functions corresponding to next, error, and complete.

// Style 1: three callbacks
observable.subscribe(
  (value) => { console.log(value); },
  (error) => { console.error(error); },
  () => { console.log('complete'); }
);

// Style 2: an observer object
observable.subscribe({
  next: (value) => { console.log(value); },
  error: (error) => { console.error(error); },
  complete: () => { console.log('complete'); }
});

This is done by converting the function (or object) arguments into a Subscriber object, which extends Subscription. Finally, that object is returned to the caller as a Subscription.
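The conversion step can be sketched as a small normalising function. `toFullObserver`, `FullObserver`, and `PartialObserverLike` are hypothetical names; RxJS performs the equivalent work inside its Subscriber class, and this sketch simply replaces missing handlers with no-ops.

```typescript
// Hypothetical helper showing how subscribe's two calling styles could be normalised.
// A simplification, not the RxJS source.
interface FullObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

type PartialObserverLike<T> = Partial<FullObserver<T>>;

function toFullObserver<T>(
  observerOrNext?: PartialObserverLike<T> | ((value: T) => void),
  error?: (err: unknown) => void,
  complete?: () => void,
): FullObserver<T> {
  const noop = (): void => {};
  if (typeof observerOrNext === 'function') {
    // Three-function style: subscribe(next, error, complete).
    return { next: observerOrNext, error: error ?? noop, complete: complete ?? noop };
  }
  // Object style: subscribe({ next, error, complete }); any handler may be missing.
  const partial = observerOrNext ?? {};
  return {
    next: partial.next ?? noop,
    error: partial.error ?? noop,
    complete: partial.complete ?? noop,
  };
}

const received: number[] = [];
const fromFunction = toFullObserver<number>((value) => received.push(value));
const fromObject = toFullObserver<number>({ next: (value) => received.push(value * 10) });

fromFunction.next(1);
fromObject.next(2);
fromObject.complete(); // no handler was supplied, so this is a no-op

// received is now [1, 20]
```

After normalisation the rest of the code can call all three handlers unconditionally, whichever style the caller used.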

subscribe(
  observerOrNext?: PartialObserver<T> | ((value: T) => void),
  error?: (error: any) => void,
  complete?: () => void
): Subscription {
  // operator is a mapping function
  const { operator } = this;
  const sink = new Subscriber(observerOrNext, error, complete);

  if (operator) {
    sink.add(operator.call(sink, this.source));
  } else {
    sink.add(
      (this.source || !sink.syncErrorThrowable)
        ? this._subscribe(sink)
        : this._trySubscribe(sink)
    );
  }

  // omit error handling
  return sink;
}

_subscribe(subscriber: Subscriber<any>): TeardownLogic {
  const { source } = this;
  return source && source.subscribe(subscriber);
}

_trySubscribe(sink: Subscriber<T>): TeardownLogic {
  try {
    return this._subscribe(sink);
  } catch (err) {
    // Some error-handling details from the source are omitted here; they do not affect the reading.
    sink.error(err);
  }
}

The add method of Subscriber is described below. All in all, Observables are like a string of firecrackers: only when the fuse is lit (subscribe) does one Observable set off the next, until the whole string goes off with a bang. subscribe is the process of finding the fuse and lighting it.
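The order in which the chain is lit can be observed with a small model. `MiniObservable`, its `map` method, and the `trace` array are hypothetical and written only for this illustration; the sketch assumes each downstream Observable subscribes to its upstream when it is subscribed to itself.

```typescript
// Hypothetical, simplified model of how subscribe walks the source chain; not the RxJS source.
type Sink<T> = (value: T) => void;

const trace: string[] = [];

class MiniObservable<T> {
  private readonly subscribeFn: (sink: Sink<T>) => void;

  constructor(subscribeFn: (sink: Sink<T>) => void) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(sink: Sink<T>): void {
    this.subscribeFn(sink);
  }

  // Build a downstream Observable that subscribes to this one when it is subscribed itself.
  map<R>(label: string, project: (value: T) => R): MiniObservable<R> {
    return new MiniObservable<R>((sink) => {
      trace.push(`subscribe ${label}`);
      this.subscribe((value) => sink(project(value)));
    });
  }
}

const sourceNumbers = new MiniObservable<number>((sink) => {
  trace.push('subscribe source');
  sink(1);
  sink(2);
});

const doubledThenText = sourceNumbers
  .map('double', (n) => n * 2)
  .map('toText', (n) => `value ${n}`);

// Nothing has run yet; subscribing lights the fuse from the last link back to the source.
doubledThenText.subscribe((text) => trace.push(text));

// trace is now:
// ['subscribe toText', 'subscribe double', 'subscribe source', 'value 2', 'value 4']
```

Subscription travels upstream, from the last operator to the source, and values then travel downstream through the same links.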

Subscription

A Subscription is a tree-like structure: each node holds its child nodes and either a single parent or a collection of parents.

class Subscription {
  _parentOrParents: Subscription | Subscription[];
  _subscriptions: Subscription[];
}

add

The main job of the add method is to link subscriptions together. It wraps the function or subscription passed in as a Subscription, stores it in the _subscriptions array, and sets the current object as the parent of the wrapped object.

add(logic: Function | Subscription | void): Subscription {
  let subscription = logic;

  if (typeof logic === 'object') {
    // If the added subscription is this one or is already closed, there is nothing to add.
    // If the current subscription has been cancelled, the added subscription should also be cancelled.
    if (subscription === this || subscription.closed || typeof subscription.unsubscribe !== 'function') {
      return subscription;
    } else if (this.closed) {
      subscription.unsubscribe();
      return subscription;
    } else if (!(subscription instanceof Subscription)) {
      const tmp = subscription;
      subscription = new Subscription();
      subscription._subscriptions = [tmp];
    }
  } else if (typeof logic === 'function') {
    subscription = new Subscription(<(() => void)>logic);
  } else {
    // Throws an error.
  }

  // The parent object is set lazily.
  let { _parentOrParents } = subscription;
  if (_parentOrParents === null) {
    // If no parent object is set, the current object becomes the parent.
    subscription._parentOrParents = this;
  } else if (_parentOrParents instanceof Subscription) {
    // If the parent object is already the current object, return.
    if (_parentOrParents === this) {
      return subscription;
    }
    // The added subscription already has a parent, so store both in an array.
    subscription._parentOrParents = [_parentOrParents, this];
  } else if (_parentOrParents.indexOf(this) === -1) {
    // The parents are already an array that does not contain the current subscription, so append it.
    _parentOrParents.push(this);
  } else {
    // The current subscription is already registered as a parent.
    return subscription;
  }

  // The child list is also created lazily.
  const subscriptions = this._subscriptions;
  if (subscriptions === null) {
    this._subscriptions = [subscription];
  } else {
    subscriptions.push(subscription);
  }

  return subscription;
}
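The parent bookkeeping is the least obvious part of add, so here is a reduced sketch of just that logic. `LinkedSubscription`, `parentOrParents`, and `children` are hypothetical stand-ins for the real fields, and the sketch leaves out teardown functions and closed checks entirely.

```typescript
// Hypothetical, simplified model of the parent bookkeeping in add; not the RxJS source.
class LinkedSubscription {
  // null at first, then a single parent, then an array once a second parent appears.
  parentOrParents: LinkedSubscription | LinkedSubscription[] | null = null;
  children: LinkedSubscription[] | null = null;

  add(child: LinkedSubscription): LinkedSubscription {
    if (child === this) {
      return child;
    }
    const current = child.parentOrParents;
    if (current === null) {
      child.parentOrParents = this;
    } else if (current instanceof LinkedSubscription) {
      if (current === this) {
        return child; // already linked to this parent
      }
      child.parentOrParents = [current, this];
    } else if (current.indexOf(this) === -1) {
      current.push(this);
    } else {
      return child; // already linked to this parent
    }
    // The children array is also created lazily.
    if (this.children === null) {
      this.children = [child];
    } else {
      this.children.push(child);
    }
    return child;
  }
}

const parentA = new LinkedSubscription();
const parentB = new LinkedSubscription();
const shared = new LinkedSubscription();

parentA.add(shared); // shared.parentOrParents is parentA
parentB.add(shared); // shared.parentOrParents is [parentA, parentB]
parentB.add(shared); // no change, already linked
```

Storing a single parent directly and switching to an array only when a second parent arrives avoids allocating an array in the common one-parent case.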

unsubscribe

unsubscribe is the main method of a Subscription object. It gives the observer pattern a way to stop observing.

unsubscribe(): void {
  let errors: any[];

  // Already unsubscribed.
  if (this.closed) {
    return;
  }

  // Grab the objects involved in unsubscribing before they are cleared.
  let { _parentOrParents, _unsubscribe, _subscriptions } = (<any> this);

  // Mark as unsubscribed.
  this.closed = true;
  // Set the parent object to null.
  this._parentOrParents = null;
  // Set the child subscriptions to null.
  this._subscriptions = null;

  // Remove this subscription from its parent or parents.
  if (_parentOrParents instanceof Subscription) {
    _parentOrParents.remove(this);
  } else if (_parentOrParents !== null) {
    for (let index = 0; index < _parentOrParents.length; ++index) {
      const parent = _parentOrParents[index];
      parent.remove(this);
    }
  }

  // Run this subscription's own teardown function.
  if (isFunction(_unsubscribe)) {
    try {
      _unsubscribe.call(this);
    } catch (e) {
      errors = e instanceof UnsubscriptionError ? flattenUnsubscriptionErrors(e.errors) : [e];
    }
  }

  // Unsubscribe every child subscription.
  if (isArray(_subscriptions)) {
    for (const sub of _subscriptions) {
      if (isObject(sub)) {
        try {
          sub.unsubscribe();
        } catch (e) {
          // omit error handling
        }
      }
    }
  }

  // ...
}
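The cascading and idempotent behaviour can be shown with a reduced model. `TeardownSubscription` is a hypothetical class for illustration only; it keeps just a teardown function and a child list, and it omits the parent removal and error collection that the real method performs.

```typescript
// Hypothetical, simplified model of cascading unsubscribe; not the RxJS source.
class TeardownSubscription {
  closed = false;
  private children: TeardownSubscription[] = [];
  private readonly teardown: (() => void) | null;

  constructor(teardown?: () => void) {
    this.teardown = teardown ?? null;
  }

  add(child: TeardownSubscription): void {
    if (this.closed) {
      child.unsubscribe(); // a closed parent closes new children immediately
    } else {
      this.children.push(child);
    }
  }

  unsubscribe(): void {
    if (this.closed) {
      return; // already unsubscribed, do nothing
    }
    this.closed = true;
    const children = this.children;
    this.children = [];
    // Run this subscription's own teardown first, then cascade to the children.
    if (this.teardown !== null) {
      this.teardown();
    }
    for (const child of children) {
      child.unsubscribe();
    }
  }
}

const calls: string[] = [];
const outer = new TeardownSubscription(() => calls.push('outer'));
outer.add(new TeardownSubscription(() => calls.push('child 1')));
outer.add(new TeardownSubscription(() => calls.push('child 2')));

outer.unsubscribe();
outer.unsubscribe(); // the second call is ignored

// calls is now ['outer', 'child 1', 'child 2']
```

Setting `closed` before running any teardown is what makes a repeated or re-entrant call safe.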

Conclusion

This first article introduced two important classes, Observable and Subscription. The operators built by combining these two classes are the essence of Rx, and later articles will continue with those operators.

Join us

We are the DevUI team. You are welcome to join us in building an elegant and efficient human-computer design and R&D system. Email: [email protected].

ZCX (官 码 : Coder)

Original link: mp.weixin.qq.com/s/6fVoI_JtS…
