Background

Using a common Internet of Things (IoT) scenario as an example, this article shows how to use edge computing to process business data quickly, cheaply, and efficiently.

Many IoT projects need to collect and analyze data. In a smart building project, for example, this includes data from elevators, gas, water, and electricity. One solution is to connect all devices directly to a cloud IoT platform such as Azure IoT Hub or AWS IoT Core. This approach has the following problems:

  • High latency: data must travel over the Internet, be processed in the cloud, and then be returned to the device, which takes time
  • Data transfer and storage costs: transmission over the Internet consumes bandwidth, and the cost can be considerable for large-scale IoT projects
  • Data security: some IoT data is highly sensitive, and transferring all of it to the cloud is risky

To solve these problems, the industry has proposed edge computing. Its core idea is to process data close to where it is produced. This avoids unnecessary latency, cost, and security risk.

The business scenario

Suppose you have a set of devices, each with its own ID. Each device sends data over MQTT to its own topic on an MQTT message server. The topic structure is shown below, where {device_id} is the device ID.

devices/{device_id}/messages

Each device sends the temperature and humidity readings from its sensor in JSON format.

{
    "temperature": 30,
    "humidity": 20
}
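The topic and payload conventions above can be sketched in Python. This minimal example assumes a device serializes its two readings with the standard json module. The helper name build_message is hypothetical and is not part of EMQ X or Kuiper.

```python
import json


def build_message(device_id: str, temperature: float, humidity: int) -> tuple[str, str]:
    """Return the (topic, payload) pair a device would publish (hypothetical helper)."""
    # Topic follows the devices/{device_id}/messages convention
    topic = f"devices/{device_id}/messages"
    # Payload is a flat JSON object holding the two sensor readings
    payload = json.dumps({"temperature": temperature, "humidity": humidity})
    return topic, payload


if __name__ == "__main__":
    print(build_message("001", 30, 20))
```

Any MQTT client library can then publish the returned pair, for example to EMQ X Edge on port 1883.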

The data must be analyzed in real time, with the following requirements. Every 10 seconds, calculate the average temperature (t_av) for each device. Also record the maximum (t_max), the minimum (t_min), and the number of readings (t_count) in that 10-second window. Save the four results, as in the following example:

[
    {"device_id": "1", "t_av": 25, "t_max": 45, "t_min": 5, "t_count": 2},
    {"device_id": "2", "t_av": 25, "t_max": 45, "t_min": 5, "t_count": 2},
    ...
]

Solution overview

As shown in the figure below, we use edge analytics (stream data processing). EMQ X handles the edge side, and the computed results are sent to Azure IoT Hub.

  • EMQ X Edge can connect devices that use various protocols, such as MQTT, CoAP, and LwM2M, so users do not need to worry about protocol adaptation. It is also lightweight enough to be deployed on edge devices.
  • EMQ X Kuiper is a lightweight, SQL-based edge stream processing engine released by EMQ. Its installation package is only about 7 MB, which makes it well suited to edge devices.
  • Azure IoT Hub provides a comprehensive solution for device access and data analysis. Here it receives the result data in the cloud and serves as the basis for application-level analysis of those results.

Implementation steps

Install EMQ X Edge & Kuiper

  • At the time of writing, the latest version of EMQ X Edge is 4.0. You can install and start EMQ X Edge with Docker:

    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                 NAMES
    a348e3ac150c   emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago   Up 2 seconds   4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx

    You can use the telnet command to check whether it started successfully, as shown below.

    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
  • Install and start Kuiper

    Download the latest version of Kuiper and unzip it. At the time of writing, the latest version of Kuiper is 0.0.3.

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498

    If Kuiper fails to start, check the log file log/stream.log.
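The telnet check used after starting EMQ X Edge can also be scripted. This Python sketch only tests whether a TCP connection to the broker port succeeds; it does not perform an MQTT handshake. The function name broker_port_open and its defaults (localhost, port 1883) are assumptions chosen to match the Docker command above.

```python
import socket


def broker_port_open(host: str = "localhost", port: int = 1883, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds, like the telnet check."""
    try:
        # create_connection resolves the host and completes the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    print("EMQ X Edge port reachable:", broker_port_open())
```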

Create a stream

Kuiper provides a command-line tool for managing streams and rules. Run bin/cli in a terminal to see the available subcommands and their help. By default, the CLI connects to the local Kuiper server, but it can also connect to other Kuiper servers. To change the target server, edit the etc/client.yaml configuration file. For more information, see the Kuiper command-line documentation.

Create a stream definition. A stream definition specifies the format of the data sent on the stream, much like defining a table schema in a relational database. The Kuiper documentation lists all supported data types.

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
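The schema analogy can be made concrete with a small Python sketch. It applies the demo stream's declared types to one JSON message, assuming that float maps to Python float and bigint to int. It illustrates the idea only and is not Kuiper's actual decoder. STREAM_SCHEMA and decode_row are hypothetical names, and the choices to ignore undeclared fields and reject missing ones belong to this sketch, not to Kuiper.

```python
import json

# Hypothetical mirror of the demo stream: (temperature float, humidity bigint)
STREAM_SCHEMA = {"temperature": float, "humidity": int}


def decode_row(payload: str, schema: dict = STREAM_SCHEMA) -> dict:
    """Parse a JSON payload and coerce each declared field to its schema type."""
    raw = json.loads(payload)
    row = {}
    for field, field_type in schema.items():
        if field not in raw:
            # This sketch rejects messages that lack a declared field
            raise ValueError(f"missing field: {field}")
        # Coerce to the declared type; undeclared fields are simply not copied
        row[field] = field_type(raw[field])
    return row


if __name__ == "__main__":
    print(decode_row('{"temperature": 30, "humidity": 20}'))
```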

This command creates a stream named demo in Kuiper. The stream has two fields, temperature and humidity, and its data source is the MQTT topic devices/+/messages. The wildcard + is used so that the stream receives messages from all devices. The MQTT server address for the data source is set by the servers item in the etc/mqtt_source.yaml configuration file, as shown below.

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]
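To see which topics the filter devices/+/messages covers, here is a minimal Python sketch of MQTT topic-filter matching. It follows the MQTT wildcard rules: + matches exactly one topic level, and # matches the remaining levels. It skips edge cases such as topics that begin with $. topic_matches is a hypothetical helper, not part of Kuiper or EMQ X.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches this level and everything below it
            return True
        if i >= len(topic_levels):
            # Filter is longer than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            # + matches any single level; other levels must be identical
            return False
    # All filter levels matched; the topic must not have extra levels
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for t in ["devices/001/messages", "devices/002/status"]:
        print(t, topic_matches("devices/+/messages", t))
```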

Use the describe subcommand to view the newly created stream definition.

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature     float
humidity        bigint

FORMAT: JSON
DATASOURCE: devices/+/messages

Business logic processing

Kuiper implements business logic in SQL. The following statement groups readings by device ID and, every 10 seconds, calculates the average, maximum, minimum, and count of the temperature values:

SELECT avg(temperature) AS t_av,
       max(temperature) AS t_max,
       min(temperature) AS t_min,
       COUNT(*) AS t_count,
       split_value(mqtt(topic), "/", 1) AS device_id
FROM demo
GROUP BY device_id, TUMBLINGWINDOW(ss, 10)

The SQL uses four aggregate functions to compute values over each 10-second window:

  • avg: average
  • max: maximum
  • min: minimum
  • count: number of records

It also uses two basic functions:

  • mqtt: retrieves MQTT protocol information; mqtt(topic) returns the topic of the current message
  • split_value: splits the first argument using the second argument as the separator, and the third argument gives the index of the element to return. For example, split_value("devices/001/messages", "/", 1) returns 001
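For readers more familiar with general-purpose languages, the documented behaviour of split_value corresponds roughly to the following Python sketch. It is not Kuiper's implementation. How Kuiper handles an out-of-range index is not covered here, so raising an error is this sketch's own choice.

```python
def split_value(value: str, separator: str, index: int) -> str:
    """Split value by separator and return the element at index (illustrative)."""
    parts = value.split(separator)
    # This sketch raises a descriptive error for an out-of-range index
    if not 0 <= index < len(parts):
        raise IndexError(f"index {index} out of range for {len(parts)} parts")
    return parts[index]


if __name__ == "__main__":
    # Equivalent of split_value(mqtt(topic), "/", 1) for one message topic
    print(split_value("devices/001/messages", "/", 1))
```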

GROUP BY groups the results by device_id. The time window TUMBLINGWINDOW(ss, 10) produces one batch of statistics every 10 seconds.
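The combination of GROUP BY device_id and a 10-second tumbling window can be sketched in Python. A tumbling window is a fixed, non-overlapping interval, so each reading falls into exactly one bucket. Kuiper decides window boundaries itself at runtime. This sketch instead buckets explicit timestamps in seconds, and the function name is hypothetical. The readings and timestamps in the usage example are also hypothetical. They were chosen so that, split across two windows, they give the same per-device values the query client prints in the debugging section.

```python
from collections import defaultdict


def tumbling_window_stats(readings, window_seconds=10):
    """Group (timestamp, device_id, temperature) tuples into tumbling windows.

    Returns one list per window, in time order; each list holds one dict per
    device with t_av, t_max, t_min and t_count, mirroring the SQL output.
    """
    windows = defaultdict(lambda: defaultdict(list))
    for ts, device_id, temperature in readings:
        # Window k covers the interval [k * window_seconds, (k + 1) * window_seconds)
        windows[int(ts // window_seconds)][device_id].append(temperature)

    results = []
    for window_index in sorted(windows):
        batch = []
        for device_id, temps in windows[window_index].items():
            batch.append({
                "device_id": device_id,
                "t_av": sum(temps) / len(temps),
                "t_max": max(temps),
                "t_min": min(temps),
                "t_count": len(temps),
            })
        results.append(batch)
    return results


if __name__ == "__main__":
    data = [(0, "1", 20), (1, "2", 31), (2, "1", 35), (3, "2", 20), (4, "1", 80),
            (10, "2", 45), (11, "1", 10), (12, "2", 12), (13, "1", 65), (14, "2", 55)]
    for batch in tumbling_window_stats(data):
        print(batch)
```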

Debugging SQL

Before writing the rule, debug the SQL. Kuiper provides an SQL debugging tool that makes this easy.

  • Go to the Kuiper installation directory and run bin/cli query

  • Enter the prepared SQL statement at the command line prompt that appears.

    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) AS t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >

    The log file log/stream.log shows that a temporary rule named internal-kuiper_query_rule has been created.

    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule

    Note that the query command creates the internal-kuiper_query_rule rule. Every 5 seconds, the server checks whether the query client is still online. If the client does not respond for more than 10 seconds (for example, because it was closed), internal-kuiper_query_rule is deleted automatically. The following messages are then printed in the log file:

    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
  • Send test data

    Send the following test data to EMQ X Edge using any test tool. The author used the JMeter MQTT plug-in, because JMeter supports flexible automatic data generation, business logic control, and simulation of large numbers of devices. You can also simply use mosquitto or another MQTT client.

    • Topic: devices/$device_id/messages, where $device_id is the first column in the data below
    • Message: {"temperature": $temperature, "humidity": $humidity}, where $temperature and $humidity are the second and third columns in the data below
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32

    After the mock data is sent, the query client prints two sets of results, one for each of two 10-second time windows. The number of results depends on how the data is sent. If Kuiper receives all the data within a single time window, it prints only one result.

    kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]
    [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
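The liveness check for the query client described in the debugging steps can be modelled as a small watchdog. Under that check, the server polls every 5 seconds and removes the temporary rule after more than 10 seconds of silence. This hypothetical Python sketch covers only the timing logic. It does not reflect Kuiper's internal code, and QueryWatchdog is an illustrative name.

```python
class QueryWatchdog:
    """Decide when a temporary query rule should be stopped (illustrative sketch)."""

    def __init__(self, started_at: float, timeout: float = 10.0):
        self.timeout = timeout
        # Treat rule creation as the client's first sign of life
        self.last_seen = started_at

    def heartbeat(self, now: float) -> None:
        """Record that the query client responded at time `now` (seconds)."""
        self.last_seen = now

    def should_stop(self, now: float) -> bool:
        """Return True once the client has been silent for longer than the timeout."""
        return now - self.last_seen > self.timeout


if __name__ == "__main__":
    dog = QueryWatchdog(started_at=0)
    dog.heartbeat(3)
    # Periodic checks every 5 seconds, matching the described behaviour
    for t in range(5, 30, 5):
        if dog.should_stop(t):
            print(f"t={t}s: client silent too long, stopping the query")
            break
        print(f"t={t}s: client still considered online")
```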

Create and submit rules

After debugging the SQL, write the rule file so that Kuiper's MQTT sink sends the results to the remote Azure IoT Hub. In Azure IoT Hub, you need to create the following:

  • IoT Hub: used for device access; the hub created in this article is named rockydemo
  • IoT Device: represents a device, in this case the gateway that processes device data. Kuiper is installed on the gateway, which sends the processed results to the Azure cloud
  • Device connection username and password: the Azure documentation on connecting to IoT Hub over MQTT gives the format of the username and password. The Azure documentation on SAS tokens explains how to generate them.
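Azure's IoT Hub documentation describes a device SAS token as an HMAC-SHA256 signature over the URL-encoded resource URI, a newline, and an expiry timestamp, keyed with the base64-decoded device key. The Python sketch below follows that description and the username format used in this article's rule file. Treat it as an illustration to check against the current Azure documentation. The hub name, device ID, and key are placeholders, and both function names are hypothetical.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri: str, device_key_b64: str, expiry: int) -> str:
    """Return a device-scoped SAS token; expiry is an absolute Unix time in seconds."""
    # String to sign: URL-encoded resource URI, a newline, then the expiry time
    string_to_sign = f"{urllib.parse.quote_plus(resource_uri)}\n{expiry}".encode("utf-8")
    # The device key is stored base64-encoded; HMAC uses the raw key bytes
    key = base64.b64decode(device_key_b64)
    signature = base64.b64encode(hmac.new(key, string_to_sign, hashlib.sha256).digest())
    # urlencode percent-encodes every value, including +, / and = in the signature
    fields = {"sr": resource_uri, "sig": signature, "se": str(expiry)}
    return "SharedAccessSignature " + urllib.parse.urlencode(fields)


def azure_mqtt_credentials(hub_name: str, device_id: str, device_key_b64: str, expiry: int) -> dict:
    """Assemble MQTT sink settings in the shape used by the rule file (hypothetical helper)."""
    host = f"{hub_name}.azure-devices.net"
    return {
        "server": f"ssl://{host}:8883",
        "clientId": device_id,
        "username": f"{host}/{device_id}/?api-version=2018-06-30",
        "password": generate_sas_token(f"{host}/devices/{device_id}", device_key_b64, expiry),
    }


if __name__ == "__main__":
    # Placeholder key: substitute the device's primary key from the Azure portal
    fake_key = base64.b64encode(b"not-a-real-device-key").decode()
    creds = azure_mqtt_credentials("rockydemo", "demo_001", fake_key, int(time.time()) + 3600)
    print(creds["username"])
    print(creds["password"])
```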

The device created in Azure IoT Hub is shown below.

Write the Kuiper rule file

The rule file is a text file that describes the business logic (the SQL statement debugged earlier) and the sink configuration (the destination of the processing results). The previous section covered most of the information needed to connect to Azure IoT Hub. Note that protocol_version must be set to 3.1.1, not 3.1.

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) AS t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {"log": {}},
    {"mqtt": {
      "server": "ssl://rockydemo.azure-devices.net:8883",
      "topic": "devices/demo_001/messages/events/",
      "protocol_version": "3.1.1",
      "qos": 1,
      "clientId": "demo_001",
      "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
      "password": "SharedAccessSignature sr=*******************"
    }}
  ]
}

Create the rule from the Kuiper command line:

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.
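The SQL is embedded in a JSON string, so its inner double quotes must be escaped, which is easy to get wrong by hand. As an alternative to editing rule1.txt manually, a short Python script can generate the file with json.dumps, which handles the escaping. The sink values below are copied from the rule file in this article. The password is a placeholder to replace with a real SAS token, and build_rule is a hypothetical helper.

```python
import json

SQL = (
    "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, "
    "min(temperature) AS t_min, COUNT(*) AS t_count, "
    'split_value(mqtt(topic), "/", 1) AS device_id '
    "FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)"
)


def build_rule(sql: str, password: str) -> str:
    """Serialize the rule as JSON; json.dumps escapes the quotes inside the SQL."""
    rule = {
        "sql": sql,
        "actions": [
            {"log": {}},
            {"mqtt": {
                "server": "ssl://rockydemo.azure-devices.net:8883",
                "topic": "devices/demo_001/messages/events/",
                "protocol_version": "3.1.1",  # must be 3.1.1 for Azure IoT Hub
                "qos": 1,
                "clientId": "demo_001",
                "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
                "password": password,
            }},
        ],
    }
    return json.dumps(rule, indent=2)


if __name__ == "__main__":
    # Redirect the output to rule1.txt, e.g. python build_rule.py > rule1.txt
    print(build_rule(SQL, "<SAS token>"))
```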

The log file shows the rule's connections. If the configuration is correct, it should report that the connection to Azure IoT Hub was established successfully.

...
time="2019-11-12T14:30:34+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-12T14:30:34+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-12T14:30:35+08:00" level=info msg="The connection to server ssl://rockydemo.azure-devices.net:8883 was established successfully" rule=rule1
......
  • Start monitoring Azure IoT Hub with the az iot hub monitor-events -n rockydemo command. Then send simulated data to the local EMQ X Edge, as you did when debugging the SQL statement. Kuiper processes the data and sends the results to Azure IoT Hub.

    # az iot hub monitor-events -n rockydemo
    Starting event monitor, use ctrl-c to stop...
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]"
        }
    }
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":33.5,\"t_count\":2,\"t_max\":55,\"t_min\":12},{\"device_id\":\"1\",\"t_av\":37.5,\"t_count\":2,\"t_max\":65,\"t_min\":10}]"
        }
    }

Conclusion

This article has shown how the EMQ X edge solution can be used to build edge data analytics systems quickly and flexibly, with low-latency, low-cost, and secure data processing. Azure IoT also offers an edge solution, Azure IoT Edge. Compared with Azure's solution:

  • Kuiper's runtime is very lightweight. Azure IoT Edge requires runtimes for the relevant languages, so its installation package can be relatively large.
  • Implementing business logic in SQL makes development with Kuiper fast and simple, but it is less flexible for complex business logic. Azure IoT Edge offers more flexibility in how business logic is implemented.
  • Kuiper is more flexible when integrating with third-party IoT hubs, whereas Azure IoT Edge connects only to Azure IoT Hub.

If you are interested in edge stream processing, see the Kuiper open source project.


For more information, visit our official website emqx.io or follow our open source project github.com/emqx/emqx. For more details, see our official documentation.