First, a cluster usage example

Starting a server in Node takes only a few lines of code. A Node instance is single-threaded and relies on the event loop for high concurrency. When we want to make use of all the CPU cores of the server (to improve QPS/throughput), the cluster module is what we reach for, and its usage is very simple:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
// When a child is forked, the child also re-executes this file
console.log('Begin to run js file. pid = ' + process.pid);

// When this file first runs (in the primary), isMaster is true, so the master branch executes.
if (cluster.isMaster) {
  console.log(`Main process ${process.pid} is running`);

  // Spawn the worker processes.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker process ${worker.process.pid} has exited`);
  });
} else {
  // Worker processes can share any TCP connection.
  // In this case, the HTTP server is shared.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`response from worker ${process.pid}`);
  }).listen(9000);

  console.log(`Worker process ${process.pid} has started`);
}


By doing this, a multi-process Node service is set up.

(1) Raising some questions

However, those of us who love reading source code will wonder how this actually starts several processes. The following questions come to mind:

  1. The main process enters the if branch, so how does the code also reach the else branch and create a server in several child processes?
  2. What are the roles of the main process and the child processes, and how do they coordinate their work?
  3. Why can every child process create a server listening on the same port without a port-occupied error being thrown?

As the analysis deepens, more questions will come up; take it slowly and settle these framework-level questions first.

Let's start by stating the conclusions.

(2) A brief answer at the architectural level

Question 1 answer:

Inside cluster, when our business code executes the fork method, the path of the currently running JS file is passed to child_process.fork as its first argument (see the child_process.fork API documentation for details), so the child process re-executes this JS file when it is created.

Answer to question 2:

This involves the relationship between the master process and the child processes. The short version first:

The master process is responsible for receiving and distributing requests. That is, when a request comes in, it reaches the master first, and the master then forwards it to a specific worker process through some mechanism. (Doesn't the master's role look a lot like nginx's reverse proxy?)

At this point, a detail comes out. We didn’t create the master in our code! So we can go out on a limb and say cluster created it for us. The source code will be discussed in detail later.

Answer to question 3:

When the first worker process creates its server, the main process, if it has not started a server yet, creates one of its own and listens on that port; some tricks are then used to prevent the child processes from actually listening on the port. In other words: the main process starts a server and listens on the port, while each child process starts a server but does not listen on any port. The main process and child processes are then wired together for communication, so that when a request comes in, the main process, which holds handles to the child processes, naturally hands it to a child process for handling, and after processing is done the result is sent back through the main process.

Working through these questions and answers gives us an overall frame for cluster. Next, I'll analyze, example by example, why the authors did it this way, to build a deeper understanding of cluster.

Now what if we want to implement a multi-process architecture ourselves?

Second, ways I can think of to implement multiple processes

Here we go!

To implement it yourself, you need to know that child processes are created with child_process.fork(); as mentioned earlier, cluster also calls this method under the hood.

2.1 Directly creating a server in each child process

Example 1:

// master.js

const { fork } = require('child_process');
const os = require('os');

// For example, on a 4-core CPU this forks 4 child processes
for (let i = 0; i < os.cpus().length; i++) {
    fork('./worker.js');
}


// worker.js

const http = require('http');

http.createServer((req, res) => {
    res.end('Create child process');
}).listen(3000);


Running this example, you'll hit a port-already-in-use error. The reason is that, for a TCP connection, the same port on the same machine can generally only be bound by one process. So creating multiple child processes this way is not an option.
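As an aside, here is a small sketch (my own variation of Example 1's worker.js, not part of the original) showing how to surface that failure cleanly instead of letting the extra workers crash on an uncaught exception:

// worker.js, adjusted so the bind failure is reported instead of crashing
const http = require('http');

const server = http.createServer((req, res) => {
    res.end('Create child process');
});

server.on('error', err => {
    // Every worker after the first one lands here with EADDRINUSE
    if (err.code === 'EADDRINUSE') {
        console.error(`pid ${process.pid}: port 3000 is already in use`);
        process.exit(1);
    }
    throw err;
});

server.listen(3000);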

2.2 Creating servers by listening on multiple ports

Example 2:

If we can't listen on the same port, then let's listen on multiple ports.

// master.js

const { fork } = require('child_process');
const os = require('os');

for (let i = 0; i < os.cpus().length; i++) {
    let port = Math.ceil(Math.random() * 5000);
    fork('./worker.js', [port]);
}


// worker.js

const port = process.argv[2];
const http = require('http');

http.createServer((req, res) => {
    res.end('Create child process');
}).listen(port);


The approach above does solve the problem. But why didn't the Node.js authors go this way? Ports are a limited resource, it isn't an elegant approach, and, more importantly, there is no way to distribute requests across the workers. (Actually, you could do it with a connection pool and use the pool to schedule the child processes, which is another viable route, but cluster is not implemented that way.)
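For reference, here is a minimal sketch of that alternative (entirely my own addition, and explicitly not how cluster works): the master forks one worker per private port, reusing worker.js from Example 2, and acts as a tiny TCP proxy that forwards incoming connections round-robin. The port numbers and file name are arbitrary choices.

// proxy-master.js
const { fork } = require('child_process');
const net = require('net');
const os = require('os');

const basePort = 4001;  // private ports for the workers, one per CPU core
const ports = os.cpus().map((_, i) => basePort + i);
ports.forEach(port => fork('./worker.js', [port]));

let next = 0;
net.createServer(clientSocket => {
    // Pick the next worker port round-robin and pipe bytes in both directions
    const port = ports[next++ % ports.length];
    const upstream = net.connect(port, '127.0.0.1');
    clientSocket.pipe(upstream).pipe(clientSocket);
    upstream.on('error', () => clientSocket.destroy());   // e.g. worker not listening yet
    clientSocket.on('error', () => upstream.destroy());
}).listen(3000);

This gives a single public port and a crude form of distribution, but every byte is proxied through the master process; cluster instead hands the accepted connection itself over to a worker, as the source analysis below shows.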

2.3 Creating the server in the main process and letting child-process servers handle requests

Example 3:

// master.js

const { fork } = require('child_process');
const os = require('os');
const net = require('net');

const workers = [];
for (let i = 0; i < os.cpus().length; i++) {
    const worker = fork('./worker.js');
    workers.push(worker);
}

// Create a TCP server and pass it to the worker processes
const server = net.createServer();
server.listen(3000, () => {
    console.log('Send to worker process');
    workers.forEach(worker => {
        worker.send('SERVER', server);
    });
    // Stop the master from accepting new connections itself; existing ones are kept
    server.close();
});


// worker.js

const http = require('http');

const httpServer = http.createServer((req, res) => {
    res.end(`Hello worker by ${process.pid}`);
});

process.on('message', (msg, tcpServer) => {
    console.log('Worker process listens for message, pid: ' + process.pid);
    if (msg === 'SERVER') {
        tcpServer.on('connection', socket => {
            // Hand the socket received from the main server to the child's HTTP server
            httpServer.emit('connection', socket);
        });
    }
});

Example 3, in which the master passes a handle (a reference that identifies a resource and internally contains a file descriptor pointing to the underlying object) to the worker processes, is fairly close to how cluster is implemented. In this example, however, there is no effective load balancing: the four processes compete for incoming connections, and one process ends up handling more than 97% of the requests.
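To see that skew for yourself, here is a quick measurement sketch (my own addition; it assumes Example 3 is running locally on port 3000): fire a batch of requests and tally which worker pid answered each one.

// count.js
const http = require('http');

const counts = {};
let remaining = 200;

for (let i = 0; i < 200; i++) {
    http.get('http://127.0.0.1:3000', res => {
        let body = '';
        res.on('data', chunk => (body += chunk));
        res.on('end', () => {
            counts[body] = (counts[body] || 0) + 1;  // body is 'Hello worker by <pid>'
            if (--remaining === 0) console.log(counts);
        });
    }).on('error', () => {
        if (--remaining === 0) console.log(counts);
    });
}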

Each of these approaches has its drawbacks, but building multi-process setups these ways should give you a feel for the problem. Now the analysis begins.

Third, source code analysis

I thought for a long time about how to present this, because I don't understand every corner of it; I'm not strong on some of the underlying C++ logic, and if I forced an explanation I might get it wrong, while staying too high-level would feel like a skeleton with no flesh. So I'll do my best to convey the general idea, go into as much detail as possible on the JavaScript side, and only sketch the C++ side, filling in its internals later. Comments and corrections are very welcome.

3.1 Multi-process initialization, before any client request arrives

  1. Initial loading of our cluster business code

Let's take a look at the Node source. When our business code reaches if (cluster.isMaster), the cluster module has already been loaded through this entry file:

// cluster.js

'use strict';
// NODE_UNIQUE_ID is the id of the child process; it is assigned when the child process is created
const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary';
// Without NODE_UNIQUE_ID (i.e. in the primary process), primary.js is exported
module.exports = require(`internal/cluster/${childOrPrimary}`);

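A quick sanity check of this dispatch (my own sketch, not from the Node source): drop a log at the top of the business file from section 1 and each process will report which side of the split it ended up on.

const cluster = require('cluster');
// cluster.worker is only set in worker processes
console.log(
    `pid ${process.pid}: isPrimary = ${cluster.isPrimary}, worker id = ${cluster.worker ? cluster.worker.id : 'n/a'}`
);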

Now take a look at primary.js:

const handles = new SafeMap();
cluster.isWorker = false;
// isMaster is explicitly set to true here; it is a deprecated alias that will be replaced by isPrimary
cluster.isMaster = true; // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = true;
cluster.Worker = Worker;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;


Following the business code further, we reach the fork call, which leads to the fork method in primary.js:

cluster.fork = function(env) {
  // Do some setting and listen for internalMessage events
  cluster.setupPrimary();
  const id = ++ids;
  // Start creating the worker process, using the fork of child_process
  const workerProcess = createWorkerProcess(id, env);
  // instantiate Worker
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });
  // ...
  // The rest of this function is covered later


The createWorkerProcess() method is where the child process actually gets forked (this is the key part):


function createWorkerProcess(id, env) {
  // The NODE_UNIQUE_ID variable shows up here!! It is only set when a child process is created.
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  /* node --harmony script.js --version
     process.argv => ['/usr/local/bin/node', 'script.js', '--version'] */
  const execArgv = [...cluster.settings.execArgv];
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
  const nodeOptions = process.env.NODE_OPTIONS || '';

  // Inspector-related details, can be skipped
  // ...


  // Now execute fork. Note that fork runs the same business-code file we wrote,
  // which explains why the else branch in our business code gets executed again.
  // In other words, the business-code file that started it all is executed multiple times,
  // and each execution sees a different process.pid.

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}


After the child process is created, the Worker instance is created using new Worker.

Seeing this, I believe question 1 from the beginning has been answered from the source-code perspective. Question 2, how the main and child processes cooperate, was only sketched at the start of the article; now let's analyze it in detail. The question left open is: how is the main process's server created?

The key lies in the listen method called when the server is created.

At this point we know that the main process and several child processes have been created, and from our business code we know that once a child process is up, it creates an HTTP or net server.

http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`response from worker ${process.pid}`);
  }).listen(9000);


http.createServer is built on net (http.Server inherits from net.Server), and the listen call ends up in net's Server.prototype.listen:

Server.prototype.listen = function(...args) {
  const normalized = normalizeArgs(args);
  let options = normalized[0];
  const cb = normalized[1];

  // ...
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]);  // (port, host, backlog)

  options = options._handle || options.handle || options;

  // ...

  // In our business code we listen on port 9000, so this branch is taken
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for primary server
      // No host is passed in our case, so we end up here
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
};

// The listenInCluster logic

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    // In the main process, it listens directly and sets up the new handle
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // For now we take the branch below, because we are still inside the child logic
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the primary's server handle, and listen on it
  // The child process passes its Server instance straight into _getServer in ./child.js;
  // since we are in a child, this resolves to node-master/lib/internal/cluster/child.js
  //
  // From there, queryServer continues in primary.js, which starts creating a RoundRobinHandle
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);

  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse primary's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

Now for server creation in the child process. The logic here is not that messy: we go into child.js and execute the _getServer method. Let's assume a Unix-like system and not consider win32.

// obj is the server instance
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
 
  if (options.port < 0 && typeof address === 'string' && process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], ':');

  let indexSet = indexes.get(indexesKey);

  if (indexSet === undefined) {
    indexSet = { nextIndex: 0, set: new SafeSet() };
    indexes.set(indexesKey, indexSet);
  }
  const index = indexSet.nextIndex++;
  indexSet.set.add(index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // The child sends the message and registers a callback for the reply.
  // Inside send(), the payload is wrapped as { cmd: 'NODE_CLUSTER', ...message, seq },
  // which triggers an internalMessage event inside Node (see the Node.js API note below).
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket
      shared(reply, { handle, indexesKey, index }, cb);
    } else {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
    }
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};


Take a closer look at the send call above. It matters because it wraps the message and sends it to the main process, which then creates a server (if it hasn't already) and listens on the port.

// child.js

function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

// internal/cluster/utils.js
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
  // This is what triggers the internalMessage event on the receiving side
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(message, handle);
}


{ cmd: 'NODE_CLUSTER' } marks the message as internal to Node, as explained in the Node.js API docs:

The child Node.js process has its own process.send() method, which allows the child to send messages back to the parent.

There is a special case when sending { cmd: 'NODE_foo' } messages. Messages with a NODE_ prefix in the cmd property are reserved for internal use by Node.js core and will not trigger the child's 'message' event. Instead, such messages are emitted via the 'internalMessage' event and are consumed internally by Node.js. Applications should avoid using such messages or listening for the 'internalMessage' event, as it may change without notice.
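To make the distinction concrete, here is a minimal sketch of the ordinary (non NODE_-prefixed) IPC channel that cluster builds on; the file names and message shapes are my own invention, and nothing here is cluster-specific.

// parent.js
const { fork } = require('child_process');

const child = fork('./echo-child.js');
child.on('message', msg => {
    // Messages without the NODE_ prefix arrive on the regular 'message' event
    console.log('parent got:', msg);
    child.disconnect();
});
child.send({ cmd: 'APP_PING', from: process.pid });

// echo-child.js
process.on('message', msg => {
    process.send({ cmd: 'APP_PONG', echo: msg, from: process.pid });
});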

Part of the cluster.fork logic in primary.js was skipped earlier, namely the listeners it sets up on the worker created via child_process (including the internalMessage listener). Let's complete it now:

cluster.fork = function(env) {
  // Do some setting and listen for internalMessage events
  cluster.setupPrimary();
  const id = ++ids;
  // Start creating the worker process, using the fork of child_process
  const workerProcess = createWorkerProcess(id, env);
  // instantiate Worker
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  // Set up to listen for events for each child process
  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });

  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });

  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the primary anymore.
     */
    removeHandlesForWorker(worker);

    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });

  // When a message arrives it is filtered: if message.cmd starts with the NODE_ prefix,
  // it is handled as an internal event, internalMessage;
  // if message.cmd is NODE_HANDLE, message.type and the received file descriptor are
  // used to restore an object, and the child process creates the corresponding
  // TCP server object according to message.type
  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  // Store the created worker process in cluster.workers,
  // for the round-robin scheduling later
  cluster.workers[worker.id] = worker;
  return worker;
};

// The onmessage method
function onmessage(message, handle) {
  // The main process receives this when a child's http/net server runs its listen method
  const worker = this;

  // message.act was set to 'queryServer' when the child sent the message
  const fn = methodMessageMapping[message.act];

  if (typeof fn === 'function')
    // Execute the queryServer method
    fn(worker, message);
}



// The queryServer method executes

function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  // The first time around, handle is undefined, so the following logic runs
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {
      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    // Here a RoundRobinHandle instance is created, in the parent process.
    // This is very important: it is where the main process's server comes from.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      // For UDP (or SCHED_NONE), a shared handle is created instead
      handle = new SharedHandle(key, address, message);
    } else {
      // For TCP, the round-robin algorithm decides which child-process server
      // the main process picks to handle each request.
      handle = new RoundRobinHandle(key, address, message);
    }

    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;  // Set custom server data

  // Run the add method on the RoundRobinHandle instance
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    // Reply to the child; note handle is null here, i.e. the main process's
    // handle is not passed down to the child.
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

It's worth looking at what RoundRobinHandle does.

function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  // This is where the server in the main process is created and listens on the port defined in the child process
  this.server = net.createServer(assert.fail);
  // Determine the file descriptor
  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) {
    // Port is passed in the business code
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    this.handle = this.server._handle;
    // When onconnection is executed, this.distribute is executed
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}


Recall the listenInCluster logic from before: this time we are in the main process, so the cluster.isPrimary branch runs:


if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    // If it is the main process, it will listen directly and set the new handle
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }


The main process's TCP listening handle is created inside server._listen2; this is the handle that client connections will come through later. _listen2 is actually the setupListenHandle method, which I'll cover below; for now it's enough to know what the main process has done.

Summary: at this stage, each child process's server has been created, a server has also been created in the main process, and the main process is listening on the port we defined.

The next step is to solve the port problem: the main process's server now listens on the port we defined, but its handle has not been passed to the child processes. So the child process's server must ensure the following:

  • Child processes must not listen on any port themselves. Otherwise we get either the port-in-use error from Example 1 or the multiple open ports of Example 2, neither of which matches the original intent of cluster.

After the main process's server has been created and is listening on the port, the child process is still waiting for the callback below to execute:

handle.add(worker, (errno, reply, handle) => {
    // ...
});


Call the add prototype method in RoundRobinHandle:

RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      // send here is the callback passed to add; it sends the reply back to the child process.
      send(null, { sockname: out }, null);
    } else {
      console.log('【round_robin_handle.js】 send(null, null, null);');
      send(null, null, null);  // UNIX socket.
    }
    // Put the current worker into this.free (just queued for now; it will be
    // picked up when a client request comes in)
    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once('listening', done);
  this.server.once('error'.(err) = > {
    send(err.errno, null);
  });
};


This send goes through target.send in child_process.js, and the reply finally causes the callback the child registered earlier to fire:

// child.js

send(message, (reply, handle) => {
  if (typeof obj._setServerData === 'function')
    obj._setServerData(reply.data);

  if (handle) {
    // Shared listen socket
    shared(reply, { handle, indexesKey, index }, cb);
  } else {
    // Round-robin: the rr method is executed
    rr(reply, { indexesKey, index }, cb);
  }
});

// The rr method

function rr(message, { indexesKey, index }, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the primary that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the primary process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the primary.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  // Here is the hack: a fake handle is built to fool the upper-level caller;
  // when its listen function executes, it always returns 0
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  // Pass the fake handle to cb
  cb(0, handle);
}
  
 // The cb is the listenOnPrimaryHandle callback in net.js
 
 function listenOnPrimaryHandle(err, handle) {
    // After the main process is created, it passes data back to the child process, which then passes a fake Handle to the callback function
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);  
    }

    // Reuse primary's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    // The child process doesn't actually need to set up a listening handle anymore,
    // but it still calls this function to keep the code path uniform;
    // the distinction between the two cases is made inside this function
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
  
  


The setupListenHandle method comes into play here because of this line:

Server.prototype._listen2 = setupListenHandle; 

The setupListenHandle function has two paths: one creates the real TCP handle as mentioned above (in the main process), the other reuses the faked handle, so the child process never actually listens on the port.

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  // When the child calls this method, it already has _handle, so it will not create handle
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
  // The main process will call
    debug('setupListenHandle: create a handle');

    let rval = null;

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      // The TCP connection path is created by creating the handle of the main process
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);

      if (typeof rval === 'number') {
        rval = null;
        address = DEFAULT_IPV4_ADDR;
        addressType = 4;
      } else {
        address = DEFAULT_IPV6_ADDR;
        addressType = 6;
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);

    if (typeof rval === 'number') {
      const error = uvExceptionWithHostPort(rval, 'listen', address, port);
      process.nextTick(emitErrorNT, this, error);
      return;
    }
    this._handle = rval;
  }

  this[async_id_symbol] = getNewAsyncId(this._handle);
  // Child processes also bind onConnection events
  this._handle.onconnection = onconnection;
  this._handle[owner_symbol] = this;

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  // For the main process, _handle is real, so up to 511 pending connections can be
  // backlogged (the same default value Redis uses).
  // For the child process the handle is fake, so listen returns 0 no matter what backlog is passed
  const err = this._handle.listen(backlog || 511);

  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port);
    this._handle.close();
    this._handle = null;
    defaultTriggerAsyncIdScope(this[async_id_symbol],
                               process.nextTick,
                               emitErrorNT,
                               this,
                               ex);
    return;
  }

  // Generate connection key, this should be unique to the connection
  // Return '6::::9000' for main process creation and '4:null:9000' for child process creation
  this._connectionKey = addressType + ':' + address + ':' + port;

  // Unref the handle if the server was unref'ed prior to listening
  if (this._unref)
    this.unref();

  defaultTriggerAsyncIdScope(this[async_id_symbol],
                             process.nextTick,
                             emitListeningNT,
                             this);
}


At this point the entire multi-process service skeleton is in place; what hasn't been covered yet is the link between the main process and a child process when a client request arrives.

3.2 The link between the main process and the child processes when a client request arrives

After 3.1 there is actually one more step, in which the child reports 'listening' to the main process; on the main process side, the server's listen triggers the handler set up earlier:

this.server.once('listening', () => {
    this.handle = this.server._handle;
    // onconnection is executed when a client request comes in
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
});


Now a client request arrives; the connection triggers the onconnection hook first, which calls distribute:

// round_robin_handle.js
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // Push client requests into handles first
  ArrayPrototypePush(this.handles, handle);
  // eslint-disable-next-line node-core/no-array-destructuring
  // Round-robin Selects the worker process logic: get an idle process from this.free
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// handoff
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }
  // Get one from the request list (this.handles)
  const handle = ArrayPrototypeShift(this.handles);
  // The main process has no cached client requests.
  // Then push the current child into this.free and wait for the next round of client requests.
  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }
  // After the child process is selected, a processing message is sent to the child process.
  const message = { act: 'newconn', key: this.key };
  // handle -> the TCP handle accepted by the main process is passed to the previously selected worker process
  // sendHelper was shown before: it wraps the payload as { cmd: 'NODE_CLUSTER', ...message, seq } and sends it to the child process.
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another. In other words, if the current worker does not accept, it needs to resend to another worker

    this.handoff(worker);
  });
};


At this point the message has been sent to the child process, and the child starts processing it:


// child.js

function onmessage(message, handle) {
    // If message.act is newconn, it is a new request obtained by the worker process and needs to be processed
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      ReflectApply(_disconnect, worker, [true]);
}
  
function onconnection(message, handle) {
  // 'null:9000:4:undefined:0'
  const key = message.key;
  // Get the previously defined fake Handle
  const server = handles.get(key);
  // Indicates that the request can be accepted
  const accepted = server !== undefined;
  // The main process is told that the child process can accept the request and process it
  send({ ack: message.seq, accepted });

  if (accepted)
    // create a new Socket instance using c++ logic
    server.onconnection(0, handle);
}

// net.js 

function onconnection(err, clientHandle) {
  debugger;
  const handle = this;
  const self = handle[owner_symbol];

  debug('onconnection');

  if (err) {
    self.emit('error', errnoException(err, 'accept'));
    return;
  }

  if (self.maxConnections && self._connections >= self.maxConnections) {
    clientHandle.close();
    return;
  }

  const socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect,
    readable: true,
    writable: true
  });

  self._connections++;
  socket.server = self;
  socket._server = self;

  DTRACE_NET_SERVER_CONNECTION(socket);
  // Construct the socket object of the Node layer and trigger the Connection event to complete the connection between the underlying socket and the Node NET module and get through the request
  self.emit('connection', socket);
}


As for how the socket is actually handled down in C++, I can't explain it in detail since I'm not familiar with C++. But logically the flow must be:

The main process's server listens on the port; when a client request comes in -> it takes an idle child process from this.free and sends the TCP handle to it -> the child decides whether to accept the request -> if yes, it tells the main process, and the child's net server creates a new Socket instance from the incoming TCP handle, interacting with the underlying C++ to process the request -> the result is returned to the main process, which sends the data back to the client.
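To tie the whole flow together, here is a toy model of it under my own simplifications (real socket passing over IPC instead of cluster's internal fake handles; all file names are made up): the master listens, queues accepted connections, picks a free worker round-robin, and ships each connection to it.

// toy-master.js
const { fork } = require('child_process');
const net = require('net');
const os = require('os');

const free = [];     // idle workers, like RoundRobinHandle's this.free
const pending = [];  // accepted connections waiting for a worker, like this.handles

for (let i = 0; i < os.cpus().length; i++) {
    const worker = fork('./toy-worker.js');
    worker.on('message', msg => {
        if (msg === 'done') handoff(worker);  // worker accepted the last connection, put it back in rotation
    });
    free.push(worker);
}

function handoff(worker) {
    const socket = pending.shift();
    if (!socket) {
        free.push(worker);  // nothing queued: mark the worker idle again
        return;
    }
    worker.send('conn', socket);  // pass the connection handle over IPC
}

net.createServer({ pauseOnConnect: true }, socket => {
    pending.push(socket);
    const worker = free.shift();
    if (worker) handoff(worker);
}).listen(9000);

// toy-worker.js
const http = require('http');

const server = http.createServer((req, res) => {
    res.end(`handled by ${process.pid}`);
});

process.on('message', (msg, socket) => {
    if (msg !== 'conn') return;
    server.emit('connection', socket);  // hand the socket to the local HTTP server
    socket.resume();                    // it was paused by pauseOnConnect in the master
    process.send('done');               // tell the master this worker is free again
});

Compared with the real implementation, the main difference is that cluster fakes the worker-side listen handle instead of shipping a live socket through user-level code, but the queueing and handoff logic has the same shape.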


Through the two big walkthroughs above, the overall path is now clear. I still don't know the C++ internals of the last part, so corrections are very welcome ~~

I hope this analysis of a thousand-plus lines of source gives you a bit of inspiration. Please like/read/comment ~~