I. The core challenges of ingesting data into the lake

A real-time link that feeds data into the lake can be divided into three parts: the data source, the data pipeline, and the data lake (data warehouse). This article is organized around these three parts.

1. Case #1: A program bug causes the data transfer to be interrupted

  • First, while data is moving from the source through the pipeline into the data lake (data warehouse), a bug in the job may interrupt the transfer halfway, leaving the data half-written and affecting the business.
  • Second, when this happens, how do we restart the job and guarantee that the data is fully synchronized into the data lake (data warehouse) with nothing duplicated and nothing missing?

2. Case #2: Data changes are too painful

  • Data changes

    When a data change happens, it puts great pressure on the whole link. Take a table that originally defines two fields, ID and NAME: the business team now asks to add an Address field in order to better mine user value.

    First we add the Address column to the source table, then add the column to the intermediate Kafka topic, then modify the sync job and restart it. The change then has to ripple all the way down the link: adding the new column, modifying jobs, restarting, and finally updating all existing data in the data lake to materialize the new column (see the sketch after this list). This process is not only time-consuming, it also raises the question of how to isolate the change so that it does not affect analysis jobs that are reading the data.

  • Partition changes

    As shown in the figure below, the table in the data warehouse is partitioned by month, and now we want to change it to be partitioned by day. This may require updating all the data across many systems and re-partitioning it with the new strategy, which is a very time-consuming process.
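As a rough illustration of how many places a single new column touches in the legacy link, here is a minimal sketch using plain JDBC. The hostnames, credentials, and the user table are hypothetical, and the Kafka schema and job changes are only indicated in comments.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LegacySchemaChange {
    public static void main(String[] args) throws Exception {
        // 1) Add the new column on the MySQL source (hypothetical connection URL).
        try (Connection mysql = DriverManager.getConnection(
                 "jdbc:mysql://mysql-host:3306/app", "user", "pass");
             Statement stmt = mysql.createStatement()) {
            stmt.execute("ALTER TABLE user ADD COLUMN address VARCHAR(255)");
        }

        // 2) The Kafka topic schema and the sync job must also be updated and
        //    restarted by hand (not shown), otherwise the new column is dropped.

        // 3) Finally the warehouse table has to be changed as well.
        try (Connection hive = DriverManager.getConnection(
                 "jdbc:hive2://hive-host:10000/dw", "user", "pass");
             Statement stmt = hive.createStatement()) {
            stmt.execute("ALTER TABLE user ADD COLUMNS (address STRING)");
        }
    }
}
```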

3. Case #3: Slower and slower near-real-time reporting?

When the business needs reports closer to real time, the data import cycle has to shrink from "days" to "hours" or even "minutes", which causes a number of problems.

The first problem, as shown in the figure above, is that the number of files grows at a rate visible to the naked eye, which puts more and more pressure on the surrounding systems. The pressure shows up in two ways:

  • First, Hive Metastore faces scaling challenges, and analysis jobs start more and more slowly, as shown in the figure below.

    • As the number of small files grows, the centralized Metastore becomes an increasing bottleneck, and analysis jobs launch more and more slowly, because the metadata of all those small files has to be scanned at launch time.
    • Second, because the Metastore is a centralized system, it easily runs into scaling problems. For example, Hive may have to scale the underlying MySQL, which brings large maintenance cost and overhead.
  • The second pressure is that scan and analysis jobs are getting slower and slower.

    As the number of small files increases, the scan phase of analysis jobs becomes slower and slower. In essence, the explosion of small files forces the scan job to switch between many DataNodes frequently.

4. Case #4: Analyzing CDC data in real time is difficult

Looking at the various systems in the Hadoop ecosystem, it turns out that making the whole link fast, stable, and highly concurrent is not easy.

  • First, from the source side: if you want to synchronize MySQL data into the data lake for analysis, you face the problem that MySQL holds existing full data while incremental data keeps being generated. How do you synchronize the full and incremental data into the data lake seamlessly, guaranteeing that the data is neither more nor less?

  • In addition, assuming the full/incremental switch at the source is solved, how do you guarantee that CDC data is synchronized downstream row for row when an exception occurs during synchronization, for example an upstream schema change that interrupts the job?

  • Building the whole link involves the source-side full/incremental switch, chaining the intermediate data stream (such as Kafka), and writing into the data lake (data warehouse). Building it requires writing a lot of code, and the development threshold is high.

  • Finally, and critically, it is hard to find, in the open source ecosystem, a storage system that supports efficient, high-concurrency analysis of changing data such as CDC data.

5. The core challenges of ingesting data into the lake

  • Data synchronization tasks get interrupted

    • The impact of writes on analysis cannot be effectively isolated;
    • Synchronization tasks cannot guarantee exactly-once semantics.
  • End-to-end data changes

    • A DDL change makes the whole link complex to update and upgrade;
    • Existing data already in the lake/warehouse is difficult to modify.
  • Increasingly slow near-real-time reporting

    • Frequent writes produce a large number of small files;
    • The metadata system is under great pressure and jobs start slowly;
    • The large number of small files makes data scanning slow.
  • CDC data cannot be analyzed in near real time

    • Switching from full to incremental synchronization is difficult;
    • The end-to-end link requires code development, so the threshold is high;
    • The open source ecosystem lacks an efficient storage system for this.

II. Introduction to Apache Iceberg

1. A summary of Hive's pain points on the cloud at Netflix

The key reason Netflix created Iceberg was to solve the pain points of running Hive on the cloud, which fall into the following three areas:

1.1 Pain point 1: Difficulties in data change and backtracking

  1. No ACID semantics are provided, so when data changes it is difficult to isolate the impact on analysis tasks. Typical operations include INSERT OVERWRITE, modifying data partitions, and modifying the schema;
  2. Multiple concurrent data changes cannot be handled, which leads to conflicts;
  3. Historical versions cannot be effectively traced back.

1.2 Pain point 2: Difficulty in replacing HDFS with S3

  1. The data access interface depends directly on the HDFS API;
  2. It relies on the atomicity of the rename interface, which is hard to achieve with the same semantics on object stores such as S3;
  3. The List interface relies heavily on file directories, which is inefficient on object storage systems.

1.3 Pain point 3: Too much detail

  1. When the schema changes, different file formats behave inconsistently; the file formats do not even support data types consistently;
  2. The Metastore only maintains partition-level statistics, which makes task planning imprecise and costly; the Hive Metastore is also difficult to scale;
  3. Non-partition fields cannot be used for partition pruning.

2. Core features of Apache Iceberg

  • General, standardized design

    • Fully decoupled from compute engines
    • Schema standardization
    • Open data formats
    • Support for Java and Python
  • Complete table semantics (see the sketch after this list)

    • Schema definition and evolution
    • Flexible partitioning strategies
    • ACID semantics
    • Snapshot semantics
  • Rich data management

    • Unified storage for streaming and batch
    • Extensible metadata design
    • Batch updates and CDC
    • Support for file encryption
  • Cost effectiveness

    • Computation push-down design
    • Low-cost metadata management
    • Vectorized computation
    • Lightweight indexes
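To make the table semantics concrete, here is a minimal sketch that creates an Iceberg table with an explicit schema and a month(ts) partition spec through the Iceberg Java API. The HDFS path, field names, and the use of HadoopTables are assumptions for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class CreateUserTable {
    public static void main(String[] args) {
        // Table schema: id, name, and an event-time column used for partitioning.
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()),
            Types.NestedField.required(3, "ts", Types.TimestampType.withZone()));

        // Partition by month(ts); the spec lives in table metadata, not in directory names.
        PartitionSpec spec = PartitionSpec.builderFor(schema).month("ts").build();

        // HadoopTables works against any Hadoop-compatible file system (HDFS, S3A, local).
        HadoopTables tables = new HadoopTables(new Configuration());
        Table table = tables.create(schema, spec, "hdfs:///warehouse/db/user");
        System.out.println("Created table at " + table.location());
    }
}
```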

3. Apache Iceberg File Layout

The upper part of the figure shows the standard Iceberg table format structure. The core is divided into two parts, data and metadata, and both parts are maintained on S3 or HDFS.

4. Apache Iceberg Snapshot View

The figure above shows the writing and reading process for Iceberg.

You can see that there are three layers inside:

  • The yellow layer at the top is the snapshots;
  • The blue layer in the middle is the manifest files;
  • At the bottom are the data files.

Each write produces a batch of data files, one or more manifest files, and a snapshot.

For example, the first write produces snap-0, the second produces snap-1, and so on. The metadata, however, is maintained incrementally, appended step by step.

In this way, users can run batch analysis over the full table on a unified storage, and also run incremental analysis between snapshots on the same storage. This is why Iceberg supports both streaming and batch reads and writes.
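As a sketch of how this snapshot metadata can be inspected, the Iceberg Java API exposes the snapshot list and lets a scan be pinned to one snapshot; a streaming reader would instead consume the increments between consecutive snapshots. This reuses the hypothetical table from above and assumes it has already received some commits.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class InspectSnapshots {
    public static void main(String[] args) throws Exception {
        // Load the (hypothetical) table created earlier.
        Table table = new HadoopTables(new Configuration())
            .load("hdfs:///warehouse/db/user");

        // Each commit (snap-0, snap-1, ...) is recorded as a snapshot in the metadata.
        for (Snapshot snap : table.snapshots()) {
            System.out.printf("snapshot=%d operation=%s manifestList=%s%n",
                snap.snapshotId(), snap.operation(), snap.manifestListLocation());
        }

        // A batch read plans files against one snapshot; a streaming read would
        // repeatedly consume the increments between consecutive snapshots.
        try (CloseableIterable<FileScanTask> tasks =
                 table.newScan()
                      .useSnapshot(table.currentSnapshot().snapshotId())
                      .planFiles()) {
            tasks.forEach(task -> System.out.println(task.file().path()));
        }
    }
}
```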

5. Who has chosen Apache Iceberg

The figure above shows some of the companies currently using Apache Iceberg. The Chinese users are familiar to all of us, so here we briefly introduce how companies abroad use it.

  • Netflix now has hundreds of petabytes of data on Apache Iceberg, and Flink ingests hundreds of terabytes of new data into it every day.
  • Adobe's daily data increment is on the order of terabytes, with tens of petabytes of data in total.
  • AWS uses Iceberg as a base for its data lakes.
  • Cloudera builds its entire public cloud platform on Iceberg. On-premises HDFS/Hadoop deployments are declining while the move to the cloud is rising, and Iceberg plays a key role in the cloud stage of Cloudera's data architecture.
  • Apple has two teams using it:

    • First, the entire iCloud data platform is built on Iceberg.
    • Second, Siri, the artificial-intelligence voice service, builds its whole data ecosystem on Flink and Iceberg.

III. How Flink and Iceberg solve the problems

Back to the heart of the matter: here is how Flink and Iceberg solve the series of problems encountered in Part I.

1. Case #1: A program bug causes the data transfer to be interrupted

First, Flink guarantees exactly-once semantics along the synchronization link: when a job fails, it can recover strictly to the previous state, ensuring data consistency.

Second, Iceberg provides rigorous ACID semantics, which helps users easily isolate the negative effects of writes on analysis tasks.
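A minimal sketch of the Flink side, assuming a DataStream job: enabling checkpointing in exactly-once mode is what allows the job to recover strictly after a failure, and the Flink Iceberg sink only commits data files when a checkpoint completes. The placeholder pipeline and job name are hypothetical.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s in exactly-once mode: on failure the job rolls
        // back to the last completed checkpoint, so records are neither lost
        // nor committed twice downstream.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder pipeline; in practice this is the CDC source -> Iceberg sink.
        env.fromElements(1, 2, 3).print();

        env.execute("exactly-once-sync-demo");
    }
}
```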

2. Case #2: Data changes are too painful

As shown above, when data changes occur, Flink and Iceberg solve the problem as follows.

Flink can capture the upstream schema-change event and synchronize it downstream. After that, the downstream Flink job forwards the data directly on to storage, and Iceberg can change the schema instantly.

For DDL such as a schema change, Iceberg simply maintains multiple versions of the schema: the old data remains completely unchanged, new data is written with the new schema, and schema isolation is achieved in one step.
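A minimal sketch of such a schema change through the Iceberg Java API, reusing the hypothetical table from earlier: the commit only writes new metadata, so existing data files are untouched.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class AddAddressColumn {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
            .load("hdfs:///warehouse/db/user");

        // Add a new optional column; old snapshots keep reading with their
        // original schema version, new writes use the new schema.
        table.updateSchema()
             .addColumn("address", Types.StringType.get())
             .commit();
    }
}
```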

Another example is partition changes. Iceberg's approach is shown in the figure above.

Previously the table was partitioned by month (the yellow data blocks above). To switch to partitioning by day, you can change the partition spec directly in one step: the existing data remains unchanged, all new data is partitioned by day, and ACID semantics isolate the change.
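A minimal sketch of the same partition change through the Iceberg Java API, again on the hypothetical table: the spec update is a metadata-only commit, and previously written data keeps its monthly layout.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;

public class SwitchToDailyPartitions {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
            .load("hdfs:///warehouse/db/user");

        // Replace month(ts) with day(ts); only newly written data uses the
        // daily partitioning, existing files are left as they are.
        table.updateSpec()
             .removeField(Expressions.month("ts"))
             .addField(Expressions.day("ts"))
             .commit();
    }
}
```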

3. Case #3: Slower and slower near-real-time reporting?

The third problem is the pressure that small files put on the Metastore.

First, regarding the Metastore: Iceberg stores the metadata itself on the file system and maintains it there in the form of metadata files. The whole design removes the centralized Metastore and relies only on the file system to scale, so it is far more scalable.

The other problem is that more and more small files make data scanning slower and slower. Flink and Iceberg offer several solutions to this problem:

  • The first solution is to shuffle the data by bucket before writing, so that each bucket is written by a single task and fewer, larger files are produced.
  • The second solution is a batch job that periodically merges (compacts) small files (see the sketch after this list).
  • The third, smarter solution is to automatically and incrementally merge small files.
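For the periodic-merge approach, Iceberg ships a Flink-based action that rewrites small data files into larger ones. Here is a minimal sketch; the table location and target file size are assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopTables;

public class CompactSmallFiles {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
            .load("hdfs:///warehouse/db/user");

        // Rewrite small data files into files close to the target size;
        // run this as a periodic batch job.
        Actions.forTable(table)
               .rewriteDataFiles()
               .targetSizeInBytes(128 * 1024 * 1024L)
               .execute();
    }
}
```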

4. Case #4: Analyzing CDC data in real time is difficult

  • The first problem is synchronizing full and incremental data. The community already has the Flink CDC connector, which can automatically make the seamless switch from the full snapshot to the incremental data.

  • The second problem is how to guarantee that the binlog rows are synchronized into the lake accurately, even if exceptions occur during the process.

    For this, Flink can identify the different event types at the engine level, and with Flink's exactly-once semantics it can automatically recover from and handle failures.

  • The third problem is that building the whole link requires a lot of code development, and the threshold is too high.

    After adopting Flink plus the data lake, you only need to define a source table and a sink table and write one INSERT INTO statement; the whole link is connected without any business code (see the sketch after this list).

  • Finally, there is the question of how the storage layer supports near-real-time analysis of CDC data.
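Putting the pieces together, here is a minimal sketch of such a link expressed as Flink SQL submitted from Java: a MySQL CDC source table, an Iceberg sink table, and a single INSERT INTO. All hostnames, credentials, catalog settings, and table names are hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlCdcToIceberg {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source: MySQL binlog via the Flink CDC connector, which handles the
        // switch from the full snapshot to incremental changes automatically.
        tEnv.executeSql(
            "CREATE TABLE user_source (" +
            "  id BIGINT, name STRING, address STRING," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'mysql-host', 'port' = '3306'," +
            "  'username' = 'user', 'password' = 'pass'," +
            "  'database-name' = 'app', 'table-name' = 'user')");

        // Sink: an Iceberg table registered in a Hive catalog.
        tEnv.executeSql(
            "CREATE TABLE user_sink (" +
            "  id BIGINT, name STRING, address STRING," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'iceberg'," +
            "  'catalog-name' = 'hive_catalog'," +
            "  'catalog-type' = 'hive'," +
            "  'uri' = 'thrift://metastore-host:9083'," +
            "  'warehouse' = 'hdfs:///warehouse')");

        // The whole link is one INSERT INTO; no business code is required.
        tEnv.executeSql("INSERT INTO user_sink SELECT id, name, address FROM user_source");
    }
}
```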

IV. Community Roadmap

The figure above shows Iceberg's roadmap. Iceberg released only one version in 2019 but three versions in 2020, and it became a top-level Apache project at version 0.9.0.

The roadmap for Flink and Iceberg, shown above, breaks down into four stages.

  • The first stage is to connect Flink with Iceberg.
  • The second stage is replacing Hive scenarios with Iceberg. Many companies have already started to put this into production in their own scenarios.
  • The third stage is to solve more complex technical problems through Flink and Iceberg.
  • The fourth stage is to evolve this from a purely technical solution into a more complete product solution.