The Scheduler is a concept that is rarely used directly in everyday development, but it is very important inside RxJS.

As the name suggests, a Scheduler does scheduling. Synchronous logic neither needs nor allows scheduling, since it simply runs in order; only asynchronous operations need to be scheduled. Scheduler exists to give RxJS the ability to handle asynchronous operations.

AsyncScheduler & AsyncAction

RxJS has four built-in schedulers; apart from asyncScheduler, the other three inherit from AsyncScheduler

// /src/internal/scheduler/async.ts
export const asyncScheduler = new AsyncScheduler(AsyncAction);

export const async = asyncScheduler;

RxJS does not expose the AsyncScheduler class directly. The exposed API is async / asyncScheduler, which is an instance of AsyncScheduler, and an AsyncAction is passed in when that instance is constructed

Schedule task as if you used setTimeout(task, duration)

// /src/internal/scheduler/async.ts
const task = () => console.log('it works!');

asyncScheduler.schedule(task, 2000);

After the code above runs, it works! is printed after 2000ms. In this example, asyncScheduler.schedule behaves just like setTimeout

// /src/internal/scheduler/async.ts
function task(this: SchedulerAction<number>, state?: number) {
  console.log('task state: ', state);
  if (state !== void 0) {
    this.schedule(state + 1, 1000);
  }
}
asyncScheduler.schedule<number>(task, 1000, 0);

This example demonstrates asyncScheduler better, so let's use it as the starting point for reading the source

// /src/internal/scheduler/AsyncScheduler.ts
export class AsyncScheduler extends Scheduler {
  // ...
  constructor(SchedulerAction: typeof Action, now: () => number = Scheduler.now) {
    super(SchedulerAction, now);
  }
}

The AsyncScheduler constructor method passes arguments directly to the parent Scheduler

export class Scheduler implements SchedulerLike {
  // ...
  constructor(private schedulerActionCtor: typeof Action, now: () => number = Scheduler.now) {
    this.now = now;
  }
  public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
  }
}

Scheduler simply places parameters on the properties of the instance; the schedule method that follows is the key

The schedule method instantiates this.schedulerActionCtor, which is the AsyncAction passed in at the outermost level, and then calls the schedule method on that AsyncAction instance

// /src/internal/scheduler/AsyncAction.ts
export class AsyncAction<T> extends Action<T> {
  // ...
  constructor(protected scheduler: AsyncScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void) {
    super(scheduler, work);
  }
  public schedule(state?: T, delay: number = 0): Subscription {
    // ...
    // If this action has already an async Id, don't request a new one.
    this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);

    return this;
  }
  protected requestAsyncId(scheduler: AsyncScheduler, _id?: any, delay: number = 0): any {
    return intervalProvider.setInterval(scheduler.flush.bind(scheduler, this), delay);
  }
}

schedule calls requestAsyncId, which in turn calls intervalProvider.setInterval. In most cases intervalProvider.setInterval is equivalent to setInterval, and we can treat it that way for now. So a timer is started, and scheduler.flush.bind(scheduler, this) is executed every delay milliseconds, where scheduler is the AsyncScheduler instance

Although setInterval is the genuine polling API, for well-known reasons we usually simulate polling with chained setTimeout calls instead. Probably because readers of the code might be puzzled by this choice, the reason is given in an official comment: each task should be independent of the others, and the start time of the next task should not be affected by the previous one

For example, suppose we start a task with a polling interval of 1s, so the second and third runs are expected to start 2s and 3s after the program starts. If each run takes 10s and polling is done with chained setTimeout, the second and third runs actually start at 12s and 23s

With setInterval, the second and third runs start as close to the expected time (2s and 3s) as possible, although they may not start exactly on time when a long-running task is occupying the CPU. Compared with chained setTimeout, setInterval avoids the ever-growing gap between expected and actual start times that accumulates from the duration of earlier runs
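The arithmetic in the example above can be checked with a small simulation. This is a minimal sketch, not RxJS code: chainedTimeoutStarts and fixedIntervalTargets are hypothetical helper names, times are in seconds, and the model assumes every run takes exactly taskDuration.

```typescript
// Start times when the next timer is armed only after the previous run finishes
// (chained setTimeout). All times are seconds since program start.
function chainedTimeoutStarts(interval: number, taskDuration: number, runs: number): number[] {
  const starts: number[] = [];
  let clock = 0;
  for (let i = 0; i < runs; i++) {
    clock += interval; // wait for the timer to fire
    starts.push(clock); // the run starts now
    clock += taskDuration; // the run finishes, then the next timer is armed
  }
  return starts;
}

// Target start times when the timer keeps ticking regardless of how long a run takes
// (setInterval).
function fixedIntervalTargets(interval: number, runs: number): number[] {
  const targets: number[] = [];
  for (let i = 1; i <= runs; i++) {
    targets.push(i * interval);
  }
  return targets;
}

const chained = chainedTimeoutStarts(1, 10, 3); // [1, 12, 23]
const targets = fixedIntervalTargets(1, 3); // [1, 2, 3]
console.log(chained, targets);
```

The gap between the two lists grows by taskDuration with every run, which is the drift the official comment is concerned about.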

// /src/internal/scheduler/AsyncScheduler.ts
public flush(action: AsyncAction<any>): void {
  // ...
  do {
    if ((error = action.execute(action.state, action.delay))) {
      break;
    }
  } while ((action = actions.shift()!)); // exhaust the scheduler queue
  // ...
}

// /src/internal/scheduler/AsyncAction.ts
public execute(state: T, delay: number): any {
  // ...
  const error = this._execute(state, delay);
  // ...
}

protected _execute(state: T, _delay: number): any {
  // ...
  try {
    this.work(state);
  } catch (e) {
    errored = true;
    errorValue = e ? e : new Error('Scheduled action threw falsy error');
  }
  // ...
}

flush ultimately calls this.work with state as the argument, and this.work is the task function passed in from our own business code

Inside task we call this.schedule(state + 1, 1000) again. Because this inside our function refers to the AsyncAction, that statement calls AsyncAction's schedule once more

This step is critical. A setTimeout timer ends by itself after it fires, but a setInterval timer must be cancelled explicitly at the appropriate time

// /src/internal/scheduler/AsyncAction.ts
public execute(state: T, delay: number): any {
  // ...
  this.pending = false;
  const error = this._execute(state, delay);
  if (error) {
    return error;
  } else if (this.pending === false && this.id != null) {
    this.id = this.recycleAsyncId(this.scheduler, this.id, null);
  }
}

Each time execute is entered, this.pending is set to false, which makes it look as though the else if branch below will always be taken. However, this._execute runs in between and ends up executing our custom task. If the task calls this.schedule, this.schedule sets this.pending back to true, so the else if branch is not entered while there are more tasks to run

If our task no longer calls this.schedule, this.pending stays false and the else if branch is entered, executing this.id = this.recycleAsyncId(this.scheduler, this.id, null)

protected recycleAsyncId(_scheduler: AsyncScheduler, id: any, delay: number | null = 0): any {
  if (delay != null && this.delay === delay && this.pending === false) {
    return id;
  }
  intervalProvider.clearInterval(id);
  return undefined;
}

Because the third argument is null here, recycleAsyncId calls intervalProvider.clearInterval(id), which clears the polling timer

Besides this case, if we call this.schedule with a delay different from the previous one, the polling timer is also cleared and a new one is started with the current delay. How long the task is delayed is up to the caller

Finally, if an error occurs during the execution of a task, the timer is also cleared

// /src/internal/scheduler/AsyncAction.ts
protected _execute(state: T, _delay: number): any {
  // ...
  if (errored) {
    this.unsubscribe();
    return errorValue;
  }
}

unsubscribe() {
  if (!this.closed) {
    const { id, scheduler } = this;
    const { actions } = scheduler;

    this.work = this.state = this.scheduler = null!;
    this.pending = false;

    arrRemove(actions, this);
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, null);
    }

    this.delay = null!;
    super.unsubscribe();
  }
}

The unsubscribe method does some cleanup, such as clearing timers and resetting instance attribute values

So, the poll timer is cleared when one of the following three conditions is met:

  • The developer's custom task method no longer calls this.schedule
  • The developer's custom task method calls this.schedule with a delay different from the previous one. In this case the previous timer is cleared and a new one is started with the newly passed delay, so a polling timer still exists, but it is a different one
  • An error occurs while the scheduled task is executing
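The first two conditions can be illustrated with a simplified stand-in for AsyncAction. This is only a sketch under stated assumptions: MiniAction, fakeSetInterval, fakeClearInterval and fireTimers are hypothetical names, the fake timer provider fires only when fireTimers is called, and error handling (the third condition) is left out.

```typescript
// Fake timer provider (hypothetical) so the sketch runs synchronously.
interface FakeTimer { id: number; delay: number; tick: () => void }
let liveTimers: FakeTimer[] = [];
let nextTimerId = 1;
function fakeSetInterval(tick: () => void, delay: number): number {
  const id = nextTimerId++;
  liveTimers.push({ id, delay, tick });
  return id;
}
function fakeClearInterval(id: number): void {
  liveTimers = liveTimers.filter((timer) => timer.id !== id);
}
function fireTimers(): void {
  for (const timer of liveTimers.slice()) {
    timer.tick();
  }
}

class MiniAction {
  id: number | undefined;
  pending = false;
  delay = 0;
  state = 0;

  constructor(private work: (this: MiniAction, state: number) => void) {}

  schedule(state: number, delay: number): this {
    this.state = state;
    // A different delay invalidates the running timer.
    if (this.id !== undefined && this.delay !== delay) {
      fakeClearInterval(this.id);
      this.id = undefined;
    }
    this.pending = true;
    this.delay = delay;
    if (this.id === undefined) {
      this.id = fakeSetInterval(() => this.execute(), delay);
    }
    return this;
  }

  execute(): void {
    this.pending = false;
    this.work.call(this, this.state);
    // The work did not reschedule itself: stop polling.
    if (this.pending === false && this.id !== undefined) {
      fakeClearInterval(this.id);
      this.id = undefined;
    }
  }
}

// Condition 1: the task stops calling this.schedule.
const seen: number[] = [];
const counter = new MiniAction(function (state) {
  seen.push(state);
  if (state < 2) {
    this.schedule(state + 1, 1000); // same delay: the timer is reused
  }
});
counter.schedule(0, 1000);
fireTimers();
fireTimers();
fireTimers(); // the work no longer reschedules, so the timer is cleared
const timersAfterCounter = liveTimers.length;

// Condition 2: the task reschedules with a different delay.
const changer = new MiniAction(function (state) {
  if (state === 0) {
    this.schedule(1, 500); // old timer cleared, new one started
  }
});
changer.schedule(0, 1000);
const firstId = changer.id;
fireTimers();
const replaced = changer.id !== firstId && liveTimers.length === 1 && liveTimers[0]?.delay === 500;
console.log(seen, timersAfterCounter, replaced);
```

Injecting the timer functions keeps the sketch deterministic; the real code reaches the same effect through intervalProvider.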

QueueScheduler & QueueAction

Again, let's start with the example from the official source code

queueScheduler.schedule(function(state) {
  if (state !== 0) {
    console.log('before', state);
    this.schedule(state - 1); // `this` references currently executing Action,
                              // which we reschedule with new state
    console.log('after', state);
  }
}, 0, 3);
// before 3
// after 3
// before 2
// after 2
// before 1
// after 1

In fact, if you change queueScheduler to asyncScheduler in this code, the output is the same. Does that mean queueScheduler is no different from asyncScheduler? Of course not

With either scheduler this output is identical, but add two more lines of code and the difference shows up

console.log('start')
queueScheduler.schedule(function(state) {
  if (state !== 0) {
    console.log('before', state);
    this.schedule(state - 1); // `this` references currently executing Action,
                              // which we reschedule with new state
    console.log('after', state);
  }
}, 0, 3);
console.log('end')
// start
// before 3
// after 3
// before 2
// after 2
// before 1
// after 1
// end

queueScheduler.schedule executes synchronously. If asyncScheduler is used instead, the output becomes:

// start
// end
// before 3
// after 3
// before 2
// after 2
// before 1
// after 1

This shows that queueScheduler executes synchronously, whereas asyncScheduler executes asynchronously

But if queueScheduler were simply synchronous, a function that calls itself inside its own body would push stack frames recursively, and the output would not look like the above. Consider, for example, the following synchronous code:

function work(state: number) {
  if (state !== 0) {
    console.log('before', state)
    work(state - 1)
    console.log('after', state)
  }
}
work(3)

The output is:

// "before", 3
// "before", 2
// "before", 1
// "after", 1
// "after", 2
// "after", 3

So why does queueScheduler.schedule not produce this recursive output? Let's look at the code

Like AsyncScheduler, RXJS does not expose QueueScheduler directly, but instead exposes an instance of it

// /src/internal/scheduler/queue.ts
export const queueScheduler = new QueueScheduler(QueueAction);

/**
 * @deprecated Renamed to {@link queueScheduler}. Will be removed in v8.
 */
export const queue = queueScheduler;

If delay is greater than 0, queueScheduler behaves exactly like asyncScheduler. The two differ only when delay is 0 (or omitted), so the discussion below assumes the default delay of 0

QueueScheduler inherits from AsyncScheduler without any modifications, so it can be considered AsyncScheduler

// /src/internal/scheduler/QueueScheduler.ts
import { AsyncScheduler } from './AsyncScheduler';
export class QueueScheduler extends AsyncScheduler {}

QueueAction is the one that has changed

// /src/internal/scheduler/QueueAction.ts
export class QueueAction<T> extends AsyncAction<T> {}

QueueAction inherits from AsyncAction and overrides three methods: schedule, execute, and requestAsyncId

// /src/internal/scheduler/QueueAction.ts
public schedule(state?: T, delay: number = 0): Subscription {
  // ...
  this.scheduler.flush(this);
  return this;
}

Here schedule calls scheduler.flush directly, whereas AsyncAction's schedule goes through requestAsyncId, which calls setInterval. This is the difference between QueueScheduler executing synchronously and AsyncScheduler executing asynchronously

// /src/internal/scheduler/AsyncAction.ts
public schedule(state?: T, delay: number = 0): Subscription {
  // ...
  this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);
  return this;
}

protected requestAsyncId(scheduler: AsyncScheduler, _id?: any, delay: number = 0): any {
  return intervalProvider.setInterval(scheduler.flush.bind(scheduler, this), delay);
}

So why did the synchronous execution not recurse as expected? That logic lives in AsyncScheduler

// /src/internal/scheduler/AsyncScheduler.ts
public flush(action: AsyncAction<any>) :void {
  const { actions } = this;
  if (this._active) {
    actions.push(action);
    return;
  }

  let error: any;
  this._active = true;

  do {
    if ((error = action.execute(action.state, action.delay))) {
      break;
    }
  } while ((action = actions.shift()!)); // exhaust the scheduler queue

  this._active = false;
  // ...
}

AsyncScheduler's flush is guarded by this._active. Because QueueAction calls flush synchronously, this._active is still true while the previous task's action is executing. When that task schedules the next one, flush is entered again synchronously, hits the if (this._active) branch, and the new action is pushed onto actions instead of being executed directly, so no recursive stack is built up

Subsequent tasks are thus stored in the array queue named actions and then taken out one by one by the do…while loop that follows. In other words, QueueScheduler maintains an internal queue in which tasks wait until it is their turn to run. Execution is still synchronous, but the call stack differs from plain recursion
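The effect of the _active guard can be reproduced in a few lines. The following is a minimal sketch, not the RxJS implementation: MiniQueue and the reschedule callback are hypothetical, and error handling is omitted.

```typescript
type Work = (reschedule: (state: number) => void, state: number) => void;

class MiniQueue {
  private active = false;
  private actions: Array<() => void> = [];

  schedule(work: Work, state: number): void {
    const run = (): void => work((next) => this.schedule(work, next), state);
    if (this.active) {
      // A flush is already in progress: enqueue instead of recursing.
      this.actions.push(run);
      return;
    }
    this.active = true;
    let current: () => void = run;
    for (;;) {
      current();
      const next = this.actions.shift(); // exhaust the queue
      if (next === undefined) {
        break;
      }
      current = next;
    }
    this.active = false;
  }
}

const log: string[] = [];
new MiniQueue().schedule((reschedule, state) => {
  if (state !== 0) {
    log.push(`before ${state}`);
    reschedule(state - 1); // queued, not executed immediately
    log.push(`after ${state}`);
  }
}, 3);
console.log(log.join(', '));
// before 3, after 3, before 2, after 2, before 1, after 1
```

Everything still runs synchronously inside the first schedule call, yet each task finishes before the next one starts, which matches the before/after pairs shown earlier.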

AsapScheduler & AsapAction

Here is how RxJS describes this scheduler

// /src/internal/scheduler/asap.ts
// Perform task as fast as it can be performed asynchronously

That is, perform the task asynchronously as quickly as possible

Which APIs can run code asynchronously in JS? setTimeout, setInterval, setImmediate, postMessage, MessageChannel, Promise, MutationObserver, process.nextTick, and so on

Which is faster?

This brings us to macrotasks and microtasks; microtasks run earlier than macrotasks

console.log('start')
asyncScheduler.schedule(() => console.log('async')); // scheduling 'async' first...
asapScheduler.schedule(() => console.log('asap'));
console.log('end')
// start
// end
// asap
// async

The output above confirms that both asyncScheduler and asapScheduler execute asynchronously, but asapScheduler's task runs earlier than asyncScheduler's. As seen earlier, AsyncScheduler implements its asynchronous logic with setInterval, which is a macrotask. By contrast, asapScheduler, which runs earlier, must be using a microtask internally

// /src/internal/scheduler/AsapScheduler.ts
export class AsapScheduler extends AsyncScheduler {
  public flush(action?: AsyncAction<any>): void {
    // ...
    const { actions } = this;
    // ...
    action = action || actions.shift()!;
    const count = actions.length;
    do {
      if ((error = action.execute(action.state, action.delay))) {
        break;
      }
    } while (++index < count && (action = actions.shift()));
    // ...
  }
}

AsapScheduler inherits from AsyncScheduler and overrides the flush method. Instances of AsapScheduler maintain an actions queue, from which tasks are taken and executed on each flush. Items are appended to the actions queue in AsapAction

// /src/internal/scheduler/AsapAction.ts
export class AsapAction<T> extends AsyncAction<T> {
  // ...
  protected requestAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any {
    if (delay !== null && delay > 0) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    scheduler.actions.push(this);
    return scheduler._scheduled || (scheduler._scheduled = immediateProvider.setImmediate(scheduler.flush.bind(scheduler, undefined)));
  }
}

If delay is greater than 0, the AsyncAction logic is used, so in that case asapScheduler behaves just like asyncScheduler; queueScheduler works the same way

If scheduler._scheduled already has a value, it is returned directly; otherwise a new asynchronous flush is requested and its handle is stored in scheduler._scheduled. All actions scheduled before that flush runs therefore share a single asynchronous callback

This is an optimization RxJS makes to live up to the phrase "Perform task as fast as it can be performed asynchronously"

function task() {
  Promise.resolve().then(() => {
    console.log('task')
  })
}

task()
Promise.resolve().then(() => {
  console.log('Promise')
})
task()
// task
// Promise
// task

Microtasks are queued too, and the queue is first in, first out. When the code above runs synchronously, three microtasks are put into the microtask queue in order: task, Promise, and task. Once the synchronous code has finished, the microtask queue is processed first in, first out, so task, Promise, and task are executed in that order. Now see what happens if asapScheduler is used instead

asapScheduler.schedule(() => console.log('asapScheduler'));
Promise.resolve().then(() => {
  console.log('Promise')
})
asapScheduler.schedule(() => console.log('asapScheduler'));
// asapScheduler
// asapScheduler
// Promise

As you can see, although the second asapScheduler.schedule call comes after the Promise, its task runs before the Promise callback. This is why asapScheduler maintains the actions queue internally
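The batching behind this ordering can be sketched without RxJS. The code below is an illustration under assumptions, not the library's implementation: asapSchedule, flush, actions and scheduled are hypothetical stand-ins for AsapAction.requestAsyncId, AsapScheduler.flush, the actions queue and _scheduled.

```typescript
// Simplified stand-ins for AsapScheduler's `actions` queue and `_scheduled` handle.
const actions: Array<() => void> = [];
let scheduled: Promise<void> | undefined;
let microtasksRequested = 0;
const order: string[] = [];

// Runs every action that was queued before the shared microtask fired.
function flush(): void {
  scheduled = undefined;
  const batch = actions.splice(0);
  for (const work of batch) {
    work();
  }
}

// Queue the work; request a microtask only if none is pending yet.
function asapSchedule(work: () => void): void {
  actions.push(work);
  if (scheduled === undefined) {
    microtasksRequested++;
    scheduled = Promise.resolve().then(flush);
  }
}

asapSchedule(() => { order.push('asap 1'); });
Promise.resolve().then(() => { order.push('Promise'); });
asapSchedule(() => { order.push('asap 2'); });

// Once the microtask queue has drained, `order` is ['asap 1', 'asap 2', 'Promise'].
setTimeout(() => console.log(order, microtasksRequested), 0);
```

Only one microtask is requested for the two scheduled tasks, and because it was queued before the plain Promise callback, both tasks run ahead of it.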

In addition, a new timer method appears here: immediateProvider.setImmediate

// /src/internal/scheduler/immediateProvider.ts
import { Immediate } from '../util/Immediate';
const { setImmediate, clearImmediate } = Immediate;
// ...
export const immediateProvider: ImmediateProvider = {
  setImmediate(...args) {
    const { delegate } = immediateProvider;
    return (delegate?.setImmediate || setImmediate)(...args);
  },
  clearImmediate(handle) {
    const { delegate } = immediateProvider;
    return (delegate?.clearImmediate || clearImmediate)(handle);
  },
  delegate: undefined,
};

This setImmediate is implemented with the most common microtask mechanism, Promise.resolve

// /src/internal/util/Immediate.ts
export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    activeHandles[handle] = true;
    if (!resolved) {
      resolved = Promise.resolve();
    }
    resolved.then(() => findAndClearHandle(handle) && cb());
    return handle;
  },

  clearImmediate(handle: number): void {
    findAndClearHandle(handle);
  },
};

AnimationFrameScheduler & AnimationFrameAction

// /src/internal/scheduler/animationFrame.ts
// Perform task when `window.requestAnimationFrame` would fire

Its execution timing matches window.requestAnimationFrame

const div = document.querySelector('div') as HTMLDivElement;

animationFrameScheduler.schedule(function(height) {
  div.style.height = height + "px";
  this.schedule(height + 1);  // `this` references currently executing Action,
                              // which we reschedule with new state
}, 0, 0);

When this example from the source comments runs, you will see a div on the page whose height gradually increases

If you replace animationFrameScheduler with asyncScheduler, the result is much the same, but asyncScheduler schedules with setInterval, so the animation may sometimes be less smooth than with animationFrameScheduler. queueScheduler does not work here, since it executes synchronously. asapScheduler does not work either: the microtask queue is drained before the page is rendered, so the page appears to be stuck

AnimationFrameScheduler's code is the same as AsapScheduler's; the difference is in AnimationFrameAction, where the requestAsyncId method is worth a look

// /src/internal/scheduler/AnimationFrameAction.ts
protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  // ...
  return scheduler._scheduled || (scheduler._scheduled = animationFrameProvider.requestAnimationFrame(() => scheduler.flush(undefined)));
}

This method is similar to AsapAction's requestAsyncId, except that animationFrameProvider.requestAnimationFrame replaces immediateProvider.setImmediate. Normally, animationFrameProvider.requestAnimationFrame is simply window.requestAnimationFrame

// /src/internal/scheduler/animationFrameProvider.ts
export const animationFrameProvider: AnimationFrameProvider = {
  // ...
  requestAnimationFrame(...args) {
    const { delegate } = animationFrameProvider;
    return (delegate?.requestAnimationFrame || requestAnimationFrame)(...args);
  },
  cancelAnimationFrame(...args) {
    const { delegate } = animationFrameProvider;
    return (delegate?.cancelAnimationFrame || cancelAnimationFrame)(...args);
  },
  delegate: undefined,
};
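One practical consequence of routing frame requests through a replaceable function is that the scheduling logic can be exercised without a browser. The sketch below assumes a hypothetical MiniFrameScheduler whose frame source is injected through the constructor; it is not the RxJS class, and fakeRequestFrame and nextFrame are made-up test helpers.

```typescript
type FrameCallback = () => void;

// Simplified scheduler: the frame source is injected, much like a provider's `delegate`.
class MiniFrameScheduler {
  private scheduled: number | undefined;
  private actions: FrameCallback[] = [];

  constructor(private requestFrame: (cb: FrameCallback) => number) {}

  schedule(work: FrameCallback): void {
    this.actions.push(work);
    // Request a frame only if none is pending, as `_scheduled` does in requestAsyncId.
    if (this.scheduled === undefined) {
      this.scheduled = this.requestFrame(() => this.flush());
    }
  }

  private flush(): void {
    this.scheduled = undefined;
    const batch = this.actions.splice(0);
    for (const work of batch) {
      work();
    }
  }
}

// Fake frame source so the sketch runs without a browser.
const pendingFrames: FrameCallback[] = [];
let framesRequested = 0;
const fakeRequestFrame = (cb: FrameCallback): number => {
  pendingFrames.push(cb);
  return ++framesRequested;
};
function nextFrame(): void {
  for (const cb of pendingFrames.splice(0)) {
    cb();
  }
}

const frameScheduler = new MiniFrameScheduler(fakeRequestFrame);
const heights: number[] = [];
let currentHeight = 0;
function grow(): void {
  heights.push(currentHeight);
  currentHeight += 1;
  if (currentHeight < 3) {
    frameScheduler.schedule(grow); // reschedule for the following frame
  }
}

frameScheduler.schedule(grow);
frameScheduler.schedule(() => { heights.push(-1); }); // same frame: no second request
const requestsBeforeFirstFrame = framesRequested;
nextFrame();
nextFrame();
nextFrame();
console.log(heights, framesRequested); // [0, -1, 1, 2] 3
```

Each call to nextFrame plays the role of one browser repaint, so the height advances by one step per frame instead of all at once.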

Summary

This article looked at the schedulers built into RxJS. Although in most cases we do not use schedulers directly, reading the source reveals some clever ways of writing common code