Compiled by Lu Peijie (Flink Community Volunteer)

Abstract: Apache Flink is currently a very popular unified stream and batch computing engine in the big data field. The data lake is a new technical architecture that fits the trend of the cloud era, and solutions represented by Iceberg, Hudi, and Delta have emerged in response. Iceberg currently supports writing data into Iceberg tables from Flink through the DataStream API / Table API and provides integration support for Apache Flink 1.11.x.

This article is shared by Su Shu, a senior engineer in Tencent's Data Platform Department. It introduces the practice of Tencent's big data department in building a real-time data warehouse based on Apache Flink and Apache Iceberg, covering the following topics:

  1. Background and pain points
  2. Introduction to Apache Iceberg, a data lake
  3. Flink+Iceberg builds real-time data warehouse
  4. Future planning

1. Background and pain points

As shown in Figure 1, these are some of the internal applications we currently support. Among them, two applications, Mini Program and Video Number, generate data volumes above the PB level per day or the EB level per month.

Figure 1

When users of these applications build their own data analysis platforms, they often adopt an architecture like the one in Figure 2, which should be familiar to most readers.

1. Data platform architecture

Business teams, such as users from Tencent Watch or Video Number, usually collect front-end service data and application service logs, which are fed into a data warehouse or a real-time computing engine through message-oriented middleware (Kafka/RocketMQ) or data synchronization services (Flume/NiFi/DataX).

In the data warehouse system, there are various big data components, such as Hive/HBase/HDFS/S3, and computing engines such as MapReduce, Spark, and Flink. Users build big data storage and processing platforms according to their requirements, on which the data is processed and analyzed. The resulting data is stored in relational or non-relational databases that support fast queries, such as MySQL and Elasticsearch. The application layer can then use this data for BI reporting, user profiling, or interactive queries based on OLAP tools such as Presto.

Figure 2

2. Pain points of the Lambda architecture

During the whole process, we often use an offline scheduling system to run Spark analysis tasks periodically (T+1 or every few hours) for data input, output, or ETL. Data delay is unavoidable throughout offline data processing: whether during ingestion or intermediate analysis, the delay is relatively large, typically at the hour or even day level. For real-time requirements, we often build a separate real-time processing pipeline, for example using Flink+Kafka to build a real-time stream processing system.

Overall, there are many components in the data warehouse architecture, which greatly increases the complexity of the whole architecture and the cost of operation and maintenance.

The diagram below shows the Lambda architecture, which many companies adopted before or are still adopting now. The Lambda architecture is divided into an offline layer and a real-time layer, corresponding to two independent data processing pipelines for batch and stream processing. The same data is processed twice, and the same business logic needs to be developed twice in two different frameworks. Most readers are familiar with the Lambda architecture, so I will focus on the pain points we encountered when building a data warehouse with it.

Figure 3

Take the real-time calculation of user-related metrics as an example. When we want to see the current PV and UV, we put the data into the real-time layer for calculation, and the metric values are displayed in real time. But at the same time, to understand users' growth trend, we need to calculate data for the past day, which requires batch scheduling tasks. For example, a Spark scheduling task is launched in the scheduling system at two or three o'clock in the morning to rerun all the data of the previous day.

Obviously, because the two pipelines run at different times over the same data, the results may be inconsistent. When one or several pieces of data are updated, the whole offline analysis link has to be rerun, so the cost of updating data is very high. Meanwhile, both an offline and a real-time analysis platform must be maintained, and the development, operation, and maintenance costs of the two layers are very high.

The Kappa architecture was born to solve the problems caused by the Lambda architecture, and it should also be familiar to most readers.

3. Pain points of Kappa architecture

Let’s take a look at the Kappa architecture, as shown in Figure 4. It uses message queues in the middle and connects the whole link with Flink. The Kappa architecture solves the high operation, maintenance, and development costs caused by using different engines in the offline and real-time processing layers of the Lambda architecture, but it has its own pain points.

  • First, when building real-time business scenarios, Kappa is used for near-real-time scenarios. However, if you want to do some simple OLAP analysis or further data processing on an intermediate warehouse layer such as the ODS layer, for example writing data to a DWD-layer Kafka, Flink is required. And importing data from the DWD-layer Kafka into ClickHouse, Elasticsearch, MySQL, or Hive for further analysis obviously adds complexity to the architecture.
  • Second, the Kappa architecture depends heavily on message queues. As we know, the accuracy of computation over the whole link depends strictly on the order of the upstream data in the message queue, and the more message queues are chained together, the more likely out-of-order data becomes. Data in the ODS layer is usually absolutely accurate, but it may become out of order when sent to the next Kafka topic, and again when DWD-layer data is sent to the DWS layer, which can cause serious data inconsistencies.
  • Third, because Kafka is a sequential, append-only storage system, OLAP optimization strategies such as predicate pushdown cannot be applied to it directly; they are difficult to implement on top of Kafka's sequential storage.

So, is there an architecture that can meet both real-time and offline computing requirements, reduce operation, maintenance, and development costs, and solve the pain points encountered when building the Kappa architecture on message queues? The answer is yes, and more on that later.

Figure 4

4. Pain points summary

■ Traditional T+1 tasks

  • Massive TB-level T+1 tasks have long delays, resulting in unstable output times for downstream data
  • The cost of recovering from task failures is high
  • The data architecture struggles to handle deduplication and exactly-once semantics
  • The complex architecture involves coordinating multiple systems, and task dependencies are managed by the scheduling system

■ Lambda architecture pain points

  • Maintaining both a real-time platform engine and an offline platform engine is costly
  • The real-time and offline platforms require two sets of code with different frameworks but the same business logic, resulting in high development costs
  • Data flows through two different links, which may cause data inconsistency
  • Updating data is expensive, because the whole link needs to be rerun

■ Kappa architecture pain points

  • It places high requirements on message queue storage, and the backtracking capability of message queues is not as good as that of offline storage
  • Message queues retain data only for a limited time, and OLAP engines currently cannot directly analyze the data in message queues
  • Real-time computation over a link that depends entirely on message queues may produce incorrect results due to data ordering issues

Figure 5

5. Real-time warehouse construction needs

Is there a storage technology that supports efficient data backtracking, data updates, both streaming and batch reads and writes, and data access at minute-to-second latency?

This is an urgent need of real-time data warehouse construction (Figure 6). In fact, the Kappa architecture can be upgraded to solve some of the problems encountered with it. Next, let's look at a popular data lake technology: Iceberg.

Figure 6

2. Introduction to Apache Iceberg, a data lake

1. What is Iceberg

First, what is Iceberg? The official website describes it as follows:

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

Iceberg is officially defined as a table format, which can be understood as an intermediate layer between the compute layer (Flink, Spark) and the storage layer (ORC, Parquet, Avro). Data is written into Iceberg tables with Flink or Spark, and those tables are then read by other engines such as Spark, Flink, or Presto.

Figure 7
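To make this concrete, here is a minimal Flink SQL sketch (in Java, via TableEnvironment) of writing to an Iceberg table that another engine can then read. The catalog options follow the open-source Iceberg Flink integration; the Metastore URI, warehouse path, database, and `source_table` are assumptions for illustration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergQuickStart {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register an Iceberg catalog backed by the Hive Metastore
        // (connector options follow the Apache Iceberg Flink integration).
        tEnv.executeSql(
            "CREATE CATALOG iceberg_catalog WITH (" +
            " 'type'='iceberg', 'catalog-type'='hive'," +
            " 'uri'='thrift://metastore:9083', 'warehouse'='hdfs://ns/warehouse/iceberg')");

        // Create a table and write into it with Flink; Spark, Presto, etc. can later
        // read the same table through their own Iceberg integrations.
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.db");
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS iceberg_catalog.db.ods_log " +
            "(id BIGINT, msg STRING, ts TIMESTAMP(3))");
        tEnv.executeSql(
            "INSERT INTO iceberg_catalog.db.ods_log " +
            "SELECT id, msg, ts FROM source_table");  // source_table is assumed to exist
    }
}
```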

2. Introduction to Iceberg’s table format

Iceberg is designed for analyzing massive data and is defined as a table format, sitting between the compute layer and the storage layer.

A table format manages the files on the storage system and provides interfaces to the compute layer. For example, when a Hive table is read, information such as its partitions, data storage format, data compression format, and HDFS directory is needed; all of this information is stored in the Metastore, and it can be regarded as a file organization format.

A good file organization format, such as Iceberg, allows the upper compute layer to access files on disk more efficiently and perform list, rename, or find operations on them.
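To make the idea of a table format concrete, here is a minimal Java sketch (not from the article; the HDFS path is illustrative and the table is assumed to be a Hadoop-tables style Iceberg table) that loads a table and prints the schema, partition spec, current snapshot, and properties that the format tracks on behalf of the compute layer.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class InspectTableFormat {
    public static void main(String[] args) {
        // Load the Iceberg table directly from its location on HDFS.
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        System.out.println("schema:    " + table.schema());
        System.out.println("partition: " + table.spec());            // partition spec
        System.out.println("snapshot:  " + table.currentSnapshot());  // current snapshot metadata
        System.out.println("props:     " + table.properties());       // e.g. file format, compression
    }
}
```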

3. Summary of Iceberg's capabilities

Iceberg currently supports three file formats, Parquet, Avro, and ORC, as shown in Figure 7. Whether the files are on HDFS or S3, both row-oriented and column-oriented storage are supported; their roles will be introduced in detail later. Iceberg's own capabilities are summarized below (see Figure 8); they are very important for building a real-time data warehouse with Iceberg later.

Figure 8

  • Snapshot-based read/write separation and backtracking
  • Unified stream and batch writes and reads
  • No strong binding to a particular compute or storage engine
  • ACID semantics and multi-version data
  • Table, schema, and partition changes

4. Introduction to Iceberg file organization format

The following figure shows Iceberg’s entire file organization format. From top to bottom:

  • At the top is the snapshot module. A snapshot in Iceberg is the basic data unit that a user can access; that is, all the data a user reads from a table is the data of one snapshot.
  • Second, the manifests. Snapshot-0 has two manifests and snapshot-1 has three. Each manifest manages one or more DataFiles.
  • Third, the DataFiles. A manifest file contains the metadata of the data files; if you open a manifest file, you can see the DataFile paths listed line by line.

Snapshot-1 contains the data of snapshot-0, while the data newly written in snapshot-1 is only recorded in Manifest2. This capability provides good support for incremental reads.

Figure 9
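As a hedged illustration of this organization, the sketch below walks from the current snapshot down to the data files by planning a scan with the Iceberg core API; the table location is an assumption.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class ListDataFiles {
    public static void main(String[] args) throws Exception {
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        // The snapshot is the unit a reader sees; the manifests it lists point to data files.
        System.out.println("reading snapshot " + table.currentSnapshot().snapshotId());

        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                // Each task references one data file recorded in a manifest.
                System.out.println(task.file().path() + "  records=" + task.file().recordCount());
            }
        }
    }
}
```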

5. Introduction to Iceberg reading and writing process

■ Apache Iceberg reads and writes

First, consider a write operation. While snapshot-1 is being written, it is shown with a dashed line, meaning the COMMIT operation has not yet occurred. At this point snapshot-1 cannot be read, because users can only read a snapshot after it has been committed. The same applies to snapshot-2 and snapshot-3.

An important capability Iceberg provides is read/write separation: writing snapshot-4 does not affect reading snapshot-2 and snapshot-3 at all. This is one of the most important capabilities for building a real-time data warehouse.

Figure 10

Similarly, reads can be concurrent: snapshot-1, snapshot-2, and snapshot-3 can be read at the same time, which provides the ability to trace back to the data of snapshot-2 or snapshot-3. After snapshot-4 is written, a commit operation takes place, at which point snapshot-4 becomes solid in the figure and can be read. In addition, the current snapshot pointer moves to S4, which means that by default all reads of the table use the snapshot referenced by the current pointer, without affecting reads of previous snapshots.
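The following is a minimal sketch of this read/write separation using the Iceberg core API, assuming the table already has some committed snapshots (the location is illustrative): a reader can pin a historical snapshot while new commits happen concurrently.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.hadoop.HadoopTables;

public class TimeTravelRead {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        // By default a scan uses the snapshot referenced by the current snapshot pointer.
        TableScan latest = table.newScan();
        System.out.println("latest snapshot: " + table.currentSnapshot().snapshotId());

        // Pin a historical snapshot explicitly (here: the first one in the table's history,
        // assuming history is non-empty); concurrent commits do not affect this reader.
        long oldSnapshotId = table.history().get(0).snapshotId();
        TableScan pinned = table.newScan().useSnapshot(oldSnapshotId);
        pinned.planFiles().forEach(t -> System.out.println(t.file().path()));
    }
}
```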

■ Apache Iceberg incremental read

Next is Iceberg’s incremental reading. First of all, Iceberg reads can only be based on snapshots that have already been committed, such as snapshot-1; later there will be a snapshot-2. Since each snapshot contains all the data of the previous snapshot, reading the full snapshot every time would be very expensive for the computing engine across the whole link.

If you only want to read the data newly added at the current moment, you can use Iceberg's snapshot backtracking mechanism to read only the incremental data from snapshot-1 to snapshot-2, that is, the data shown in purple.

Figure 11

Similarly, S3 can read only the data in the yellow area, that is, the incremental data between S1 and S3. A streaming reader based on the Flink source has been implemented internally and is already running in production. This raises an important point just mentioned: since Iceberg already provides read/write separation, concurrent reads, and incremental reads, an Iceberg sink must also be implemented if Iceberg is to be connected to Flink.
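To illustrate incremental reading, the following sketch uses the core API's appendsBetween scan, which plans only the files appended between two snapshots rather than the whole table; the snapshot id 100L is a placeholder, and the table location is an assumption.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;

public class IncrementalRead {
    public static void main(String[] args) throws Exception {
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        long s1 = 100L;                                        // already-consumed snapshot (placeholder)
        long s2 = table.currentSnapshot().snapshotId();        // latest committed snapshot

        // Only the data files appended by snapshots after s1 up to s2 are planned,
        // which is exactly what a streaming reader consumes on each trigger.
        try (CloseableIterable<FileScanTask> tasks =
                 table.newScan().appendsBetween(s1, s2).planFiles()) {
            for (FileScanTask task : tasks) {
                System.out.println(task.file().path());
            }
        }
    }
}
```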

■ Real-time small file issues

The community has refactored the FlinkIcebergSink to provide a global committer, and our internal structure is consistent with the community's. The content inside the dashed box in the figure is the FlinkIcebergSink.

There are multiple IcebergStreamWriters and a single IcebergFileCommitter. When upstream data is written to an IcebergStreamWriter, the writer is only responsible for writing DataFiles.

Figure 12

When each writer has finished writing its current batch of DataFiles, it sends a message to the IcebergFileCommitter indicating that it is ready to commit. When the IcebergFileCommitter has received the messages, it commits the DataFiles in a single commit operation.

The COMMIT operation itself only modifies some metadata, so that data that has already been written to disk changes from invisible to visible. Thanks to this, Iceberg only needs a single COMMIT to make the data visible.
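A hedged DataStream-API sketch of wiring such a sink with the open-source Iceberg Flink integration is shown below; the sink builder assembles the parallel stream writers and the single committer internally, and commits are aligned with Flink checkpoints. The source helper, table path, parallelism, and checkpoint interval are assumptions.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // commits happen on checkpoint completion (~1 min here)

        DataStream<RowData> rows = buildSource(env);  // assumed helper producing RowData

        FlinkSink.forRowData(rows)
                 .tableLoader(TableLoader.fromHadoopTable("hdfs://ns/warehouse/db/ods_log"))
                 .writeParallelism(4)   // N IcebergStreamWriters, one global committer
                 .build();

        env.execute("iceberg-sink-demo");
    }

    private static DataStream<RowData> buildSource(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("replace with a real source");
    }
}
```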

■ Real-time small file merge

Flink real-time jobs usually run in a cluster for a long time. To ensure data timeliness, the Iceberg commit interval is usually set to 30 seconds or one minute. If a Flink job commits once a minute, 1,440 commits are produced after running for a day, and far more after a month. The shorter the snapshot commit interval, the more snapshots are generated. As a result, a streaming job produces a large number of small files.

If this problem is not solved, Iceberg's sink would be unusable on the Flink processing engine. We therefore implemented a feature internally called the data compaction operator, which runs alongside the Flink sink. When Iceberg's FlinkIcebergSink completes a commit, it sends a message downstream to FileScanTaskGen to notify it that a commit has completed.

Figure 13

FileScanTaskGen contains the logic for generating file merge tasks based on user configuration or disk characteristics. What FileScanTaskGen sends to DataFileRewrite is actually the list of files that need to be merged. Because merging files is a time-consuming operation, it is distributed asynchronously to different rewrite task operators.

After the rewrite, the merged files also require a new Iceberg snapshot. Since this is again a COMMIT operation, it is handled by a single-parallelism operator, just like the commit operator itself.

If this commit operation is blocked, the upstream write operations will be affected; we will continue to optimize this later. We have also opened a design doc in the Iceberg community to push for merging this feature upstream and to discuss it with the community.
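The compaction operator described above is internal to Tencent. As a point of comparison only, the open-source Iceberg Flink module offers a batch-style rewrite action that merges small data files and commits the result as a new snapshot; a minimal sketch follows, with the table path and target file size as illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopTables;

public class CompactSmallFiles {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        // Merge small files up to the target size; the rewrite runs as a Flink job,
        // and the final commit swaps the small files for the merged ones in one snapshot.
        Actions.forTable(table)
               .rewriteDataFiles()
               .targetSizeInBytes(128 * 1024 * 1024)  // ~128 MB target files
               .execute();
    }
}
```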

3. Flink+Iceberg builds real-time data warehouse

1. Near-real-time data access

As introduced earlier, Iceberg supports read/write separation, concurrent reads, incremental reads, small file merging, and latency at the second-to-minute level. Based on these advantages, we try to use Iceberg's capabilities to build a Flink-based real-time data warehouse architecture.

As shown in the figure below, every Iceberg COMMIT operation changes the visibility of data, for example turning invisible data into visible data. Through this process, near-real-time data ingestion can be achieved.

Figure 14

2. Real-time data warehouse - data lake analysis system

Previously, data ingestion was done with offline Spark scheduled tasks: running, pulling, and extracting the data and then writing it into a Hive table, which takes a long time. With the Iceberg table format, Flink or Spark Streaming can be used instead to achieve near-real-time data ingestion.

Based on the functions above, let's revisit the Kappa architecture discussed earlier, whose pain points have already been described. Since Iceberg is an excellent table format that supports both a streaming reader and a streaming sink, could we consider replacing Kafka with Iceberg?

Iceberg's underlying storage is cheap storage such as HDFS or S3, and Iceberg supports columnar formats such as Parquet and ORC as well as Avro. With columnar storage, basic OLAP analysis optimizations, such as predicate pushdown, can be applied directly in the middle layer. Combined with the streaming reader based on Iceberg snapshots, the latency of offline tasks can be greatly reduced from the day level to the hour level, transforming the system into a near-real-time data lake analysis system.

Figure 15

In the middle processing layer, Presto can be used for simple queries. Because Iceberg supports streaming reads, Flink can also be connected directly to the middle layer of the system to run batch or streaming computing tasks, with the intermediate results further processed and output downstream.
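Below is a hedged end-to-end sketch of this middle layer with Iceberg tables in place of Kafka topics: a Flink SQL job streaming-reads the ODS table and writes a DWD table. Catalog, database, table names, and the monitor interval are illustrative, and the OPTIONS hint follows the open-source Iceberg Flink connector rather than any internal SQL.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OdsToDwdJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Older Flink versions require this switch before OPTIONS hints can be used.
        tEnv.getConfig().getConfiguration()
            .setBoolean("table.dynamic-table-options.enabled", true);

        tEnv.executeSql(
            "CREATE CATALOG lake WITH (" +
            " 'type'='iceberg', 'catalog-type'='hive'," +
            " 'uri'='thrift://metastore:9083', 'warehouse'='hdfs://ns/warehouse/iceberg')");

        // Continuously consume newly committed snapshots of the ODS table and
        // materialize a cleaned DWD table, replacing the DWD-layer Kafka topic.
        // Both tables are assumed to exist already in the catalog.
        tEnv.executeSql(
            "INSERT INTO lake.dw.dwd_user_action " +
            "SELECT user_id, action, ts " +
            "FROM lake.ods.ods_log /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */ " +
            "WHERE action IS NOT NULL");
    }
}
```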

■ Advantages and disadvantages of replacing Kafka

In general, the advantages of replacing Kafka with Iceberg include:

  • Unified stream and batch processing at the storage layer
  • The middle tier supports OLAP analysis
  • Perfect support for efficient backtracking
  • Reduced storage cost

Of course, there are also some defects, such as:

  • Data latency goes from real-time to near-real-time
  • Interfacing with other data systems requires additional development work

Figure 16

■ Second-level analysis: data lake acceleration

Because Iceberg stores all data files in HDFS, and HDFS read/write performance cannot fully meet the needs of second-level analysis, we plan to support Alluxio as a cache underneath Iceberg. With this caching capability, the data lake can be accelerated. This structure is part of our future planning and construction.

Figure 17

3. Best practices

■ Real-time small file merge

As shown in Figure 18, Tencent has implemented complete SQL support for Iceberg internally. Parameters for merging small files, such as the number of snapshots to merge, can be set in the table properties. A Flink data ingestion job can then be started directly with an INSERT statement; the job runs continuously, and the DataFiles are automatically merged in the background.

Figure 18
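The internal SQL shown in Figure 18 is not reproduced here, so the following is only a hedged sketch of what such an ingestion job could look like with the open-source connector. 'write.target-file-size-bytes' is a standard Iceberg table property, while the 'compaction.*' properties are hypothetical stand-ins for the internal merge settings; the `lake` catalog and `kafka_source` table are assumed to be registered already.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlIngestion {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Table properties set at creation time; the compaction.* keys are hypothetical
        // placeholders for the internal small-file merge settings described above.
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS lake.ods.ods_log (id BIGINT, msg STRING, ts TIMESTAMP(3)) WITH (" +
            " 'write.target-file-size-bytes'='134217728'," +   // standard Iceberg property (~128 MB)
            " 'compaction.enabled'='true'," +                   // hypothetical internal property
            " 'compaction.commit.interval'='10')");             // hypothetical: merge every N commits

        // The ingestion job itself is just an INSERT; merging happens in the background.
        tEnv.executeSql("INSERT INTO lake.ods.ods_log SELECT id, msg, ts FROM kafka_source");
    }
}
```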

The following picture shows Iceberg data files and the corresponding meta files. Because the open-source community's Iceberg Flink sink does not yet have the file merging function, you can try this yourself by starting a relatively small stream processing job on your own machine: after the Flink job has run for a while, the number of files in the corresponding directory surges.

Figure 19

After enabling Iceberg's real-time small file merging function, the number of files stays at a relatively stable level.

■ Flink real-time incremental reading

Real-time incremental reads can be configured through Iceberg's table properties, where you can specify the snapshot from which to start consuming. If a starting snapshot is specified, each time the Flink job starts it reads only the data newly added after that snapshot, up to the latest one.

Figure 20

In this example, the small file merge function is enabled, and finally a Flink sink task is started with SQL.
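Below is a hedged sketch of such an incremental read with the open-source Iceberg Flink source (rather than the internal table-property mechanism); the table path and starting snapshot id are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class StreamingIncrementalRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(TableLoader.fromHadoopTable("hdfs://ns/warehouse/db/ods_log"))
                .streaming(true)         // keep monitoring for newly committed snapshots
                .startSnapshotId(100L)   // placeholder: read only appends after this snapshot
                .build();

        stream.print();
        env.execute("iceberg-streaming-read");
    }
}
```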

■ Managing files with SQL Extension

The real-time small file merging function only applies to Flink jobs running online, where the number and size of files generated in each commit cycle are not particularly large.

But when a job has been running for a long time, there may be tens of thousands of underlying files. Merging them directly within the online real-time job is obviously inappropriate and may affect the timeliness of the online job. Instead, we can use SQL Extension to merge small files, delete leftover files, or expire old snapshots.

In fact, we have already used SQL Extension internally to manage Iceberg's data files and metadata files on disk. In the future, we will continue to add more functions to SQL Extension to improve Iceberg's usability and the user experience.

Figure 21
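The SQL Extension syntax itself is internal and not shown here; as a hedged equivalent, the Iceberg core API can expire old snapshots and remove files that are no longer referenced. The retention window and table location below are illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class ExpireOldSnapshots {
    public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://ns/warehouse/db/ods_log");

        // Expire snapshots older than 24 hours; files only referenced by the expired
        // snapshots are cleaned up as part of the commit.
        long keepSince = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
        table.expireSnapshots()
             .expireOlderThan(keepSince)
             .commit();
    }
}
```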

4. Future planning

Figure 22

1. Iceberg kernel capability improvements

  • Row-level delete. In the process of building the whole data link with Iceberg, what happens if data is updated? Iceberg currently only supports copy-on-write updates, and copy-on-write causes write amplification. To truly build a real-time data processing pipeline over the whole link, an efficient merge-on-read update capability is still required. This is very important, and we will continue to cooperate with the community; Tencent will also do internal work to improve the row-level delete function.
  • Complete the SQL Extension capability. We will continue to improve the capabilities of SQL Extension.
  • Establish a unified index to speed up data retrieval. Iceberg currently has no unified index for accelerating data retrieval. We are cooperating with the community, which has proposed a Bloom filter index; building a unified index will speed up Iceberg's ability to retrieve files.

In terms of Iceberg kernel improvements, these are the capabilities we mainly hope to improve first.

2. Platform construction

In terms of platform construction, we will try to:

  • First, automatic schema detection and table creation. We hope tables can be created automatically based on the schema of the incoming data, making the whole data ingestion process more convenient for users.
  • Second, more convenient management of data and metadata. At present, Iceberg's metadata is exposed in raw form, stored directly in the Hive Metastore; users who want to inspect it have to run SQL queries. We hope to keep improving this as part of the platform.
  • Third, build a data acceleration layer based on Alluxio. We hope to use Alluxio to build a data lake acceleration layer so that the upper layers can better achieve second-level analysis.
  • Fourth, integration with internal systems. We have many internal systems for real-time and offline analysis, and the whole platform needs to be connected with them.