preface

Rxjs is a library for responsive programming using Observables, which makes it easier to write asynchronous or callback-based code. We are now for Rxjs 6 source analysis, analysis of the basic principle of its implementation, we can learn the basic use of Rxjs according to the Chinese document, but this document is the version of Rxjs 5. The basic usage difference is as follows: Rxjs 6 operators are configured in pipes, while Rxjs 5 calls directly

Rxjs 5

fromEvent(addBtn, 'click')
    .throttleTime(3000)
    .subscribe(() => {
        nameInput.value = +(nameInput.value) + 1
    })
Copy the code

Rxjs 6

fromEvent(addBtn, 'click')
    .pipe(throttleTime(3000))
    .subscribe((a)= > {
        nameInput.value = +(nameInput.value) + 1
    })
Copy the code

fromEvent

Method of use

Rxjs has a fromEvent method for handling events. The simplest example is as follows:

import {fromEvent } from './esm2015';

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name');
fromEvent(addBtn, 'click')  
    .subscribe((a)= > {
        nameInput.value = +(nameInput.value) + 1
    })

fromEvent(minusBtn, 'click')
    .subscribe((a)= > {
        nameInput.value = +(nameInput.value) - 1
    })
Copy the code

The result is as follows:

The principle of

FromEvent: fromEvent: fromEvent: fromEvent: fromEvent: fromEvent: fromEvent

export function fromEvent(target, eventName, options, resultSelector) {
    if (isFunction(options)) {
        resultSelector = options;
        options = undefined;
    }
    if (resultSelector) {
        return fromEvent(target, eventName, options).pipe(map(args= >isArray(args) ? resultSelector(... args) : resultSelector(args))); }return new Observable(subscriber= > {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
}
Copy the code

FromEvent is a method that can pass in four arguments. The previous Demo only passed two arguments. FromEvent returns an Observable, which we can do with the above Demo as follows:

import { fromEvent } from './esm2015';

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name')
const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe((a)= > {
    nameInput.value = +(nameInput.value) + 1
})

const minusFromEventObj = fromEvent(minusBtn, 'click')
minusFromEventObj.subscribe((a)= > {
    nameInput.value = +(nameInput.value) - 1
})
Copy the code

We split the following code into two steps,

fromEvent(addBtn, 'click')  
    .subscribe((a)= > {
        nameInput.value = +(nameInput.value) + 1
    })

Copy the code

Const addFromEventObj = fromEvent(addBtn, ‘click’)

Observable
_subscribe
fromEvent
return

    return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
Copy the code

The fromEvent method, as simple as it is, returns an Observable and little else. The point is still the object returned, so let’s dig into the Observable.

Observable

The Chinese translation of Observable is “Observable”, which means an Observable object. Since it is an Observable object, is it necessary to inform the corresponding observer after the change is observed?

The constructor

Observable constructors start by analyzing Observable constructors:

    constructor(subscribe) {
        this._isScalar = false;
        if (subscribe) {
            this._subscribe = subscribe; }}Copy the code

The constructor accepts a single argument, subscribe, which is the concept of an observer, and then our observable is associated with the observer object.

Const addFromEventObj = fromEvent(addBtn, ‘click’) addFromEventObj means we created an observable, and once we have an observable, we need to subscribe to that observable, Let’s analyze the SUBSCRIBE method

subscribe

Subscribe source code:

    subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
            operator.call(sink, this.source);
        }
        else {
            sink.add(
                this.source || (config.useDeprecatedSynchronousErrorHandling && ! sink.syncErrorThrowable) ?this._subscribe(sink) :
                this._trySubscribe(sink)
            );
        }
        if (config.useDeprecatedSynchronousErrorHandling) {
            if (sink.syncErrorThrowable) {
                sink.syncErrorThrowable = false;
                if (sink.syncErrorThrown) {
                    throwsink.syncErrorValue; }}}return sink;
    }
Copy the code

Instead of adding an operator, let’s examine this method one by one:

  1. const sink = toSubscriber(observerOrNext, error, complete);Created aSubscriberMethod,
  2. becauseoperatorIs undefined, so the conditional statement will enterelse.
sink.add( this.source || (config.useDeprecatedSynchronousErrorHandling && ! sink.syncErrorThrowable) ? this._subscribe(sink) : this._trySubscribe(sink) );Copy the code

_trySubscribe(sink); return this._subscribe(sink); This. _subscribe we call the fromEvent method, return Observable, and pass in the constructor argument as follows:

subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    }
Copy the code

Here is the event binding for the real Dom element. Now click button and the handler method above will be called. The main implementation is subscriber.next(e); , its ‘subscriber’ is const sink = toSubscriber(observerOrNext, error, complete); The sink object created is the esm2015\internal\ subscriber.js object. Now we look at its corresponding next() method, which points to:

    _next(value) {
        this.destination.next(value);
    }
Copy the code

The this.destination property, assigned in the Subscriber constructor, is a SafeSubscriber object

 default:
        this.syncErrorThrowable = true;
        this.destination = new SafeSubscriber(this, destinationOrNext, error, complete);
        break;
Copy the code

Where destinationOrNext corresponds to the argument passed by the SUBSCRIBE method as follows:

const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
Copy the code

is

() => {
    nameInput.value = +(nameInput.value) + 1
}
Copy the code

This.destination. Next (value); , its code is as follows:

    next(value) {
        if (!this.isStopped && this._next) {
            const { _parentSubscriber } = this;
            if(! config.useDeprecatedSynchronousErrorHandling || ! _parentSubscriber.syncErrorThrowable) {this.__tryOrUnsub(this._next, value);
            }
            else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
                this.unsubscribe(); }}}Copy the code

The final implementation is this.__tryorunsub (this._next, value); This method passes two arguments:

  1. this._nextIn theSafeSubscriberThe constructor assigns it a value:next = observerOrNext;, the second argument to the constructor, which we analyzed aboveSafeSubscriberPlace of objectthis.destination = new SafeSubscriber(this, destinationOrNext, error, complete);thedestinationOrNextissubscribeThe method takes the following arguments:
const addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
Copy the code
  1. value, that is,clickObject of the eventMouseEvent

Let’s analyze the **__tryOrUnsub** method in detail, with the following code:

    __tryOrUnsub(fn, value) {
        try {
            fn.call(this._context, value);
        }
        catch (err) {
            this.unsubscribe();
            if (config.useDeprecatedSynchronousErrorHandling) {
                throw err;
            }
            else{ hostReportError(err); }}}Copy the code

Call (this._context, value); Subscribe executes the method in subscribe:

() => {
    nameInput.value = +(nameInput.value) + 1
}
Copy the code

pipe

We now have a requirement to control the Button. We can only click once in 3s time to prevent malicious clicking of the Button. Rxjs operates on streams. The Observable provides a pipe method that processes the data before it enters the SUBSCRIBE subscriber method, handling exceptions to make sure it receives the correct data. Let’s take a closer look at the PIPE method. Let’s modify our Demo as follows:

import { fromEvent } from './esm2015';
import { throttleTime } from './esm2015/operators'

const addBtn = document.getElementById('add')
const minusBtn = document.getElementById('minus')
const nameInput = document.getElementById('name')
const addFromEventObj = fromEvent(addBtn, 'click')
const pipeObj = addFromEventObj
    .pipe(throttleTime(1000 * 3))
pipeObj.subscribe((a)= > {
        nameInput.value = +(nameInput.value) + 1
    })

const minusFromEventObj = fromEvent(minusBtn, 'click')

minusFromEventObj
    .pipe(throttleTime(1000 * 3))
    .subscribe((a)= > {
        nameInput.value = +(nameInput.value) - 1
    })
Copy the code

Pipe (throttleTime(1000 * 3))), passing in a throttleTime operator. Here is the code for the PIPE method:

pipe(... operations) {if (operations.length === 0) {
            return this;
        }
        var opts = pipeFromArray(operations);
        var result = opts(this);
         return result;
    }
Copy the code

Pipe = pipeFromArray(operations); var opts = pipeFromArray(operations); Opts corresponds to the method returned by the throttleTime function we passed:

export function throttleTime(duration, scheduler = async, config = defaultThrottleConfig) {
    return (source) = > {returnsource.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing)); }}Copy the code

The source is the this (Observable) object above, and we can continue with the Lift method:

    lift(operator) {
        const observable = new Observable();
        observable.source = this;
        observable.operator = operator;
        return observable;
    }
Copy the code

Returns a new Observable with the operator property added. The following code:

const addFromEventObj = fromEvent(addBtn, 'click')
const pipeObj = addFromEventObj
    .pipe(throttleTime(1000 * 3))
Copy the code

PipeObj is shown in the following figure:

So the PIPE method mounts a (group) operator to the operator property of a new Observable. We also need to reanalyze the SUBSCRIBE method

    subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
            operator.call(sink, this.source);
        }
        else{ sink.add( this.source || (config.useDeprecatedSynchronousErrorHandling && ! sink.syncErrorThrowable) ? this._subscribe(sink) : this._trySubscribe(sink) ); }if (config.useDeprecatedSynchronousErrorHandling) {
            if (sink.syncErrorThrowable) {
                sink.syncErrorThrowable = false;
                if(sink.syncErrorThrown) { throw sink.syncErrorValue; }}}return sink;
    }
Copy the code

Before we call subscribe, we have already called the pipe method. The object returned by the pipe method already has the operator operator, so the logical branch above will go if operator.call(sink, this.source); , the call code is as follows:

    call(subscriber, source) {
        return source.subscribe(new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing));
    }
Copy the code

We call our subscribe method again, but the first argument we pass in is observerOrNext which is a ThrottleTimeSubscriber object, so we go back to our Subscribe method, const sink = toSubscriber(observerOrNext, error, complete); , check:

export function toSubscriber(nextOrObserver, error, complete) {
    if (nextOrObserver) {
        if (nextOrObserver instanceof Subscriber) {
            return nextOrObserver;
        }
        if (nextOrObserver[rxSubscriberSymbol]) {
            returnnextOrObserver[rxSubscriberSymbol](); }}if(! nextOrObserver && ! error && ! complete) {return new Subscriber(emptyObserver);
    }
    return new Subscriber(nextOrObserver, error, complete);
}
Copy the code

It can be seen from the code that if the first parameter nextOrObserver is a Subscriber type, the object will be returned directly, while our ThrottleTimeSubscriber inherits the Subscriber object. So sink is a ThrottleTimeSubscriber object. We go back to the fromEvent method, where subscriber points to a ThrottleTimeSubscriber object. subscriber.next(e); The next method of the ThrottleTimeSubscriber object is also called.

    return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
Copy the code

We will continue to examine how the operator operator works.

throttleTime

As we examined above, the PIPE method mounts a (group) operator to the operator property of a new Observable. Now to see how the operator works, we can start with the throttleTime operator.

We have analyzed above, when we click Button, we will call handler method, subscriber is already a ThrottleTimeSubscriber object.

    return new Observable(subscriber => {
        function handler(e) {
            if (arguments.length > 1) {
                subscriber.next(Array.prototype.slice.call(arguments));
            }
            else {
                subscriber.next(e);
            }
        }
        setupSubscription(target, eventName, handler, subscriber, options);
    });
Copy the code

We will analysis the following: ThrottleTimeSubscriber. Next (e) method, the code is as follows:

    _next(value) {
        if (this.throttled) {
            if (this.trailing) {
                this._trailingValue = value;
                this._hasTrailingValue = true; }}else {
            this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })
            this.add(this.throttled);
            if(this.leading) { this.destination.next(value); }}}Copy the code

This method is very important

  1. I added one firstthrottledFlag variable that marks whether the throttling switch has been enabled, initiallyundefindedThe code will enterelseBranch,
  2. Then performthis.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })Assign a value to the throttled, and the next time you come in,throttledIt’s worth it.
  3. performthis.destination.next(value);This method will eventually be calledsubscribeThe method passed in the subscription method, which is
addFromEventObj = addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
Copy the code

Conclusion:

This method is the key to throttleTime throttling, which determines whether to execute the method in SUBSCRIBE by judging the throttled token.

Throttled = this.scheduler. Schedule (dispatchNext, this.duration, {subscriber: this}).

Scheduler points to the esm2015 internal scheduler AsyncAction.js object.

    schedule(state, delay = 0) {
        if (this.closed) {
            return this;
        }
        this.state = state;
        const id = this.id;
        const scheduler = this.scheduler;
        if(id ! = null) { this.id = this.recycleAsyncId(scheduler, id, delay); } this.pending =true;
        this.delay = delay;
        this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);
        return this;
    }
Copy the code

One of the most important is that this. Id = this. Id | | this. RequestAsyncId (scheduler, enclosing id, delay); , a requestAsyncId method is called with the following code:

requestAsyncId(scheduler, id, delay = 0) {
        return setInterval(scheduler.flush.bind(scheduler, this), delay);
    }
Copy the code

The timer setInterval is set. This is the key point for throttleTime to take effect.

Conclusion:

SetInterval realizes the function of periodically clearing throttled variable values, so as to achieve the principle that after we call the operator throttleTime(1000*3) and click the button once, we cannot click it again within 3S, but we can click it again after 3S

The first parameter to this timer is the scheduler. The flush. Bind (scheduler, this), its corresponding are: esm2015 / internal/scheduler/AsyncScheduler flush in js method, its code is as follows:

    flush(action) {
        const { actions } = this;
        if (this.active) {
            actions.push(action);
            return;
        }
        let error;
        this.active = true;
        do {
            if (error = action.execute(action.state, action.delay)) {
                break; }}while (action = actions.shift());
        this.active = false;
        if (error) {
            while(action = actions.shift()) { action.unsubscribe(); } throw error; }}Copy the code

Execute (AsyncAction); execute (AsyncAction); execute (AsyncAction);

    execute(state, delay) {
        if (this.closed) {
            return new Error('executing a cancelled action');
        }
        this.pending = false;
        const error = this._execute(state, delay);
        if (error) {
            return error;
        }
        else if (this.pending === false && this.id != null) {
            this.id = this.recycleAsyncId(this.scheduler, this.id, null);
        }
    }
Copy the code

Const error = this._execute(state, delay); The _execute code is as follows:

    _execute(state, delay) {
        let errored = false;
        let errorValue = undefined;
        try {
            this.work(state);
        }
        catch (e) {
            errored = true; errorValue = !! e && e || new Error(e); }if (errored) {
            this.unsubscribe();
            returnerrorValue; }}Copy the code

Throttled = this.scheduler. Schedule (dispatchNext, this.duration, {subscriber: This}) is passed as the first argument

function dispatchNext(arg) {
    const { subscriber } = arg;
    subscriber.clearThrottle();
}
Copy the code

The clearThrottle method is eventually called, with the following code:

    clearThrottle() {
        const throttled = this.throttled;
        if (throttled) {
            if (this.trailing && this._hasTrailingValue) {
                this.destination.next(this._trailingValue);
                this._trailingValue = null;
                this._hasTrailingValue = false; } throttled.unsubscribe(); this.remove(throttled); this.throttled = null; }}Copy the code

One of the key steps is to set the throttLED flag to NULL this.throttled = null; ThrottleTimeSubscriber = _next; ThrottleTimeSubscriber = _next;

    _next(value) {
        if (this.throttled) {
            if (this.trailing) {
                this._trailingValue = value;
                this._hasTrailingValue = true; }}else {
            this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })
            this.add(this.throttled);
            if(this.leading) { this.destination.next(value); }}}Copy the code

This method has the throttled flag, and if it’s not empty, it’s actually equivalent to doing nothing, just empty, Throttled = this.scheduler. Schedule (dispatchNext, this.duration, {subscriber: This}), which is this method, is going to execute the method in our subscribe, which is going to throttle.

let addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 10))
addFromEventObj = addFromEventObj.subscribe(() => {
    nameInput.value = +(nameInput.value) + 1
})
Copy the code

conclusion

We have briefly analyzed the basic usage of Rxjs above, and here are some basic concepts that we will cover in this article

(Observable), (subscriber) observer, (pipe) pipe, (throttleTime) operator Below, we summarize the whole process of Rxjs data processing based on stream (observed object -> data processing -> observer) according to these concepts.

  1. First you need to create an observable in theesm2015\internal\observable\Under this folder are methods for creating observable objects, like the ones we used in the Demo abovefromEventThat’s one of them, and then return oneObservableObject,ObservableThere is also a static methodcreateYou can just create one of themObservableobject
  2. Pipe attaches operators to the Operator property of an Observable. If pipe passes more than one operator, it recurses from right to left in the Source property (which is also an Observable)
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 2),mapTo(1), scan((init, next) => init + next, 0))
Copy the code

The corresponding object is shown as follows:

  1. Then call the SUBSCRIBE method, whose first argument is the observer, the subscriber method below.
let addFromEventObj = fromEvent(addBtn, 'click')
addFromEventObj = addFromEventObj.pipe(throttleTime(1000 * 2),mapTo(1), scan((init, next) => init + next, 0))
const subscriber = value => {
    nameInput.value = value
}
addFromEventObj = addFromEventObj.subscribe(subscriber)
Copy the code