About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. It adopts an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers, offering strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Case introduction: This case describes how the Energy Internet Innovation Institute of Tsinghua University put Apache Pulsar into practice in the energy Internet field. Pulsar’s cloud-native architecture, Schema, Functions, and other features meet the relevant business needs and reduce the team’s development and operations burden.

It takes about 8 minutes to read this article.

Team and business profile

The energy Internet is the development direction of the electric power and energy industry. With the rapid development of information, communication, and Internet technologies, the amount of available data is growing explosively. Traditional data processing methods can no longer cope with these massive, rapidly growing information assets, so big data theory has emerged at the right moment. Big data processing technology helps us quickly discern running states and development trends from massive data, giving us unique insight into a complicated world.

The Energy Big Data and Open Ecology Research Center of the Energy Internet Innovation Institute of Tsinghua University brings together experts in energy and power big data from home and abroad, and is committed to promoting comprehensive innovation in both the basic theory and the practical application of big data. The center applies big data to the energy Internet, smart grid, smart engineering, and similar scenarios; combines it with advanced technologies such as high-performance optimization, parallel computing, and artificial intelligence; develops big data and cloud computing platforms suited to the characteristics of the power industry; and builds data-driven advanced applications for energy and power systems. The goal is to develop the big data industry, form a new industrial chain with data at its core, and promote the transformation and upgrading of China’s energy industry.

Challenges

Our team focuses on power-related IoT scenarios, aiming to meet users’ needs for data from sensors and other devices. The team is small but the tasks are complex, and we want to deliver on customers’ needs quickly and reliably.

After collating the business requirements, we came up with a message-based Backend as a Service (BaaS) solution. In the Internet of Things, such a solution lets us share more infrastructure services while responding quickly to different needs as the business develops. Given the specific business requirements, our platform needed the following features:

  • Multi-tenancy: The platform must support multi-tenancy to isolate services between customers, ensure security auditing, and meet customers’ requirements for protecting sensitive data. On top of this, it can provide basic communication, data, and business services, such as a Schema Registry for custom data structures, ACL permission management for data ownership (with APIs for creating, modifying, and deleting), and a custom function engine for various businesses.
  • Schema Registry: Provides a Schema Registry that lets users define data structures for different requirements and application scenarios.
  • General API: Provides HTTP RESTful APIs and corresponding WebSocket interfaces, covering create, update, and delete operations, to serve as the basic communication service on which other services are built.
  • ACL permission management: Customizable ACL-based access control to ensure data security.
  • Time-series database: IoT scenarios mostly deal with time-series data, so we chose the open-source, PostgreSQL-based TimescaleDB and built a series of aggregate query interfaces for time-series data.
  • User-defined functions: A custom function engine to implement various business logic.

Previously we implemented the function engine for user-defined functions with a scheme based on RabbitMQ and Celery. This worked well at first, but as the business grew, problems multiplied, and our small team had to spend more and more time troubleshooting and optimizing the overall solution. The problems were particularly acute with Celery as the task queue.

We spent a lot of time and energy dealing with two main problems:

  • Long-running tasks had to be configured carefully so that they did not block other tasks;
  • Updating a worker required interrupting the service, and updates took a relatively long time.

Also, in scenarios where individual messages are large and message processing takes a long time, memory pressure is high for both Celery and RabbitMQ.

As the number of customers and projects increased, these problems became more and more prominent, and we decided to find a new product to replace the original solution.

Why Apache Pulsar?

As mentioned above, we expect messaging middleware to provide the following features:

  • Multi-tenancy
  • Reliability and high availability
  • Support for multiple protocols, especially easy protocol conversion: In the IoT field, we deal with many different communication protocols and need to funnel data from all of them into the messaging middleware.
  • Multi-language support: Our team mainly uses Go, but we work with many teams that use other languages, so it is best if the messaging middleware supports multiple languages.
  • A lightweight computing engine for simple message processing.

While investigating different message-oriented middleware, we quickly discovered Pulsar. From Pulsar’s documentation and release notes we learned that it had many great features, so we decided to test and evaluate it. After in-depth study, we found that Pulsar’s cloud-native architecture, Schema, and Functions fit our business needs very well.

  • Cloud native: Pulsar is cloud native and has many excellent features, such as the separation of compute and storage, which takes advantage of the cloud’s elastic scalability for capacity expansion and fault tolerance. Pulsar’s good support for Kubernetes also helped us migrate part of our business to Kubernetes.
  • Pulsar Functions: Pulsar Functions is an excellent lightweight computing engine and a good replacement for the Celery scheme. Being able to build more of our business on Pulsar Functions was the main reason we chose Pulsar.
  • Tiered storage: This feature saves storage costs. Our scenarios generate a lot of raw sensor data that must be kept as cold data. With tiered storage, we can offload this cold data directly to cheaper storage services without developing additional services to store it.
  • MQTT/MoP: Pulsar’s compatibility with various protocols demonstrates the openness of the community. Before MoP was released, we developed our own tool to forward data from MQTT to Pulsar.
  • Pulsar Schema: Our platform describes data schemas in JSON. By connecting Pulsar Schema to our own Schema Registry, we can implement message serialization. The Go client’s Schema support is still in its infancy, and we plan to contribute some practice and code.
  • Multi-language support: We value multi-language support, especially for Go. Pulsar has a Go client, a Go Function runtime, and Pulsarctl, which is implemented in Go. We hope Pulsar will support even more languages, because we cannot foresee our customers’ needs, and broad language support helps us solve problems more easily.
  • Pulsar Manager & Dashboard: Pulsar exposes interfaces at every level for capturing metrics, and the surrounding tools (such as Prometheus, Grafana, and Pulsar Manager) help us reduce the cost of operations, optimization, and troubleshooting.
  • Open source: The Pulsar community is open, active, and friendly. Backed by companies like StreamNative, users can confidently choose Pulsar and migrate their business to it.

After learning more about Pulsar, we decided to test Pulsar and try to migrate an application to a production environment.

Migration test: Smart building electricity

Smart building electricity is our attempt in the field of electricity analysis and prediction: we want to collect power consumption information from every power point in the office. At the start of the renovation of the institute’s new office building, we carried out a technical evaluation and included smart sockets using the ZigBee protocol in the renovation plan. The deployment covers three floors, with about 700 smart sockets and 50 ZigBee gateways. Sockets are installed at every power consumption point in the office, including workstation sockets, wall sockets, and central air-conditioner fan sockets. All data is transmitted to Pulsar through the LAN broadcast scheme provided by the smart socket manufacturer, realizing data collection and preprocessing. Electricity consumption data is currently sent every 10 seconds, while user-related operations (including switching sockets on and off and plugging or unplugging appliances) are sent in real time. With these data we made some data visualization attempts and contributed the data to other teams at the institute for analysis, or as reference information and raw data for developing algorithms.

Based on the MQTT scheme provided by the smart socket manufacturer, we forward all MQTT data to Pulsar. The main problem we encountered in the forwarding process was the mapping between MQTT topics and Pulsar topics. Our solution is to forward all MQTT data to the same Pulsar topic, wrapping some metadata into each forwarded message, and then route the messages to different business topics through Pulsar Functions. The following figure shows how data generated by the sensors is transmitted to the platform and finally stored.

When forwarding data from MQTT to Pulsar, we forward data from all devices to the same topic by default and use a verification function (including decryption and content checks) to ensure the validity of the data. Valid data is forwarded to an intermediate topic for routing. The routing function parses the device type and message type out of the data and forwards the message to the corresponding business topic, where it is processed by the ETL function bound to that topic. The ETL functions extract different data depending on the device type: gateway status and device information for gateways, and electricity consumption data and socket status for sockets. This information matches our platform’s Schema Registry data structures; we perform Schema Mapping on the generated data (also implemented with Functions) and finally forward the structured data to a single sink topic, from which a sink function writes it to the database.
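The routing step above boils down to mapping a (device type, message type) pair to a business topic. A minimal sketch follows; the tenant/namespace layout and the route table are hypothetical, and in production the lookup runs inside a Pulsar Function rather than a standalone program.

```go
package main

import "fmt"

// routeTopic maps a (deviceType, messageType) pair to a business topic.
// The topic naming scheme below is illustrative only.
func routeTopic(deviceType, messageType string) (string, error) {
	known := map[string]map[string]bool{
		"gateway": {"status": true, "info": true},
		"socket":  {"energy": true, "status": true},
	}
	if !known[deviceType][messageType] {
		return "", fmt.Errorf("unknown route: %s/%s", deviceType, messageType)
	}
	return fmt.Sprintf("persistent://energy/smart-building/%s-%s",
		deviceType, messageType), nil
}

func main() {
	topic, err := routeTopic("socket", "energy")
	if err != nil {
		panic(err)
	}
	fmt.Println(topic)
}
```

Keeping the route table explicit makes unknown device/message combinations fail loudly instead of silently landing in the wrong business topic.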

The migration test of smart building electricity strongly confirmed that Pulsar meets our needs. During the migration, we consulted the Pulsar documentation and received strong support and help from the community, so the process was efficient and smooth. Thanks to the openness and convenience of Functions, we soon completed the development and debugging of all the Functions in the flow chart and launched the whole business system.

During the business migration, Pulsar ran well, and the team agreed that it could help us reduce our development and operations burden, so we chose Pulsar as the research center’s only messaging middleware service. Our small team then began a series of cloud-native migrations and optimizations with Pulsar.

After settling on the solution, we further applied Apache Pulsar to grid smart sensing and smart substation scenarios in the IoT and energy/power fields. The following sections detail how we use Pulsar and Pulsar Functions, and how Pulsar Functions simplify the processing of sensor data flows.

Pulsar X Grid smart sensing

The grid smart sensing scenario is mainly based on a transmission line intelligent multi-parameter sensor integration research project run jointly by the Energy Internet Innovation Institute of Tsinghua University and a power grid company. The project’s sensors come from different manufacturers and are distributed at various positions along the transmission lines, so there are more than 10 sensor types, mounted on the towers, along the line side, and elsewhere. The system currently covers about 600 kilometers of transmission lines, with sensors on more than 600 towers. The scenario is mainly responsible for online monitoring and alarming on data from the various sensors; we also perform separate transient voltage analysis for the voltage sensors.

There are two difficulties in this scenario. First, sensors from different manufacturers have no unified communication protocol: some use the power-industry IEC 104 protocol, while others use Protobuf or other custom protocols. Second, the data volume is relatively large: some sensors may produce single messages of 20 MB or more, and some upload data every second.

With Pulsar, we chose not to do any data processing on the producer side; we forward the data directly to Pulsar and perform further preprocessing and other business operations through Pulsar Functions. Take the voltage sensor as an example: it generates three types of data, namely heartbeat data, steady-state waveform data, and transient waveform data. Heartbeat and steady-state waveform data are transmitted via Protobuf, while transient data is transmitted as zip-compressed files. After receiving Protobuf data, we run a series of Pulsar Functions: a decryption function decrypts the data and deserializes the Protobuf, the data is then routed, the corresponding ETL function parses and processes it, and finally the data is stored in the database through Schema Mapping. We encapsulate each step of the process as an independent Pulsar Function, based on three considerations:

  • We want to monitor the status of each part of the data flow, capture metrics for each step, and observe key indicators such as whether a backlog is building up. Such monitoring lets us adjust the parallelism of the functions at each step.
  • It makes the whole data flow more flexible, so we can easily add and remove functions at different steps.
  • This ensures that functions we maintain can be reused to a greater extent.

This scheme also had some minor difficulties. For example, because of the large number of functions, we needed to spend more time deploying and maintaining the intermediate topics between steps. Our current solution is to write code that completes deployment and maintenance in one go; although it takes a little more effort, we think this model of function development and deployment is worth it. In addition to the two types of Protobuf data, the voltage sensor also generates transient data. Transient data is generally produced when a fault or anomaly occurs in the power grid. It is like a snapshot of the power system, recording the waveform before, during, and after the fault. In power systems, transient data usually has standard storage formats and dedicated parsing interfaces. Compared with other sensor data, it is large, often tens of megabytes. Our solution for transient data is to decompress it and then parse the data files. Here we take advantage of the multi-language support of Pulsar Functions: the blue parts of the flow chart are implemented as Go Functions and the yellow parts in Python. Python has a ready-made library for parsing power grid transient data, which saves us the time of implementing a Go version of the parsing interface ourselves.

Pulsar X Smart substation

The smart substation is our attempt in substation systems. The project is based on our partnership with an intelligent power transmission and transformation equipment manufacturer, and aims to realize substation data access based on switchgear and other substation equipment. Its main goals are real-time monitoring, fault diagnosis, and anomaly detection.

In the smart substation scenario, the equipment manufacturer usually provides the fault diagnosis algorithm or diagnostic application for its equipment, and we need to integrate algorithms and applications of very different kinds into the existing solution. A customer-supplied algorithm may be directly invocable from Pulsar Functions, may be a compiled executable, or may even be implemented in another language such as R. To handle all these cases, we first encapsulate the customer’s implementation in a Docker container that runs a minimal Pulsar Function runtime, and then communicate with the Docker endpoint through a Docker proxy function. When the function is triggered, a container for the corresponding algorithm is created to perform the computation, and the result is finally sent back to the corresponding Pulsar topic.
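One way to picture the proxy function is as a builder of the `docker run` invocation for each triggered algorithm. The sketch below only constructs the argument list; the image name, mount path, and `RESULT_TOPIC` environment variable are all hypothetical conventions, not the project's actual interface.

```go
package main

import (
	"fmt"
	"strings"
)

// dockerRunArgs builds the arguments a proxy function might pass to
// `docker run` to launch a vendor algorithm container on one input file.
// The mount path and env var name are illustrative assumptions.
func dockerRunArgs(image, inputPath, resultTopic string) []string {
	return []string{
		"run", "--rm", // remove the container when the computation ends
		"-v", inputPath + ":/data/input:ro", // expose the input read-only
		"-e", "RESULT_TOPIC=" + resultTopic, // tell the container where to reply
		image,
	}
}

func main() {
	args := dockerRunArgs("vendor/diagnosis:1.0", "/tmp/sample.bin",
		"persistent://energy/substation/diagnosis-results")
	fmt.Println("docker " + strings.Join(args, " "))
}
```

In production the proxy would execute this command (or call the Docker API) and then publish the container's output back to the result topic.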

In addition, we encountered some application-level requirements in this scenario, such as message push. We use Pulsar Functions to implement these business functions; inside a function we can easily call the interfaces of different service providers to push messages via SMS, email, and in-app notification services. Moreover, Pulsar Functions let us decouple the message push requirement from the platform and turn it into a service function that later scenarios with the same requirement can use directly.

Problems and solutions in using Pulsar

We encountered some problems while using Pulsar, and below we share some experience in solving them, hoping it helps readers who are preparing to use or already using Pulsar.

The first concerns Pulsar’s default message size. With the default configuration, Pulsar supports messages up to 5 MB, but in the smart grid case above a single message can sometimes exceed 20 MB. We modified the MaxMessageSize parameter in the broker configuration file according to the documentation, but the change did not take effect, and messages over 5 MB still could not be delivered to Pulsar. We reached out to the Pulsar community and got a quick response. The root cause is that in Pulsar 2.4.0 MaxMessageSize is not propagated to BookKeeper, so even though the broker can receive larger messages, it still cannot deliver them to the BookKeeper nodes responsible for storage. So in addition to changing MaxMessageSize, you also need to change the nettyFrameSizeBytes configuration in both the broker and BookKeeper. Once these configurations are consistent, Pulsar can handle larger single messages.
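As a rough illustration, the two settings might look like the fragment below. This is a sketch, not copied from our deployment: the 25 MB value is arbitrary, and the exact key names and defaults vary between Pulsar/BookKeeper versions (recent BookKeeper releases spell the frame-size key `nettyMaxFrameSizeBytes`), so check the configuration reference for your version.

```properties
# broker.conf — raise the broker-side limit (default is 5 MB)
maxMessageSize=26214400

# bookkeeper.conf — raise the Netty frame size to match, with headroom
# for protocol overhead, so BookKeeper accepts what the broker forwards
nettyMaxFrameSizeBytes=27262976
```

The important point from our experience is that the two limits must be kept consistent; raising only the broker-side value silently fails for oversized messages.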

The second problem is that when we use Pulsar Functions to process data, a backlog may grow on a topic. The backlog includes data not yet dispatched to the Functions (consumers) as well as data dispatched but not yet acknowledged by them. In our experience, backlogs in Functions scenarios arise because a function processes single messages slowly, takes a long time overall, or crashes. If the function processes messages slowly, one solution is to increase the function’s parallelism and then analyze and optimize the cause of the slowness. Another is to split a complex function into multiple simple ones, as in the smart grid scenario above, and link the whole process through function chaining; this lets us observe the state of each function and further optimize individual functions. If the backlog is caused by function crashes, we need to improve the function’s stability and debug it with the help of the function’s log topic.
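The idea of splitting one complex function into a chain of simple ones can be modeled as composing small stages. The sketch below is purely illustrative: in production each stage is a separate Pulsar Function connected by an intermediate topic (so each can be monitored and scaled independently), whereas here they are just composed in-process, and the decrypt/parse stages are stand-ins.

```go
package main

import (
	"fmt"
	"strings"
)

// Stage is one link in a function chain: bytes in, bytes out.
type Stage func([]byte) ([]byte, error)

// chain composes stages sequentially, stopping at the first error —
// mirroring how chained Functions pass messages topic to topic.
func chain(stages ...Stage) Stage {
	return func(msg []byte) ([]byte, error) {
		var err error
		for _, s := range stages {
			if msg, err = s(msg); err != nil {
				return nil, err
			}
		}
		return msg, nil
	}
}

func main() {
	// Stand-in stages: a fake "decrypt" and a fake "parse".
	decrypt := func(b []byte) ([]byte, error) {
		return []byte(strings.TrimPrefix(string(b), "enc:")), nil
	}
	parse := func(b []byte) ([]byte, error) {
		return []byte(strings.ToUpper(string(b))), nil
	}

	pipeline := chain(decrypt, parse)
	out, err := pipeline([]byte("enc:voltage=220"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // VOLTAGE=220
}
```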

The third problem is that as the number of producers grows, it becomes hard to uniformly manage and observe the state of each producer, that is, its communication with the broker and with its data source. Our current solution is to have each producer publish heartbeat messages to a dedicated heartbeat topic for overall monitoring, and also to monitor the connection state between the producer and the broker. With these changes, we can better aggregate and observe producers’ running states. We have noticed similar discussions on GitHub and look forward to working with the community on better solutions.

Expectations

We expect Pulsar to improve or add the following features.

  • We look forward to the release of Pulsar Functions Mesh, which provides Kubernetes-like orchestration of Functions. As mentioned above, we have implemented a chained-function solution, but it brings many maintenance challenges, and we hope Functions Mesh can solve this problem.
  • We expect Pulsar Functions to support runtimes in more languages. We currently work around this with a Docker proxy function; the scheme is feasible, but we hope for a better solution.
  • IoT scenarios care deeply about edge computing, and we hope Pulsar will experiment in this direction. We note that Pulsar allows Functions to push messages to another Pulsar cluster, letting Functions communicate with an external cluster. With this capability, one can try deploying Pulsar on edge devices and running computations there with Pulsar Functions. However, Pulsar deployment requires a large amount of memory, making it hard to deploy on edge devices with weak computing capabilities; we hope later versions can optimize this or provide other solutions.

Conclusion

As an open source project, Pulsar is growing rapidly, with rapidly updated documentation and a responsive community that is growing in size. We hope to learn more about Pulsar, contribute to Pulsar development, share our practical experience with the community, and grow together with Pulsar community.

While using Pulsar, we ran into some points of confusion. Thanks to the support of the StreamNative team, we successfully applied Pulsar to the business scenarios above. In the future, we will actively try out Pulsar’s new features and apply Pulsar to more energy Internet scenarios.

About the author

Jun Hu, Associate Professor, Department of Electrical Machinery, Tsinghua University, Executive Director of Energy Big Data and Open Ecology Research Center, Institute of Energy Internet Innovation, Tsinghua University, IEEE Member, CIGRE Member.

Recommended reading

  • Case study | Apache Pulsar powers Jiangsu Mobile’s rebuilt billing support system for the 5G era
  • Case study | The practice of Apache Pulsar in the Tencent Angel PowerFL federated learning platform
  • Apache Pulsar Performance Tuning at BIGO (Part 1)
  • Pulsar Function Mesh
