Kafka, RocketMQ, and RabbitMQ are the message queues (MQs) most developers know. All three are used for messaging between microservices.

So what is MQTT? MQTT is a messaging protocol built for the Internet of Things (IoT).

I have been using MQTT.js to develop IM (instant messaging) features for two years.

First, a note on where MQTT is applied in the Internet of Things: typical scenarios are listed in the MQTT message queue section below.

MQTT.js is an MQTT client library for Node.js and the browser.

Because it is distributed through npm, it also works in modern front-end projects, such as those built with Vue CLI or Create React App.

MQTT.js also officially supports WeChat Mini Programs and Alipay Mini Programs. The URL protocol for WeChat Mini Programs is wxs, and for Alipay Mini Programs it is alis.

If you are still unsure what all this means, follow me through MQTT.js to get to know the darling of the Internet of Things space.

  • What is a micro message queue?
  • MQTT key terms explained
  • P2P messages and Pub/Sub messages
  • A reusable MQTT.js wrapper class
  • The client's packet-sending function: sendPacket
  • Client connection: mqtt.connect()
  • Subscribing to topics: mqtt.Client#subscribe()
  • Sending messages: mqtt.Client#publish()
  • Receiving messages: the mqtt.Client 'message' event

What is a micro message queue?

Message queues generally fall into two categories:

  • Microservice message queues (message transfer between microservices, such as RabbitMQ, Kafka, and RocketMQ)
  • IoT message queues (messaging between IoT devices and the cloud, typified by MQTT)

What I have been working with so far, and what this post digs into, is MQTT.js, a client for the IoT kind of message queue.

Traditional message queues (messaging between microservices)

A traditional message queue between microservices (that is, between the servers of multiple subsystems) is a very common way to pass messages from server to server.

Typical examples are RabbitMQ, Kafka, and RocketMQ. Alibaba Cloud offers managed AMQP (RabbitMQ-compatible), Kafka, and RocketMQ message queues, which should make adopting them in future projects much easier.

They cover a variety of use cases:

  • High concurrency: flash sales, ticket grabbing (FIFO)
  • Shared services: points redemption (a points module shared by multiple subsystems)
  • Communication: server-to-server messaging (Node.js, Java, Python, Go, etc.)

MQTT message queue (messaging between IoT devices and the cloud)

MQTT is a messaging protocol for the Internet of Things, designed mainly to cope with the complexity of IoT networks.

Alibaba Cloud provides an MQTT message queue service. Its supported communication protocols include MQTT, STOMP, and GB-808. The transport layer supports long-lived TCP connections, SSL encryption, and WebSocket.

The application scenarios are mainly about data transmission:

  1. Internet of Vehicles (remote control, vehicle data upload)
  2. IM communication (one-to-one chat, one-to-many friend circles)
  3. Live video (bullet-screen notifications, chat interaction)
  4. Smart home (electronic data upload, remote control commands)

The chat system I am responsible for has been running on this service for two years. Data flows between devices and PCs along the path device <-> server <-> PC, using the MQTT protocol over WebSocket transport.

MQTT key terms explained

Instance

Each MQTT instance corresponds to a globally unique service access point. In practice this means every client uses the same broker URL when connecting to the server (broker) via mqtt.connect(url). Suppose we have Salesman1, Salesman2, and so on: the URL each front end uses to connect to the server is identical, and the clients are told apart only by their clientId.

Client ID

The MQTT Client ID is the identifier of each client and must be globally unique. Connections using the same Client ID will be rejected. On Alibaba Cloud, the Client ID is composed of two parts, the Group ID and the Device ID, joined as <GroupID>@@@<DeviceID>. Typically several front ends, such as a PC client, an Android app, and an iOS app, share the same Group ID and Device ID. So how do you tell them apart? You can add a platform marker right after the @@@ in the middle of the Client ID, for example:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`
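As a minimal sketch of this naming scheme, the helper below assembles a Client ID from a Group ID, an optional platform marker, and a Device ID. The function name buildClientId and the sample IDs are hypothetical; only the <GroupID>@@@<DeviceID> layout comes from the text.

```javascript
// Hypothetical helper: joins the parts with the "@@@" separator described above.
function buildClientId(groupId, deviceId, platform) {
  if (!groupId || !deviceId) {
    throw new Error('groupId and deviceId are both required');
  }
  // The optional platform marker (for example "PC") goes right after "@@@".
  const marker = platform ? `-${platform}` : '';
  return `${groupId}@@@${marker}${deviceId}`;
}

console.log(buildClientId('GID_demo', 'device001'));       // GID_demo@@@device001
console.log(buildClientId('GID_demo', 'device001', 'PC')); // GID_demo@@@-PCdevice001
```

Keeping the assembly in one place makes it harder for different front ends to drift apart in how they format the ID.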

Group ID

A group name that identifies a set of logically identical nodes, representing a class of devices with the same function.

Device ID

The unique identifier of each device. It must be globally unique. It can be the serial number of a sensor device or the userId of the user logged in on a PC.

Parent Topic

The MQTT protocol is based on the Pub/Sub model, in which every message belongs to a Topic. A Topic can have multiple levels, and the first level is the parent Topic. The parent Topic must be created separately in the console.

Subtopics

MQTT topics can have second-level or third-level subtopics. These do not need to be created in advance; just use them in code.
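To make the parent/subtopic split concrete, here is a small sketch that separates a topic string into its first level (the parent Topic) and the remaining levels. The function name splitTopic is hypothetical; the only fact relied on is that MQTT topic levels are separated by "/".

```javascript
// Hypothetical helper: splits a topic into its parent Topic and subtopic levels.
function splitTopic(topic) {
  // Empty levels (for example from a trailing "/") are dropped here for readability.
  const levels = topic.split('/').filter((level) => level.length > 0);
  return { parent: levels[0], subtopics: levels.slice(1) };
}

console.log(splitTopic('SUBSCRIBE_TOPIC/noticePC/'));
// { parent: 'SUBSCRIBE_TOPIC', subtopics: [ 'noticePC' ] }
```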

P2P messages and Pub/Sub messages

Pub/Sub is the publish/subscribe pattern, similar to event listening and broadcasting. If publish/subscribe is unfamiliar, see the article "What is Webhook?". In addition to Pub/Sub mode, the Alibaba Cloud MQTT service used here also supports a P2P mode.

What is a P2P message?

  • P2P (Point to Point)
  • One-to-one message sending and receiving mode, with only one sender and one receiver.
  • In P2P mode, the sender of a message clearly knows the intended recipient of the message, and the message can only be consumed by this particular client.
  • When a sender sends a message, the recipient is specified by Topic, and the recipient gets the message without subscribing.
  • P2P mode not only reduces the cost of registering subscriptions, but also reduces push latency due to link optimization.

Difference between P2P mode and Pub/Sub mode

When sending a message

  • In Pub/Sub mode, the sender needs to send messages according to the Topic agreed with the recipient
  • In P2P mode, the sender does not need an agreed Topic and can send directly to the recipient by following the P2P topic convention

When receiving a message

  • In Pub/Sub mode, recipients need to subscribe to topics in advance to receive messages
  • In P2P mode, messages can be received without subscription

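Sending P2P messages from Node.js

// topic is the parent Topic; the recipient's Client ID follows "/p2p/"
const p2pTopic = topic + '/p2p/GID_xxxx@@@DEVICEID_001';
// client is a connected MQTT.js client; message is a string or Buffer
client.publish(p2pTopic, message);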
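The P2P topic convention can be sketched as a pair of small helpers. The names buildP2PTopic and parseP2PTopic are hypothetical; only the <parentTopic>/p2p/<ClientID> layout is taken from the example above.

```javascript
const P2P_MARKER = '/p2p/';

// Hypothetical helper: builds the topic that addresses one specific client.
function buildP2PTopic(parentTopic, recipientClientId) {
  return `${parentTopic}${P2P_MARKER}${recipientClientId}`;
}

// Hypothetical helper: returns the parts of a P2P topic, or null for a normal topic.
function parseP2PTopic(topic) {
  const index = topic.indexOf(P2P_MARKER);
  if (index === -1) {
    return null;
  }
  return {
    parentTopic: topic.slice(0, index),
    recipientClientId: topic.slice(index + P2P_MARKER.length),
  };
}

const p2p = buildP2PTopic('PUBLISH_TOPIC', 'GID_xxxx@@@DEVICEID_001');
console.log(p2p); // PUBLISH_TOPIC/p2p/GID_xxxx@@@DEVICEID_001
console.log(parseP2PTopic(p2p).recipientClientId); // GID_xxxx@@@DEVICEID_001
```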

A reusable MQTT.js wrapper class

  • Connect the client: initClient(config)
  • Subscribe to topics: subscribeTopic(topic, config)
  • Send a message: publishMessage(message)
  • Receive messages: handleMessage(callback)

import mqtt from 'mqtt';
import config from '@/config';

export default class MQTT {
  constructor(options) {
    this.name = options.name;
    this.connecting = false;
  }

  /** Client connection */
  initClient(config) {
    // deviceId is assumed to be supplied in config alongside groupId
    const { url, groupId, deviceId, key, password, topic: { publish: publishTopic } } = config;
    // Remember the publish topic so publishMessage() can use it later
    this.publishTopic = publishTopic;
    return new Promise((resolve) => {
      this.client = mqtt.connect(url, {
        clientId: `${groupId}@@@${deviceId}`,
        username: key,
        password,
      });
      this.client.on('connect', () => {
        this.connecting = true;
        resolve(this);
      });
    });
  }

  /** Subscribe to a topic */
  subscribeTopic(topic, config) {
    if (this.connecting) {
      this.client.subscribe(topic, config);
    }
    return this;
  }

  /** Send a message */
  publishMessage(message) {
    this.client.publish(this.publishTopic, message, { qos: 1 });
  }

  /** Receive messages */
  handleMessage(callback) {
    // Register the callback only if no 'message' listener exists yet
    if (this.client.listenerCount('message') === 0) {
      this.client.on('message', callback);
    }
  }
}

The client's packet-sending function: sendPacket

mqtt-packet generates a Buffer that can be sent over the wire

var mqtt = require('mqtt-packet')
var object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet

console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// Buffer.from([
//   48, 10, // Header (publish)
//   0, 4, // Topic length
//   116, 101, 115, 116, // Topic (test)
//   116, 101, 115, 116 // Payload (test)
// ])
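To see where those bytes come from, the following sketch hand-encodes a QoS 0 PUBLISH packet using only Node's Buffer, without mqtt-packet. The function name encodePublishQos0 is hypothetical, and the sketch assumes the remaining length fits in a single byte (under 128), so it illustrates the layout rather than replacing the library.

```javascript
// Hypothetical helper: encodes a QoS 0 PUBLISH packet by hand.
function encodePublishQos0(topic, payload) {
  const topicBytes = Buffer.from(topic, 'utf8');
  const payloadBytes = Buffer.from(payload, 'utf8');

  // Remaining length = 2-byte topic length prefix + topic + payload.
  const remainingLength = 2 + topicBytes.length + payloadBytes.length;
  if (remainingLength > 127) {
    throw new Error('This sketch only handles a one-byte remaining length');
  }

  // 0x30: packet type 3 (PUBLISH) in the high nibble, flags 0 (no DUP, QoS 0, no RETAIN).
  const fixedHeader = Buffer.from([0x30, remainingLength]);

  // The topic is prefixed with its length as a big-endian 16-bit integer.
  const topicLength = Buffer.alloc(2);
  topicLength.writeUInt16BE(topicBytes.length, 0);

  return Buffer.concat([fixedHeader, topicLength, topicBytes, payloadBytes]);
}

console.log(encodePublishQos0('test', 'test'));
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
```

A QoS 0 packet carries no message identifier, which is why the payload follows the topic directly.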

The sendPacket function

The packetsend event is emitted, and the packet is written to the client's stream via mqttPacket.writeToStream.

var mqttPacket = require('mqtt-packet')

function sendPacket (client, packet) {
  client.emit('packetsend', packet)
  mqttPacket.writeToStream(packet, client.stream, client.options)
}

The _sendPacket method

MqttClient.prototype._sendPacket = function (packet) {
  sendPacket(this, packet)
}
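The emit-then-write order can be tried out without a broker. The sketch below uses a stand-in client built from Node's EventEmitter and a fake stream that just records what is written; createFakeClient and sendBytes are hypothetical names, and raw bytes are written directly instead of going through mqtt-packet.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a client: an emitter plus a stream-like sink.
function createFakeClient() {
  const client = new EventEmitter();
  client.written = [];
  client.stream = { write: (chunk) => client.written.push(chunk) };
  return client;
}

// Same order as sendPacket: notify listeners first, then write to the stream.
function sendBytes(client, bytes) {
  client.emit('packetsend', bytes); // listeners run synchronously, before the write
  client.stream.write(bytes);
}

const fake = createFakeClient();
fake.on('packetsend', (bytes) => {
  console.log('about to send', bytes.length, 'bytes');
});
sendBytes(fake, Buffer.from([0xc0, 0x00])); // PINGREQ: packet type 12, remaining length 0
console.log('chunks written:', fake.written.length); // chunks written: 1
```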

Client connection: mqtt.connect()

An MQTT client establishes a connection to an MQTT server (broker), usually by giving a URL that uses one of the protocols 'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs', or 'alis'.

mqtt.connect([url], options)

Official notes:

  • Connects to the broker specified by the given URL and options, and returns a Client.
  • The URL can use the following protocols: 'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs', 'alis'. (MQTT.js supports WeChat Mini Programs and Alipay Mini Programs, with the protocols wxs and alis respectively.)
  • The URL can also be an object as returned by URL.parse().
  • You can pass in a single object that contains both the URL and the options.
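As a rough illustration of the protocol list, the sketch below checks a broker URL against it before any connection is attempted. The helper name getBrokerProtocol is hypothetical and is not part of MQTT.js; it relies only on the standard URL class.

```javascript
// Protocol names taken from the notes above.
const SUPPORTED_PROTOCOLS = ['mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs', 'alis'];

// Hypothetical helper: returns the protocol of a broker URL, or throws if it is not listed.
function getBrokerProtocol(brokerUrl) {
  // URL#protocol includes the trailing colon, for example "wss:".
  const protocol = new URL(brokerUrl).protocol.replace(/:$/, '');
  if (!SUPPORTED_PROTOCOLS.includes(protocol)) {
    throw new Error(`Unsupported protocol: ${protocol}`);
  }
  return protocol;
}

console.log(getBrokerProtocol('wss://foo-bar.mqtt.baz.com/mqtt')); // wss
```

Failing early on a mistyped protocol tends to be easier to debug than a connection that never opens.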

Let's look at the connection configuration and connection result from my project. Sensitive information has been masked with foo, bar, baz, or xxxx.

Connection configuration

{
  key: 'xxxxxxxx',
  secret: 'xxxxxxxx',
  url: 'wss://foo-bar.mqtt.baz.com/mqtt',
  groupId: 'FOO_BAR_BAZ_GID',
  topic: {
    publish: 'PUBLISH_TOPIC',
    subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
    unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
  },
}

  • key: the account
  • secret: the password
  • url: the address used to establish the MQTT connection between the client and the server (broker)
  • groupId: the group ID
  • topic: the topics used for publishing, subscribing, and unsubscribing

Connection result

This includes the general overview, the response headers, and the request headers.

General
Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols

Response Header
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt

Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: XXXXXXXXX
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt

Source code analysis

Here is the code for the MQTT connection.

this.client = mqtt.connect(url, {
  clientId: `${groupId}@@@${deviceId}`,
  username: key,
  password,
});

function parseAuthOptions (opts) {
  var matches
  if (opts.auth) {
    matches = opts.auth.match(/^(.+):(.+)$/)
    if (matches) {
      opts.username = matches[1]
      opts.password = matches[2]
    } else {
      opts.username = opts.auth
    }
  }
}
/**
 * connect - connect to an MQTT broker.
 *
 * @param {String} [brokerUrl] - url of the broker, optional
 * @param {Object} opts - see MqttClient#constructor
 */
function connect (brokerUrl, opts) {
  if ((typeof brokerUrl === 'object') && !opts) {
    // You can pass in a single object that contains both the URL and the options
    opts = brokerUrl
    brokerUrl = null
  }
  opts = opts || {}
  // Set username and password
  parseAuthOptions(opts)
  if (opts.query && typeof opts.query.clientId === 'string') {
    // Set the Client ID
    opts.clientId = opts.query.clientId
  }
  function wrapper (client) {
    // ...
    return protocols[opts.protocol](client, opts)
  }
  // Finally return an MQTT client instance
  return new MqttClient(wrapper, opts)
}
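The regular expression in parseAuthOptions is easy to try in isolation. The sketch below wraps it in a standalone function; the name splitAuth is hypothetical, and the function returns a new object instead of mutating opts as the library code does.

```javascript
// Hypothetical helper: splits "username:password" using the same pattern as parseAuthOptions.
function splitAuth(auth) {
  const matches = auth.match(/^(.+):(.+)$/);
  if (matches) {
    return { username: matches[1], password: matches[2] };
  }
  // No colon found: the whole string is treated as the username.
  return { username: auth };
}

console.log(splitAuth('alice:secret')); // { username: 'alice', password: 'secret' }
console.log(splitAuth('alice'));        // { username: 'alice' }
// The first group is greedy, so the split happens at the last colon.
console.log(splitAuth('a:b:c'));        // { username: 'a:b', password: 'c' }
```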

Subscribing to topics: mqtt.Client#subscribe()

Actual code

const topic = {
  subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
  unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
};
const config = { qos: 1 };
this.client.subscribe(topic.subscribe, config);

Source code analysis

MqttClient.prototype.subscribe = function () {
  // Abridged excerpt: resubscribe is defined in the omitted part of the source
  var packet
  var that = this
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
  // obj is the list of topics to subscribe to
  var obj = args.shift()
  // opts carries the qos setting
  var opts = args.pop()
  var defaultOpts = {
    qos: 0
  }
  opts = xtend(defaultOpts, opts)
  // The topics to subscribe to were passed as an array
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
          that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        }
        // subs is the final list of topics to subscribe to
        subs.push(currentOpts)
      }
    })
  }
  // This packet is important
  packet = {
    // Issue a subscribe command
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }
  // Send the subscribe packet
  this._sendPacket(packet)
  return this
}
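The filtering condition inside forEach is the core of this function: a topic is added to the subscribe packet only if it is new, if a higher QoS is requested, or if a resubscribe is forced. The sketch below isolates that rule as a pure function; selectSubscriptions and its parameters are hypothetical names, with existing playing the role of _resubscribeTopics.

```javascript
// Hypothetical helper mirroring the condition used in subscribe().
function selectSubscriptions(existing, topics, qos, resubscribe = false) {
  const subs = [];
  for (const topic of topics) {
    const known = Object.prototype.hasOwnProperty.call(existing, topic);
    // Subscribe if the topic is new, needs a higher QoS, or a resubscribe is forced.
    if (!known || existing[topic].qos < qos || resubscribe) {
      subs.push({ topic, qos });
    }
  }
  return subs;
}

const existing = { 'SUBSCRIBE_TOPIC/p2p': { qos: 1 } };
const wanted = ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'];

console.log(selectSubscriptions(existing, wanted, 1));
// [ { topic: 'SUBSCRIBE_TOPIC/noticePC/', qos: 1 } ]
console.log(selectSubscriptions(existing, wanted, 1, true).length); // 2
```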

Sending messages: mqtt.Client#publish()

Actual code

const topic = {
  publish: 'PUBLISH_TOPIC',
};
const message = {
  foo: ' ',
  bar: ' ',
  baz: ' ',
  // ...
};
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);

Note that the message must be serialized with JSON.stringify before it is published to the specified topic.
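A round trip shows why serialization matters: what is published as a JSON string arrives on the other side as a Buffer and has to be decoded again. The helper names encodeMessage and decodeMessage are hypothetical, and the Buffer stands in for the payload a broker would deliver.

```javascript
// Hypothetical helper: serializes an object before publishing.
function encodeMessage(message) {
  return JSON.stringify(message);
}

// Hypothetical helper: decodes a received payload, returning null if it is not valid JSON.
function decodeMessage(payload) {
  try {
    return JSON.parse(payload.toString());
  } catch (e) {
    return null;
  }
}

// Simulate the wire format: the receiver gets the payload as a Buffer.
const wire = Buffer.from(encodeMessage({ foo: 'a', bar: 1 }));

console.log(decodeMessage(wire));                    // { foo: 'a', bar: 1 }
console.log(decodeMessage(Buffer.from('not json'))); // null
```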

Source code analysis

MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  var defaultOpts = { qos: 0, retain: false, dup: false }
  opts = xtend(defaultOpts, opts)

  // Put the message into the packet's payload
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }
  // Handle the different qos levels
  switch (opts.qos) {
    case 1:
    case 2:
      // Send the publish packet
      this._sendPacket(packet)
      // ...
      break
    default:
      this._sendPacket(packet)
      // ...
  }
  return this
}

Receiving messages: the mqtt.Client 'message' event

Actual code

this.client.on('message', callback);

The data is delivered to the callback.

function (topic, message, packet) {}

topic is the topic the message arrived on, and message is the received data as a Buffer (named buffer in the example that follows). Remember to convert it to a string and parse it with JSON.parse().

handleMessage(callback) {
  this.client.on('message', callback);
}

// Here this.client is assumed to be an instance of the wrapper class
this.client.handleMessage((topic, buffer) => {
  let receiveMsg = null;
  try {
    receiveMsg = JSON.parse(buffer.toString());
  } catch (e) {
    receiveMsg = null;
  }
  if (!receiveMsg) {
    return;
  }
  // ...do something with receiveMsg...
});

Source code analysis

MqttClient inherits from EventEmitter, so you can use on() to listen for the 'message' event.

inherits(MqttClient, EventEmitter)
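The same inheritance pattern can be reproduced in a few lines with Node's built-in events module. TinyClient and its receive method are hypothetical and only mimic the shape of the 'message' event (topic, Buffer payload, packet); they are not part of MQTT.js.

```javascript
const { EventEmitter } = require('events');

// Hypothetical miniature client that inherits on() and emit() from EventEmitter.
class TinyClient extends EventEmitter {
  // Simulates an incoming publish by emitting the same arguments as the 'message' event.
  receive(topic, payload) {
    this.emit('message', topic, Buffer.from(payload), { cmd: 'publish', topic });
  }
}

const tiny = new TinyClient();
tiny.on('message', (topic, message) => {
  console.log(topic, message.toString());
});
tiny.receive('SUBSCRIBE_TOPIC/p2p', '{"foo":"bar"}');
// SUBSCRIBE_TOPIC/p2p {"foo":"bar"}
```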

So where exactly is the message event emitted? The chain has four steps:

  1. Establish a WebSocket connection based on websocket-stream
  2. Use pipe to connect it to a Writable stream created from readable-stream.Writable
  3. Call _handlePacket, scheduling further work with process.nextTick
  4. Call _handlePublish in _handlePacket to emit the message event

1. Establish a WebSocket connection based on websocket-stream

this.stream = this.streamBuilder(this)

function streamBuilder (client, opts) {
  return createWebSocket(client, opts)
}

var websocket = require('websocket-stream')

function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(url, [websocketSubProtocol], opts.wsOptions)
}

2. Use pipe to connect to a Writable stream created from readable-stream.Writable

var Writable = require('readable-stream').Writable
var writable = new Writable()
this.stream.pipe(writable)

3. Call _handlePacket, scheduling further work with process.nextTick

writable._write = function (buf, enc, done) {
  completeParse = done
  parser.parse(buf)
  work()
}
function work () {
  var packet = packets.shift()
  if (packet) {
    that._handlePacket(packet, nextTickWork)
  }
}
function nextTickWork () {
  if (packets.length) {
    process.nextTick(work)
  } else {
    var done = completeParse
    completeParse = null
    done()
  }
}

4. Call _handlePublish in _handlePacket to emit the message event

MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
    // ...
  }
}

// Emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
  // ...
  switch (qos) {
    case 1: {
      // Emit the message event
      if (!code) { that.emit('message', topic, message, packet) }
      // ...
    }
  }
}
