RxJS is a responsive library, it receives from the event source sent one event after another, after the layers of processing pipeline, passed to the final receiver, this processing pipeline is composed of operators, developers only need to select and combine operators can complete a variety of asynchronous logic, greatly simplifies asynchronous programming. In addition, the design of RxJS follows the concept of function and flow.

It’s hard to understand the concepts directly, so let’s implement a simple RxJS and look at these.

The use of RxJS

RxJS wraps a layer of event sources, called Observables, that emit events.

Like this:

const source = new Observable((observer) = > {
    let i = 0;
    setInterval(() = > {
        observer.next(++i);
    }, 1000);
});
Copy the code

Set a timer inside the callback function that continuously passes events through Next.

These events are listened on by the receiver, called observers.

const subscription = source.subscribe({
    next: (v) = > console.log(v),
    error: (err) = > console.error(err),
    complete: () = > console.log('complete')});Copy the code

The Observer can receive events sent from next, which may contain errors, and handle errors here, as well as events that have been transmitted.

Such a listen, or Subscription, is called Subscription.

You can subscribe or unsubscribe:

subscription.unsubscribe();
Copy the code

The unsubscribe callback is returned in Observable:

const source = new Observable((observer) = > {
    let i = 0;
    const timer = setInterval(() = > {
        observer.next(++i);
    }, 1000);
    return function unsubscribe() {
        clearInterval(timer);
    };
});
Copy the code

Sending events and listening for events are just the basics. The process of handling events is the essence of RxJS, which designs the concept of a pipe that can be assembled using the operator operator:

source.pipe(
    map((i) = > ++i),
    map((i) = > i * 10)
).subscribe(() = > {
    / /...
})
Copy the code

Events are piped to the Observer, passing operator by operator along the way.

For example, the logic here is to add one to the data that’s being passed, and then multiply it by 10.

To sum up, the code using RxJS looks like this:

const source = new Observable((observer) = > {
    let i = 0;
    const timer = setInterval(() = > {
        observer.next(++i);
    }, 1000);
    return function unsubscribe() {
        clearInterval(timer);
    };
});
const subscription = source.pipe(
    map((i) = > ++i),
    map((i) = > i * 10)
).subscribe({
    next: (v) = > console.log(v),
    error: (err) = > console.error(err),
    complete: () = > console.log('complete')});setTimeout(() = > {
    subscription.unsubscribe();
}, 4500);
Copy the code

Observable creates an event source that emits one event per second, which passes to the Observer through a pipeline consisting of two Map operators that perform + 1 and * 10 processing on data.

The Observer receives the data as it is passed, prints it, and handles errors and end-of-event events. Observable also provides logic for unsubscribing, clearing timers when unsubscribing at 4.5 seconds.

Using RxJS is basically this flow, so how does it work?

80 lines of code to implement RxJS

Observable starts with the event source and implements Observable:

Take a look at its features:

  1. It receives a callback function that can be called next to transfer data.
  2. It has a subscribe method that can be used to add an Observer subscription and return subscription
  3. It can return the processing logic for unSBScribe in the callback function
  4. It has the PIPE method to pass in the operator

Let’s implement it according to these characteristics:

First, the Observable constructor receives the _subscribe callback, not immediately, but when it subscribes:

class Observable {
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }
    subscribe() {
        this._subscribe(); }}Copy the code

The callback takes an object with the next, error, and complete methods used to pass the event:

class Observable {
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }
    subscribe(observer) {
        const subscriber = new Subscriber(observer);
        this._subscribe(subscriber); }}class Subscriber{
    constructor(observer) {
        super(a);this.observer = observer;
        this.isStopped = false;
    }
    next(value) {
        if (this.observer.next && !this.isStopped) {
            this.observer.next(value); }}error(value) {
        this.isStopped = true;
        if (this.observer.error) {
            this.observer.error(value); }}complete() {
        this.isStopped = true;
        if (this.observer.complete) {
            this.observer.complete();
        }
        if (this.unsubscribe) {
            this.unsubscribe(); }}}Copy the code

The next, error, and complete methods can then be called inside the callback function:

In addition, the return value of the callback function is the processing logic from unsbScribe, to be collected and called when unsubscribing:

class Subscription {
    constructor() {
        this._teardowns = [];
    }
    unsubscribe() {
        this._teardowns.forEach((teardown) = > {
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
        });
    }
    add(teardown) {
        if (teardown) {
            this._teardowns.push(teardown); }}}Copy the code

Provides an unsubscribe method for unsubscribe, and _teardowns collects all unsubscribe callbacks and calls all teardown callbacks when unsubscribe.

This logic is general and can be used as the parent of Subscriber.

Observable then calls Add to add teardown and returns the subscription (which has an unsubscribe method) :

class Observable {
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }
    subscribe(observer) {
        const subscriber = new Subscriber(observer);
        subscriber.add(this._subscribe(subscriber));
        returnsubscriber; }}class Subscriber extends Subscription {
    constructor(observer) {
        super(a);this.observer = observer;
        this.isStopped = false;
    }
    next(value) {
        if (this.observer.next && !this.isStopped) {
            this.observer.next(value); }}error(value) {
        this.isStopped = true;
        if (this.observer.error) {
            this.observer.error(value); }}complete() {
        this.isStopped = true;
        if (this.observer.complete) {
            this.observer.complete();
        }
        if (this.unsubscribe) {
            this.unsubscribe(); }}}class Subscription {
    constructor() {
        this._teardowns = [];
    }
    unsubscribe() {
        this._teardowns.forEach((teardown) = > {
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
        });
    }
    add(teardown) {
        if (teardown) {
            this._teardowns.push(teardown); }}}Copy the code

So we implemented Observable and Observer in 50 lines of code. Let’s test it out:

const source = new Observable((observer) = > {
    let i = 0;
    const timer = setInterval(() = > {
        observer.next(++i);
    }, 1000);
    return function unsubscribe() {
        clearInterval(timer);
    };
});
const subscription = source.subscribe({
    next: (v) = > console.log(v),
    error: (err) = > console.error(err),
    complete: () = > console.log('complete')});setTimeout(() = > {
    subscription.unsubscribe();
}, 4500);
Copy the code

The Observer listened for data 1, 2, 3, and 4 passed by the Observable. The subscription was cancelled at 4.5s, so there was no more data.

We implemented the basic RxJS in 50 lines!

Of course, the quintessential operator is not yet implemented, so continue to refine it.

Observable adds a pipe method to Observable. It calls the incoming operator and the result is the input to the next operator.

class Observable {
    constructor(_subscribe) {
        / /...
    }
    subscribe(observer) {
       / /...
    }
    pipe(. operations) {
        return pipeFromArray(operations)(this); }}function pipeFromArray(fns) {
    if (fns.length === 0) {
        return (x) = > x;
    }
    if (fns.length === 1) {
        return fns[0];
    }
    return (input) = > {
        return fns.reduce((prev, fn) = > fn(prev), input);
    };
}
Copy the code

When the number of parameters passed in is 0, it directly returns the previous Observable; when there is 1, it directly returns it; otherwise, it connects the parameters in series through reduce to form a pipeline.

The operator implementation listens to the previous Observable and returns a new one.

For example, the map implementation is to pass in project to process the value and pass the result to next:

function map(project) {
    return (observable) = > new Observable((subscriber) = > {
        const subcription = observable.subscribe({
            next(value) {
                return subscriber.next(project(value));
            },
            error(err) {
                subscriber.error(err);
            },
            complete(){ subscriber.complete(); }});return subcription;
    });
}
Copy the code

Now that we have implemented the operator, let’s test:

We call the PIPE method and use two map operators to organize the process, doing +1 and *10 processing on the data.

So the 1, 2, 3, and 4 that Observable passes to an Observer become 20, 30, 40, and 50.

At this point, we have implemented the concepts of Observable, Observer, Subscription, and operator for RxJS, making it a simplified version of RxJS. It only took 80 lines of code.

Let’s go back to the first ideas:

Why is it called reactive?

This programming pattern is called reactive because it does listening and processing on the event source.

Why is it called functional?

Since each operator step is a pure function, it returns a new Observable, in line with the idea that functions are immutable, and that modifications return a new Observable.

Why is it called flow?

Since events are generated and transmitted dynamically, this dynamic generation and transmission of data can be called flow.

The complete code is as follows:

function pipeFromArray(fns) {
    if (fns.length === 0) {
        return (x) = > x;
    }
    if (fns.length === 1) {
        return fns[0];
    }
    return (input) = > {
        return fns.reduce((prev, fn) = > fn(prev), input);
    };
}
class Subscription {
    constructor() {
        this._teardowns = [];
    }
    unsubscribe() {
        this._teardowns.forEach((teardown) = > {
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
        });
    }
    add(teardown) {
        if (teardown) {
            this._teardowns.push(teardown); }}}class Subscriber extends Subscription {
    constructor(observer) {
        super(a);this.observer = observer;
        this.isStopped = false;
    }
    next(value) {
        if (this.observer.next && !this.isStopped) {
            this.observer.next(value); }}error(value) {
        this.isStopped = true;
        if (this.observer.error) {
            this.observer.error(value); }}complete() {
        this.isStopped = true;
        if (this.observer.complete) {
            this.observer.complete();
        }
        if (this.unsubscribe) {
            this.unsubscribe(); }}}class Observable {
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }
    subscribe(observer) {
        const subscriber = new Subscriber(observer);
        subscriber.add(this._subscribe(subscriber));
        return subscriber;
    }
    pipe(. operations) {
        return pipeFromArray(operations)(this); }}function map(project) {
    return (observable) = > new Observable((subscriber) = > {
        const subcription = observable.subscribe({
            next(value) {
                return subscriber.next(project(value));
            },
            error(err) {
                subscriber.error(err);
            },
            complete(){ subscriber.complete(); }});return subcription;
    });
}


const source = new Observable((observer) = > {
    let i = 0;
    const timer = setInterval(() = > {
        observer.next(++i);
    }, 1000);
    return function unsubscribe() {
        clearInterval(timer);
    };
});
const subscription = source.pipe(
    map((i) = > ++i),
    map((i) = > i * 10)
).subscribe({
    next: (v) = > console.log(v),
    error: (err) = > console.error(err),
    complete: () = > console.log('complete')});setTimeout(() = > {
    subscription.unsubscribe();
}, 4500);
Copy the code

conclusion

In order to understand the concepts of RxJS, such as responsiveness, function and flow, we implemented a simplified version of RxJS.

We implemented concepts such as Observable, Observer, and Subscription to generate, subscribe, and unsubscribe events.

Operators and pipes are then implemented, with each operator returning a new Observable that layers the data.

After writing this, we can understand more clearly how the concepts of responsiveness, function, and flow are reflected in RxJS.

It takes only 80 lines of code to implement a simplified version of RxJS.