Following on from the previous two articles in this series on egg-core source code analysis, today we will look at how egg-cluster is implemented.

JavaScript in Node.js executes on a single thread, so one process can use only one CPU core. To make full use of server resources, there are three common approaches:

  • Deploy multiple Node services on the same machine on different ports, and use Nginx as a load balancer to forward requests to the different Node instances;
  • Use the PM2 process manager, which lets multiple processes share one port and takes care of restarting them;
  • Use the child_process and cluster modules that Node provides to create multiple processes and communicate between them.

What is egg-cluster

egg-cluster implements the multi-process startup mode of Egg. When starting an Egg application in cluster mode, we only need to pass the relevant startup parameters; egg-cluster automatically creates the processes and manages inter-process communication and exception handling. The Master process creates the Agent child process with fork from the child_process module and the Worker child processes with the cluster module. The Master, Agent and Workers each have their own responsibilities, which together keep the Egg application running normally:

  • Master

There is only one Master process, and it starts first. It is responsible for process management: starting and restarting the Agent and Worker processes and relaying messages between them. The Master runs no business code, so its stability is critical; if it dies, the whole Node service goes down.

  • Agent

There is only one Agent process, and it is rarely used in day-to-day business development. It is useful in two situations: (1) when some code must run in exactly one process, and (2) when a message needs to be broadcast to all Worker processes for handling.

  • Worker

The number of Worker processes is configurable. Workers run the business logic and handle user requests. When a Worker exits abnormally, the Master starts a new Worker to replace it.

The Agent child process is an instance of the Agent class in egg, and each Worker child process is an instance of the Application class in egg. Both Agent and Application are subclasses of EggApplication, which in turn is a subclass of EggCore from egg-core.

    +---------------------+    instance    +-------------------+
    | Agent child process | -------------> |    Agent class    |
    +---------------------+                +-------------------+
              /\                                     |
              | child_process.fork                   | inherits
              |                                      \/
    +---------------------+              +----------------------+   inherits   +---------------+
    | Master main process |              | EggApplication class | -----------> | EggCore class |
    +---------------------+              +----------------------+              +---------------+
              |                                      /\
              | cluster.fork                         | inherits
              \/                                     |
    +----------------------+   instance    +-------------------+
    | Worker child process | ------------> | Application class |
    +----------------------+               +-------------------+
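To make the inheritance chain concrete, here is a minimal sketch. The class bodies are simplified stand-ins rather than the real egg or egg-core implementations, and the `type` values are assumptions made for illustration.

```javascript
// Simplified stand-ins for the real classes; only the inheritance chain is modelled.
class EggCore {
  constructor(options = {}) {
    this.options = options;
    this.type = options.type; // assumed values: 'agent' or 'application'
  }
}

class EggApplication extends EggCore {}

// Instantiated inside the Agent child process.
class Agent extends EggApplication {
  constructor(options = {}) {
    super(Object.assign({}, options, { type: 'agent' }));
  }
}

// Instantiated inside each Worker child process.
class Application extends EggApplication {
  constructor(options = {}) {
    super(Object.assign({}, options, { type: 'application' }));
  }
}

const agent = new Agent();
const app = new Application();
console.log(agent instanceof EggCore, app instanceof EggCore); // true true
console.log(agent.type, app.type); // agent application
```

Because both classes share EggApplication, anything defined there is available in the Agent and in every Worker.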

Egg-cluster source code analysis

The entry to the egg-cluster module is master.js, and its initialization process is as follows:

  1. Initialize the workerManager and messenger instances
  2. Detect an available clusterPort automatically; with it, cluster-client makes direct communication between the Agent and the Workers possible
  3. Start the Agent child process
  4. Start the Worker child processes
  5. After startup, monitor the status of each process continuously

    // Abridged excerpt: option parsing (this.options) and other setup are omitted.
    const EventEmitter = require('events');

    // Master extends EventEmitter so it can emit and subscribe to events.
    class Master extends EventEmitter {
      constructor(options) {
        super();
        // Step 1. Create the process manager and the messenger
        this.workerManager = new Manager();
        this.messenger = new Messenger(this);
        // Step 2. Detect an available clusterPort
        detectPort((err, port) => {
          this.options.clusterPort = port;
          // Step 3. Start the Agent child process
          this.forkAgentWorker();
        });
        // Step 4. Start the Worker child processes once the Agent has started
        this.once('agent-start', this.forkAppWorkers.bind(this));
        // Step 5. After startup, notify everyone and start monitoring
        this.ready(() => {
          this.isStarted = true;
          const action = 'egg-ready';
          this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
          this.messenger.send({ action, to: 'app', data: this.options });
          this.messenger.send({ action, to: 'agent', data: this.options });
          // Use workerManager.startCheck to check the process state periodically
          if (this.isProduction) {
            this.workerManager.startCheck();
          }
        });
      }
    }

Step 1: Management of child processes and interprocess communication (Manager and Messenger)

  • manager

The Manager implementation is fairly simple. It keeps process state in two properties, workers and agent, and provides functions for getting, deleting and setting the related processes. Here is the startCheck function, which monitors whether the processes are alive:

    class Manager extends EventEmitter {
      startCheck() {
        this.exception = 0;
        // Check the number of live Workers and Agents every 10s
        this.timer = setInterval(() => {
          const count = this.count();
          if (count.agent && count.worker) {
            this.exception = 0;
            return;
          }
          this.exception++;
          // After three consecutive failed checks, emit an exception event
          if (this.exception >= 3) {
            this.emit('exception', count);
            clearInterval(this.timer);
          }
        }, 10000);
      }
    }
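The counting logic is easier to follow when it can be driven by hand. The sketch below is a simplified, hypothetical Manager (the names `SketchManager` and `check` are not from egg-cluster, and `count` is reduced to the bare minimum); `check()` holds the body that the interval in startCheck would run, so no timer is needed to try it.

```javascript
const EventEmitter = require('events');

// Hypothetical, simplified Manager used only for illustration.
class SketchManager extends EventEmitter {
  constructor() {
    super();
    this.workers = new Map(); // pid -> worker
    this.agent = null;
    this.exception = 0;
  }
  setAgent(agent) { this.agent = agent; }
  setWorker(worker) { this.workers.set(worker.pid, worker); }
  deleteWorker(pid) { this.workers.delete(pid); }
  count() {
    return { agent: this.agent ? 1 : 0, worker: this.workers.size };
  }
  // One health check: reset on success, emit 'exception' once three checks in a row fail.
  check() {
    const count = this.count();
    if (count.agent && count.worker) {
      this.exception = 0;
      return;
    }
    this.exception++;
    if (this.exception >= 3) this.emit('exception', count);
  }
}

const manager = new SketchManager();
const events = [];
manager.on('exception', count => events.push(count));
manager.setAgent({ pid: 100 });
manager.setWorker({ pid: 101 });
manager.check();           // healthy, counter stays at 0
manager.deleteWorker(101); // the only Worker disappears
manager.check();
manager.check();
manager.check();           // third consecutive failure
console.log(events);       // [ { agent: 1, worker: 0 } ]
```

A single healthy check resets the counter, so only a sustained outage raises the exception event.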

  • messenger

The Worker and Agent child processes can each communicate with the Master over IPC, but they cannot talk to each other directly, so the Master acts as a bridge between them. Each of the Master, Agent and Worker processes has a Messenger instance that manages communication with the other processes.

Note that the Messenger class behind master.messenger is defined in the egg-cluster source code, while the Messenger class behind agent.messenger and worker.messenger is defined in the egg source code. The former defines how the main process sends messages to its children, and the latter defines how the children send messages to their parent. Both are built on another module, sendmessage. Since sendmessage already smooths over the differences between the Master, Agent and Worker communication modes, it is fair to ask why the two Messenger classes are not implemented in a single module; keeping them separate leads to some duplicated code. Let's look at the implementation of the sendmessage function:

    module.exports = function send(child, message) {
      // If child has no send function, emit a message event on child itself
      if (typeof child.send !== 'function') {
        return setImmediate(child.emit.bind(child, 'message', message));
      }
      // The Agent child is created by child_process.fork() and the Worker child by cluster.fork().
      // cluster.fork() calls child_process.fork() and stores the returned object on worker.process,
      // so the connected flag has to be read from a different place in each case.
      var connected = child.process ? child.process.connected : child.connected;
      if (connected) {
        return child.send(message);
      }
    };
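The two shapes of child handle can be tried out with plain objects. In the sketch below, `fakeAgent` and `fakeWorker` are hypothetical stand-ins for what child_process.fork() and cluster.fork() return; the `send` function repeats the branching shown above so the example is self-contained (the EventEmitter branch is left out because it delivers asynchronously).

```javascript
// Same connected-flag branching as the function above.
function send(child, message) {
  if (typeof child.send !== 'function') {
    return setImmediate(child.emit.bind(child, 'message', message));
  }
  const connected = child.process ? child.process.connected : child.connected;
  if (connected) return child.send(message);
}

// Hypothetical fake with the shape of a child_process.fork() result.
function fakeAgent(connected) {
  const sent = [];
  return { connected, sent, send: msg => sent.push(msg) };
}

// Hypothetical fake with the shape of a cluster.fork() result.
function fakeWorker(connected) {
  const sent = [];
  return { process: { connected }, sent, send: msg => sent.push(msg) };
}

const agent = fakeAgent(true);
const worker = fakeWorker(true);
const deadWorker = fakeWorker(false);
[ agent, worker, deadWorker ].forEach(child => send(child, { action: 'ping' }));
console.log(agent.sent.length, worker.sent.length, deadWorker.sent.length); // 1 1 0
```

A disconnected child silently drops the message instead of throwing, which is the behaviour the connected check buys.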

When the Agent receives a message, it was necessarily delivered by the Master, but that alone does not tell us whether the path was Master -> Agent or Worker -> Master -> Agent. To record where a message comes from and where it is going, a message body contains the following fields:

  • from: where the message comes from
  • to: where the message is going
  • action: what the message is for
  • data: the message payload
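As a small illustration of these fields, the sketch below normalizes whatever a child sends into a full message body and stamps the sender on it. The helper name `normalize` is hypothetical; the string shorthand mirrors the way the Master handles messages received from its children.

```javascript
// Hypothetical helper: turn a raw child message into a full message body.
function normalize(msg, from) {
  // A bare string is shorthand for an action carrying the same data.
  if (typeof msg === 'string') msg = { action: msg, data: msg };
  // Copy so the caller's object is not mutated, then record the sender.
  return Object.assign({}, msg, { from });
}

console.log(normalize('agent-start', 'agent'));
// { action: 'agent-start', data: 'agent-start', from: 'agent' }
console.log(normalize({ action: 'refresh', to: 'app', data: { key: 1 } }, 'agent'));
// { action: 'refresh', to: 'app', data: { key: 1 }, from: 'agent' }
```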

The functions in Messenger are fairly simple to implement, so here’s what they provide.

Functions provided by master.messenger:

  1. sendToAgentWorker: send to the Agent process
  2. sendToAppWorker: send to the Worker processes
  3. sendToMaster: send to the Master process itself
  4. send: dispatch a message according to its from, to and action fields
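One way to picture what send does is as a small routing table over the from and to fields. The sketch below is an assumption-laden illustration, not the egg-cluster implementation: the function `route`, the default destinations when `to` is missing, and the `null` result are all hypothetical.

```javascript
// Hypothetical routing: return the name of the function that would deliver the message.
function route(msg) {
  const from = msg.from || 'master';
  let to = msg.to;
  // Assumed defaults when `to` is missing: Agent and Workers talk to each other.
  if (!to) {
    if (from === 'agent') to = 'app';
    else if (from === 'app') to = 'agent';
    else to = 'master';
  }
  switch (to) {
    case 'agent': return 'sendToAgentWorker';
    case 'app': return 'sendToAppWorker';
    case 'master': return 'sendToMaster';
    default: return null; // destinations not covered by this sketch, e.g. 'parent'
  }
}

console.log(route({ from: 'app', action: 'ping' }));       // sendToAgentWorker
console.log(route({ from: 'agent', to: 'master' }));       // sendToMaster
console.log(route({ to: 'app', action: 'egg-ready' }));    // sendToAppWorker
```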

Functions provided by agent.messenger and worker.messenger:

  1. broadcast: send to the Agent and all Workers, including the sender itself
  2. sendToApp: send to all Workers
  3. sendToAgent: send to the Agent; when called from the Agent, the message goes to the Agent itself
  4. sendRandom: the Agent sends a message to one randomly chosen Worker
  5. sendTo: send to a specified process

Step 2: Cluster-client enhances the communication between Agent and Worker processes

As the previous analysis shows, if only IPC is used, all communication between the Agent and the Workers has to be relayed by the Master. This matters most when long-lived connections to a remote end are involved. If only the Agent holds the long-lived connection to the remote end and the Workers connect to the Agent, the number of remote connections is reduced by a factor of N (the number of Workers) compared with every Worker connecting directly. Egg therefore provides a direct channel for long-lived connections between the Agent and the Workers based on the Leader/Follower pattern: the Agent (Leader) maintains the long-lived connection to the remote end, and the Workers (Followers) communicate with the Agent through a subscribe/publish style interface, which keeps development simple. It works as follows:

                     +-------+
                     | start |
                     +---+---+
                         |
                +--------+---------+
              __| port competition |__
        win  /  +------------------+  \  lose
            /                          \
    +---------------+    tcp conn    +-------------------+
    | Leader(Agent) |<-------------->| Follower(Worker1) |
    +---------------+                +-------------------+
            |         \ tcp conn
            |          \
       +--------+       +-------------------+
       | Client |       | Follower(Worker2) |
       +--------+       +-------------------+
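The saving in connections can be shown with a toy, in-process model of the pattern in the diagram. Everything in this sketch is hypothetical (`RemoteService`, `Leader`, `Follower` and their methods); it uses plain method calls where cluster-client uses TCP, and it is not the cluster-client API.

```javascript
const EventEmitter = require('events');

// Hypothetical remote end; it counts how many connections it accepts.
class RemoteService extends EventEmitter {
  constructor() { super(); this.connections = 0; }
  connect() { this.connections++; return this; }
}

// Leader (the Agent's role): holds the only remote connection and
// republishes every update to the followers that subscribed.
class Leader {
  constructor(remote) {
    this.followers = new Set();
    remote.connect().on('update', data => {
      for (const follower of this.followers) follower.receive(data);
    });
  }
  subscribe(follower) { this.followers.add(follower); }
}

// Follower (a Worker's role): never touches the remote end directly.
class Follower {
  constructor(leader) { this.received = []; leader.subscribe(this); }
  receive(data) { this.received.push(data); }
}

const remote = new RemoteService();
const leader = new Leader(remote);
const followers = [ 1, 2, 3, 4 ].map(() => new Follower(leader));
remote.emit('update', { config: 'v2' });
console.log(remote.connections);                   // 1
console.log(followers.map(f => f.received.length)); // [ 1, 1, 1, 1 ]
```

Four followers receive the update while the remote end sees a single connection, which is the factor-of-N reduction described above.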

  • When the service starts, an available port is detected automatically and used as clusterPort, the port on which the long-lived connections between the Agent and the Workers are established. The source code is as follows:

    // detect-port (abridged)
    const net = require('net');
    const address = require('address');

    module.exports = (port, callback) => {
      let maxPort = port + 10;
      if (typeof callback === 'function') {
        return tryListen(port, maxPort, callback);
      }
    };

    // listen uses the net module to bind the port and so test whether it is available
    function listen(port, hostname, callback) {
      const server = new net.Server();
      server.on('error', err => {
        server.close();
        if (err.code === 'ENOTFOUND') {
          return callback(null, port);
        }
        return callback(err);
      });
      server.listen(port, hostname, () => {
        port = server.address().port;
        server.close();
        return callback(null, port);
      });
    }

    // tryListen calls listen with the hostname unset, '0.0.0.0', 'localhost' and the
    // local IP in turn; the port is accepted only if every check passes
    function tryListen(port, maxPort, callback) {
      function handleError() {
        // Move on to the next port; past maxPort, fall back to a random port (0)
        port++;
        if (port >= maxPort) {
          port = 0;
          maxPort = 0;
        }
        tryListen(port, maxPort, callback);
      }
      listen(port, null, (err, realPort) => {
        // A random port needs no further checks
        if (port === 0) return callback(err, realPort);
        if (err) return handleError(err);
        listen(port, '0.0.0.0', err => {
          if (err) {
            return handleError(err);
          }
          listen(port, 'localhost', err => {
            if (err && err.code !== 'EADDRNOTAVAIL') {
              return handleError(err);
            }
            listen(port, address.ip(), (err, realPort) => {
              if (err) {
                return handleError(err);
              }
              callback(null, realPort);
            });
          });
        });
      });
    }

  • The clusterPort is assigned to the Agent process (Leader), which initializes cluster properties through the cluster-client module
  • The Worker process also initializes the cluster properties using the cluster-client module based on the clusterPort.

Note that this initialization is not part of egg-cluster. The cluster property is defined in the EggApplication class of egg, as follows:

    const cluster = require('cluster-client');

    class EggApplication extends EggCore {
      constructor(options) {
        super(options);
        this.cluster = (clientClass, options) => {
          options = Object.assign({}, this.config.clusterClient, options, {
            // The clusterPort obtained by detectPort in the previous step
            port: this.options.clusterPort,
            // The Agent process is the leader; Worker processes are followers
            isLeader: this.type === 'agent',
          });
          const client = cluster(clientClass, options);
          this._patchClusterClient(client);
          return client;
        };
      }
    }

  • Now both the Agent and Worker instances have a cluster property, and business developers can implement long-lived communication between the Agent and the Workers by following the agreed conventions. How cluster-client implements the Leader/Follower mode and how to use it in business code are not covered here; see the cluster-client source code and the multi-process development chapter of the official Egg documentation.
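Returning to the port detection in the first step of the list above, the search strategy can be sketched separately from the socket handling. In this sketch `findPort`, `probe` and `fakeProbe` are hypothetical names; the probe is injectable so the walk over candidate ports can be followed without binding real sockets, and only one bind per port is attempted rather than the several hostnames the real module checks.

```javascript
const net = require('net');

// Real probe: try to bind the port and report the port that was actually bound.
function probe(port, callback) {
  const server = net.createServer();
  server.once('error', err => callback(err));
  server.listen(port, () => {
    const realPort = server.address().port;
    server.close(() => callback(null, realPort));
  });
}

// Walk upward from `port` until a probe succeeds; once `maxPort` is reached,
// fall back to port 0, which asks the operating system to choose.
function findPort(port, maxPort, probeFn, callback) {
  probeFn(port, (err, realPort) => {
    if (!err) return callback(null, realPort);
    if (port === 0) return callback(err);
    const next = port + 1 >= maxPort ? 0 : port + 1;
    findPort(next, maxPort, probeFn, callback);
  });
}

// Demonstration with a fake probe in which 7001 and 7002 are "busy".
const busy = new Set([ 7001, 7002 ]);
const fakeProbe = (port, cb) => cb(busy.has(port) ? new Error('EADDRINUSE') : null, port);
let found;
findPort(7001, 7011, fakeProbe, (err, port) => { found = port; });
console.log(found); // 7003
```

With real sockets the call would be `findPort(7001, 7011, probe, callback)`; the result then arrives asynchronously.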

Step 3: Start the Agent subprocess

As soon as the clusterPort is available, egg-cluster calls forkAgentWorker to start the Agent child process:

    // forkAgentWorker (abridged; `opt` is prepared by code omitted from this excerpt)
    forkAgentWorker() {
      // Initialize the arguments for the Agent process
      const args = [ JSON.stringify(this.options) ];
      const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
      this.workerManager.setAgent(agentWorker);
      // Listen for messages from the Agent child and forward them through messenger
      agentWorker.on('message', msg => {
        if (typeof msg === 'string') msg = { action: msg, data: msg };
        msg.from = 'agent';
        this.messenger.send(msg);
      });
      // The Agent must stay alive, so an error is only logged
      agentWorker.on('error', err => {
        err.name = 'AgentWorkerError';
        err.id = agentWorker.id;
        err.pid = agentWorker.pid;
        this.logger.error(err);
      });
      // When the Agent exits, notify the Master
      agentWorker.once('exit', (code, signal) => {
        this.messenger.send({
          action: 'agent-exit',
          data: { code, signal },
          to: 'master',
          from: 'agent',
        });
      });
    }

The forkAgentWorker function only forks a child process from the Master. The child's actual execution and initialization happen in agent_worker.js, which uses the graceful-process module to exit gracefully: before exiting, the process waits for all existing connections to close and accepts no new ones. The same module is used in app_worker.js. The code run by the new child process is as follows:

    // egg-cluster -> agent_worker.js
    // (abridged; startErrorHandler and consoleLogger are defined in omitted code)
    const gracefulExit = require('graceful-process');

    const options = JSON.parse(process.argv[2]);
    const Agent = require(options.framework).Agent;
    const agent = new Agent(options);
    agent.ready(err => {
      if (err) return;
      agent.removeListener('error', startErrorHandler);
      // Tell the Master the Agent has started so that it can start the Worker child processes
      process.send({ action: 'agent-start', to: 'master' });
    });
    // Enable graceful exit
    gracefulExit({
      logger: consoleLogger,
      label: 'agent_worker',
      beforeExit: () => agent.close(),
    });

Step 4: Start the Worker child process

The startup process of a Worker is basically the same as that of the Agent, with the following differences:

  • Different startup methods: the Agent is started with child_process.fork, while the Workers are started with cluster.fork (wrapped by cfork)
  • A Worker has to serve HTTP to the outside, while the Agent does not

The forkAppWorkers function uses the cfork module, a wrapper around cluster.fork that manages creating child processes and re-creating them when they exit.

    // forkAppWorkers
    const cfork = require('cfork');

    forkAppWorkers() {
      const args = [ JSON.stringify(this.options) ];
      // Start the Worker child processes with cfork
      cfork({
        exec: this.getAppWorkerFile(), // app_worker.js
        args,
        count: this.options.workers, // number of child processes
        refork: this.isProduction, // whether to refork when a Worker exits
      });
      // Listen for the fork event
      cluster.on('fork', worker => {
        // Register the child process with workerManager
        this.workerManager.setWorker(worker);
        worker.on('message', msg => {
          if (typeof msg === 'string') msg = { action: msg, data: msg };
          msg.from = 'app';
          this.messenger.send(msg);
        });
      });
    }

app_worker.js initializes the Worker instance and starts the HTTP service. There are two modes:

  • Sticky mode: the Master alone listens on the external port and forwards each connection to a fixed Worker child process chosen by the user's IP address; each Worker starts its own service on a local port
  • Non-sticky mode: every Worker listens on the external port directly

    // egg-cluster -> app_worker.js: runs inside each Worker child process
    // (abridged; port, httpsOptions, listenConfig and consoleLogger are defined in omitted code)
    // Create the egg Application instance
    const options = JSON.parse(process.argv[2]);
    const Application = require(options.framework).Application;
    const app = new Application(options);
    // Report the real port to the Master
    process.send({ to: 'master', action: 'realport', data: port });
    // Start the service once everything is ready
    app.ready(startServer);

    function startServer(err) {
      let server;
      if (options.https) {
        server = require('https').createServer(httpsOptions, app.callback());
      } else {
        server = require('http').createServer(app.callback());
      }
      if (options.sticky) {
        // In sticky mode each Worker listens on a random local port
        server.listen(0, '127.0.0.1');
        // Only handle the sticky-session:connection messages sent by the Master
        process.on('message', (message, connection) => {
          if (message !== 'sticky-session:connection') {
            return;
          }
          server.emit('connection', connection);
          connection.resume();
        });
      } else {
        // In non-sticky mode each Worker listens on the external path or port directly
        if (listenConfig.path) {
          server.listen(listenConfig.path);
        } else {
          const args = [ port ];
          if (listenConfig.hostname) args.push(listenConfig.hostname);
          server.listen(...args);
        }
      }
    }

    // Enable graceful exit
    gracefulExit({
      logger: consoleLogger,
      label: 'app_worker',
      beforeExit: () => app.close(),
    });
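The listening decision in startServer can be isolated into a pure function, which makes the difference between the two modes easy to see. The helper `listenArgs` below is hypothetical; it only returns the arguments that would be passed to server.listen().

```javascript
// Hypothetical helper: return the argument list for server.listen().
function listenArgs(options, listenConfig, port) {
  // Sticky mode: a random port on the loopback interface; the Master owns the real port.
  if (options.sticky) return [ 0, '127.0.0.1' ];
  // Non-sticky mode: a socket path takes precedence when it is configured.
  if (listenConfig.path) return [ listenConfig.path ];
  const args = [ port ];
  if (listenConfig.hostname) args.push(listenConfig.hostname);
  return args;
}

console.log(listenArgs({ sticky: true }, {}, 7001));          // [ 0, '127.0.0.1' ]
console.log(listenArgs({}, { hostname: '0.0.0.0' }, 7001));   // [ 7001, '0.0.0.0' ]
console.log(listenArgs({}, { path: '/tmp/egg.sock' }, 7001)); // [ '/tmp/egg.sock' ]
```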

After a Worker has started, it sends an app-start message to the Master, which then runs the onAppStart function. This function performs some initialization after startup; most importantly, in sticky mode it starts the MasterSocketServer that forwards requests to the Workers.

    // onAppStart and startMasterSocketServer
    onAppStart(data) {
      // Tell the Agent the ids of all Workers that have started
      this.messenger.send({
        action: 'egg-pids',
        to: 'agent',
        data: this.workerManager.getListeningWorkerIds(),
      });
      // In sticky mode, start the MasterSocketServer in the Master process
      if (this.options.sticky) {
        this.startMasterSocketServer(err => {
          if (err) return this.ready(err);
          this.ready(true);
        });
      } else {
        this.ready(true);
      }
    }

    startMasterSocketServer(cb) {
      // The Master creates a TCP server that listens on the real external port.
      // pauseOnConnect keeps the socket paused so that its data is only read
      // after the connection has actually been handed to a Worker.
      require('net').createServer({ pauseOnConnect: true }, connection => {
        // Sticky mode relies on the user's IP address, so developers must
        // configure Nginx so that remoteAddress is available
        if (!connection.remoteAddress) {
          connection.close();
        } else {
          // Pick the Worker for this user IP and forward the connection to it
          const worker = this.stickyWorker(connection.remoteAddress);
          worker.send('sticky-session:connection', connection);
        }
      }).listen(this[REALPORT], cb);
    }
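The excerpt does not show how stickyWorker maps an address to a Worker. The sketch below shows one simple scheme, assumed here for illustration: keep the digits of the address and take them modulo the number of Workers. The function `stickyIndex` and the pid values are hypothetical.

```javascript
// Assumed hashing scheme: digits of the address modulo the worker count.
function stickyIndex(ip, workerCount) {
  let digits = '';
  for (const ch of ip) {
    if (ch >= '0' && ch <= '9') digits += ch;
  }
  return Number(digits) % workerCount;
}

const workerIds = [ 3001, 3002, 3003, 3004 ]; // hypothetical Worker pids
const pick = ip => workerIds[stickyIndex(ip, workerIds.length)];

// '10.0.0.7' -> 10007 % 4 = 3; '192.168.1.20' -> 192168120 % 4 = 0
console.log(pick('10.0.0.7'), pick('10.0.0.7'), pick('192.168.1.20')); // 3004 3004 3001
```

Whatever the exact hash, the property that matters is that the same address always lands on the same Worker as long as the set of Workers does not change.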

Step 5: After startup, real-time monitoring of service status of each process

After the startup, to ensure the normal and stable running of the service, the Master process needs to listen for various exception events:

  • agent-exit event:

The handler is onAgentExit. The Agent child process must stay alive, so once it exits, either the whole service has to exit or the Agent has to be restarted. The handler also calls removeAllListeners on the exited process to prevent memory leaks.

  • app-exit event:

The exit event of a Worker child process is handled by onAppExit, which removes the process's registration from the Master's workerManager, calls removeAllListeners, and decides whether to exit the service based on the startup environment and parameters. In production, new Worker child processes continue to be forked and the service does not exit.

  • SIGINT, SIGQUIT and SIGTERM events

These signals reach the Master process when it is asked to exit, for example with Ctrl-C (SIGINT) or the kill command (SIGTERM by default). The handler is onSignal, which calls close; close in turn calls _doClose, where the actual exit work is done:

    // _doClose (excerpt)
    * _doClose() {
      // Exit all Worker child processes
      try {
        yield this.killAppWorkers(appTimeout);
      } catch (e) {
        this.logger.error('[master] app workers exit error: ', e);
      }
      // Exit the Agent child process
      try {
        yield this.killAgentWorker(agentTimeout);
      } catch (e) {
        this.logger.error('[master] agent worker exit error: ', e);
      }
    }

The killAppWorkers and killAgentWorker functions both call terminate for each process they stop. When a process is killed, any children it created become orphans, so terminate takes care of both the process itself and its children:

    module.exports = function* (subProcess, timeout) {
      const pid = subProcess.process ? subProcess.process.pid : subProcess.pid;
      const childPids = yield getChildPids(pid);
      yield [
        killProcess(subProcess, timeout), // kill the process itself
        killChildren(childPids, timeout), // kill all of its child processes
      ];
    };

    // Send SIGTERM first; if the process has not exited within timeout, send SIGKILL
    function* killProcess(subProcess, timeout) {
      subProcess.kill('SIGTERM');
      yield Promise.race([
        awaitEvent(subProcess, 'exit'),
        sleep(timeout),
      ]);
      if (subProcess.killed) return;
      (subProcess.process || subProcess).kill('SIGKILL');
    }

    // Send SIGTERM to the children, poll until they are gone, then SIGKILL the rest
    function* killChildren(children, timeout) {
      if (!children.length) return;
      kill(children, 'SIGTERM');
      const start = Date.now();
      const checkInterval = 400;
      let unterminated = [];
      while (Date.now() - start < timeout - checkInterval) {
        yield sleep(checkInterval);
        unterminated = getUnterminatedProcesses(children);
        if (!unterminated.length) return;
      }
      kill(unterminated, 'SIGKILL');
    }
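The excerpt does not show getUnterminatedProcesses. A common way to test whether a pid is still alive in Node is to send signal 0, which performs the existence check without delivering a signal. The sketch below assumes that technique; `isAlive` and `getUnterminated` are hypothetical names.

```javascript
// Signal 0 checks that the process exists without actually signalling it.
function isAlive(pid) {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM means the process exists but we are not allowed to signal it.
    return err.code === 'EPERM';
  }
}

// Keep only the pids that are still running.
function getUnterminated(pids) {
  return pids.filter(pid => isAlive(pid));
}

console.log(getUnterminated([ process.pid ])); // contains the current pid
```

Polling with a check like this lets killChildren stop early once every child is gone, and escalate to SIGKILL only for the ones that remain.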

References

  • Egg multi-process model and interprocess communication
  • Egg source code analysis of egg-core
  • Egg source code analysis of egg-cluster
  • egg-cluster source code