Operators are one of the most commonly used APIS in RXJS for developers. There are ten types of operators built into RXJS according to their functions: Creation, Join Creation, Transformation, Filtering, Join, Multicasting, Error Handling, Utility, Conditional and Boolean, Mathematical and Aggregate (Boolean, Mathematical and Aggregate) are used for almost all scenarios. If you encounter scenarios where you cannot adapt well, you can also define your own operator

Due to space constraints, it is not possible to go through all of the 100 operators source code, nor is it necessary, because much logic is shared. I will select a few examples from each of the ten operators to interpret the source code

Creation

from

From can create observables (i.e. data sources) from Array, array-like, Promise, iterable, Observability-like, and readable-stream-like objects. These classes of objects can be converted into almost any JS object, so RXJS Converts from almost anything to an Observable by case study

For example, from Array:

 import { from } from 'rxjs';

 const array = [10.20.30];
 const result = from(array);
 result.subscribe(x= > console.log(x));
 // Logs:
 / / 10
 / / 20
 / / 30
Copy the code

From the iterable:

import { from } from 'rxjs';
import { take } from 'rxjs/operators';
function* generateDoubles(seed) {
   let i = seed;
   while (true) {
     yield i;
     i = 2* i; }}const iterator = generateDoubles(3);
const result = from(iterator).pipe(take(10));
result.subscribe(x= > console.log(x));
// Logs:
/ / 3
/ / 6
/ / 12
/ / 24
/ / 48
/ / 96
/ / 192
/ / 384
/ / 768
/ / 1536
Copy the code

Since it can process all kinds of objects, it must have its own conditional branches to make judgments, and then enter different processing methods according to different object types, eventually generating an Observable

// node_modules/rxjs/src/internal/observable/from.ts
export function from<T> (input: ObservableInput
       
        , scheduler? : SchedulerLike
       ) :Observable<T> {
  return scheduler ? scheduled(input, scheduler) : innerFrom(input);
}
Copy the code

The from function receives two functions. The first is the data source, and the second is the scheduler, which we’ve discussed in previous articles. This is the logic that allows the developer to schedule the flow of data

// node_modules/rxjs/src/internal/observable/innerFrom.ts
export function innerFrom<T> (input: ObservableInput<T>) :Observable<T> {
  if (input instanceof Observable) {
    return input;
  }
  if(input ! =null) {
    if (isInteropObservable(input)) {
      return fromInteropObservable(input);
    }
    if (isArrayLike(input)) {
      return fromArrayLike(input);
    }
    if (isPromise(input)) {
      return fromPromise(input);
    }
    if (isAsyncIterable(input)) {
      return fromAsyncIterable(input);
    }
    if (isIterable(input)) {
      return fromIterable(input);
    }
    if (isReadableStreamLike(input)) {
      returnfromReadableStreamLike(input); }}throw createInvalidObservableTypeError(input);
}
Copy the code

The innerFrom method is used to turn the input object input into an Observable. It selects different methods for processing logic according to the incoming object type. IsXXX is a method for determining the type, and fromXXX is a method for converting different objects into Observables, such as fromPromise

export function fromPromise<T> (promise: PromiseLike<T>) {
  return new Observable((subscriber: Subscriber<T>) = > {
    promise
      .then(
        (value) = > {
          if (!subscriber.closed) {
            subscriber.next(value);
            subscriber.complete();
          }
        },
        (err: any) = > subscriber.error(err)
      )
      .then(null, reportUnhandledError);
  });
}
Copy the code

It’s a new Observable inside, and it returns that instance

fromEvent

FromEvent has a lot of type signatures, but basically it takes four parameters

  • target

There are three valid types of target: HasEventTargetAddRemove, NodeStyleEventEmitter, and JQueryStyleEventEmitter

export interface HasEventTargetAddRemove<E> {
  addEventListener(
    type: string.listener: ((evt: E) = > void) | EventListenerObject<E> | null, options? :boolean | AddEventListenerOptions
  ): void;
  removeEventListener(
    type: string.listener: ((evt: E) = > void) | EventListenerObject<E> | null, options? : EventListenerOptions |boolean) :void;
}
Copy the code

HasEventTargetAddRemove, obviously, is for htmlElements, so you can pass in DOM elements. Similarly, NodeStyleEventEmitter allows you to pass in NodeJS emitter objects, The JQueryStyleEventEmitter allows you to pass in a JQ element object

  • eventName

Event names, such as click, Focus

  • options

Type signature:

export interfaceEventListenerOptions { capture? :boolean; passive? :boolean; once? :boolean;
}
Copy the code

This is the third argument to addEventListener in cases where target is a DOM element

  • resultSelector

It’s an interceptor that decides what value is finally passed to subscribe, for example

let i = 0
fromEvent(div, 'click'.(v) = > {
  if (i++ % 2= = =0) {
    return v
  }
  return null
}).subscribe(e= > {
  console.log('click', e);
})
Copy the code

For the even-numbered click event, SUBSCRIBE receives e as the click event, and for the odd-numbered click event, E as null

Let’s look at the source code

// rxjs/src/internal/observable/fromEvent.ts
export function fromEvent<T> (
  target: any,
  eventName: string, options? : EventListenerOptions | ((... args:any[]) => T), resultSelector? : (... args:any[]) => T
) :Observable<T> {
  // ...
  const [add, remove] =
    // If it is an EventTarget, we need to use a slightly different method than the other two patterns.
    isEventTarget(target)
      ? eventTargetMethods.map((methodName) = > (handler: any) = > target[methodName](eventName, handler, options as EventListenerOptions))
      : // In all other cases, the call pattern is identical with the exception of the method names.
      isNodeStyleEventEmitter(target)
      ? nodeEventEmitterMethods.map(toCommonHandlerRegistry(target, eventName))
      : isJQueryStyleEventEmitter(target)
      ? jqueryMethods.map(toCommonHandlerRegistry(target, eventName))
      : [];
}
Copy the code

IsEventTarget, isNodeStyleEventEmitter, isJQueryStyleEventEmitter respectively is to determine the target dom element or node event or jq object, After that, perform the corresponding methods for each type of target, returning add and remove

// rxjs/src/internal/observable/fromEvent.ts
const nodeEventEmitterMethods = ['addListener'.'removeListener'] as const;
const eventTargetMethods = ['addEventListener'.'removeEventListener'] as const;
const jqueryMethods = ['on'.'off'] as const;
Copy the code

This is done by taking target’s add listener method and remove listener method, registering listener event with target when subscribing, and removing listener event when subscribing

// rxjs/src/internal/observable/fromEvent.ts
export function fromEvent<T> (
  target: any,
  eventName: string, options? : EventListenerOptions | ((... args:any[]) => T), resultSelector? : (... args:any[]) => T
) :Observable<T> {
  / /...
  return new Observable<T>((subscriber) = > {
    const handler = (. args:any[]) = > subscriber.next(1 < args.length ? args : args[0]);
    add(handler);
    return () = >remove! (handler); }); }Copy the code

timer & interval

Timer: Used to emit a notification after a delay

Interval: Periodically sends notifications (that is, polling) (Emits Incremental numbers periodically in time)

// rxjs/src/internal/observable/timer.ts
export function timer(
  dueTime: number | Date = 0, intervalOrScheduler? :number | SchedulerLike,
  scheduler: SchedulerLike = asyncScheduler
) :Observable<number> {}
Copy the code

The first parameter of the timer is the delay time, which determines how long to delay execution, so this is an asynchronous operation that uses the scheduler internally

if (due < 0) {
  due = 0;
}
let n = 0;
return scheduler.schedule(function () {
  // ...
}, due);
Copy the code

Scheduler (function () {}, due); You can view it as setTimeout(function () {}, due);

The second parameter, intervalOrScheduler, if a number is passed in, indicates that the intervalOrScheduler will execute once every time after the delay of dueTime, which is equivalent to setInterval. The polling method, Call schedule in schedule

return scheduler.schedule(function () {
  if(! subscriber.closed) {// Emit the next value and increment.
    subscriber.next(n++);

    if (0 <= intervalDuration) {
      // If we have a interval after the initial timer,
      // reschedule with the period.
      this.schedule(undefined, intervalDuration);
    } else {
      // We didn't have an interval. So just complete.
      subscriber.complete();
    }
  }
}, due);
Copy the code

You can also pass in a scheduler for custom scheduling, or if you pass in a number for the second argument and want to pass in a scheduler for the third argument

Interval is a wrapped version of timer, which can be regarded as setInterval in RXJS

// /rxjs/src/internal/observable/interval.ts
export function interval(period = 0, scheduler: SchedulerLike = asyncScheduler) :Observable<number> {
  if (period < 0) {
    period = 0;
  }
  return timer(period, period, scheduler);
}
Copy the code

Join Creation Operators

concat

Concat can combine several Observables into a new Observable and continue to execute the next Observable after each Observable is finished

const timer1 = interval(300).pipe(take(1));
const timer2 = interval(500).pipe(take(2));
const timer3 = interval(500).pipe(take(2));
Copy the code

If you want to start timer2 after timer1 and timer3 after timer2, you can do this without concat:

timer1.subscribe({
  next: console.log,
  complete: () = > timer2.subscribe({
    next: console.log,
    complete: () = > timer3.subscribe({
      next: console.log
    })
  })
})
Copy the code

This is the Callback Hell we should avoid writing, and it would be much better to use concat

const result = concat(timer1, timer2, timer3)
result.subscribe(x= > console.log(x))
Copy the code
// /rxjs/src/internal/observable/concat.ts
export function concat(. args:any[]) :Observable<unknown> {
  return concatAll()(from(args, popScheduler(args)));
}

// /rxjs/src/internal/util/args.ts
export function popScheduler(args: any[]) :SchedulerLike | undefined {
  return isScheduler(last(args)) ? args.pop() : undefined;
}
Copy the code

ConcatAll is mentioned below. From is a method for converting an input value into an Observable. Args is the parameter of concat, the data stream to be merged

Then the meaning of the source code is more obvious, that is, concatAT received data flow one by one to concatAll, because concatAll has the function of sequential execution of data flow, so concat incoming data flow must be executed one by one, the end of the last data flow to the next

Transformation Operators

mergeMap

All observables are merged into the same data flow without unsubscribe or wait for the end of the last Observable. All observables are processed in parallel

const letters = of('a'.'b'.'c')
const result = letters.pipe(
  mergeMap(x= > interval(1000).pipe(map(i= > x + i)))
)
result.subscribe(x= > console.log(x))
// a0
// b0
// c0
// a1
// b1
// c1
// ...
Copy the code

Is equivalent to

const letters = of('a'.'b'.'c')
letters.subscribe({
  next: x= > {
    interval(1000).pipe(map(i= > x + i)).subscribe(x= > console.log(x))
  }
})
Copy the code

So mergeMap is actually a further encapsulation (of course, almost all operators in RXJS are the result of further encapsulation), and if there is a resultSelector, you don’t need to subscribe manually, just mergeMap

If YOU look at the mergeMap source code, you’ll see that mergeInternals is mostly called, so look at this directly

// /rxjs/src/internal/operators/mergeInternals.ts
const doInnerSub = (value: T) = > {
  // ...
  innerFrom(project(value, index++)).subscribe(
    new OperatorSubscriber(
      subscriber,
      (innerValue) = > {
        / /...
        subscriber.next(innerValue);
      },
      // ...
    )
Copy the code

The main one is subscriber.next(innerValue), which is a function passed into mergeMap as a parameter. Value is the value emitted by the original Observable. The index represents the number of times an Observable emits an event (starting from 0). The innerFrom, as mentioned above, is the method that converts an incoming parameter to an Observable

Subscribe to the Observable passed in by mergeMap. Each time the Observable sends a value, it executes subscriber.next(innerValue). Subscriber is the data stream to which mergeMap belongs

The flow of data is clear. MergeMap subscribes to all internally generated data streams, processes them as they are generated, and then passes them to the outermost master data stream, which has only one exit, resulting in a single data stream to the outside world

map

Similar to array.prototype.map (), the value of an Observable is converted to another value each time by a transform function

const letters = of('a'.'b'.'c');
const result = letters.pipe(map((v, i) = > v + i));
result.subscribe(x= > console.log(x));

// a0
// b1
// c2
Copy the code
// /rxjs/src/internal/operators/map.ts
export function map<T.R> (project: (value: T, index: number) => R, thisArg? :any) :OperatorFunction<T.R> {
  return operate((source, subscriber) = > {
    let index = 0;
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) = >{ subscriber.next(project.call(thisArg, value, index++)); })); }); }Copy the code

Operate is a common way to create operators inside RXJS. Many operators operate inside RXJS by calling Observable’s Lift method, which facilitates chain calls between operators. OperatorSubscriber is just a Subscriber, no more

Call (thisArg, value, index++). Project is the function argument that the map passes in. This function accepts two arguments. The first parameter is the value sent by the original Observable. The second parameter records how many times (starting from 0) the Observable sends the current value. The result after map processing is sent to subscriber

As you can see, index, the second argument to the map callback, is initialized outside subscribe, so its value increases each time we subscribe

scan

It is similar to Reduce and throws the current cumulative value on each update

of(1.2.3).pipe(
  scan((total, c) = > total + c, 0)
).subscribe(console.log)
/ / 1
/ / 3
/ / 6
Copy the code

Scan calls scanInternals internally, so look at this directly

// /rxjs/src/internal/operators/scanInternals.ts
let hasState = hasSeed;
let state: any = seed;
let index = 0;
source.subscribe(
  createOperatorSubscriber(
    subscriber,
    (value) = > {
      const i = index++;
      state = hasState
        ? // We already have state, so we can get the new state from the accumulator
          accumulator(state, value, i)
        : // We didn't have state yet, a seed value was not provided, so

          // we set the state to the first value, and mark that we have state now
          ((hasState = true), value);

      // Maybe send it to the consumer.
      emitOnNext && subscriber.next(state);
    },
    // ..));Copy the code

ScanInternals maintains two variables, state and index, respectively. The user stores the current accumulated value and the current accumulated times (starting from 0). Each time the user subscribes, state is added to the new value and index increases

switchMap

Map the upstream Observable to another Observables, and unsubscribe from the previous mapped data stream every time it connects to the upstream data stream

timer(0.2000).pipe(
  map(() = > interval(500)),
  mergeAll(),
).subscribe(console.log)
/ / 0,1,2,3... 0,1,2,3...
Copy the code

Every 2000ms, a new data stream is generated upstream, which is converted to another data stream by switchMap and unsubscribes to the previous data stream, so the above code loops 0, 1, 2, 3, 4

If you do not unsubscribe to the previous data stream each time a new data stream is generated, multiple data streams will exist at the same time, and the output result will be a mixture of multiple data stream subscriptions. This feature is the main difference between switchMap and other leveling operators

// /rxjs/src/internal/operators/switchMap.tsinnerSubscriber? .unsubscribe();let innerIndex = 0;
const outerIndex = index++;
// Start the next inner subscription
innerFrom(project(value, outerIndex)).subscribe(
  (innerSubscriber = createOperatorSubscriber(
    subscriber,
    (innerValue) = > subscriber.next(resultSelector ? resultSelector(value, innerValue, outerIndex, innerIndex++) : innerValue),
    () = > {
      innerSubscriber = null! ; checkComplete(); })));Copy the code

The innerSubscriber is the data stream from the previous subscription. Each time a new data stream is converted, the innerSubscriber is called. .unsubscribe(); InnerFrom is the method that generates an Observable. The logic to cancel and regenerate is just a few lines of code

Filtering Operators

take

Sets the value to receive only N events from Observable. When the subscription starts, if the number of events that have occurred has reached the number we set, the current data stream will end

interval(500).pipe(
  take(3)
).subscribe(console.log)
/ / 0
/ / 1
/ / 2
Copy the code
// /rxjs/src/internal/operators/take.ts
export function take<T> (count: number) :MonoTypeOperatorFunction<T> {
  return count <= 0
      () => EMPTY
    : operate((source, subscriber) = > {
        let seen = 0;
        source.subscribe(
          createOperatorSubscriber(subscriber, (value) = > {
            if (++seen <= count) {
              subscriber.next(value);
              if(count <= seen) { subscriber.complete(); }}})); }); }// /rxjs/src/internal/observable/empty.ts
export const EMPTY = new Observable<never> ((subscriber) = > subscriber.complete());
Copy the code

When set times less than 0, immediately end (complete) the current data stream, this is a boundary judgment

Otherwise, a variable called “seen” will count, increasing each time before next. When count <= seen, we have executed our preset number, ending the current data stream

Join Operators

mergeAll

MergeAll is similar to mergeMap in that it actually calls mergeMap from within mergeAll. The difference is that upstream data flows as they pass through mergeMap are further processed by predefined functions passed in from mergeMap. Get new data results, and then let the data results flow down; MergeAll, on the other hand, only performs compatibility processing for upstream data, and does not perform additional data processing. In fact, mergeMap can completely replace mergeAll. At the source level, mergeAll also calls mergeMap, which is a further function encapsulation

It’s hard to understand the concept, but it’s a little bit clearer when you look at examples

of(1).pipe(
  map(v= > v)
).subscribe(console.log)
/ / 1
Copy the code

The output from the above code is obviously 1, now change it

of(1).pipe(
  map(v= > of(v))
).subscribe(console.log)
/ / Observable2 {_subscribe: ƒ}
Copy the code

The output is an Observable, and to get the value of that Observable, you subscribe again

of(1).pipe(
  map(v= > of(v)),
).subscribe(v= > v.subscribe(console.log))
Copy the code

And with mergeAll

of(1).pipe(
  map(v= > of(v)),
  mergeAll()
).subscribe(console.log)
/ / 1
Copy the code

You don’t need to subscribe further, and you can use mergeMap for this example

of(1).pipe(
  map(v= > of(v)),
  mergeMap(v= > v)
).subscribe(console.log)
/ / 1
Copy the code

But mergeMap can do more, for example, by changing the value of the original data

of(1).pipe(
  map(v= > of(v)),
  mergeMap(v= > v.pipe(map(n= > n *2)))
).subscribe(console.log)
/ / 2
Copy the code

The original data was 1, but became 2 after mergeMap processing

MergeAll accepts a parameter that controls the maximum number of Concurrent observables to subscribe to at the same time

of(0.1.2.3).pipe(
  map(v= > of(v).pipe(delay(1000))),
  mergeAll()
)
.subscribe(console.log)
Copy the code

The above code will print 0, 1, 2, 3 at 1000ms after execution

of(0.1.2.3).pipe(
  map(v= > of(v).pipe(delay(1000))),
  mergeAll(2)
)
.subscribe(console.log)
Copy the code

The above code will print 0, 1 after 1000ms, and then wait another 1000ms to print 2, 3 at the same time

Since mergeMap is a complete replacement for mergeAll, mergeMap can be implemented as well

of(0.1.2.3).pipe(
  map(v= > of(v).pipe(delay(1000))),
  mergeMap(v= > v, 2)
)
.subscribe(console.log)
Copy the code

MergeAll calls mergeMap internally, and ultimately mergeInternals

// /rxjs/src/internal/operators/mergeInternals.ts
const outerNext = (value: T) = > (active < concurrent ? doInnerSub(value) : buffer.push(value));
Copy the code

Active < concurrent, the initial value of active is 0, and the doInnerSub(value) is accumulated by 1 every time the value is executed. Concurrent is the mergeAll parameter, and if not, the value is Infinity. That is, if mergeAll is not passed, the expression always executes doInnerSub(value)

DoInnerSub is a method to execute data stream subscription. When the number of observables in execution is greater than concurrent, the following data stream data is temporarily stored in buffer. When any Observables complete execution, active–, Observables are generated and subscribed to the data temporarily stored in the buffer, that is, doInnerSub is executed

// /rxjs/src/internal/operators/mergeInternals.ts
try {
  active--;
  while (buffer.length && active < concurrent) {
    constbufferedValue = buffer.shift()! ;if (innerSubScheduler) {
      executeSchedule(subscriber, innerSubScheduler, () = > doInnerSub(bufferedValue));
    } else {
      doInnerSub(bufferedValue);
    }
  }
  checkComplete();
} catch (err) {
  subscriber.error(err);
}
Copy the code

concatAll

ConcatAll is actually a higher-order Observable operator of Concat. That is, concatAll doesn’t need to receive streams as parameters one by one and then merge them. It just receives upstream streams in pipe

// /rxjs/src/internal/observable/concat.ts
export function concatAll<O extends ObservableInput<any> > () :OperatorFunction<O.ObservedValueOf<O>> {
  return mergeAll(1);
}
Copy the code

ConcatAll is mergeAll(1), which means that only one data stream can be subscribed to at a time, and the next data stream can be started only after the last one has finished

  of(0.1.2).pipe(
    map(v= > of(v).pipe(delay(1000))),
    concatAll()
  ).subscribe(console.log)
Copy the code

After the above code is executed, 0 is output for 1s, 1 is output for 1s, and 2 is output for 1s

Utility Operators

tap

Intercepts and modifies the data for each Observable on the source, and passes the observable along

Generally speaking, operator will generate a new Observable after processing upstream data and pass it down, but tap is different. Although it also modifies the data stream, it does not generate a new Observable. It only modifies upstream Observables. It then passes the modified Observable along, so there are side effects

Tap receives the same argument as Observable. subscribe, so you can think of it as a subscription operation

interval(500).pipe(
  tap(console.log),
  map(n= >  n % 2 ? 'even' : 'odd')
).subscribe(console.log)
/ / 0
// even
/ / 1
// add
// ...
Copy the code
// /rxjs/src/internal/operators/tap.ts
source.subscribe(
  createOperatorSubscriber(
    subscriber,
    (value) = >{ tapObserver.next? .(value); subscriber.next(value); },() = > {
      isUnsub = false;
      tapObserver.complete?.();
      subscriber.complete();
    },
    (err) = > {
      isUnsub = false; tapObserver.error? .(err); subscriber.error(err); },() = > {
      if(isUnsub) { tapObserver.unsubscribe? . (); } tapObserver.finalize? . (); }));Copy the code

TapObserver performs whatever operation subscriber performs, and tapObserver performs before Subscriber. The external can not get subscriber, but the external can control tapObserver, so it is an interception processing

Mathematical and Aggregate Operators

reduce & max

Reduce and SCAN are similar in that they call scanInternals but have different parameter values. The only difference is that Scan will emit the current cumulative value in each iteration, while Reduce will emit the current cumulative value only when the data stream is complete. Only then will the final cumulative value be emitted. Therefore, when using Reduce, data flow must be complete, otherwise reduce will not produce results

Max is similar to math.max and is used to find the largest data value in the current data stream, although the concept of maximum actually depends on the comparer function you pass in using reduce

// /rxjs/src/internal/operators/max.ts
export function max<T> (comparer? : (x: T, y: T) =>number) :MonoTypeOperatorFunction<T> {
  return reduce(isFunction(comparer) ? (x, y) = > (comparer(x, y) > 0 ? x : y) : (x, y) = > (x > y ? x : y));
}
Copy the code
interval(100).pipe(
  map(() = > of(Math.random())),
  mergeAll(),
  take(5),
  max((a, b) = > a < b ? -1 : 1),
).subscribe(console.log)
/ / 0.9138789699353005
Copy the code

summary

Operator is one of the most frequently used APIS in RXJS, and understanding how it works helps us implement customized operators on our own