To make Subject easier to use, RxJS provides several Subject variants, built on top of Subject, that are closer to real-world use cases.

AsyncSubject

// /src/internal/AsyncSubject.ts
/**
 * A variant of Subject that only emits a value when it completes. It will emit
 * its latest value to all its observers on completion.
 */

As the official comment says, AsyncSubject broadcasts only the most recent value to all observers, and only once the completion event has occurred (that is, when complete is called).

import { AsyncSubject } from 'rxjs'

const subject = new AsyncSubject<number>()
subject.subscribe(data => console.log(`Subscribe A: ${data}`))
subject.next(1)
subject.next(2)
subject.subscribe(data => console.log(`Subscribe B: ${data}`))
subject.next(3)
subject.complete()
// Subscribe A: 3
// Subscribe B: 3

Since AsyncSubject behaves differently from Subject when next and complete are called, it must override both methods, so let's take a look.

// /src/internal/AsyncSubject.ts
next(value: T): void {
  if (!this.isStopped) {
    this._value = value;
    this._hasValue = true;
  }
}

Every call to next stores its argument in this._value, so this._value always holds the latest value, and then sets this._hasValue to true. If next is never called, this._hasValue stays false, and a later call to complete broadcasts no value.

// /src/internal/AsyncSubject.ts
complete(): void {
  const { _hasValue, _value, _isComplete } = this;
  if (!_isComplete) {
    this._isComplete = true;
    _hasValue && super.next(_value!);
    super.complete();
  }
}

Subject's next logic, which broadcasts the value to all observers, runs only when complete is called.
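
The store-then-flush behavior described above can be sketched without RxJS at all. The following is a minimal stand-in, not the RxJS source: MiniAsyncSubject and its field names are hypothetical, and error handling and subscribing after completion are left out.

```typescript
// Simplified stand-in for illustration; this is NOT the RxJS implementation.
type AsyncListener<T> = (value: T) => void;

class MiniAsyncSubject<T> {
  private listeners: AsyncListener<T>[] = [];
  private latest: T | undefined;
  private hasValue = false;
  private isComplete = false;

  subscribe(listener: AsyncListener<T>): void {
    // Subscribing only registers the listener; nothing is delivered yet.
    this.listeners.push(listener);
  }

  next(value: T): void {
    if (this.isComplete) return;
    // Remember the value instead of broadcasting it.
    this.latest = value;
    this.hasValue = true;
  }

  complete(): void {
    if (this.isComplete) return;
    this.isComplete = true;
    // Broadcast the stored value once, and only if next() was ever called.
    if (this.hasValue) {
      for (const listener of this.listeners) listener(this.latest as T);
    }
  }
}

const asyncLog: string[] = [];
const miniAsync = new MiniAsyncSubject<number>();
miniAsync.subscribe((v) => asyncLog.push(`A:${v}`));
miniAsync.next(1);
miniAsync.next(2);
miniAsync.subscribe((v) => asyncLog.push(`B:${v}`));
miniAsync.next(3);
miniAsync.complete();
// asyncLog is now ["A:3", "B:3"]

const silentLog: number[] = [];
const neverFed = new MiniAsyncSubject<number>();
neverFed.subscribe((v) => silentLog.push(v));
neverFed.complete();
// silentLog stays empty because next() was never called
```

The second half shows the _hasValue guard in action: completing a subject that never received a value notifies nobody.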

BehaviorSubject

// /src/internal/BehaviorSubject.ts
/**
 * A variant of Subject that requires an initial value and emits its current
 * value whenever it is subscribed to.
 */

BehaviorSubject requires an initial value at construction time and automatically emits its latest value whenever a subscription occurs.

When a plain Subject is subscribed to, it only stores the subscriber in observers; values are broadcast only when next is called. If you want the first subscriber to receive the preset value immediately, and later subscribers to receive the latest value as soon as they subscribe, use BehaviorSubject.

import { BehaviorSubject } from 'rxjs'

const subject = new BehaviorSubject(1)
subject.subscribe(data => console.log(`A: ${data}`))
subject.next(2)
subject.subscribe(data => console.log(`B: ${data}`))

// A: 1
// A: 2
// B: 2

// /src/internal/BehaviorSubject.ts
export class BehaviorSubject<T> extends Subject<T> {
  constructor(private _value: T) {
    super();
  }
}

BehaviorSubject extends Subject and takes a private initial value, _value, which is what gets emitted on the first subscription. Because BehaviorSubject behaves differently from Subject at subscription time, it must override Subject's _subscribe method, so let's take a look.

// /src/internal/BehaviorSubject.ts
protected _subscribe(subscriber: Subscriber<T>): Subscription {
  const subscription = super._subscribe(subscriber);
  !subscription.closed && subscriber.next(this._value);
  return subscription;
}

It still calls Subject's _subscribe and, on top of that, calls subscriber.next, which is why BehaviorSubject emits a value at subscription time.

this._value stores the latest value. It starts as the initial value passed to the constructor, but it is updated on every call to next.

// /src/internal/BehaviorSubject.ts
next(value: T): void {
  super.next((this._value = value));
}

BehaviorSubject also has a value property, which lets you read the current this._value directly.

// /src/internal/BehaviorSubject.ts
get value(): T {
  return this.getValue();
}
getValue(): T {
  const { hasError, thrownError, _value } = this;
  // ...
  return _value;
}
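
Put together, the three pieces above (emit on subscribe, update on next, expose value) fit in a few lines. This is a rough sketch under simplifying assumptions, not the RxJS class: MiniBehaviorSubject is a hypothetical name, and completion, errors, and unsubscription are ignored.

```typescript
// Simplified stand-in for illustration; this is NOT the RxJS implementation.
type BehaviorListener<T> = (value: T) => void;

class MiniBehaviorSubject<T> {
  private listeners: BehaviorListener<T>[] = [];

  // The initial value is mandatory, mirroring `constructor(private _value: T)`.
  constructor(private current: T) {}

  subscribe(listener: BehaviorListener<T>): void {
    this.listeners.push(listener);
    // Push the current value to the new subscriber right away.
    listener(this.current);
  }

  next(value: T): void {
    // Store first, then broadcast, so `value` is always up to date.
    this.current = value;
    for (const listener of this.listeners) listener(value);
  }

  get value(): T {
    return this.current;
  }
}

const behaviorLog: string[] = [];
const miniBehavior = new MiniBehaviorSubject<number>(1);
miniBehavior.subscribe((v) => behaviorLog.push(`A:${v}`));
miniBehavior.next(2);
miniBehavior.subscribe((v) => behaviorLog.push(`B:${v}`));
// behaviorLog is now ["A:1", "A:2", "B:2"] and miniBehavior.value is 2
```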

ReplaySubject

The official description of ReplaySubject is a bit involved. It resembles BehaviorSubject, except that BehaviorSubject emits only the latest value when a subscription occurs, whereas ReplaySubject can emit the last N values, where N is chosen by the developer. In addition, ReplaySubject has no initial value, so next must be called before it has anything to emit.

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(3);
subject.subscribe(data => console.log(`First subscription: ${data}`));
subject.next(1);
subject.next(2);
subject.subscribe(data => console.log(`Second subscription: ${data}`));
// First subscription: 1
// First subscription: 2
// Second subscription: 1
// Second subscription: 2

// /src/internal/ReplaySubject.ts
export class ReplaySubject<T> extends Subject<T> {
  private _buffer: (T | number)[] = [];
  private _infiniteTimeWindow = true;

  /**
   * @param bufferSize The size of the buffer to replay on subscription
   * @param windowTime The amount of time the buffered items will stay buffered
   * @param timestampProvider An object with a `now()` method that provides the current timestamp. This is used to
   * calculate the amount of time something has been buffered.
   */
  constructor(
    private _bufferSize = Infinity,
    private _windowTime = Infinity,
    private _timestampProvider: TimestampProvider = dateTimestampProvider
  ) {
    super();
    this._infiniteTimeWindow = _windowTime === Infinity;
    this._bufferSize = Math.max(1, _bufferSize);
    this._windowTime = Math.max(1, _windowTime);
  }
}

The ReplaySubject constructor takes three optional parameters. The first, _bufferSize, is the N mentioned above: the maximum number of values to replay. The second, _windowTime, is the time window during which a buffered value remains eligible for replay.

ReplaySubject keeps an array called _buffer. If the second parameter _windowTime is passed at initialization, the next method stores two entries in _buffer in sequence: the value passed to next, and the expiration time of that value.

The value is simply the argument to next. What is the expiration time? It is the current timestamp plus _windowTime.

// /src/internal/ReplaySubject.ts
next(value: T): void {
  const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
  if (!isStopped) {
    _buffer.push(value);
    !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
  }
  this._trimBuffer();
  super.next(value);
}

By default, _timestampProvider.now() is equivalent to Date.now().

Then, when a subscription occurs, _subscribe iterates over _buffer and emits the stored values to the new subscriber.

// /src/internal/ReplaySubject.ts
protected _subscribe(subscriber: Subscriber<T>): Subscription {
  // ...
  this._trimBuffer();
  // ... 
  const copy = _buffer.slice();
  for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
    subscriber.next(copy[i] as T);
  }
  // ...
}

If the second parameter _windowTime is passed when ReplaySubject is initialized, _buffer looks like [value, expiredTime, value, expiredTime, ...]. The real values sit at the even indices 2n (n >= 0), so the for loop advances in steps of 2.
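
To make the two buffer layouts concrete, here is a small sketch of the stride logic on its own. The helper valuesToReplay and the sample timestamps are hypothetical; only the step of 1 versus 2 is taken from the loop shown above.

```typescript
// Hypothetical helper: pull the replayable values out of a ReplaySubject-style buffer.
function valuesToReplay<T>(buffer: (T | number)[], infiniteTimeWindow: boolean): T[] {
  // Without a time window every slot is a value; with one, values and
  // expiration timestamps alternate, so only every second slot is a value.
  const step = infiniteTimeWindow ? 1 : 2;
  const values: T[] = [];
  for (let i = 0; i < buffer.length; i += step) {
    values.push(buffer[i] as T);
  }
  return values;
}

// Layout without windowTime: [value, value, value]
const plainBuffer: (string | number)[] = ["a", "b", "c"];
// Layout with windowTime: [value, expiredTime, value, expiredTime, ...]
const timedBuffer: (string | number)[] = ["a", 1200, "b", 1250, "c", 1300];

const fromPlain = valuesToReplay<string>(plainBuffer, true);
const fromTimed = valuesToReplay<string>(timedBuffer, false);
// Both results are ["a", "b", "c"]
```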

// /src/internal/ReplaySubject.ts
private _trimBuffer() {
  const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
  // Drop the oldest entries from the front so that _buffer holds at most _bufferSize values
  // (that is, _bufferSize * 2 slots when expiration times are stored alongside the values)
  const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
  _bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);

  if (!_infiniteTimeWindow) {
    const now = _timestampProvider.now();
    let last = 0;
    // An expiration time that is not later than the current timestamp means the value has expired
    for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
      last = i;
    }
    // Delete all expired values together with their expiration times
    last && _buffer.splice(0, last + 1);
  }
}

Both next and _subscribe call another method, _trimBuffer, which trims the _buffer array. For example, if _bufferSize = 2 was passed at initialization and _buffer holds more than two values, everything except the last 2 is removed with splice. If the second parameter windowTime was passed, all expired entries in _buffer are also removed whenever the method is called.
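
The two trimming steps can be tried in isolation. The sketch below is a standalone adaptation of the method shown above, not the method itself: trimBuffer is a hypothetical free function that takes the buffer, the size limit, and the current time as explicit arguments so the effect of each step is easy to observe.

```typescript
// Standalone adaptation for experimentation; the parameters replace the
// private fields that the real method reads from `this`.
function trimBuffer<T>(
  buffer: (T | number)[],
  bufferSize: number,
  infiniteTimeWindow: boolean,
  now: number
): void {
  // Step 1: cap the number of stored values, dropping the oldest first.
  const slots = (infiniteTimeWindow ? 1 : 2) * bufferSize;
  if (bufferSize < Infinity && slots < buffer.length) {
    buffer.splice(0, buffer.length - slots);
  }
  // Step 2: with a time window, drop every leading pair whose timestamp has passed.
  if (!infiniteTimeWindow) {
    let last = 0;
    for (let i = 1; i < buffer.length && (buffer[i] as number) <= now; i += 2) {
      last = i;
    }
    if (last) buffer.splice(0, last + 1);
  }
}

// Size-based trimming: keep only the last 2 values.
const sizedBuffer: (string | number)[] = ["a", "b", "c", "d"];
trimBuffer(sizedBuffer, 2, true, 0);
// sizedBuffer is now ["c", "d"]

// Time-based trimming: at time 500, entries expiring at 200 are gone.
const expiringBuffer: (string | number)[] = ["a", 200, "b", 200, "c", 600];
trimBuffer(expiringBuffer, 100, false, 500);
// expiringBuffer is now ["c", 600]
```

Because entries are appended in chronological order, the expired ones always form a prefix of the array, which is why a single splice from index 0 is enough.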

const subject = new ReplaySubject<number>(100, 200);
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
  subject.next(4);
}, 400)
setTimeout(() => {
  subject.subscribe(data => console.log(`Subscription: ${data}`));
}, 500)
// Subscription: 4

The first parameter of the ReplaySubject is 100, which means that up to the last 100 values can be replayed to a new subscriber. But because the second parameter is 200, only values emitted within the last 200 ms are still valid when the subscription occurs at 500 ms, that is, values emitted after 500 - 200 = 300 ms. The values 1, 2, and 3 were emitted at about 0 ms and have expired; 4 was emitted at 400 ms and is still valid, so only 4 is replayed.

ReplaySubject also has a third initialization parameter, timestampProvider

// /src/internal/types.ts
export interface TimestampProvider {
  now(): number;
}

If this parameter is not passed, it defaults to dateTimestampProvider, which relies on Date.now(). When it is passed, it is usually a Scheduler, which this article does not cover.
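
Since the interface only requires now(), a hand-rolled clock can stand in for the provider, which makes time-window behavior easy to reason about without real timers. The sketch below replays the earlier timing example (values at 0 ms, one more at 400 ms, subscription at 500 ms) against a manual clock. MiniReplaySubject, manualClock, and fakeNow are hypothetical names, and the class is a simplified stand-in that stores value/expiry pairs as objects rather than in RxJS's interleaved array.

```typescript
interface TimestampProvider {
  now(): number;
}

// Simplified stand-in for illustration; this is NOT the RxJS implementation.
class MiniReplaySubject<T> {
  private buffer: { value: T; expiresAt: number }[] = [];
  private listeners: ((value: T) => void)[] = [];

  constructor(
    private bufferSize: number,
    private windowTime: number,
    private clock: TimestampProvider
  ) {}

  next(value: T): void {
    // Expiration time = current timestamp + window length.
    this.buffer.push({ value, expiresAt: this.clock.now() + this.windowTime });
    this.trim();
    for (const listener of this.listeners) listener(value);
  }

  subscribe(listener: (value: T) => void): void {
    this.trim();
    this.listeners.push(listener);
    // Replay whatever survived trimming to the new subscriber.
    for (const entry of this.buffer.slice()) listener(entry.value);
  }

  private trim(): void {
    const now = this.clock.now();
    // Keep unexpired entries only, then at most the last `bufferSize` of them.
    this.buffer = this.buffer
      .filter((entry) => entry.expiresAt > now)
      .slice(-this.bufferSize);
  }
}

let fakeNow = 0;
const manualClock: TimestampProvider = { now: () => fakeNow };

const miniReplay = new MiniReplaySubject<number>(100, 200, manualClock);
miniReplay.next(1);
miniReplay.next(2);
miniReplay.next(3);
fakeNow = 400; // "400 ms later"
miniReplay.next(4);
fakeNow = 500; // "500 ms after start"
const replayedValues: number[] = [];
miniReplay.subscribe((v) => replayedValues.push(v));
// replayedValues is [4]: 1, 2 and 3 expired at 200, while 4 expires at 600
```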

Summary

This article briefly analyzed how the built-in Subject variants in RxJS are implemented. Building on the original Subject, these variants add features that are convenient in real-world scenarios. If the built-in variants do not meet our needs, we can extend Subject ourselves to create the one we need.