Project background

Recently, I received a relatively simple task with the following requirements: 2. From MQTT server, subscribe to site alarm (0 power failure, 1 call), GPS information (latitude and longitude), equipment signal, and then cache these three kinds of information for each site in memory. 3. Make an HTTP GET request interface for each site (SS header code) and ClientID (device code). The front end can request the data of the site according to the site code and device code. It is mainly used for judging the online and offline status of the site and providing power outage alarms. The simple mind map of the program is shown below:Originally intended to use C++ to write, considering C++ to write HTTP interface is relatively troublesome, or use Nodejs to write more convenient, because Nodejs support for MQTT, HTTP is more friendly, more suitable for writing this simple background program. The general process of the program is as follows: 1. Subscribe the following three topic messages from the MQTT server: Subscribe topic (1). Alarm, 0 power off, 1 call /alarmSing 0

News topic and content of sample is as follows: / alarmSing / 865650043997457 = > 0

(2) GPS information /lbsLocation LAT = 022.6315208&LNG =114.0741963

News topic and content of sample is as follows: / lbsLocation / 865650043997457 = > lat LNG = = 022.6315208 & 114.0741963

(3). Device signal/CSQ 18 The message subject and content are as follows: / CSQ /865650043997457=>27

The configuration information for the MQTT server needs to be configured in the config.yaml file as shown in the following example:

rxmqtt:
  host:    127.0. 01.
  port:     8099
  user:     poweralarm
  pwd: "poweralarm@123"
  id: "mqweb_20200826_nodejs_alarm"
  clean:    true
Copy the code

Then connect to the MQTT server, set up the subscribed topics and write the corresponding callback handlers for each of the three topics.

2. Maintain a Map cached data structure of site information in memory, written in TypeScript for convenience.

 stationInfos: Map<string, StationInfo>;
Copy the code

Where StationInfo is a site information class

3. When receiving the three messages of alarm (/alarmSing), GPS information (/lbsLocation) and device signal (/ CSQ) pushed by MQTT server, modify the Map cache object stationInfos respectively, and query whether the site exists according to the transmitted DeviceId. If yes, update the corresponding data, latest communication time, and online status of the site. 4. Write the HTTP interface and collect the site information Map cache stationInfos to return the corresponding information according to the site code. 5.

Database structure

At present, database operation involves only two tables: Breakelectric, the site and device ID table, and PowerCutHistory, the power outage alarm record table

MySQL data table structure

DROP TABLE IF EXISTS `breakelectric`;
CREATE TABLE `breakelectric`  (
  `SStation` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Site code',
  `DeviceId` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'device Id',
  `SStationName` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Site name'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

DROP TABLE IF EXISTS `powercuthistory`;
CREATE TABLE `powercuthistory`  (
  `SStation` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `DeviceId` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
  `SDateTime` datetime(0) NULL DEFAULT NULL,
  `DevState` bit(1) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

Copy the code

SQLServer data table structure

DROP TABLE [dbo].[Breakelectric]
GO
CREATE TABLE [dbo].[Breakelectric] (
[SStation] varchar(255) NOT NULL ,
[DeviceId] varchar(100) NULL ,
[SStationName] varchar(255) NOT NULL 
)

DROP TABLE [dbo].[PowerCutHistory]
GO
CREATE TABLE [dbo].[PowerCutHistory] (
[SStation] varchar(255) NULL ,
[DeviceId] varchar(100) NULL ,
[SDateTime] datetime NULL ,
[DevState] bit NULL 
)
Copy the code

A few key wrapper classes

MQTT – TypeScript encapsulation

For simplicity, encapsulate the MQTT client into a class to use as follows:

import mqtt = require('mqtt')
import moment = require('moment')

export interface MqttConnOpt extends mqtt.IClientOptions{}

export declare type OnMessageFunc = (topic: string, payload: Buffer) = > void

declare class Topic {
  public topic: string;
  public qos: 0|1|2;
}

export class MQTT {
  mqclient: mqtt.MqttClient;
  brokerHost: string;
  brokerPort: number;
  subscribeTopics: Array<Topic>;
  subscribeCallbacks: Map<string, OnMessageFunc>;
  connOpt: MqttConnOpt;
  /** * Successfully connected to MQTT broker */
  connected: boolean;

  constructor(host? :string | any, port? :number) {
    this.brokerHost = host;
    this.brokerPort = port;
    this.subscribeTopics = new Array<Topic>();
    this.subscribeCallbacks = new Map<string, OnMessageFunc>();
    this.connected = false;
  }

  /** * subscribe to the topic */
  public subscribe(topic: string, qos: 0|1|2) {
    this.subscribeTopics.push({topic: topic, qos: qos});
    if (this.is_connected()){
      this.mqclient.subscribe(topic, {qos: qos}); }}/** * sets the message data callback function */
  public set_message_callback(topicPatten: string, cb: OnMessageFunc) {
    this.subscribeCallbacks.set(topicPatten, cb);
  }

  /** * Is connected to the server */
  public is_connected() {
    // return this.mqclient.connected == true;
    return this.connected == true;
  }

  /** * Connect to the server */
  public connect(opts? : MqttConnOpt){
    // Open re-subscribe
    opts.resubscribe = false;
    this.connOpt = opts;
    this.mqclient = mqtt.connect(`mqtt://The ${this.brokerHost}:The ${this.brokerPort}`, opts);

    this.mqclient.on('connect'.(connack) = >{
      console.log('Successfully connected to the server [The ${this.brokerHost}:The ${this.brokerPort}] `);
      this.connected = true;
      for (let index = 0; index < this.subscribeTopics.length; index++) {
        const element = this.subscribeTopics[index];
        this.mqclient.subscribe(element.topic, {qos: element.qos}); }});this.mqclient.on('message'.(topic: string, payload: Buffer) = >{
      console.log(` [${moment().format('YY-MM-DD HH:mm')}] The ${this.brokerHost} ${topic}`)
      this.mqclient;
      this.subscribeCallbacks.forEach((val, key) = >{
        if(topic.indexOf(key) ! = -1){ val(topic, payload); }}); });this.mqclient.on('reconnect'.() = >{
      console.log("Reconnect")});this.mqclient.on('error'.(err: Error) = >{
      console.log(err)
    });
  }

  /** * push data */
  public publish(topic: string, message: string, qos: 0|1|2) {
    this.mqclient.publish(topic, message, {qos: qos, retain: false}}})Copy the code

One thing to be aware of is that the MQTT server may be accidentally restarted or disconnected for other reasons, which requires disconnection and reconnection. In C++, C#, Java and other languages, a disconnection reconnection monitoring thread can be started to monitor the connection with the MQTT server at regular intervals, and reconnect if disconnection occurs.

Yaml file configuration class object

To facilitate the use of yaml files as configuration files, XML, ini, and yaml were also used as configuration files when C++ was used, and Java SpringBoot was also used as yml or yaml configuration files. My YAML configuration file looks like this:

rxmqtt:
  host:    127.0. 01.
  port:     8099
  user:     poweralarm
  pwd: "poweralarm@123"
  id: "mqweb_20200826_nodejs_alarm"
  clean:    true
# dbsql:
# host: 127.0.0.1
# port: 1433
# user: sa
# pwd: "123456"
# database: EMCHNVideoMonitor
dbsql:
  host: 127.0. 01.
  port: 3306
  user: root
  pwd: "123456"
  database: EMCHNVideoMonitor
redis: 
  host:     127.0. 01.
  port:     7001
  pwd: 123456
  index:     3
http: 3000
rpcUrl: 127.0. 01.: 18885
enableMqtt: true
enableDB: true
enableRedis: true
enableWS: true
enableRPC: true
offlineTimeout: 90000
cacheInterval: 10000
Copy the code

For the above yamL configuration file, write the corresponding YAML configuration reading class, as follows:

import YAML = require('yaml')
import fs = require('fs')

declare interface MqttConnOpt{
  host: string;
  port: number;
  user: string;
  pwd: string;
  clean: boolean;
  id: string;
}
declare interface DBConnOpt{
  host: string;
  port: number;
  user: string;
  pwd: string;
  database: string;
}
declare interface RedisConnOpt{
  host: string;
  port: number;
  pwd: string;
  db: number;
}

export {
  MqttConnOpt,
  DBConnOpt,
  RedisConnOpt,
  Config,
}


class Config {
  rxmqtt: MqttConnOpt;
  dbsql: DBConnOpt;
  redis: RedisConnOpt;
  /** * HTTP port */
  http: number;
  /** * rpcUrl server address */
  rpcUrl: string;
  /** * whether to enable MQTT */
  enableMqtt: boolean;
  /** * Whether to enable sqlServer or mysql database */
  enableDB: boolean;
  /** * Whether to enable redis */
  enableRedis: boolean;
  /** * Whether to enable websocket */
  enableWS: boolean;
  /** * Whether to enable RPC */
  enableRPC: boolean;
  /** * Offline timeout in milliseconds */
  offlineTimeout: number;
  /** * Cache storage interval, milliseconds */
  cacheInterval: number;

  constructor(){
    try{
      let buffer = fs.readFileSync('config.yaml'.'utf8');
      let config = YAML.parse(buffer);
      this.rxmqtt = config['rxmqtt'];
      this.dbsql = config['dbsql'];
      this.redis = config['redis'];
      this.http = config['http'];
      this.rpcUrl = config['rpcUrl'];
      this.enableMqtt = config['enableMqtt'];
      this.enableDB = config['enableDB'];
      this.enableRedis = config['enableRedis'];
      this.enableWS = config['enableWS'];
      this.enableRPC = config['enableRPC'];
      this.offlineTimeout = config['offlineTimeout'];
      this.cacheInterval = config['cacheInterval'];
    }catch(err){
      console.log(err)
    }
  }

  /** * save */
  public save() {
    try{
      fs.writeFileSync('config.yaml', YAML.stringify(this))}catch(err){
      console.log(err)
    }
  }
}

Copy the code

It’s actually easier to read and write YAML files using the third-party library and typescript.

Encapsulation of data manipulation classes

Mysql action class

In nodeJS, you can use libraries such as Mariadb or Sequelize to operate mysql database. In this case, the mariadb library mariadbclient.ts is used

import mariadb = require('mariadb')
import { StationInfo } from './StationInfo'
import moment = require('moment')

// Define the data query callback interface
export declare type OnQueryInfoReqCallback = (err: Error, rc: Array<any>) = > void
// Define the inbound callback interface
export declare type OnRecordReqCallback = (err: Error, rc: boolean) = > void

export class MariaDBClient {
  dbpool: mariadb.Pool;
  host: string;
  port: number;
  user: string;
  password: string;
  dbName: string;
  connected: boolean;

   // Site information Map
   public stationInfos: Map<string, StationInfo>;

  constructor(username: string, password: string, dbName: string, host? :string | any, port? :number) {
    this.host = host;
    this.port = port;
    this.user = username;
    this.password = password;
    this.dbName = dbName;
    this.connected = false;
    this.stationInfos = new Map<string, StationInfo>();
    // Initialize the Mariadb database client
    this.initMariadb();
    // Load the site information into memory
    this.getStationInfo();
  }

   /** * Initialize the Mariadb database client */
  public initMariadb() {
    this.dbpool = mariadb.createPool({
      host: this.host,
      port: this.port,
      user: this.user,
      password: this.password,
      database: this.dbName,
      connectionLimit: 10}); }/** * Whether the MariaDB database is connected */
  public is_connected() {
    return this.connected == true;
  }

   /** * Get site information */
  public async getStationInfo() {
    let conn;
    try {
      conn = await this.dbpool.getConnection();
      const rows = await conn.query("SELECT SStation, DeviceId, SStationName from Breakelectric WHERE SStation ! = '' AND DeviceId ! = ' ';");
      for (let i = 0; i < rows.length; i++) {
        const it = rows[i];
        const SStation = it['SStation'];
        this.stationInfos.has
        if (!this.stationInfos.has(SStation)) {
          let si = new StationInfo();
          si.SStation = it['SStation'];
          si.DeviceId = it['DeviceId'];
          si.SStationName = it['SStationName'];
          console.log(The first `${i + 1}Site code:${si.SStation}The device Id:${si.DeviceId}, site name:${si.SStationName}`);

          this.stationInfos.set(SStation, si); }}}catch (e) {
      console.error(e);
    } finally {
      if (conn) conn.release(); //release to pool}}/ * * * *@param Record gets a list of sites */
  public async getStationList(cb: OnQueryInfoReqCallback) {
    let conn;
    try {
      conn = await this.dbpool.getConnection();
      const rows = await conn.query("SELECT SStation, DeviceId, SStationName from Breakelectric WHERE SStation ! = '' AND DeviceId ! = ' ';");
      let stationList = new Array<any> ();for (let i = 0; i < rows.length; i++) {
        const rowItem = rows[i];
        let iitem = {
          'SStation': rowItem['SStation'].'DeviceId': rowItem['DeviceId'].'SStationName': rowItem['SStationName']
        }
        stationList.push(iitem);
      }
      if (cb) cb(null, stationList);
    } catch (e) {
      console.error(e);
      if (cb) cb(e, null);
    } finally {
      if (conn) conn.release(); //release to pool}}/** * add, delete, modify, query CRUD API */

  / * * * *@param Record Insert power off alarm record *@param cb 
   */
  public async insertStationRecord(record: any) {
    if (record === null) {
      return;
    }
    let sql1 = "INSERT INTO `emchnvideomonitor`.`powercuthistory` (`SStation`, `DeviceId`, `SDateTime`, `DevState`) VALUES";

    let conn: mariadb.PoolConnection;
    try {
      conn = await this.dbpool.getConnection();
      
      var sqlstr = sql1;

      let SStation = record.SStation;    // Site name
      let DeviceId = record.DeviceId;    / / device Id
      let SDateTime = record.SDateTime;  / / time
      let DevState = record.DevState;    // State (0 power off, 1 incoming call)

      var it = ` ('${SStation}', '${DeviceId}', '${SDateTime}',${DevState}) `;
      sqlstr += it;
      console.log(sqlstr);
      await conn.query(sqlstr);
      // if (cb) cb(null, true);
    } catch (e) {
      console.error('Failed to insert power outage alarm,',e);
      // if (cb) cb(e, false);
    } finally {
      if (conn) conn.release(); //release to pool}}}Copy the code

Essentially a action class

In nodejs, you can use libraries such as theoretical, MMSQL, and sequelize to operate sqlserver databases. Mariadbclient.ts is used to encapsulate sqlserver operations using MSSQL

import mssql = require('mssql');

// Define the data query callback interface
export declare type OnQueryCallback = (err: Error, rc: any) = > void
export declare type OnExecCallback = (err: Error, rc: boolean) = > void

export class MSSQLDBClient {
  // Database connection string
  // Connection mode: "MSSQL :// username: password @IP address :1433(default port number)/ database name"
  constr: string;

  constructor(username: string, password: string, host: string, port: number, dbName: string) {
    this.constr = `mssql://${username}:${password}@${host}:${port}/${dbName}`;
    mssql.connect(this.constr).then(function () {
      console.log('-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -');
      console.log('- Database login successful -');
      console.log('-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -');
    }).catch(function (err) {
      console.log(err); })}/** * Query a table in the database according to the SQL script *@param StrSql SQL script *@param Cb Callback function */ for query results
  public async query(strSql: string, cb: OnQueryCallback) {
    try {
      await mssql.connect(this.constr).then(function() {
        new mssql.Request().query(strSql).then(function(result) {
          // console.log(result);
          if (cb) cb(null, result);
        }).catch(function(err) {
          console.log(err);
          if (cb) cb(err, null);
        });
        // Stored Procedure
    }).catch(function(err) {
      console.log(err);
      if (cb) cb(err, null); })}catch (err) {
      console.log(err);
      if (cb) cb(err, null); }}/ * * * *@param StrSql SQL script *@param Cb Executes the callback */ of the SQL script
  public async exec(strSql: string, cb: OnExecCallback) {
    await mssql.connect(this.constr, function () {
      mssql.query(strSql, function (err, data) {
        if (err) {
          if (cb) cb(err, false);
        } else {
          if (cb) cb(null.true); mssql.close(); }}); }); }}Copy the code

The main service class service.ts

import moment = require('moment')
import sql = require('mssql')
import { Config } from './config'
import { MQTT } from './mq'
import * as http from 'http'
import { StationInfo } from './StationInfo'
// const mssqlDBClient = require('./db');
// import { MSSQLDBClient } from './MSSQLDBClient'
import { MariaDBClient } from './MariaDBClient'


/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1, from the MQTT server to subscribe to the outage alarm information and then into the SQLServer database * 2, from the MQTT server to subscribe to the site alarm (0 power, 1 call), GPS information, equipment signal, and then in memory cache each site of the three kinds of information, Add the latest communication time (the latest time to receive the subscribed message), * then make an HTTP GET request interface for each site (SS header code) and ClientID (device code), the front-end can request the data of the site according to the site code and device code. * Mainly for the later stage to do the site online, offline services. * /

export class Service {
  mqttList: Array<MQTT>;      // List of MQTT clients
  stationInfos: Map<string, StationInfo>;
  config: Config;
  Server: http.Server;
  App: any;
  // mssqlDBClient: MSSQLDBClient;
  mySQLDBClient: MariaDBClient;

  // constructor
  constructor(app:any, server:http.Server) {
    this.Server = server;
    this.App = app;
    this.config = new Config();
    // Initialize the configuration
    // this.mssqlDBClient = new MSSQLDBClient(
    // this.config.dbsql.user,
    // this.config.dbsql.pwd,
    // this.config.dbsql.host,
    // this.config.dbsql.port,
    // this.config.dbsql.database
    // );
     // Create database client
     this.mySQLDBClient = new MariaDBClient(
      this.config.dbsql.user,
      this.config.dbsql.pwd,
      this.config.dbsql.database,
      this.config.dbsql.host,
      this.config.dbsql.port
    );
    
    this.mqttList = new Array<MQTT>();
    this.stationInfos = new Map<string, StationInfo>();
    // Establish a client connection
    if (this.config.enableMqtt) {
      this.connectMqtt();
    }
    // Load cached data into memory
    this.LoadStations();
    // Periodically check whether the site is online
    // this.taskCheckStationOnline();
    // Periodically store site data cache
    this.taskStoreStationData();
    // Periodically reload site information
    this.timerLoadStationInfo();
    // Initializes the HTTP request
    this.initApp();
  }

  /** * Periodically loads site information */
  public timerLoadStationInfo() {
    setInterval(async() = > {// this.getStationInfo();
      await this.mySQLDBClient.getStationInfo();
      this.stationInfos = this.mySQLDBClient.stationInfos;
    }, 120*1000);
  }

  /** * Load cached data into memory */
  public async LoadStations() {
    // Load the last cached data
    // await this.LoadRedisData();
    // Load the site information
    // await this.getStationInfo();
    await this.mySQLDBClient.getStationInfo();
    this.stationInfos = this.mySQLDBClient.stationInfos;
  }
 
  /** * Periodically check the status of the site */
  public taskCheckStationOnline() {
    setInterval(() = >{
      this.timerCheckOnline();
    }, this.config.offlineTimeout);
  }

  /** * Periodically store site cache data */
  public taskStoreStationData() {
    setInterval(() = >{
      // this.timerStorStationData();
      // this.taskStorNewData();
    }, this.config.cacheInterval);
  }


  /** * Check whether the site is online */
  public timerCheckOnline() {
    let stcodeIds = [];
    this.stationInfos.forEach((val, key) = > {
      let previous_online = val.Online;
      val.checkOnline();
      // If you were online before, you are now offline
      if(previous_online && ! val.Online) { stcodeIds.push(val.SStation);// this.sendHeart2WSClient(val.toWebHeartString());}})}/** * Initializes the HTTP request */
  public initApp() {
    if (!this.App) {
      return;
    }
    // Route interface
    // Get a list of all site codes and device ID mappings
    this.App.get('/api/getStationList'.(req, res) = > {
      let stationList = [];
      this.stationInfos.forEach((val, key) = > {
        stationList.push({
          'SStation': val.SStation,
          'DeviceId': val.DeviceId,
          'SStationName': val.SStationName
         });
      });
      res.send({
        rc: true.data: stationList
      })
    });
    
    // Obtain the current site information based on the site code
    this.App.get('/api/getAlarmInfo/:stcode'.(req, res) = > {
      let { stcode } = req.params;
      if (this.stationInfos.has(stcode)) {
        let item = this.stationInfos.get(stcode);
        return res.send({
          rc: true.data: item
        })
      } else {
        res.send({
          rc: false.data: 'Site code does not exist'})}})// Obtain the power failure alarm, equipment signal, longitude and latitude information of all stations
    this.App.get('/api/getAllStationInfos'.(req, res) = > {
      let stationList = [];
      this.stationInfos.forEach((val, key) = > {
        stationList.push(val);
      })
      res.send({
        rc: true.data: stationList
      })
    })
  }

  /** * Get site information */
  public async getStationInfo() {
    / / a, essentially
    // let strSql = "SELECT SStation, DeviceId, SStationName from Breakelectric WHERE SStation ! = '' AND DeviceId ! = ' ';" ;
    // this.mssqlDBClient.query(strSql, (err, result) => {
    // if (result == null || result == '' || result.recordsets[0] == undefined
    // || result.recordsets[0] == null) {
    // return;
    / /}
    // let resultArray = result.recordsets[0];
    // if (resultArray ! = null && resultArray.length > 0) {
    // for (let i = 0; i < resultArray.length; i++) {
    // // this.stationList.push(resultArray[i]);
    // let iitem = resultArray[i];
    // console.log(${iitem.SStation},DeviceId: ${iitem.DeviceId},SStationName: ${iitem.SStationName} ');
    // // console.log(resultArray[i]);
    // let stcode = iitem['SStation'];
    // if (! this.stationInfos.has(stcode)) {
    // this.stationInfos.set(stcode, new StationInfo());
    / /}
    // var si = this.stationInfos.get(stcode);
    // si.SStation = iitem['SStation'];
    // si.DeviceId = iitem['DeviceId'];
    // si.SStationName = iitem['SStationName'];
    / /}
    // console.log(JSON.stringify(this.stationInfos));
    / /}
    // });
    / / second, MariaDB
    this.mySQLDBClient.getStationList((err, result) = > {
      if(! err && result ! =null&& result ! = []) {for (let i = 0; i < result.length; i++) {
          let iitem = result[i];
          let SStation = iitem['SStation'];
          if (!this.stationInfos.has(SStation)) {
            this.stationInfos.set(SStation, new StationInfo());
          } 
          var si = this.stationInfos.get(SStation);
          si.SStation = iitem['SStation'];
          si.DeviceId = iitem['DeviceId'];
          si.SStationName = iitem['SStationName'];
          console.log(The first `${i + 1}Site code:${si.SStation}The device Id:${si.DeviceId}, site name:${si.SStationName}`); }}})}/** * Connect to MQTT server */
  public connectMqtt() {
    var it = new MQTT(this.config.rxmqtt.host, this.config.rxmqtt.port);
    // Subscribe to the topic
    // 1
    // /alarmSing 0
    it.subscribe('/alarmSing'.0);
    // 2. GPS information
    / / / lbsLocation lat LNG = = 022.6315208 & 114.0741963
    it.subscribe('/lbsLocation'.0);
    // 3. Device signal
    // /csq 18
    it.subscribe('/csq'.0);
    it.set_message_callback('/alarmSing'.this.handleAlarmSing.bind(this));
    it.set_message_callback('/lbsLocation'.this.handleGpsLocation.bind(this));
    it.set_message_callback('/csq'.this.handleCsq.bind(this));
    it.connect({
      username: this.config.rxmqtt.user,
      password: this.config.rxmqtt.pwd,
      clientId: this.config.rxmqtt.id,
      clean: this.config.rxmqtt.clean,
    });
  
    this.mqttList.push(it);
  }

  /** ** power alarm data processing function */
  handleAlarmSing(topic: string, payload: Buffer) {
    console.log('Power failure alarm data:${topic}= >${payload.toString()}`);
    const topics = topic.split('/');
    // /alarmSing/867814045313299=>1
    console.log('DeviceId', topics[1]);
  
    if (topics[1] = ='alarmSing') {
      let deviceId = topics[2];
      console.log('Device Id:', deviceId);
      let alarmDevState = parseInt(payload.toString());
      console.log('Power alarm DevState:', alarmDevState == 0 ? 'power' : 'call');
      // Query the corresponding site id SStation based on DeviceId
      let stcode = ' ';
      this.stationInfos.forEach((val, key) = > {
        if (val.DeviceId == deviceId) {
          stcode = key;
          // Update the communication time and power failure alarm information of the site
          let comTime = moment().format('YYYY-MM-DD HH:mm:ss');
          var si = this.stationInfos.get(stcode);
          let strStcode = si.SStation;
          si.alarmSing = alarmDevState;
          si.CommTime = comTime;
          si.Online = true;
          this.stationInfos.set(stcode, si);
          // Put the power alarm information into the database
          // this.powerCutAlarmStore({
          // SStation: strStcode,
          // DeviceId: deviceId,
          // SDateTime: comTime,
          // DevState: alarmDevState
          // });
          this.mySQLDBClient.insertStationRecord({
            SStation: strStcode,
            DeviceId: deviceId,
            SDateTime: comTime,
            DevState: alarmDevState }) } }); }}/** ** GPS data processing function */
 handleGpsLocation (topic: string.payload: Buffer) {
    console.log('GPS information data:${topic}= >${payload.toString()}`);
    const topics = topic.split('/');
    / / / lbsLocation / 867814045313299 = > lat LNG = = 022.7219409 & 114.0222168
    if (topics[1] = ='lbsLocation') {
      let deviceId = topics[2];
      console.log('Device Id:', deviceId);
      let strPayload = payload.toString();
      let strLatitude = strPayload.substring(strPayload.indexOf("lat=") +4, strPayload.indexOf("&"));
      let latitude = parseFloat(strLatitude);
      console.log('latitude: ', latitude);

      let strLongitude = strPayload.substring(strPayload.indexOf("lng=") +4);
      let longitude = parseFloat(strLongitude.toString());
      console.log('longitude: ', longitude);

      // Query the corresponding site id SStation based on DeviceId
      let stcode = ' ';
      this.stationInfos.forEach((val, key) = > {
        if (val.DeviceId == deviceId) {
          stcode = key;
          // Update the communication time and latitude and longitude information of the site
          let comTime = moment().format('YYYY-MM-DD HH:mm:ss');
          var si = this.stationInfos.get(stcode);
          si.Online = true;
          si.longitude = longitude;
          si.latitude = latitude;
          si.CommTime = comTime;
          this.stationInfos.set(stcode, si); }}); }}/** * device signal data processing function */
 handleCsq(topic: string, payload: Buffer) {
    console.log('Equipment signal data:${topic}= >${payload.toString()}`);
    const topics = topic.split('/');
    // /csq/867814045454838=>20
    if (topics[1] = ='csq') {
     let deviceId = topics[2];
     console.log('Device Id:', deviceId);
     let csq = parseInt(payload.toString());
     console.log('Equipment signal:', csq);
     // Query the corresponding site id SStation based on DeviceId
     let stcode = ' ';
     this.stationInfos.forEach((val, key) = > {
       if (val.DeviceId == deviceId) {
         stcode = key;
         // Update the communication time and CSQ signal value of the site
         let comTime = moment().format('YYYY-MM-DD HH:mm:ss');
         var si = this.stationInfos.get(stcode);
         si.Online = true;
         si.csq = csq;
         si.CommTime = comTime;
         this.stationInfos.set(stcode, si); }}); }}/** * site power alarm data store */
  public async powerCutAlarmStore(alarmRecord: any) {
    var SStation = alarmRecord.SStation;
    var DeviceId = alarmRecord.DeviceId;
    var SDateTime = alarmRecord.SDateTime;
    var DevState = alarmRecord.DevState;

    let strInsert = "INSERT INTO powercuthistory(SStation, DeviceId, SDateTime, DevState) VALUES";
    strInsert += ` ('${SStation}', '${DeviceId}', '${SDateTime}',${DevState}) `;
    // this.mssqlDBClient.exec(strInsert, (err, rc) => {
    // if (err) {
    // console.log(' Error inserting alarm data :', err);
    / /}
    // })}}Copy the code

app.js

Here, for simplicity, I directly use express generator to generate the basic framework of the project, and the corresponding app.js file is as follows:

var createError = require('http-errors');
var express = require('express');
var app = express();
var path = require('path');
var logger = require('morgan');

// var indexRouter = require('./routes/index');
// var usersRouter = require('./routes/users');

var app = express();

// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine'.'jade');

app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(express.static(path.join(__dirname, 'public')));

// app.use('/', indexRouter);
// app.use('/users', usersRouter);

module.exports = app;
Copy the code

bin/www

An instance of the Service class is created in the bin/ WWW file, the Config configuration is read, and the related services are started. Note: We need to pass the app and server to the service object. We need to write the HTTP interface in the service object to ensure that the HTTP interface and the site information cache share the same data. If we write the HTTP interface in app.js or routes/api.js, we need to pass the app and server to the service object. Creating two Service objects does not guarantee data synchronization for site information cache information.

#! /usr/bin/env node

/** * Module dependencies. */

var app = require('.. /app');
var debug = require('debug') ('hnmqalarmstore:server');
var http = require('http');

var config_1 = require('.. /config');
var Service_1 = require('.. /service');

var config = new config_1.Config();

/** * Get port from environment and store in Express. */

var port = normalizePort(process.env.PORT || config.http);
app.set('port', port);

/** * Create HTTP server. */

var server = http.createServer(app);

// Service object
new Service_1.Service(app, server);

// catch 404 and forward to error handler
app.use(function(req, res, next) {
  next(createError(404));
});

// error handler
app.use(function(err, req, res, next) {
  // set locals, only providing error in development
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') = = ='development' ? err : {};

  // render the error page
  res.status(err.status || 500);
  res.render('error');
});

/** * Listen on provided port, on all network interfaces. */

server.listen(port);
server.on('error', onError);
server.on('listening', onListening);

/** * Normalize a port into a number, string, or false. */

function normalizePort(val) {
  var port = parseInt(val, 10);

  if (isNaN(port)) {
    // named pipe
    return val;
  }

  if (port >= 0) {
    // port number
    return port;
  }

  return false;
}

/** * Event listener for HTTP server "error" event. */

function onError(error) {
  if(error.syscall ! = ='listen') {
    throw error;
  }

  var bind = typeof port === 'string'
    ? 'Pipe ' + port
    : 'Port ' + port;

  // handle specific listen errors with friendly messages
  switch (error.code) {
    case 'EACCES':
      console.error(bind + ' requires elevated privileges');
      process.exit(1);
      break;
    case 'EADDRINUSE':
      console.error(bind + ' is already in use');
      process.exit(1);
      break;
    default:
      throwerror; }}/** * Event listener for HTTP server "listening" event. */

function onListening() {
  var addr = server.address();
  var bind = typeof addr === 'string'
    ? 'pipe ' + addr
    : 'port ' + addr.port;
  debug('Listening on ' + bind);
}
Copy the code

Some third-party libraries used

The package.json files of yamL, MSSQL, Mariadb, MQTT and Express are as follows:

Copy the code

“Name “:” hnmqalarmStore “, “version”: “0.0.0”, “private”: true, “scripts”: {“start”: “Node. / bin/WWW”}, “dependencies” : {” @ js – joda/core “:” ^ 3.0.0 “, “body – parser” : “^ 1.19.0”, “cookie – parser” : “^ 1.4.5”, “debug” : “~ 2.6.9”, “express” : “^ 4.16.4”, “express – session” : “^ 1.17.1”, “HTTP – errors” : “^ 1.8.0 comes with”, “jade” : “^ 1.11.0 mariadb”, “” :” ^ “2.4.2,” moment “:” ^ 2.27.0 “, “Morgan” : “^ 1.9.1”, “the MQTT” : “^ 2”, “MSSQL” : “^ 6.2.1”, “yaml” : “^ 1.10.0”}, “devDependencies” : {” nodemon “:” ^ 2.0.4 “}}

Copy the code