Rxjs operator field guide

1. Utility operators

count

  • Counts the emissions of the source (optionally only those matching a predicate) and emits the total when the source completes
import { range } from 'rxjs';
import { count } from 'rxjs/operators';

const numbers = range(1, 7);
const result = numbers.pipe(count(i => i % 2 === 1));
result.subscribe(x => console.log(x));
// Results in:
// 4
Copy the code

reduce

  • Accumulates the emitted values and emits the final accumulated result when the source completes
import { fromEvent, interval } from 'rxjs';
import { reduce, takeUntil, mapTo } from 'rxjs/operators';

const clicksInFiveSeconds = fromEvent(document, 'click').pipe(
  takeUntil(interval(5000)),
);
const ones = clicksInFiveSeconds.pipe(mapTo(1));
const seed = 0;
const count = ones.pipe(reduce((acc, one) => acc + one, seed));
count.subscribe(x => console.log(x));
Copy the code

max / min

import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
merge(
    obs$.pipe(max()),
    obs$.pipe(min()),
).pipe(tap((val) => {
    console.log("result....",val);
})).subscribe(console.log);

//output
result.... 8
8
result.... 2
2

Copy the code

tap

tap runs a side effect (here, logging) for each value and passes the value through unchanged, so the logs are printed one value at a time

import { of,merge } from 'rxjs';
import { max,min,tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
obs$.pipe(tap({
    next:(val) => {
        console.log("val",val);
    },
    error:() => {

    },
    complete:() => {

    }
})).subscribe(console.log)

//output
val 5
5
val 4
4
val 7
7
val 2
2
val 8
8
Copy the code

delay

  • Delays the emission of each value by the given time; error notifications are not delayed
  import { of } from 'rxjs';
  import { tap, delay } from 'rxjs/operators';

  const obs$ = of([1, 2]);
  obs$.pipe(tap(res => {
      console.log("get value from of....",new Date().toLocaleTimeString());
  }),delay(2000),tap(() => { // delays delivery by 2s; for a random delay use delayWhen(event => interval(Math.random() * 5000)) instead
      console.log("get value from of....",new Date().toLocaleTimeString());
  })).subscribe(res => {
      console.log("of res...;.",res);
  });

  //output
  get value from of.... 7:52:27 AM
  get value from of.... 7:52:29 AM
  of res...;. [ 1, 2 ]
Copy the code

delayWhen

import { interval, timer } from 'rxjs';
import { delayWhen } from 'rxjs/operators';

const message = interval(1000);
const delayForFiveSeconds = () => timer(5000);
const delayWhenExample = message.pipe(delayWhen(delayForFiveSeconds));
const subscribe = delayWhenExample.subscribe(val => console.log(val));

//output
5s delay....
0
1
2
Copy the code

repeat

repeat resubscribes to the source each time it completes; repeat(n) subscribes n times in total

import { tap } from 'rxjs/operators';
// RxJS v6+
import { repeat, delay } from 'rxjs/operators';
import { of } from 'rxjs';

const delayedThing = of('delayed value').pipe(
        tap(() => {
            console.log("time.. 1.",new Date().toLocaleTimeString());
        }),
        delay(2000)
        );

delayedThing
  .pipe(
    tap(() => {
        console.log("time... 2",new Date().toLocaleTimeString());
    }),
    repeat(3)
  )
  .subscribe(console.log);

//output
time.. 1. 4:42:45 PM
time... 2 4:42:47 PM
delayed value
time.. 1. 4:42:47 PM
time... 2 4:42:49 PM
delayed value
time.. 1. 4:42:49 PM
time... 2 4:42:51 PM
delayed value
Copy the code

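subscribeOn, observeOn

  • Control which scheduler is used: subscribeOn decides when the subscription happens, observeOn decides when notifications are delivered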
import { of, merge } from 'rxjs';

const a = of(1, 2, 3, 4);
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
// 1 2 3 4 5 6 7 8 9


import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';

const a = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
//5 6 7 8 9 1 2 3 4


import { interval, animationFrameScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const intervals = interval(10);        // Intervals are scheduled
                                       // with async scheduler by default...
intervals.pipe(
  observeOn(animationFrameScheduler),  // ...but we will observe on animationFrame
)                                      // scheduler to ensure smooth animation.
.subscribe(val => {
  someDiv.style.height = val + 'px';
});
Copy the code

materialize

  • Wraps every next/error/complete notification in a Notification object; dematerialize unwraps them again
import { of } from 'rxjs';
import { materialize, map } from 'rxjs/operators';

const letters = of('a', 'b', '13', 'd');
const upperCase = letters.pipe(map(x => x.toUpperCase()));
const materialized = upperCase.pipe(materialize());
materialized.subscribe(x => console.log(x));

Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }
Copy the code

timestamp

  • Add a timestamp
 import { of } from 'rxjs';
 import { materialize, map, timestamp, tap } from 'rxjs/operators';

 const letters = of('a', 'b', '13', 'd');

 const times = letters.pipe(timestamp());
 times.subscribe(res => {
     console.log("res...",res)
 });

//output 
 res... Timestamp { value: 'a', timestamp: 1594074567694 }
 res... Timestamp { value: 'b', timestamp: 1594074567700 }
 res... Timestamp { value: '13', timestamp: 1594074567700 }
 res... Timestamp { value: 'd', timestamp: 1594074567700 }
Copy the code

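timeout, timeInterval

  • timeInterval emits an object containing the value and the time elapsed since the previous emission; timeout passes values through unchanged and errors if no value arrives within the given time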
 import { timeInterval, timeout } from "rxjs/operators";
 import { interval } from "rxjs";

 const seconds = interval(1000);

 seconds.pipe(timeInterval())
 .subscribe(
     value => {
         console.log("time.....");
         console.log(value)
     },
     err => console.log(err),
 );

 seconds.pipe(timeout(1100)) // a timeout shorter than the 1000 ms interval (e.g. 900) would raise a timeout error before the first value arrives
 .subscribe(
     value => {
         console.log("out.....");
         console.log(value)
     },
     err => console.log(err),
 );

//output
 time.....
 TimeInterval { value: 0, interval: 1007 }
 out.....
 0
 time.....
 TimeInterval { value: 1, interval: 1005 }
 out.....
 1

Copy the code

timeoutWith

import { interval } from 'rxjs';
import { timeoutWith } from 'rxjs/operators'
import { of } from 'rxjs';

const first$ = interval(3000);
const second$ = of('go to the default');
first$.pipe(timeoutWith(2000,second$)).subscribe(console.log) // Data must arrive within 2s, otherwise switch to the default observable
//output
go to the default
Copy the code

toArray

 import { interval } from 'rxjs';
 import { toArray, take } from 'rxjs/operators';

 const source = interval(1000);
 const example = source.pipe(
   take(10),
   toArray()
 );

 const subscribe = example.subscribe(val => console.log(val));

 // output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Copy the code

2. Creation operators

of – emits each argument as a single value

Generates data synchronously

import { of } from 'rxjs';

of([1,2,3]).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'));
// result:
// 'next: [1, 2, 3]'
// 'the end'
Copy the code

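from – emits the elements of an array (or iterable) one by one

Generates data synchronously by default; this example passes asyncScheduler, so the values are delivered asynchronously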

import { from, asyncScheduler } from 'rxjs';

console.log('start');

const array = [10, 20, 30];
const result = from(array, asyncScheduler);

result.subscribe(x => console.log(x));

console.log('end');

// Logs:
// start
// end
// 10
// 20
// 30
Copy the code

ajax

Asynchronously generating data

  • ajax
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
  map(userResponse => console.log('users: ', userResponse)),
  catchError(error => {
    console.log('error: ', error);
    return of(error);
  })
);
Copy the code
  • getJson
 import { ajax } from 'rxjs/ajax';
 import { map, catchError } from 'rxjs/operators';
 import { of } from 'rxjs';

 const obs$ = ajax.getJSON(`https://api.github.com/users?per_page=5`).pipe(
   map(userResponse => console.log('users: ', userResponse)),
   catchError(error => {
     console.log('error: ', error);
     return of(error);
   })
 );
Copy the code
  • jQuery-style configuration object
 import { ajax } from 'rxjs/ajax';
 import { map, catchError } from 'rxjs/operators';
 import { of } from 'rxjs';

 const users = ajax({
   url: 'https://httpbin.org/delay/2',
   method: 'POST',
   headers: {
     'Content-Type': 'application/json',
     'rxjs-custom-header': 'Rxjs'
   },
   body: {
     rxjs: 'Hello World! '
   }
 }).pipe(
   map(response => console.log('response: ', response)),
   catchError(error => {
     console.log('error: ', error);
     return of(error);
   })
 );
Copy the code

interval

Asynchronously generating data

 import { interval } from 'rxjs';
 import { take } from 'rxjs/operators';

 const numbers = interval(1000);

 const takeFourNumbers = numbers.pipe(take(4));

 takeFourNumbers.subscribe(x => console.log('Next: ', x));

 // Logs:
 // Next: 0
 // Next: 1
 // Next: 2
 // Next: 3
Copy the code

3. Transformation operators

mergeMap

import { of, interval } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';

const letters = of('a', 'b', 'c');
const result = letters.pipe(
  mergeMap(x => interval(1000).pipe(map(i => x+i))),
);
result.subscribe(x => console.log(x));

//output
a0
b0
c0
a1
b1
c1

Copy the code

concatMap

import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

const source = of(2000, 1000);
// map value from source into inner observable, when complete emit result and move to next
const example = source.pipe(
  concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
const subscribe = example.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

const mergeMapExample = source
  .pipe(
    // just so we can log this after the first example has run
    delay(5000),
    mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
  )
  .subscribe(val => console.log(`With mergeMap: ${val}`));

//output
// With concatMap: Delayed by: 2000ms
// With concatMap: Delayed by: 1000ms
// With mergeMap: Delayed by: 1000ms  (shorter 1s delay, so it is emitted first)
// With mergeMap: Delayed by: 2000ms
Copy the code

switchMap

// RxJS v6+
import { timer, interval, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';


console.log("time 0....",new Date().toLocaleTimeString());
timer(1000,4000).pipe( // after an initial 1s delay, emit an incrementing integer every 4s
 tap(() => {
    console.log("time 1....",new Date().toLocaleTimeString());
 }),
 switchMap(
    _ => interval(1000).pipe(tap((rs)=> {console.log('inner value....',rs)})), // inner observable: emits an incrementing integer every 1s
    (outerValue, innerValue, outerIndex, innerIndex) => ({
      outerValue,
      innerValue,
      outerIndex,
      innerIndex
    })
 )
).subscribe((res) => {
    console.log("final res....",res);
});

//output
time 0.... 5:21:59 PM
time 1.... 5:22:00 PM
inner value.... 0
final res.... { outerValue: 0, innerValue: 0, outerIndex: 0, innerIndex: 0 }
inner value.... 1
final res.... { outerValue: 0, innerValue: 1, outerIndex: 0, innerIndex: 1 }
inner value.... 2
final res.... { outerValue: 0, innerValue: 2, outerIndex: 0, innerIndex: 2 }
time 1.... 5:22:04 PM
inner value.... 0
final res.... { outerValue: 1, innerValue: 0, outerIndex: 1, innerIndex: 0 }

Copy the code

exhaustMap

// RxJS v6+
import { interval } from 'rxjs';
import { exhaustMap, tap, take } from 'rxjs/operators';

const firstInterval = interval(1000).pipe(take(10));
const secondInterval = interval(1000).pipe(take(2));

const exhaustSub = firstInterval
  .pipe(
    exhaustMap(f => {
      console.log(`Emission Corrected of first interval: ${f}`);
      return secondInterval;
    })
  )
  .subscribe(val => console.log(val));

//output
Emission Corrected of first interval: 0
0
1
Emission Corrected of first interval: 2
0
1
Emission Corrected of first interval: 4
0
1
Emission Corrected of first interval: 6
0
1
Emission Corrected of first interval: 8
0
1
Copy the code

mapTo

  • Maps every emission to the same constant value
import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const greetings = clicks.pipe(mapTo('Hi'));
greetings.subscribe(x => console.log(x));
Copy the code

map

import { combineLatest, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';
 
const weight = of(70, 72, 76, 79, 75);
const height = of(1.76, 1.77, 1.78,1.8);
const bmi = combineLatest(weight, height).pipe(
  tap(([w,h]) => {
      console.log(`w:${w},h:${h}`);
  }),
  map(([w, h]) => w / (h * h)),
);
bmi.subscribe(x => console.log('BMI is ' + x));

// With output to console:
// w:75,h:1.76
// BMI is 24.212293388429753
// w:75,h:1.77
// BMI is 23.93948099205209
// w:75,h:1.78
// BMI is 23.671253629592222
// w:75,h:1.8
// BMI is 23.148148148148145
Copy the code

4. Combination operators

combineLatest

Application scenario: Generates new conclusions based on multiple inputs

Variant 1: pass the latest values on as an array and let the downstream subscriber combine them
import { timer, combineLatest } from 'rxjs';
​
// timerOne emits first value at 1s, then once every 4s
const timerOne$ = timer(1000, 4000);
// timerTwo emits first value at 2s, then once every 4s
const timerTwo$ = timer(2000, 4000);
// timerThree emits first value at 3s, then once every 4s
const timerThree$ = timer(3000, 4000);
​
// when one timer emits, emit the latest values from each timer as an array
combineLatest(timerOne$, timerTwo$, timerThree$).subscribe(
 ([timerValOne, timerValTwo, timerValThree]) => {
   /*
     Example:
   timerThree first tick: 'Timer One Latest: 0, Timer Two Latest: 0, Timer Three Latest: 0'
   timerOne second tick: 'Timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0'
   timerTwo second tick: 'Timer One Latest: 1, Timer Two Latest: 1, Timer Three Latest: 0'
   */
   console.log(
     `Timer One Latest: ${timerValOne}, Timer Two Latest: ${timerValTwo}, Timer Three Latest: ${timerValThree}`
   );
 }
);
Copy the code
Variant 2: combine the latest values with a projection function before handing the result downstream
// RxJS v6+
import { timer, combineLatest } from 'rxjs';
​
const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);
​
combineLatest(
 timerOne$,
 timerTwo$,
 timerThree$,
 // combineLatest also takes an optional projection function
 (one, two, three) => {
   return `Timer One (Proj) Latest: ${one}, 
             Timer Two (Proj) Latest: ${two}, 
             Timer Three (Proj) Latest: ${three}`;
 }
).subscribe(console.log);
Copy the code

withLatestFrom

Applies to: multiple input sources, but only one dominant one

import { timeInterval, timeout, withLatestFrom } from "rxjs/operators";
import { interval, of } from "rxjs";

const seconds = interval(1000);

const first = interval(500);

const obs$ = first.pipe(withLatestFrom(seconds));
obs$.subscribe(res => {
    console.log("res...",res);
});

//output
res... [ 1, 0 ]
res... [ 2, 0 ]
res... [ 3, 1 ]
res... [ 4, 1 ]
res... [ 5, 2 ]
res... [ 6, 2 ]
res... [ 7, 3 ]
res... [ 8, 3 ]
res... [ 9, 4 ]
Copy the code

concat

Suitable for: first come, first served, queue data processing

If the first observable never completes, the following ones are never subscribed to
// RxJS v6+
import { interval, of, concat } from 'rxjs';
​
// when source never completes, any subsequent observables never run
concat(interval(1000), of('This', 'Never', 'Runs'))
  // log: 0, 1, 2, 3.....
  .subscribe(console.log);
Copy the code
Countdown example
// RxJS v6+
import { concat, empty } from 'rxjs';
import { delay, startWith } from 'rxjs/operators';
​
// elems
const userMessage = document.getElementById('message');
// helper
const delayedMessage = (message, delayedTime = 1000) => {
  return empty().pipe(startWith(message), delay(delayedTime));
};
​
concat(
  delayedMessage('Get Ready! '),
  delayedMessage(3),
  delayedMessage(2),
  delayedMessage(1),
  delayedMessage('Go! '),
  delayedMessage(' ', 2000)
).subscribe((message: any) => (userMessage.innerHTML = message));
Copy the code

merge

Values are emitted in the order they arrive, regardless of the order in which the sources are listed; they are emitted one at a time, not collected into an array

import { of, merge, concat } from 'rxjs';
import { mapTo, delay, concatAll, mergeAll } from 'rxjs/operators';

//emit one item
const example = of(null);

merge(
  example.pipe(mapTo('Hello --- 1')),
  example.pipe(mapTo('World1!--- 1'),delay(1300)),
  example.pipe(mapTo('Goodbye --- 1'),delay(500)),
  example.pipe(mapTo('World!2 --- 1'),delay(300))
).subscribe(val => console.log(val));

//output
Hello --- 1
World!2 --- 1
Goodbye --- 1
World1!--- 1
Copy the code

startWith

Applicable to: Adding specific data

Prepend a single value – Hello World
// RxJS v6+
import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';
​
//emit ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
//start with 'Hello', concat current string to previous
const example = source.pipe(
  startWith('Hello'),
  scan((acc, curr) => `${acc} ${curr}`)
);
/*
  output:
  "Hello"
  "Hello World!"
  "Hello World! Goodbye"
  "Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));
Copy the code
Prepend multiple values
// RxJS v6+
import { startWith } from 'rxjs/operators';
import { interval } from 'rxjs';
​
//emit values in sequence every 1s
const source = interval(1000);
//start with -3, -2, -1
const example = source.pipe(startWith(-3, -2, -1));
//output: -3, -2, -1, 0, 1, 2....
const subscribe = example.subscribe(val => console.log(val));
Copy the code

5. Filtering operators

filter

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

const source = from([
  { name: 'Joe', age: 31 },
  { name: 'Bob', age: 25 }
]);
const example = source.pipe(filter((person,index) => {
    const res = person.age >= 30;
    console.log(`person info.....`,person,index);
    return res;
}));
const subscribe = example.subscribe(val => console.log(`final result Over 30: ${val.name}`));

//output
person info..... { name: 'Joe', age: 31 } 0
final result Over 30: Joe
person info..... { name: 'Bob', age: 25 } 1
Copy the code

6. Conditional and boolean operators

every

  • Emits true only if every value satisfies the predicate; emits false as soon as one value fails
import { of } from 'rxjs';
import { every } from 'rxjs/operators';

of(1, 2, 3, 4, 5, 6).pipe(
    every(x => x < 5),
)
.subscribe(x => console.log(x)); // -> false
Copy the code

find, findIndex

  • Just find the first one that meets the criteria
import { fromEvent } from 'rxjs';
import { find } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const result = clicks.pipe(find(ev => ev.target.tagName === 'DIV'));
result.subscribe(x => console.log(x));
Copy the code

isEmpty

  • Checks whether Observable is empty
import { Subject } from 'rxjs';
import { isEmpty } from 'rxjs/operators';

const source = new Subject<string>();
const result = source.pipe(isEmpty());
source.subscribe(x => console.log(x));
result.subscribe(x => console.log(x));
source.next('a');
source.next('b');
source.next('c');
source.complete();

// Results in:
// a
// false
// b
// c
Copy the code

iif

import { iif, of, interval } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const r$ = of('R');
const x$ = of('X');

interval(1000)
  .pipe(mergeMap(v => iif(() => v % 4 === 0, r$, x$)))
  .subscribe(console.log);
//output 
R
X
X
X
R
Copy the code

defaultIfEmpty

import { defaultIfEmpty } from 'rxjs/operators';
import { empty } from 'rxjs';
import { of, merge } from 'rxjs';

const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty! '));
const example = empty().pipe(defaultIfEmpty('Observable.empty()! '));

merge(
    example,
    exampleOne
).subscribe(console.log);

//output 
Observable.empty()!
Observable.of() Empty!
Copy the code

7. Exception handling

catchError

Basic usage
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
//emit error
const source = throwError('This is an error! ');
//gracefully handle error, returning observable with error message
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
//output: 'I caught: This is an error'
const subscribe = example.subscribe(val => console.log(val));
Copy the code
Combining with other operators
import { throwError, fromEvent, of } from 'rxjs';
import {
 catchError,
 tap,
 switchMap,
 mergeMap,
 concatMap,
 exhaustMap
} from 'rxjs/operators';
​
const fakeRequest$ = of().pipe(
 tap(_ => console.log('fakeRequest')),
 throwError
);
​
const iWillContinueListening$ = fromEvent(
 document.getElementById('continued'),
 'click'
).pipe(
 switchMap(_ => fakeRequest$.pipe(catchError(_ => of('keep on clicking!!!'))))
);

const iWillStopListening$ = fromEvent(
 document.getElementById('stopped'),
 'click'
).pipe(
 switchMap(_ => fakeRequest$),
 catchError(_ => of('no more requests!!!'))
);

iWillContinueListening$.subscribe(console.log);
iWillStopListening$.subscribe(console.log);
Copy the code

retry

Setting the number of retries
// RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
 mergeMap(val => {
   //throw error for demonstration
   if (val > 5) {
     return throwError('Error! ');
   }
   return of(val);
 }),
 //retry 2 times on error
 retry(2)
);
/*
  output:
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error! : Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
 next: val => console.log(val),
 error: val => console.log(`${val}: Retried 2 times then quit!`)
});
Copy the code

retryWhen

Using delayWhen to wait before each retry
// RxJS v6+
import { timer, interval } from 'rxjs';
import { map, tap, retryWhen, delayWhen } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
 map(val => {
   if (val > 5) {
     //error will be picked up by retryWhen
     throw val;
   }
   return val;
 }),
 retryWhen(errors =>
   errors.pipe(
     //log error message
     tap(val => console.log(`Value ${val} was too high!`)),
     //restart in 6 seconds
     delayWhen(val => timer(val * 1000))
   )
 )
);
/*
 output:
 0
 1
 2
 3
 4
 5
 "Value 6 was too high!"
 --Wait 6 seconds then repeat
*/
const subscribe = example.subscribe(val => console.log(val));
Copy the code

8. Custom types

reference

Rxjs