RxJS real-time monitoring line chart

As an RxJS beginner, I will build a small demo from a novice's point of view. I expect to run into many of the problems that beginners commonly hit, and along the way I will branch off to explain some of the core concepts and APIs of RxJS.

Project address

Requirements

The line chart shows 12 points spread along a time axis and is refreshed every 2 seconds (a short interval chosen for demonstration purposes).

Approach

Thinking about it in the simplest terms:

We need one central place to store state, which is essentially the data behind the chart. That place requests data from the server at a fixed interval, and it needs to keep the data for the latest 12 points.

Let's express that with RxJS, starting with a basic observable called fetchData$.

Create src/app.ts

import {Observable, Observer} from 'rxjs'

import {Mock} from './mock'

const print = x => console.log('x: ', x)

// Emits 0, 1, 2, ... every two seconds
const intervalEmit$ = Observable.interval(2000)

// Wraps the Promise of a single request in an Observable
const fetchData$ = Observable.fromPromise(Mock.fetch())

intervalEmit$.subscribe(print)
fetchData$.subscribe(print)

Create src/mock.ts

import axios from 'axios'

export class Mock {

    static fetch(): Promise<number> {
        // base: 20
        return axios.get('https://zan.wilddogio.com/age.json')
        .then(res => Number(res.data) + Mock.randomAge(10))
    }

    // random integer from 1 to x
    static randomAge(x: number): number {
        return Math.floor(1 + Math.random() * x)
    }
}

Subtask 1 – Send a REST request every two seconds

This part is simple. So far we have one observable that produces an increasing value every two seconds, and another that requests a value through a Promise and emits it. Now let's combine the two, so that every two seconds we request a value through a Promise and emit it. Modify app.ts:

const intervalEmit$ = Observable.interval(2000)

// The three definitions of app$ below are alternatives; keep only one of them.

// First way: switchMap
const app$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))

// Second way: split switchMap into map + switch
const fetchData$ = intervalEmit$.map(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetchData$.switch()

// Third way: create the Observable with the defer factory
const deferPromise$ = Observable.defer(function () {
     return Observable.fromPromise(Mock.fetch())
})
const app$ = intervalEmit$.switchMap(e => deferPromise$)

app$.subscribe(print)

Let's start with the third one, which is relatively simple :). Here is the definition of defer: "Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer." The meaning is easy to follow: defer accepts a factory function that returns an Observable, and that factory is only called when the Observable created by defer is subscribed to, once per subscription.
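To make that laziness concrete, here is a minimal sketch in plain TypeScript that does not use RxJS at all. The names Subscribable and deferLike are hypothetical and exist only for this illustration; the sketch mimics just the one property described above (the factory runs once per subscription and never earlier), not the real operator.

```typescript
// A tiny stand-in for an Observable: something you can subscribe to.
interface Subscribable<T> {
  subscribe(next: (value: T) => void): void;
}

// Hypothetical helper: calls `factory` on every subscribe, never before.
function deferLike<T>(factory: () => Subscribable<T>): Subscribable<T> {
  return {
    subscribe(next: (value: T) => void): void {
      factory().subscribe(next);
    },
  };
}

let factoryCalls = 0;
const lazy$ = deferLike<number>(() => {
  factoryCalls++;
  const id = factoryCalls;
  // The inner source emits a single value synchronously.
  return {
    subscribe(next: (value: number) => void): void {
      next(id);
    },
  };
});

// Creating lazy$ did not run the factory.
const callsBeforeSubscribe = factoryCalls;

const received: number[] = [];
lazy$.subscribe((v) => received.push(v)); // first factory call
lazy$.subscribe((v) => received.push(v)); // second, independent factory call

console.log(callsBeforeSubscribe, factoryCalls, received);
```

In the demo this matters because the factory is where Mock.fetch() gets called, so a new request is only started when something actually subscribes.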

The first and second ways belong together. map transforms one Observable into another by applying a function, much like Array.prototype.map; you can think of it as a one-to-one transformation of each value at the moment it is emitted. Let's focus on switch and look at its definition as well: "Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables." In other words, switch flattens a higher-order Observable into a first-order one by subscribing to the inner Observables, and it only passes on values from the most recently emitted inner Observable.

First of all, we need to understand what "higher-order" means.

var fn = function(a, b) {return a + b}

typeof fn shows that fn is of type function. Next:

var fn1 = fn(1, 2)

fn1 is of type number; it is no longer a function. So how do we make fn1 a function?

var fn = function(a) {return function(b) {return a + b}}

To get 1 + 2 = 3 this time, we need to call fn(1)(2): the first call returns another function, and only the second call returns the final value. Needing more than one call to reach the result is what we call higher-order here, loosely similar to higher-order derivatives in mathematics. Now let's get back to switch.

var ob$ // an Observable
var higher$ = ob$.instanceOperator(staticOperator) // pseudocode

Here, an instance operator is a converter: it uses the source Observable as a template and turns it into another Observable, while the source Observable itself does not change. A static operator is more like an Observable factory: the Observable it creates starts producing values as soon as we subscribe. So

Var higher$= ob$.

The resulting higher$ is a higher-order Observable: when you subscribe to it, it does not emit plain data the way an Observable from a static operator does. It emits Observables, just as calling fn(1) produces a new function instead of a value. Here is a small example, and you can see that it prints an Observable.

var print = x => console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
higherOrder.subscribe(print)
// x: IntervalObservable {_isScalar: false, period: 1000, scheduler: AsyncScheduler}

So we need switch to convert higher$ into a first-order Observable:

var lower$ = higher$.switch()

Now when we subscribe to lower$, we get the values produced by the static operator.

var print = x => console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var lowerOrder = higherOrder.switch()
lowerOrder.subscribe(print)
//== First click ==
// x: 0
// x: 1
//== Second click ==
// x: 0

As you can see, the values are now printed, and when we click again, Rx.Observable.interval(1000) starts over. This is exactly what the documentation describes: switch flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears. Whenever the outer Observable emits a value, the previously subscribed inner Observable is discarded. We know that once a Promise is created it is in the pending state and eventually becomes fulfilled or rejected, so it cannot be cancelled. With RxJS we can at least unsubscribe from its result. Let's look at an example, using Express to build a small REST server.

app.js

var express = require('express');
var app = express()

app.use(express.static('blog'));

app.get('/delay', function(req, res) {
  // respond after a three-second delay
  setTimeout(function(){
    res.send('hello world')
  }, 3000)
})

var server = app.listen(3000, function () {
  var host = server.address().address
  var port = server.address().port

  console.log('app listening at http://%s:%s', host, port)
})

When the server receives a request for http://localhost:3000/delay, it waits three seconds before sending the response. Now look at the client code.

Recently cancelled.html

<script>
window.onload = function () {
            var print = x => console.log('x: ', x)
            var ajax$ = Rx.Observable.fromPromise($.ajax('/delay'))

            var click$ = Rx.Observable.fromEvent(document, 'click')
            var higher$ = click$.map(e => Rx.Observable.fromPromise($.ajax('/delay')))

            var app$ = higher$.switch()

            app$.subscribe(print)   // Clicking 5 times within 3 seconds prints only once: the first 4 subscriptions are unsubscribed
        }
</script>

Now I click on the page five times within three seconds. You will see that five requests are made, but only one "hello world" is printed: the subscriptions to the first four requests were unsubscribed.
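The dropping behaviour can be sketched without RxJS. In the plain TypeScript below, Source, switchLatest and fakeRequest are hypothetical names made up for this illustration, and a "source" is simply a function that takes a callback and returns an unsubscribe function. It is a simplified model of the idea, not how RxJS implements switch. As in the example above, the earlier request is not aborted; its response just has nobody left to deliver to.

```typescript
type Unsubscribe = () => void;
type Source<T> = (next: (value: T) => void) => Unsubscribe;

// Hypothetical helper mimicking `switch`: forward values only from the
// most recently emitted inner source.
function switchLatest<T>(outer: Source<Source<T>>): Source<T> {
  return (next) => {
    let innerUnsub: Unsubscribe | null = null;
    const outerUnsub = outer((inner) => {
      if (innerUnsub) innerUnsub(); // drop the previous inner source
      innerUnsub = inner(next);
    });
    return () => {
      outerUnsub();
      if (innerUnsub) innerUnsub();
    };
  };
}

// A fake request: remembers its handler so we can "respond" later,
// and forgets the handler once it is unsubscribed.
function fakeRequest(label: string) {
  let handler: ((value: string) => void) | null = null;
  const source: Source<string> = (next) => {
    handler = next;
    return () => { handler = null; };
  };
  const respond = () => { if (handler) handler(label); };
  return { source, respond };
}

// Manual outer source standing in for the click stream.
let emitClick: (inner: Source<string>) => void = () => {};
const clicks: Source<Source<string>> = (next) => {
  emitClick = next;
  return () => { emitClick = () => {}; };
};

const printed: string[] = [];
switchLatest(clicks)((v) => printed.push(v));

const first = fakeRequest('response 1');
const second = fakeRequest('response 2');
emitClick(first.source);  // first click starts request 1
emitClick(second.source); // second click unsubscribes from request 1
first.respond();          // ignored: nobody is listening any more
second.respond();         // delivered
console.log(printed);     // only the latest request's response
```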

Moving on: now that we can send a request every two seconds, let's implement storing the data.

Subtask 2 – Data replay

First, we need to store up to 24 points and discard the oldest point whenever a new one arrives. Most of us have listened to cassette tapes, and tape players have a rewind function (not the song Jay Chou wrote for Jolin Tsai). Because the tape stores everything that was played, you can go back to any earlier point in time and play from there. Our requests are like playing a tape over and over, and the best way to get back to earlier points is to store them. A tape also has limited capacity, so we cannot store indefinitely; for now we keep the last 24 records. Here RxJS's version of rewind makes its debut: replay.

ReplaySubject is a Subject class, and publishReplay is an Observable instance operator. Is there any connection between them? Let's start with publishReplay, which is more closely related to Observable.

public publishReplay(bufferSize: , windowTime: , scheduler: *): ConnectableObservable

This is the signature of publishReplay. The documentation gives no example, but we should stay curious anyway. Since there is no example, let's click through to the source code:

export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
    return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}

The parameters of publishReplay are all passed on to the ReplaySubject constructor, so let's look at multicast. Here this is the source Observable instance. Before we look at the operator, let's talk about unicast and multicast, which will help us understand it.

We haven't talked about Subject so far, but let's start by contrasting a unicast Observable with a multicast Subject. An Observable is cold and very focused (independent). She never takes the initiative to contact others and only starts talking once someone follows her. When another person follows her as well, neither conversation is aware of the other. A Subject is hot and likes to share (not independent). Whoever follows her, she shares the same experience with all of them. Now let's look at two small examples.

Observable unicast

const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)

var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(printA);

setTimeout(function() {
    console.log('another subscribe.')
    ones.scan((acc, one) => acc + one, seed).subscribe(printB)
}, 3000)

As you can see, observerB starts printing from 1 even though it subscribed three seconds later, and the Observable only starts communicating with an observer once that observer subscribes to it.

The figure makes this more intuitive: when we subscribe to the blue scan-transformed Observable and to the red scan-transformed Observable, the values actually flow through two independent branches, and each subscription also sets up its own event listener through fromEvent. An Observable is essentially a function: it runs when it receives a subscription and notifies the subscriber through the callbacks the subscriber handed over. Next, let's look at Subject multicast.

Subject multicast

var subject = new Rx.Subject()
subject.subscribe(printA)

setTimeout(function() {
    console.log('another subscribe.')
    subject.subscribe(printB)
}, 3000)

Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)
.do(num => subject.next(num))
.subscribe()

The figure shows that although observerB subscribed three seconds later, it still shares observerA's progress and starts printing from 3. At the same time, it is the Subject that actively notifies its subscribers.

This shows the difference between Subject and Observable: the subscription made after three seconds does not create a new branch, that is, there is no new Observable execution and no new chain of subsequent transformations.
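The contrast between the two examples can be reproduced with a few lines of plain TypeScript. ClickSource, coldCount and MiniSubject are hypothetical names invented for this sketch, and clicks are simulated synchronously with click() instead of real DOM events; none of this is RxJS code.

```typescript
type Listener<T> = (value: T) => void;

// Simulated event source standing in for document clicks.
class ClickSource {
  private handlers: Array<() => void> = [];
  addListener(handler: () => void): void { this.handlers.push(handler); }
  click(): void { this.handlers.forEach((h) => h()); }
}

// Cold and unicast: every subscribe registers its own handler
// and keeps its own private counter.
function coldCount(source: ClickSource) {
  return {
    subscribe(listener: Listener<number>): void {
      let count = 0;
      source.addListener(() => listener(++count));
    },
  };
}

// Hot and multicast: one list of listeners, every value goes to all of them.
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];
  subscribe(listener: Listener<T>): void { this.listeners.push(listener); }
  next(value: T): void { this.listeners.forEach((l) => l(value)); }
}

const log: string[] = [];

// Cold: the late subscriber starts counting from 1.
const coldClicks = new ClickSource();
const cold$ = coldCount(coldClicks);
cold$.subscribe((v) => log.push('coldA:' + v));
coldClicks.click();
coldClicks.click();
cold$.subscribe((v) => log.push('coldB:' + v)); // subscribes late
coldClicks.click();

// Hot: a single shared counter feeds the subject.
const hotClicks = new ClickSource();
const subject = new MiniSubject<number>();
let sharedCount = 0;
hotClicks.addListener(() => subject.next(++sharedCount));
subject.subscribe((v) => log.push('hotA:' + v));
hotClicks.click();
hotClicks.click();
subject.subscribe((v) => log.push('hotB:' + v)); // subscribes late
hotClicks.click();

console.log(log.join(' '));
```

After the third click the late cold subscriber reports 1, while the late subject subscriber reports 3, which matches what the two examples above show.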

That was a brief look at the cold, unicast, independent nature of Observable and the hot, multicast, sharing nature of Subject. Back to multicast: it accepts a Subject instance as a parameter, so we have good reason to believe that the operator gives an Observable multicast behaviour through that Subject instance. Let's look at a multicast example.

var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)

var subject = new Rx.Subject()

subject.subscribe(printA)
setTimeout(function() {
    console.log('another subscribe.')
    subject.subscribe(printB)
}, 3000)

var app$ = clickAddOne$.multicast(subject)

app$.subscribe()

When this code runs, nothing is printed except "another subscribe.", no matter how often you click. It turns out that app$ is not a plain Observable instance. Let's take a look at the description of multicast on the RxJS website:

The return value is a ConnectableObservable instance that produces data to share with potential subscribers (that is, the subscribers of the Subject instance). Let's change the code.

// app$.subscribe()
app$.connect()

The figure shows a result consistent with the Subject multicast example above. Here we meet an unfamiliar method called connect. ConnectableObservable inherits from Observable and has both a connect method and a refCount method. connect decides when the subscription to the source takes effect, and it returns a Subscription whose unsubscribe method decides when to cancel it for all subscribers.

var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x => console.log('do: ' + x))

var subject = new Rx.Subject()

subject.subscribe(printA)
setTimeout(function() {
    console.log('another subscribe.')
    subject.subscribe(printB)
}, 3000)

var app$ = clickAddOne$.multicast(subject)

var connector = app$.connect()

setTimeout(function() {
    connector.unsubscribe()
}, 6000)

After six seconds, clicking no longer prints anything. Calling connect and then unsubscribe on the returned Subscription is rather imperative, though. We can use refCount instead, which moves the process back to observers subscribing and unsubscribing. Let me rewrite the example above.

var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x => console.log('do: ' + x))

var subject = new Rx.Subject()

var app$ = clickAddOne$.multicast(subject).refCount()

app$.subscribe(printA)
setTimeout(function() {
    console.log('another subscribe.')
    app$.subscribe(printB)
}, 3000)

This feels much more like working with an ordinary Observable, and we have achieved the goal of making an Observable multicast. Nice!
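The reference-counting idea can be sketched in plain TypeScript. The helper name shareWithRefCount and the Source type are assumptions made for this illustration only; the sketch models the behaviour described above (connect when the first observer subscribes, disconnect when the last one leaves) and is not the RxJS implementation.

```typescript
type Listener<T> = (value: T) => void;
type Unsubscribe = () => void;
type Source<T> = (listener: Listener<T>) => Unsubscribe;

// Hypothetical helper: share ONE subscription to `source` among all listeners.
function shareWithRefCount<T>(source: Source<T>): Source<T> {
  let listeners: Listener<T>[] = [];
  let disconnect: Unsubscribe | null = null;
  return (listener) => {
    listeners.push(listener);
    if (listeners.length === 1) {
      // First subscriber: this plays the role of connect().
      disconnect = source((value) => listeners.forEach((l) => l(value)));
    }
    return () => {
      listeners = listeners.filter((l) => l !== listener);
      if (listeners.length === 0 && disconnect) {
        disconnect(); // last subscriber left: release the source
        disconnect = null;
      }
    };
  };
}

// Instrumented source so we can see how often it is subscribed.
let connections = 0; // total subscriptions made to the source
let active = 0;      // subscriptions to the source that are still open
let emit: Listener<number> = () => {};
const source: Source<number> = (listener) => {
  connections++;
  active++;
  emit = listener;
  return () => { active--; emit = () => {}; };
};

const shared = shareWithRefCount(source);
const log: string[] = [];
const unsubA = shared((v) => log.push('A:' + v));
emit(1);
const unsubB = shared((v) => log.push('B:' + v)); // joins the existing connection
emit(2);
unsubA();
emit(3);
unsubB(); // last subscriber leaves, so the source is released

console.log(log.join(' '), connections, active);
```

Compared with calling connect and unsubscribe by hand, the lifetime of the shared subscription here follows the observers themselves.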

Coming full circle back to publishReplay, the source code below is now much clearer.

export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
    return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}

publishReplay is syntactic sugar for observable.multicast(new ReplaySubject(...)), so let's look at what ReplaySubject is. Start with a small example.

const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)
var subject = new Rx.ReplaySubject(3);
subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

subject.subscribe({
    next: (v) => console.log('observerC: ' + v)
});

You can see that the last two subscribers first print the three most recent values that were emitted before they subscribed. This looks a bit like subscribing to a cold Observable, but without creating a new Observable execution: the ability to resend earlier data is what a ReplaySubject adds. So the two pieces of code below do the same thing.

var app$ = Rx.Observable.interval(1000).multicast(new Rx.ReplaySubject(3)).refCount()
app$.subscribe(printA)
setTimeout(function () {
    app$.subscribe(printB)
}, 3000)

var app$ = Rx.Observable.interval(1000).publishReplay(3).refCount()
app$.subscribe(printA)
setTimeout(function () {
    app$.subscribe(printB)
}, 3000)
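The replay behaviour seen in the ReplaySubject example can be modelled with a small class in plain TypeScript. MiniReplaySubject is a hypothetical stand-in written for this sketch; it only covers the bufferSize behaviour and ignores windowTime and schedulers.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for a replaying subject:
// it remembers the last `bufferSize` values.
class MiniReplaySubject<T> {
  private buffer: T[] = [];
  private listeners: Listener<T>[] = [];
  private bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  subscribe(listener: Listener<T>): void {
    // Replay the remembered values to the newcomer first.
    this.buffer.forEach((v) => listener(v));
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value
    }
    this.listeners.forEach((l) => l(value));
  }
}

const seen: string[] = [];
const subject = new MiniReplaySubject<number>(3);
subject.subscribe((v) => seen.push('A:' + v));
[1, 2, 3, 4].forEach((v) => subject.next(v));
subject.subscribe((v) => seen.push('B:' + v)); // replays 2, 3, 4
subject.next(5);
subject.subscribe((v) => seen.push('C:' + v)); // replays 3, 4, 5

console.log(seen.join(' '));
```

The buffer is filled whether or not anyone new subscribes, but it is only handed out at the moment of subscription.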

Subtask 2 – Replay the data of 24 requests

Now that we can make a REST request every two seconds, let's finish this task: once 23 points are cached, every new point should trigger a chart update. Following on from the previous section, we would start with a ReplaySubject instance whose bufferSize is 24. Each new subscription is replayed the previous 24 values, but there is a problem: the old values are only delivered at subscription time, and once they have been delivered the replay has nothing more to offer that subscription. Replay is essentially built on buffering, but the Subject offers it in a cold, lazy way, replaying only when someone subscribes. We would prefer a hot version: whenever bufferSize values have accumulated, they are emitted automatically. This is a bit like interval, which emits a value after every fixed period. Is there a static operator like intervalBuffer? 🙂 Let's start by searching the API for buffer.

bufferCount buffers the source values and emits them as an array once the given count is reached. Here is an example:

var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10)
buffer$.subscribe(x => console.log(x))

You can see that every 10 seconds an array of length 10 is printed. That is clearly not what we want: we want to print an array every second and drop the oldest value each time. So let's look at the signature of bufferCount:

public bufferCount(bufferSize: number, startBufferEvery: number): Observable

bufferCount also takes a second parameter, startBufferEvery, which determines where each new buffer starts. The first buffer is emitted once bufferSize values have arrived. Each later buffer starts startBufferEvery values after the start of the previous one, so consecutive buffers overlap by bufferSize - startBufferEvery values. In other words, only startBufferEvery new values are needed to produce the next buffer. Let's change the last example.

var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10, 1)
buffer$.subscribe(x => console.log(x))

Now the output matches our expectations. Let's finish subtask 2, caching 5 points for demonstration purposes.

const print = x => console.log('x: ', x)
const intervalEmit$ = Observable.interval(2000)
const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetch$.bufferCount(5, 1).do(points => {
    // update the drawing here
})
app$.subscribe(print)

OK!
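The sliding window produced by bufferCount(5, 1) can be checked with plain arrays. slidingWindows below is a hypothetical helper, not part of RxJS, and the ages are made-up sample values. It lists the full windows for a finite input and deliberately ignores whatever the real operator does with leftover values when the source completes.

```typescript
// Hypothetical helper: every full window of `size` values,
// starting a new window after each `every` values.
function slidingWindows<T>(values: T[], size: number, every: number): T[][] {
  if (size < 1 || every < 1) {
    throw new RangeError('size and every must be at least 1');
  }
  const windows: T[][] = [];
  for (let start = 0; start + size <= values.length; start += every) {
    windows.push(values.slice(start, start + size));
  }
  return windows;
}

const ages = [21, 25, 23, 30, 27, 22, 29]; // made-up sample data
const windows = slidingWindows(ages, 5, 1);
// windows[0] is [21, 25, 23, 30, 27]
// windows[1] is [25, 23, 30, 27, 22]
// windows[2] is [23, 30, 27, 22, 29]
console.log(windows);
```

Each new value produces one more window and pushes the oldest value out, which is the behaviour the chart needs.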

Subtask 3 – Drawing

Now let's implement the drawing.

const line = new LineChart(document.getElementById('showAge') as HTMLDivElement)
line.setOptions({
        title: {
            left: 'center',
            text: 'Dynamic data (Age)'
        },
        xAxis: {
            type: 'time',
            splitLine: {
                show: false
            }
        },
        yAxis: {
            type: 'value',
            boundaryGap: [0, '100%'],
            splitLine: {
                show: false
            }
        },
        series: [{
            type: 'line',
            data: []
        }]
    })

line.showLoading()

const now = new Date().getTime()
const span = 2 * 1000
const bufferSize = 12

// number of buffers drawn so far
let counter = 0

const intervalEmit$ = Observable.interval(span)

const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))

const app$ = fetch$.bufferCount(bufferSize, 1).map(
    buffer => {
        // hide the loading indicator once the first full buffer arrives
        counter === 0 && line.hideLoading()
        // turn each value into a [timestamp, value] pair
        const points = buffer.map((b, index) => {
            const point = []
            point[0] = now + index * span + span * counter
            point[1] = b
            return point
        })
        counter++
        return points
    }
).do(data => {
    line.setOptions({
        series: [{
            data
        }]
    })
})
app$.subscribe()

The result is as follows

Finally

That completes a simple real-time monitoring line chart demo. Since I am still a beginner learning RxJS, there are bound to be gaps in some of the points covered here, though I have tried not to mislead. I hope you still get something out of it.