This article uses a practical example on CentOS 7.2 to show how to use MySQL to store MQTT data.

MySQL is a long-established relational database whose open architecture gives users a wide range of choices. As the technology has matured, MySQL has gained more features, better performance, and broader platform support, and it is developed and maintained by a large community. Today, MySQL is popular because of its stability, its performance, and the freedom to use and modify it under the GPL.

Install and verify the MySQL server

You can download and install the MySQL server by referring to the official MySQL documentation or using Docker. This article uses MySQL 5.6.

To make management easier, you can download and use MySQL Workbench, the official free graphical management tool.

EMQ X cannot connect to MySQL 8.0.

Preparation

Initialize the data table

The plug-in relies on the following data tables. You must create them yourself, and their structure must not be changed.

mqtt_client stores the online status of devices

DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `state` varchar(3) DEFAULT NULL, -- Online state: 0 offline, 1 online
  `node` varchar(100) DEFAULT NULL, -- Owning node
  `online_at` datetime DEFAULT NULL, -- Online time
  `offline_at` datetime DEFAULT NULL, -- Offline time
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_client_idx` (`clientid`),
  UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_sub stores the topic subscriptions of devices

DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `qos` int(3) DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
  UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_msg stores MQTT messages

DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `msgid` varchar(100) DEFAULT NULL,
  `topic` varchar(1024) NOT NULL,
  `sender` varchar(1024) DEFAULT NULL,
  `node` varchar(60) DEFAULT NULL,
  `qos` int(11) NOT NULL DEFAULT '0',
  `retain` tinyint(2) DEFAULT NULL,
  `payload` blob,
  `arrived` datetime NOT NULL, -- Whether arrived (QoS > 0)
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_retain stores retained messages

DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(200) DEFAULT NULL,
  `msgid` varchar(60) DEFAULT NULL,
  `sender` varchar(100) DEFAULT NULL,
  `node` varchar(100) DEFAULT NULL,
  `qos` int(2) DEFAULT NULL,
  `payload` blob,
  `arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_acked stores client message acknowledgements

DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(200) DEFAULT NULL,
  `topic` varchar(200) DEFAULT NULL,
  `mid` int(200) DEFAULT NULL,
  `created` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Configure the EMQ X server

EMQ X is installed from the RPM package, and the MySQL-related configuration file is located at /etc/emqx/plugin/emqx_backend_mysql.conf. This article only tests MySQL persistence, so most of the configuration can stay unchanged. Fill in the server address, username, password, and database:

auth.mysql.server = 127.0.0.1:3306
auth.mysql.username = root
auth.mysql.password = 123456
auth.mysql.database = mqtt

Leave the rest of the configuration file unchanged, then start the plug-in. There are three ways to start a plug-in: the command line, the management console, and the REST API.

Start from the command line

emqx_ctl plugins load emqx_backend_mysql

Start from the management console

On the plug-in page of the EMQ X management console, locate the emqx_backend_mysql plug-in and click Start.

Start through the REST API

The plug-in can also be started by calling the PUT /api/v4/nodes/:node/plugins/:plugin_name/load API.
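As a rough sketch of the REST call above, the following Python snippet builds the PUT request without sending it. The base URL (port 8081), the node name emqx@127.0.0.1, and the application ID and secret used for HTTP Basic authentication are assumptions for illustration only; replace them with the values of your own deployment.

```python
import base64
import urllib.request


def build_load_plugin_request(base_url, node, plugin, app_id, app_secret):
    """Build (but do not send) a PUT request that asks a node to load a plug-in."""
    # Path follows the pattern PUT /api/v4/nodes/:node/plugins/:plugin_name/load
    url = f"{base_url}/api/v4/nodes/{node}/plugins/{plugin}/load"
    # Assumption: the API is protected by HTTP Basic auth (app ID + secret).
    token = base64.b64encode(f"{app_id}:{app_secret}".encode()).decode()
    return urllib.request.Request(
        url, method="PUT", headers={"Authorization": f"Basic {token}"}
    )


# Hypothetical values; replace them with those of your own deployment.
req = build_load_plugin_request(
    "http://127.0.0.1:8081", "emqx@127.0.0.1", "emqx_backend_mysql",
    "admin", "public",
)
print(req.get_method(), req.full_url)
```

Passing req to urllib.request.urlopen would actually send the request to the server.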

Client online status storage

When a client goes online or offline, the plug-in updates its online status, online/offline time, and owning node in the MySQL database.

Configuration items

Open the configuration file and configure Backend rules.

## hook: client.connected, client.disconnected
## action/function: on_client_connected, on_client_disconnected

## Client online/offline
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

Usage example

Open the EMQ X management console at http://127.0.0.1:18083, go to Tools -> WebSocket, create a client connection with clientid sub_client, click Connect, and manually disconnect once the connection succeeds:

View the mqtt_client table in MySQL Workbench. A record of the client's online/offline state has been written or updated.

Client proxy subscription

When a client goes online, the storage module reads the preset subscription list directly from the database and subscribes to those topics on the client's behalf. In scenarios where clients need to communicate (receive messages) over predetermined topics, the application can set or change the proxy subscription list at the data level.

Configuration items

Open the configuration file and configure Backend rules.

## hook: client.connected
## action/function: on_subscribe_lookup
backend.mysql.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

Usage example

When the sub_client device comes online, it needs to be subscribed to the two topics sub_client/upstream and sub_client/downlink at QoS 1:

  1. Insert the initial proxy subscription topics into the mqtt_sub table:
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/upstream", 1);
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/downlink", 1);

  2. On the WebSocket page of the EMQ X management console, create a new client connection with clientid sub_client, then switch to the Subscriptions page. The client has been automatically subscribed to the two QoS 1 topics sub_client/upstream and sub_client/downlink:

  3. Switch back to the WebSocket page of the management console and publish a message to the sub_client/downlink topic. The message appears in the list of received messages.
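To make the lookup step more concrete, here is a minimal Python sketch of reading a client's preset subscriptions. It uses the standard sqlite3 module as a stand-in for MySQL so that it runs without a server, and a simplified mqtt_sub table. The function name lookup_subscriptions and the query are illustrative assumptions, not the plug-in's actual implementation.

```python
import sqlite3

# Assumption: sqlite3 stands in for MySQL, with a simplified mqtt_sub table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE mqtt_sub ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " clientid TEXT, topic TEXT, qos INTEGER,"
    " UNIQUE (clientid, topic))"
)
conn.executemany(
    "INSERT INTO mqtt_sub (clientid, topic, qos) VALUES (?, ?, ?)",
    [("sub_client", "sub_client/upstream", 1),
     ("sub_client", "sub_client/downlink", 1)],
)


def lookup_subscriptions(conn, clientid):
    """Return the preset (topic, qos) pairs stored for one client."""
    rows = conn.execute(
        "SELECT topic, qos FROM mqtt_sub WHERE clientid = ? ORDER BY topic",
        (clientid,),
    )
    return rows.fetchall()


subs = lookup_subscriptions(conn, "sub_client")
print(subs)
```

A client with no rows in the table simply gets an empty list, which corresponds to no proxy subscriptions.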

Published message persistence

Configuration items

Open the configuration file and configure Backend rules. The topic parameter filters which messages are stored. Here the # wildcard is used so that messages on any topic are stored.

## hook: message.publish
## action/function: on_message_publish

backend.mysql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Usage example

On the WebSocket page of the EMQ X management console, publish several messages to the topic upstream_topic. EMQ X persists them to the mqtt_msg table:

Only QoS 1 and QoS 2 messages can be persisted.
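The filtering described above can be modeled roughly as follows. This Python sketch implements standard MQTT topic filter matching (the + and # wildcards) and combines it with the QoS > 0 condition. The function names topic_matches and should_persist are hypothetical, the special handling of topics beginning with $ is not modeled, and the code is an illustration rather than the plug-in's own logic.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches this level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            # '+' matches exactly one level; anything else must be equal.
            return False
    return len(filter_levels) == len(topic_levels)


def should_persist(rule_filter, topic, qos):
    """Model of a storage rule: the topic must match and QoS must be 1 or 2."""
    return qos > 0 and topic_matches(rule_filter, topic)


print(should_persist("#", "upstream_topic", 1))
print(should_persist("#", "upstream_topic", 0))
```

Replacing "#" with a narrower filter such as "sensor/+/data" would restrict storage to the matching topics only.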

Retained message persistence

Configuration items

Open the configuration file and configure Backend rules.

## Enable the three rules below to cover the three stages of the retain lifecycle

## Publish a non-empty retained message (store)
backend.mysql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Query the retained message when a device subscribes to a topic
backend.mysql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Publish an empty retained message (clear)
backend.mysql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}


Usage example

After establishing a connection on the WebSocket page of the EMQ X management console, check Retain when publishing messages:

Publish (message is not empty)

When a non-empty retained message is published, EMQ X persists it to the mqtt_retain table with topic as the unique key. If different retained messages are published to the same topic, only the last one is persisted:

Subscribe

After a client subscribes to a topic that has a retained message, EMQ X queries the mqtt_retain table and delivers the retained message to the client.

Publish (message empty)

Under the MQTT protocol, publishing an empty retained message clears the retained record, so the corresponding row is deleted from the mqtt_retain table.
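The three stages described above (store, look up, clear) can be summarized in a toy in-memory model. The Python class below is only an illustration: RetainStore and its methods are hypothetical names, a dictionary stands in for the mqtt_retain table, and lookups are by exact topic only (wildcard subscriptions are not modeled).

```python
class RetainStore:
    """Toy in-memory model of a retain table keyed by topic."""

    def __init__(self):
        self._rows = {}  # topic -> payload, mirroring the unique key on topic

    def publish(self, topic, payload):
        if payload:
            # Non-empty retained message: insert, or overwrite the previous one.
            self._rows[topic] = payload
        else:
            # Empty retained message: clear the record for this topic.
            self._rows.pop(topic, None)

    def lookup(self, topic):
        # On subscribe: return the retained payload, or None if there is none.
        return self._rows.get(topic)


store = RetainStore()
store.publish("retain_topic", b"first")
store.publish("retain_topic", b"second")   # replaces the first message
latest = store.lookup("retain_topic")
store.publish("retain_topic", b"")         # empty payload clears the record
after_clear = store.lookup("retain_topic")
print(latest, after_clear)
```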

Message confirmation persistence

After message acknowledgement (ACK) persistence is enabled, EMQ X initializes an ACK record in the database, with clientid + topic as the unique key, whenever a client subscribes to a topic at QoS 1 or QoS 2.

Configuration items

Open the configuration file and configure Backend rules. You can use topic wildcards to filter messages to be applied:

## Initialize the ACK record when subscribing
backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}


## Update arrival status when messages arrive
backend.mysql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Delete record rows when unsubscribing
backend.mysql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

Usage example

After establishing a connection on the WebSocket page of the EMQ X management console, subscribe to topics with QoS > 0:

At this point an initialization row is inserted into the mqtt_acked table. Each time a message with QoS > 0 is published to the topic, the mid of that row increases by 1:

Proxy-subscribed topics with QoS > 0 also get an initialization record, and the record is deleted after the client unsubscribes.

Custom SQL

In addition to the built-in functions and table structures, emqx_backend_mysql supports custom SQL statements. The SQL is constructed dynamically with template syntax such as ${clientid}, which makes it possible, for example, to record client connection history or to update custom tables.

SQL statement parameters

hook | Available parameters | Example (${name} in the SQL statement denotes an available parameter)
--- | --- | ---
client.connected | clientid | insert into conn(clientid) values(${clientid})
client.disconnected | clientid | insert into disconn(clientid) values(${clientid})
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic}
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic})
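One way to think about the ${name} template syntax is as named placeholders that are bound to hook parameters before the statement runs. The Python sketch below converts such a template into a parameterized query plus an ordered list of values. It is an illustration only: render_template is a hypothetical helper, the %s marker style is an assumption borrowed from common Python MySQL drivers, and the plug-in's real substitution logic may differ.

```python
import re

# Matches placeholders of the form ${name}
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def render_template(sql, params):
    """Turn '${name}' placeholders into '%s' markers plus an ordered value list."""
    values = []

    def replace(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"parameter not available for this hook: {name}")
        values.append(params[name])
        return "%s"

    return PLACEHOLDER.sub(replace, sql), values


# Hypothetical parameter values for a message.publish hook.
query, values = render_template(
    "insert into msg(msgid, topic) values(${msgid}, ${topic})",
    {"msgid": "1", "topic": "upstream_topic", "qos": 1},
)
print(query)
print(values)
```

Binding values as parameters instead of pasting them into the SQL string keeps quoting and escaping in the hands of the database driver.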

Example: client connection log

Design the table structure as follows:

CREATE TABLE `mqtt`.`connect_logs` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `clientid` VARCHAR(255) NULL,
  `created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, -- Record time
  `state` INT NOT NULL DEFAULT 0, -- Record type: 0 offline, 1 online
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Custom SQL:

## Configure custom SQL in the client.connected / client.disconnected hooks
## Multiple SQL statements can be configured: "sql": ["sql_a", "sql_b", "sql_c"]

## Connected
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 1)"]}, "pool": "pool1"}

## Disconnected
backend.mysql.hook.client.disconnected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 0)"]}, "pool": "pool1"}

When a client goes online or offline, the plug-in fills in and executes the preconfigured SQL statement, writing the connection record to the connect_logs table.

Advanced options

backend.mysql.time_range = 5s

backend.mysql.max_returned_count = 500

Conclusion

With an understanding of the data structures stored in MySQL and of custom SQL, readers can build further applications on top of MySQL.


For more information, please visit our official website emqx.io, or follow our open source project github.com/emqx/emqx. For more details, please visit our official documentation.