Pull and Push

Pull and Push are two different protocols that describe how a data producer communicates with a data consumer.

What is Pull? In a Pull system, the consumer decides when it receives data from the producer. The producer itself is unaware of when the data will be delivered to the consumer.

Every JavaScript Function is a simple Pull system. The function is the data producer, and the code that calls it consumes the data by pulling out the function’s single return value.

ES2015 introduced generator functions and iterators (function*), another kind of Pull system. Code that calls iterator.next() is the consumer, pulling values one at a time from the iterator, which is the producer.
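The Pull behaviour of an iterator can be seen in a few lines of plain JavaScript. This is only an illustrative sketch; the generator name countTo is made up for the example.

```javascript
// A generator function is the producer; it only runs when the consumer pulls.
function* countTo(limit) {
  for (let i = 1; i <= limit; i++) {
    yield i; // execution pauses here until the next pull
  }
}

const iterator = countTo(3);

// The consumer decides when each value is produced by calling next().
const first = iterator.next();  // { value: 1, done: false }
const second = iterator.next(); // { value: 2, done: false }
const third = iterator.next();  // { value: 3, done: false }
const end = iterator.next();    // { value: undefined, done: true }
```

Nothing inside countTo runs until the first next() call, which is what makes the producer passive.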


        Producer                                   Consumer
Pull    Passive: produces data when requested      Active: decides when to request data
Push    Active: produces data at its own pace      Passive: reacts to the data it receives

What is Push? In a Push system, the producer decides when to send data to the consumer. The consumer is unaware of when it will receive that data.

Promises are the most common Push system in JavaScript today. The consumer registers callbacks with the then method, and the Promise, as the producer, delivers a resolved value to them. Unlike a function, it is the Promise that decides exactly when that value is pushed to the callbacks.

RxJS introduces Observables as a new Push system for JavaScript. An Observable is a producer of multiple, possibly infinitely many, values, which it pushes to its Observers (the consumers).
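As a small sketch of the Push side, the snippet below shows that a Promise consumer cannot pull the value; it can only register a callback and wait. The variable names are chosen for illustration.

```javascript
const received = [];

// The Promise is the producer: it decides when the value is delivered.
const producer = new Promise(resolve => resolve('resolved value'));

// The consumer only registers a callback; it cannot pull the value itself.
producer.then(value => received.push(value));

// Callbacks registered with then() always run asynchronously (as microtasks),
// so nothing has been pushed yet at this point.
const receivedSoFar = received.length; // 0
```

Even though the Promise is already resolved, the callback runs only after the current synchronous code has finished, so the timing stays in the producer’s hands.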

Summary

The two communication modes above are realized by several different constructs:

  • A Function is a lazily evaluated computation that synchronously returns a single value when called.
  • A generator is itself a Function whose iterator is evaluated lazily; unlike a plain Function, it can return (yield) zero to potentially infinitely many values.
  • A Promise is a computation that may eventually return a single value, or may never return anything.
  • An Observable is a lazily evaluated computation that can synchronously or asynchronously return (next) zero to potentially infinitely many values.

Subscription

Functions and Observables are both lazy. If you never call the Function or never subscribe to the Observable, no value is returned and no side effects occur.

Subscribing to an Observable is analogous to calling a Function.

In other words, observable.subscribe is the entry point that directly triggers an Observable’s data push, which is key to understanding how RxJS works overall.

In the rest of this article, “Observable data push” always refers to what happens when an observer subscribes to an Observable instance.

The difference between a Function and an Observable is that an Observable can “return” multiple values over time. A Function cannot:

function foo() {
    return 'one value';
    return 'another value'; // dead code: 'another value' is never returned
}

console.log(foo());

// console
// 'one value'

Observables can:

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
    subscriber.next('one value');
    subscriber.next('another value'); // "return" another value
    // ...
});

foo.subscribe(console.log.bind(null));

// console
// 'one value'
// 'another value'

An Observable can deliver data synchronously or asynchronously:

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
    subscriber.next('one value');
    setTimeout(() => {
        subscriber.next('another value'); // "return" another value, asynchronously
    });
    // ...
});

foo.subscribe(console.log.bind(null));
console.log('after subscribe');

// console
// 'one value'
// 'after subscribe'
// 'another value' (delivered asynchronously)

Summary

  • func.call() means “give me one value synchronously”.
  • observable.subscribe() means “give me any number of values, either synchronously or asynchronously”.

Observable

Observables are lazy Push collections of multiple values. They make it easier to compose asynchronous and callback-based code.

Anatomy of an Observable

Observables are created using new Observable or a creation operator, are subscribed to with an Observer, and execute to deliver next / error / complete notifications to that Observer.

As a data producer, an Observable is constructed with a subscribe function that receives a subscriber as its parameter. The subscriber implements the following interface:

{
    next: (value) => any,
    error: (err) => any,
    complete: () => any
}

Then observable.subscribe receives, as the data consumer, either an observer that implements the same interface or a plain next() function. Together they form a data communication pipeline based on the observer (subscription) pattern. The power of this pipeline is that an Observable can pipe through various operators that control how data is transmitted during a data push, including quantity limitation (take, takeUntil), value transformation (map, mapTo, etc.), timing control, and context switching (mainly done by a scheduler).
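To make the producer / consumer / operator roles concrete, here is a minimal sketch in plain JavaScript. MiniObservable and the simplified map and take below are hypothetical stand-ins written for this example; they are not the RxJS implementation and leave out unsubscription and error safety.

```javascript
// Hypothetical miniature of the Observable/observer contract; not the RxJS source.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn; // producer logic, run once per subscribe()
  }

  subscribe(observerOrNext) {
    // Accept either a full observer object or a bare next() function.
    const observer = typeof observerOrNext === 'function'
      ? { next: observerOrNext }
      : observerOrNext;
    const noop = () => {};
    this.subscribeFn({
      next: observer.next ? v => observer.next(v) : noop,
      error: observer.error ? e => observer.error(e) : noop,
      complete: observer.complete ? () => observer.complete() : noop,
    });
  }

  // pipe() feeds the Observable through each operator, left to right.
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this);
  }
}

// Value transformation: change every value on its way downstream.
const map = project => source =>
  new MiniObservable(subscriber =>
    source.subscribe({
      next: value => subscriber.next(project(value)),
      error: err => subscriber.error(err),
      complete: () => subscriber.complete(),
    }));

// Quantity limitation: forward only the first `count` values, then complete.
// (This sketch ignores later values instead of unsubscribing from the source.)
const take = count => source =>
  new MiniObservable(subscriber => {
    let seen = 0;
    source.subscribe({
      next: value => {
        if (seen < count) {
          seen += 1;
          subscriber.next(value);
          if (seen === count) subscriber.complete();
        }
      },
      error: err => subscriber.error(err),
      complete: () => { if (seen < count) subscriber.complete(); },
    });
  });

const numbers = new MiniObservable(subscriber => {
  [1, 2, 3, 4].forEach(n => subscriber.next(n));
  subscriber.complete();
});

const output = [];
numbers.pipe(map(x => x * 10), take(2)).subscribe({
  next: value => output.push(value),
  complete: () => output.push('done'),
});
// output is now [10, 20, 'done']
```

Each operator here is just a function from one Observable to another, which is why pipe can be written as a simple reduce.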

Observable pipeline mechanism

From the analysis above, we know that an Observable is a Push system: it keeps pushing data, by calling subscriber.next(value), to the data consumers (subscribers) that have subscribed to it. To implement the pipeline mechanism, the only problem to solve is how to pass the data, in order, through data processors such as operators and finally on to the subscriber.

A subscriber implements the Observer interface and therefore has a next method. As the name implies, next means “the next one”, which is exactly what a pipeline needs: all operators can be connected through their next methods. The natural approach is to call the downstream subscriber’s next method from inside the upstream subscriber’s next method. RxJS does this neatly: when an operator creates its own subscriber, the downstream subscriber is passed to the constructor and stored in the destination attribute. The downstream subscriber may be another operator’s subscriber or the final subscriber passed to subscribe. The result is a linked list of subscribers chained through the destination attribute.
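The destination chain described above can be sketched with two tiny classes. SinkSubscriber and this simplified MapSubscriber are hypothetical and model only next(); the real RxJS subscribers also handle error, complete, and teardown.

```javascript
// Hypothetical stand-ins for RxJS subscribers; only next() is modeled.
class SinkSubscriber {
  constructor(received) {
    this.received = received;
  }
  next(value) {
    this.received.push(value); // end of the chain: consume the value
  }
}

class MapSubscriber {
  constructor(destination, project) {
    this.destination = destination; // link to the downstream subscriber
    this.project = project;
  }
  next(value) {
    // Transform, then hand the result to the next node in the list.
    this.destination.next(this.project(value));
  }
}

const received = [];
const sink = new SinkSubscriber(received);
// Built from downstream to upstream: each node receives its destination.
const addOne = new MapSubscriber(sink, x => x + 1);
const double = new MapSubscriber(addOne, x => x * 2);

// The producer only ever talks to the most upstream subscriber.
double.next(5); // 5 -> 10 -> 11
```

Calling next on the head of the list is enough: every node forwards to its destination until the value reaches the final subscriber.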



Example: observable.pipe(map(x => x + 1))

The details are explained by the comments in the code below:

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    // source.lift creates a new Observable instance, attaches the operator to it,
    // and links it back to source, the Observable on which pipe was called
    return source.lift(new MapOperator(project, thisArg));
  };
}

In the Observable pipe method:

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  if (operations.length === 0) {
    return this as any;
  }

  return pipeFromArray(operations)(this); // pass the current Observable ("this") through
}

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (!fns) {
    return noop as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  // "input" is the Observable on which pipe was called;
  // "fn(prev)" invokes the operator function, which calls source.lift as shown above
  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

The lift method links the newly created Observable instance to the Observable on which it was called through the source attribute, producing a linked list of Observables, and attaches the current operator instance to the new Observable. In the example above, that operator instance is the new MapOperator(project, thisArg) created in the map function.

lift(operator) {
  const observable = new Observable();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

When an observer finally subscribes to the last Observable instance in this linked list, each operator instance creates its own subscriber. In the code below, the operator’s call method creates a new MapSubscriber(subscriber, this.project, this.thisArg), passing in the downstream subscriber, and then subscribes to its source. In this way, subscription backtracks along the linked list until it reaches the Observable that was originally instantiated.

export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }

  call(subscriber: Subscriber<R>, source: any): any {
    // "source" is the upstream Observable in the linked list of Observables
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
          error?: ((error: any) => void) | null,
          complete?: (() => void) | null): Subscription {

  const { operator } = this;
  const sink = toSubscriber(observerOrNext, error, complete);

  // when an observer subscribes to the last Observable in the linked list,
  // each operator's "call" method creates that operator's own subscriber
  if (operator) {
    sink.add(operator.call(sink, this.source));
  } else {
    sink.add(
      this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
      this._subscribe(sink) :
      this._trySubscribe(sink)
    );
  }
  // ...

  return sink;
}
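Putting lift, call, and subscribe together, the whole backtracking process can be sketched without RxJS. The MiniObservable and MapOperator classes below are hypothetical simplifications that borrow the names used in the source excerpts; error handling and Subscription bookkeeping are left out.

```javascript
// Hypothetical miniature of lift/call/subscribe; names mirror the RxJS source,
// but this is a simplified sketch, not the library implementation.
class MiniObservable {
  constructor(subscribeFn) {
    this._subscribe = subscribeFn;
    this.source = undefined;   // upstream Observable, set by lift()
    this.operator = undefined; // operator to apply on subscribe, set by lift()
  }

  lift(operator) {
    const observable = new MiniObservable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  subscribe(subscriber) {
    if (this.operator) {
      // Wrap the downstream subscriber and walk one step upstream.
      this.operator.call(subscriber, this.source);
    } else {
      // Head of the list: start producing into the assembled subscriber chain.
      this._subscribe(subscriber);
    }
  }
}

class MapOperator {
  constructor(project) {
    this.project = project;
  }
  call(subscriber, source) {
    const project = this.project;
    source.subscribe({
      destination: subscriber,
      next(value) { this.destination.next(project(value)); },
    });
  }
}

const trace = [];
const head = new MiniObservable(subscriber => {
  trace.push('producer starts');
  subscriber.next(1);
  subscriber.next(2);
});

// Two lift() calls build the list: tail.source -> middle, middle.source -> head.
const middle = head.lift(new MapOperator(x => x + 1));
const tail = middle.lift(new MapOperator(x => x * 100));

trace.push('before subscribe'); // nothing has run yet: the chain is lazy
tail.subscribe({ next: value => trace.push(value) });
// trace: ['before subscribe', 'producer starts', 200, 300]
```

Subscription travels from tail to head along source, while values travel from head to tail along destination.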

Relationship between Observable and Scheduler

What is a Scheduler

A Scheduler controls when a subscription starts and when notifications are delivered. It consists of three components:

  • A Scheduler is a data structure. It stores actions and orders them by priority or by other criteria such as timer delay.
  • A Scheduler is an execution context. It denotes where and when an action is executed (immediately, or through a callback mechanism such as setTimeout, setInterval, process.nextTick, or the animation frame).
  • A Scheduler has an internal (virtual) clock. Its now() method returns the current timestamp, and any scheduled action can be associated with that timestamp through the virtual clock.
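The three roles listed above can be illustrated with a small virtual-time scheduler. VirtualScheduler is a hypothetical class written for this sketch; its schedule(work, delay, state) signature follows the shape used by RxJS schedulers, but the implementation is an assumption, not the RxJS source.

```javascript
// Hypothetical virtual-time scheduler; a sketch of the three roles, not RxJS code.
class VirtualScheduler {
  constructor() {
    this.actions = []; // data structure: pending actions
    this.frame = 0;    // virtual clock, in arbitrary time units
  }

  now() {
    return this.frame;
  }

  schedule(work, delay = 0, state) {
    this.actions.push({ work, state, due: this.now() + delay });
    // Keep the queue ordered by due time; the sort is stable, so ties keep insertion order.
    this.actions.sort((a, b) => a.due - b.due);
  }

  // Execution context: actions run only when flush() is called, in due order.
  flush() {
    while (this.actions.length > 0) {
      const action = this.actions.shift();
      this.frame = action.due; // advance the virtual clock
      action.work(action.state);
    }
  }
}

const scheduler = new VirtualScheduler();
const log = [];
const record = label => log.push(`${label}@${scheduler.now()}`);

scheduler.schedule(record, 20, 'late');
scheduler.schedule(record, 0, 'immediate');
scheduler.schedule(record, 10, 'soon');

const pendingBeforeFlush = log.length; // 0: scheduling alone runs nothing
scheduler.flush();
// log: ['immediate@0', 'soon@10', 'late@20']
```

Because the clock is virtual, the actions run in delay order without any real waiting.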

How a Scheduler works

A Scheduler lets you define in what execution context an Observable will deliver notifications to its Observer. In other words, the main job of a Scheduler is to determine the execution context of Observable data push.



RxJS’s built-in schedulers mainly include asyncScheduler, queueScheduler, asapScheduler, and animationFrameScheduler.

QueueScheduler, AsapScheduler, and AnimationFrameScheduler all extend AsyncScheduler. The delay they receive determines which mechanism is used: with a delay greater than 0 they fall back to AsyncScheduler’s setInterval timer to trigger the data push asynchronously; otherwise each uses its own mechanism.

  • asyncScheduler maintains an action queue and uses AsyncAction to turn subscribe into an asynchronous call through the setInterval timer. The main operator that works with asyncScheduler is subscribeOn: its work handler subscribes the downstream subscriber to the upstream Observable (source), and AsyncAction’s timer triggers the Observable data push asynchronously.
  • queueScheduler only uses asyncScheduler’s action queue. The main operator that works with it is observeOn.
  • asapScheduler implements asynchrony internally through the microtask queue, in the form of a Promise.
  • animationFrameScheduler implements asynchrony internally through requestAnimationFrame.

Scheduler and Action

QueueAction, AsapAction, and AnimationFrameAction all extend AsyncAction.

An Action is a unit of scheduled work. It holds the work handler to execute and the state that the work handler processes.

  • asyncScheduler’s AsyncAction: the work handler is triggered from the callback of a setInterval timer.

protected requestAsyncId(scheduler: AsyncScheduler, id?: any, delay: number = 0): any {
  return setInterval(scheduler.flush.bind(scheduler, this), delay);
}

  • queueScheduler’s QueueAction: uses the scheduler’s action queue to store pending actions in order, and triggers Observable data push according to that order.
  • asapScheduler’s AsapAction: uses a Promise to trigger the work handler. Internally, Immediate.setImmediate emulates setImmediate with Promise.resolve().then(cb).

protected requestAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any {
  // If delay is greater than 0, request as an async action.
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // Push the action to the end of the scheduler queue.
  scheduler.actions.push(this);
  // If a microtask has already been scheduled, don't schedule another
  // one. If a microtask hasn't been scheduled yet, schedule one now. Return
  // the current scheduled microtask id.
  return scheduler.scheduled || (scheduler.scheduled = Immediate.setImmediate(
    scheduler.flush.bind(scheduler, undefined)
  ));
}

  • animationFrameScheduler’s AnimationFrameAction: uses requestAnimationFrame to trigger the work handler.

protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  // If delay is greater than 0, request as an async action.
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // Push the action to the end of the scheduler queue.
  scheduler.actions.push(this);
  // If an animation frame has already been requested, don't request another
  // one. If an animation frame hasn't been requested yet, request one. Return
  // the current animation frame request id.
  return scheduler.scheduled || (scheduler.scheduled = requestAnimationFrame(
    () => scheduler.flush(undefined)));
}
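The AsapAction and AnimationFrameAction excerpts share one idea: many actions are queued, but only one flush is requested (scheduler.scheduled || ...). The sketch below isolates that idea. CoalescingScheduler and manualDefer are hypothetical; manualDefer stands in for Immediate.setImmediate or requestAnimationFrame so that the example can fire the callback by hand.

```javascript
// Hypothetical scheduler sketch: `defer` stands in for Immediate.setImmediate or
// requestAnimationFrame, and is injected so the example can run it by hand.
class CoalescingScheduler {
  constructor(defer) {
    this.defer = defer;
    this.actions = [];
    this.scheduled = undefined; // handle of the pending flush, if any
  }

  schedule(work, state) {
    this.actions.push({ work, state });
    // Request a flush only if none is pending yet.
    if (this.scheduled === undefined) {
      this.scheduled = this.defer(() => this.flush());
    }
  }

  flush() {
    this.scheduled = undefined;
    const actions = this.actions;
    this.actions = [];
    actions.forEach(action => action.work(action.state));
  }
}

// A manual "defer": it stores callbacks instead of running them asynchronously.
const pendingCallbacks = [];
const manualDefer = callback => pendingCallbacks.push(callback); // returns a numeric id

const scheduler = new CoalescingScheduler(manualDefer);
const delivered = [];
scheduler.schedule(value => delivered.push(value), 'a');
scheduler.schedule(value => delivered.push(value), 'b');
scheduler.schedule(value => delivered.push(value), 'c');

const flushRequests = pendingCallbacks.length; // 1: three actions share one flush
pendingCallbacks.shift()();                    // simulate the microtask / frame firing
// delivered: ['a', 'b', 'c']
```

Swapping manualDefer for a real microtask or animation-frame callback changes when the flush happens, not how the queue is drained.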

As mentioned above, the main job of a Scheduler is to determine the execution context of Observable data push. What actually gets scheduled is defined by a work handler supplied by the caller. RxJS ships several operators designed to work with Schedulers, such as subscribeOn and observeOn.

subscribeOn serves as a simple example.

As an operator, subscribeOn is also wired in through lift and call. Its call method wraps the source in a SubscribeOnObservable and subscribes the downstream subscriber to it:

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  return new SubscribeOnObservable<T>(
    source, this.delay, this.scheduler
  ).subscribe(subscriber);
}

Inside SubscribeOnObservable, SubscribeOnObservable.dispatch is passed to the scheduler as the work handler, with { source, subscriber } as the state. When used together with asyncScheduler, source.subscribe(subscriber) therefore runs inside the setInterval timer callback, which triggers the Observable data push asynchronously.

static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
  const { source, subscriber } = arg;
  return this.add(source.subscribe(subscriber));
}

_subscribe(subscriber: Subscriber<T>) {
  const delay = this.delayTime;
  const source = this.source;
  const scheduler = this.scheduler;

  return scheduler.schedule<DispatchArg<any>>(SubscribeOnObservable.dispatch as any, delay, {
    source, subscriber
  });
}
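The essence of subscribeOn, postponing source.subscribe(subscriber) until the scheduler runs the work handler, can be sketched as follows. manualScheduler, the free function dispatch, and this function-style subscribeOn are hypothetical simplifications; the scheduler here is driven by hand rather than by a timer.

```javascript
// Hypothetical sketch of subscribeOn: subscription is postponed via a scheduler.
// `manualScheduler` is an assumption that queues work until run() is called.
const manualScheduler = {
  queue: [],
  schedule(work, delay, state) {
    this.queue.push(() => work(state)); // `delay` is ignored in this sketch
  },
  run() {
    while (this.queue.length > 0) this.queue.shift()();
  },
};

// Work handler: receives { source, subscriber } as its state.
function dispatch({ source, subscriber }) {
  source.subscribe(subscriber);
}

// Returns an object with the same subscribe() shape as the source.
function subscribeOn(source, scheduler, delay = 0) {
  return {
    subscribe(subscriber) {
      scheduler.schedule(dispatch, delay, { source, subscriber });
    },
  };
}

const events = [];
const source = {
  subscribe(subscriber) {
    events.push('source subscribed');
    subscriber.next('value');
  },
};

subscribeOn(source, manualScheduler).subscribe({
  next: value => events.push(value),
});
events.push('after subscribe'); // the source has not been subscribed yet

manualScheduler.run(); // the scheduler decides when subscription happens
// events: ['after subscribe', 'source subscribed', 'value']
```

With a timer-based scheduler in place of manualScheduler, the same structure makes a synchronous source deliver its values asynchronously.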

Subject

What is a Subject? It is a special type of Observable that can deliver data to many observers at once, whereas a plain Observable is unicast: each subscription runs an independent execution that pushes data to a single observer.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

Every Subject is an Observable, so it can be subscribed to. Unlike a plain Observable, however, subscribing to a Subject does not immediately trigger a data push. It simply registers the observer in the Subject’s own list of observers, much like addEventListener.

_subscribe(subscriber: Subscriber<T>): Subscription {
  // ...
  this.observers.push(subscriber);
  return new SubjectSubscription(this, subscriber);
  // ...
}

At the same time, every Subject is an Observer with next(v), error(e), and complete() methods. When next(value) is called, the value is passed to all registered observers. In that sense, a Subject is a data system that both produces and consumes.
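Both sides of a Subject, registering observers and multicasting through next, fit in a short sketch. MiniSubject is a hypothetical class for illustration; error, complete, and unsubscription are omitted.

```javascript
// Hypothetical miniature Subject; error handling and unsubscription are omitted.
class MiniSubject {
  constructor() {
    this.observers = [];
  }

  // Observable side: subscribing only registers the observer.
  subscribe(observer) {
    this.observers.push(observer);
  }

  // Observer side: next() multicasts the value to every registered observer.
  next(value) {
    // Iterate over a copy so observers added during delivery are not called.
    this.observers.slice().forEach(observer => observer.next(value));
  }
}

const subject = new MiniSubject();
const seenByA = [];
const seenByB = [];

subject.subscribe({ next: value => seenByA.push(value) });
subject.next(1); // only A is registered at this point

subject.subscribe({ next: value => seenByB.push(value) });
subject.next(2); // A and B both receive 2

// seenByA: [1, 2], seenByB: [2]
```

Observer B misses the first value because a Subject does not replay anything on subscribe; it only forwards what arrives afterwards.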

Since a Subject is a special Observable, it also has the pipeline mechanism, but it differs somewhat from an ordinary Observable, as the lift method of the Subject class shows:

lift<R>(operator: Operator<T, R>): Observable<R> {
  const subject = new AnonymousSubject(this, this);
  subject.operator = <any>operator;
  return <any>subject;
}

In the lift method above, the source property that implements the Observable pipeline mechanism is not assigned directly on the newly created AnonymousSubject instance. Instead, the AnonymousSubject class, which extends Subject, receives the current Subject instance as both its destination and its source constructor arguments, and the constructor body does just one thing: it assigns the current Subject instance to the source property of the new Subject.

constructor(protected destination?: Observer<T>, source?: Observable<T>) {
  super();
  this.source = source;
}

next(value: T) {
  const { destination } = this;
  if (destination && destination.next) {
    destination.next(value);
  }
}

_subscribe(subscriber: Subscriber<T>): Subscription {
  const { source } = this;
  if (source) {
    return this.source!.subscribe(subscriber);
  } else {
    return Subscription.EMPTY;
  }
}

As the implementation above shows, for the chain of AnonymousSubject instances created by pipe, both subscribe and next ultimately backtrack to the original Subject instance. Because a Subject is both an Observable and an Observer, the AnonymousSubject class overrides the subscribe and next methods of its parent class Subject for this purpose.
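The forwarding behaviour can be sketched in isolation. MiniSubject and MiniAnonymousSubject are hypothetical classes; the sketch shows only how next and subscribe are delegated to the original Subject, and it deliberately leaves out the operator that lift would attach.

```javascript
// Hypothetical sketch: next() forwards to `destination`, subscribe() forwards to `source`.
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.slice().forEach(observer => observer.next(value));
  }
}

class MiniAnonymousSubject {
  constructor(destination, source) {
    this.destination = destination; // where next() calls are forwarded
    this.source = source;           // where subscribe() calls are forwarded
  }
  next(value) {
    if (this.destination && this.destination.next) {
      this.destination.next(value);
    }
  }
  subscribe(observer) {
    if (this.source) {
      this.source.subscribe(observer);
    }
  }
}

const root = new MiniSubject();
// Mirrors `new AnonymousSubject(this, this)`: the root is both destination and source.
const facade = new MiniAnonymousSubject(root, root);

const got = [];
facade.subscribe({ next: value => got.push(value) }); // registered on root
facade.next('hello');                                 // delivered through root
// got: ['hello']
```

The facade keeps no observer list of its own; all state lives in the root Subject.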

Observables and Redux

The pipeline mechanism of Observable fits well with Redux’s action processing flow. The community’s redux-observable library uses this RxJS mechanism to build another kind of Redux middleware.

const result$ = epic$.pipe(
  map(epic => {
    const output$ = epic(action$, state$, options.dependencies!);
    // ...
    return output$;
  }),
  mergeMap(output$ =>
    from(output$).pipe(
      subscribeOn(uniqueQueueScheduler),
      observeOn(uniqueQueueScheduler)
    )
  )
);

result$.subscribe(store.dispatch);

epic$ is a Subject instance. Once the middleware is started as shown below, the action$ Observable goes through the pipeline in the code above, with actionSubject$ as the upstream source of action$. Subscribing backtracks upstream through each subscribe call, so the whole pipeline ends up subscribed. redux-observable also adds a queue scheduler (uniqueQueueScheduler) to the pipeline so that all dispatched actions are queued.

epicMiddleware.run = rootEpic => {
  // ...
  epic$.next(rootEpic);
};

In the middleware’s resulting next function, calling actionSubject$.next passes the action to action$, which sends it through the pipeline above, and the whole action processing stream starts running.

action => {
  // ...
  actionSubject$.next(action);
  // ...
};
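The overall flow (an action enters the middleware, is pushed into a subject, passes through an epic, and the epic’s output is dispatched again) can be sketched without Redux or RxJS. Everything below is hypothetical: createActionSubject, createMiniEpicMiddleware, pingEpic, and the stand-in store are invented for the example, and the ordering inside the middleware (reducers first, then the epic) is an assumption of this sketch.

```javascript
// Hypothetical sketch of an epic-style middleware; not the redux-observable source.
function createActionSubject() {
  const observers = [];
  return {
    subscribe: observer => observers.push(observer),
    next: action => observers.slice().forEach(observer => observer.next(action)),
  };
}

function createMiniEpicMiddleware(epic) {
  return store => {
    const actionSubject = createActionSubject();
    // The epic maps the stream of incoming actions to a stream of outgoing actions.
    const output = epic(actionSubject);
    output.subscribe({ next: action => store.dispatch(action) });

    return next => action => {
      const result = next(action); // the rest of the dispatch chain sees the action first
      actionSubject.next(action);  // then it enters the epic pipeline
      return result;
    };
  };
}

// Example epic: answer every PING with a PONG.
const pingEpic = action$ => ({
  subscribe(observer) {
    action$.subscribe({
      next: action => {
        if (action.type === 'PING') observer.next({ type: 'PONG' });
      },
    });
  },
});

// A tiny stand-in for a Redux store that just records dispatched actions.
const handled = [];
const store = { dispatch: action => dispatchThroughMiddleware(action) };
const baseDispatch = action => { handled.push(action.type); return action; };
const dispatchThroughMiddleware = createMiniEpicMiddleware(pingEpic)(store)(baseDispatch);

store.dispatch({ type: 'PING' });
// handled: ['PING', 'PONG']
```

The PONG action re-enters the same middleware, which is why an epic must avoid answering its own output or it would loop forever.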

In fact, the Redux store itself has an observable method, which is the simplest implementation of the idea above.

Conclusion

The observer (publish-subscribe) pattern is one of the most important design patterns in JavaScript, and it is not limited to the JavaScript world: it is used in a wide range of scenarios. Most of the view libraries and frameworks in today’s front-end world build on this pattern, including Redux, React-Redux, and Vue, which are familiar to front-end programmers. Looking at the technical nature of the pattern, my personal understanding is that it is a mode of data communication, which can be synchronous or asynchronous, blocking or non-blocking. It is everywhere, from small libraries to large system software, and a deeper understanding of it helps you write more robust applications.