Preface

For the record: starting this year, my articles will be pitched at roughly the P7 technical level, so if you want to test your ability, please follow me.

What you will learn about RxJS

If you like your functions to have a single responsibility, or you like to abstract common logic into reusable functions, then RxJS can help you write functions that outclass those of most front-end developers, because RxJS is designed by nature to make your functions conform to the following:

  • Single responsibility principle
  • Open-closed principle
  • Law of Demeter (which can be loosely understood as the communication decoupling provided by the publish-subscribe model)

RxJS also handles complex coordination of multiple asynchronous tasks. See my previous article (RXJS Practical Case) for details.

Prerequisite: the idea of function composition (closely related to RxJS)

First, I will briefly cover how to write extensible functions in business code, that is, functions that behave like pluggable plug-ins. The best answer I have found so far is functional programming (this applies only to writing functions).

Why does this make functions so extensible? See below.

Data A, processed by f1, f2, and f3 in turn, produces result B, which means we can write a function:

// fn is the composition of f1, f2 and f3
const fn = compose(f1, f2, f3);

The compose function can be implemented for many scenarios; here we assume the simplest one, synchronous execution from left to right:
const compose = (...fns) => (x) => fns.reduce((y, f) => f(y), x);

For example, when the product requirements change, our compose function leaves an extension point. You can add other functions before or after f1, f2 and f3 to enhance the behavior, or write a new fn that recombines only f1 and f3.
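To make the extension point concrete, here is a minimal sketch. The bodies of f1, f2 and f3 and the extra square step are hypothetical stand-ins chosen only for illustration; compose is the one-liner from the text.

```javascript
// Left-to-right composition, as defined in the article.
const compose = (...fns) => (x) => fns.reduce((y, f) => f(y), x);

// Hypothetical base functions standing in for f1, f2 and f3.
const f1 = (n) => n + 1;
const f2 = (n) => n * 2;
const f3 = (n) => `result: ${n}`;

const fn = compose(f1, f2, f3);

// Extension: plug a new step in without touching f1, f2 or f3.
const square = (n) => n * n;
const fnExtended = compose(f1, square, f2, f3);

// Recombination: a new function that reuses only f1 and f3.
const fnShort = compose(f1, f3);

console.log(fn(1));         // result: 4
console.log(fnExtended(1)); // result: 8
console.log(fnShort(1));    // result: 2
```

None of the base functions had to change to get the two new variants, which is the whole point of keeping each one single-purpose.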

What kind of structure is this? It is like a pyramid: base functions at the bottom are combined into more complex functions toward the top.

You can use the library Ramda (ramdaJS) for this, which I have been using, but in my opinion Ramda has a lot of functions that you will never actually need. More importantly, Ramda cannot solve complex asynchronous problems, where a single line of RxJS might. See my previous article (RXJS Practical Case) for details.

However, RxJS is genuinely hard to learn: the same operator can behave differently in synchronous and asynchronous scenarios, which makes it hard for beginners to remember. Combined with the lack of learning materials online, the learning cost is high.

What you will get from this article

This article draws on many excellent articles online, but a large part of it is original. Although it is a very basic implementation of RxJS, its structure is essentially the same as that of RxJS version 6. Once you can read this code, you will understand how RxJS executes synchronous and asynchronous flows, and you can then pick up RxJS itself without trouble. After about two weeks with the common operators, you will have a solid command of the API.

Let's get started! The basic structure of RxJS

Let’s start with a very simple effect like this:

const observable = new Observable(function publish(observer) {
    const clearId = setInterval(() => {
        observer.next('hello world');
    }, 1000);
    setTimeout(() => {
        observer.complete();
    }, 1500);
    // Cleanup: stop the interval when the subscription ends
    return () => clearInterval(clearId);
});

observable.subscribe({
    next: (value) => console.log(value),
    error: (err) => console.log(err),
    complete: () => console.log('done'),
});

That is, 'hello world' is printed after 1 second, and after 1.5 seconds the interval is cancelled and 'done' is printed.

First, we need to implement the simplest possible Observable class, which registers events. By analogy with the publish-subscribe model, it registers a function that takes an observer, an instance of the Observer class. We will implement Observer later; ignore it for now.

subscribe publishes the event: it actually calls the function you registered, that is, the one you passed to new Observable.

class Observable {
    // Register the listener function, publish
    constructor(publish) {
        this.publish = publish;
    }
    // Publish: call the previously registered publish function.
    // For now the first argument is an object of the form { next, error, complete }.
    subscribe(next, error, complete) {
        // Convert the subscribe argument into an Observer instance
        const observer = new Observer(next.next, next.error, next.complete);
        // Call the registered function
        const unsubscribeCb = this.publish(observer);
        // Store the cleanup function on the Observer instance
        observer.onUnsubscribe(unsubscribeCb);
        return observer;
    }
}

We then implement the Observer class. Its instances are passed as the argument to the previously registered function.

const noop = () => {};

class Observer {
    // Called when unsubscribe() is invoked on this Observer
    unsubscribeCb;
    // next is the function that consumes the data
    constructor(next = noop, error = noop, complete = noop) {
        this._next = next;
        this._error = error;
        this._complete = complete;
        // Whether error or complete has been called; once it has, next no longer delivers values
        this.isStopped = false;
    }
    next(value) {
        if (!this.isStopped) {
            this._next(value);
        }
    }
    error(err) {
        if (!this.isStopped) {
            this._error(err);
            this.unsubscribe();
        }
    }
    complete() {
        if (!this.isStopped) {
            this._complete();
            this.unsubscribe();
        }
    }
    unsubscribe() {
        this.isStopped = true;
        this.unsubscribeCb && this.unsubscribeCb();
    }
    onUnsubscribe(unsubscribe) {
        this.unsubscribeCb = unsubscribe;
    }
}
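A quick way to check the stop logic is to drive an Observer by hand. The sketch below repeats a condensed copy of the class so that it runs on its own; the log array and the three callbacks are made up for the demonstration.

```javascript
const noop = () => {};

// Condensed copy of the article's Observer.
class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    this._next = next;
    this._error = error;
    this._complete = complete;
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  unsubscribe() {
    this.isStopped = true;
    if (this.unsubscribeCb) this.unsubscribeCb();
  }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
}

const log = [];
const observer = new Observer(
  (value) => log.push(`next ${value}`),
  (err) => log.push(`error ${err}`),
  () => log.push('done')
);
observer.onUnsubscribe(() => log.push('cleanup'));

observer.next(1);
observer.complete(); // runs the complete callback, then the cleanup
observer.next(2);    // ignored: the observer is already stopped
observer.error('x'); // ignored for the same reason

console.log(log); // [ 'next 1', 'done', 'cleanup' ]
```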

OK, everyone, take a moment to digest this before moving on. This is the most basic case!

Let's strike while the iron is hot and implement two static methods on Observable:

  • fromEvent
  • interval

Starting with fromEvent, our demo looks like this. Click on the body and 223 is printed:

Observable.fromEvent(document.body, 'click').subscribe({
    next: () => {
        console.log('223');
    },
});

The implementation is as follows:

Observable.fromEvent = function (target, eventName) {
    // Return an Observable instance, since it has to fit our publish/subscribe framework:
    // the Observable instance registers and invokes the event,
    // and an Observer instance is passed to the registered function as its argument
    return new Observable((observer) => {
        const handleFn = (e) => {
            observer.next(e);
        };
        target.addEventListener(eventName, handleFn);
        return () => {
            target.removeEventListener(eventName, handleFn);
        };
    });
};
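The click demo needs a browser. To try the same idea outside one, the sketch below assumes a runtime that provides the standard EventTarget and Event globals (browsers and recent Node.js versions). The classes are condensed copies of the ones in this article, and the 'ping' event name is arbitrary.

```javascript
const noop = () => {};

class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  unsubscribe() {
    this.isStopped = true;
    if (this.unsubscribeCb) this.unsubscribeCb();
  }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe({ next, error, complete }) {
    const observer = new Observer(next, error, complete);
    observer.onUnsubscribe(this.publish(observer));
    return observer;
  }
}

Observable.fromEvent = function (target, eventName) {
  return new Observable((observer) => {
    const handleFn = (event) => observer.next(event);
    target.addEventListener(eventName, handleFn);
    // The returned function is what unsubscribe() ends up calling.
    return () => target.removeEventListener(eventName, handleFn);
  });
};

const target = new EventTarget();
const seen = [];
const subscription = Observable.fromEvent(target, 'ping').subscribe({
  next: (event) => seen.push(event.type),
});

target.dispatchEvent(new Event('ping'));
target.dispatchEvent(new Event('ping'));
subscription.unsubscribe();
target.dispatchEvent(new Event('ping')); // the listener has been removed

console.log(seen); // [ 'ping', 'ping' ]
```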

Next we implement interval. Our demo prints 0, 1, 2 and then stops:

const obs = Observable.interval(1000).subscribe({
    next: (index) => console.log(index),
});
// Unsubscribe shortly after the third tick so that 2 is reliably printed
setTimeout(() => {
    obs.unsubscribe();
}, 3500);

So let's implement interval:

Observable.interval = function (delay) {
    return new Observable(function (observer) {
        let index = 0;
        const clearId = window.setInterval(() => {
            observer.next(index++);
        }, delay);
        return () => {
            window.clearInterval(clearId);
        };
    });
};

Well, that's the end of this stage. Next we're going to refine the Observable code. Take a break first; let me play you a song.

So let's enrich the subscribe method. It used to look like this:

class Observable {
    constructor(publish) {
        this.publish = publish;
    }
    subscribe(next, error, complete) {
        const observer = new Observer(next.next, next.error, next.complete);
        const unsubscribeCb = this.publish(observer);
        observer.onUnsubscribe(unsubscribeCb);
        return observer;
    }
}

We also want to be able to pass an Observer instance or plain functions when calling subscribe, as the following code shows:

class Observable {
    constructor(publish) {
        this.publish = publish;
    }
    // Subscribe a consumer
    subscribe(next, error, complete) {
        let observer;
        // Convert next to an Observer instance according to its type
        if (next instanceof Observer) {
            observer = next;
        } else if (typeof next === 'function') {
            observer = new Observer(next, error, complete);
        } else {
            observer = new Observer(next.next, next.error, next.complete);
        }
        const unsubscribeCb = this.publish(observer);
        observer.onUnsubscribe(unsubscribeCb);
        return observer;
    }
}
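The three accepted argument shapes can be exercised side by side. This is a minimal sketch with condensed copies of the two classes; unsubscription handling is left out for brevity, and the source that emits 'a' is a made-up example.

```javascript
const noop = () => {};

class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this.isStopped = true; this._error(err); } }
  complete() { if (!this.isStopped) { this.isStopped = true; this._complete(); } }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe(next, error, complete) {
    let observer;
    if (next instanceof Observer) {
      observer = next;                                   // already an Observer
    } else if (typeof next === 'function') {
      observer = new Observer(next, error, complete);    // plain functions
    } else {
      observer = new Observer(next.next, next.error, next.complete); // object
    }
    this.publish(observer);
    return observer;
  }
}

const source = new Observable((observer) => {
  observer.next('a');
  observer.complete();
});

const calls = [];
source.subscribe((value) => calls.push(`fn:${value}`));
source.subscribe({ next: (value) => calls.push(`obj:${value}`) });
source.subscribe(new Observer((value) => calls.push(`inst:${value}`)));

console.log(calls); // [ 'fn:a', 'obj:a', 'inst:a' ]
```

Missing callbacks fall back to noop through the default parameters, which is why the object form can omit error and complete.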

How RxJS runs synchronous operators

This is a step up in difficulty. If you don't understand it at first, read it several times.

Our first goal is the following effect. The code below prints 3, 5, 7:

Observable.fromArray([1, 2, 3, 4, 5, 6])
   .pipe(
     map((item) => item + 1),
     filter((item) => !!(item % 2))
   )
   .subscribe((item) => {
     console.log(item);
   });

From the code above, we need the pipe method, the fromArray method, and the map and filter operators. Let's start with fromArray.

fromArray

In RxJS, an observer plays the role of the consumer. For example, Observable.fromArray([1, 2, 3, 4, 5, 6]) publishes an array, and calling subscribe consumes it:

// prints 1, 2, 3, 4, 5, 6
Observable.fromArray([1, 2, 3, 4, 5, 6]).subscribe((item) => {
     console.log(item);
   });

An Observable in RxJS is a producer of data, and fromArray produces its data from an array. So we need to understand how both sides, fromArray and subscribe, are implemented.

// Create a static method
Observable.fromArray = function (array) {
    if (!Array.isArray(array)) {
        // If the argument is not an array, throw an exception
        throw new Error('params need to be an array');
    }
    return new Observable(function (observer) {
        try {
            // Iterate over the elements and emit each one
            array.forEach((value) => observer.next(value));
            observer.complete();
        } catch (err) {
            observer.error(err);
        }
    });
};
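The try/catch in fromArray means that an exception thrown while a value is being consumed is routed to the error callback instead of escaping. A minimal sketch of that path follows, with condensed classes; the consumer that throws on the value 2 is a hypothetical failure invented for the demonstration.

```javascript
const noop = () => {};

class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this.isStopped = true; this._error(err); } }
  complete() { if (!this.isStopped) { this.isStopped = true; this._complete(); } }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe(next, error, complete) {
    const observer = new Observer(next, error, complete);
    this.publish(observer);
    return observer;
  }
}

Observable.fromArray = function (array) {
  if (!Array.isArray(array)) {
    throw new Error('params need to be an array');
  }
  return new Observable((observer) => {
    try {
      array.forEach((value) => observer.next(value));
      observer.complete();
    } catch (err) {
      observer.error(err);
    }
  });
};

const log = [];
Observable.fromArray([1, 2, 3]).subscribe(
  (value) => {
    if (value === 2) throw new Error('boom'); // hypothetical consumer failure
    log.push(value);
  },
  (err) => log.push(`error: ${err.message}`),
  () => log.push('done')
);
console.log(log); // [ 1, 'error: boom' ]

// A non-array argument is rejected before any Observable is created.
let rejected = false;
try {
  Observable.fromArray('not an array');
} catch (err) {
  rejected = true;
}
console.log(rejected); // true
```

Note that 'done' never appears: the throw aborts forEach, so complete is skipped and the stream ends with the error.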

The fromArray operator is simple enough. Next let's look at the pipe method, which is a bit harder for those unfamiliar with function composition.

class Observable {
    source; // new
    operator; // new
    constructor(publish) {
        this.publish = publish;
    }
    // Subscribe a consumer
    subscribe(next, error, complete) {
        let observer;
        // Convert next to an Observer instance according to its type
        if (next instanceof Observer) {
            observer = next;
        } else if (typeof next === 'function') {
            observer = new Observer(next, error, complete);
        } else {
            observer = new Observer(next.next, next.error, next.complete);
        }
        // new: an Observable created by lift delegates to its operator
        if (this.operator) {
            return this.operator.call(observer, this.source);
        }
        const unsubscribeCb = this.publish(observer);
        observer.onUnsubscribe(unsubscribeCb);
        return observer;
    }
    // new
    lift(operator) {
        const observable = new Observable();
        observable.source = this;
        observable.operator = operator;
        return observable;
    }
    // new
    pipe(...operations) {
        if (operations.length === 0) {
            return this;
        } else if (operations.length === 1) {
            return operations[0](this);
        } else {
            return operations.reduce((source, func) => func(source), this);
        }
    }
}

At its core, pipe is just a reduce. For pipe(map, filter), the key line is:

operations.reduce((source, func) => func(source), this)

Applied to pipe(map, filter) with the source as the argument, this is equivalent to:

filter(map(source))

See, it's very simple: map processes the data first, and then filter processes the result.
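The equivalence is easy to see if the operators just record their own names. In this sketch map and filter are not real operators; they are hypothetical stand-ins that wrap a string so the nesting order becomes visible, and pipe is written as a free function rather than a method.

```javascript
// Stand-ins that only record the order in which they are applied.
const map = (source) => `map(${source})`;
const filter = (source) => `filter(${source})`;

// The same reduce that Observable.prototype.pipe uses, as a free function.
const pipe = (...operations) => (source) =>
  operations.reduce((acc, operation) => operation(acc), source);

const nested = pipe(map, filter)('source');
console.log(nested); // filter(map(source))
```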

OK, so what does the lift function do? It is best read together with the source code of the map operator. Keep in mind that the result is a linked list: lift turns the chain of processing functions into a linked list. As described above, the call is

filter(map(source))

and as a linked list it becomes

list element 1 (attribute fn: map) --> list element 2 (attribute fn: filter)

so the data processed by list element 1 is handed on to list element 2.
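The linked list can be inspected directly by following the source property. This is a minimal sketch in which the operators are reduced to plain objects with a hypothetical name field; only lift is taken from the article. Note that the links point backward, from the last operator to the original Observable, while the data flows forward.

```javascript
class Observable {
  constructor(publish) { this.publish = publish; }
  // Same lift as in the article: link a new Observable back to this one.
  lift(operator) {
    const observable = new Observable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }
}

const origin = new Observable(() => {});
const mapped = origin.lift({ name: 'map' });      // hypothetical operator object
const filtered = mapped.lift({ name: 'filter' }); // hypothetical operator object

// Walk the list from the last element back to the original Observable.
const chain = [];
for (let node = filtered; node; node = node.source) {
  chain.push(node.operator ? node.operator.name : 'origin');
}

console.log(chain.join(' -> ')); // filter -> map -> origin
```

Subscribing to the last element walks this list backward until it reaches the Observable that has a publish function, wrapping the observer once per step.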

Implementing the map operator

// map returns a function that produces a new Observable
function map(mapFn) {
    // mapOperation is called inside the pipe method.
    // It returns an Observable whose operator property is new mapOperator(mapFn)
    // and whose source property holds the previous Observable instance (the source parameter below)
    return function mapOperation(source) {
        return source.lift(new mapOperator(mapFn));
    };
}

class mapOperator {
    constructor(mapFn) {
        this.mapFn = mapFn;
    }
    // call ends up calling subscribe on the source Observable,
    // wrapping the incoming observer
    call(observer, source) {
        return source.subscribe({
            next: (value) => {
                observer.next(this.mapFn(value));
            },
            error: (err) => {
                observer.error(err);
            },
            complete: () => {
                observer.complete();
            },
        });
    }
}

It doesn't matter if you don't understand it yet. Let's finish the implementation of filter and then walk through the execution flow.

function filter(filterFn) {
    // Returns an Observable whose operator property is new filterOperator(filterFn)
    // and whose source property holds the previous Observable instance
    return function filterOperation(source) {
        return source.lift(new filterOperator(filterFn));
    };
}

class filterOperator {
    constructor(filterFn) {
        this.filterFn = filterFn;
    }
    // call ends up calling subscribe on the source Observable,
    // wrapping the incoming observer
    call(observer, source) {
        return source.subscribe({
            next: (value) => {
                // Pass the value itself downstream, not the boolean returned by the predicate
                if (this.filterFn(value)) {
                    observer.next(value);
                }
            },
            error: (err) => {
                observer.error(err);
            },
            complete: () => {
                observer.complete();
            },
        });
    }
}

First we call Observable.fromArray([1, 2, 3]), which returns an instance like this:

{
  operator: undefined
  publish: ƒ (observer)
   // publish is the following function:
   // function (observer) {
   //   try {
   //     // iterate over each element and emit it
   //     array.forEach((value) => observer.next(value));
   //     observer.complete();
   //   } catch (err) {
   //     observer.error(err);
   //   }
   // }
  source: undefined
  __proto__:
    constructor: class Observable
    lift: ƒ lift(operator)
    pipe: ƒ pipe(...operations)
    subscribe: ƒ subscribe(next, error, complete)
    __proto__: Object
}

Now let's go on and look at what this code is actually doing. I've drawn a picture:

Observable.fromArray([1, 2, 3, 4, 5, 6])
   .pipe(
     map((item) => item + 1),
     filter((item) => !!(item % 2))
   )
   .subscribe((item) => {
     console.log(item);
   });

So the chain of next calls looks like this: filter's observer wraps the original observer, map's observer wraps filter's observer, and fromArray's publish function receives map's observer.

So when we call fromArray's publish, the argument is map's observer, whose next is effectively:

next(value) {
    if (filterFn(mapFn(value))) {
        originalObserver.next(mapFn(value));
    }
}
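The nesting can be watched in action by logging each step. The following is a sketch, not the article's exact code: the classes are condensed, unsubscription is omitted, the operators are written as plain objects with a call method instead of operator classes, and the trace array is an addition for observation only.

```javascript
const noop = () => {};

class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this.isStopped = true; this._error(err); } }
  complete() { if (!this.isStopped) { this.isStopped = true; this._complete(); } }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe(next, error, complete) {
    const observer = typeof next === 'function'
      ? new Observer(next, error, complete)
      : new Observer(next.next, next.error, next.complete);
    if (this.operator) return this.operator.call(observer, this.source);
    this.publish(observer);
    return observer;
  }
  lift(operator) {
    const observable = new Observable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
  static fromArray(array) {
    return new Observable((observer) => {
      array.forEach((value) => observer.next(value));
      observer.complete();
    });
  }
}

const trace = []; // records the order in which the wrapped observers run

const map = (mapFn) => (source) => source.lift({
  call: (observer, upstream) => upstream.subscribe({
    next: (value) => { trace.push(`map ${value}`); observer.next(mapFn(value)); },
    error: (err) => observer.error(err),
    complete: () => observer.complete(),
  }),
});

const filter = (filterFn) => (source) => source.lift({
  call: (observer, upstream) => upstream.subscribe({
    next: (value) => {
      trace.push(`filter ${value}`);
      if (filterFn(value)) observer.next(value);
    },
    error: (err) => observer.error(err),
    complete: () => observer.complete(),
  }),
});

const output = [];
Observable.fromArray([1, 2, 3])
  .pipe(map((item) => item + 1), filter((item) => !!(item % 2)))
  .subscribe((item) => { trace.push(`print ${item}`); output.push(item); });

console.log(output); // [ 3 ]
console.log(trace.join(', '));
// map 1, filter 2, map 2, filter 3, print 3, map 3, filter 4
```

Each value travels through the whole chain before the next one is emitted, which is why the trace interleaves map and filter instead of running all the maps first.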

This really is a bit convoluted. If you truly don't understand it, that's fine; just be aware that the process looks roughly like this. You won't need it when writing everyday code.

Handling asynchronous operators

The operators above are all synchronous, but in practice we more often run into asynchronous complexity. The main idea for handling asynchrony in RxJS is as follows:

  • There is a class called NotifierObserver. What does it do?
  • It is a notification class: when an asynchronous task finishes, it announces that it is done, so that the next task can start or the previous task can be affected in some way.

Let's take an example to get a feel for the NotifierObserver class and how it handles asynchrony, so that when you meet a slightly more complex operator such as switchMap, you can guess how it might use such a class.

takeUntil

The operator works like this: at first the console prints a number every second, but as soon as you click on the page, it stops:

const notifier = Observable.fromEvent(document, 'click');
const observable = Observable.interval(1000).pipe(takeUntil(notifier));
// Nothing runs until we subscribe
observable.subscribe((value) => console.log(value));

Implementing takeUntil

function takeUntil(notifier) {
  return function takeUntilOperation(source) {
    return source.lift(new takeUntilOperator(notifier));
  };
}

class takeUntilOperator {
  constructor(notifier) {
    this.notifier = notifier;
  }

  call(observer, source) {
    // outerObserver is an instance of takeUntilObserver.
    // notifierObserver notifies outerObserver when the condition is met
    const outerObserver = new takeUntilObserver(observer);
    const notifierObserver = new NotifierObserver(outerObserver);
    // When this.notifier emits, the state of outerObserver changes
    this.notifier.subscribe(notifierObserver);
    // Subscribe to the source only if outerObserver.seenValue is still false
    if (!outerObserver.seenValue) {
      return source.subscribe(outerObserver);
    }
    return outerObserver;
  }
}

class NotifierObserver extends Observer {
  constructor(outerObserver) {
    super();
    this.outerObserver = outerObserver;
  }

  // Notify outerObserver when a value is received
  next(value) {
    this.outerObserver.notifyNext(value);
  }

  error(err) {
    this.outerObserver.notifyError(err);
    this.unsubscribe();
  }

  complete() {
    this.outerObserver.notifyComplete();
    this.unsubscribe();
  }
}

class takeUntilObserver extends Observer {
  constructor(destination) {
    super();
    this.destination = destination;
    this.seenValue = false;
  }

  // The subscription is completed on notifyNext or notifyComplete
  notifyNext(value) {
    this.seenValue = true;
    this.destination.complete();
    // Also release the source subscription (for example, clear the interval)
    this.unsubscribe();
  }

  notifyComplete() {
    this.seenValue = true;
    this.destination.complete();
    this.unsubscribe();
  }

  // Called by NotifierObserver.error: forward the notifier's error downstream
  notifyError(err) {
    this.seenValue = true;
    this.destination.error(err);
    this.unsubscribe();
  }

  next(value) {
    if (!this.seenValue) {
      this.destination.next(value);
    }
  }
}
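Timers and clicks are awkward to experiment with, so here is a sketch of the same mechanism driven by hand. It assumes a hypothetical manualSource helper (not part of the article) that emits whenever emit() is called. The classes are condensed, the operator is a plain object with a call method, and the two unsubscribe() calls marked below are my additions so that both upstream subscriptions are released once the notifier fires.

```javascript
const noop = () => {};

// Condensed copies of the article's Observer and Observable.
class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  unsubscribe() {
    this.isStopped = true;
    if (this.unsubscribeCb) this.unsubscribeCb();
  }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe(next, error, complete) {
    const observer = next instanceof Observer ? next : new Observer(next, error, complete);
    if (this.operator) return this.operator.call(observer, this.source);
    observer.onUnsubscribe(this.publish(observer));
    return observer;
  }
  lift(operator) {
    const observable = new Observable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
}

// Hypothetical helper: an Observable that emits whenever emit() is called.
function manualSource() {
  const observers = new Set();
  const observable = new Observable((observer) => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const emit = (value) => [...observers].forEach((o) => o.next(value));
  return { observable, emit, count: () => observers.size };
}

class takeUntilObserver extends Observer {
  constructor(destination) {
    super();
    this.destination = destination;
    this.seenValue = false;
  }
  notifyNext() {
    this.seenValue = true;
    this.destination.complete();
    this.unsubscribe(); // my addition: release the source subscription
  }
  next(value) { if (!this.seenValue) this.destination.next(value); }
}

class NotifierObserver extends Observer {
  constructor(outerObserver) {
    super();
    this.outerObserver = outerObserver;
  }
  next(value) {
    this.outerObserver.notifyNext(value);
    this.unsubscribe(); // my addition: one notification is enough
  }
}

const takeUntil = (notifier) => (source) => source.lift({
  call(observer, upstream) {
    const outerObserver = new takeUntilObserver(observer);
    notifier.subscribe(new NotifierObserver(outerObserver));
    return outerObserver.seenValue ? outerObserver : upstream.subscribe(outerObserver);
  },
});

const ticks = manualSource();
const stop = manualSource();
const log = [];
ticks.observable
  .pipe(takeUntil(stop.observable))
  .subscribe((value) => log.push(value), noop, () => log.push('done'));

ticks.emit(0);
ticks.emit(1);
stop.emit('click'); // the notifier fires: downstream completes
ticks.emit(2);      // never delivered
console.log(log, ticks.count(), stop.count()); // [ 0, 1, 'done' ] 0 0
```

The two zero counts at the end show that neither source still holds a subscriber after the notifier has fired.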

To summarize the asynchronous scenario: there is a notification class that is triggered under certain conditions and changes the state of the outer observer, which in turn directly affects what the downstream observer receives.

On the basis of notification classes, we can make bold guesses about how various asynchronous operators work. Take race as an example (I haven't read its source code). race takes an array of Observables; whichever Observable emits first is the one used for the downstream operations, and the others are dropped.

We can guess that whichever Observable triggers first, its notification changes the state of the race operator so that the remaining Observables are invalidated.
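To make that guess concrete, here is one way it could be sketched. This is explicitly not how RxJS implements race; it only follows the guess above. The manualSource helper is hypothetical, the classes are condensed copies of the article's, and for brevity a closure variable (winner) plays the role of the notification class.

```javascript
const noop = () => {};

class Observer {
  constructor(next = noop, error = noop, complete = noop) {
    Object.assign(this, { _next: next, _error: error, _complete: complete });
    this.isStopped = false;
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  unsubscribe() {
    this.isStopped = true;
    if (this.unsubscribeCb) this.unsubscribeCb();
  }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
}

class Observable {
  constructor(publish) { this.publish = publish; }
  subscribe(next, error, complete) {
    const observer = new Observer(next, error, complete);
    observer.onUnsubscribe(this.publish(observer));
    return observer;
  }
}

// Hypothetical helper: an Observable that emits whenever emit() is called.
function manualSource() {
  const observers = new Set();
  const observable = new Observable((observer) => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const emit = (value) => [...observers].forEach((o) => o.next(value));
  return { observable, emit, count: () => observers.size };
}

// Guessed race(): the first source to emit wins, the others are unsubscribed.
function race(sources) {
  return new Observable((observer) => {
    const subscriptions = [];
    let winner = -1; // index of the first source to emit, -1 while undecided
    sources.forEach((source, index) => {
      if (winner !== -1) return; // an earlier source already won synchronously
      subscriptions[index] = source.subscribe(
        (value) => {
          if (winner === -1) {
            winner = index; // the state change that invalidates the others
            subscriptions.forEach((sub, i) => { if (i !== index) sub.unsubscribe(); });
          }
          if (winner === index) observer.next(value);
        },
        (err) => observer.error(err),
        () => { if (winner === index) observer.complete(); }
      );
    });
    return () => subscriptions.forEach((sub) => sub.unsubscribe());
  });
}

const a = manualSource();
const b = manualSource();
const log = [];
race([a.observable, b.observable]).subscribe((value) => log.push(value));

b.emit('b1'); // b emits first and wins
a.emit('a1'); // a has already been unsubscribed
b.emit('b2');
console.log(log, a.count(), b.count()); // [ 'b1', 'b2' ] 0 1
```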

Well, that’s all for this article.

Here's a quick ad: this year I'll have a big series that teaches you how to implement an open-source React component library for PC and mobile. It includes:

  • A CLI tool for component library packaging, development, testing, and automated pushing to the repository (including changelog updates and tagging) (100% complete)
  • The icon package of the component library, that is, an npm package that collects all the icons used by a business and is specific to your project
  • The component library website (written from scratch, not with Storybook, dumi, or Docz)
  • PC component library (includes all Ant Design components and features, mostly borrowed from its source code, so it doubles as source code analysis)
  • Mobile component library (mainly borrowed from Zarm, a React component library from ZhongAn)

Reference articles:

  • Understand the core concepts of RxJS in 200 lines of code
  • RxJS core principle (source code implementation)