Preface

Seneca is a Node.js microservices toolkit that makes it easier to build systems that can be extended and updated continuously. Below, I introduce the relevant technologies one by one and show how to use them in practice.

A quick plug first: after some simple integration work, I built vastify, a lightweight Node.js microservice framework. If you are interested, please take a look and feel free to star it. If you have any questions or find problems in the code, please leave a comment below the blog post.

The environment

  • Base environment
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
  • Microservice project
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"

FEATURES

  • Pattern matching for calls between services: unlike Spring Cloud service discovery (HTTP protocol, IP + port), Seneca uses the more flexible principle of pattern matching (the Patrun module) for calls between microservices
  • Koa2 provides RESTful APIs for client-facing access
  • Plugins: write small, reusable modules more flexibly
  • Seneca's built-in log output
  • Comparison of third-party logging libraries: Winston, Bunyan, and log4js
  • RabbitMQ message queue
  • PM2: Node service deployment (service cluster), management, and monitoring
  • PM2: automatic deployment
  • PM2 integration with Docker
  • Request tracing (reconstructing the user request process)
  • Review of the basic logic of Consul service registration and discovery
  • Framework integration with node-consul
  • MongoDB persistent storage
  • Routing middleware combining Seneca and Consul (supports multiple service clusters with the same name, distinguished by $$version)
  • Stream processing support (file upload/download, etc.)
  • Jenkins automated deployment
  • Nginx load balancing
  • Continuous integration solution
  • Redis cache
  • Prisma provides the GraphQL interface

Pattern matching (Patrun module)

index.js (account-server/src/index.js)

const seneca = require('seneca')()

// Register an action for messages that match the pattern cmd:login
seneca.add('cmd:login', (msg, done) => {
  const { username, pass } = msg
  if (username === 'asd' && pass === '123') {
    return done(null, { code: 1000 })
  }
  return done(null, { code: 2100 })
})

const Promise = require('bluebird')

// Promisify seneca.act, keeping the Seneca instance as its context
const act = Promise.promisify(seneca.act, { context: seneca })

act({
  cmd: 'login',
  username: 'asd',
  pass: '123'
}).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

Output after running:

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}

The seneca.add method adds an action pattern to the Seneca instance. It takes three parameters:

  1. pattern: the pattern that Seneca matches JSON messages against, given as an object or a formatted string
  2. sub_pattern: a sub-pattern with lower priority than the main pattern (optional)
  3. action: the action function that runs when a message matches

The seneca.act method executes the matching action in the Seneca instance. It also takes three parameters:

  1. msg: the JSON message
  2. sub_pattern: a sub-message with lower priority than the main message (optional)
  3. response: a callback that receives the result of the service call

The seneca.use method adds a plug-in to the Seneca instance. It takes two parameters (note that the plug-in mechanism works a little differently from middleware):

  1. func: the plug-in's execution function
  2. options: options required by the plug-in (optional)

The core idea is pattern matching on JSON objects. A single JSON object carries both the properties that identify the microservice to be called and the parameters to pass to it. This is similar to microservice discovery in Java, except that a pattern replaces IP + port. So far, patterns have proven fully capable of handling service discovery, but whether they are actually more flexible remains to be explored.
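To make the matching idea concrete, here is a minimal sketch in plain JavaScript of how a registry might pick an action by pattern. It is a simplified illustration and not Patrun's actual implementation: the PatternRegistry class and its "the pattern with the most matching properties wins" rule are assumptions made for this example.

```javascript
// Simplified pattern registry (hypothetical; not the Patrun implementation)
class PatternRegistry {
  constructor() {
    this.entries = []
  }

  // Register an action for a pattern such as { cmd: 'login' }
  add(pattern, action) {
    this.entries.push({ pattern, action })
    return this
  }

  // Find the action whose pattern matches msg with the most properties
  find(msg) {
    let best = null
    for (const entry of this.entries) {
      const keys = Object.keys(entry.pattern)
      const matches = keys.every(key => msg[key] === entry.pattern[key])
      if (matches && (best === null || keys.length > Object.keys(best.pattern).length)) {
        best = entry
      }
    }
    return best ? best.action : null
  }

  // Run the matched action; the other message properties act as parameters
  act(msg) {
    const action = this.find(msg)
    if (!action) throw new Error('No matching pattern')
    return action(msg)
  }
}

const registry = new PatternRegistry()
registry.add({ cmd: 'login' }, () => ({ code: 2100 }))
registry.add({ cmd: 'login', role: 'admin' }, () => ({ code: 1000 }))

// Matched by { cmd: 'login' }; username is just a parameter
console.log(registry.act({ cmd: 'login', username: 'asd' }))
// Both patterns match, and the more specific one wins
console.log(registry.act({ cmd: 'login', role: 'admin' }))
```

The caller never names a host or port: the message content alone decides which action runs, which is the property the paragraph above compares with IP + port discovery.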

Points to note

  • Patterns must be designed so that different microservices can be told apart

Start the first microservice

index.js (config-server/src/index.js)

const seneca = require('seneca')()
const config = {
  SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: 'Server responding properly'
  }
}

seneca.add('$$target:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)

After running this script, you can open http://localhost:10011/act?cmd=config in your browser to request the global configuration information

OR

const seneca = require('seneca')()
const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client(10011)
act('$$target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

Internally: multiple microservices calling each other (key point)

noname-server

const seneca = require('seneca')()
// The pattern must match the pin that the caller uses for port 10015
seneca.add('$$target:noname-server', (msg, done) => {
  done(null, { seneca: '666' })
})
seneca.listen(10015)

config-server (same as above)

Caller

const seneca = require('seneca')()
const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: seneca })

// Each client is pinned to the pattern served on that port
seneca.client({
  port: 10011,
  pin: '$$target:config-server'
})
seneca.client({
  port: 10015,
  pin: '$$target:noname-server'
})

act('$$target:config-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

act('$$target:noname-server').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})

External: providing REST services (key point)

Integrating Koa

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// Initialize the user module
seneca.use(userModule.init)

// Initialize the seneca-web plug-in and adapt it to Koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// Export the routes to the Koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)

The user module

const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

// In-memory stand-in for a database
const db = {
  users: [{
    id: 1,
    name: '甲'
  }, {
    id: 2,
    name: '乙'
  }, {
    id: 3,
    name: '丙'
  }]
}

function user(options) {
  // List all users
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })
  // Load one user by id
  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })
  // Replace the name of an existing user
  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
  // Create a user with the next id
  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })
  // Delete a user by id
  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}

vscode-restclient (the REST Client extension for VS Code, used to send RESTful requests)

### 1
POST http://localhost:3333/user HTTP/1.1
content-type: application/json

{
  "name": "Test add user"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
content-type: application/json

{
  "name": "Test modify user information"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1

Seneca's built-in log output

Configuration can be passed to the constructor; the log property controls the log level

Example 1: pass a string

require('seneca')({
  // Options: quiet, silent, any, all, print, standard, test
  log: 'all'
})

Example 2: pass an object

require('seneca')({
  log: {
    // Options: none, debug+, info+, warn+
    level: 'debug+'
  },
  // When set to true, the Seneca log output abbreviates fields such as senecaId, senecaTag, and actId (usually to two characters)
  short: true
})

Example 2 is worth using because the seneca-web-adapter-koa2 plug-in prints its logs at the debug level, which makes it easy to record access to the web interface.

Winston log module

Logger.js

const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({ label: 'microservices' }),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [ new transports.Console() ]
})

// Log levels, from highest to lowest priority (for reference)
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger

Log output format

2018-05-17T14:43:28.330Z [microservices] info: Call request received from RPC client
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message

RabbitMQ message queue service

  • Installation

1. Single task, single consumer

producer.js

// Load the AMQP client library
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'

    // durable: true keeps the queue when RabbitMQ exits or restarts (the consumer script must also declare durable: true)
    ch.assertQueue(q, { durable: true })
    // persistent: true asks RabbitMQ to persist the message. (This is not fully reliable, because RabbitMQ does not perform synchronous I/O for every message; messages are cached and written to disk at an appropriate time.)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 100)
  })
})

consumer.js

// Load the AMQP client library
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'

    // durable: true keeps the queue when RabbitMQ exits or restarts (must match the producer)
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      // Each '.' in the message simulates one second of work
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    // noAck (default: false) controls whether the consumer must send an ACK to RabbitMQ after processing a task.
    // If true, RabbitMQ does not care what the consumer does with a task: delivering it counts as completing it.
    // If false, RabbitMQ waits for the ACK before marking the task as done; once it receives the ACK,
    // or if the consumer process exits, it continues dispatching tasks.
    }, { noAck: false })
  })
})

The inspection process

  • Run rabbitmqctl list_queues to view the current queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
  • node producer.js (RabbitMQ uses the anonymous exchange, creates a queue, and binds the queue to the anonymous exchange)
  • rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1
  • node consumer.js
 [*] Waiting for messages in taskQueue1. To exit press CTRL+C
 [x] Received hello world
 [x] Done
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0

Key points

  • Producer-consumer pattern (each message from a producer is processed by only one consumer)
  • ACK mechanism (RabbitMQ's acknowledgement mechanism)
  • Creating queues with {durable: true} and sending messages with {persistent: true} (messages are stored persistently, but this is not guaranteed: for example, a message that has not yet been written from the cache to disk is lost if the program crashes)
  • Round-robin dispatch
  • Processing window control (prefetch limits how many unacknowledged messages are dispatched to a consumer)
  • Asynchronous multitasking (e.g., breaking a large task down, divide and conquer)
  • The overall message flow (a producer process -> anonymous exchange -> via binding -> the specified queue -> a consumer process)

2. Single task, multiple consumers: the publish/subscribe model (every subscriber receives every message)

publisher.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'

    // ex is the exchange name (unique)
    // The exchange type is fanout
    // Messages are not persisted
    ch.assertExchange(ex, 'fanout', { durable: false })
    // The second argument is the routing key; a fanout exchange ignores it, so it is left empty
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })

  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})

subscriber.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'

    // ex -> the exchange is the carrier for publish/subscribe messages
    // fanout -> the exchange type; the available types are fanout, direct, topic, and headers
    // durable: false trades some reliability for performance, because messages do not need persistent disk I/O
    ch.assertExchange(ex, 'fanout', { durable: false })
    // Use an anonymous queue (that is, a queue whose random name is generated by RabbitMQ)
    // exclusive: true means the queue is deleted automatically when the connection that owns it is closed
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // Bind the queue to the exchange (listen for messages from that exchange)
      // The third argument is the binding key
      ch.bindQueue(q.queue, ex, '')
      // Consuming means subscribing to the exchange's messages and setting a handler
      // The publish/subscribe model is not reliable: a subscriber only receives messages published after it has subscribed,
      // and the publisher does not care who the subscribers are or what they do with a message, so noAck is set to true
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

The inspection process

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges, and bindings used in previous tests)

node subscriber.js

 [*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []

node publisher.js tasks.........

 [x] Send tasks......... // publisher.js

 [x] tasks......... // subscriber.js

Key points

  • Publish/subscribe (a publisher sends each message to many subscribers for processing)
  • noAck (acknowledgements are not recommended in this mode, since publishers usually do not need to know how a subscriber processes a message or what the result is)
  • durable: false (persistent storage is not recommended in this mode, for the reason described above)
  • Exchange working mode (that is, the routing type: fanout, direct, topic, headers, and so on, described in the following sections)
  • The overall message flow (a publisher process -> the specified exchange -> via binding and working mode -> one or more anonymous queues, each belonging to a subscriber process)
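The fanout behavior summarized above can be modeled in a few lines of plain JavaScript. This is only an in-memory illustration under stated assumptions: the FanoutExchange class is hypothetical and is not part of amqplib, and plain arrays stand in for queues.

```javascript
// Hypothetical in-memory model of a fanout exchange (not the amqplib API)
class FanoutExchange {
  constructor() {
    this.queues = []
  }

  // Bind a queue (here simply an array) to the exchange
  bind(queue) {
    this.queues.push(queue)
  }

  // Routing keys play no role: every bound queue gets its own copy
  publish(message) {
    this.queues.forEach(queue => queue.push(message))
  }
}

const logs = new FanoutExchange()
const subscriberA = []
const subscriberB = []
logs.bind(subscriberA)
logs.bind(subscriberB)

logs.publish('tasks.........')

// A queue bound after publishing never sees the earlier message
const lateSubscriber = []
logs.bind(lateSubscriber)

console.log(subscriberA, subscriberB, lateSubscriber)
```

The late subscriber ends up empty, which mirrors why the subscriber script has to be running before publisher.js is started.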

3. Direct routing

exchange.js

module.exports = {
  name: 'ex1',
  type: 'direct',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

direct-routing.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {

    // exchange.js exports the settings under the key "option"
    ch.assertExchange(ex.name, ex.type, ex.option)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      // Declare a named (non-anonymous) queue for each rank
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        // Use the rank as the binding key
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {

          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    // Publish one message per rank, using the rank as the routing key
    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

The inspection process

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges, and bindings used in previous tests)

node direct-routing.js
rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers	headers
ex1	direct
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.topic	topic
	direct
amq.direct	direct
amq.match	headers

node subscriber.js
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue  0
error-queue     0
info-queue      0
warning-queue   0

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        error-queue     queue   error-queue     []
        exchange        info-queue      queue   info-queue      []
        exchange        severity-queue  queue   severity-queue  []
        exchange        warning-queue   queue   warning-queue   []
ex1     exchange        error-queue     queue   error   []
ex1     exchange        info-queue      queue   info    []
ex1     exchange        severity-queue  queue   severity        []
ex1     exchange        warning-queue   queue   warning []

node publisher.js

 [x] info: 'info logs...'
 [x] error: 'error logs...'
 [x] severity: 'severity logs...'
 [x] warning: 'warning logs...'

Key points

  • Routing key: used to route messages when the exchange works in direct mode
  • Every time assertQueue is called, the queue is also bound to the anonymous exchange, with the queue name as the routing key
  • Can be used to handle logs of different levels separately
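As a rough illustration of exact-match routing, the following plain JavaScript sketch keeps a map from routing key to bound queues. The DirectExchange class is a hypothetical in-memory model, not the amqplib API, and arrays stand in for queues.

```javascript
// Hypothetical in-memory model of a direct exchange (not the amqplib API)
class DirectExchange {
  constructor() {
    this.bindings = new Map()
  }

  // Bind a queue under one exact routing key
  bind(queue, routingKey) {
    if (!this.bindings.has(routingKey)) this.bindings.set(routingKey, [])
    this.bindings.get(routingKey).push(queue)
  }

  // Deliver only to queues bound with exactly this key;
  // returns how many queues received the message
  publish(routingKey, message) {
    const queues = this.bindings.get(routingKey) || []
    queues.forEach(queue => queue.push(message))
    return queues.length
  }
}

const ex1 = new DirectExchange()
const errorQueue = []
const infoQueue = []
ex1.bind(errorQueue, 'error')
ex1.bind(infoQueue, 'info')

ex1.publish('error', 'error logs...')
ex1.publish('info', 'info logs...')
// No queue is bound with the key "debug", so this message reaches nobody
const delivered = ex1.publish('debug', 'debug logs...')

console.log(errorQueue, infoQueue, delivered)
```

Keeping one queue per log level, as subscriber.js does, lets each level be consumed and processed on its own.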

4. Topic routing

exchange.js

module.exports = {
  name: 'ex2',
  type: 'topic',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}

topic-routing.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

subscriber.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    // Binding keys come from the command line
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']

    console.log(' [*] Waiting for logs. To exit press CTRL+C');
    keys.forEach(key => {
      // One anonymous, exclusive queue per binding key
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)

        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})

publisher.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    // The first argument is the routing key, the rest is the message
    // (checking for > 0 so that a key passed without a message is still used)
    const key = (args.length > 0) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'

    ch.publish(exchangeConfig.name, key, Buffer.from(msg))

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})

The inspection process

rabbitmqctl stop_app; rabbitmqctl reset; rabbitmqctl start_app (clears the queues, exchanges, and bindings used in previous tests)

node topic-routing.js
rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.headers	headers
amq.match	headers
ex2	topic
	direct
amq.topic	topic
amq.direct	direct

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
  • node publisher.js "account-server.info" "User service test"
  • node publisher.js "config-server.info" "Config service test"
  • node publisher.js "config-server.error" "Config service test"
 [x] account-server.info:'User service test'
 [x] config-server.info:'Config service test'
 [x] config-server.error:'Config service test'

Key points

  • A routing key can contain at most 255 bytes
  • # matches zero or more words; * matches exactly one word
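The two wildcard rules can be checked with a small matcher written in plain JavaScript. This is an illustrative sketch of the matching rules only: topicMatches is a hypothetical helper, not how RabbitMQ implements topic exchanges internally.

```javascript
// Hypothetical helper: does a routing key match a topic binding key?
// Words are separated by '.', '*' stands for exactly one word,
// '#' stands for zero or more words
function topicMatches(bindingKey, routingKey) {
  const pattern = bindingKey.split('.')
  const words = routingKey.split('.')

  function match(pi, wi) {
    // Pattern exhausted: succeed only if every word was consumed too
    if (pi === pattern.length) return wi === words.length
    if (pattern[pi] === '#') {
      // Let '#' absorb zero, one, two, ... of the remaining words
      for (let next = wi; next <= words.length; next++) {
        if (match(pi + 1, next)) return true
      }
      return false
    }
    if (wi === words.length) return false
    if (pattern[pi] === '*' || pattern[pi] === words[wi]) {
      return match(pi + 1, wi + 1)
    }
    return false
  }

  return match(0, 0)
}

console.log(topicMatches('#.info', 'account-server.info'))   // true
console.log(topicMatches('*.error', 'config-server.error'))  // true
console.log(topicMatches('*.error', 'a.b.error'))            // false: '*' covers only one word
console.log(topicMatches('#.info', 'info'))                  // true: '#' may match zero words
```

The first two calls correspond to the binding keys passed to subscriber.js in the inspection process above.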

5. RPC

rpc_server.js

const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'

    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)

    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('Call request received from RPC client')
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        // Send the result to the queue named in replyTo, then acknowledge the request
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('Mismatched call request')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''

  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: 'success',
      data: {
        uid: uid,
        name: 'jack',
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: 'Parameter format error'
    }
  }
  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

rpc_client.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'

    // Listen on the callback queue for the server's reply
    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log('Received a callback message!')
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })

    // Send the request, naming the callback queue in replyTo
    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})

The inspection process

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue       0

node rpc_client.js

Output of rpc_client on the CLI

Received a callback message!
{"state":1000,"msg":"success","data":{"uid":2,"name":"jack","sex":1}}

Output of rpc_server on the CLI

Call request received from RPC client
{"uid":2}
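The server above compares correlationId with the fixed value '10abc'. When a client has several calls in flight, a common approach is to give each request its own correlationId and use it to match replies to callbacks. The sketch below shows that bookkeeping in plain JavaScript; PendingCalls and the "call-N" id format are hypothetical names chosen for this example and are not part of amqplib.

```javascript
// Hypothetical helper for matching RPC replies to requests by correlationId
class PendingCalls {
  constructor() {
    this.pending = new Map()
    this.nextId = 0
  }

  // Remember the callback and return the correlationId to send with the request
  register(callback) {
    const correlationId = `call-${++this.nextId}`
    this.pending.set(correlationId, callback)
    return correlationId
  }

  // Handle a reply; returns false when the correlationId is unknown
  resolve(correlationId, content) {
    const callback = this.pending.get(correlationId)
    if (!callback) return false
    this.pending.delete(correlationId)
    callback(JSON.parse(content))
    return true
  }
}

const calls = new PendingCalls()
const results = []
const id1 = calls.register(reply => results.push(reply))
const id2 = calls.register(reply => results.push(reply))

// Replies may arrive in any order; the correlationId tells them apart
calls.resolve(id2, JSON.stringify({ state: 1000, data: { uid: 2 } }))
calls.resolve(id1, JSON.stringify({ state: 2000 }))
const unknownHandled = calls.resolve('unknown', '{}')

console.log(results, unknownHandled)
```

In rpc_client.js, the id returned by register would be passed as the correlationId option of sendToQueue, and resolve would be called from the consume callback with msg.properties.correlationId and msg.content.toString(). For this to work, the server would also have to copy the request's correlationId into its reply, which the rpc_server.js shown above does not do.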

PM2: Node service deployment (service cluster), management, and monitoring

PM2 website

Starting

pm2 start app.js

  • -w --watch: watches the directory for changes and automatically restarts the application
  • --ignore-watch: files to ignore when watching for changes, such as pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
  • -n --name: sets the application name, used to tell applications apart
  • -i --instances: sets the number of application instances; 0 is equivalent to max
  • -f --force: forcibly starts an application, usually when the same application is already running
  • -o --output <path>: path of the standard output log file
  • -e --error <path>: path of the error output log file
  • --env <environment_name>: selects which set of environment variables to inject

For example: pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

In cluster mode (that is, with -i max), PM2 automatically appends -${index} to each log file name so that the files do not collide

Other simple, common commands

pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines <n> --err

Graceful stop

pm2 stop app_name|app_id

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})

Before the process is killed, the program intercepts the SIGINT signal, closes its connections (to the database, for example), and then exits gracefully by calling process.exit(). (If the process has not ended after 1.6 seconds, PM2 sends the SIGKILL signal to force it to end.)
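When a process holds more than one connection, the SIGINT handler can be built from a list of close functions. The following is a minimal sketch under stated assumptions: createShutdownHandler is a hypothetical helper, the close functions are synchronous for simplicity, and the exit function is injected so the logic can be exercised without ending the process.

```javascript
// Hypothetical helper: build a SIGINT handler that closes every resource before exiting
function createShutdownHandler(closers, exit) {
  return () => {
    let exitCode = 0
    for (const close of closers) {
      try {
        close()
      } catch (err) {
        // Keep closing the remaining resources, but report the failure
        exitCode = 1
      }
    }
    exit(exitCode)
  }
}

// Assumed wiring in a real service:
// process.on('SIGINT', createShutdownHandler(
//   [() => connection.close()],
//   code => process.exit(code)
// ))

// Dry run with stand-in resources and a fake exit function
const closed = []
const exitCodes = []
const handler = createShutdownHandler(
  [
    () => closed.push('amqp'),
    () => { throw new Error('db close failed') },
    () => closed.push('http')
  ],
  code => exitCodes.push(code)
)
handler()

console.log(closed, exitCodes)  // [ 'amqp', 'http' ] [ 1 ]
```

Real close calls are usually asynchronous, so in practice the handler has to finish within PM2's kill timeout, or the SIGKILL mentioned above ends the process first.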

Process file

ecosystem.config.js

const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // Do not merge log output; used for cluster services
  merge_logs: false,
  // Timeout that usually applies while the application is starting (ms)
  listen_timeout: 5000,
  // Time limit for handling SIGINT: after receiving SIGINT, the process must end within this time (ms)
  kill_timeout: 2000,
  // Do not restart automatically after an exception; operations staff find the cause and retry
  autorestart: false,
  // Do not allow another process to be started with the same script
  force: false,
  // Commands executed after a pull/upgrade operation is performed from the Keymetrics dashboard
  post_update: ['npm install'],
  // Watch for file changes
  watch: false,
  // Files ignored when watching for changes
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig({ name = '', script = '', error_file = '', out_file = '', instances = 1, args = '' }) {
  if (name) {
    // Start from the shared defaults, then apply the per-app settings on top
    // (the other way round, appCfg.args would overwrite the args passed in)
    return Object.assign({}, appCfg, {
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file || `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    })
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),

    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}

pm2 start ecosystem.config.js

It is recommended to name the process file in the *.config.js format; if you use another name, you bear the consequences.

Monitoring

See app.keymetrics.io

PM2: automatic deployment

SSH preparation

  1. ssh-keygen -t rsa -C 'qingf deployment' -b 4096
  2. If multiple keys or users exist, you are advised to configure ~/.ssh/config in the following format
# Specify the private key
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  # Skip the y/n prompt when making SSH requests to different remote hosts
  StrictHostKeyChecking no
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no
  3. Copy the public key to the remote host (generally the deployment server), into the corresponding user's directory, for example the /home/deploy/.ssh/authorized_keys file (set the permissions of authorized_keys to 600)

Configuring ecosystem.config.js

Add the deploy property at the same level as apps above, as follows

deploy: {
    production: {
        'user': 'deploy',
        'host': 'qingf.me',
        'ref': 'remotes/origin/master',
        'repo': 'https://github.com/Cecil0o0/account-server.git',
        'path': '/home/deploy/apps/account-server',
        // Lifecycle hook: runs after SSH-ing to the remote host, before the setup operation
        'pre-setup': '',
        // Lifecycle hook: runs after the initial setup (that is, after git pull)
        'post-setup': 'ls -la',
        // Lifecycle hook: runs before git fetch origin on the remote host
        'pre-deploy': '',
        // Lifecycle hook: runs after git on the remote host has moved the HEAD pointer to the specified ref
        'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
        // The following environment variables are injected into all apps
        'env': {
          'NODE_ENV': 'test'
        }
    }
}

Make sure your git working directory is clean first!

If anything is unclear or you have questions, please refer to the demo

Then execute the following two commands (note the config file path)

  1. pm2 deploy deploy/ecosystem.config.js production setup
  2. pm2 deploy deploy/ecosystem.config.js production

Other commands

pm2 deploy <configuration_file> <environment> <command>

Commands:
  setup                run remote setup commands
  update               update deploy to the latest release
  revert [n]           revert to [n]th last deployment or 1
  curr[ent]            output current release commit
  prev[ious]           output previous release commit
  exec|run <cmd>       execute the given <cmd>
  list                 list previous deploy commits
  [ref]                deploy to [ref], the "ref" setting, or latest tag

Recommended shell toolkit

Oh My Zsh

Request tracing

How?

  • Inside seneca.add and seneca.act, use the seneca.fixedargs['tx$'] value as the trace ID of a request. Seneca's built-in log system also prints this value.

Open question

How can Seneca's built-in log system be made to print custom logs?

Friendly reminder: a traced request must start from a normal HTTP request, because testing shows that the seneca.fixedargs['tx$'] values differ when a microservice initiates the act itself.

Consul service registration and discovery

Consul is a distributed service registration and discovery tool with advanced features such as health checks, hierarchical KV storage, and multi-datacenter support.

Installation

  • You can use a precompiled binary package
  • You can also clone the source code and compile and install it yourself

Basic usage

  • Quickly start a server-mode agent in development mode with the web UI enabled, then visit http://localhost:8500

consul agent -dev -ui

  • Write a service definition file

{
  "service": {
    // Service name, used later to query the service
    "name": "account-server",
    // Service tags
    "tags": ["account-server"],
    // Service metadata
    "meta": {
      "meta": "for my service"
    },
    // Service port
    "port": 3333,
    // Do not allow tags to be overridden
    "enable_tag_override": false,
    // Health checks, used together with -enable-script-checks=true; the available modes are script, TCP, HTTP, and TTL
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}

  • Query the account-server service defined above

curl http://localhost:8500/v1/catalog/service/account-server

[
  {
    "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
    "Node": "haojiechen.local",
    "Address": "127.0.0.1",
    "Datacenter": "dc1",
    "TaggedAddresses": {
      "lan": "127.0.0.1",
      "wan": "127.0.0.1"
    },
    "NodeMeta": {
      "consul-network-segment": ""
    },
    "ServiceID": "account-server",
    "ServiceName": "account-server",
    "ServiceTags": [
      "account-server"
    ],
    "ServiceAddress": "",
    "ServiceMeta": {
      "meta": "for my service"
    },
    "ServicePort": 3333,
    "ServiceEnableTagOverride": false,
    "CreateIndex": 6,
    "ModifyIndex": 6
  }
]
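
To consume this response from code, something like the following sketch may help. It assumes the response shape shown above; toEndpoints is a hypothetical helper name, and the sample entry is trimmed from that output. When ServiceAddress is empty, as it is here, the node's Address is the one to use.

```javascript
// Hypothetical helper: turns /v1/catalog/service/<name> entries into host/port pairs.
function toEndpoints(catalogEntries) {
  return catalogEntries.map(entry => ({
    id: entry.ServiceID,
    // An empty ServiceAddress means the node address should be used
    host: entry.ServiceAddress || entry.Address,
    port: entry.ServicePort
  }))
}

// Sample trimmed from the catalog response
const sample = [{
  Address: '127.0.0.1',
  ServiceID: 'account-server',
  ServiceName: 'account-server',
  ServiceAddress: '',
  ServicePort: 3333
}]

console.log(toEndpoints(sample))
```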

Production-level usage (distributed clustering)

Start a server-mode agent on one node as follows

consul agent -server -bootstrap-expect=1 \
	-data-dir=/tmp/consul -node=agent-one -bind=<valid public IP> \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Viewing Cluster Members

consul members

Node       Address                 Status  Type    Build  Protocol  DC   Segment
agent-one  <valid public IP>:8301  alive   server  1.1.0  2         dc1  <all>

Start a client-mode agent on another node as follows

consul agent \
	-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d

Viewing Cluster Members

consul members

Node       Address             Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.220:8301  alive   server  1.1.0  2         dc1  <all>

Join the Cluster

consul join 139.129.5.228
consul members

Node       Address                 Status  Type    Build  Protocol  DC   Segment
agent-one  <valid public IP>:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.220:8301      alive   server  1.1.0  2         dc1  <all>

Integrating node-consul

config.js

// Service registration and discovery
// https://github.com/silas/node-consul#catalog-node-services
  'serverR&D': {
    consulServer: {
      type: 'consul',
      host: '127.0.0.1',
      port: 8500,
      secure: false,
      ca: [],
      defaults: {
        token: ''
      },
      promisify: true
    },
    bizService: {
      name: 'defaultName',
      id: 'defaultId',
      address: '127.0.0.1',
      port: 1000,
      tags: [],
      meta: {
        version: '',
        description: 'Register cluster'
      },
      check: {
        http: '',
        // Check interval (ex: 15s)
        interval: '10s',
        // Check timeout (ex: 10s)
        timeout: '2s',
        // How long a service may stay in critical state before it is automatically deregistered
        deregistercriticalserviceafter: '30s',
        // Initial status is passing
        status: 'passing',
        // Notes
        notes: '{"version":"111","microservice-port":1115}'
      }
    }
  }

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 11:26:49
 * @Description Microservice registration method
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')

// Register service

function register({ consulServer = {}, bizService = {} } = {}) {
  // Validate the input: name and host must be non-empty strings, port must be a number
  if (!isString(bizService.name) || !bizService.name) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!isString(bizService.host) || !bizService.host) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  // The metadata travels to other services through the check notes
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // Check whether the host + port is already occupied
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`The service cluster endpoint [${service.address}:${service.port}] is occupied!`)
        }
      })
      // Register the cluster service
      consul.agent.service.register(service).then(() => {
        console.log(`${bizService.name} service registered`)
        resolve(services)
      }).catch(err => {
        console.log(err)
        reject(err)
      })
    }).catch(err => {
      // Reject instead of throwing, so the caller can handle the failure
      reject(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}
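
The ObjectDeepSet helper from helper/utils is not shown in this post. Judging only from how it is called in ObjectDeepSet(defaultConf.consulServer, consulServer), it appears to overlay the caller's options on the defaults. A minimal sketch under that assumption could look like the following; the name deepSet and its merge rules (arrays replaced wholesale, defaults left unmodified) are assumptions, not the framework's actual code.

```javascript
// Hypothetical stand-in for ObjectDeepSet: recursively overlays `overrides` on `defaults`.
function isPlainObject(value) {
  return value !== null && typeof value === 'object' && !Array.isArray(value)
}

function deepSet(defaults, overrides) {
  const result = { ...defaults }
  Object.keys(overrides || {}).forEach(key => {
    const next = overrides[key]
    result[key] = isPlainObject(next) && isPlainObject(result[key])
      ? deepSet(result[key], next) // merge nested objects key by key
      : next // primitives and arrays replace the default
  })
  return result
}

const defaults = { host: '127.0.0.1', port: 8500, defaults: { token: '' } }
const merged = deepSet(defaults, { host: '10.0.0.5', defaults: { token: 'secret' } })
console.log(merged)
```

Returning a new object rather than writing into the defaults means repeated calls with different overrides do not leak settings into each other.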

Verification

With Consul and MongoDB running, clone the Demo repository, cd into the project root, and run node src.

Integrating node-consul into the framework

server-register.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 13:58:22
 * @Description Microservice registration method
 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// Register the service method definition

function register({ consulServer = {}, bizService = {} } = {}) {
  // Validate the input: name and host must be non-empty strings, port must be a number
  if (!isString(bizService.name) || !bizService.name) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!isString(bizService.host) || !bizService.host) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  // The metadata travels to other services through the check notes
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // Check whether the host + port is already occupied
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`The service cluster endpoint [${service.address}:${service.port}] is occupied!`)
        }
      })
      // Register the cluster service
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name} service registered successfully`)
        resolve(services)
      }).catch(err => {
        console.log(err)
        reject(err)
      })
    }).catch(err => {
      // Reject instead of throwing, so the caller can handle the failure
      reject(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}

account-server/src/index.js

const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// Register service
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      $$version: version,
      $$microservicePort: microservicePort
    }
  }
})

Mongodb persistent storage

  • The framework uses Mongoose as the MongoDB client; you can of course also use the native Node.js MongoClient.

The earlier user module was reworked for this. To save space the code is not pasted here; please see the Demo.

Routing middleware combining Seneca and Consul

microRouting.js

/*
 * @Author: Cecil
 * @Last Modified by: Cecil
 * @Last Modified time: 2018-06-02 16:22:02
 * @Description Internal routing middleware for microservices; custom route-matching strategies are not supported yet
 */

'use strict'

const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/**
 * @author Cecil0o0
 * @description Syncs all available services and their checks from Consul and assembles them into one object for easy lookup
 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          // Attach each check to the service it belongs to
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          // Reject instead of throwing, so the caller can handle the failure
          reject(err)
        })
      } else {
        const errmsg = 'No available service found'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      reject(err)
    })
  })
}

function syncRoutingRule(senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let $$addr = service.Address
    let $$microservicePort = ''
    let $$version = ''
    try {
      // The metadata was stored as JSON in the check notes at registration time
      let base = JSON.parse(service.check.Notes)
      $$microservicePort = base.$$microservicePort
      $$version = base.$$version
    } catch (e) {
      logger.warn(`Service ${name}: check.Notes is not valid JSON and has been ignored. Please check how the service was registered (be sure to register it via ServerRegister's register).`)
    }
    if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
      if (service.check.Status === 'passing') {
        // Only healthy services get a client route
        senecaInstance.client({
          host: $$addr,
          port: $$microservicePort,
          pin: {
            $$version,
            $$target: name
          }
        })
      } else {
        logger.warn(`${name}@${$$version || 'no'} service is in critical state and therefore cannot be used`)
      }
    } else {
      logger.warn(`Host (${$$addr}) or microservice port (${$$microservicePort}) is invalid, please check`)
    }
  })
}

function startTimeInterval() {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting(consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting
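
The routing decision in syncRoutingRule can also be isolated into a pure function, which makes it easier to reason about and to test without a running Consul. This is a sketch only: toClientOptions is a hypothetical name, a simplified IPv4 pattern stands in for IPV4_REGEX from helper/regex, the metadata keys are assumed to be $$version and $$microservicePort, and the input is assumed to be a service entry with its check attached, as assembled by syncCheckList.

```javascript
// Simplified IPv4 check (assumption: the real IPV4_REGEX may be stricter)
const SIMPLE_IPV4 = /^(\d{1,3}\.){3}\d{1,3}$/

// Hypothetical helper: returns seneca.client() options for a healthy service, or null.
function toClientOptions(name, service) {
  let meta
  try {
    meta = JSON.parse(service.check.Notes)
  } catch (e) {
    return null // missing check or non-JSON notes: skip this service
  }
  if (meta === null || typeof meta !== 'object') return null
  const port = meta.$$microservicePort
  if (!SIMPLE_IPV4.test(service.Address) || !Number.isInteger(port)) return null
  if (service.check.Status !== 'passing') return null
  return {
    host: service.Address,
    port,
    pin: { $$version: meta.$$version, $$target: name }
  }
}

// Stand-in for one entry produced by syncCheckList
const healthy = {
  Address: '127.0.0.1',
  check: { Status: 'passing', Notes: '{"$$version":"1.0.0","$$microservicePort":10015}' }
}
const critical = { ...healthy, check: { ...healthy.check, Status: 'critical' } }

console.log(toClientOptions('account-server', healthy))
console.log(toClientOptions('account-server', critical))
```

A caller would pass each non-null result to senecaInstance.client and log the skipped services, which keeps the side effects in one place.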

With Consul and MongoDB running, you can test this using the config-server and account-server from the Demo.

[To be continued….]