This article was translated by volunteers of the Apache Pulsar Chinese community (StreamNative organization). Original: Building Edge Applications With Apache Pulsar, by Tim Spann, StreamNative evangelist. Translators: YOLO, BOMC Team, BSC BOMC ORP. Original link: streamnative.io/blog/engine…

In recent years, the explosive growth of remotely connected devices has posed a challenge to the centralized computing paradigm. Network and infrastructure constraints make it increasingly difficult for companies to move and process data generated by all devices in data centers or the cloud without delays or performance issues. As a result, edge applications are on the rise. Gartner estimates that by 2025, enterprises will create and process 75 percent of their data outside the data center or cloud.

So what are edge applications? Edge applications run on or near data sources, such as IoT devices and local edge servers. Edge computing enables computing, storage, caching, management, alerting, machine learning, and routing to take place outside the data center and the cloud. Industries such as retail, agriculture, manufacturing, transportation, healthcare, and telecommunications use edge applications to achieve lower latency, better bandwidth utilization, lower infrastructure costs, and faster decision-making.

This article describes some of the challenges of developing edge applications, as well as Apache Pulsar’s solutions for edge applications. This article also shares a step-by-step example of how to build edge applications using Pulsar.

Key challenges

The decentralized nature of edge computing brings many benefits as well as challenges, including:

  • Edge applications typically need to support a variety of devices, protocols, languages, and data formats.
  • Communication from edge applications needs to be asynchronous, because events flow from sensors, logs, and applications at fast but uneven rates.
  • Producers of edge data need different messaging-cluster deployments depending on design requirements.
  • By design, edge applications are geographically dispersed and diverse.

The solution

Solving the problems of building edge applications calls for an open source solution that is adaptable, hybrid, capable of geo-replication, and extensible. Open source projects with large user bases can provide broad community support and the rich ecosystem of adapters, connectors, and extensions that edge applications need. Based on my experience working with different technologies and open source projects over the past two decades, I believe Apache Pulsar meets the needs of edge applications.

Apache Pulsar is an open source, cloud-native, distributed messaging and streaming platform. Since Pulsar became an Apache Software Foundation top-level project in 2018, its community engagement, ecosystem growth, and global adoption have skyrocketed. Pulsar is able to solve many of the challenges of edge computing due to the following factors:

  • Apache Pulsar supports fast messaging, message metadata, and multiple data formats via schemas.
  • Pulsar offers multi-language clients for Go, C++, Java, Node.js, WebSocket, and Python. In addition, community developers provide open source clients for Haskell, Scala, Rust, and .NET, as well as Apache Flink and Apache Spark stream-processing libraries.
  • Pulsar supports several messaging protocols, including MQTT, Kafka, AMQP, and JMS.
  • Pulsar’s cross-geographical replication capabilities address the location of distributed devices.
  • Pulsar’s cloud-native architecture allows it to run in multi-cloud, on-premises, and Kubernetes environments. It can run on small edge gateways as well as on more powerful devices like the NVIDIA Jetson Xavier NX.

In this example, we built the edge application on an NVIDIA Jetson Xavier NX, which gives us enough power to run a standalone Apache Pulsar broker, multiple web cameras, and a deep-learning edge application on the device. My edge device includes 384 NVIDIA CUDA® cores and 48 Tensor cores, a 6-core 64-bit ARM CPU, and 8 GB of 128-bit LPDDR4x RAM. In a future blog post, I’ll show that Pulsar can still handle fast edge event streams even on simpler devices like the Raspberry Pi 4 and NVIDIA Jetson Nano.

Architecture

Given the physical structure of the solution, the question now is how to logically structure the application with incoming data. For those unfamiliar with Pulsar, the first thing you need to know is that each topic belongs to a tenant and namespace, as shown in the figure below.

These logical structures allow us to group data according to various criteria, such as the original source of the data and different businesses. Once we have decided on tenants, namespaces, and topics, we need to identify the fields needed to collect additional data for analysis.
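As a small sketch of this layering, a helper like the following (the tenant and namespace names here are illustrative, not from the original article) can assemble the fully qualified topic name that Pulsar expects:

```python
def topic_name(tenant: str, namespace: str, topic: str, persistent: bool = True) -> str:
    """Build a fully qualified Pulsar topic name:
    {persistent|non-persistent}://tenant/namespace/topic"""
    kind = "persistent" if persistent else "non-persistent"
    return f"{kind}://{tenant}/{namespace}/{topic}"

# e.g. grouping camera events under a hypothetical "iot" tenant
print(topic_name("iot", "cameras", "jetson-iot"))
# persistent://iot/cameras/jetson-iot
```

Choosing tenant and namespace up front makes it easy to apply per-business quotas, permissions, and retention policies later without renaming topics.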

Next, we need to determine the format of the data. Depending on the architecture, it can be the same as the original format, or it can be transformed according to the specific requirements of transmission, processing, or storage. In addition, in many cases, our devices, facilities, sensors, operating systems, or transmission modes will require us to choose a particular data format.

In this article, we’ll use the JSON data format, which meets the readability needs of almost any language and most people. Apache Avro, a binary format, is also a good choice, but this blog series will stick with the simplest format.

Once the data format is selected, we may need to enrich the raw data with fields beyond those coming from sensors, machine-learning classifications, logs, or other sources. I like to add IP address, MAC address, hostname, creation timestamp, execution time, and fields about device health, such as disk space, memory, and CPU. If you don’t think these are necessary, or if your device already broadcasts its health, add or remove fields as appropriate. These fields are especially helpful for debugging when you have thousands of devices, so I’m in the habit of adding this data when bandwidth allows.

We also need a primary key or unique identifier for each event record, and IoT data often doesn’t come with one. We can generate one with a UUID generator when creating the record.
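A minimal sketch of this enrichment step, using only the Python standard library (the field names mirror the JSON event shown later in the article; richer health metrics like disk and CPU usage are left out here):

```python
import json
import socket
import uuid
from datetime import datetime, timezone

def build_event(payload: dict) -> dict:
    """Wrap a raw sensor/classification payload with a UUID key
    and basic device metadata for debugging at scale."""
    event = dict(payload)
    event["uuid"] = str(uuid.uuid4())          # unique key for the record
    event["host_name"] = socket.gethostname()  # which device produced it
    event["systemtime"] = datetime.now(timezone.utc).isoformat()  # creation timestamp
    return event

event = build_event({"camera": "/dev/video0", "top1pct": 47.265625})
print(json.dumps(event))
```

The UUID doubles as the Pulsar message key, which is useful for key-shared subscriptions and for deduplication downstream.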

Once we have a list of fields, we need to define a schema for the data: each field’s name, type, default value, and nullability. With a schema defined, we can use a JSON schema or build a class from the fields, and then query the data in the topic with Pulsar SQL. IoT applications typically also use a time-series data store as the master store for such events; I recommend Aerospike, InfluxDB, or ScyllaDB. We can feed it with a Pulsar IO sink connector or another mechanism, depending on the scenario and requirements. If necessary, we can also use the Spark, Flink, or NiFi connectors.
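As an illustration of pinning down names, types, and defaults, here is a plain-Python sketch of such a record class (a stand-in for the `IoTMessage` class the Java producer serializes with a JSON schema; only a subset of fields is shown):

```python
from dataclasses import dataclass, asdict

@dataclass
class IoTMessage:
    """Schema sketch for one camera/classification event.
    Field names follow the article's JSON event example."""
    uuid: str = ""        # unique record key
    camera: str = ""      # source device, e.g. /dev/video0
    ipaddress: str = ""
    top1: str = ""        # top classification label
    top1pct: float = 0.0  # confidence of the top classification
    cputemp: str = ""
    gputemp: str = ""
    host: str = ""

msg = IoTMessage(uuid="xav_uuid_video0", camera="/dev/video0",
                 top1="Spot", top1pct=47.265625)
print(asdict(msg))
```

Declaring defaults up front means a record is still well-formed when a sensor fails to report a field, which keeps downstream SQL queries from breaking.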

Our final event will look like the JSON example shown below.

{
  "uuid": "xav_uuid_video0_lmj_20211027011044",
  "camera": "/dev/video0",
  "ipaddress": "192.168.1.70",
  "networktime": 4.284832000732422,
  "top1pct": 47.265625,
  "top1": "Spot",
  "cputemp": "29.0",
  "gputemp": "28.5",
  "gputempf": "83",
  "cputempf": "84",
  "runtime": "4",
  "host": "nvidia-desktop",
  "filename": "/home/nvidia/nvme/images/out_video0_tje_20211027011044.jpg",
  "imageinput": "/home/nvidia/nvme/images/img_video0_eqi_20211027011044.jpg",
  "host_name": "nvidia-desktop",
  "macaddress": "70:66:55:15:b4:a5",
  "te": "4.1648781299591064",
  "systemtime": "10/26/2021 21:10:48",
  "cpu": 11.7,
  "diskusage": "32367.5 MB",
  "memory": 82.1
}

Edge producers

Next, we tested some libraries, languages, and clients on the NVIDIA Jetson Xavier NX to see which fit our scenario best. After prototyping several libraries on the ARM build of the Jetson Xavier NX running Ubuntu, I found the following options that could produce the messages my application needs. These are not the only paths, but they are good options for this edge platform.

  • Go Lang Pulsar Producer
  • Python 3.x Websocket Producer
  • Python 3.x MQTT Producer
  • Java 8 Pulsar Producer

Go language Pulsar producer

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/streamnative/pulsar-examples/cloud/go/ccloud"
    "github.com/hpcloud/tail"
)

func main() {
    client := ccloud.CreateClient()
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "jetson-iot",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    t, err := tail.TailFile("demo1.log", tail.Config{Follow: true})
    for line := range t.Lines {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(line.Text),
        }); err != nil {
            log.Fatal(err)
        } else {
            fmt.Printf("jetson:Published message: %v-%s \n", msgId, line.Text)
        }
    }
}

Python3 Websocket producer

import requests, uuid, websocket, base64, json

uuid2 = uuid.uuid4()
row = {}
row['host'] = 'nvidia-desktop'
ws = websocket.create_connection( 'ws://server:8080/ws/v2/producer/persistent/public/default/energy')
message = str(json.dumps(row) )
message_bytes = message.encode('ascii')
base64_bytes = base64.b64encode(message_bytes)
base64_message = base64_bytes.decode('ascii')
ws.send(json.dumps({ 'payload' : base64_message, 'properties': { 'device' : 'jetson2gb', 'protocol' : 'websockets' },'key': str(uuid2), 'context' : 5 }))
response =  json.loads(ws.recv())
if response['result'] == 'ok':
            print ('Message published successfully')
else:
            print ('Failed to publish message:', response)
ws.close()

Java Pulsar producer with Schema

public static void main(String[] args) throws Exception {
    JCommanderPulsar jct = new JCommanderPulsar();
    JCommander jCommander = new JCommander(jct, args);
    if (jct.help) {
        jCommander.usage();
        return;
    }
    PulsarClient client = null;
    if (jct.issuerUrl != null && jct.issuerUrl.trim().length() > 0) {
        try {
            client = PulsarClient.builder()
                .serviceUrl(jct.serviceUrl.toString())
                .authentication(AuthenticationFactoryOAuth2.clientCredentials(
                    new URL(jct.issuerUrl.toString()),
                    new URL(jct.credentialsUrl.toString()),
                    jct.audience.toString()))
                .build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
    } else {
        try {
            client = PulsarClient.builder().serviceUrl(jct.serviceUrl.toString()).build();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
    UUID uuidKey = UUID.randomUUID();
    String pulsarKey = uuidKey.toString();
    String OS = System.getProperty("os.name").toLowerCase();
    IoTMessage iotMessage = parseMessage("" + jct.message);
    String topic = DEFAULT_TOPIC;
    if (jct.topic != null && jct.topic.trim().length() > 0) {
        topic = jct.topic.trim();
    }
    ProducerBuilder<IoTMessage> producerBuilder = client.newProducer(JSONSchema.of(IoTMessage.class))
        .topic(topic)
        .producerName("jetson")
        .sendTimeout(5, TimeUnit.SECONDS);
    Producer<IoTMessage> producer = producerBuilder.create();
    MessageId msgID = producer.newMessage()
        .key(iotMessage.getUuid())
        .value(iotMessage)
        .property("device", OS)
        .property("uuid2", pulsarKey)
        .send();
    producer.close();
    client.close();
    producer = null;
    client = null;
}

private static IoTMessage parseMessage(String message) {
    IoTMessage iotMessage = null;
    try {
        if (message != null && message.trim().length() > 0) {
            ObjectMapper mapper = new ObjectMapper();
            iotMessage = mapper.readValue(message, IoTMessage.class);
            mapper = null;
        }
    } catch (Throwable t) {
        t.printStackTrace();
    }
    if (iotMessage == null) {
        iotMessage = new IoTMessage();
    }
    return iotMessage;
}

Run the producer:

java -jar target/iotproducer-1.0-jar-with-dependencies.jar --serviceUrl pulsar://nvidia-desktop:6650 --topic 'iotjetsonjson' --message "... JSON..."

You can find all the source code here.

Now we need to decide how to execute the application on the device. You can use a scheduler that comes with the system, such as cron, or an add-on tool. For reference, I often use cron, MiNiFi agents, shell scripts, or run the application continuously as a service. Configure whatever scheduling best fits your devices and sensors.
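For the cron route, a hypothetical crontab entry (the script path and log location are illustrative, not from the original article) that runs a producer script once a minute and appends its output to a log might look like:

```shell
# m  h  dom mon dow  command
*  *  *   *   *      /opt/demo/run_producer.sh >> /var/log/jetson-producer.log 2>&1
```

Running the producer as a long-lived service instead of via cron avoids reconnecting to the broker on every invocation, which matters when events arrive faster than the schedule interval.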

Validate data and monitor it

Now that we have a steady stream of events coming into our Pulsar cluster, we can validate the data and monitor progress. Take the StreamNative Cloud Manager interface as an example, shown below. We can also view the Pulsar metrics endpoint documented here.

Check statistics with REST

  • http://:8080/admin/v2/persistent/public/default/mqtt-2/stats
  • http://:8080/admin/v2/persistent/public/default/mqtt-2/internalStats
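As a sketch of calling these admin endpoints programmatically (the host and port `8080` are assumptions matching the URLs above; the path shape follows Pulsar's admin v2 REST API):

```python
import json
import urllib.request

def stats_url(host: str, topic: str, tenant: str = "public",
              namespace: str = "default", internal: bool = False) -> str:
    """Build the admin v2 stats URL for a persistent topic."""
    suffix = "internalStats" if internal else "stats"
    return f"http://{host}:8080/admin/v2/persistent/{tenant}/{namespace}/{topic}/{suffix}"

def fetch_stats(host: str, topic: str) -> dict:
    """Fetch and decode topic stats (requires a reachable broker)."""
    with urllib.request.urlopen(stats_url(host, topic)) as resp:
        return json.loads(resp.read())

print(stats_url("nvidia-desktop", "mqtt-2"))
# http://nvidia-desktop:8080/admin/v2/persistent/public/default/mqtt-2/stats
```

Polling these endpoints is a lightweight way to confirm producers are connected and message rates are non-zero before wiring up a full metrics pipeline.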

Check statistics on the Admin CLI

bin/pulsar-admin topics stats-internal persistent://public/default/mqtt-2

Find the subscription for the topic

http://nvidia-desktop:8080/admin/v2/persistent/public/default/mqtt-2/subscriptions

Consume from a subscription through REST

http://nvidia-desktop:8080/admin/v2/persistent/public/default/mqtt-2/subscription/mqtt2/position/10

Consume messages through the CLI

bin/pulsar-client consume "persistent://public/default/mqtt-2" -s "mqtt2" -n 5

Query topics through Pulsar SQL

select * from pulsar."public/default".iotjetsonjson;

Next steps

We have now built an edge application that moves data at the speed of events and can stream data from thousands of other applications into an Apache Pulsar cluster. Next, we can add rich real-time analytics with Flink SQL, enabling advanced stream processing, unified event streams, and large-scale data processing.

Further reading

If you are interested in learning more about edge applications and building your own connectors, see the following resources:

  • Using the FLiPN Stack for Edge AI (Flink, NiFi, Pulsar)
  • PPT download address
  • Pulsar client library
  • Sample source data
  • InfluxDB Pulsar IO sink connector
