Brief introduction: This article introduces some practices of using Flink + Iceberg 0.11 on the Qunar data platform.

Author: Yu Dong

Abstract: This article introduces some practices of using Flink + Iceberg 0.11 on the Qunar data platform. Contents include:

  • Background and pain points
  • Iceberg architecture
  • Pain point 1: Kafka data loss
  • Pain point 2: near-real-time Hive under growing pressure
  • Iceberg optimization practice
  • Summary

Flink on GitHub: https://github.com/apache/flink. You are welcome to give Flink a thumbs-up and a star!

I. Background and pain points

1. Background

We ran into some problems using Kafka in our real-time data warehouse and data transfer links, such as Kafka data loss and the performance of Flink's near-real-time writes in combination with Hive. The new features of Iceberg 0.11 address these business scenarios, and compared with Kafka, Iceberg has its own advantages in certain scenarios. Here we share some practices based on Iceberg.

2. Original architecture

The original architecture used Kafka to store real-time data such as logs, orders, and tickets, and then consumed it as streams with Flink SQL or the Flink DataStream API. We built an internal platform for submitting SQL and DataStream code, and real-time jobs are submitted through this platform.

3. Pain points

  • Kafka's storage cost is high and the data volume is large. Because of the pressure, Kafka's data expiration time is set relatively short; when backpressure or a backlog occurs and the data is not consumed within that window, it expires and is lost.
  • Flink provides near-real-time read and write support on Hive. To relieve the pressure on Kafka, we put data with looser latency requirements into Hive and let Hive do minute-level partitioning. However, as metadata grows, the pressure on the Hive Metastore keeps increasing, queries become slower, and the database that stores the Hive metadata comes under pressure as well.

II. Iceberg Architecture

1. Introduction to Iceberg

Terminology

  • Data Files

    Data files are the files in which an Iceberg table actually stores its data. They usually sit in the table's data directory and end with ".parquet".

  • Manifest file

    Each row is a detailed description of a data file, including the file's status, path, partition information, and column-level statistics (such as per-column maximum and minimum values, null counts, and so on). With this file, irrelevant data can be filtered out and retrieval sped up.

  • Snapshot

    A snapshot represents the state of a table at some point in time. Each snapshot version contains a list of all the data files at that moment. Data files are tracked by manifest files, the manifest files are listed in a manifest list file, and one manifest list file represents one snapshot. A small inspection sketch follows below.
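
To make these terms concrete, here is a minimal Java sketch that walks a table's snapshot history and prints the manifest list file each snapshot points to. It is only an illustration under assumptions: the Hadoop-style table location is hypothetical, and a Hive-catalog table would be loaded differently.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    // Hypothetical table location; replace with a real path.
    Table table = new HadoopTables(new Configuration())
            .load("hdfs://nn:8020/warehouse/iceberg_db/tbl1");
    // One snapshot -> one manifest list file -> many manifest files -> data files.
    for (Snapshot snap : table.snapshots()) {
        System.out.printf("snapshot=%d committedAt=%d manifestList=%s%n",
                snap.snapshotId(), snap.timestampMillis(),
                snap.manifestListLocation());
    }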

2. Iceberg query planning

Query planning is the process of looking up the “files required for the query” in a table.

  • Metadata filtering

    The manifest file includes a partition data tuple and column-level statistics for each data file. During planning, query predicates are automatically converted into predicates on the partition data and applied first to filter out data files. Next, the column-level value counts, null counts, lower bounds, and upper bounds are used to eliminate files that cannot match the query predicate.

  • Snapshot ID

    Each snapshot ID is associated with a group of manifest files via its manifest list, and each manifest list can reference many manifest files.

  • Manifest files

    Each manifest file records metadata about the data files in the current snapshot, including the maximum and minimum values of each file's columns. Using this metadata, a query can be indexed down to the specific data files, making retrieval faster. A scan-planning sketch follows this list.
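
The same pruning can be seen through Iceberg's Java scan API. The sketch below is only an illustration under assumed names (the table path and the province_id column are hypothetical): the predicate is pushed into the scan, so partition tuples and column bounds recorded in the manifests eliminate files before any data is read.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.io.CloseableIterable;

    Table table = new HadoopTables(new Configuration())
            .load("hdfs://nn:8020/warehouse/iceberg_db/tbl1"); // hypothetical path
    // Files whose stats cannot match the predicate never become scan tasks.
    TableScan scan = table.newScan()
            .filter(Expressions.equal("province_id", 1));
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
        for (FileScanTask task : tasks) {
            System.out.println(task.file().path());
        }
    } // IOException from close() must be handled by the enclosing method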

III. Pain point 1: Kafka data loss

1. Pain point introduction

We usually choose Kafka for the real-time data warehouse and for log transfer. Kafka's own storage cost is very high and data retention is time-limited; once consumption backlogs and the data reaches its expiration time, it is lost without ever being consumed.

2. Solutions

Put real-time business data with looser latency requirements into the lake, for example data that can accept a delay of 1-10 minutes. Because Iceberg 0.11 also supports real-time reads through SQL and can retain historical data, this reduces the pressure on online Kafka while ensuring that data is not lost and can still be read in near real time.

3. Why does Iceberg only support near-real-time entry into the lake?

  1. Iceberg commits transactions at file granularity, so committing a transaction every second would make the number of files swell;
  2. There are no online service nodes, so for high-throughput, low-latency writes a purely real-time response cannot be achieved;
  3. After physical data is written to Iceberg it cannot be queried directly: the metadata file is written only when a checkpoint is triggered, at which point the data turns from invisible to visible, and each checkpoint itself takes a certain amount of time to complete.

4. Analysis of writing into the lake with Flink

Component introduction

  • IcebergStreamWriter

    It is mainly used to write records to the corresponding Avro, Parquet, or ORC files, generate the corresponding Iceberg DataFiles, and send them to the downstream operator. When a checkpoint arrives, it finishes writing the data for that checkpoint and emits the generated DataFiles to the IcebergFilesCommitter, which collects them and commits the transaction to Apache Iceberg.

  • IcebergFilesCommitter

    For each checkpointId we maintain a list of DataFiles, i.e. a map<Long, List<DataFile>>, so that even if the commit of one checkpoint fails, its DataFiles are still kept in Flink state and can be committed to the Iceberg table by a subsequent checkpoint. A minimal end-to-end sketch of these two operators follows below.
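
Putting the two operators together, here is a minimal DataStream sketch of entering the lake. It is only a sketch under assumptions: the Hadoop table path is hypothetical, and the one-element source stands in for a real Kafka source producing RowData; FlinkSink chains the IcebergStreamWriter and IcebergFilesCommitter internally.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // DataFiles are committed when a checkpoint completes, so this is required.
    env.enableCheckpointing(60_000L);

    // Stand-in for a real Kafka source; the row layout must match the table schema.
    DataStream<RowData> input = env.fromElements(
            (RowData) GenericRowData.of(1L, StringData.fromString("beijing")));

    // Hypothetical Hadoop-catalog table path.
    TableLoader tableLoader =
            TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/iceberg_db/tbl1");

    // IcebergStreamWriter writes DataFiles; IcebergFilesCommitter commits them.
    FlinkSink.forRowData(input)
            .tableLoader(tableLoader)
            .overwrite(false)
            .build();

    env.execute("kafka-to-iceberg");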

5. Flink SQL Demo

Flink + Iceberg entering the lake in real time: consume Kafka data, write it to Iceberg, and read data from Iceberg in near real time.

5.1 Preliminary work

  • Enable real-time read and write

    set execution.type = streaming

  • Enable table SQL hints so that the OPTIONS clause can be used

    set table.dynamic-table-options.enabled=true

  • Register an Iceberg catalog for operating on Iceberg tables

    CREATE CATALOG Iceberg_catalog WITH (
      'type'='iceberg',
      'catalog-type'='hive',
      'uri'='thrift://localhost:9083'
    );
  • Write Kafka real-time data into the lake

    insert into Iceberg_catalog.Iceberg_db.tbl1
    select * from Kafka_tbl;
  • Real-time streaming between lake tables, tbl1 -> tbl2 (a DataStream API equivalent is sketched after this list)

    insert into Iceberg_catalog.Iceberg_db.tbl2
      select * from Iceberg_catalog.Iceberg_db.tbl1
      /*+ OPTIONS('streaming'='true',
                  'monitor-interval'='10s',
                  'start-snapshot-id'='3821550127947089987')*/
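
For reference, the streaming read in the last bullet can also be expressed with the DataStream API. This is a sketch under the same assumptions as the write example above (env and tableLoader constructed the same way; the snapshot ID is the one from the hint):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.source.FlinkSource;

    // Monitors newly committed snapshots continuously and emits their rows.
    DataStream<RowData> stream = FlinkSource.forRowData()
            .env(env)
            .tableLoader(tableLoader)
            .streaming(true)
            .startSnapshotId(3821550127947089987L)
            .build();
    stream.print();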

5.2 Parameter Interpretation

  • monitor-interval

    The interval at which newly committed data files are continuously monitored (default: 1s).

  • start-snapshot-id

    Data is read starting from the specified snapshot ID. Each snapshot ID is associated with a group of manifest metadata files, each metadata file maps to its own real data files, and reading by snapshot ID reads one version of the table's data.

6. Pitfall record

Earlier, I wrote data into Iceberg from the SQL Client. The files in the data directory kept being updated, but nothing appeared in the metadata directory, so queries returned no rows, because Iceberg queries need the metadata to index the real data files. The SQL Client does not enable checkpointing by default, so it must be enabled through a configuration file; otherwise the data directory keeps receiving data files while the metadata directory never gets metadata written.

PS: Checkpointing must be enabled when entering the lake, whether through SQL or the DataStream API.
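
For DataStream jobs this is a single line in the job itself; for the SQL Client, checkpointing is typically enabled in the configuration file (e.g. the execution.checkpointing.interval option in flink-conf.yaml). A minimal Java sketch follows; the 60-second interval is just an example value:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Without checkpointing the committer never runs: files keep landing under
    // data/ while metadata/ never advances, so queries see nothing.
    env.enableCheckpointing(60_000L); // commit to Iceberg every 60 seconds
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);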

7. Data sample

The following two figures show the effect of querying Iceberg in real time: how the data changes from one second to the next.

  • One second ago

  • Data refreshed after one second

IV. Pain point 2: Flink's near-real-time integration with Hive gets slower and slower

1. Pain point introduction

The near-real-time architecture of Flink + Hive supports real-time reads and writes, but as the number of tables and partitions grows, this architecture runs into the following problems:

  • Too much metadata

    We changed Hive partitioning to hour or even minute level. Although this improved the near-real-timeliness of the data, the pressure on the Metastore was also evident: too much metadata slows down query plan generation and affects the stability of other services in production.

  • Database pressure increases

    As metadata grows, so does the pressure on the database that stores the Hive metadata; over time that database will have to be expanded, for example in storage space.

2. Solutions

Migrate the original near-real-time Hive tables to Iceberg. Why can Iceberg handle large amounts of metadata, while Hive tends to become a bottleneck when metadata is large?

  • Iceberg maintains its metadata on an extensible distributed file system rather than in a centralized metadata system.
  • Hive maintains partition-level metadata in the Metastore (too many partitions put pressure on MySQL), while the metadata inside a partition is actually maintained by the file system (starting a job requires listing large numbers of files to decide which ones need to be scanned, which is time-consuming).

V. Iceberg optimization practice

1. Small file processing

  • Before Iceberg 0.11, small-file merging had to be triggered periodically through the batch API. This achieved the merge, but required maintaining a set of Actions code, and the merging was not real-time.

    // Entry point: org.apache.iceberg.actions.Actions (Spark) or
    // org.apache.iceberg.flink.actions.Actions (Flink).
    Table table = findTable(options, conf); // findTable: user-defined lookup helper
    Actions.forTable(table).rewriteDataFiles()
            .targetSizeInBytes(10 * 1024) // target size for merged files: 10 KB
            .execute();
  • Iceberg 0.11 adds a new feature that supports merging small files in a streaming manner.

    Using the hash of the partition/bucket key to shuffle the data and merge files directly from the source has the advantage that one task processes the data of one partition and commits its own DataFiles, so that, for example, a task only handles the data of its corresponding partition. This avoids the problem of many small files committed by multiple tasks, and no extra maintenance code is needed; you only need to specify the property write.distribution-mode when creating the table, and the property is shared with other engines such as Spark.

    CREATE TABLE city_table ( 
         province BIGINT,
         city STRING
      ) PARTITIONED BY (province, city) WITH (
        'write.distribution-mode'='hash' 
      );

2. Iceberg 0.11 sorting

2.1 Introduction to Sorting

Before Iceberg 0.11, Flink did not support Iceberg sorting, so the feature could only be used in batch mode with Spark. 0.11 adds support for sorting, which means we can also benefit from it in real time.

The essence of sorting is to scan faster: after the data is aggregated by the sort key and arranged in order from smallest to largest, max-min filtering can skip a large amount of irrelevant data.

2.2 Sorting demo

insert into Iceberg_table select days, province_id from Kafka_tbl order by days, province_id;

3. What the manifest file looks like after sorting

Parameter interpretation

  • file_path: the physical location of the file.
  • partition: the partition the file belongs to.
  • lower_bounds: the minimum values of the sort fields in this file (the figure below shows my minimums for days and province_id).
  • upper_bounds: the maximum values of the sort fields in this file (the figure below shows my maximums for days and province_id).

Files are located through the file_path, partition, and column information. After the data is sorted, the per-file column information is also recorded in the metadata, so the query plan can locate the files from the manifest without recording the information in the Hive Metastore. This reduces the pressure on the Hive Metastore and improves query efficiency.
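
The same per-file statistics can be read programmatically. A rough sketch follows (hypothetical table path; the bounds come back as ByteBuffers keyed by column field id and can be decoded with org.apache.iceberg.types.Conversions):

    import java.nio.ByteBuffer;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    Table table = new HadoopTables(new Configuration())
            .load("hdfs://nn:8020/warehouse/iceberg_db/iceberg_table"); // hypothetical
    // Data files added by the latest snapshot, together with the
    // column-level stats that the manifests record for each of them.
    for (DataFile file : table.currentSnapshot().addedFiles()) {
        Map<Integer, ByteBuffer> lower = file.lowerBounds();
        Map<Integer, ByteBuffer> upper = file.upperBounds();
        System.out.printf("file_path=%s partition=%s lowerKeys=%s upperKeys=%s%n",
                file.path(), file.partition(), lower.keySet(), upper.keySet());
    }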

Using the sorting feature of Iceberg 0.11, with day as the partition and sorting by day, hour, and minute, the manifest files record this sort rule. When data is retrieved, query efficiency improves: we get the benefits of Hive partition-style retrieval while avoiding the pressure caused by too much Hive metadata.

VI. Summary

Compared with previous versions, Iceberg 0.11 adds many practical features. Comparing it with the old version we used before, we draw the following conclusions:

  • Flink + Iceberg sorting feature

    Before Iceberg 0.11, sorting was integrated with Spark but not with Flink; at that time we used Spark + Iceberg 0.10 to migrate a batch of Hive tables. The benefit on the BI side: to speed up Hive queries, BI had originally built multi-level partitions, which led to too many small files and too much metadata. While moving the tables into the lake, we used Spark to sort by the conditions BI queries most frequently, combined with implicit partitioning, and in the end improved BI retrieval speed, eliminated the small-file problem, and also reduced the pressure on the Hive Metastore.

    Iceberg 0.11 supports Flink sorting, which is a useful feature. We can migrate the original Flink + Hive partitioning scheme to Iceberg sorting, which achieves the effect of Hive partitioning while also reducing small files and improving query efficiency.

  • Real-time data reading

    Real-time data reads can be achieved through SQL. The advantage is that data with loose latency requirements, for example business data that can tolerate a delay of 1-10 minutes, can be put into Iceberg. While reducing the pressure on Kafka, the data can be read in near real time and historical data is also retained.

  • Merge small files in real time

    Before Iceberg 0.11, small-file merging required maintaining calls to Iceberg's merge API, which takes table information and timing parameters, and the merges ran as batches rather than in real time. In terms of code this added maintenance and development costs; in terms of timeliness it was not real-time. In 0.11, simply specifying the ('write.distribution-mode'='hash') property when creating the table via SQL hashes the data in real time from the source, with no manual maintenance required.
