An overview of

In EdgeX Geneva, EMQ X Kuiper, a lightweight SQL-based streaming data processing engine, is integrated with EdgeX. Before we dive into this tutorial, let’s take a moment to cover some Kuiper basics. EMQ X Kuiper is lightweight open source software for IoT edge analytics and stream processing. It is implemented in Golang and can run on all kinds of resource-constrained edge devices. Kuiper processes streaming data using three concepts: Source, SQL and Sink.

  • Source: The source of streaming data, such as data from an MQTT server. In the case of EdgeX, the data source is the EdgeX message bus, which can be implemented by ZeroMQ or an MQTT server.

  • SQL: SQL is where you specify the business logic for processing the streaming data. Kuiper provides SQL statements that can extract, filter and transform data.

  • Sink: The sink sends analysis results to a specific target, for example another MQTT server or an HTTP REST address.
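The Source, SQL and Sink model can be pictured with a few lines of Python. This is only a conceptual sketch, not Kuiper code: the function names, the threshold and the sample readings are all made up for illustration.

```python
# Conceptual sketch only: mimics the Source -> SQL -> Sink flow in plain Python.
# None of these names come from Kuiper; they are hypothetical.

def source():
    """Stand-in for a streaming source such as an MQTT topic."""
    yield {"temperature": 21}
    yield {"temperature": 35}
    yield {"temperature": 28}

def sql_step(events, threshold):
    """Roughly what 'SELECT temperature FROM demo WHERE temperature > threshold' would do."""
    for event in events:
        if event["temperature"] > threshold:
            yield {"temperature": event["temperature"]}

def sink(results):
    """Stand-in for a sink; here the results are simply collected in a list."""
    return list(results)

collected = sink(sql_step(source(), threshold=25))
print(collected)
```

Each stage only sees the output of the previous one, which is why a rule can swap its sink (MQTT, REST, log) without touching the SQL.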

To use Kuiper, you typically complete the following three steps.

  • Create a stream, which is where you define the data source
  • Write the rules
    • Write SQL for data analysis
    • Specify a sink to save the results of the analysis
  • Deploy and run the rules

This tutorial describes how to use Kuiper to process data from the EdgeX message bus.

Kuiper EdgeX integration

EdgeX uses a message bus for data exchange between its microservices. It defines an abstract message bus interface and provides ZeroMQ and MQTT implementations of it. The integration of Kuiper and EdgeX consists of the following three parts:

  • An EdgeX message bus source was added to support receiving data from the EdgeX message bus

  • In order to analyze the data, Kuiper needs to know the format of the incoming data stream. In general, it is best for users to specify the format of the stream data to be analyzed when creating the stream. As shown below, a demo stream contains a field named temperature. This is much like creating a table definition in a relational database. Once the stream definition is created, Kuiper can type check incoming data at compile time or run time, and errors are reported to the user.

    CREATE STREAM demo (temperature bigint) WITH (FORMAT="JSON"...)

    However, in EdgeX, the data type definitions are already specified in the EdgeX Core Contract Service. To improve the user experience, users can create streams without specifying data types. The Kuiper source gets all the value descriptor definitions from the Core Contract Service when the rule is initialized (so you need to restart the rule if the data type definitions change). When data is received from the message bus, it is converted to the corresponding data types according to these definitions.

  • An EdgeX message bus sink was added for writing processing results back to the EdgeX message bus. Users can also choose to send analysis results to the REST API sink that Kuiper already supports.
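The type conversion described in the second item above can be sketched as follows. This is a minimal illustration, not Kuiper's implementation: it assumes readings arrive as strings, and the descriptor table, the type names and the `convert_reading` helper are hypothetical.

```python
# Illustrative only: the descriptor table and reading layout below are assumptions,
# not the actual EdgeX or Kuiper data structures.

CONVERTERS = {
    "Bool": lambda s: s.strip().lower() == "true",
    "Int8": int, "Int16": int, "Int32": int, "Int64": int,
    "Float32": float, "Float64": float,
    "String": str,
}

def convert_reading(name, raw_value, descriptors):
    """Convert a string value using the type registered for the reading name."""
    type_name = descriptors.get(name)
    if type_name is None:
        # Unknown reading: keep the raw string rather than guess a type.
        return raw_value
    return CONVERTERS[type_name](raw_value)

# Hypothetical descriptors, as if fetched once when the rule is initialized.
descriptors = {"temperature": "Int64", "humidity": "Float64", "online": "Bool"}
event = {"temperature": "25", "humidity": "61.5", "online": "true", "note": "n/a"}

typed = {name: convert_reading(name, value, descriptors) for name, value in event.items()}
print(typed)
```

Because the descriptor table is loaded once, a changed type definition is not picked up until the table is rebuilt, which mirrors the need to restart the rule.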

Run the EdgeX Docker instance

Open the EdgeX developer-scripts project and download the Docker Compose file for the Geneva release, then start all the EdgeX containers.

# docker-compose -f ./docker-compose-nexus-redis-no-secty.yml up -d --build

After all containers are started, please use the docker ps command to confirm that all containers are started normally.

$ docker ps
CONTAINER ID        IMAGE                                                                  COMMAND                  CREATED             STATUS              PORTS                                                                                              NAMES
5618c93027a9        nexus3.edgexfoundry.org:10004/docker-device-virtual-go:master          "/device-virtual -p..."  37 minutes ago      Up 37 minutes       0.0.0.0:49990->49990/tcp                                                                           edgex-device-virtual
fabe6b9052f5        nexus3.edgexfoundry.org:10004/docker-edgex-ui-go:master                "./edgex-ui-server"      37 minutes ago      Up 37 minutes       0.0.0.0:2000->2000/tcp                                                                             edgex-ui-go
950135a7041d        emqx/kuiper:0.3.1                                                      "/usr/bin/docker-ent..." 37 minutes ago      Up 37 minutes       0.0.0.0:20498->20498/tcp, 9081/tcp, 0.0.0.0:48075->48075/tcp                                       edgex-kuiper
c49b0d6f9347        nexus3.edgexfoundry.org:10004/docker-support-scheduler-go:master       "/support-scheduler..."  37 minutes ago      Up 37 minutes       0.0.0.0:48085->48085/tcp                                                                           edgex-support-scheduler
4265dcc2bb48        nexus3.edgexfoundry.org:10004/docker-core-command-go:master            "/core-command -cp=c..." 37 minutes ago      Up 37 minutes       0.0.0.0:48082->48082/tcp                                                                           edgex-core-command
4667160e2f41        nexus3.edgexfoundry.org:10004/docker-app-service-configurable:master   "/app-service-config..." 37 minutes ago      Up 37 minutes       48095/tcp, 0.0.0.0:48100->48100/tcp                                                                edgex-app-service-configurable-rules
9bbfe95993f5        nexus3.edgexfoundry.org:10004/docker-core-metadata-go:master           "/core-metadata -cp=..." 37 minutes ago      Up 37 minutes       0.0.0.0:48081->48081/tcp, 48082/tcp                                                                edgex-core-metadata
2e342a3aae81        nexus3.edgexfoundry.org:10004/docker-support-notifications-go:master   "/support-notificati..." 37 minutes ago      Up 37 minutes       0.0.0.0:48060->48060/tcp                                                                           edgex-support-notifications
3cfc628e013a        nexus3.edgexfoundry.org:10004/docker-sys-mgmt-agent-go:master          "/sys-mgmt-agent -cp..." 37 minutes ago      Up 37 minutes       0.0.0.0:48090->48090/tcp                                                                           edgex-sys-mgmt-agent
f69e9c4d6cc8        nexus3.edgexfoundry.org:10004/docker-core-data-go:master               "/core-data -cp=cons..." 37 minutes ago      Up 37 minutes       0.0.0.0:5563->5563/tcp, 0.0.0.0:48080->48080/tcp                                                   edgex-core-data
9e5091928409        nexus3.edgexfoundry.org:10004/docker-support-logging-go:master         "/support-logging -c..." 37 minutes ago      Up 37 minutes       0.0.0.0:48061->48061/tcp                                                                           edgex-support-logging
74e8668f892c        redis:5.0.7-alpine                                                     "docker-entrypoint.s..." 37 minutes ago      Up 37 minutes       0.0.0.0:6379->6379/tcp                                                                             edgex-redis
9b341bb217f9        consul:1.3.1                                                           "docker-entrypoint.s..." 37 minutes ago      Up 37 minutes       0.0.0.0:8400->8400/tcp, 8300-8302/tcp, 8301-8302/udp, 8600/tcp, 8600/udp, 0.0.0.0:8500->8500/tcp   edgex-core-consul
ed7ad5ae08b2        nexus3.edgexfoundry.org:10004/docker-edgex-volume:master               "/bin/sh -c '/usr/bi..." 37 minutes ago      Up 37 minutes                                                                                                          edgex-files

Create a stream

This step creates a stream that can consume data from the EdgeX message bus. There are two ways to manage streams, and you can choose whichever you prefer.

Method 1: Use the REST API

Note that the Kuiper REST interface in EdgeX uses port 48075 instead of the default port 9081. So when calling the Kuiper REST API in EdgeX, replace 9081 with 48075 wherever it appears in the documentation.

Replace $kuiper_server with the address of the locally running Kuiper instance.

curl -X POST \
  http://$kuiper_server:48075/streams \
  -H 'Content-Type: application/json' \
  -d '{ "sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")" }'

Refer to this documentation for other APIs.
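If you would rather script the call than use curl, the same request can be built with Python's standard library. This is a sketch that mirrors the curl command above: the helper name `build_create_stream_request` is hypothetical, and `localhost` is only an assumed value for $kuiper_server.

```python
import json
import urllib.request

def build_create_stream_request(kuiper_server, port=48075):
    """Build (but do not send) the POST request that creates the demo stream."""
    body = {"sql": 'create stream demo() WITH (FORMAT="JSON", TYPE="edgex")'}
    return urllib.request.Request(
        url=f"http://{kuiper_server}:{port}/streams",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# "localhost" is an assumption; use the address of your own Kuiper instance.
request = build_create_stream_request("localhost")
print(request.get_method(), request.full_url)

# To actually send the request against a running instance:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the URL and body before anything reaches the server.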

Method 2: Use the Kuiper command line

Use the following command to enter a running Kuiper Docker instance.

docker exec -it edgex-kuiper /bin/sh

Use the following command to create a stream definition called demo.

bin/cli create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'

For other command lines, refer to this document.


Now that the stream is created, you may wonder how Kuiper knows the address and port of the message bus, since this information is not specified in CREATE STREAM. It is actually specified in the configuration file etc/sources/edgex.yaml, which you can view by typing cat etc/sources/edgex.yaml in the command line window. If you have a different server, port, or service address, update the configuration accordingly. These configuration options can also be overridden at container startup time.

#Global Edgex configurations
default:
  protocol: tcp
  server: localhost
  port: 5566
  topic: events
  serviceServer: http://localhost:48080
.....  

Refer to this document for more information about configuration files.

Create rules

Let’s create a rule that sends the analysis results to an MQTT server. Refer to this link for the configuration of the MQTT sink. As with streams, you can manage rules through either the REST API or the command line.

The following example selects all data from the events topic, and the analysis results will be

  • Published to the topic result on the public MQTT server broker.emqx.io;
  • Printed to the log file

Option 1: Use the REST API

curl -X POST \
  http://$kuiper_server:48075/rules \
  -H 'Content-Type: application/json' \
  -d '{ "id": "rule1", "sql": "SELECT * FROM demo", "actions": [ { "mqtt": { "server": "tcp://broker.emqx.io:1883", "topic": "result", "clientId": "demo_001" } }, { "log":{} } ] }'

Option 2: Use the Kuiper command line

You can use any editor to create the rule file. Copy the following content into the editor and save it as rule.txt.

{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "mqtt": {
        "server": "tcp://broker.emqx.io:1883",
        "topic": "result",
        "clientId": "demo_001"
      }
    },
    {
      "log": {}
    }
  ]
}
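Hand-edited JSON is easy to break with a stray comma or bracket, so it can help to generate the rule file from code. The sketch below builds the same rule as a Python dictionary and serializes it. The `check_rule` helper is hypothetical and only performs a trivial sanity check; it is not a Kuiper validator.

```python
import json

# The same rule as in the file above, expressed as a Python dictionary.
rule = {
    "sql": "SELECT * from demo",
    "actions": [
        {"mqtt": {"server": "tcp://broker.emqx.io:1883", "topic": "result", "clientId": "demo_001"}},
        {"log": {}},
    ],
}

def check_rule(candidate):
    """Return a list of obvious problems (hypothetical helper, not a Kuiper validator)."""
    problems = []
    if not candidate.get("sql", "").strip():
        problems.append("missing sql")
    if not candidate.get("actions"):
        problems.append("no actions")
    return problems

rule_text = json.dumps(rule, indent=2)
print(rule_text)

# To write the file used by the command-line step:
# with open("rule.txt", "w") as handle:
#     handle.write(rule_text)
```

Serializing with `json.dumps` guarantees the file is at least syntactically valid JSON before it reaches the Kuiper CLI.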

In the running container, execute the following command.

# bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498...
Creating a new rule from file rule.txt.
Rule rule1 was created successfully, please use 'cli getstatus rule rule1' command to get rule status.

To send results to other targets, refer to other targets supported in Kuiper. You can now look at the log file in log/stream.log to see the details of the rule.

time="2020-04-17T06:32:24Z" level=info msg="Serving Kuiper (version-0.3.1-4-g9e63fe1) on port 20498, and restful API on port 9081. \n" file="server.go:101"
time="2020-04-17T06:32:24Z" level=info msg="The connection to edgex messagebus is established successfully." file="edgex_source.go:95" rule=rule1
time="2020-04-17T06:32:24Z" level=info msg="Successfully subscribed to edgex messagebus topic events." file="edgex_source.go:104" rule=rule1
time="2020-04-17T06:32:24Z" level=info msg="The connection to server tcp://broker.emqx.io:1883 was established successfully" file="mqtt_sink.go:161" rule=rule1
time="2020-04-17T06:32:25Z" level=info msg="Get 24 of value descriptors from service." file="edgex_source.go:223"
time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int32\":-697766590}]" file="log_sink.go:16" rule=rule1
time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int8\":-47}]" file="log_sink.go:16" rule=rule1
time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int16\":-318}]" file="log_sink.go:16" rule=rule1
time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int64\":-8680421421398846880}]" file="log_sink.go:16" rule=rule1
time="2020-04-17T06:32:31Z" level=info msg="sink result for rule rule1: [{\"bool\":true}]" file="log_sink.go:16" rule=rule1
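If you want to pull the sink results out of the log programmatically, a small parser is enough. The sketch below assumes the log lines keep the exact shape shown above (a `msg="sink result for rule ...: ..."` field followed by `file=`); the regular expression and the `parse_sink_results` helper are illustrative and not part of Kuiper.

```python
import json
import re

# Matches only the "sink result" lines; assumes the layout shown in the log excerpt.
SINK_LINE = re.compile(r'msg="sink result for rule (?P<rule>\S+): (?P<payload>.*)" file=')

def parse_sink_results(lines):
    """Return (rule_id, decoded_payload) pairs for every sink result line."""
    results = []
    for line in lines:
        match = SINK_LINE.search(line)
        if match is None:
            continue  # not a sink result line
        # Quotes inside msg="..." are escaped as \" in the log, so unescape them first.
        payload = match.group("payload").replace('\\"', '"')
        results.append((match.group("rule"), json.loads(payload)))
    return results

sample = [
    r'time="2020-04-17T06:32:25Z" level=info msg="Get 24 of value descriptors from service." file="edgex_source.go:223"',
    r'time="2020-04-17T06:32:25Z" level=info msg="sink result for rule rule1: [{\"int8\":-47}]" file="log_sink.go:16" rule=rule1',
    r'time="2020-04-17T06:32:31Z" level=info msg="sink result for rule rule1: [{\"bool\":true}]" file="log_sink.go:16" rule=rule1',
]

parsed = parse_sink_results(sample)
for rule_id, payload in parsed:
    print(rule_id, payload)
```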

Monitoring analysis results

Because all analysis results are published to tcp://broker.emqx.io:1883, you can use the following mosquitto_sub command to listen for the results directly. You can also use other MQTT client tools.

# mosquitto_sub -h broker.emqx.io -t result
[{"bool":true}]
[{"bool":false}]
[{"bool":true}]
[{"randomvalue_int16":3287}]
[{"float64":8.41326e+306}]
[{"randomvalue_int32":-1872949486}]
[{"randomvalue_int8":-53}]
[{"int64":-1829499332806053678}]
[{"int32":-1560624981}]
[{"int16":8991}]
[{"int8":-4}]
[{"bool":true}]
[{"bool":false}]
[{"float64":1.737076e+306}]
...

You can also type the following command to check the status of the rule execution. A REST API for viewing rule status is also available. Check the documentation.

# bin/cli getstatus rule rule1
Connecting to 127.0.0.1:20498...
{
  "source_demo_0_records_in_total": 29,
  "source_demo_0_records_out_total": 29,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_ms": 0,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": "2020-04-17T10:30:09.294337",
  "op_preprocessor_demo_0_records_in_total": 29,
  "op_preprocessor_demo_0_records_out_total": 29,
  "op_preprocessor_demo_0_exceptions_total": 0,
  "op_preprocessor_demo_0_process_latency_ms": 0,
  "op_preprocessor_demo_0_buffer_length": 0,
  "op_preprocessor_demo_0_last_invocation": "2020-04-17T10:30:09.294355",
  "op_filter_0_records_in_total": 29,
  "op_filter_0_records_out_total": 21,
  "op_filter_0_exceptions_total": 0,
  "op_filter_0_process_latency_ms": 0,
  "op_filter_0_buffer_length": 0,
  "op_filter_0_last_invocation": "2020-04-17T10:30:09.294362",
  "op_project_0_records_in_total": 21,
  "op_project_0_records_out_total": 21,
  "op_project_0_exceptions_total": 0,
  "op_project_0_process_latency_ms": 0,
  "op_project_0_buffer_length": 0,
  "op_project_0_last_invocation": "2020-04-17T10:30:09.294382",
  "sink_sink_mqtt_0_records_in_total": 21,
  "sink_sink_mqtt_0_records_out_total": 21,
  "sink_sink_mqtt_0_exceptions_total": 0,
  "sink_sink_mqtt_0_process_latency_ms": 0,
  "sink_sink_mqtt_0_buffer_length": 1,
  "sink_sink_mqtt_0_last_invocation": "2020-04-17T10:30:09.294423"
}
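The status keys follow a regular pattern, an operator name followed by a metric suffix, so it is easy to summarize how many records each operator received and emitted. The sketch below uses a subset of the values from the status output above; the `operator_throughput` helper is hypothetical and relies only on the `_records_in_total` and `_records_out_total` suffixes.

```python
def operator_throughput(status):
    """Return {operator: (records_in, records_out)} from a flat status dictionary."""
    summary = {}
    for key, value in status.items():
        for suffix, slot in (("_records_in_total", 0), ("_records_out_total", 1)):
            if key.endswith(suffix):
                counts = summary.setdefault(key[: -len(suffix)], [0, 0])
                counts[slot] = value
    return {name: tuple(counts) for name, counts in summary.items()}

# A subset of the status output shown above.
status = {
    "source_demo_0_records_in_total": 29,
    "source_demo_0_records_out_total": 29,
    "source_demo_0_exceptions_total": 0,
    "op_filter_0_records_in_total": 29,
    "op_filter_0_records_out_total": 21,
    "sink_sink_mqtt_0_records_in_total": 21,
    "sink_sink_mqtt_0_records_out_total": 21,
}

throughput = operator_throughput(status)
for name, (records_in, records_out) in throughput.items():
    print(f"{name}: in={records_in} out={records_out} dropped={records_in - records_out}")
```

In this sample the filter operator received 29 records and emitted 21, so the difference between the two counters shows where records leave the pipeline.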

Conclusion

In this tutorial, we walked through a very simple example of using the EdgeX Kuiper rules engine. If you run into any problems while using it, please report them on the EdgeX or Kuiper GitHub.

More practice

The current rule does not filter any data sent to Kuiper, so how do you filter data? Drop the rule, change the SQL statement, and redeploy the rule when you’re done. Then subscribe to the result topic on the MQTT server and check whether the new rule takes effect.
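As a starting point for this exercise, the sketch below pairs one possible filtered SQL statement with a local Python approximation of what it would keep. Everything here is an assumption for illustration: the choice of the `int8` field and the `> 0` condition are arbitrary, `make_rule` and `simulate_filter` are hypothetical helpers, and the simulation does not claim to reproduce how Kuiper handles events that lack the field.

```python
import json

# Hypothetical filter for the exercise: keep only events carrying an "int8" value above 0.
filtered_sql = "SELECT int8 FROM demo WHERE int8 > 0"

def make_rule(sql):
    """Build a rule body with the same sinks as rule1 but a different SQL statement."""
    return {
        "sql": sql,
        "actions": [
            {"mqtt": {"server": "tcp://broker.emqx.io:1883", "topic": "result", "clientId": "demo_001"}},
            {"log": {}},
        ],
    }

def simulate_filter(events):
    """Local approximation of the WHERE clause, for building intuition only."""
    kept = []
    for event in events:
        value = event.get("int8")
        if isinstance(value, int) and value > 0:
            kept.append({"int8": value})
    return kept

sample_events = [{"int8": -47}, {"bool": True}, {"int8": 12}]
kept_events = simulate_filter(sample_events)
print(json.dumps(make_rule(filtered_sql), indent=2))
print(kept_events)
```

After redeploying a rule like this, the subscriber on the result topic should show fewer messages than before, which is a quick way to confirm the filter is active.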

Further reading

  • Read the EdgeX source documentation for more details, such as type conversion.
  • How do I use meta functions to extract more of the information sent on the EdgeX message bus? When a device service sends data to the bus, some additional information is sent along with it, such as the creation time and the ID. If you want to use this information in SQL statements, please refer to this article.
  • EdgeX message bus sink. This document describes how to use the EdgeX message bus sink. It may be of interest to you if you want to send analysis results back to the message bus.

For more information on EMQ X Kuiper, please refer to the following resources.

  • Kuiper GitHub repository
  • Kuiper Reference Guide

Copyright: EMQ

Original link: www.emqx.io/cn/blog/kui…