Throw out problem

  • One of the biggest features of JS is its event mechanism and callback processing. This feature has both advantages and disadvantages. The advantages are non-obstructive, while the disadvantages are not very friendly to support asynchronous scenarios.
  • Getting to the subject quickly is a bit of a challenge in a real world where synchronous or serial processing is often required.
  • Scenario 1: During live broadcast, we need to send messages to the server in an orderly manner and ensure that the messages are successfully sent. If using Ajax requests alone does not guarantee order, for example, when sending two messages, first 1+1=? Because of the uncertainty of the network request, it may arrive at the server or other end to receive the answer 2, and then receive 1+1=? Such a result is obviously not right.
  • Scenario 2: During live broadcast, messages pushed by the server are obtained and processed in a unified manner according to time blocks. A unified list of messages received within a period of time is drawn. If messages are processed once there is a message, rendering performance will be affected.
  • Scenario 3: Live locking, room switching or other switching behaviors are all fine scenes of live broadcasting. The previous logic cannot be continued and operations need to be suspended, and network disconnection and retry are required.
  • Core point – Any discussion of technology outside of a business scenario is hooliganism, and our current discussion is in a live business process or a scenario that requires asynchronous message serial processing.

Common solutions

  • You can check out the god’s summary of link JS asynchronous programming
  • To summarize some solutions callback -> promise -> generator -> async + await

Project analysis

  • Although we can handle asynchronous requests using promises and other methods, it is easy to solve the problem in the fixed request scenario, such as three requests controlling the sequence and controlling the return, which is not described here. But sending data in real time is much more complex than the scenario of three fixed requests.
  • We need to retry the failure, message first in, first out, and finish the previous processing before we can proceed to the next one. You also need message caching, rendering multiple pieces of data at once, and so on.
  • Even using async + await will make our code structure relatively complex and cannot be abstracted and reused.
  • So how do you do that?

Implementation approach

  • Core idea 1: message order, using queue design to achieve first in first out. Unified data management can be achieved, traceable, manageable, and viewable.

  • Core idea 2: The message needs to be produced and consumed. If the message has not been consumed (in the process of sending a request to the server, or fails to be returned), the message must always exist. Only when the message is successfully sent to the server, the message can be removed from the queue

  • Core idea 3: Message flow control. You need to set the retry times to send the request to the server. If the request fails, you can retry several times to ensure that the message is in order and normal. It is necessary to control the time window of message receiving and processing, not only to receive messages from the server side, but also to send their own messages. Dom list is uniformly drawn in a time window to prevent multiple rendering and affect performance. RXJS, a third-party library, is used here (needless to say, the benefits of encapsulated API, Can cancel and so on) is the use of RXJS with the help of its API capabilities can be a good implementation of unsubscribe, suspend operations, off the network retry and so on.

  • Core idea 4: link closed loop, message production – entering queue cache – message consumption – consumption confirmation – continue consumption. What can be used to make it all closed? The answer is observer mode, in which we simply subscribe to the changes in the queue, and when the changes occur, we start consuming the data in the queue, send the data to the server successfully, confirm the consumption, update the queue (delete the data that entered first), and continue. Do you have to look at this picture? Of course you do it with a proxy.

The paper come zhongjue shallow, and must know this to practice

  • Start the first step to implement the basic functionality of message queuing first in first out
/ * * *@name: LiveMQ
 * @msg: message base class, implement queue function */
class LiveMQ {
  public queue: Array<any>;// Queue data
  public callback: (message) = > void;// When the message is received, process the callback function
  public handler = {};// Proxy handler for data hijacking
  constructor() {}
  / / team
  public enqueue() {
    var len = arguments.length;
    if (len == 0) {
      return;
    }
    for (var i = 0; i < len; i++) {
      this.queue.push(arguments[i]); }}/ / out of the team
  dequeue() {
    var result = this.queue[0];
    return typeofresult ! ="undefined" ? result : new Error("error");
  }
  // Confirm consumption
  confirm() {
    this.queue.splice(0.1);
  }
  // Whether the queue is empty
  isEmpty() {
    return this.queue.length === 0;
  }
  // Return the queue length
  size() {
    return this.queue.length;
  }
  // Clear the queue
  clear() {
    this.queue = new Proxy([], this.handler);
  }
  // return to queue
  show() {
    return this.queue; }}Copy the code
  • Start the second step to implement ordered consumption of message queues (which can be used to send different messages to the server, or to receive messages to draw the DOM)
/ * * *@name: LiveHandleMQ
 * @msg: Ordered message queue processing */
class LiveHandleMQ extends LiveMQ {
  private lock = false;// Lock the message. Unlock the message
  private retry: number;// Try again this time
  private observer: any;/ / observer
  private subscription: any;/ / subscriber
  public handler = {
    set: (target, key, value, receiver) = > {
       // Consume data when queue length changes
      if (!this.lock && value > 0 && key == "length") {
        this.subscribe();
      }
      return Reflect.set(target, key, value, receiver); }};constructor(callback: (arg) => void, retry: number = 0) {
    super(a);// Verify the validity of the retry times
    if (retry % 1= = =0 && retry >= 0) {
      this.callback = callback;
      this.retry = retry;
       // Use Proxy to hijack queue data changes
      this.queue = new Proxy([], this.handler);
    } else {
      console.error("retry is not legitimate"); }}private subscribe() {
    this.lock = true;
    this.observer = window["Rx"].Observable.create(async (observer) => {
      try {
        await this.callback(this.dequeue());
        observer.next("");
        observer.complete();
      } catch (error) {
        console.log("Error retry");
        observer.error(error);
      }
    }).retry(this.retry);
    this.subscription = this.observer.subscribe({
      next: () = > {
        this.next();
      },
      error: () = > {
        this.next(); }}); }/ * * *@name: next
   * @msg: Next call */
  private next() {
    // Confirm consumption
    this.confirm();
    // Whether there is any more data in the queue that needs to be consumed, if there is more data to be consumed, if there is no unlock
    if (!this.isEmpty()) {
      this.subscribe();
    } else {
      this.lock = false; }}/ * * *@name: destroy
   * @msg: Clear subscription */
  destroy() {
    if (this.subscription) {
      this.subscription.unsubscribe(); }}}Copy the code
  • The local and server messages are collected during the third step. The production messages are queued for processing
/ * * *@name: LiveCollectionMQ
 * @msg: interval data acquisition queue cache */
class LiveCollectionMQ extends LiveMQ {
  private emitter = window["mitt"] ();// Internal event
  private bufferTime: number;// Data collection interval
  private observer: any;
  private subscription: any;
  private mq: any;// Message handler
  public handler = {
    set: (target, key, value, receiver) = > {
       // Listen for every data change in the queue
      if (!isNaN(Number(key))) {
        this.emitter.emit("notify", value);
      }
      return Reflect.set(target, key, value, receiver); }};constructor(callback: (arg) => void, bufferTime: number = 1000) {
    super(a);if (bufferTime % 1= = =0 && bufferTime > 0) {
      const _this = this;
      this.mq = new LiveHandleMQ(callback);
      this.bufferTime = bufferTime;
      this.queue = new Proxy([], this.handler);
      // Subscribe to internal event data
      this.observer = window["Rx"].Observable.fromEventPattern(
        function addHandler(h) {
          _this.emitter.on("notify", h);
        },
        function delHandler(h) {
          _this.emitter.off("notify", h); });this.subscription = this.observer
        .bufferTime(_this.bufferTime)
        .subscribe((messages) = > {
          if (messages.length > 0) {
            this.mq.enqueue(messages); }}); }else {
      console.error("bufferTime is not legitimate"); }}/ * * *@name: destroy
   * @msg: Clear subscription */
  destroy() {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
    this.mq.destroy(); }}Copy the code
  • To run a unit test,ts produces the live.js command TSC
<! DOCTYPEhtml>
<html lang="en">
  <body>
    <button type="button" id="xxx">Click on me to text</button>
    <script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>
    <script src="https://unpkg.com/mitt/dist/mitt.umd.js"></script>
    <script src="live.js"></script>
    <script>
     // asynchronous handlers
      function test(mes, observer) {
        return new Promise((resolve, reject) = > {
          let time = Math.ceil(Math.random() * 10000);
          console.log("time", time, mes);
          setTimeout(() = > {
            if (false) {
              resolve();
            } else {
              reject();
            }
          }, time);
        });
      }
      // Simply execute the function
      function test1(mes) {
        console.log(mes);
      }
      var count = 0;
      // var queue = new LiveHandleMQ(test, 3);
      // instantiate the object
      var queue = new LiveCollectionMQ(test, 10000);
      document.getElementById("xxx").addEventListener("click".function () {
        count++;
        // Data enters the team
        queue.enqueue(count);
        if (count > 10) {
          // Provide a destruct function to declare the periodqueue.destroy(); }});</script>
  </body>
</html>
Copy the code

conclusion

  • The use of RXJS is relatively superficial. It is unknown whether RX has a larger development space under this scene, and I need to keep learning
  • Writing maintainable code is code with clear logic, highly available code methods, and portability.
  • Finally, I wish everyone a happy Year of the Ox, come on, come on, come on!!