About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. It adopts an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers, with strong consistency, high throughput, low latency, and high scalability. GitHub address: github.com/apache/puls…

Article from: ApacheHudi. Author: Guo Sijie, CEO of StreamNative and Apache Pulsar PMC member.


Motivation

Lakehouse was first proposed by Databricks as a data management system that offers low-cost, direct access to cloud storage while providing the performance and management features of a traditional DBMS: ACID transactions, versioning, auditing, indexing, caching, and query optimization. A Lakehouse combines the benefits of data lakes and data warehouses: the low-cost storage and open data formats of a data lake, and the powerful management and optimization capabilities of a data warehouse. Delta Lake, Apache Hudi, and Apache Iceberg are three technologies for building Lakehouses.

At the same time, Pulsar provides a number of features, including tiered storage, stream offloading, and columnar offloading, that position it as a storage layer able to unify batch and event streaming. The tiered storage feature in particular makes Pulsar a lightweight data lake, but Pulsar still lacks some of the performance optimizations common in traditional DBMSs, such as indexes and data versioning; columnar offloading was introduced to narrow that performance gap, but it is not enough.

This proposal attempts to use Apache Pulsar as a Lakehouse. It provides only the top-level design; the detailed design and implementation will be addressed in later sub-proposals (interested readers can keep an eye on them).

Analysis

This section analyzes the key features needed to build a Lakehouse, then examines whether Pulsar meets the requirements and identifies any gaps.

A Lakehouse has the following key features:

  • Transaction support: many data pipelines in an enterprise Lakehouse read and write data concurrently. ACID transactions ensure the consistency of concurrent reads and writes, especially when using SQL. The three data lake frameworks, Delta Lake, Iceberg, and Hudi, all implement a transaction layer on low-cost object storage and all support transactions. Pulsar introduced transaction support in version 2.7.0 and supports cross-topic transactions (a transaction sketch follows this list);

  • Schema constraints and governance: a Lakehouse needs to support schema constraints and evolution, support warehouse schema paradigms such as star/snowflake schemas, be able to reason about data integrity, and have robust governance and auditing mechanisms; all three systems have these capabilities. Pulsar has a built-in schema registry service that meets the basic requirements of schema constraints and governance, though there may still be room for improvement (a schema sketch follows this list).

  • BI support: Lakehouses let BI tools work directly on source data, which reduces staleness, improves freshness, shortens wait times, and avoids the cost of operating two copies of the data in both a data lake and a warehouse. The three data lake frameworks integrate well with Apache Spark while also allowing Redshift and Presto/Athena to query source data, and the Hudi community has already completed support for multiple engines such as Flink. Pulsar exposes segments in tiered storage for direct access, which allows tight integration with popular data processing engines. But Pulsar's tiered storage itself still has performance gaps when serving BI workloads, and we will address those gaps in this proposal.

  • Separation of storage and compute: storage and compute run in separate clusters, so each can scale out independently and almost without limit. All three frameworks support separation of storage and compute. Pulsar uses a multi-tier deployment architecture that separates storage from compute.

  • Openness: open, standardized data formats such as Parquet are used, together with APIs so that a variety of tools and engines (including machine learning and Python/R libraries) can access the data directly and efficiently. Parquet is supported by all three frameworks; Iceberg also supports ORC, and ORC support for Hudi is being worked on by the community. Pulsar does not yet support any open format, though its columnar offloader supports Parquet.

  • Support for data types from unstructured to structured: a Lakehouse can store, refine, analyze, and access the data types required by many new data applications, including images, video, audio, semi-structured data, and text. It is not clear how Delta, Iceberg, and Hudi support this. Pulsar supports data of all types.

  • Support for diverse workloads: including data science, machine learning, SQL, and analytics. Multiple tools may be required to support all of these workloads, but they all rely on the same data repository. The three frameworks are closely integrated with Spark, which offers a wide selection of tools. Pulsar is also closely integrated with Spark.

  • End-to-end streaming: real-time reporting is the norm for many enterprises, and streaming support removes the need for a separate system dedicated to serving real-time data applications. Delta Lake and Hudi provide streaming capabilities through change logs, but that is not true streaming. Pulsar is a true streaming system.
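To make the transaction point concrete, here is a minimal sketch of a cross-topic transaction using the Pulsar Java client (available since 2.7.0). It assumes a local broker with `transactionCoordinatorEnabled=true`; the topic names are placeholders:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;

public class CrossTopicTxn {
    public static void main(String[] args) throws Exception {
        // Transactions must be enabled on both the client and the brokers.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // assumed local broker
                .enableTransaction(true)
                .build();

        // Transactional producers must disable the send timeout.
        Producer<String> producerA = client.newProducer(Schema.STRING)
                .topic("topic-a").sendTimeout(0, TimeUnit.SECONDS).create();
        Producer<String> producerB = client.newProducer(Schema.STRING)
                .topic("topic-b").sendTimeout(0, TimeUnit.SECONDS).create();

        Transaction txn = client.newTransaction()
                .withTransactionTimeout(5, TimeUnit.MINUTES)
                .build().get();

        // Both writes become visible atomically on commit, or not at all.
        producerA.newMessage(txn).value("event-1").send();
        producerB.newMessage(txn).value("event-2").send();
        txn.commit().get();

        client.close();
    }
}
```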
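And a minimal sketch of the built-in schema registry at work, again with the Java client: the client derives an Avro schema from a POJO and registers it with the broker, which then enforces the namespace's compatibility policy on later producers and consumers. The `User` class and topic name are illustrative:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SchemaExample {
    // Plain POJO; the client derives an Avro schema from it and registers
    // the schema with the broker's built-in schema registry on first use.
    public static class User {
        public String name;
        public int age;
    }

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // assumed local broker
                .build();

        // If the topic already carries an incompatible schema, create()
        // fails, enforcing the namespace's schema-compatibility policy.
        Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
                .topic("persistent://public/default/users")   // placeholder topic
                .create();

        User u = new User();
        u.name = "alice";
        u.age = 30;
        producer.send(u);

        client.close();
    }
}
```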

As you can see, Pulsar meets all the criteria for building a Lakehouse. However, today's tiered storage still has significant performance gaps, for example:

  • Pulsar does not store data in an open, standard format such as Parquet;
  • Pulsar does not provide any indexing mechanism for offloaded data;
  • Pulsar does not support efficient upserts;

The goal here is to solve the performance problems of Pulsar's storage layer so that Pulsar can serve as a Lakehouse.

Current scheme

Figure 1 shows the current storage layout of a Pulsar topic.

  • Pulsar stores segment metadata in ZooKeeper.
  • The latest segments are stored in Apache BookKeeper (the fast storage layer).
  • Older segments are offloaded from Apache BookKeeper to tiered storage (the cheap storage layer). Metadata for offloaded segments remains in ZooKeeper, pointing to the offloaded objects in tiered storage.
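For illustration, offloading can be triggered per topic through the admin API. Here is a minimal sketch with the Java admin client, assuming a local admin endpoint and an offloader already configured on the broker; the topic name is a placeholder:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

public class OffloadExample {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceUrl("http://localhost:8080")   // assumed admin endpoint
                .build();

        String topic = "persistent://public/default/events";   // placeholder topic

        // Ask the broker to offload all ledgers up to the current end of the
        // topic from BookKeeper into the configured tiered storage.
        admin.topics().triggerOffload(topic, MessageId.latest);

        // Offloading runs asynchronously; offloadStatus() reports progress.
        System.out.println(admin.topics().offloadStatus(topic));

        admin.close();
    }
}
```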

The current scheme has some disadvantages:

  1. It does not store offloaded data in any open storage format, which makes integration with the broader ecosystem difficult.
  2. It keeps all metadata in ZooKeeper, which can limit scalability.

New Lakehouse storage scheme

The new scheme recommends using a Lakehouse to store offloaded data in tiered storage. The proposal recommends Apache Hudi as the Lakehouse storage, for the following reasons:

  • Cloud providers offer good support for Apache Hudi.
  • Apache Hudi has graduated as a top-level Apache project.
  • Apache Hudi supports both the Spark and Flink engines, and has a fairly active community in China.

New storage layout

Figure 2 shows the new layout of a Pulsar topic.

  • The metadata of the latest (not yet offloaded) segments is stored in ZooKeeper.

  • The data of the latest (not yet offloaded) segments is stored in BookKeeper.

  • The metadata and data of offloaded segments are stored directly in tiered storage. Because a topic is an append-only stream, we do not strictly need a Lakehouse repository such as Apache Hudi; but if we also store the metadata in tiered storage, it makes more sense to use a Lakehouse repository to guarantee ACID properties.

Support for efficient upserts

Pulsar does not directly support upserts; it supports them through topic compaction. But the current approach to topic compaction is neither scalable nor efficient:

  1. Topics are compacted inside the broker, which cannot handle large upsert volumes, especially when the data set is large.
  2. Topic compaction does not support storing the compacted data in tiered storage.

To support efficient and scalable upserts, the proposal recommends using Apache Hudi to store compacted data in tiered storage. Figure 3 shows the approach to supporting efficient upserts in topic compaction with Apache Hudi.

The idea is to implement a topic compaction service. The topic compaction service can run as a separate service (for example, as a Pulsar Function) to compact topics.

  1. The broker issues a topic compaction request to the compaction service.
  2. The compaction service receives the request, reads the messages from the topic, and upserts them into the Hudi table.
  3. After the upsert completes, the topic compaction cursor advances to the last message it compacted.

The topic compaction cursor keeps, in its metadata, a reference to the location in tiered storage where the Hudi table is stored.
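Below is a minimal sketch of such a compaction service, using a durable subscription as the compaction cursor. The subscription name and the `upsertIntoHudi` helper are hypothetical, and the Hudi write itself (e.g. via Hudi's Java write client) is elided:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class HudiCompactionService {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // assumed local broker
                .build();

        // A durable subscription acts as the compaction cursor: its
        // mark-delete position records how far compaction has progressed.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/events")   // placeholder topic
                .subscriptionName("__hudi_compaction")         // hypothetical cursor name
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();

            // Hypothetical helper: upsert the record into the Hudi table in
            // tiered storage, keyed by the message key so that later values
            // replace earlier ones.
            upsertIntoHudi(msg.getKey(), msg.getValue());

            // Advance the compaction cursor to the last message compacted.
            consumer.acknowledgeCumulative(msg.getMessageId());
        }
    }

    static void upsertIntoHudi(String key, byte[] value) {
        // Hudi write elided; a real service would batch records per commit.
    }
}
```

Cumulative acknowledgment maps naturally onto the cursor semantics described above: the subscription's mark-delete position is exactly "the last message it compacted".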

Think of Hudi tables as Pulsar topics

Hudi maintains a timeline of all operations performed on the table at different instants, which helps provide instantaneous views of the table while also efficiently supporting data retrieval in _arrival_ order. Hudi supports incrementally pulling changes from a table. We can support _ReadOnly_ topics backed by Hudi tables, which allows applications to stream Hudi table changes from the Pulsar broker. Figure 4 illustrates this idea.
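As an illustration of incremental pulls, here is a sketch of reading only the changes committed after a given instant, using Spark's Hudi source from Java. The table path and begin instant are placeholders, and it assumes the hudi-spark bundle is on the classpath:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiIncrementalRead {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-incremental-read")
                .master("local[*]")                     // assumed local run
                .getOrCreate();

        // Incremental query: return only records committed after the given
        // instant on Hudi's timeline, i.e. read the table in arrival order.
        Dataset<Row> changes = spark.read()
                .format("hudi")
                .option("hoodie.datasource.query.type", "incremental")
                .option("hoodie.datasource.read.begin.instanttime", "20240101000000") // placeholder instant
                .load("s3://bucket/path/to/table");     // placeholder table path

        changes.show();
        spark.stop();
    }
}
```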

Scalable metadata management

As we begin to store all data in tiered storage, the proposal suggests not storing the metadata of offloaded or compacted data in ZooKeeper, and instead relying on tiered storage itself to store that metadata.

The proposal organizes the offloaded and compacted data in the following directory layout:

```
- <tenant>/
  - <namespace>/
    - <topics>/
      - segments/ <= Use Hudi to store the list of segments to guarantee ACID
        - segment_<segment-id>
        - ...
      - cursors/
        - <cursor A>/ <= Use Hudi to store the compacted table for cursor A.
        - <cursor B>/ <= ...
```

reference

[1] Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics. cidrdb.org/cidr2021/pa…

[2] What is a Lakehouse? Databricks.com/blog/2020/0…

[3] Diving Deep into the inner workings of the Lakehouse and Delta Lake. Databricks.com/blog/2020/0…