DevUI is a team with both design and engineering perspectives, serving huawei DevCloud platform and huawei internal background systems, as well as designers and front-end engineers.

Add devui Helper (Devui-Official) DevUIHelper plugin: Devuihelper-lsp (welcome Star)

The introduction

Before starting this article, define some custom terms to make them easier to read.

  • Top stream: a stream that calls the operator.
  • Upstream stream: the stream to which the operator’s internal subscriber subscribes.
  • Downstream stream: a stream managed by the operator’s internal subscriber.
  • Downstream subscription: Subscribers who subscribe to streams generated by the operator.

In the previous article, I described the functions of OuterSubscriber and InnerSubscriber and parsed the source codes of several Join Creation operators. Next, we will enter the source code analysis of Transformation Operator.

After learning that OuterSubscriber and InnerSubscriber are a way to manage downstream stream subscriptions through the delegate mode, I realized that this implementation technique is used by many operators. This and the next article will cover more of these designs.

(PS: FOR the sake of description, I took the relevant pictures from the official website. The sequence is read from left to right.)

Base mapping

map

A map is the most basic mapping that evaluates the value of the upstream stream and forwards it to the downstream subscription. This part of the source code is not very complex, in fact, is to do a layer of forwarding.

protected _next(value: T) {
  let result: R;
  try {
    result = this.project.call(this.thisArg, value, this.count++);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.destination.next(result);
}
Copy the code

scan

Scan and has the same function as Reduce. A function is passed to level a set of data. However, the difference between Reduce and Scan is that the result is returned immediately after each combination.

const clicks1 = fromEvent(document.'click');
const ones1 = clicks.pipe(mapTo(1));
const seed1 = 0;
const count1 = ones.pipe(
  // The input is a function that returns any value
  scan((acc, one) = > acc + one, seed)
);
count.subscribe(x= > console.log(x));

Copy the code

The implementation of this part is also not very complicated. After receiving upstream stream data, accumulator is used to accumulate data.

protected _next(value: T): void {
  // Need to check whether there is an initial value.
  if (!this.hasSeed) {
    this.seed = value;
    this.hasSeed = true;
    this.destination.next(value);
  } else {
  	return this._tryNext(value); }}private _tryNext(value: T): void {
  const index = this.index++;
  let result: any;
  try {
  	// Calculate the result
    result = this.accumulator(<R>this.seed, value, index);
  } catch (err) {
    this.destination.error(err);
  }
	// Save it for future use
  this.seed = result;
  this.destination.next(result);
}
Copy the code

Five basic composite mappings

By composite mapping, these operators take a parameter that returns an Observable with upstream stream data, and forward the subscription data to the downstream.

MergeMap, switchMap, Haustmap, concatMap, mergeScan are five compound mapping operators that allow data from upstream flows to be passed to and processed by downstream flows. ConcatMap and mergeScan are a special case of mergeMap, so we only need to focus on the remaining three.

The source code structure of mergeMap, switchMap, haustmap is divided into these three parts:

  • The old flow is mapped to the new flow using the LIFT operation.
  • Implement the Operator interface and return a Subscriber through call.
  • This Subscriber is achieved by inheriting OuterSubscriber.

The first two of these have a very similar structure and are written with this boilerplate code.

export function someMap<T.R.O extends ObservableInput<any> > (
  project: (value: T, index: number) => O,
) :OperatorFunction<T.ObservedValueOf<O> | R> {
  return (source: Observable<T>) = > source.lift(new SomeMapOperator(project));
}

class SomeMapOperator<T.R> implements Operator<T.R> {
  constructor(private project: (value: T, index: number) => ObservableInput<R>) {
  }

  call(Subscriber: Subscriber<R>, source: any) :any {
    return source.subscribe(new SomeMapSubscriber(Subscriber, this.project)); }}Copy the code

Create InnerSubscriber in the internal registration method provided by _innerSub and pass in the current OuterSubscriber.

private _innerSub(input: ObservableInput<R>, value: T, index: number) :void {
  const innerSubscriber = new InnerSubscriber(this, value, index);
  const destination = this.destination as Subscription;
  destination.add(innerSubscriber);
  const innerSubscription = subscribeToResult<T, R>(this, input, undefined.undefined, innerSubscriber);
  
  // Since the input is probably not an Observable, it is returned
  // The result of the subscription may be equal to innserSubscriber
  // Handle it.
  if (innerSubscription !== innerSubscriber) {
    destination.add(innerSubscription);
  }
}
Copy the code

Ultimately, an internal subscription is created by subscribeToResult to manage the downstream flows.

mergeMap

MergeMap provides a merge operation that allows upstream streams to send data to multiple downstream streams by internally maintaining subscriptions to multiple downstream streams. It provides a concurrency limit parameter, mainly used to control the number of concurrent downstream streams.

export function mergeMap<T.R.O extends ObservableInput<any> > (
  project: (value: T, index: number) => O,
  concurrent: number = Number.POSITIVE_INFINITY
) :OperatorFunction<T.ObservedValueOf<O> | R> {
  return (source: Observable<T>) = > source.lift(new MergeMapOperator(project, concurrent));
}
Copy the code

Next, our focus will shift to MergeMapSubscriber. First look at its data structure.

export class MergeMapSubscriber<T.R> extends OuterSubscriber<T.R> {
  // Whether the mark has been completed
  private hasCompleted: boolean = false;
  // Stream Observable data cache
  private buffer: T[] = [];
  // The number of streams currently being opened
  private active: number = 0;
  // Index of data
  protected index: number = 0;

  constructor(
    // External incoming subscribers
    destination: Subscriber<R>,
    // The factory of the Observable to merge
    private project: (value: T, index: number) => ObservableInput<R>,
    // Number of concurrent requests
    private concurrent: number = Number.POSITIVE_INFINITY,
  ) {
    super(destination); }... }Copy the code

Subscriber

When _next of MergeMapSubscriber is called, the value of active (number of downstream streams) is compared with concurrent (maximum number of concurrent streams). If active is less than concurrent, _tryNext is called. Otherwise it puts incoming data into a buffer, but as you know, JavaScript doesn’t really have concurrency, it’s just an asynchronous queue. Each time _tryNext is done, a downstream stream is created through project, and the update active is made to stream the downstream in and fire the _innerSub.

protected _next(value: T): void {
  if (this.active < this.concurrent) {
    this._tryNext(value);
  } else {
    this.buffer.push(value); }}protected _tryNext(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    result = this.project(value, index);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.active++;
  // 
  this._innerSub(result, value, index);
}
Copy the code

When the upstream stream completes, _complete is fired.

protected _complete(): void {
  this.hasCompleted = true;
  if (this.active === 0 && this.buffer.length === 0) {
    this.destination.complete();
  }
  this.unsubscribe();
}
Copy the code

If all downstream streams have completed and there is no data in the buffer, the downstream subscription data is notified that it has been output.

notify

NotifyNext simply passes the result to the downstream subscription, while notifyComplete is much more interesting.

NotifyComplete tells you which streams have completed and are closed. If there is data in buffer, it is sent to _next and a new downstream stream is created. With this recursive operation, all the data in the buffer can be sent out. Finally, determine whether the upstream and downstream streams have ended. If so, notify the downstream that the subscription data has been output.

notifyNext(
  outerValue: T, innerValue: R,
  outerIndex: number.innerIndex: number.innerSub: InnerSubscriber<T, R>
): void {
  this.destination.next(innerValue);
}

notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift());
  } else if (this.active === 0 && this.hasCompleted) {
    this.destination.complete(); }}Copy the code

switchMap

SwitchMap provides an upstream flow-based mapping operation. When the subscription data of the upstream flow arrives, the old downstream flow is unsubscribed and a new set of downstream flows is re-subscribed.

export function switchMap<T.R.O extends ObservableInput<any> > (
  project: (value: T, index: number) => O
) :OperatorFunction<T.ObservedValueOf<O>|R> {
  return (source: Observable<T>) = > source.lift(new SwitchMapOperator(project));
}
Copy the code

Subscriber

InnerSubscription holds the subscription for the current downstream stream, so this operator maintains a subscription for only one downstream stream.

private index: number = 0;
private innerSubscription: Subscription;
Copy the code

When the next operation is performed, a new downstream stream is created, and if the old downstream stream exists, it is unsubscribed.

protected _next(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    // The upstream stream data arrives, creating a new downstream stream.
    result = this.project(value, index);
  } catch (error) {
    this.destination.error(error);
    return;
  }

  // Old downstream streams unsubscribe
  const innerSubscription = this.innerSubscription;
  if (innerSubscription) {
    innerSubscription.unsubscribe();
  }

  this._innerSub(result, value, index);
}
Copy the code

The Subscriber overwrites *_complete*. This means that the upstream stream has finished output, so if the downstream subscribes

protected _complete(): void { const {innerSubscription} = this; if (! innerSubscription || innerSubscription.closed) { super._complete(); return; } this.unsubscribe(); }Copy the code

notify

NotifyNext, as before, forwards data from the downstream stream. Again, the main concern is notifyComplete. Because innerSubscription is set to empty, calling this._complete is meaningless and does not trigger its parent function.

notifyComplete(innerSub: Subscription): void {
  const destination = this.destination as Subscription;
  destination.remove(innerSub);
  this.innerSubscription = null;
  if (this.isStopped) {
    super._complete(); }}Copy the code

If the current downstream stream has completed, it is removed from the downstream subscription (Destination), and if the upstream stream has stopped (error or complete was called, or unsubscribed), super._complete is called to indicate completion.

exhaustMap

In contrast to switchMap, haustmap provides a mapping operation dominated by downstream flows. If the downstream stream is enabled, incoming subscription data from the upstream stream will be discarded until the downstream stream completes the subscription. After the downstream stream completes the subscription, the upstream stream’s data will continue to be combined with the new downstream stream to form a new subscription.

export function exhaustMap<T.R.O extends ObservableInput<any> > (
  project: (value: T, index: number) => O,
) :OperatorFunction<T.ObservedValueOf<O>|R> {
  return (source: Observable<T>) = > source.lift(new ExhaustMapOperator(project));
}
Copy the code

Subscriber

The implementation of haustmap is simple, marking whether a downstream stream is subscribed by maintaining an internal state such as hasSubscription. HasCompleted is a marker of the completion of the upstream stream.

private hasSubscription = false;
private hasCompleted = false;
Copy the code

The subscription calls _next, marking whether the downstream stream is already on (if the subscription already exists), and if not, building a new downstream stream and marking hasSubscription to true.

protected _next(value: T): void {
  if (!this.hasSubscription) {
      let result: ObservableInput<R>;
      const index = this.index++;
      try {
        result = this.project(value, index);
      } catch (err) {
        this.destination.error(err);
        return;
      }
      // mark true
      this.hasSubscription = true;
      this._innerSub(result, value, index); }}Copy the code

Data for both upstream and downstream streams has been output, so the completion signal is passed to the downstream subscription.

protected _complete(): void {
  this.hasCompleted = true;
  if (!this.hasSubscription) {
    this.destination.complete();
  }
  this.unsubscribe();
}
Copy the code

notify

If the downstream stream is finished exporting data, hasSubscription should be marked as false.

notifyComplete(innerSub: Subscription): void {
  const destination = this.destination as Subscription;
  destination.remove(innerSub);

  // mark it as false
  this.hasSubscription = false;

  // Check whether the upstream stream has completed
  if (this.hasCompleted) {
    this.destination.complete(); }}Copy the code

concatMap

ConcatMap is a special form of mergeMap.

export function concatMap<T.R.O extends ObservableInput<any> > (
  project: (value: T, index: number) => O,
) :OperatorFunction<T.ObservedValueOf<O>|R> {
  return mergeMap(project, 1);
}
Copy the code

mergeScan

The source code for mergeScan is similar to mergeMap. It simply replaces the function passed in and caches the value of the previous combination internally.

const clicks2 = fromEvent(document.'click');
const ones2 = click$.pipe(mapTo(1));
const seed2 = 0;
const count2 = one$.pipe(
  // Enter an Observable factory
  mergeScan((acc, one) = > of(acc + one), seed),
);
Copy the code

concat & merge

The two related operators of Concat and Merge were not mentioned in the previous article, because they both end up calling mergeMap.

summary

Through these three different mapping operators, the upstream stream can be combined with the downstream stream in a certain way. So, with a diagram, we can look at the relationship of the correlation operators.

These operators are classified.

  • Belonging to Transformation Operators are: ConcatMap, concatMapTo, mergeMap, mergeMapTo, switchMap, switchMapTo, haustmap, haustmapto.

  • Join Creation Operators are Merge, concat.

  • Join Operators are mergeAll, concatAll, switchAll, startWith, endWith.

Discrete higher-order operators

expand

Expand recurses the incoming Observable factory. Expand is also a compound map, similar to the compound map above, except that it continuously compounds the downstream flow data, which is similar to the pattern shown above.

Subscriber

To implement the corresponding function, expand defines the following data structures.

export class ExpandSubscriber<T.R> extends OuterSubscriber<T.R> {
  // Current index
  private index: number = 0;
  // The number of downstream streams started
  private active: number = 0;
  // Whether the upstream stream has been completed
  private hasCompleted: boolean = false;
  // Cache data for index
  private buffer: any[];
  // Downstream factory
  private project: (value: T, index: number) = > ObservableInput<R>,
	/ / concurrency
 	private concurrent: number;
}
Copy the code

When the upstream stream data arrives, it is similar to mergeMap, comparing active and concurrent. If active is greater than concurrent, the upstream stream data is cached with a buffer. If active is less than concurrent, the data is sent directly to the downstream subscription and a new downstream stream is subscribed to. One thing to note is that, in order to prevent a stack burst, expand adds a judgment condition here, which in notify will be used to end the recursion.

protected _next(value: any) :void {
  const destination = this.destination;

  if (destination.closed) {
    this._complete();
    return;
  }

  const index = this.index++;
  if (this.active < this.concurrent) {
    destination.next(value);
    try {
      const { project } = this;
      const result = project(value, index);
      this.subscribeToProjection(result, value, index);
    } catch(e) { destination.error(e); }}else {
    this.buffer.push(value); }}// Subscribe to a new downstream stream
private subscribeToProjection(result: any.value: T, index: number) :void {
  this.active++;
  const destination = this.destination as Subscription;
  destination.add(subscribeToResult<T, R>(this, result, value, index));
}
Copy the code

When the upstream stream completes, you need to mark hasComplete as true. This step is an important sign of the end of recursion.

protected _complete(): void {
  this.hasCompleted = true;
  if (this.hasCompleted && this.active === 0) {
    this.destination.complete();
  }
	this.unsubscribe();
}
Copy the code

notify

So how does expand form a recursion? When the downstream stream has data coming, it calls _next directly. Finally, a recursive chain _next -> subscribeToProjection -> Next -> notifyNext -> _NEXT is formed.

notifyNext(
  outerValue: T, 
  innerValue: R,
	outerIndex: number.innerIndex: number.innerSub: InnerSubscriber<T, R>
): void {
	this._next(innerValue);
}
Copy the code

When the downstream flow completes, you need to decide whether to end the recursion based on the state of hasCompleted and buffer. Here, too, a recursive chain is formed: _next -> subscribeToProjection -> Next -> notifyComplete -> _next.

notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  const destination = this.destination as Subscription;
  destination.remove(innerSub);
  this.active--;
  if (buffer && buffer.length > 0) {
    this._next(buffer.shift());
  }
  if (this.hasCompleted && this.active === 0) {
    this.destination.complete(); }}Copy the code

exhaust

Exhaust is a leveling operation, and its source code does not invoke Haustmap. The idea is simple: the current incoming downstream stream is enabled by determining whether the previous downstream stream subscription currently exists.

private hasCompleted: boolean = false;
private hasSubscription: boolean = false;


protected _next(value: T): void {
  // Discard this value if there is a subscription
  if (!this.hasSubscription) {
    this.hasSubscription = true;
    this.add(subscribeToResult(this, value)); }}protected _complete(): void {
  this.hasCompleted = true;
  if (!this.hasSubscription) {
    this.destination.complete();
  }
}

notifyComplete(innerSub: Subscription): void {
  this.remove(innerSub);
  this.hasSubscription = false;
  if (this.hasCompleted) {
    this.destination.complete(); }}Copy the code

conclusion

While this article focuses on analyzing how operators map data, the next article will look at how buffer and window-related cache operators are run and implemented.

Join us

We are DevUI team, welcome to come here and build elegant and efficient human-computer design/research and development system with us. Email: [email protected].

Author: ZCX (Public account: ZCX Studio)

Operator III RxJS Operator III RxJS Operator III


Previous articles are recommended

VSCode plugin DevUIHelper design and development overview (3)

Dark Mode and Thematic Development of Web Interfaces

“How to build a grayscale Publishing environment”