Abstract: This article shares practical experience with edge stream data processing based on KubeEdge and Kuiper.

Introduction: KubeEdge is an open source edge computing platform. It builds on the native container orchestration and scheduling capabilities of Kubernetes and adds cloud-edge collaboration, pushing computation down to the edge, management of massive numbers of edge devices, edge autonomy and other capabilities. KubeEdge will also support scenarios such as 5G MEC and cloud-edge collaborative AI through plug-ins, and it has already been applied in many fields.

Kuiper: a stream processing product for the edge

Kuiper started in early 2019, released its first version in October 2019, and has been iterating ever since. Its overall architecture is a fairly classic streaming architecture.

Product design goal: bring the kind of stream processing that Spark and Flink provide in the cloud to the edge.

Kuiper architecture diagram

The overall architecture can be divided into three parts. Sources, on the left, are where data comes from. A data source may be the edge MQTT broker in KubeEdge, files, or databases. Sinks, on the right, are where data goes after processing is complete, that is, the target systems. A target can be MQTT, files, databases, or an HTTP service to be called.

The middle part is divided into several layers. The top layer handles the business logic: SQL statements are parsed by the Rule Parser and SQL Processors and converted into SQL plans. The Streaming Runtime and SQL Runtime then execute the final plan. The bottom layer is storage, which is used to store some outgoing messages.
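To make the three-part flow concrete, here is a minimal Python sketch of a source, rule and sinks pipeline. It is a toy model under stated assumptions, not Kuiper's runtime: the in-memory source, the `where`/`select` callables standing in for a compiled SQL plan, and the list-backed sinks are all hypothetical stand-ins for the MQTT source, the SQL layer and the target systems.

```python
from typing import Any, Callable, Dict, Iterable, List

Message = Dict[str, Any]


def run_pipeline(
    source: Iterable[Message],
    where: Callable[[Message], bool],
    select: Callable[[Message], Message],
    sinks: List[Callable[[Message], None]],
) -> int:
    """Filter and project each source message, then fan out every result
    to all sinks. Returns the number of results emitted."""
    emitted = 0
    for msg in source:
        if not where(msg):        # plays the role of the WHERE clause
            continue
        result = select(msg)      # plays the role of the SELECT projection
        for sink in sinks:        # each sink stands for one target system
            sink(result)
        emitted += 1
    return emitted


# Hypothetical stand-ins for an MQTT source and two targets (e.g. MQTT and a file).
readings = [
    {"temperature": 18, "humidity": 40},
    {"temperature": 25, "humidity": 55},
    {"temperature": 31, "humidity": 60},
]
to_mqtt: List[Message] = []
to_file: List[Message] = []

count = run_pipeline(
    readings,
    where=lambda m: m["temperature"] > 20,
    select=lambda m: {"temperature": m["temperature"]},
    sinks=[to_mqtt.append, to_file.append],
)
print(count, to_mqtt)  # 2 [{'temperature': 25}, {'temperature': 31}]
```

Because every sink receives the same result, adding a new target system only means appending another sink, without touching the filtering logic.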

Kuiper usage scenarios

Stream processing: real-time stream processing at the edge

Rule engine: flexibly define rules to implement alarms and message forwarding

Data format and protocol conversion: flexible conversion between different data formats and heterogeneous protocols at the edge and in the cloud, enabling IT/OT fusion

KubeEdge integration with Kuiper

Partial Architecture diagram

Kuiper is deployed behind the KubeEdge edge MQTT broker and runs on the edge side as a whole. Underneath are the different Mappers, which provide access to a variety of device protocols. The edge MQTT broker is used to exchange messages.

Data type handling:

Get the type definitions from the device model file

Convert the data to Kuiper's data types

When creating a stream, you can use a schema-less stream definition

The supported data types are int, string, bool, and float
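The conversion step matters because the device model is the only place the types are declared, while the values arrive as text. The sketch below assumes raw values reach the converter as strings and uses a hypothetical `convert_value` helper that covers only the four types listed above; it illustrates the idea and is not Kuiper's actual conversion code.

```python
from typing import Union

Value = Union[int, float, bool, str]


def convert_value(raw: str, data_type: str) -> Value:
    """Convert a raw string value to the type declared in the device model.

    Hypothetical helper: supports only int, float, bool and string.
    """
    if data_type == "int":
        return int(raw)
    if data_type == "float":
        return float(raw)
    if data_type == "bool":
        lowered = raw.strip().lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"not a boolean: {raw!r}")
    if data_type == "string":
        return raw
    raise ValueError(f"unsupported data type: {data_type!r}")


print(convert_value("23", "int"), convert_value("21.5", "float"),
      convert_value("true", "bool"), convert_value("on", "string"))
# 23 21.5 True on
```

Rejecting unknown types loudly, rather than passing the raw string through, makes a mismatch between the model file and the incoming data visible early.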

KubeEdge model file and configuration

The following figure shows part of the configuration file, including the device name and its properties, each with a name, data type, and description.

Partial configuration file
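Since the figure only shows part of the file, the sketch below reconstructs a plausible layout from the fields named above and turns it into a lookup table of declared types per device. The JSON keys (`deviceModels`, `properties`, `dataType`) and the device name are assumptions made for illustration, not the confirmed KubeEdge or Kuiper schema.

```python
import json
from typing import Dict

# Hypothetical model file layout, based only on the fields listed above
# (device name, properties with name, data type and description).
MODEL_JSON = """
{
  "deviceModels": [
    {
      "name": "temperature-sensor",
      "properties": [
        {"name": "temperature", "dataType": "int", "description": "degrees Celsius"},
        {"name": "switch", "dataType": "bool", "description": "power state"}
      ]
    }
  ]
}
"""


def load_type_map(text: str) -> Dict[str, Dict[str, str]]:
    """Return {device model name: {property name: declared data type}}."""
    doc = json.loads(text)
    type_map: Dict[str, Dict[str, str]] = {}
    for model in doc.get("deviceModels", []):
        props = {p["name"]: p["dataType"] for p in model.get("properties", [])}
        type_map[model["name"]] = props
    return type_map


types = load_type_map(MODEL_JSON)
print(types["temperature-sensor"]["temperature"])  # int
```

Building the map once at startup means each incoming message only needs a dictionary lookup to find the type of every property.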

Save the device model file

Configure the model file information in etc/mqtt_source.yaml:

1) KubeEdgeVersion: not currently used; reserved for model files of different future versions

2) KubeEdgeModelFile: model file path

Deliver the configuration through a ConfigMap and save it to the corresponding directory

Kuiper usage workflow

1) Define a stream: similar to defining a table in a relational database. For example, use the KubeEdge device twin update topic as the data source:

DATASOURCE="$hw/events/device/+/twin/update"

2) Define and submit rules

Implement the business logic with SQL and send the results to the specified target

Supported SQL:

SELECT/FROM/WHERE/ORDER

JOIN/GROUP/HAVING

4 types of time windows + 1 count window

60+ SQL functions

3) Run
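The stream and rule steps in this workflow can be sketched as REST requests. The sketch assumes Kuiper's REST API is reachable on its default port 9081 and that streams and rules are created by POSTing JSON to `/streams` and `/rules`; the stream name `kubeedge_demo`, the rule id `high_temperature` and the `temperature` field are hypothetical. The requests are only built here, not sent.

```python
import json
import urllib.request

# Assumption: Kuiper's REST API on its default port 9081; adjust for your deployment.
KUIPER_URL = "http://127.0.0.1:9081"


def kuiper_request(path: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request to the Kuiper REST API."""
    return urllib.request.Request(
        KUIPER_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Step 1: a schema-less stream "()" over the KubeEdge device twin update topic.
stream_req = kuiper_request("/streams", {
    "sql": 'CREATE STREAM kubeedge_demo () WITH '
           '(FORMAT="JSON", DATASOURCE="$hw/events/device/+/twin/update")'
})

# Step 2: a rule whose SQL holds the business logic and whose "actions"
# name the target; the "log" action just writes results to the Kuiper log.
rule_req = kuiper_request("/rules", {
    "id": "high_temperature",
    "sql": "SELECT temperature FROM kubeedge_demo WHERE temperature > 20",
    "actions": [{"log": {}}],
})

# To actually submit, pass each request to urllib.request.urlopen(...).
print(stream_req.get_method(), stream_req.full_url)
# POST http://127.0.0.1:9081/streams
```

Keeping request construction separate from sending makes the payloads easy to inspect, or to write into command files instead of calling the API directly.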
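Among the window types listed above, the tumbling window is the simplest. It cuts time into fixed, non-overlapping intervals and aggregates each one separately. In Kuiper this is written in SQL, for example with `GROUP BY TUMBLINGWINDOW(ss, 10)`. The plain-Python sketch below only illustrates the semantics of a 10-second tumbling average and is not how Kuiper implements windows.

```python
from typing import Dict, List, Tuple


def tumbling_avg(events: List[Tuple[float, float]], size: float) -> Dict[float, float]:
    """Average values per fixed, non-overlapping window of `size` seconds.

    events: (timestamp_seconds, value) pairs, in any order.
    Returns {window_start: average}, ordered by window start.
    """
    buckets: Dict[float, List[float]] = {}
    for ts, value in events:
        start = (ts // size) * size      # event falls in [start, start + size)
        buckets.setdefault(start, []).append(value)
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


events = [(0.5, 20.0), (3.0, 22.0), (9.9, 24.0), (10.0, 30.0), (12.5, 32.0)]
print(tumbling_avg(events, 10))
# {0.0: 22.0, 10.0: 31.0}
```

Note that the event at exactly 10.0 s opens the second window. Each event belongs to exactly one tumbling window, which is what distinguishes it from hopping and sliding windows.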

Deploying Kuiper rules in KubeEdge

1) Use kuiper-kubernetes-tool

2) This tool runs separately in its own container and executes the command files delivered through a ConfigMap. Its configuration file specifies:

The address and port of the Kuiper service

The directory where the command files reside

3) Deliver the command files through a ConfigMap. The tool periodically scans them and executes the commands.
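The scanning behaviour described in these steps can be sketched as a small polling loop. The `CommandDirScanner` class and the command-file layout `{"commands": [{"url": ..., "method": ..., "data": ...}]}` are hypothetical, chosen for illustration rather than taken from kuiper-kubernetes-tool itself. Executing a command is delegated to a callback, which in practice would issue the corresponding request to the Kuiper service.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List

Command = Dict[str, object]


class CommandDirScanner:
    """Poll a directory and execute each new or modified command file once.

    Hypothetical sketch; the JSON layout of command files is an assumption.
    """

    def __init__(self, command_dir: str, execute: Callable[[Command], None]) -> None:
        self.command_dir = command_dir
        self.execute = execute
        self.seen: Dict[str, float] = {}  # file path -> mtime already processed

    def scan_once(self) -> List[str]:
        """Run commands from files that are new or changed since the last scan."""
        processed = []
        for name in sorted(os.listdir(self.command_dir)):
            path = os.path.join(self.command_dir, name)
            if not name.endswith(".json") or not os.path.isfile(path):
                continue
            mtime = os.path.getmtime(path)
            if self.seen.get(path) == mtime:
                continue  # this version of the file was already executed
            with open(path, encoding="utf-8") as f:
                for cmd in json.load(f).get("commands", []):
                    self.execute(cmd)
            self.seen[path] = mtime
            processed.append(name)
        return processed


# Usage: simulate a delivered command file, then scan twice.
executed: List[Command] = []
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "rules.json"), "w", encoding="utf-8") as f:
        json.dump({"commands": [{"url": "/rules", "method": "post",
                                 "data": {"id": "r1"}}]}, f)
    scanner = CommandDirScanner(d, executed.append)
    first = scanner.scan_once()   # executes the new file
    second = scanner.scan_once()  # file unchanged, nothing to do
print(first, second, len(executed))  # ['rules.json'] [] 1
```

A long-running tool would call `scan_once()` in a loop with a sleep between iterations. Tracking modification times keeps an updated ConfigMap from being ignored while avoiding re-running unchanged files.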

Kuiper Manager: cloud-edge collaborative management console

Since Kuiper can run on many nodes, another option is to manage all of them through the management console.

For example, Kuiper can run in the in-vehicle box of an Internet of Vehicles deployment. With many vehicles on the road, all instances can be accessed through Kuiper-Manager and their rules can be updated uniformly.

The first step is to install plug-ins. We provide a number of plug-ins, for example for accessing different sources. If a source is not supported out of the box, you can write your own plug-in and install it.

Next, create a stream.

The figure shows where the data is stored; in this example, the data is saved to a path on the file system.

Below is the visual editing interface, which can be used to write rules.

Application case: National Industrial Internet Big Data Center

This case is a very typical usage scenario. Kubernetes and CloudCore are deployed in the cloud, and rules are delivered to Kuiper through the management channel. Kuiper sits behind the edge MQTT broker, where it defines streams and cleans the data. There are currently two output channels: the first sends the processed messages to the cloud MQTT broker, and the second stores the data locally in the time-series database InfluxDB. Third-party applications running on the edge can query InfluxDB directly, for example to build visualizations. At the bottom, Mappers connect the different data sources.

Usage scenarios for the rules engine in Kuiper

Kuiper is the built-in rules engine of LF Edge EdgeX Foundry, shipped with the Geneva release in April 2020.

Application case: data format conversion for integrating heterogeneous systems

To exchange data with ERP, MES and other IT systems, Kuiper provides flexible extension capabilities. First, heterogeneous data can be collected through extension plug-ins and processed quickly and flexibly with built-in SQL functions or extension functions. Second, once processing is done, the results can be transformed through a sink's data template to fit the data format and protocol required by each target system. For example, a single rule that fires when the temperature exceeds 30 degrees can both send a command to control the equipment and send a notification to WeChat. The two target systems require different interfaces and data, but the rule is the same: one rule triggers two different actions, for example publishing to different topics, and no complicated programming is needed. Third, the SAP NetWeaver RFC SDK can be used to read data from SAP, process and convert it, and send it on to other heterogeneous systems.
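The "one rule, several target formats" idea can be sketched as applying a different template per sink to the same rule result. Kuiper's sink data templates use Go template syntax. Here Python's `string.Template` is used as a stand-in, and the sink names, template contents and field names are all hypothetical.

```python
import json
from string import Template
from typing import Dict

# Hypothetical per-target templates: the same rule result is reshaped
# differently for a device-control target and a WeChat notification target.
SINK_TEMPLATES: Dict[str, Template] = {
    "device_control": Template('{"device": "$device", "command": "cooling_on"}'),
    "wechat": Template('{"channel": "wechat", "message": "$device temperature is $temperature C"}'),
}


def render_for_sinks(result: Dict[str, object]) -> Dict[str, dict]:
    """Apply every sink's template to one rule result and parse the JSON output.

    Sketch only: values are inserted as-is, so they must not contain quotes.
    """
    return {name: json.loads(tpl.substitute(result)) for name, tpl in SINK_TEMPLATES.items()}


# One result from a rule such as: SELECT device, temperature FROM demo WHERE temperature > 30
outputs = render_for_sinks({"device": "boiler-1", "temperature": 32})
print(outputs["wechat"]["message"])  # boiler-1 temperature is 32 C
```

The rule's SQL stays identical for both targets, and only the per-sink template changes, which is what removes the need for custom glue code per target system.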

Performance data

Kuiper supports running thousands of rules concurrently

8000 rules × 0.1 messages/second/rule, for a total of 800 messages/second

Rule definition

Source: MQTT

SQL: select temperature from source where temperature > 20

Target (sink): log

Configuration

AWS: 2 cores, 4 GB memory

Ubuntu

Resource usage

Memory: 89%~72%; 0.4 MB/rule

CPU: 25%

AWS t2.micro configuration: 10K+ messages/second throughput
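The headline figures of the 8000-rule test follow directly from the per-rule numbers above. The memory total counts only the stated per-rule cost and ignores the base footprint of the process:

```latex
\begin{aligned}
\text{throughput} &= 8000~\text{rules} \times 0.1~\tfrac{\text{msg}}{\text{s}\cdot\text{rule}} = 800~\tfrac{\text{msg}}{\text{s}} \\
\text{rule memory} &= 8000~\text{rules} \times 0.4~\tfrac{\text{MB}}{\text{rule}} = 3200~\text{MB} \approx 3.2~\text{GB}
\end{aligned}
```

So the rules alone account for most of the 4 GB available on the test instance.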
