>> Original link

Some of the tool methods described in this article are at an early/testing stage and are still being continuously optimized; they are provided for reference only…

Developed and tested on Ubuntu 20.04 for Electron projects; tested against [email protected]/9.3.5

Contents


├── Contents (You Are Here!)
├── I. Introduction
├── II. Architecture diagram
├── III. What can be done with electron-re?
│   ├── 1) For Electron applications
│   └── 2) For Electron/Node.js applications
├── IV. UI features
│   ├── Main interface
│   ├── Function 1: Kill a process
│   ├── Function 2: Open DevTools with one click
│   ├── Function 3: View process logs
│   ├── Function 4: View process CPU/Memory usage trends
│   └── Function 5: View MessageChannel request logs
├── V. New feature: process pool load balancing
├── VI. New feature: intelligent start and stop of child processes
├── VII. Known existing problems
├── VIII. Next To Do
└── IX. Several practical use examples
    ├── 1) Service/MessageChannel usage sample
    ├── 2) Usage in a real production project
    ├── 3) ChildProcessPool/ProcessHost usage sample
    ├── 4) Test samples
    └── 5) Github README

I. Introduction


While developing Electron applications, I wrote electron-re, an Electron process management tool that supports Electron/Node.js multi-process management, service simulation, real-time process monitoring (with a UI), and a Node.js process pool. It has been published as an NPM package and can be installed directly (the latest features have not yet been released online and still need further testing):

>> GitHub address

$: npm install electron-re --save
# or
$: yarn add electron-re

The previous two articles on this topic:

  1. The Electron/Node Multi-process Tool Development Diary describes the development background of electron-re, the problem scenarios it addresses, and detailed usage.
  2. Electron Multi-process Tool Development Diary 2 introduces the development and usage of the new "multi-process management UI" feature. The UI is driven by electron-re's existing BrowserService/MessageChannel and ChildProcessPool/ProcessHost infrastructure and was developed with React 17 / Babel 7.

This article mainly describes the new features supported by the process pool module: "process pool load balancing" and "intelligent child process start and stop", along with the basic principles behind their implementation. It also raises some problems encountered along the way, thoughts on those problems and their solutions, and some ideas for later iterations.

II. electron-re architecture diagram


  • Electron Core: the core of an Electron application, including the main process, renderer processes, windows, and so on (shipped with Electron).
  • BrowserWindow: the render window process, normally used for UI rendering (shipped with Electron).
  • ProcessManager: the process manager, which collects process resource usage, asynchronously refreshes the UI, responds to and issues various process management signals, and serves as an observable object for other modules and UIs (introduced by electron-re).
  • MessageChannel: a message-passing tool for the main, renderer, and service processes. It is built on native IPC, serves mainly BrowserService, and can also replace the native IPC communication methods (introduced by electron-re).
  • ChildProcess: created with the child_process.fork method, but decorated with simple process sleep and wake-up logic (introduced by electron-re).
  • ProcessHost: a tool used together with the process pool, which I call the "process transaction center". It encapsulates the basic process.send / process.on logic and provides Promise-based calls to make IPC communication between the main and child processes easier (introduced by electron-re).
  • LoadBalancer: the load balancer serving the process pool (introduced by electron-re).
  • LifeCycle: the life-cycle manager serving the process pool (introduced by electron-re).
  • ChildProcessPool: a process pool based on Node.js's child_process.fork method. It manages multiple ChildProcess instances and supports customizable load balancing strategies, intelligent start and stop of child processes, and automatic restart of child processes after an abnormal exit (introduced by electron-re).
  • BrowserService: a BrowserWindow-based service process, which can be thought of as a hidden render window process that runs in the background and allows Node injection, but supports only the CommonJS specification (introduced by electron-re).

III. What can be done with electron-re?


1. For Electron applications

  • BrowserService
  • MessageChannel

Some of the best practices in Electron suggest putting CPU-hogging code in the render process rather than directly in the main process. Here’s a look at the Chromium architecture:

Each renderer process has a global object, RenderProcess, which manages communication with the parent browser process and maintains global state. The browser process maintains a RenderProcessHost object for each renderer process to manage browser-side state and communicate with that renderer. Browser and renderer processes communicate using Chromium's IPC system. During page rendering, the UI process needs to keep synchronizing with the main process over IPC; if the main process is busy, the UI process blocks while waiting on IPC. Therefore, if the main process keeps performing CPU-heavy work or blocking synchronous I/O tasks, it becomes blocked to some extent, which in turn affects IPC communication between the main process and each renderer process. When IPC communication is delayed or blocked, renderer windows stutter and drop frames, or in severe cases freeze entirely.

Therefore, on top of Electron's existing main process and renderer process logic, electron-re introduces a separate Service concept. A Service is a background process that does not need to display an interface; it does not participate in UI interaction and serves the main process or other renderer processes independently. Its underlying implementation is a hidden render window process that allows Node injection and remote calls.

This lets you put CPU-intensive operations (such as maintaining a queue of thousands of upload tasks during file uploads) into a single JS file, and then have the BrowserService constructor create a Service instance from the path to that JS file, thereby separating that work from the main process. Could this CPU-heavy work be placed directly into a render window process instead? That depends on the architectural design of the project itself and on the trade-offs between the performance cost and the time cost of transferring data between processes. Here is a simple example of creating a Service:

const path = require('path');
const { BrowserService } = require('electron-re');

const myService = new BrowserService('app', path.join(__dirname, 'path/to/app.service.js'));
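Before sending messages to the service, it is usually safest to wait until the hidden window has finished loading. A minimal sketch, assuming a connected()-style readiness promise on the service instance:

// Hedged sketch: wait for the hidden service window to finish loading
// before messaging it ('connected()' is an assumption here).
myService.connected().then(() => {
  console.log('service [app] is ready');
});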

If BrowserService is used, messages between the main process, renderer processes, and service processes should be sent with the MessageChannel utility provided by electron-re. Its interface design is basically the same as Electron's built-in IPC, and under the hood it is also implemented on top of the original asynchronous IPC. A simple example follows:

/* ---- main.js ---- */
const { MessageChannel } = require('electron-re');
// Send a message to the service named 'app' from the main process
MessageChannel.send('app', 'channel1', { value: 'test1' });
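On the receiving side, the service script can subscribe to the same channel. A minimal sketch of the service side; the listener-style API shown here mirrors Electron's ipcRenderer and is an assumption:

/* ---- app.service.js ---- */
const { MessageChannel } = require('electron-re');

// Hedged sketch: receive the message sent from the main process above
MessageChannel.on('channel1', (event, payload) => {
  console.log(payload.value); // -> 'test1'
});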

2. For Electron/Node.js applications

  • ChildProcessPool
  • ProcessHost

Additionally, if you want to create child processes that do not depend on the Electron runtime (see Node.js child_process), you can use the ChildProcessPool process pool provided by electron-re, which is written specifically for the Node.js runtime. Because creating a process is itself expensive, the process pool reuses already-created child processes to maximize the performance benefits of a multi-process architecture. A simple example:

/* -- in main process -- */
const path = require('path');
const { app } = require('electron');
const { ChildProcessPool, LoadBalancer } = require('electron-re');

const pool = new ChildProcessPool({
  path: path.join(app.getAppPath(), 'app/services/child.js'), // path to the child executable file
  max: 3, // maximum number of child processes
  strategy: LoadBalancer.ALGORITHM.WEIGHTS, // load balancing strategy: weights
  weights: [1, 2, 3], // weight allocation
});

pool
  .send('sync-work', params)
  .then(rsp => console.log(rsp));

Generally, in the child's executable file, to exchange data between the main process and child processes we can use process.send('channel', params) and process.on('channel', callback) (provided the process was forked or IPC communication was manually enabled). But this forces us to care about inter-process communication while handling business logic: you need to know when the child process has finished its work and then call process.send to pass the result back to the main process, which is cumbersome.
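For illustration, the manual wiring that this involves looks roughly like the sketch below (the task name and the doWork function are assumptions; the message shape mirrors the one used by the pool later in this article):

/* -- child.js: manual request/response wiring (sketch) -- */
process.on('message', ({ action, params, id }) => {
  if (action === 'sync-work') {
    const result = doWork(params); // assumed business function
    // manually route the result back, echoing the request id
    process.send({ action, result, id });
  }
});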

electron-re introduces the concept of ProcessHost, which I call the "process transaction center". In practice, in the child's executable file you only need to register each task function as a monitored transaction via ProcessHost.registry('task-name', function). Calling pool.send('task-name', params) then triggers the child process's transaction logic, and pool.send() returns a Promise from which the callback data can be obtained. A simple example follows:

/* -- child process -- */
const { ProcessHost } = require('electron-re');

ProcessHost
  .registry('sync-work', (params) => {
    return { value: 'task-value' };
  })
  .registry('async-work', (params) => {
    return fetch(params.url);
  });

IV. UI function introduction


The UI is developed on top of the electron-re infrastructure. It communicates with the main process's ProcessManager via asynchronous IPC to refresh process status in real time. You can manually kill processes, view a process's console data, view the CPU/Memory usage trends of each process, and view MessageChannel request logs.

The main interface

The UI design refers to electron-process-manager.

Preview:

The main functions are as follows:

  1. Displays all processes started by Electron, including the main process, normal renderer processes, Service processes (introduced by electron-re), and child processes created by ChildProcessPool (introduced by electron-re).

  2. The process list displays each process's type, process ID, parent process ID, memory usage, and CPU usage. Process types are: main, service, renderer, node. Click a table header to sort a column in ascending/descending order.

  3. After selecting a process, you can kill it, view its console data, and view its CPU/Memory usage trend over the last minute. If it is a renderer process, you can also open the built-in debugger with the DevTools button.

  4. Child processes created by ChildProcessPool do not support direct DevTools debugging, but because they are forked with the --inspect parameter, you can use chrome://inspect for remote debugging.

  5. Click the Signals button to view the MessageChannel request logs, including basic request parameters, request names, request return data, and so on.

Function 1: Kill a process

Function 2: Open DevTools with one click

Function 3: View process logs

Function 4: View process CPU/Memory usage trends

Function 5: View MessageChannel request logs

V. New feature: Process pool load balancing


The first release is a simplified implementation.

>> Code address

➣ About load balancing

"Load balancing means distributing loads (work tasks) across multiple operating units, such as FTP servers, Web servers, enterprise core application servers, and other main task servers, so that work tasks are completed collaboratively. Load balancing builds on the existing network structure and provides a transparent, inexpensive, and effective way to extend the bandwidth of servers and network devices, enhance network data processing capability, increase throughput, and improve network availability and flexibility." — Baidu Encyclopedia

➣ Load balancing strategy description

In the previous implementation, after the process pool was created, requests sent through the pool were dispatched in one of two ways:

  1. By default, a polling strategy selects a child process to handle each request, which guarantees only a basic, even distribution of requests.

  2. Alternatively, an extra id parameter can be specified when sending a request: pool.send(channel, params, id). Requests with the same id are always sent to the same child process. A typical scenario: the first time we send a request to a child process, it processes the request and stores some results in its runtime memory; later we need to retrieve those earlier results back to the main process, so we use the id to route the request to the same child (see the sketch after this list).
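A minimal sketch of that id-bound usage, reusing the pool instance from the earlier example (the task names are invented):

// First request: some child A builds and caches state in its memory
pool.send('build-cache', { userId: 1 }, 'user-1');
// Later request with the same id: routed back to the same child A,
// where the cached state is still available
pool.send('read-cache', { userId: 1 }, 'user-1')
  .then(rsp => console.log(rsp));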

The new version introduces several load balancing strategies, including:

  • POLLING: child processes take turns handling requests
  • WEIGHTS: child processes handle requests according to the configured weights
  • RANDOM: a child process is picked at random to handle each request
  • SPECIFY: the child process with the specified process id handles the request
  • WEIGHTS_POLLING: similar to the polling strategy, but the number of polling turns given to each child process is computed from its weight, stabilizing the average number of requests each child handles
  • WEIGHTS_RANDOM: similar to the random strategy, but the random selection is weighted, stabilizing the average number of requests each child handles
  • MINIMUM_CONNECTION: the child process with the fewest active connections handles the request
  • WEIGHTS_MINIMUM_CONNECTION: similar to the minimum-connection strategy, except that the probability of each child being selected is determined by both its connection count and its weight

➣ Simple implementations of the load balancing strategies

Parameter description (a wiring sketch follows this list):

  • tasks: the task array, e.g. [{id: 11101, weight: 2}, {id: 11102, weight: 1}].
  • currentIndex: the current task index; defaults to 0 and is automatically incremented by 1 on each call.
  • context: the main-process parameter context, used to dynamically update the current task index and weight index.
  • weightIndex: the weight index, used by the weight strategies; defaults to 0, is automatically incremented on each call, and wraps around modulo the weight total when it exceeds it.
  • weightTotal: the sum of the weights, used in weight strategy calculations.
  • connectionsMap: a map of each process's active connection count, used by the minimum-connection strategies.
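A minimal sketch of how one of these strategy modules is wired to the parameters above (the module path is an assumption; the function itself is shown in the next section):

const polling = require('./polling'); // assumed module path

const tasks = [{ id: 11101, weight: 2 }, { id: 11102, weight: 1 }];
const context = { currentIndex: 0, weightIndex: 0 };

polling(tasks, context.currentIndex, context); // -> { id: 11101, weight: 2 }
polling(tasks, context.currentIndex, context); // -> { id: 11102, weight: 1 }
polling(tasks, context.currentIndex, context); // -> { id: 11101, weight: 2 } (wrapped around)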
1. POLLING strategy

Principle: the index increments automatically by 1 on each call and wraps modulo the task array length, guaranteeing even rotation. Time complexity: O(1).

/* polling algorithm */
module.exports = function (tasks, currentIndex, context) {
  if (!tasks.length) return null;

  const task = tasks[currentIndex];
  context.currentIndex ++;
  context.currentIndex %= tasks.length;

  return task || null;
};
2. WEIGHTS strategy

Principle: each process gets a score of (its own weight + (weight total * random factor)); the process with the largest score is hit. Time complexity: O(n).

/* weight algorithm */
module.exports = function (tasks, weightTotal, context) {
  if (!tasks.length) return null;

  let max = tasks[0].weight, maxIndex = 0, sum;

  for (let i = 0; i < tasks.length; i++) {
    sum = (tasks[i].weight || 0) + Math.random() * weightTotal;
    if (sum >= max) {
      max = sum;
      maxIndex = i;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return tasks[maxIndex];
};
3. RANDOM strategy

Principle: a random function selects an index in [0, length). Time complexity: O(1).

/* random algorithm */
module.exports = function (tasks) {
  const length = tasks.length;
  const target = tasks[Math.floor(Math.random() * length)];

  return target || null;
};
4. WEIGHTS_POLLING strategy

Principle: similar to the polling strategy, but polling happens over the weight interval [minimum weight, weight total]: each task's hit range is derived from the cumulative weights, and the weight index is incremented by 1 on each call, wrapping modulo the weight total when exceeded. Time complexity: O(n).

/* weights polling */
module.exports = function (tasks, weightIndex, weightTotal, context) {
  if (!tasks.length) return null;

  let weight = 0;
  let task;

  for (let i = 0; i < tasks.length; i++) {
    weight += tasks[i].weight || 0;
    if (weight >= weightIndex) {
      task = tasks[i];
      break;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return task;
};
5. WEIGHTS_RANDOM strategy

Principle: generate a value as (weight total * random factor), then subtract each task's weight from it in turn; the first task that brings the value to zero or below is hit. Time complexity: O(n).

/* weights random algorithm */
module.exports = function (tasks, weightTotal) {
  let task;
  let weight = Math.ceil(Math.random() * weightTotal);

  for (let i = 0; i < tasks.length; i++) {
    weight -= tasks[i].weight || 0;
    if (weight <= 0) {
      task = tasks[i];
      break;
    }
  }

  return task || null;
};
6. MINIMUM_CONNECTION strategy

Principle: directly select the task with the smallest active connection count. Time complexity: O(n).

/* minimum connections algorithm */
module.exports = function (tasks, connectionsMap = {}) {
  if (tasks.length < 2) return tasks[0] || null;

  let min = connectionsMap[tasks[0].id];
  let minIndex = 0;

  for (let i = 1; i < tasks.length; i++) {
    const con = connectionsMap[tasks[i].id] || 0;
    if (con <= min) {
      min = con;
      minIndex = i;
    }
  }

  return tasks[minIndex] || null;
};
7. WEIGHTS_MINIMUM_CONNECTION strategy

Principle: a score is computed from three factors: the weight, (random factor * weight total), and (connection-count ratio * weight total); the task with the smallest score is hit. Time complexity: O(n).

/* weights minimum connections algorithm */
module.exports = function (tasks, weightTotal, connectionsMap, context) {
  if (!tasks.length) return null;

  let min = tasks[0].weight, minIndex = 0, sum;

  const connectionsTotal = tasks.reduce((total, cur) => {
    total += (connectionsMap[cur.id] || 0);
    return total;
  }, 0);

  // algorithm: (weight + connections' weight) + random factor
  for (let i = 0; i < tasks.length; i++) {
    sum =
      (tasks[i].weight || 0) + (Math.random() * weightTotal) +
      (( (connectionsMap[tasks[i].id] || 0) * weightTotal ) / connectionsTotal);
    if (sum <= min) {
      min = sum;
      minIndex = i;
    }
  }

  context.weightIndex += 1;
  context.weightIndex %= (weightTotal + 1);

  return tasks[minIndex];
};

➣ Implementation of the load balancer

None of the code is complex, but there are a few things to note:

  1. The params object holds the parameters used by the various strategy calculations, such as the weight index, weight total, connection counts, CPU/Memory usage, and so on.
  2. The scheduler object is used to invoke the various strategies; scheduler.calculate() returns the hit target, from which the process pid is read.
  3. targets holds all the target processes used in the calculations, but stores only each target process's pid and weight: [{id: [pid], weight: [number]}, ...].
  4. algorithm is the specific load balancing strategy; the default is the polling strategy.
  5. ProcessManager.on('refresh', this.refreshParams): the load balancer listens to ProcessManager to periodically update each process's calculation parameters. ProcessManager runs a timer that collects the resource usage of every monitored process at intervals and emits a refresh event with the collected data.
const CONSTS = require("./consts");
const Scheduler = require("./scheduler");
const {
  RANDOM,
  POLLING,
  WEIGHTS,
  SPECIFY,
  WEIGHTS_RANDOM,
  WEIGHTS_POLLING,
  MINIMUM_CONNECTION,
  WEIGHTS_MINIMUM_CONNECTION,
} = CONSTS;
const ProcessManager = require('../ProcessManager');

/* Load Balance Instance */
class LoadBalancer {
  /**
    * @param  {Object} options [ options object ]
    * @param  {Array } options.targets [ targets for load balancing calculation: [{id: 1, weight: 1}, {id: 2, weight: 2}] ]
    * @param  {String} options.algorithm [ strategies for load balancing calculation : RANDOM | POLLING | WEIGHTS | SPECIFY | WEIGHTS_RANDOM | WEIGHTS_POLLING | MINIMUM_CONNECTION | WEIGHTS_MINIMUM_CONNECTION ]
    */
  constructor(options) {
    this.targets = options.targets;
    this.algorithm = options.algorithm || POLLING;
    this.params = { // data for algorithms
      currentIndex: 0, // index for polling
      weightIndex: 0, // index for weight algorithms
      weightTotal: 0, // total weight
      connectionsMap: {}, // connections of each target
      cpuOccupancyMap: {}, // cpu occupancy of each target
      memoryOccupancyMap: {}, // memory occupancy of each target
    };
    this.scheduler = new Scheduler(this.algorithm);
    this.memoParams = this.memorizedParams();
    this.calculateWeightIndex();
    ProcessManager.on('refresh', this.refreshParams);
  }

  /* params formatter */
  memorizedParams = () => {
    return {
      [RANDOM]: () => [],
      [POLLING]: () => [this.params.currentIndex, this.params],
      [WEIGHTS]: () => [this.params.weightTotal, this.params],
      [SPECIFY]: (id) => [id],
      [WEIGHTS_RANDOM]: () => [this.params.weightTotal],
      [WEIGHTS_POLLING]: () => [this.params.weightIndex, this.params.weightTotal, this.params],
      [MINIMUM_CONNECTION]: () => [this.params.connectionsMap],
      [WEIGHTS_MINIMUM_CONNECTION]: () => [this.params.weightTotal, this.params.connectionsMap, this.params],
    };
  }

  /* refresh params data */
  refreshParams = (pidMap) => { ... }

  /* pick one task from queue */
  pickOne = (...params) => {
    return this.scheduler.calculate(
      this.targets, this.memoParams[this.algorithm](...params)
    );
  }

  /* pick multi tasks from queue */
  pickMulti = (count = 1, ...params) => {
    return new Array(count).fill().map(
      () => this.pickOne(...params)
    );
  }

  /* calculate weight */
  calculateWeightIndex = () => {
    this.params.weightTotal = this.targets.reduce((total, cur) => total + (cur.weight || 0), 0);
    if (this.params.weightIndex > this.params.weightTotal) {
      this.params.weightIndex = this.params.weightTotal;
    }
  }

  /* calculate index */
  calculateIndex = () => {
    if (this.params.currentIndex >= this.targets.length) {
      this.params.currentIndex = (this.params.currentIndex - 1 >= 0) ? (this.params.currentIndex - 1) : 0;
    }
  }

  /* clean data of a task or all tasks */
  clean = (id) => { ... }

  /* add a task */
  add = (task) => { ... }

  /* remove target from queue */
  del = (target) => { ... }

  /* wipe queue and data */
  wipe = () => { ... }

  /* update calculation params */
  updateParams = (object) => {
    Object.entries(object).map(([key, value]) => {
      if (key in this.params) {
        this.params[key] = value;
      }
    });
  }

  /* reset targets */
  setTargets = (targets) => { ... }

  /* change algorithm strategy */
  setAlgorithm = (algorithm) => { ... }
}

module.exports = Object.assign(LoadBalancer, { ALGORITHM: CONSTS });
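A minimal usage sketch of the balancer on its own, with invented targets and weights (the require path is an assumption):

const LoadBalancer = require('./LoadBalancer'); // assumed path

const balancer = new LoadBalancer({
  algorithm: LoadBalancer.ALGORITHM.WEIGHTS_POLLING,
  targets: [{ id: 11101, weight: 1 }, { id: 11102, weight: 2 }],
});

const { id } = balancer.pickOne(); // pid of the hit process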

➣ Process pools work with LoadBalancer to implement load balancing

A few caveats:

  1. When we call pool.send('channel', params), getForkedFromPool() is invoked inside the pool and selects a process from the pool to perform the task. If the number of child processes has not reached the maximum, a new child process is created first to handle the request.
  2. Child process creation/destruction/exit must be synchronized to the targets that LoadBalancer watches; otherwise the load balancing calculation might return the pid of an already-destroyed process.
  3. ForkedProcess is a decorator class that wraps the child_process.fork logic and adds some extra capabilities, such as basic methods for process sleep, wake-up, event binding, and request sending.
const _path = require('path');
const EventEmitter = require('events');

const ForkedProcess = require('./ForkedProcess');
const ProcessLifeCycle = require('../ProcessLifeCycle.class');
const ProcessManager = require('../ProcessManager/index');
const { defaultLifecycle } = require('../ProcessLifeCycle.class');
const LoadBalancer = require('../LoadBalancer');
let { inspectStartIndex } = require('../../conf/global.json');
const { getRandomString, removeForkedFromPool, convertForkedToMap, isValidValue } = require('../utils');
const { UPDATE_CONNECTIONS_SIGNAL } = require('../consts');

const defaultStrategy = LoadBalancer.ALGORITHM.POLLING;

class ChildProcessPool extends EventEmitter {
  constructor({
    path, max = 6, cwd, env = {},
    weights = [], // weights of processes, the length is equal to max
    strategy = defaultStrategy,
    ...
  }) {
    super();
    this.cwd = cwd || _path.dirname(path);
    this.env = { ...process.env, ...env };
    this.callbacks = {};
    this.pidMap = new Map();
    this.callbacksMap = new Map();
    this.connectionsMap = {};
    this.forked = [];
    this.connectionsTimer = null;
    this.forkedMap = {};
    this.forkedPath = path;
    this.forkIndex = 0;
    this.maxInstance = max;
    this.weights = new Array(max).fill().map(
      (_, i) => (isValidValue(weights[i]) ? weights[i] : 1)
    );
    this.LB = new LoadBalancer({
      algorithm: strategy,
      targets: [],
    });
    this.initEvents();
  }

  /* -------------- internal -------------- */

  /* init events */
  initEvents = () => {
    // process exit
    this.on('forked_exit', (pid) => {
      this.onForkedDisconnect(pid);
    });
    ...
  }

  /**
    * onForkedCreate [triggered when a process instance created]
    * @param  {[String]} pid [process pid]
    */
  onForkedCreate = (forked) => {
    const pidsValue = this.forked.map(f => f.pid);
    const length = this.forked.length;

    this.LB.add({
      id: forked.pid,
      weight: this.weights[length - 1],
    });
    ProcessManager.listen(pidsValue, 'node', this.forkedPath);
    ...
  }

  /**
    * onForkedDisconnect [triggered when a process instance disconnect]
    * @param  {[String]} pid [process pid]
    */
  onForkedDisconnect = (pid) => {
    const length = this.forked.length;

    removeForkedFromPool(this.forked, pid, this.pidMap);
    this.LB.del({
      id: pid,
      weight: this.weights[length - 1],
    });
    ProcessManager.unlisten([pid]);
    ...
  }

  /* Get a process instance from the pool */
  getForkedFromPool = (id = "default") => {
    let forked;
    if (!this.pidMap.get(id)) {
      // create a new process and put it into the pool
      if (this.forked.length < this.maxInstance) {
        inspectStartIndex ++;
        forked = new ForkedProcess(
          this,
          this.forkedPath,
          this.env.NODE_ENV === "development" ? [`--inspect=${inspectStartIndex}`] : [],
          { cwd: this.cwd, env: { ...this.env, id }, stdio: 'pipe' }
        );
        this.forked.push(forked);
        this.onForkedCreate(forked);
      } else {
        // get a process from the pool based on the load balancing strategy
        forked = this.forkedMap[this.LB.pickOne().id];
      }
      if (id !== 'default') {
        this.pidMap.set(id, forked.pid);
      }
    } else {
      // pick a specific process from the pool
      forked = this.forkedMap[this.pidMap.get(id)];
    }

    if (!forked) throw new Error(`Get forked process from pool failed! the process pid: ${this.pidMap.get(id)}.`);

    return forked;
  }

  /* -------------- caller -------------- */

  /**
  * send [Send request to a process]
  * @param  {[String]} taskName [task name - necessary]
  * @param  {[Any]} params [data passed to process - necessary]
  * @param  {[String]} id [the unique id bound to a process instance - not necessary]
  * @return {[Promise]} [return a Promise instance]
  */
  send = (taskName, params, givenId) => {
    if (givenId === 'default') throw new Error('ChildProcessPool: Prohibit the use of this id value: [default] !');

    const id = getRandomString();
    const forked = this.getForkedFromPool(givenId);
    this.lifecycle.refresh([forked.pid]);

    return new Promise(resolve => {
      this.callbacks[id] = resolve;
      forked.send({ action: taskName, params, id });
    });
  }
  ...
}

module.exports = ChildProcessPool;

VI. New feature: Intelligent start and stop of subprocesses


This feature is also what I call the process lifecycle.

When a child process has not been invoked for some time, it automatically goes to sleep, reducing CPU usage (reducing memory usage is harder). The sleep timeout can be controlled by the creator and defaults to 10 minutes. When a new request arrives for a sleeping child process, the child automatically wakes up and handles the request; after another idle period it goes back to sleep.

➣ Various ways to hibernate a process

1) To pause a process, send it the SIGSTOP signal; to resume it, send the SIGCONT signal.

Node.js:

process.kill([pid], "SIGSTOP");
process.kill([pid], "SIGCONT");

Unix System (Not yet tested for Windows):

kill -STOP [pid]
kill -CONT [pid]
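This is the mechanism the pool's ForkedProcess decorator relies on. A minimal sketch of how sleep/wake-up methods might wrap these signals (the class below is illustrative, not the actual ForkedProcess implementation):

// Hedged sketch: signal-based sleep/wake-up around a forked child
class SleepableProcess {
  constructor(child) {
    this.child = child; // a child_process.fork() instance
    this.sleeping = false;
  }

  sleep() {
    if (this.sleeping) return;
    process.kill(this.child.pid, 'SIGSTOP'); // suspend: stops consuming CPU
    this.sleeping = true;
  }

  wakeup() {
    if (!this.sleeping) return;
    process.kill(this.child.pid, 'SIGCONT'); // resume before the next request
    this.sleeping = false;
  }
}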

2) Node.js's Atomics.wait API can also do the job programmatically. This method watches a value at a given index of an Int32Array and, as long as the value has not changed, waits (blocking the event loop) until a timeout occurs (determined by the ms parameter). The shared data can then be modified in the main process to unblock the child process.

const nil = new Int32Array(new SharedArrayBuffer(4));
const array = new Array(100000).fill(0);
setInterval(() => {
  console.log(1); // stops printing while the event loop is blocked below
}, 1e3);
Atomics.wait(nil, 0, 0, Number(600e3)); // blocks for up to 600 seconds

➣ LifeCycle implementation

The code is also simple, with a few caveats:

  1. A mark-and-expire approach is used: each request to a child process refreshes its last-active timestamp, while a timer loop periodically computes (current time - last active time) for every watched child process. If any process exceeds the configured timeout, a sleep signal is emitted together with the pids of all such processes.

  2. Each ChildProcessPool instance owns a ProcessLifeCycle instance that controls the sleep/wake-up of the processes in that pool. ChildProcessPool listens for ProcessLifeCycle's sleep event and, given the pids that need to sleep, calls the sleep() method of the corresponding ForkedProcess. A process is automatically woken up the next time a request is sent to it.

const EventEmitter = require('events');

const defaultLifecycle = {
  expect: 600e3, // default timeout 10 minutes
  internal: 30e3 // default loop check interval 30 seconds
};

class ProcessLifeCycle extends EventEmitter {
  constructor(options) {
    super();
    const {
      expect = defaultLifecycle.expect,
      internal = defaultLifecycle.internal
    } = options;
    this.timer = null;
    this.internal = internal;
    this.expect = expect;
    this.params = {
      activities: new Map()
    };
  }

  /* task check loop */
  taskLoop = () => {
    if (this.timer) return console.warn('ProcessLifeCycle: the task loop is already running');

    this.timer = setInterval(() => {
      const sleepTasks = [];
      const date = new Date();
      const { activities } = this.params;
      ([...activities.entries()]).map(([key, value]) => {
        if (date - value > this.expect) {
          sleepTasks.push(key);
        }
      });
      if (sleepTasks.length) {
        // this.unwatch(sleepTasks);
        this.emit('sleep', sleepTasks);
      }
    }, this.internal);
  }

  /* watch processes */
  watch = (ids = []) => {
    ids.forEach(id => {
      this.params.activities.set(id, new Date());
    });
  }

  /* unwatch processes */
  unwatch = (ids = []) => {
    ids.forEach(id => {
      this.params.activities.delete(id);
    });
  }

  /* stop task check loop */
  stop = () => {
    clearInterval(this.timer);
    this.timer = null;
  }

  /* start task check loop */
  start = () => {
    this.taskLoop();
  }

  /* refresh tasks */
  refresh = (ids = []) => {
    ids.forEach(id => {
      if (this.params.activities.has(id)) {
        this.params.activities.set(id, new Date());
      } else {
        console.warn(`The task with id ${id} is not being watched.`);
      }
    });
  }
}

module.exports = Object.assign(ProcessLifeCycle, { defaultLifecycle });
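A minimal wiring sketch between ProcessLifeCycle and a pool, based on the API above (forkedMap and the forked instances are assumptions standing in for the pool's internals):

const lifecycle = new ProcessLifeCycle({ expect: 600e3, internal: 30e3 });

lifecycle.watch([forkedA.pid, forkedB.pid]); // start tracking two children
lifecycle.on('sleep', (pids) => {
  // emitted by the check loop for processes idle longer than `expect`
  pids.forEach(pid => forkedMap[pid].sleep());
});
lifecycle.start();

// called on every pool.send(...) to refresh the activity timestamp
lifecycle.refresh([forkedA.pid]);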

➣ Rudimentary process mutex

The Atomics.wait API was mentioned above. Besides implementing process sleep, Atomics can also be used to implement mutual exclusion between processes. Here is a basic prototype for reference; see MDN for the documentation.

The AsyncLock object is meant to be used in child processes. Note the sab parameter of the AsyncLock constructor: it is a SharedArrayBuffer shared data block. This shared block needs to be created in the main process and then sent to each child process via IPC. IPC communication normally serializes ordinary data such as Object/Array, so the receiver and the sender end up with different objects, but a SharedArrayBuffer sent via IPC points to the same memory block in both processes.

After an AsyncLock instance is created from the SharedArrayBuffer in a child process, any modification of the shared data by one process is visible through the SharedArrayBuffer in every other process. This is the basis for using it to implement a process lock.

A quick explanation of the Atomics APIs used:

  • Atomics.compareExchange(typedArray, index, expectedValue, newValue): replaces the value at the given position with the new value if it equals the expected value, and returns the old value. The atomic operation guarantees that no other write happens between reading and writing the value.
  • Atomics.wait(typedArray, index, value[, timeout]): checks that the value at the given position of the Int32Array still equals the given value; if so, the caller sleeps until it is woken up or times out. The method returns one of the strings "ok", "not-equal", or "timed-out". (Atomics.waitAsync is the non-blocking variant used below.)
  • Atomics.notify(typedArray, index[, count]): wakes up the specified number of sleeping waiters in the wait queue at that position; if count is not specified, all of them are woken by default.

AsyncLock is an asynchronous lock that does not block the main thread while waiting for the lock to be released. The key method is executeAfterLocked(): it is called with a callback that runs once the lock has been acquired, and the lock is automatically released afterwards. The crucial step is the tryGetLock() function combined with Atomics.waitAsync, whose result exposes a value that is a Promise, so the wait-for-unlock logic runs in the microtask queue without blocking the main thread.

/**
  * @name AsyncLock
  * @description
  *   Use it in child processes, mutex lock logic.
  *   First create SharedArrayBuffer in main process and transfer it to all child processes to control the lock.
  */
class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab; // data like this: const sab = new SharedArrayBuffer(16);
    this.i32a = new Int32Array(sab);
  }

  lock() {
    while (true) {
      const oldValue = Atomics.compareExchange(
        this.i32a, AsyncLock.INDEX,
        AsyncLock.UNLOCKED, // old value
        AsyncLock.LOCKED // new value
      );
      if (oldValue == AsyncLock.UNLOCKED) { // success
        return;
      }
      Atomics.wait( // wait (blocks the thread)
        this.i32a,
        AsyncLock.INDEX,
        AsyncLock.LOCKED // expected value
      );
    }
  }

  unlock() {
    const oldValue = Atomics.compareExchange(
      this.i32a, AsyncLock.INDEX,
      AsyncLock.LOCKED,
      AsyncLock.UNLOCKED
    );
    if (oldValue != AsyncLock.LOCKED) { // failed
      throw new Error('Tried to unlock while not holding the mutex');
    }
    Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
  }

  /**
    * executeAfterLocked [async function to acquire the lock and execute the callback]
    * @param  {Function} callback [callback function]
    */
  executeAfterLocked(callback) {
    const tryGetLock = async () => {
      while (true) {
        const oldValue = Atomics.compareExchange(
          this.i32a,
          AsyncLock.INDEX,
          AsyncLock.UNLOCKED,
          AsyncLock.LOCKED
        );
        if (oldValue == AsyncLock.UNLOCKED) { // success if AsyncLock.UNLOCKED
          callback();
          this.unlock();
          return;
        }
        const result = Atomics.waitAsync( // wait when AsyncLock.LOCKED
          this.i32a,
          AsyncLock.INDEX,
          AsyncLock.LOCKED
        );
        await result.value; // a Promise; will not block the main thread
      }
    };
    tryGetLock();
  }
}
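A usage sketch on the child side, assuming the main process has delivered the SharedArrayBuffer over IPC as described above (the message shape is invented):

/* -- child process: hedged usage of AsyncLock -- */
process.on('message', ({ type, sab }) => {
  if (type !== 'shared-lock') return;
  const lock = new AsyncLock(sab);
  lock.executeAfterLocked(() => {
    // critical section: only one child process runs this at a time
    console.log(`process ${process.pid} holds the lock`);
  });
});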

VII. Known existing problems


  1. Because the native remote API of Electron is used, some features of electron-re (the service-related ones) do not support Electron 14 and later (remote has been removed). Using a third-party remote library as a compatibility alternative is being considered in the near future.

  2. Fault tolerance is not good enough yet; this will become an important optimization point later.

  3. A "call counting" method is used to collect the number of active connections in the process pool. This approach is neither elegant nor accurate, but I have not yet found a better way to count the active connections of child processes. I think the answer has to start at a lower level: macro and micro task queues, the V8 virtual machine, garbage collection, libuv internals, Node processes and threads…

  4. The process sleep feature has not been tested on Windows. Windows does not support process signals, but Node provides emulation; the actual behavior still needs testing.

VIII. Next To Do


  • Have services automatically restart when their code is updated
  • Add ChildProcessPool scheduling logic
  • Optimize the console output of ChildProcessPool's multiple processes
  • Add a visual process management page
  • Enhance the ChildProcessPool process pool functionality
  • Enhance the ProcessHost transaction center functionality
  • Implement mutual-exclusion logic between child processes
  • Use an external remote library to support the latest Electron versions
  • Kill Bugs 🐛

IX. Several practical use examples


  1. electronux – an Electron project of mine that uses BrowserService/MessageChannel and includes a ChildProcessPool/ProcessHost usage demo.

  2. shadowsocks-electron – another cross-platform Electron desktop application of mine; it uses electron-re for debugging and development, and the ProcessManager UI can also be opened in a production environment for CPU/Memory monitoring and request log viewing.

  3. file-slice-upload – a demo of parallel multi-part file uploading, using ChildProcessPool and ProcessHost, based on [email protected].

  4. You can also look directly at the test samples in index.test.js and the test directory, which contain some usage examples.

  5. The Github README also has instructions.