Problem description

Have you ever used RxJS? Its most important concepts are the Observable and the Observer. An Observable defines how values are delivered to an Observer, and an Observer is simply a set of callback functions, as follows:

const observer = {
  next: (value) => {
    console.log('we got a value', value);
  },
  error: (error) => {
    console.log('we got an error', error);
  },
  complete: () => {
    console.log('ok, no more values');
  },
};

We need to associate the Observer with an Observable, which provides values or error messages to the Observer.

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
    subscriber.next(4); // ignored: the stream has already completed
  }, 100);
});

When the observer subscribes:

const sub = observable.subscribe(observer);

The output is:

1
2
(after a 100 ms timeout)
3

Implement a basic Observable that makes the behavior described above possible. There are some additional requirements:

  • error and complete can only be triggered once. Any subsequent next/error/complete calls must be ignored.
  • next, error and complete are all optional when subscribing. If a function is passed instead of an object, it is treated as next.
  • Multiple subscriptions must be supported.
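
The second requirement can be handled by normalizing whatever is passed to subscribe before it is used. Here is a minimal sketch, assuming a hypothetical helper named `toObserver` (the name and shape are illustrative and not part of the problem statement):

```javascript
// Hypothetical helper: turn a function or a partial observer
// into an object that always has next, error and complete.
const noop = () => {};

function toObserver(subscriber) {
  // A bare function is treated as the `next` callback.
  const partial =
    typeof subscriber === 'function' ? { next: subscriber } : subscriber || {};

  return {
    next: (value) => (partial.next || noop).call(partial, value),
    error: (err) => (partial.error || noop).call(partial, err),
    complete: () => (partial.complete || noop).call(partial),
  };
}

// Usage: both forms can be called the same way afterwards.
const received = [];
toObserver((v) => received.push(v)).next(1);
toObserver({ complete: () => received.push('done') }).complete();
toObserver({}).error(new Error('ignored')); // no error callback, so nothing happens
console.log(received); // [ 1, 'done' ]
```

Filling in no-ops up front means the rest of the implementation never has to check whether a callback exists.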


Approach

If you don't know RxJS, this may be a bit confusing, but it is easier to think about from a publish-subscribe perspective. We'll define a Publisher class that acts as a manager: it keeps track of its observers and notifies them when something changes:

/**
 * 1. Add an observer
 * 2. Remove an observer
 * 3. Notify the observers so they run their update logic
 */
class Publisher {
  observers: Observer[] = [];
  name = '';

  add(observer: Observer) {
    this.observers.push(observer);
  }

  remove(observer: Observer) {
    // filter avoids mutating the array while iterating over it
    this.observers = this.observers.filter((item) => item !== observer);
  }

  watch() {
    // pass the publisher itself so each observer can read its state
    this.observers.forEach((observer) => observer.update(this));
  }

  // -----------
  getName() {
    return this.name;
  }

  setName(name: string) {
    this.name = name;
    this.watch();
  }
}

The subscriber listens for notifications from the publisher:

class Observer {
  pubParam = {};

  update(publisher: Publisher) {
    this.pubParam = publisher.getName();
    this.work();
  }

  work() {
    console.log('Here you can take some parameters of the publisher...', this.pubParam);
  }
}

const pub = new Publisher();
const ob1 = new Observer();
const ob2 = new Observer();

pub.add(ob1);
pub.add(ob2);

// notify the observers
pub.setName('djmughal');

Back to business. We need to define an Observable class that accepts a function. That function produces the data internally, and subscribing runs it so that an observer can receive the data.

class Observable {
  constructor(setup) { // setup is the function that will receive the subscriber
    this.setup = setup;
  }

  subscribe(observer) {
    return this.setup(observer);
  }
}

const observable = new Observable(fn); // fn: the setup function
observable.subscribe(observer);

An Observable first stores the setup function and only runs it when the subscribe method is called, so the observer ends up as the argument of the function passed to the Observable constructor. The observer has next, error and complete methods, defined as follows:

// Define the Observer class
class Observer {
  constructor() {
    // State flags: set once the observer has completed or errored
    this.isComplete = false;
    this.isError = false;
  }
  next() {}
  complete() {}
  error() {}
  unsubscribe() {
    // Replace the callbacks with no-ops so later calls do nothing
    this.next = () => null;
    this.complete = () => null;
    this.error = () => null;
  }
}

With an Observer in place, we can create an Observable and subscribe to it:

const observable = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete("it's over");
        observer.next(5);
    }, 2000);
    observer.next(6);
});
const sub = observable.subscribe(observer);

// 1 2 3 6 -> timeout 2000ms -> 4
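
Because the setup function is stored and only run inside subscribe, each subscription triggers its own independent run, which is what makes multiple subscriptions possible. A small sketch of that behavior, using a pared-down `Observable` and a hypothetical `makeLogger` helper (both assumed here purely for illustration):

```javascript
// Pared-down Observable: stores setup, runs it once per subscribe call.
class Observable {
  constructor(setup) {
    this.setup = setup;
  }

  subscribe(observer) {
    this.setup(observer);
  }
}

// Hypothetical helper that records values under a label.
const log = [];
const makeLogger = (label) => ({
  next: (value) => log.push(`${label}:${value}`),
});

let runs = 0;
const source = new Observable((observer) => {
  runs += 1; // counts how many times setup has been executed
  observer.next(1);
  observer.next(2);
});

console.log(runs); // 0, nothing runs before the first subscribe
source.subscribe(makeLogger('a'));
source.subscribe(makeLogger('b'));
console.log(runs); // 2
console.log(log); // [ 'a:1', 'a:2', 'b:1', 'b:2' ]
```

Each subscriber sees the full sequence from the start, since nothing is shared between the two runs of setup.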

Accepted (AC) code

class Observable {
    constructor(setup) {
        this.setup = setup;
    }

    subscribe(observer) {
        this.setup(observer);

        return {
            // Call through the observer; a detached method would lose its `this`
            unsubscribe: () => observer.unsubscribe(),
        };
    }
}

class Observer {
    constructor() {
        this.isComplete = false;
        this.isError = false;
    }

    next(value) {
        if (!this.isComplete && !this.isError) {
            console.log('status: next', value);
        }
    }

    complete(value) {
        if (!this.isComplete && !this.isError) {
            console.log('status: complete', value);
        }

        this.isComplete = true;
        this.unsubscribe();
    }

    error(err) {
        if (!this.isComplete && !this.isError) {
            console.log('status: error', err);
        }
        this.isError = true;
        this.unsubscribe();
    }

    unsubscribe() {
        this.next = () => null;
        this.complete = () => null;
        this.error = () => null;
    }
}

const observable = new Observable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete("it's over");
        observer.next(5);
    }, 2000);
    observer.next(6);
});

const observer = new Observer();

const sub = observable.subscribe(observer);

// Unsubscribing at 1000 ms means only 1, 2, 3 and 6 are logged;
// the calls made at 2000 ms hit the no-op callbacks.
setTimeout(() => {
    sub.unsubscribe();
}, 1000);
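
The code above logs values itself rather than forwarding them to callbacks supplied by the caller. One possible way to cover all three requirements is to wrap whatever is passed to subscribe in a guard object that shares a closed flag with the returned subscription. The sketch below is an assumption about how that could look, not the reference solution; the `closed` flag and the `guard` wrapper are names introduced for illustration.

```javascript
class Observable {
  constructor(setup) {
    this.setup = setup;
  }

  subscribe(subscriber) {
    // Requirement 2: a bare function counts as `next`; callbacks are optional.
    const target =
      typeof subscriber === 'function' ? { next: subscriber } : subscriber || {};

    let closed = false; // set by error, complete or unsubscribe

    const guard = {
      next(value) {
        if (!closed && target.next) target.next(value);
      },
      error(err) {
        if (closed) return;
        closed = true; // Requirement 1: terminal, later calls are ignored
        if (target.error) target.error(err);
      },
      complete() {
        if (closed) return;
        closed = true;
        if (target.complete) target.complete();
      },
    };

    // Requirement 3: every subscribe call gets its own guard and flag.
    this.setup(guard);

    return {
      unsubscribe() {
        closed = true;
      },
    };
  }
}

// Usage with a synchronous source so the result is easy to follow.
const seen = [];
const source = new Observable((observer) => {
  observer.next(1);
  observer.complete();
  observer.next(2); // ignored: already completed
  observer.error(new Error('late')); // ignored as well
});

source.subscribe({
  next: (v) => seen.push(v),
  complete: () => seen.push('complete'),
});
source.subscribe((v) => seen.push(`fn:${v}`));
console.log(seen); // [ 1, 'complete', 'fn:1' ]
```

Keeping the flag in a closure, instead of overwriting methods on the observer, leaves the caller's object untouched and lets the same observer be reused across subscriptions.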
