This article is edited from the “Iceberg and Object Storage Construction Data Lake Scheme” shared by Sun Wei, senior software R&D manager of Dell Technology Group, on Flink Meetup in Shanghai on April 17. The content of the article is as follows:

  1. Introduction to Data Lake and Iceberg
  2. Object storage supports Iceberg Data Lake
  3. Demonstration project
  4. Some thoughts on storage optimization

I. Introduction to Data Lake and Iceberg

1. Data Lake Ecology

For a mature data lake ecology, as shown in the figure above:

  • First of all, we think it should have the capacity of mass storage, common object storage, public cloud storage and HDFS;
  • On top of this, we also need to support rich data types, including unstructured images and video, semi-structured CSV, XML, LOG, and structured database tables;
  • In addition, efficient and uniform metadata management is needed so that computing engines can easily index various types of data for analysis.
  • Finally, we need to support a rich range of computing engines, including Flink, Spark, Hive, Presto, etc., to easily connect to existing application architectures in the enterprise.

2. Application scenarios of structured data in the data lake

Above is a typical data lake application scenario.

There may be a variety of data on the data source, different data sources and different formats. For example, transaction data, logs, buried point information, IoT, etc. This data flows through a number of streams and then into the computing platform, where it requires a structured solution that organizes the data onto a storage platform for real-time or timed queries by back-end data applications.

What are the characteristics of such a database solution?

  • First, you can see that there are many types of data sources, so you need to support a relatively rich organization of data schemas;
  • Second, it supports real-time data query during injection, so it needs ACID guarantee to ensure that it will not read some dirty data in the middle state that has not been written yet.
  • Finally, things like logs may need to be reformatted temporarily, or a column added. In this case, it is necessary to avoid the traditional data storehouse, which may have to pull out all the data and write it again and re-inject it into the storage. Instead, a lightweight solution is needed to meet the requirements.

The positioning of Iceberg database is to achieve such a function, docking computing platform at the top, docking storage platform at the bottom.

3. Typical solutions for structured data on the data lake

For data structured organization, the typical solution is to use the traditional organization method of database.

As shown in the figure above, there are namespaces, isolation of database tables; There are multiple tables in the middle, which can provide a variety of data Schema preservation; The data is placed underneath, and the table needs to provide ACID features, as well as support for partial Schema evolution.

4. Iceberg table data organization structure

  • Snapshot Metadata: table Schema, Partition, Partition spec, Manifest List path, current snapshot, etc.
  • Manifest List: Manifest File path and its Partition, data File statistics.
  • Manifest File: The Data File path and the upper and lower boundaries of each column.
  • Data File: The actual table content Data, organized in Parque, ORC, Avro and other formats.

Here’s how Iceberg organizes data in detail. As shown in the figure above:

  • You can see that the right side starts with the data file, which stores the content data of the table and generally supports formats such as PARQUET, ORC and AVRO.
  • Next comes the Manifest File, which records the path to the underlying data File and the upper and lower boundaries of each column to filter the query File.
  • Then there is the Manifest List, which links multiple Manifest files below and records the partition range information corresponding to the Manifest File, which is also for the convenience of subsequent filtering query.

    The Manifest List, which already represents snapshot information, contains all the data links of the current database tables and is the key guarantee for Iceberg’s support of ACID.

    With snapshots, when reading data, you can only read the data referenced by the snapshot. If you are still writing data, it will not be referenced by the snapshot, so you will not read the dirty data. Multiple snapshots share previous data files and share previous data by sharing these Manifest files.

  • Above that is the snapshot metadata, which records the current or historical changes to the table Scheme, the configuration of the partition, all snapshot Manifest File paths, and which snapshot is the current snapshot.

    At the same time, Iceberg provides abstractions of namespaces and tables to complete data organization and management.

5. Iceberg writing process

Above is the flow chart of Iceberg data written, using the computing engine Flink as an example.

  • First, Data Workers read Data from metadata, parse it, and then pass a record to a member of the party.
  • Like most databases, Iceberg has predefined partitions. The records are written to different partitions to form new files.
  • Flink has a CheckPoint mechanism. After the files arrive, Flink will finish writing the files, and then generate the list of the files, and then send it to the Commit Worker.
  • The Commit Worker reads the information of the current snapshot, and then merges it with the List of files generated this time, generating a new Manifest List and the information of the table files with the subsequent metadata, and then committing it. After success, a new snapshot is formed.

6. Iceberg Query Process

Above is Iceberg data query process.

  • First, the Flink Table Scan worker makes a scan. During the scan, it can start from the root like a tree to find the current snapshot or a historical snapshot specified by the user, and then take out the Manifest List file of the current snapshot from the snapshot. According to the information saved at that time, the Manifest File that meets the query condition can be filtered out.
  • Then go down through the information recorded in the Manifest File and filter out the Data Files that are needed below. After the file is taken out, the Recode will be read from the file to the Recorder Reader worker, and then returned to the upper layer for the call.

It can be seen here that no List is used in the whole data query process. This is because Iceberg records it completely. The tree structure of the whole file does not need a List, which is pointed to by a single path directly, so there is no time-consuming List operation in query performance. This is more friendly for object storage, because storing objects on a List is a more expensive operation.

7. Iceberg Catalog features

Iceberg provides Catalog with good abstraction for data storage and metadata management. Any store that implements Iceberg’s Catalog abstraction has a chance to connect with Iceberg to organize access to the above data lake scheme.

As you can see from the figure above, the Catalog mainly provides several aspects of abstraction.

  • It defines a set of role files for Iceberg;
  • Its File IO can be customized, including read and write and delete;
  • Its namespace and table operations (also known as metadata operations) can also be customized;
  • The Catalog can be customized for reading/scanning tables and submitting tables.

This can provide flexible operation space, convenient docking various storage underneath.

Object storage support Iceberg Data Lake

1. The current Iceberg Catalog implementation

At present, the implementation of Iceberg Catalog in the community can be divided into two parts, one is data IO part, the other is metadata management part.

As shown in the above, in fact the lack of private object storage oriented Catalog implementation, S3A can pick up objects stored in theory, but it is with the file system semantics, not natural object storage semantics, simulate the file operations will have extra overhead, and we want to achieve is all the data and metadata management are to an object storage, Instead of a separate design.

2. Comparison between object storage and HDFS

Here’s the question, why use object storage when you have HDFS?

As shown below, we compare object storage to HDFS from all angles.

To sum up, we think:

  • Object storage has advantages in cluster scalability, small file friendliness, multi-site deployment and low storage overhead.
  • HDFS’s benefit is that it provides append uploads and atomic rename, two advantages Iceberg needs.

The following is a brief description of the respective advantages of the two stores.

1) Comparison: Cluster scalability

  • The HDFS architecture uses a single Name Node to store all the metadata, which determines that its single Node capacity is limited, so there is no horizontal scaling ability in metadata.
  • Object storage generally USES hash, metadata is divided into individual pieces, the pieces to the services on different Node to manage, naturally it metadata ceiling will be higher, even in extreme cases can undertake rehash, cut the piece more fine, to more Node to manage the metadata, to expand capacity.

2) Comparison: Small files are friendly

Small files are becoming more common and a pain point in big data applications.

  • HDFS is based on the limitations of the architecture. Small file storage is limited to Name Node memory and other resources. Although HDFS provides an ARCHIVE method to merge small files and reduce the pressure on Name nodes, this requires additional complexity and is not native.

    Similarly, the TPS of small files is limited by the processing power of the Name Node, as there is only a single Name Node. The metadata of object storage is distributed storage and management, and the traffic can be well distributed among nodes, so that a single Node can store a large number of small files.

  • Nowadays, many object stores offer multi-media, layered acceleration, which can improve the performance of small files.

3) Comparison: Multi-site deployment

  • Object storage supports multi-site deployment

    • Global namespace
    • Supports rich rule configuration
  • The multi-site deployment capability of object storage is suitable for the architecture of two places, three centers and many activities, while HDFS does not have the native multi-site deployment capability. Although some commercial versions have been seen to add the ability for multiple sites to handle data to HDFS, its two systems may be independent and therefore do not support the ability to live under a truly global namespace.

4) Comparison: Low storage overhead

  • For a storage system, to accommodate random hardware failures, it usually has a replication mechanism to protect data.

    • The common one is three copies. The data is stored in three copies and then stored separately in three nodes. The storage overhead is three times, but it can tolerate the failure of two copies at the same time to ensure that the data will not be lost.
    • The other is Erasure Coding, commonly referred to as EC. Take 10+2, for example, which cuts data into 10 data blocks and then uses the algorithm to calculate two code blocks for a total of 12 blocks. Then spread across four nodes, the storage overhead is 1.2 times. It can also tolerate simultaneous failure of two blocks, in which case all the data can be calculated from the remaining 10 blocks, thus reducing storage overhead and achieving a level of failure tolerance.
  • HDFS uses the three-copy mechanism by default, and EC capabilities are already supported on newer versions of HDFS. After research, it is file-based to do EC, so it has a natural disadvantage against small files. Because if the size of the small file is smaller than the size required by the partition, it will be more expensive than the original overhead, because there is no saving on the two code blocks. In an extreme case, if it is the same size as a single block of code, it is already equivalent to three copies.

    At the same time, HDFS once EC, can no longer support append, hflush, hsync and other operations, which will greatly affect the EC can use the scene. Object storage natively supports EC, and for small files, it internally consolinates the small files into a large block to do EC, which ensures that data overhead is always constant, based on a preconfigured policy.

3. The challenge of object storage: the additional uploading of data

In the S3 protocol, objects are required to provide a size when uploaded.

Taking S3 standard as an example, when object storage is connected with Iceberg, S3 standard object storage does not support the interface of data appending upload, and the protocol requires that the file size be provided when uploading the file. So in this case, it’s not very friendly to stream File IO incoming.

1) Solution 1: S3 Catalog Data Append Upload – Small File Cache Local/In-memory

For small files, as they are streamed in, they are written to the local cache/memory, and when they are completely written, they are then uploaded to the object store.

S3 Catalog Data Upupload – Upupload large files in MPU segment

For large files, MPU fragment uploading is used as defined by the S3 standard.

It is generally divided into several steps:

  • The first step is to create the initial MPU and get a Upload ID. Then give each segment an Upload ID and a number. These segments can be uploaded in parallel.
  • After the Upload is completed, there is a Complete operation, which is equivalent to the notification system. It will arrange all the numbers based on the same Upload ID and form a large file from small to large.
  • To apply the mechanism to the data append upload scenario, the conventional implementation is to write a file, cache the file to the local, when it reaches the required size of the block, it can be initialized MPU, and a block of it will start uploading. The same operation is performed for each subsequent chunk until the last chunk is uploaded, and then a completion operation is called to complete the upload.

MPU has advantages and disadvantages:

  • The downside is that there is an upper limit to the number of sharding in an MPU, perhaps 10,000 in the S3 standard. If you want to support large files, this partition should not be too small, so for files smaller than the partition, it is still necessary to use the previous method for caching upload;
  • The advantage of MPU is the ability to upload in parallel. If you do an asynchronous upload, after the cache reaches the file, you can continue to cache the next block without waiting for the previous block to be uploaded successfully, and then start uploading. When the front-side injection is fast enough, the back-end asynchronous commit becomes a parallel operation. With this mechanism, it can provide faster uploads than single stream uploads.

4. The challenge of object storage: atomic commit

The next issue is atomic commit to object storage.

As mentioned earlier in the data injection process, the final commit is actually divided into several steps and is a linear transaction. First it reads the current snapshot version, then it merges this list of files, and then commits its new version. This operation is similar to the common “I = I +1” operation in our programming. It is not an atomic operation and the object storage standard does not provide this capability.

Above is a scenario where meta-information is submitted concurrently.

  • Here Commit Worker 1 to V006, merge his files and Commit V007 successfully.
  • At this point there is another Commit Worker 2, which also gets V006, merges it, and also provides V007. At this point we need a mechanism to tell it that V007 has conflicted and cannot be uploaded, and then let it Retry itself. Retry then fetching the new V007 merge and submitting it to V008.

This is a typical collision scenario, where a mechanism is needed, because if it cannot detect that it is a conflict, a second submission of V007 will overwrite the previous submission, causing all the data from the previous submission to be lost.

As shown in the figure above, we can use a distributed lock mechanism to solve the above problem.

  • First, Commit Worker 1 to get v006, and then merge the files. Before committing, obtain the lock, and determine the current snapshot version. If it is V006, then V007 can be successfully committed and unlocked after successful submission.
  • Similarly, when Commit Worker 2 gets the V006 merge, it does not get the lock at first until Commit Worker 1 releases the lock. When you get the lock and check it again, it will find that the current version is already V007, which conflicts with your own V007, so the operation must fail, and it will Retry.

This is done by locking concurrent commits.

5. UPLOAD DELL EMC ECS DATA

The solution to the object storage and Iceberg problem based on the S3 standard has some problems, such as performance penalty or additional deployment of lock services.

Dell EMC ECS is also an object store, based on this problem has a different answer, it is based on the S3 standard protocol has some extensions, can support additional data upload.

Its append uploads differ from MPU in that there is no block size limit. The chunking can be set a little smaller, and after uploading, the inside will be concatenated, and it will still be a valid file.

The Append Upload and MPU can be adapted to a certain extent for different scenarios.

MPU has the ability to accelerate uploading, and the performance of additional uploading is sufficient even when the speed is not very fast. Moreover, it does not have the initialization and merging operations of MPU, so both of them can be used in different scenarios in terms of performance.

6. Dell EMC ECS solution under concurrent submission

ECS object storage also provides an if-match semantics, which is an interface capability on both Microsoft’s cloud storage and Google’s cloud storage.

  • If-match means to get the eTag of the file at the same time as the Commit Worker 1 gets v006. The system needs to determine whether the ETag of the file to be overwritten is the same as the real ETag of the current file. If it is the same, the overwrite operation is allowed, then V007 will be committed successfully.
  • The other situation is that Commit Worker 2 also gets the eTag of v006, and when uploading the eTag, it finds out that the eTag is different from the file in the current system, then returns a failure and triggers a Retry.

This implementation has the same effect as the locking mechanism and does not require an external redeployment of the lock service to ensure atomic commits.

7. S3 Catalog – Unified Stored Data

To recap, above we solved the problem of uploading data IO in file IO and the problem of atomic submission in metadata tables.

After solving these problems, the management of data and metadata can be handed over to the object store, and there is no need to deploy additional metadata services, and the concept of data store can be truly unified.

Third, the demonstration scheme

As shown above, the demo uses Pravega, which can be understood simply as an alternative to Kafka, but has been optimized for performance.

In this example, we will inject the data into Pravega’s stream, and Flink will read the data from Pravega, parse it, and then store it in Iceberg. Iceberg uses the ECS Catalog, which is directly connected to the object store without any other deployment, and then uses Flink to read the data.

Four, some thinking about the optimization of storage

The figure above shows the data organization structure currently supported by Iceberg, which you can see is stored directly in the Parquet file.

Our idea is that if the lake and the metadata lake are actually the same lake, is it possible that the generated Parquet file and the source file have great data redundancy, and whether the storage of redundant information can be reduced?

In the most extreme case, where a source file is recorded in Iceberg, the Parquet data file is not stored. When it comes time to query, the same effect can be achieved by customizing File IO to generate a format similar to Parquet based on the original File in real time in memory and submitting it to the upper application query.

This approach, however, is limited to situations where the cost of storage is high but the performance of the query is not high. This is also based on Iceberg’s good abstraction, because its File metadata and File IO are abstracted, you can pull the source File into it and make it think it is a Parquet File.

Further consideration is given to whether query performance can be optimized while saving storage space.

For example, it is estimated to take out some commonly used columns of source files and then statistic information to the Iceberg. When reading, source files and cloud computing files can be used to quickly query information and save the storage space of less commonly used data columns at the same time.

This is a relatively preliminary idea. If it can be realized, Iceberg can not only index the structured PARQUET file format, but also index some semi-structured and structured Data, and solve the query task of the upper layer through temporary calculation, so as to become a more complete Data Catalog.