A Hive-based offline data warehouse is often an indispensable part of an enterprise big data production system. Hive data warehouses are mature and stable, but because they are offline, latency is high. In scenarios with strict latency requirements, a separate real-time data warehouse has to be built on Flink to bring link latency down to seconds. But running an offline warehouse plus a real-time warehouse can more than double resource consumption and even lead to duplicated development.

Is it necessary to discard the existing Hive data warehouse in order to build a streaming link? No: Flink can make the existing Hive offline warehouse quasi-real-time. This article is compiled from a talk by Li Jinsong, Apache Flink Committer and Alibaba technical expert. It analyzes the difficulties of making today's offline data warehouse real-time, and explains in detail how Flink builds a stream-batch unified, quasi-real-time data warehouse on Hive to achieve a more efficient and reasonable allocation of resources. The outline of the article is as follows:

  1. Difficulties in making the offline data warehouse real-time
  2. Flink's exploration of stream-batch unification
  3. Application practice: building a stream-batch unified quasi-real-time data warehouse

Difficulties in making the offline data warehouse real-time

Offline data warehouse

The figure above is a typical offline data warehouse. Suppose the company has a requirement: with its current data volume, it needs to produce a report every day and write it to the business database. The newly ingested business data falls roughly into two types: MySQL binlog, and business logs from the business systems. The log data can be collected by tools such as Flume and then ingested into the offline data warehouse. As the business grows, the individual tables can be abstracted; the benefits of abstraction are easier management and more efficient reuse of data and computation. The warehouse is therefore divided into multiple layers (detail layer, middle layer, service layer, and so on), each layer stores data tables, and ETL between tables is implemented with Hive SQL.

Hive only does static batch computing. The business produces reports every day, which means the calculations run every day. This requires scheduling tools and lineage management:

  • Scheduling tool: schedules batch computing according to a policy.
  • Lineage management: a task is composed of many jobs, the table hierarchy can be very deep, and the whole computation forms a very complex topology with intricate dependencies between jobs (which reduce redundant storage and computation and also give good fault tolerance). Only when one layer finishes can the next layer be computed.

When the workload is large, it often takes a long time to get results; this is what we usually call T+1 or H+1, and it is the core problem of the offline warehouse.

Third-party tools

As mentioned above, an offline warehouse involves more than Hive computing alone; it also relies on third-party tools, such as:

  • Flume is used for ingestion, but it has certain problems. First, its fault tolerance may not guarantee Exactly-Once, which forces deduplication or reprocessing downstream. Second, custom logic requires extra means such as script control. Third, scalability is limited: when data surges, it is difficult to increase the original parallelism.
  • Job scheduling based on scheduling tools brings cascading computation delays. For example, the calculation of yesterday's data may start at 1:00 am and only finish at 6:00 or 7:00 am, and there is no guarantee that the data is fully ready within the scheduled window. In addition, cascading computation brings complex lineage management issues, and large batch jobs may suddenly fill up cluster resources, which forces us to consider load management and adds burden to the business.

Whether it is the offline warehouse itself or the third-party tools, the main problem is still "slow". How do we solve slowness? This is where the real-time data warehouse comes in.

Real-time data warehouse

In essence, the real-time data warehouse replaces Hive+HDFS with Kafka, and ETL is done by Flink stream processing. There are no scheduling or lineage management problems; results are updated incrementally in real time and finally written to the business DB.

Although the latency is reduced, there are other issues that we face at this point:

  • Historical data is lost. Because Kafka is only a temporary storage medium with a retention period (for example, only 7 days of data), historical data cannot be kept.
  • The cost is relatively high, and the cost of real-time computing is greater than that of offline computing.

Lambda architecture

Therefore, many teams choose to run a real-time pipeline and an offline pipeline side by side, without interfering with each other, splitting requirements according to whether they need real-time results.

This architecture seems to solve all the problems, but it actually brings many of its own. First, the Lambda architecture splits the same business problem into an offline solution and a real-time solution, and the two can produce different results from the same data source. Table structures at different layers may be inconsistent, and when data inconsistency occurs, the results have to be reconciled.

As the Lambda architecture evolves, development teams, table structures, table dependencies, computing models, and so on tend to drift apart; costs keep rising, and so does the cost of unifying them later.

So the real-time warehouse consumes a lot of resources and does not retain historical data, and the Lambda architecture has all of these problems. What is the solution?

Data lake

Data lakes have many advantages: atomicity allows quasi-real-time stream-batch ingestion, and existing data can be modified. However, the data lake is a new generation of warehouse storage architecture and is not yet perfect in every respect. Existing data lakes rely heavily on Spark (Flink is also embracing data lakes), and moving data into a lake requires the team to weigh migration cost and learning cost.

If we are not determined to migrate to a data lake, is there a slightly less drastic solution to speed up the existing offline data warehouse?

Flink's exploration of stream-batch unification

Unified metadata

Flink has always been working on unifying offline and real-time, starting with unified metadata. In simple terms, Kafka table metadata is stored in the Hive MetaStore, so that offline and real-time tables share the same Meta. (At present, open-source real-time computing has no mature MetaStore of its own for persistence; the Hive MetaStore can not only store offline tables but also serve as the MetaStore for real-time computing.)
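As a rough sketch of what this looks like in Flink SQL (catalog name, configuration path, topic and field names are assumptions, not from the talk): a HiveCatalog is registered and a Kafka table is created inside it, so the Kafka table's metadata lives in the Hive MetaStore next to the offline tables.

```sql
-- Register a catalog backed by the Hive MetaStore
-- ('hive-conf-dir' points at the directory containing hive-site.xml).
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/hive/conf'
);

USE CATALOG my_hive;

-- A Kafka table created in this catalog is persisted in the Hive MetaStore as
-- a generic table, so offline and real-time jobs share one set of metadata.
-- The watermark declares event time, which later lets partition-time commits
-- decide when an hourly partition is complete.
CREATE TABLE ods_access_log_kafka (
  user_id STRING,
  url     STRING,
  ts      TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'access_log',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'warehouse-demo',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```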

Unified computing engine

With the same metadata, the table structures and layering of the real-time and offline warehouses can be designed identically, and the data can then be shared:

  • Flink itself provides stream-batch unified ANSI-SQL syntax: the same set of SQL serves both modes, which greatly reduces the burden on SQL developers and operators and lets users focus on business logic (a minimal sketch follows this list).
  • The same engine: Flink's streaming and batch share one optimizer and Runtime framework. At this stage no big data engine is anywhere near completely stable, so there is still plenty of room for further analysis and optimization. A single engine lets developers focus on a single technology stack, instead of touching several stacks and gaining only technical breadth without technical depth.
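A small illustration of the first point (the table names here are invented for the sketch): the same statement can run as a bounded batch job or as an unbounded streaming job; only the execution mode and the table bound to the source name change.

```sql
-- One piece of SQL, two modes: run it in batch mode with src_log bound to the
-- Hive table (backfill), or in streaming mode with src_log bound to the Kafka
-- table (real-time). The query text itself does not change.
INSERT INTO dwd_clean_log
SELECT user_id, url, ts
FROM src_log
WHERE url IS NOT NULL;
```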

Unified data

We have analyzed unifying metadata and the computing engine. Going a step further, can we unify real-time and offline data, avoiding inconsistency as well as duplicated storage and computation? Can the ETL itself be unified? Since the real-time tables can be designed to be identical to the offline tables, can we run ETL only on the real-time tables and have the offline tables derive their data from them?

In addition, the real-time link can speed up data preparation for the offline link, so batch computation can be driven by streaming input instead of by a scheduler.

Flink's Hive/File Streaming Sink solves this problem: the real-time Kafka table can be synchronized to the corresponding offline table in real time:

  • The offline table serves as the history of the real-time data, filling the gap left by the real-time table not retaining history.
  • Quasi-real-time ingestion of batch data provides quasi-real-time input for ad-hoc queries on the offline tables.

At this point, offline batch computation can also be handed over to real-time scheduling: while the real-time task runs, the offline task is automatically triggered at the right moment (Partition Commit) for data synchronization.

Can the Kafka table and the Hive table be presented as one and the same table? The idea is that a single table in the warehouse corresponds to both Kafka and Hive+HDFS:

  • When a user performs an INSERT, data is written to the Kafka real-time table, and another link automatically synchronizes it to the Hive table. This makes the table complete: it serves real-time needs and also holds historical data.
  • A single SQL query reads such a Hybrid Source and is automatically routed to Hive historical data or Kafka real-time data depending on the WHERE conditions of the query, reading Hive history first and then Kafka real-time data according to certain rules. One idea is to route by a timestamp in the data or in Kafka.

Implementation of Hive Streaming Sink

Flink already had the StreamingFileSink; Flink 1.11 not only integrates the Hive Streaming Sink into SQL, so that it can handle all the business logic just like offline Hive SQL, it also brings further incremental capabilities.

The Hive/File Streaming Sink consists of two components: FileWriter and PartitionCommitter:

  • FileWriter: the component is partition-aware, and the checkpoint mechanism ensures Exactly-Once (distributed file systems are not fully reliable, so a two-phase commit plus idempotent Rename is needed). FileWriter also provides Rolling parameters to control how files are rolled during streaming: file-size controls the size of each file, and rollover-interval controls the time interval. Do not set the checkpoint interval too short, otherwise too many small files are created (a configuration sketch follows this list).
  • PartitionCommitter: are the finished files directly usable once written? Typical Hive tables are partitioned tables. When a partition is ready, the downstream needs to be notified that the partition has been processed and can be synchronized to the Hive MetaStore, so a specific partition commit must be triggered effectively at the right time. The Committer writes the partition metadata to the Hive MetaStore, and can even notify the scheduling system to kick off downstream batch jobs.
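A hedged configuration sketch of the FileWriter side (path, fields and values are assumptions; the option names are those of the Flink 1.11 filesystem/Hive sink):

```sql
-- A filesystem sink table whose file rolling is controlled by size and time.
-- Written files only become visible after a checkpoint completes, so a very
-- short checkpoint interval produces many small files.
CREATE TABLE fs_access_log (
  user_id STRING,
  url     STRING,
  ts      TIMESTAMP(3),
  dt      STRING,
  hr      STRING
) PARTITIONED BY (dt, hr) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///warehouse/fs_access_log',
  'format' = 'parquet',
  'sink.rolling-policy.file-size' = '128MB',          -- roll once a file reaches this size
  'sink.rolling-policy.rollover-interval' = '30 min'  -- roll after this much time at the latest
);
```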

Since a streaming job runs continuously, how do we decide when a partition should be committed, and what triggers the commit?

  • The first is the default policy, process-time, i.e. the current system time when the event is processed. Its drawbacks are obvious: various kinds of data incompleteness can occur.
  • The recommended policy is partition-time, which guarantees clear semantics and complete data at commit time. The partition field is derived from the event time, that is, the time when the event was generated.

If Current time > partition creation time + commitDelay, the partition can be committed. A simple example is an hourly partition: at one minute past 12, the current time has passed the 11 o'clock partition plus one hour, so we can assume no more data will arrive for the 11 o'clock partition and commit it. (What about late events? That is why partition commits must also be idempotent.)

How is a partition committed? By writing a SuccessFile and adding the partition to the Hive MetaStore.

Flink supports both: setting "sink.partition-commit.policy.kind" to "metastore,success-file" makes Flink add the partition to the Hive MetaStore and write the SuccessFile at commit time; only after the add operation completes is the partition visible to Hive.

The Custom mechanism allows you to implement your own Partition Commit Policy class, which can trigger downstream scheduling, statistical analysis, or Hive small-file merging after the partition finishes its processing. (Flink will continue to improve built-in small-file merging.)

Real time consumption

In addition to quasi-real-time data ingestion, Flink also brings dimension-table joins against Hive tables and streaming real-time consumption of Hive tables.

We know that Flink supports dimension-table (lookup) joins against MySQL and HBase, maintaining an LRU cache during computation and querying MySQL or HBase on a cache miss. But what if there is no lookup capability because the data lives in the offline warehouse? Traditionally, Hive tables are periodically synchronized to HBase or MySQL for this purpose. Flink now also allows dimension tables to be joined directly against Hive tables. The current implementation is very simple: every parallel instance loads all the data of the Hive table, so only small tables can be joined this way.

Traditionally, Hive tables can only be read and computed in batch mode. Now we can also consume Hive tables in streaming mode, monitoring the generation of partitions/files in Hive so that each new piece of data can be consumed in quasi-real time. This fully reuses Flink Streaming SQL: the stream can be joined with HBase, MySQL and other Hive tables, and the result can finally be written back to a Hive table in real time through the FileWriter of the streaming sink.

Application practice: building a stream-batch unified quasi-real-time data warehouse

The case is as follows: Flume collects access logs, and we need to compute the PV of each age group. At this point we have two links:

  • A real-time link that ingests the access logs, joins the Hive User table, and writes the computed results to the business DB.
  • An offline link that requires Hive to provide an hourly partitioned table for ad-hoc queries on historical data.

As mentioned earlier, although this corresponds to two databases, realtime_DB and offline_DB, they share the same metadata.

Hive tables are created in Flink SQL using the Hive dialect. But the real-time link has no concept of partitions.

How do we avoid the schema difference that partition columns would cause between the two table structures? A possible solution is to introduce the notion of a Hidden Partition: the partition field can be a computed column over an existing field, which also matches common practice, for example deriving the day or hour from a time field. Three parameters then follow (a DDL sketch is shown after the list below):

  • sink.partition-commit.trigger: specifies when to trigger a partition commit; it is set to partition-time here to guarantee complete, exactly-once results at commit time;
  • partition.time-extractor.timestamp-pattern: how to extract the time from the partition values, i.e. an extraction pattern;
  • sink.partition-commit.policy.kind: set to metastore,success-file so that the partition is added to the MetaStore and a success file is written at commit time.
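Putting the three parameters together, here is a DDL sketch for the hourly partitioned Hive table of this case (table and field names are assumptions; the Hive dialect is used so the result is a native Hive table, following the Flink 1.11 Hive connector options):

```sql
-- Switch to the Hive dialect to create a native Hive table.
SET table.sql-dialect=hive;

CREATE TABLE dwd_access_log_hive (
  user_id     STRING,
  url         STRING,
  access_time STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  -- commit a partition based on the time extracted from the dt/hr values
  'sink.partition-commit.trigger' = 'partition-time',
  -- how to assemble a timestamp out of the partition values
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  -- wait one extra hour before committing an hourly partition
  'sink.partition-commit.delay' = '1 h',
  -- on commit: add the partition to the MetaStore and write a _SUCCESS file
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```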

Next, create the real-time table in Kafka, and use INSERT INTO to synchronize the Kafka data into Hive.
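A sketch of this step, reusing the assumed Kafka table from the catalog example and the Hive table from the previous sketch: because the Kafka table declares a watermark, the partition-time trigger knows when an hourly partition is complete.

```sql
SET table.sql-dialect=default;

-- Continuously copy the Kafka stream into hourly Hive partitions; the
-- partition values are derived from the event time of each record.
INSERT INTO dwd_access_log_hive
SELECT
  user_id,
  url,
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss') AS access_time,
  DATE_FORMAT(ts, 'yyyy-MM-dd')          AS dt,
  DATE_FORMAT(ts, 'HH')                  AS hr
FROM ods_access_log_kafka;
```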

Next, the Kafka table uses a dimension join to fetch the age field from the User table. Note the parameter lookup.join.cache.ttl: the User table is loaded into every task, somewhat like a broadcast, but the Hive table may be updated in the meantime; a TTL of 1h means the cached data is only valid for one hour. A view is created to add the processing time required for the dim join. (The dim join needs a processing-time attribute, which is not a natural thing to define; we will consider how to simplify the dim-join syntax without breaking SQL semantics.)
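A sketch of this step under the same assumptions (dim_user and its user_id/age fields are invented; using table hints may require table.dynamic-table-options.enabled to be set to true):

```sql
SET table.sql-dialect=default;

-- A view adds the processing-time attribute that the dimension join needs.
CREATE VIEW v_access_log AS
SELECT user_id, url, ts, PROCTIME() AS proc_time
FROM ods_access_log_kafka;

-- Lookup join against the Hive user table; the copy cached in each task is
-- considered valid for one hour before it is reloaded.
SELECT u.age, COUNT(*) AS pv
FROM v_access_log AS a
JOIN dim_user /*+ OPTIONS('lookup.join.cache.ttl'='1 h') */
  FOR SYSTEM_TIME AS OF a.proc_time AS u
ON a.user_id = u.user_id
GROUP BY u.age;
```

In practice the aggregated result would be written with INSERT INTO into a JDBC sink table in the business DB rather than just selected.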

Finally, consume the Hive table through a real-time pipeline instead of scheduled or manually triggered batch jobs. The first parameter, streaming-source.enable, turns on streaming consumption; the start-offset parameter then specifies from which partition/file to start consuming. With this, the stream-batch unified quasi-real-time data warehouse application is basically complete.
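A sketch of this last step under the same assumptions (the offset value is illustrative; as before, table hints may need dynamic table options enabled):

```sql
-- Consume the hourly Hive table as an unbounded stream: new partitions/files
-- are discovered every minute, starting from the given offset, instead of
-- being picked up by a scheduler.
SELECT user_id, url, access_time
FROM dwd_access_log_hive
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.monitor-interval'='1 min',
            'streaming-source.consume-start-offset'='2020-07-06 11:00:00') */;
```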

Future plans

Hive as a table format only manages data at the partition level, which brings significant limitations in convenience; newer table formats such as Iceberg offer better support. In the future, Flink will strengthen the following aspects:

  • Auto compaction (merging) in the Flink Hive/File Streaming Sink: small files are one of the biggest obstacles to going real-time.
  • Flink embracing Iceberg: the Iceberg Sink has been developed in the community and the Iceberg Source is in progress. In the near future, Iceberg can be treated directly as a message queue while retaining all historical data, achieving true stream-batch unification.
  • Enhanced Flink Batch Shuffle: the current Hash Shuffle has many problems, such as small files, random I/O, and OOM in buffer management. Subsequent open-source Flink (1.12) will introduce SortedShuffle and a ShuffleService.
  • Flink Batch BoundedStream support: the old DataSet API can no longer meet the stream-batch unified architecture; the community (1.12) will provide batch computing capability on the DataStream API.

About the author:

Li Jinsong (Alibaba alias Zhixin), Alibaba technical expert, Apache Flink Committer. Since 2014 he has focused on the Galaxy stream computing framework within Alibaba; since 2017 he has been developing Flink, mainly focusing on batch computation, data structures and types.