Preface

Good programmers know how to get away from repetitive work:

- jQuery came out of manipulating the DOM.
- Lodash came out of working with JS.
- Rx came out of handling events.

The concept of RxJS itself is not complicated: it simply encapsulates the Observer pattern, which is so common in front-end development that you have certainly met it, whether you use a framework or native JS.

In my opinion, both the power and the difficulty of RxJS lie in the flexible use of its nearly 120 operators.

Unfortunately, the descriptions of these operators on the official website are hard to follow, so many people who clearly understand the concepts of RxJS still struggle to use the operators.

This article explains what the common RxJS operators do in the most concise and approachable way possible. Besides being learning material, it can also serve as a quick-reference index.

Before you read

  • The concept of a stream:

  • Subscribe: call subscribe
  • Emit: call next
  • Complete: call complete

Pay attention to when a stream is subscribed and when it completes; some operators do not fire until the stream has completed.

If an operator needs all of the data before it can make a judgment, it must wait until the stream completes.

Creation operators

Creation operators are the starting point of a stream. There is nothing complicated about them, so only a simple classification is given here; for details, refer to the official website.

Synchronous streams

  • create: new Observable
  • of
  • range
  • repeat
  • empty
  • never
  • throw
  • generate

Asynchronous streams

  • interval/timer
  • from: string / array / array-like object / Promise / generator
  • fromPromise
  • fromEvent
  • fromEventPattern
  • ajax
  • repeatWhen
  • defer: the stream is created only when you subscribe

Combination operators

These operators subscribe to multiple streams and pass the data they receive downstream.

concat

End to end

  • Subscribes in order: the next stream is subscribed only after the previous one completes.

  • The concat stream completes when all streams have completed.

    concat(source1$, source2$)

merge

First come, first served

  • Subscribes to all streams; whenever any stream emits, the merge stream emits that data.

  • Only meaningful for asynchronous data.

  • The merge stream completes when all streams have completed.

    merge(source1$, source2$)

zip

One-to-one merge (like a zipper)

  • Subscribes to all streams, waits until every stream has emitted i times, and emits the i-th values together as an array.

  • After one stream completes, the zip stream completes as soon as the other streams have delivered the same number of values.

  • With zip, we expect the first output to contain the first value of every stream, the second output to contain the second value of every stream, and so on.

    zip(source1$, source2$)

combineLatest

Merges the latest values of all streams

  • Subscribes to all streams; whenever any stream emits, its value is combined with the latest value of every other stream.

  • Because the latest value of every other stream is needed, nothing is passed downstream until all streams have emitted at least once.

  • The combineLatest stream completes only when all streams have completed.

    combineLatest(source1$, source2$)

withLatestFrom

Merges the latest values of all streams. It works like combineLatest, except that:

  • combineLatest: once all streams are ready, an emission from any stream causes data to be emitted downstream.

  • withLatestFrom: once all streams are ready (each has a latest value), only the stream that calls withLatestFrom causes data to be emitted downstream; the other streams merely record their latest value when they emit.

    source1$.pipe(withLatestFrom(source2$, source3$))

race

Winner takes all

  • Subscribes to all streams; as soon as the first stream emits, the other streams are unsubscribed.
    race(source1$, source2$)

startWith

Prepends data to the front of the stream

    source1$.pipe(startWith(1))

forkJoin

Merges the last values of all streams

  • Subscribes to all streams, waits for all of them to complete, then emits the last value of each stream together.
    forkJoin(source1$, source2$)

Auxiliary operators

count

After the current stream completes, emits the number of values the stream sent.

source$.pipe(count())

min/max

After the current stream completes, emits the minimum/maximum value.

source$.pipe(max())

reduce

Same as for arrays: after the current stream completes, all received values are passed through the reducer function in turn.

source$.pipe(reduce(() => {}, 0))

Boolean operators

every

Same as for arrays. Note that if every value satisfies the condition, the result is not emitted until the stream completes.

The reason is simple: until the stream has completed, how could we know that all of its data satisfies the condition?

source$.pipe(every(() => true/false))

find, findIndex

Same as for arrays; the same caveat as for every applies.

source$.pipe(find(() => true/false))

isEmpty

Emits whether the stream completed without emitting any data.

source$.pipe(isEmpty())

defaultIfEmpty

If the stream satisfies isEmpty, emits the default value.

source$.pipe(defaultIfEmpty(1))

Filtering operators

filter

Same as for arrays

source$.pipe(filter(() => true/false))

first

Takes the first value that satisfies the condition; if no condition is passed in, takes the first value

source$.pipe(first(() => true/false))

last

Takes the last value that satisfies the condition; if no condition is passed in, takes the last value. Nothing is emitted until the stream completes.

source$.pipe(last(() => true/false))

take

Takes the first N values, then completes

source$.pipe(take(N))

takeLast

Takes the last N values; they are emitted only once the stream has completed.

source$.pipe(takeLast(N))

takeWhile

Pass in a predicate function; it decides when the stream ends

source$.pipe(takeWhile(() => true/false))

takeUntil

Pass in a stream (A); when stream (A) emits, the current stream completes

source$.pipe(takeUntil(timer(1000)))

skip

Skips the first N values

source$.pipe(skip(N))

skipWhile

Pass in a predicate function; values are skipped for as long as it returns true

source$.pipe(skipWhile(() => true/false))

skipUntil

Pass in a stream (A); skipping stops when stream (A) emits

source$.pipe(skipUntil(timer(1000)))

Transformation operators

map

  • Receives the value passed in from upstream and returns another value to downstream. (Returning the upstream value unchanged would be pointless.)

source$.pipe(map(() => {}))

mapTo

  • Passes the value given as its argument downstream, whatever upstream emits.

source$.pipe(mapTo('a'))

pluck

  • Extracts one key from the object emitted upstream and passes its value downstream.

source$.pipe(pluck('v'))

Lossy backpressure control

If you are not familiar with debouncing and throttling, please read up on them first.

throttle

Pass in a function that returns a stream (A): after a value is passed downstream, upstream data is throttled until stream (A) emits, and then the process repeats

source$.pipe(throttle(() => interval(1000)))

throttleTime

Throttles by time (ms)

source$.pipe(throttleTime(1000))

debounce

Pass in a function that returns a stream (A): the latest upstream value is held back until stream (A) emits, and each new upstream value restarts the wait

source$.pipe(debounce(() => interval(1000)))

debounceTime

Debounces by time (ms)

source$.pipe(debounceTime(1000))

audit

audit and throttle differ in which value they emit:

  • throttle: emits the first value received during the throttling window
  • audit: emits the last value received during the throttling window

source$.pipe(audit(() => interval(1000)))

auditTime

Same as above, no further elaboration

source$.pipe(auditTime(1000))

sample

  • In a normal stream, whenever upstream emits, downstream receives the data.

  • With sample, the stream caches the latest value emitted upstream and takes values from the cache at its own pace.

  • In other words, no matter how fast or slow upstream emits, sample ignores that pace: it checks the cache at its own rhythm and, if a value is there, emits it downstream; if not, it does nothing.

Pass in a stream (A): the latest upstream value is cached until stream (A) emits, at which point the value is taken out of the cache and passed downstream, and then the process repeats

source$.pipe(sample(interval(1000)))

sampleTime

Samples by time (ms)

source$.pipe(sampleTime(1000))

distinct

  • The distinct prefix indicates a deduplication operation

Deduplicates across all elements: only values that have never appeared before in the current stream are passed on.

When a function is passed in, its return value is used as the key for uniqueness.

source$.pipe(distinct())
of({ age: 4, name: 'Foo' }).pipe(distinct((p) => p.name))

distinctUntilChanged

Deduplicates adjacent elements: only values that differ from the previous one are passed on.

When a function is passed in, it is used to compare the previous value with the current one and decides whether they count as equal.

source$.pipe(distinctUntilChanged())

distinctUntilKeyChanged

  • A simplified version of distinctUntilChanged that handles reading the object key for you.

source$.pipe(distinctUntilKeyChanged('id'))

ignoreElements

ignoreElements ignores all upstream data; when upstream completes, ignoreElements completes too. (I don't care what you emit, just tell me when you are done.)

source$.pipe(ignoreElements())

elementAt

Takes only the N-th value emitted upstream (N is a zero-based index).

The second parameter is a default value: it is emitted downstream when upstream completes without having sent an N-th value.

source$.pipe(elementAt(4, null))

single

  • Checks all upstream data: if exactly one value satisfies the condition, that value is passed downstream. Otherwise, an error is passed downstream.

source$.pipe(single(() => true/false))

Lossless backpressure control

  • buffer prefix: caches values into an array and emits the array downstream.
  • window prefix: caches values into a stream and emits the stream downstream.

bufferTime, windowTime

Caches the data emitted upstream, emits it when the specified time has elapsed, and then repeats.

source$.pipe(bufferTime(1000))

bufferCount, windowCount

Caches the data emitted upstream, emits it when the specified count is reached, and then repeats.

The optional second parameter controls how often a new cache is opened: one is started every that many values. Without it, a new cache is opened only after the previous one has been emitted, which is probably what we intuitively expect.

source$.pipe(bufferCount(10))

bufferWhen, windowWhen

Pass in a factory function that returns a stream (A)

The process is as follows:

  1. On subscription, the factory function is called to get stream (A), and caching starts
  2. When stream (A) emits, the cached values are emitted downstream
  3. The factory function is called again to get a new stream (A), caching starts again, and the cycle repeats.

const randomSeconds = () => timer(Math.random() * 10000 | 0)
source$.pipe(bufferWhen(randomSeconds))

bufferToggle, windowToggle

The first argument is the stream that opens the cache (O); the second argument is a factory function that returns the stream that closes the cache (C)

The process is as follows:

  1. When the opening stream (O) emits, the factory function is called to get the closing stream (C), and caching starts
  2. When the closing stream (C) emits, the cached values are emitted downstream
  3. Wait for the opening stream (O) to emit again, then repeat from step 1

source$.pipe(bufferToggle(interval(1000), () => randomSeconds()))

buffer, window

Pass in a closing stream (C). The difference from bufferWhen: the stream itself is passed in, not a factory function that returns the stream.

On subscription, caching starts; when the closing stream (C) emits, the cached values are passed downstream and caching starts over.

source$.pipe(buffer(interval(1000)))

Accumulating data

scan

The differences between scan and reduce are as follows:

  • reduce: emits only after the stream completes
  • scan: emits every time the stream receives data

Unlike other operators, scan has the ability to save and remember state.

source$.pipe(scan(() => {}, 0))

mergeScan

Same as scan, but the accumulator function returns a stream instead of a value.

  • When upstream emits, the accumulator function is called to get stream (A), which is subscribed; the data emitted by stream (A) is passed downstream, and the last value emitted by stream (A) is cached. When upstream emits again, that cached value is passed to the accumulator function, and the cycle repeats.
source$.pipe(mergeScan(() => interval(1000), 0))

Error handling

catchError (catch)

Catches the error

source$.pipe(catchError(err => of('I', 'II', 'III', 'IV', 'V')))

retry

Pass in a number N; when an error occurs, upstream is re-subscribed, retrying up to N times.

source$.pipe(retry(3))

retryWhen

Pass in a function that returns a stream (A); when an error occurs, stream (A) is subscribed, and upstream is retried each time stream (A) emits. When stream (A) completes, retryWhen completes as well.

source$.pipe(retryWhen((err$) => interval(1000)))

finalize (finally)

Runs the given callback when the stream ends.

source$.pipe(finalize(() => {}))

Multicast operators

multicast

Receives a factory function that returns a Subject, and returns a hot Observable (HO)

When the connection is made, upstream is subscribed, the factory function is called to get the Subject, and the data emitted upstream is multicast through that Subject.

  • The returned HO has connect and refCount methods.
  • Only after connect is called does it actually subscribe to upstream and start emitting data.
  • Calling refCount makes it connect and unsubscribe automatically, based on the number of subscribers.
  • The foundation of the multicast operators; it is a fairly low-level design and is rarely used day to day.
  • The multicast operators that follow are all implemented on top of this one.

source$.pipe(multicast(() => new Subject()))

publish

  • Wraps multicast so that you no longer need to pass in the Subject factory function; otherwise identical.

source$.pipe(publish())

share

A wrapper based on publish that returns the result of calling refCount (see code)

source$.pipe(share())
// is roughly equivalent to
source$.pipe(publish(), refCount())

publishLast

When upstream completes, multicasts the last upstream value and completes the current stream.

source$.pipe(publishLast())

publishReplay

Pass in a cache size N: caches the latest N values from upstream and replays them to each new subscriber.

  • Upstream is subscribed only once.

source$.pipe(publishReplay(1))

publishBehavior

Caches the latest value emitted upstream and emits it to each new subscriber. If upstream has not emitted anything yet at subscription time, the default value passed in is emitted instead.

source$.pipe(publishBehavior(0))

Higher-order combination operators

  • Higher-order operators are not "advanced" operators
  • When a stream emits streams instead of plain data, it is a higher-order stream, just as a function whose return value is a function is called a higher-order function
  • Higher-order operators are operators that operate on higher-order streams

In the following code example, the top stream emits not ordinary data but two streams that produce data. To get at the data, downstream has to subscribe to each stream it receives, as follows:

of(of(1, 2, 3), of(4, 5, 6))
	.subscribe(
		ob => ob.subscribe((num) => {
			console.log(num)
		})
	)

The code above simply takes the data out of the streams. What if I want to apply the operators introduced earlier to the emitted streams?

const cache = []
of(of(1, 2, 3), of(4, 5, 6))
	.subscribe({
		next: ob => cache.push(ob),
		complete: () => {
			concat(...cache).subscribe(console.log)
			zip(...cache).subscribe(console.log)
		}
	})

Leaving aside whether this implementation is reasonable, we can now apply operators to the emitted streams, but it is far too cumbersome, so RxJS wraps this up in dedicated operators that do the same job.

To summarize: higher-order operators operate on streams, while normal operators operate on data.

concatAll

Corresponds to concat: caches each stream emitted by the higher-order stream and subscribes to them one after another; when all streams have completed, concatAll completes.

source$.pipe(concatAll())

mergeAll

Corresponds to merge: subscribes to every stream emitted by the higher-order stream; whenever any of them emits, mergeAll emits that data.

source$.pipe(mergeAll())

zipAll

Corresponds to zip: subscribes to every stream emitted by the higher-order stream and combines the values with the same index from these streams before passing them downstream.

source$.pipe(zipAll())

combineAll

Corresponds to combineLatest: subscribes to every stream emitted by the higher-order stream and combines the latest values of all streams before passing them downstream.

source$.pipe(combineAll())

Higher-order switching operators

switchAll (switch)

Switches streams: out with the old, in with the new

Each time the higher-order stream emits a stream, the previous stream is unsubscribed and the newest stream is subscribed.

source$.pipe(switchAll())

exhaust

Switches streams: faithful to the current one

When the higher-order stream emits a stream, it is subscribed. Until that stream completes, every other stream emitted by the higher-order stream is ignored. Once it completes, the next stream emitted by the higher-order stream is subscribed, and the process repeats.

source$.pipe(exhaust())

Higher-order map operators

The definition is easiest to see after an example.

Example

Implement the following:

  • After a mousedown event fires, listen for mousemove events

Plain implementation

const mousedown$ = fromEvent(document, 'mousedown')
const mousemove$ = fromEvent(document, 'mousemove')

mousedown$.pipe(
	map(() => mousemove$),
	mergeAll()
)

  1. After the mousedown event fires, the map operator converts the emitted data into the mousemove event stream.
  2. Because map returns a stream rather than data, the mergeAll operator is needed to unpack the data in that stream.
  3. So what we end up receiving is the mousemove event object.

Note: since there is only one event stream, using any of the higher-order combination operators described above has the same effect.

Higher-order map implementation

mousedown$.pipe(
  mergeMap(() => mousemove$)
)

It is easy to see that the so-called higher-order map operators are simply

concatMap   = map + concatAll
mergeMap    = map + mergeAll
switchMap   = map + switchAll
exhaustMap  = map + exhaust
concatMapTo = mapTo + concatAll
mergeMapTo  = mapTo + mergeAll
switchMapTo = mapTo + switchAll

expand

Similar to mergeMap, except that every value passed downstream is also fed back into expand itself, so expand is a recursive operator.

source$.pipe(expand(x => x === 8 ? EMPTY : of(x * 2)))

Grouping data

groupBy

The output is a stream of streams: data passed in from upstream is classified by key, and a stream is created for each category and passed downstream.

The key is determined by the function passed as the first argument.

source$.pipe(groupBy(i => i % 2))

partition

A simplified version of groupBy: pass in a predicate; values that satisfy it go into the first stream, and values that do not go into the second stream.

In short:

  • groupBy may pass N streams downstream, depending on how the keys classify the data.
  • partition produces exactly two streams: values that satisfy the condition and values that do not.

source$.pipe(partition(() => true/false))

Conclusion

That is all for this article; I hope you got something out of it.

If anything is unclear, raise it in the comments section so we can learn together.

I hope you all master RxJS soon.

References

  • The official documentation
  • RxJS in Plain English