Tasks that consume a large amount of CPU computing resources are called CPU-bound tasks.

Their defining trait is high CPU utilization rather than heavy I/O. Let’s take a quick look at how these tasks behave in Node.js.
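You can see the effect with a minimal sketch (the `busyWait` helper and the 200 ms duration are arbitrary choices for illustration). It schedules a 0 ms timer and then keeps the CPU busy synchronously. Because Node.js runs JavaScript on a single thread, the timer cannot fire until the synchronous work hands control back to the event loop:

```javascript
// Keep the CPU busy for roughly `ms` milliseconds without ever yielding
function busyWait(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin: pure CPU work, no I/O
  }
}

const scheduledAt = Date.now();
let timerDelay = null;

// Asks to run "as soon as possible" (after at least 0 ms)
setTimeout(() => {
  timerDelay = Date.now() - scheduledAt;
  console.log(`timer fired after ${timerDelay} ms`);
}, 0);

// The CPU-bound work runs to completion before the event loop can run the timer
busyWait(200);
```

The timer reports a delay of at least 200 ms even though it asked for 0 ms. A pending HTTP request would be delayed in exactly the same way.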

Solving the subset sum problem by brute force

Problem: similar to LeetCode 40, Combination Sum II.

Given a collection of candidate numbers (C) and a target number (T), find all unique combinations in C where the candidate numbers sum to T.

Each number in C may only be used once in the combination.

Example:

Given the candidate set [10, 1, 2, 7, 6, 1, 5] and target 8, a solution set is:

[1, 7], [1, 2, 5], [2, 6], [1, 1, 6]

The brute-force algorithm is recursive; the focus here is on the Node.js side of the solution, not on the algorithm itself.
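For reference only, here is a minimal brute-force sketch of the problem statement (the function name `combinationSum2` is hypothetical, and this is not the module used in the demo). It recursively enumerates every subset, keeps those that add up to the target, and removes duplicates caused by repeated numbers by building subsets in sorted order and using them as keys. It does no pruning, so it also works with negative numbers like the ones used later in the demo, at the cost of exploring all 2^n subsets:

```javascript
// Brute force: explore every subset of `candidates`, keep the ones summing
// to `target`, and drop duplicates produced by repeated numbers.
function combinationSum2(candidates, target) {
  const sorted = [...candidates].sort((a, b) => a - b);
  const seen = new Set();
  const results = [];

  function explore(start, subset, sum) {
    if (subset.length > 0 && sum === target) {
      const key = subset.join(','); // subsets are built in sorted order
      if (!seen.has(key)) {
        seen.add(key);
        results.push([...subset]);
      }
    }
    for (let i = start; i < sorted.length; i++) {
      subset.push(sorted[i]);
      explore(i + 1, subset, sum + sorted[i]);
      subset.pop();
    }
  }

  explore(0, [], 0);
  return results;
}

console.log(combinationSum2([10, 1, 2, 7, 6, 1, 5], 8));
// → [ [ 1, 1, 6 ], [ 1, 2, 5 ], [ 1, 7 ], [ 2, 6 ] ]
```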

Building the demo

We implement the algorithm in a subsetSum.js module, as a class that extends EventEmitter:

"use strict";

const EventEmitter = require('events').EventEmitter;

class SubsetSum extends EventEmitter {
  /**
   * @param {Number} sum  the target sum
   * @param {Array} set   the candidate numbers
   */
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }

  /**
   * Recursively generate every possible subset, without ever
   * giving control back to the event loop.
   * @param {Array} set     the numbers still available
   * @param {Array} subset  the subset built so far
   */
  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combine(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  /**
   * Check whether the subset adds up to the target sum and,
   * if it does, emit a 'match' event.
   */
  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => (prev + item), 0);
    // Loose equality on purpose: sum may arrive as a string (from the query string)
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  /**
   * Run the whole computation synchronously, then emit 'end'.
   */
  start() {
    this._combine(this.set, []);
    this.emit('end');
  }
}

module.exports = SubsetSum;

To illustrate the problem caused by CPU-bound tasks, let’s create an HTTP server (app.js) that answers network requests using SubsetSum:

"use strict";

const http = require('http');
const SubsetSum = require('./subsetSum');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    // Stream every match to the client as soon as it is found
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\'m alive!\n');
  }
}).listen(8000, () => console.log('Started'));

Since the SubsetSum instance returns its results through events, we can stream each match to the client as soon as the algorithm produces it. Another detail to note: for every request whose path is not /subsetSum, the server responds with I’m alive!. We will use this to check whether the server is still responsive, as you’ll see later.

Let’s start the server:

node app

Once the server is up, we are ready to send our first request. Let’s send a set of 17 random numbers; the server will be stuck in the computation for quite a while:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"

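This becomes a serious problem as soon as other clients show up. While the first request is still running, try the following command in another terminal:

curl -G http://localhost:8000

This request hangs and only receives its I’m alive! response after the computation for the first request has finished: while the algorithm runs, the event loop is blocked and the server cannot respond to anyone.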
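To get a feel for the amount of work involved, here is a sketch that replicates the recursion of _combine() (without the logging and the sum check) and only counts how many subsets it visits; `countSubsets` is a name made up for this example. With n numbers it visits every non-empty subset, that is 2^n - 1 of them, so the 17-number request above makes _processSubset() run 131,071 times, each with a console.log(), all inside one synchronous call to start():

```javascript
// Same recursion as SubsetSum._combine(), but it only counts visited subsets
function countSubsets(set, subset = []) {
  let count = 0;
  for (let i = 0; i < set.length; i++) {
    const newSubset = subset.concat(set[i]);
    count += countSubsets(set.slice(i + 1), newSubset);
    count += 1; // this is where _processSubset(newSubset) would run
  }
  return count;
}

for (const n of [3, 10, 17]) {
  const set = Array.from({ length: n }, (_, i) => i);
  console.log(`${n} numbers -> ${countSubsets(set)} subsets`);
}
// 3 numbers -> 7 subsets
// 10 numbers -> 1023 subsets
// 17 numbers -> 131071 subsets
```

Each extra number doubles the work, which is why a modest-looking input can monopolize the event loop.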

Option 1: Use setImmediate

The first option is to interleave the execution of the CPU-bound algorithm with the event loop, so that pending I/O can be processed between steps. Let’s see how this pattern applies to the subset sum algorithm. All we need to do is slightly modify the subsetSum.js module. For convenience, we’ll create a new module called subsetSumDefer.js, using the code of the original SubsetSum class as a starting point. The first change is a new method called _combineInterleaved(), which is at the heart of the pattern:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit('end');
    }
  });
}

As we can see, all we have to do is use setImmediate() to defer the call to the original, synchronous _combine() method. However, this makes the algorithm asynchronous, so it becomes harder to know when all the combinations have been evaluated.

To solve this problem, we have to keep track of all the running instances of the _combine() method, using a pattern very similar to the asynchronous parallel execution we saw in Chapter 3, Asynchronous Control Flow Patterns with Callbacks. When all the instances of _combine() have finished running, we emit the end event to notify any listener that the algorithm has completed.

Now let’s complete the refactoring. First, we replace the recursive step in the _combine() method with its deferred counterpart:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combineInterleaved(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

With the change above, every step of the algorithm is queued in the event loop with setImmediate() and runs after any pending I/O requests, instead of running synchronously and blocking the loop.
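Why setImmediate() rather than process.nextTick()? In Node.js the process.nextTick() queue is drained completely before the event loop moves on, while an immediate scheduled from inside another immediate only runs on the next loop iteration, after the loop has had a chance to poll for I/O. The following sketch (the `chain()` helper and the labels are made up for the example) records the execution order of a three-step chain competing with one unrelated callback, first with nextTick and then with setImmediate:

```javascript
const order = [];

// Run `steps` chained steps, each one scheduled with the given deferral function
function chain(defer, label, steps, i = 0) {
  if (i === steps) return;
  defer(() => {
    order.push(`${label}${i}`);
    chain(defer, label, steps, i + 1);
  });
}

// Phase 1: a nextTick chain competing with one setImmediate callback
chain(process.nextTick, 'tick', 3);
setImmediate(() => order.push('otherA'));

// Phase 2 (a bit later): a setImmediate chain competing with one callback
setTimeout(() => {
  chain(setImmediate, 'imm', 3);
  setImmediate(() => order.push('otherB'));
}, 10);

setTimeout(() => console.log(order), 50);
```

The recorded order is tick0, tick1, tick2, otherA, imm0, otherB, imm1, imm2: the nextTick chain runs every step before anything else gets a turn (just as a CPU-bound loop would starve I/O), while the setImmediate chain lets otherB run right after its first step.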

Another minor tweak is for the start() method:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}

In the previous code, we initialize the counter of running _combine() instances to 0. We also replace the call to _combine() with a call to _combineInterleaved() and remove the emission of end, since it is now handled asynchronously by _combineInterleaved(). With this final change, the subset sum algorithm runs its potentially CPU-heavy code in small steps interleaved with the event loop, so it no longer blocks it.
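The counting technique can be tried in isolation. The sketch below uses a stripped-down, hypothetical `InterleavedCounter` class that follows the same structure as the deferred SubsetSum: every recursive step is deferred with setImmediate(), a counter tracks the pending steps, and end is emitted when the counter drops back to zero. It counts visited subsets instead of checking sums:

```javascript
const EventEmitter = require('events').EventEmitter;

// Hypothetical class mirroring the bookkeeping of SubsetSumDefer
class InterleavedCounter extends EventEmitter {
  constructor(set) {
    super();
    this.set = set;
    this.visited = 0;        // subsets processed so far
    this.runningCombine = 0; // deferred _combine() calls not finished yet
  }

  _combineInterleaved(set, subset) {
    this.runningCombine++;                // one more pending step
    setImmediate(() => {
      this._combine(set, subset);         // may schedule more steps
      if (--this.runningCombine === 0) {  // nothing pending anywhere
        this.emit('end', this.visited);
      }
    });
  }

  _combine(set, subset) {
    for (let i = 0; i < set.length; i++) {
      const newSubset = subset.concat(set[i]);
      this._combineInterleaved(set.slice(i + 1), newSubset);
      this.visited++;                     // stands in for _processSubset()
    }
  }

  start() {
    this.runningCombine = 0;
    this._combineInterleaved(this.set, []);
  }
}

const counter = new InterleavedCounter([1, 2, 3, 4, 5]);
let endEvents = 0;
let total = null;
counter.on('end', visited => {
  endEvents++;
  total = visited;
  console.log(`end: ${visited} subsets visited`);
});
counter.start();
// start() returns immediately: no subset has been processed yet
const visitedAfterStart = counter.visited;
```

Because each step schedules its children before decrementing the counter, the counter can only reach zero once, after the last step: end fires exactly once, reporting 31 subsets (2^5 - 1).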

Finally, update the app.js module so that it uses the new version of SubsetSum:

const http = require('http');
// const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');

http.createServer(function(req, res) {
  // ...
})

The complete subsetSumDefer.js module looks like this:

"use strict";

const EventEmitter = require('events').EventEmitter;

class SubsetSumDefer extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }

  /**
   * New method: defer _combine() with setImmediate() and keep track
   * of how many deferred calls are still pending.
   * @param {Array} set     the numbers still available
   * @param {Array} subset  the subset built so far
   */
  _combineInterleaved(set, subset) {
    this.runningCombine++;
    setImmediate(() => {
      this._combine(set, subset);
      if(--this.runningCombine === 0) {
        this.emit('end');
      }
    });
  }

  _combine(set, subset) {
    for(let i = 0; i < set.length; i++) {
      let newSubset = subset.concat(set[i]);
      this._combineInterleaved(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  _processSubset(subset) {
    console.log('Subset', ++this.totalSubsets, subset);
    const res = subset.reduce((prev, item) => prev + item, 0);
    if(res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    // Reset the counter of pending _combine() calls
    this.runningCombine = 0;
    this._combineInterleaved(this.set, []);
  }
}

module.exports = SubsetSumDefer;

Option 2: Implement a process pool

Let’s start by building the processPool.js module:

const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax; // Maximum number of processes in the pool
    this.pool = [];         // Processes ready to run
    this.active = [];       // Processes currently running (at most poolMax)
    this.waiting = [];      // Queue of pending tasks
  }
  // ...
}

In the first part of the module, we import the child_process.fork() function, which we will use to create new processes. We then define the ProcessPool constructor, which accepts file, the path of the Node.js program to run, and poolMax, the maximum number of instances allowed in the pool. Then we define three instance variables:

  • pool: the processes that are ready to run
  • active: the processes that are currently running
  • waiting: a queue of callbacks for all the requests that could not be served immediately because no process was available

Now let’s look at the acquire() method of the ProcessPool class, which is responsible for obtaining a process that is ready to be used:

acquire(callback) {
  let worker;
  if(this.pool.length > 0) {  // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {  // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);  // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}

The function logic is as follows:

  1. If there is a process in the pool ready to be used, we simply move it to the active list and then invoke the callback asynchronously (with process.nextTick()).
  2. If there are no processes available in the pool and we have already reached the maximum number of running processes, we must wait for one to become available. We do this by queuing the current callback in the waiting list.
  3. If we have not yet reached the maximum number of running processes, we create a new one with child_process.fork(), add it to the active list, and then invoke the callback.

The last method of the ProcessPool class is release(), which puts a process back into the pool:

release(worker) {
  if(this.waiting.length > 0) {  // [1]
    const waitingCallback = this.waiting.shift();
    // Hand the process straight to the waiting task; it stays active
    return waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !== w);  // [2]
  this.pool.push(worker);
}

This code is simple as well:

  • If there are tasks in the waiting queue, we simply reassign the worker being released to the first of them by passing it to its callback.
  • Otherwise, we remove the worker from the active list and put it back into the pool.
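To exercise acquire() and release() without spawning real processes, here is a sketch of the same logic in which the call to fork(this.file) is replaced by a factory function passed to the constructor. The `TestablePool` name, the `createWorker` parameter and the plain-object workers are assumptions made for this example, not part of the original module. Note that release() returns right after handing a worker to a waiting task, so that worker is not also pushed back into the idle pool:

```javascript
// Same acquire()/release() logic as ProcessPool, with fork(file) replaced
// by an injected factory so the bookkeeping can be tested without processes.
class TestablePool {
  constructor(createWorker, poolMax) {
    this.createWorker = createWorker; // hypothetical stand-in for fork(file)
    this.poolMax = poolMax;
    this.pool = [];    // idle workers
    this.active = [];  // workers currently assigned to a task
    this.waiting = []; // callbacks waiting for a worker
  }

  acquire(callback) {
    let worker;
    if (this.pool.length > 0) {               // [1] reuse an idle worker
      worker = this.pool.pop();
      this.active.push(worker);
      return process.nextTick(callback.bind(null, null, worker));
    }
    if (this.active.length >= this.poolMax) { // [2] limit reached: wait
      return this.waiting.push(callback);
    }
    worker = this.createWorker();             // [3] start a new worker
    this.active.push(worker);
    process.nextTick(callback.bind(null, null, worker));
  }

  release(worker) {
    if (this.waiting.length > 0) {            // hand over to the next task
      const waitingCallback = this.waiting.shift();
      return waitingCallback(null, worker);   // worker stays in `active`
    }
    this.active = this.active.filter(w => w !== worker);
    this.pool.push(worker);                   // back to the idle pool
  }
}

// Usage: a pool of at most 2 fake workers serving 3 tasks
let nextId = 1;
const pool = new TestablePool(() => ({ id: nextId++ }), 2);
const log = [];

pool.acquire((err, w) => log.push(`task A got worker ${w.id}`));
pool.acquire((err, w) => log.push(`task B got worker ${w.id}`));
pool.acquire((err, w) => log.push(`task C got worker ${w.id}`)); // queued

setImmediate(() => {
  // A and B are running, C is waiting: finishing A hands worker 1 to C
  pool.release(pool.active[0]);
  console.log(log);
});
```

Task C never triggers a third worker: it waits until task A finishes and then reuses worker 1.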

As we can see, the processes are never terminated; they are only reassigned to new tasks, which saves time and resources by not starting a new process on each request. However, this is not always the best option, and it depends largely on the requirements of our application. To reduce the memory a pool keeps occupied over a long period, the following adjustments are possible:

  • After a process has been idle for a period of time, the process is terminated to free up memory space.
  • Add a mechanism to terminate or restart unresponsive or crashed processes.

Here is the complete processPool.js module:

"use strict";

const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax; // Maximum number of processes in the pool
    this.pool = [];         // Processes ready to run
    this.active = [];       // Processes currently running (at most poolMax)
    this.waiting = [];      // Queue of pending tasks
  }

  /**
   * Obtain a process ready to run a task.
   * @param {Function} callback  called with (err, worker)
   */
  acquire(callback) {
    let worker;
    if(this.pool.length > 0) {
      // Reuse an idle process from the pool
      worker = this.pool.pop();
      this.active.push(worker);
      return process.nextTick(callback.bind(null, null, worker));
    }

    if(this.active.length >= this.poolMax) {
      // No idle process and the limit is reached: queue the request
      return this.waiting.push(callback);
    }

    // Below the limit: start a new process
    worker = fork(this.file);
    this.active.push(worker);
    process.nextTick(callback.bind(null, null, worker));
  }

  /**
   * Give a process back once its task is finished.
   * @param {ChildProcess} worker
   */
  release(worker) {
    if(this.waiting.length > 0) {
      // Hand the process straight to the first waiting task
      const waitingCallback = this.waiting.shift();
      return waitingCallback(null, worker);
    }
    // Nobody is waiting: move the process back to the pool
    this.active = this.active.filter(w => worker !== w);
    this.pool.push(worker);
  }
}

module.exports = ProcessPool;
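ProcessPool does not implement the adjustments listed above. As a rough sketch of the first two, under clearly stated assumptions: `IdleReapingPool` is a hypothetical, simplified class (no poolMax, no waiting queue, synchronous acquire()), and workers only need the kill() method and the 'exit' event that ChildProcess objects returned by child_process.fork() provide, so fake EventEmitter-based workers stand in for real processes. A released worker gets a termination timer that is cancelled if it is reused; any worker that exits, whether killed or crashed, is dropped from both lists:

```javascript
const EventEmitter = require('events').EventEmitter;

// Hypothetical helper: idle workers live at most `idleMs`; exited workers are forgotten
class IdleReapingPool {
  constructor(createWorker, idleMs) {
    this.createWorker = createWorker;
    this.idleMs = idleMs;
    this.pool = [];          // idle workers
    this.active = [];        // busy workers
    this.timers = new Map(); // idle worker -> pending termination timer
  }

  acquire() {
    let worker = this.pool.pop();
    if (worker) {
      clearTimeout(this.timers.get(worker)); // reused: cancel its termination
      this.timers.delete(worker);
    } else {
      worker = this.createWorker();
      // Crashed or killed: remove it everywhere so it is never handed out again
      worker.once('exit', () => this._forget(worker));
    }
    this.active.push(worker);
    return worker;
  }

  release(worker) {
    this.active = this.active.filter(w => w !== worker);
    this.pool.push(worker);
    // Terminate the worker if nobody needs it within idleMs
    this.timers.set(worker, setTimeout(() => worker.kill(), this.idleMs));
  }

  _forget(worker) {
    clearTimeout(this.timers.get(worker));
    this.timers.delete(worker);
    this.pool = this.pool.filter(w => w !== worker);
    this.active = this.active.filter(w => w !== worker);
  }
}

// Fake worker: kill() emits 'exit' asynchronously, roughly like a real process
class FakeWorker extends EventEmitter {
  kill() {
    this.killed = true;
    setImmediate(() => this.emit('exit', null, 'SIGTERM'));
  }
}

const reaper = new IdleReapingPool(() => new FakeWorker(), 50);
const w1 = reaper.acquire();
const w2 = reaper.acquire();
reaper.release(w1);       // w1 will be killed after 50 ms of idleness
w2.emit('exit', 1, null); // simulate a crash of the busy worker w2
```

Restarting a crashed process could then be a matter of calling createWorker() again when a task needs one, which acquire() already does whenever the idle pool is empty.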

Parent-child process communication

Now that our ProcessPool class is ready, we can use it to implement the SubsetSumFork module, which delegates the subset sum computation to a child process. Starting a process with child_process.fork() also gives us a simple message-based communication channel; let’s see how it works by implementing the subsetSumFork.js module:

"use strict";

const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  /**
   * Start the subset sum computation in a child process
   */
  start() {
    workers.acquire((err, worker) => {    // Obtain a process from the pool
      worker.send({sum: this.sum, set: this.set});

      const onMessage = msg => {
        if(msg.event === 'end') {
          // The task is done: stop listening and give the process back
          worker.removeListener('message', onMessage);
          workers.release(worker);
        }
        // Forward every event to external listeners
        this.emit(msg.event, msg.data);
      };

      worker.on('message', onMessage);    // Listen for messages from the child
    });
  }
}

module.exports = SubsetSumFork;

First, notice that we create a ProcessPool instance whose child processes run subsetSumWorker.js, and we set the maximum capacity of the pool to 2.

In addition, we try to keep the same public API as the original SubsetSum class. SubsetSumFork is an EventEmitter whose constructor accepts sum and set, while the start() method triggers the execution of the algorithm, which this time runs in a separate process. This is what happens when start() is called:

  1. We try to acquire a new child process from the pool. As soon as we get one, we send it a message containing sum and set. The send() method is provided automatically by Node.js to every process started with child_process.fork(); it is part of the communication channel between the parent and the child.
  2. We then start listening for any message sent back by the child process, attaching a new listener with on() (also part of the communication channel provided to processes created with child_process.fork()).
  3. In the listener, we first check whether we received an end event, which means that the SubsetSum task has completed. In that case we remove the onMessage listener and release the worker, putting it back into the pool so that it stops consuming CPU and becomes available for another task.
  4. The worker produces messages in the {event, data} format, which allows us to re-emit every event produced by the child process to the external listeners of SubsetSumFork.
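The forwarding logic inside start() can be checked without any child process. In this sketch, `forwardMessages` is a hypothetical helper extracted from start(), a `fakeWorker` (an EventEmitter with a send() method) plays the role of the forked process, and the `release` callback stands in for workers.release(). Every {event, data} message is re-emitted, and on end the listener detaches itself and releases the worker:

```javascript
const EventEmitter = require('events').EventEmitter;

// Wire a worker's 'message' events to an emitter, as SubsetSumFork.start() does
function forwardMessages(worker, target, release) {
  const onMessage = msg => {
    if (msg.event === 'end') {
      worker.removeListener('message', onMessage); // stop listening
      release(worker);                              // give the worker back
    }
    target.emit(msg.event, msg.data);               // re-emit to outside listeners
  };
  worker.on('message', onMessage);
}

// A fake worker: send() records the request, replies are emitted by hand
const fakeWorker = new EventEmitter();
fakeWorker.sent = [];
fakeWorker.send = msg => fakeWorker.sent.push(msg);

const subsetSumFork = new EventEmitter();
const received = [];
let released = 0;

subsetSumFork.on('match', data => received.push(['match', data]));
subsetSumFork.on('end', () => received.push(['end']));

fakeWorker.send({ sum: 0, set: [1, -1, 2] });
forwardMessages(fakeWorker, subsetSumFork, () => released++);

// Simulate what the child would send back
fakeWorker.emit('message', { event: 'match', data: [1, -1] });
fakeWorker.emit('message', { event: 'end' });
fakeWorker.emit('message', { event: 'match', data: [9] }); // ignored: listener removed
```

Removing the listener on end matters because the same process is reused for later tasks: without it, the next task’s messages would also be forwarded to this finished SubsetSumFork instance.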

This is the SubsetSumFork module. Now let’s implement the worker application.

Communicating with the parent process

Now let’s create the subsetSumWorker.js module, our worker application, whose entire contents run in a separate process:

"use strict";

const SubsetSum = require('./subsetSum');

process.on('message', msg => {
  // Runs when the parent sends a task: start a new computation
  const subsetSum = new SubsetSum(msg.sum, msg.set);

  subsetSum.on('match', data => {
    // Forward each match to the parent process
    process.send({event: 'match', data: data});
  });

  subsetSum.on('end', data => {
    // Tell the parent that the task is complete
    process.send({event: 'end', data: data});
  });

  subsetSum.start();
});

Since our worker runs in a separate process, we don’t have to worry about this CPU-bound task blocking the event loop: all HTTP requests continue to be handled by the main application’s event loop without interruption.

When the child process starts, this is what happens:

  1. It immediately starts listening for messages from the parent, which is easily done with process.on('message', ...). The only message we expect from the parent is the one that provides the input of a new SubsetSum task. As soon as we receive it, we create a new SubsetSum instance and register listeners for its match and end events. Finally, we start the computation with subsetSum.start().
  2. Every time the subset sum algorithm emits an event, we wrap the result in an object of the form {event, data} and send it to the parent process. These messages are then handled by the subsetSumFork.js module, as we saw in the previous section.
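To try the message channel end to end without creating separate files by hand, the sketch below writes a tiny child script to the OS temp directory, forks it, and collects the {event, data} messages it sends back. The file name and the `runInChild` helper are made up for the example, and the child is a shortened stand-in for subsetSumWorker.js that tests every subset with a bitmask instead of requiring ./subsetSum. Passing `execArgv: []` to fork() is an extra precaution so that the child runs the file even if the parent itself was started with options such as `-e`:

```javascript
const { fork } = require('child_process');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Child program: tries every non-empty subset and reports matches as {event, data}
const childSource = `
process.on('message', ({ sum, set }) => {
  for (let mask = 1; mask < (1 << set.length); mask++) {
    const subset = set.filter((_, i) => mask & (1 << i));
    if (subset.reduce((a, b) => a + b, 0) === sum) {
      process.send({ event: 'match', data: subset });
    }
  }
  process.send({ event: 'end' });
});
`;

// Hypothetical throwaway file in the OS temp directory
const childFile = path.join(os.tmpdir(), `subset-worker-${process.pid}.js`);
fs.writeFileSync(childFile, childSource);

// Send one task to a forked child and collect the matches it reports
function runInChild(sum, set) {
  return new Promise((resolve, reject) => {
    const child = fork(childFile, [], { execArgv: [] }); // also opens the IPC channel
    const matches = [];
    child.on('error', reject);
    child.on('exit', () => reject(new Error('child exited before "end"')));
    child.on('message', msg => {
      if (msg.event === 'match') matches.push(msg.data);
      if (msg.event === 'end') {
        resolve(matches);
        child.disconnect(); // close the IPC channel so the child can exit
      }
    });
    child.send({ sum, set }); // same shape as the message SubsetSumFork sends
  });
}

const result = runInChild(0, [1, -1, 2, -2]);
result
  .then(matches => console.log('matches:', matches))
  .finally(() => fs.unlinkSync(childFile));
```

For this input the child reports [1, -1], [2, -2] and [1, -1, 2, -2], in that order, followed by end.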

Note: when the child process is not a Node.js program, the communication channel described above is not available. In that case, we can still set up an interface for parent-child communication by implementing our own protocol on top of the standard input and standard output streams that the child process exposes to the parent.
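A minimal sketch of such a protocol, assuming newline-delimited JSON (one message per line) as the framing: the parent starts the program with child_process.spawn(), writes requests to its stdin and parses its stdout line by line with readline. For the sake of a runnable example, the "foreign" program is Node.js itself started with `-e` and no IPC channel; any program that can read and write lines could take its place. The `requestOverStdio` helper is hypothetical:

```javascript
const { spawn } = require('child_process');
const readline = require('readline');

// Stand-in for a program written in another language: it reads one JSON
// request per line on stdin and writes one JSON message per line on stdout.
// Its "work" is deliberately trivial: report every number equal to `sum`.
const childSource = `
const rl = require('readline').createInterface({ input: process.stdin });
rl.on('line', line => {
  const { sum, set } = JSON.parse(line);
  for (const n of set) {
    if (n === sum) {
      process.stdout.write(JSON.stringify({ event: 'match', data: [n] }) + '\\n');
    }
  }
  process.stdout.write(JSON.stringify({ event: 'end' }) + '\\n');
});
`;

// Send one request over stdin and collect the {event, data} messages from stdout
function requestOverStdio(sum, set) {
  return new Promise((resolve, reject) => {
    const child = spawn(process.execPath, ['-e', childSource], {
      stdio: ['pipe', 'pipe', 'inherit'], // no 'ipc' entry: plain streams only
    });
    const events = [];
    child.on('error', reject);
    child.on('exit', () => reject(new Error('child exited before "end"')));
    readline.createInterface({ input: child.stdout }).on('line', line => {
      const msg = JSON.parse(line); // one message per line
      events.push(msg);
      if (msg.event === 'end') {
        resolve(events);
        child.stdin.end(); // EOF on stdin lets the child exit
      }
    });
    child.stdin.write(JSON.stringify({ sum, set }) + '\n');
  });
}

const reply = requestOverStdio(2, [1, 2, 3, 2]);
reply.then(events => console.log(events));
```

Newline framing is the simplest choice as long as messages never contain raw newlines, which JSON.stringify guarantees by escaping them inside strings.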