Introduction: KubeEdge is an open source edge computing platform. Building on the native container orchestration and scheduling capabilities of Kubernetes, it adds cloud-edge collaboration, offloading of computation to the edge, management of massive numbers of edge devices, edge autonomy, and other capabilities. KubeEdge will also support scenarios such as 5G MEC and AI cloud-edge collaboration in the form of plug-ins, and it has already been applied in many fields.

This article mainly shares the practical experience of implementing edge streaming data processing based on KubeEdge and Kuiper.

Kuiper: a stream processing product for the edge

Kuiper has been in development since the beginning of 2019. The first version was released in October 2019, and it has been iterated on continuously since then. Its overall architecture is a classic stream processing architecture.

Product design goal: make the kind of stream processing that runs in the cloud, such as Spark and Flink, available at the edge

Kuiper architecture diagram

The overall architecture can be divided into three parts. Sources, on the left, represents where the data comes from. The data source may be the MQTT broker at the KubeEdge edge, or it may be files or databases.

On the right is Sinks, representing where the data goes once processing is complete, that is, the target system. The target can be MQTT, the data can be saved to files or databases, or an HTTP service can be called.

The middle part consists of several layers. The top layer handles the business logic for the data: SQL statements pass through the rule parser and the SQL processors and are converted into SQL plans. The Streaming Runtime and SQL Runtime then execute the final plan. The lowest layer is storage, which stores some outgoing messages.

Kuiper usage scenarios

Stream processing: real-time stream processing at the edge

Rule engine: flexibly define rules to implement alarms and message forwarding

Data format and protocol conversion: flexible conversion between different data formats and heterogeneous protocols across edge and cloud, enabling IT/OT convergence

KubeEdge integrates with Kuiper

Partial Architecture diagram

Kuiper is deployed behind the KubeEdge MQTT broker and runs entirely at the edge. Beneath it are the various Mappers, which provide access to devices that speak different protocols. Messages are exchanged through the edge MQTT broker.

Data type handling:

Get the type definitions from the device model file

Convert the data to Kuiper's data types

When creating a stream, you can use a schema-less stream definition

The supported data types are int, string, bool, and float
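The type handling described above can be pictured with a minimal Python sketch. It is not Kuiper's conversion code: the simplified model layout, the field names `properties`, `name`, and `dataType`, and the helper functions are all hypothetical, chosen only to show how declared types might drive the conversion of raw values.

```python
# Hypothetical sketch: map device-model property types to typed stream values.
# The model layout and helper names are illustrative assumptions, not the
# exact KubeEdge model schema or Kuiper's internal conversion code.

# Converters for the four data types the article lists as supported.
CONVERTERS = {
    "int": int,
    "string": str,
    "float": float,
    "bool": lambda v: v if isinstance(v, bool) else str(v).strip().lower() == "true",
}


def property_types(model):
    """Return {property name: data type} from a simplified device model dict."""
    return {p["name"]: p["dataType"] for p in model["properties"]}


def convert_reading(model, reading):
    """Convert raw (often string) values to the types declared in the model.

    Properties that the model does not declare are passed through unchanged,
    which mirrors the idea of a schema-less stream definition.
    """
    types = property_types(model)
    typed = {}
    for name, raw in reading.items():
        convert = CONVERTERS.get(types.get(name))
        typed[name] = convert(raw) if convert else raw
    return typed


model = {
    "deviceModelName": "sensor-model",  # hypothetical name
    "properties": [
        {"name": "temperature", "dataType": "float"},
        {"name": "count", "dataType": "int"},
        {"name": "enabled", "dataType": "bool"},
        {"name": "label", "dataType": "string"},
    ],
}

raw = {"temperature": "31.5", "count": "7", "enabled": "true", "label": "line-1", "extra": "x"}
typed = convert_reading(model, raw)
print(typed)
```

Values for properties the model declares come out typed, while the undeclared `extra` field is left as it arrived.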

KubeEdge model file and configuration

The following figure shows part of the configuration file, including the device name and, for each property, its name, data type, and description.

Partial configuration file

1) Save the device model file

2) Configure the model file information in etc/mqtt_source.yaml

KubeEdgeVersion: not currently in use, reserved for future versions of model files

KubeEdgeModelFile: model file path

3) Use a ConfigMap to deliver the configuration and save it to the relevant directory

How Kuiper is used

1) Define a stream: similar to defining a table in a database

DATASOURCE = "$hw/events/device/+/twin/update"

2) Define and submit rules

Use SQL to implement the business logic and send the results to the specified target. Supported SQL features:

SELECT / FROM / WHERE / ORDER

JOIN / GROUP / HAVING

4 types of time window plus 1 count window

60+ SQL functions

3) Run
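As a rough illustration of the steps above, the following Python sketch builds a schema-less stream definition and a rule document. Only the topic comes from this article; the stream name `demo`, the rule id, the `log` action, and both helper functions are assumptions made for the example. The sketch only builds the statements and does not submit them to a Kuiper instance.

```python
import json

# Topic taken from the article; everything else here is an illustrative assumption.
DATASOURCE = "$hw/events/device/+/twin/update"


def create_stream_sql(name, datasource, fields=None):
    """Build a CREATE STREAM statement.

    With no fields the definition is schema-less: the field list is left empty.
    """
    columns = ", ".join(f"{col} {typ}" for col, typ in (fields or {}).items())
    return f'CREATE STREAM {name} ({columns}) WITH (FORMAT="JSON", DATASOURCE="{datasource}")'


def rule_payload(rule_id, sql, actions):
    """Build a rule document: the SQL plus the list of sink actions."""
    return {"id": rule_id, "sql": sql, "actions": actions}


stream_sql = create_stream_sql("demo", DATASOURCE)
rule = rule_payload(
    "high_temperature",  # hypothetical rule id
    "SELECT temperature FROM demo WHERE temperature > 30",
    [{"log": {}}],  # assumed sink: write matching results to the log
)
print(stream_sql)
print(json.dumps(rule, indent=2))
```

Passing a `fields` mapping such as `{"temperature": "float"}` would produce a typed definition instead of a schema-less one.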

Deploying Kuiper rules in KubeEdge

Method 1: the Kuiper kubernetes-tool

1) This program is a standalone tool that runs in its own container and executes the command files delivered through a ConfigMap. Its configuration file specifies:

The address and port of the Kuiper service

The directory where the command files reside

2) Deliver the command files through a ConfigMap. The tool periodically scans the directory and runs the commands it finds.
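The scan-and-run behaviour can be sketched in Python as follows. This is not the tool's source code: the command file layout (a JSON object with a `commands` list), the file name, and the functions `scan_command_files` and `run_once` are hypothetical. The `execute` callback stands in for whatever sends the command to the Kuiper service.

```python
import json
import os
import tempfile


def scan_command_files(directory, seen):
    """Return commands from files that are new or changed since the last scan.

    `seen` maps file path -> last modification time and is updated in place.
    """
    commands = []
    for entry in sorted(os.listdir(directory)):
        path = os.path.join(directory, entry)
        if not os.path.isfile(path):
            continue
        mtime = os.path.getmtime(path)
        if seen.get(path) == mtime:
            continue  # unchanged since the previous scan
        seen[path] = mtime
        with open(path, encoding="utf-8") as handle:
            commands.extend(json.load(handle).get("commands", []))
    return commands


def run_once(directory, seen, execute):
    """Scan once and hand each pending command to `execute`."""
    pending = scan_command_files(directory, seen)
    for command in pending:
        execute(command)
    return len(pending)


# Demo: a temporary directory stands in for the directory the ConfigMap is mounted to.
executed = []
with tempfile.TemporaryDirectory() as command_dir:
    with open(os.path.join(command_dir, "create_stream.json"), "w", encoding="utf-8") as f:
        json.dump(
            {"commands": [{"method": "post", "url": "/streams",
                           "data": {"sql": "CREATE STREAM demo () WITH (FORMAT=\"JSON\")"}}]},
            f,
        )
    seen_files = {}
    first = run_once(command_dir, seen_files, executed.append)
    second = run_once(command_dir, seen_files, executed.append)

print(first, second)
```

A long-running version would call `run_once` in a loop with a sleep between scans. Tracking modification times means a command file is executed again only after it changes.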

Method 2: Kuiper Manager, a management console for cloud-edge collaboration

Because Kuiper can run on many nodes, another way is to manage all of those Kuiper nodes through the management console.

For example, Kuiper can run in the on-board box of a vehicle in an Internet of Vehicles deployment. Such a deployment contains many cars. All Kuiper instances can be accessed through Kuiper-Manager, and their rules can be updated uniformly.

The first step is to install plug-ins. We provide a number of plug-ins, for example for accessing different sources. If the source you need is not supported, you can write a plug-in and install it.

Next, create the stream:

The following figure shows where the data is stored, in this case the path for saving data to the file system.

Below is the visual editing interface, which can be used to write rules.

Application cases

Case 1: National Industrial Internet Big Data Center

Note: A complete case study is recommended at the end of this article

This case is a very typical usage scenario. K8s and CloudCore are deployed in the cloud, and rules are delivered to Kuiper through the management channel. Kuiper sits behind the MQTT broker, where it defines streams over the data and cleans it.

At present there are two channels. The first sends the processed messages to the cloud MQTT broker. The second stores the data locally in the time-series database InfluxDB. Third-party applications running at the edge can query the data in InfluxDB directly, for example for visualization. At the bottom, Mappers connect the different data sources.

Usage scenarios for the rules engine in Kuiper

Built-in rules engine of LF Edge EdgeX Foundry, included in the Geneva release of April 2020.

Case 2: Data format conversion for integrating heterogeneous systems

To exchange data with ERP, MES, and other IT systems, we provide very flexible extension capabilities. First, heterogeneous data can be collected through extension plug-ins and then processed quickly and flexibly with SQL built-in functions or extension functions.

Second, once the data has been processed, the results can be transformed through the sink's data template to fit the data formats and protocols required by the various target systems. For example, when the temperature exceeds 30 degrees, the same rule may need to send a control command to the equipment and a notification to WeChat. The two target systems require different interfaces and data, but the rule is the same, so a single rule can trigger two different actions. You can specify different topics, and the data is sent without complicated programming.

Third, the SAP NetWeaver RFC SDK can be used to read data from SAP, process and convert it, and send it to other heterogeneous systems.
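To make the sink data template idea concrete, here is a minimal Python sketch in which one rule result is rendered into two different payloads and topics. Python's `string.Template` is used only as a stand-in and is not the sink's actual template syntax. The target names, topics, and payload fields are hypothetical.

```python
import json
from string import Template

# One rule result, as a rule such as "temperature > 30" might emit it.
result = {"device": "boiler-1", "temperature": 31.5}

# Hypothetical per-target templates; names, topics, and fields are illustrative.
TARGETS = {
    "device_command": {
        "topic": "devices/$device/command",
        "template": '{"action": "cool_down", "reason": "temperature=$temperature"}',
    },
    "chat_notification": {
        "topic": "alerts/chat",
        "template": '{"text": "Device $device reported $temperature degrees"}',
    },
}


def render(result, target):
    """Render the topic and payload for one target from the same rule result."""
    topic = Template(target["topic"]).substitute(result)
    payload = Template(target["template"]).substitute(result)
    return topic, json.loads(payload)


messages = {name: render(result, target) for name, target in TARGETS.items()}
for name, (topic, payload) in messages.items():
    print(name, topic, payload)
```

The rule logic stays in one place, and only the templates differ per target. That is why adding another target system should not require changing the rule itself.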

Performance data

1) Kuiper supports running thousands of rules concurrently

8000 rules at 0.1 message/second/rule, for a total of 800 messages/second

Rule definition: source: MQTT; SQL: select temperature from source where temperature > 20; sink: Log

Configuration: AWS, 2 cores, 4 GB memory, Ubuntu

Resource usage: memory 89%-72%, about 0.4 MB per rule; CPU 25%

2) On an AWS t2.micro instance, message throughput reaches 10K+ messages per second
