iQiyi hosts a huge catalog of videos, and the video production process generates real-time data at thousands of QPS along with terabyte-scale data storage. Supporting ad-hoc queries and JOINs across multiple large tables on this data is the core difficulty for the iQiyi video production team's big data applications.

Specifically, there are the following points:

1) The business has real-time requirements, so the solution itself must be real-time.

2) Production data is updated frequently, so the OLAP engine must support updates.

3) Production requires a large-table Join solution: stream attributes and program attributes, each tens of gigabytes in size, often need to be analyzed together.

 

In addition, iQiyi video production data has another characteristic: it comes from an OLTP data center, where the data is persisted in MongoDB and change messages are written to Kafka. Each Kafka message carries curData (the current, updated record) and oriData (the record before the change). Such structured data makes configuration-driven development possible.
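As a rough illustration of this message structure, the sketch below models a change message in Scala. Only the curData and oriData fields come from the description above; the envelope fields (table, op, ts) and the example values are hypothetical assumptions added for illustration.

```scala
// Minimal sketch of the change-message envelope described above.
// curData / oriData come from the text; table, op, ts and the example
// values are hypothetical, added only for illustration.
case class ChangeMessage(
  table: String,                    // hypothetical: source collection name
  op: String,                       // hypothetical: insert / update / delete
  ts: Long,                         // hypothetical: change timestamp
  curData: Map[String, String],     // the current (updated) record
  oriData: Map[String, String]      // the record before the change
)

// Example: a title update on one video document.
val msg = ChangeMessage(
  table = "video",
  op = "update",
  ts = 1700000000000L,
  curData = Map("id" -> "v123", "title" -> "new title"),
  oriData = Map("id" -> "v123", "title" -> "old title")
)
```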

The iQiyi video production team is responsible for video production at iQiyi, covering materials, finished films, operation workflows, and images. Around production it builds the middle platform, monitoring, data reports, and more, with the goal of improving production efficiency, saving editing effort, and delivering high-quality videos faster and better.

In view of the above pain points, the iQiyi video production team made a series of efforts. This article describes in detail how ClickHouse is applied in the real-time data warehouse for iQiyi video production: business data is processed by the Spark/Spark Streaming computing engines, dimension table data is stored in HBase and joined in real time, and the results are finally written to ClickHouse for ad-hoc queries.

The results are significant: report development cycles have been shortened from days to hours, meeting the requirement for frequently updated reports that join real-time and offline data.

01

Background and development history

The Spark + ClickHouse real-time data warehouse solution was chosen based on the historical development stages and the data characteristics of iQiyi video production.

With the booming development of various big data technologies, the data business of iQiyi’s video production has gone through two stages.

Stage 1 (early stage): the team introduced Hive and developed reports based on the company's internal BabelX offline data synchronization tool.

The disadvantages of stage 1: the full dataset is recomputed every day, which is costly and has poor timeliness; whenever a dimension field is modified, the whole pipeline has to be modified; and ETL relies entirely on Hive built-in functions, which offers low reusability and high operation and maintenance costs.

Stage 2: as production data grew, the MySQL-based visual query performance hit a bottleneck and timeliness requirements increased. Data reporting therefore entered a second stage, and ClickHouse was introduced for real-time report development.

While introducing ClickHouse, we evaluated other solutions in the industry, such as Druid and Kudu. Druid and Kudu performed well with a small number of videos over a large time span, but once the number of videos exceeded 10 million, Druid's aggregation slowed down dramatically and even timed out. ClickHouse was therefore chosen, and with an appropriate engine selection it also supports frequent data updates.

The disadvantages of this stage: table joins are not well supported, the business databases can only be accessed through JDBC/ODBC, the MergeTree engine does not support updates, and data can be lost during the Truncate-then-import step when synchronizing MySQL to ClickHouse.

On this basis, we improved the system and finally arrived at the new architecture described below.

02

Spark + ClickHouse real-time data warehouse

Without further ado, here is the architecture diagram first.

The overall structure

ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It was developed by the Russian IT company Yandex for the Yandex.Metrica web analytics service. The system stores detailed data, targets high query performance, and allows real-time analysis of continuously updated data.

Spark is a unified analytics engine for large-scale data processing that efficiently supports a wide range of computing modes, including interactive query and stream processing. One key feature is that Spark can perform computations in memory; even for complex computations that spill to disk, Spark is more efficient than MapReduce. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of real-time data streams. Because it is based on micro-batches, its latency is higher than systems built on a record-at-a-time architecture, but it has a throughput advantage, and writing in bulk is exactly what ClickHouse recommends.

Combining the features of Spark/Spark Streaming and ClickHouse, this solution has clear advantages:

ClickHouse supports updates and is extremely fast to query, while Spark Streaming's micro-batches are a natural fit for ClickHouse's preference for batched writes.

The specific construction process is mainly divided into three parts.

Off-line data processing

The Spark computing engine imports MongoDB data into Hive (to protect the stability of the business database), then performs ETL on the Hive data and imports it into ClickHouse offline.
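A minimal sketch of this offline path is shown below, assuming the MongoDB Spark connector and a ClickHouse JDBC driver are on the classpath. The connection URIs, table names, driver class, and the ETL query are placeholders, and the exact format/driver names depend on the connector and driver versions used.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("mongo-to-hive-to-clickhouse")
  .enableHiveSupport()
  .getOrCreate()

// Step 1: import MongoDB data into Hive, shielding the business database
// from analytical load. URI and table names are placeholders.
spark.read
  .format("mongodb")                                   // format name depends on the connector version
  .option("connection.uri", "mongodb://mongo-host:27017/prod.video")
  .load()
  .write.mode(SaveMode.Overwrite)
  .saveAsTable("ods.video")

// Step 2: ETL on the Hive data, then offline import into ClickHouse via JDBC.
val etl = spark.sql("SELECT id, title, status, update_time FROM ods.video")
etl.write.mode(SaveMode.Append)
  .format("jdbc")
  .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") // driver class depends on the JDBC driver version
  .option("url", "jdbc:clickhouse://ck-host:8123/report")
  .option("dbtable", "video_report")
  .option("batchsize", "100000")                            // large batches, as ClickHouse prefers
  .save()
```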

Real-time data processing

For historical stock data, the Spark computing engine writes MongoDB data into ClickHouse once (either directly from the business database, or, as we routinely do, from the Hive tables imported above). For real-time data, Spark Streaming writes Kafka messages directly into ClickHouse. If historical stock data is not needed, it is enough to consume Kafka and import into ClickHouse for real-time computing. The real-time architecture is as follows:

Real-time solution flow chart
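Below is a minimal, hedged sketch of this real-time path: a Spark Streaming direct Kafka stream whose micro-batches are appended to ClickHouse over JDBC. The broker address, topic, consumer group, table names, and the parsing step are all placeholder assumptions, not the team's actual configuration.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val spark = SparkSession.builder().appName("kafka-to-clickhouse").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(30))   // micro-batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-host:9092",             // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "video-report",                         // placeholder consumer group
  "enable.auto.commit" -> (false: java.lang.Boolean)    // offsets committed manually, see section 04
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("video_change"), kafkaParams))

stream.foreachRDD { rdd =>
  import spark.implicits._
  // Parsing of curData/oriData out of the message body is omitted here.
  val batch = rdd.map(_.value()).toDF("raw")
  batch.write.mode(SaveMode.Append)
    .format("jdbc")
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
    .option("url", "jdbc:clickhouse://ck-host:8123/report")
    .option("dbtable", "video_report")
    .save()
}

ssc.start()
ssc.awaitTermination()
```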

Because the ReplacingMergeTree engine is idempotent, the Kafka offset can be reset to an earlier position and messages consumed at least once without harm. With other engines this produces erroneous data, unless Kafka can replay all of the historical data held in MongoDB.

The Join requirement

For the Join requirement, since the two tables currently occupy about 100 GB of storage, holding them in memory with Redis or CB would be too wasteful, so we finally chose HBase. HBase serves as the dimension table: the data is merged in the Spark computing engine and then written into the fact table.

Flow chart of Join scheme for large table
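One possible shape of this HBase dimension lookup inside Spark is sketched below. The fact rows, ZooKeeper quorum, HBase table name, column family, and columns are all hypothetical; the point is the pattern of one HBase connection per partition and a per-row Get against the dimension table.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hbase-dim-join").getOrCreate()

// Hypothetical fact rows: (programId, stream attributes) produced upstream.
val factRows = spark.sparkContext.parallelize(Seq(
  ("p1", Map("stream_id" -> "s1", "duration" -> "120"))
))

val joined = factRows.mapPartitions { iter =>
  // One HBase connection per partition, not per record.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zk-host")                 // placeholder quorum
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf("dim_program"))   // hypothetical dimension table
  val out = iter.map { case (programId, streamAttrs) =>
    val result = table.get(new Get(Bytes.toBytes(programId)))
    val title = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("title")))
    streamAttrs + ("program_title" -> Option(title).getOrElse(""))
  }.toList                                                      // materialize before closing the connection
  table.close(); conn.close()
  out.iterator
}
// `joined` would then be written to the ClickHouse fact table as in the earlier sketches.
```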

In addition to the above work, here are some considerations:

1. When importing into ClickHouse in real time, the dimension table data must be generated before the fact table data.

2. For incremental offline or real-time synchronization into ClickHouse, ensure either that the dimension table data is essentially unchanged, or that the real-time and offline incremental data also change after the dimension table data changes.

3. Otherwise, dimension table changes will not be reflected in the ClickHouse output table.

At this point the overall architecture is clear. So how do we choose a ClickHouse engine that supports frequent updates?

03

ClickHouse supports frequent updates

For frequent updates, ClickHouse offers a choice between the ReplacingMergeTree and VersionedCollapsingMergeTree engines:

ReplacingMergeTree (overwrite updates)

Uses the ID as the sorting key and removes duplicate entries with the same key during background merges.

However, it cannot guarantee that duplicate data never appears, because deduplication only happens when data parts are merged.

VersionedCollapsingMergeTree (collapse update)

Adds row-collapsing logic, driven by sign and version columns, to the data-part merge algorithm.
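For illustration, the sketch below creates one table with each engine over JDBC. The table and column names, partitioning, and the version/sign columns are assumptions, not the team's actual schema; a ClickHouse JDBC driver is assumed on the classpath.

```scala
import java.sql.DriverManager

// Connection details, table and column names are placeholders.
val conn = DriverManager.getConnection("jdbc:clickhouse://ck-host:8123/report")
val stmt = conn.createStatement()

// Overwrite-style update: rows sharing the ORDER BY key are deduplicated during
// background merges, keeping the row with the largest update_time.
stmt.execute("""
  CREATE TABLE IF NOT EXISTS video_replacing (
    id String, title String, status String, update_time DateTime
  ) ENGINE = ReplacingMergeTree(update_time)
  PARTITION BY toYYYYMM(update_time)
  ORDER BY id
""")

// Collapse-style update: each change writes the old row with sign = -1 (from oriData)
// and the new row with sign = 1 (from curData); merges collapse matched pairs.
stmt.execute("""
  CREATE TABLE IF NOT EXISTS video_collapsing (
    id String, title String, status String, update_time DateTime,
    version UInt64, sign Int8
  ) ENGINE = VersionedCollapsingMergeTree(sign, version)
  PARTITION BY toYYYYMM(update_time)
  ORDER BY id
""")

stmt.close(); conn.close()
```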

There are two options for offline data.

Solution 1: incremental synchronization with the ReplacingMergeTree engine. The Spark computing engine routinely synchronizes MongoDB data to Hive, then consumes the Hive incremental data and writes it to ClickHouse. The advantage is that synchronization is incremental, so the load is low. The disadvantage shows up when a Join is involved: you must ensure either that the dimension table data is essentially unchanged, or that the fact table data also changes after the dimension table data changes; otherwise dimension table changes will not be reflected in the fact table.

Solution 2: full synchronization with the MergeTree engine. The Spark computing engine periodically synchronizes MongoDB data to Hive, the ClickHouse table is truncated, and then Spark consumes the Hive data of the most recent N days and writes it to ClickHouse. The advantage is that it avoids the Join timing problem above; the disadvantage is that only the last N days of data are kept, and the full reload puts considerable pressure on the system.

There are also two options for real-time data.

Solution 1: incremental synchronization with the VersionedCollapsingMergeTree engine. The Spark computing engine synchronizes the MongoDB stock data to ClickHouse once, then the Kafka consumption position is reset and real-time data is synchronized to ClickHouse. The advantage is that, with variant SQL (for example, aggregating with the sign column), data errors can be avoided even when duplicate data exists. The disadvantage is that the real-time data depends heavily on the accuracy of the Kafka messages (oriData, curData) provided by the OLTP data center, and the rows at the boundary between offline and real-time data cannot be collapsed.

Solution 2: incremental synchronization with the ReplacingMergeTree engine. The Spark computing engine synchronizes the MongoDB stock data to ClickHouse once, then the Kafka consumption position is reset and real-time data is synchronized to the ClickHouse ReplacingMergeTree table. Its advantage is that it is simpler than VersionedCollapsingMergeTree and there is no anomaly at the boundary between offline and real-time data. The disadvantage is that duplicate data cannot be completely ruled out.

Next, we introduce how data accuracy is guaranteed.

04

Data accuracy assurance

To guarantee the accuracy of offline data, we mainly do the following two things.

First, when offline data is rerun and the ClickHouse table uses a MergeTree-family engine, the partition being rerun is dropped beforehand. Second, for full reloads, the table is truncated before the Spark task is executed.
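A sketch of these two safeguards is shown below; the table name and partition id are placeholders, and the statements would be issued before the Spark backfill task starts.

```scala
import java.sql.DriverManager

// Placeholders throughout; executed before the Spark backfill task starts.
val conn = DriverManager.getConnection("jdbc:clickhouse://ck-host:8123/report")
val stmt = conn.createStatement()

// Rerun of one period on a MergeTree-family table: drop only the affected partition.
stmt.execute("ALTER TABLE video_report DROP PARTITION '202405'")

// Full-reload variant: truncate the whole table before Spark rewrites it.
// stmt.execute("TRUNCATE TABLE video_report")

stmt.close(); conn.close()
```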

To ensure the accuracy of real-time data, offsets are not auto-committed when Spark consumes Kafka. Only after all the business logic for a micro-batch has been processed are the offsets committed manually, which gives at-least-once consumption and ensures that no data is lost; since writes to the ClickHouse ReplacingMergeTree engine are idempotent, reprocessing is safe. In addition, to limit server pressure, ClickHouse only merges the partitions that were modified within the most recent interval of twice the scheduling period, which is currently 5 minutes. The diagram below:

Automatic Merge diagram
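Continuing the real-time sketch above, the snippet below illustrates the manual offset commit after a micro-batch is written, plus a scheduled OPTIMIZE on a recently modified partition. Here `writeBatchToClickHouse` is a hypothetical helper standing in for the JDBC write shown earlier, and the table and partition names are placeholders.

```scala
import java.sql.DriverManager
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Hypothetical helper standing in for the JDBC write shown in the earlier sketch.
def writeBatchToClickHouse(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
  // batch write over JDBC, omitted here
}

// `stream` is the direct Kafka stream created earlier with enable.auto.commit = false.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  writeBatchToClickHouse(rdd)
  // Commit offsets only after the batch has been written: at-least-once delivery,
  // which is safe because ReplacingMergeTree writes are idempotent.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

// Scheduled housekeeping: merge only recently modified partitions to limit server load.
val conn = DriverManager.getConnection("jdbc:clickhouse://ck-host:8123/report")
conn.createStatement().execute("OPTIMIZE TABLE video_report PARTITION '202405' FINAL")
conn.close()
```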

At this point, the details of the real-time data warehouse architecture have been covered.

05

Configurational development

However, faced with a flood of reporting requirements, each taking several days to develop, the work was not only labor-intensive but also kept developers stuck in repetitive tasks. Since iQiyi video production data is structured, configuration-driven development becomes possible.

The entire process uses a program parameter parser, Apache Commons CLI, an open-source command-line parsing tool. It helps developers quickly define startup commands, organize command arguments, print usage help, and so on.

Parameter parser structure diagram
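A minimal Commons CLI sketch is shown below; the option names and the job itself are illustrative assumptions, not the team's actual tool.

```scala
import org.apache.commons.cli.{DefaultParser, HelpFormatter, Options}

object SyncJob {
  def main(args: Array[String]): Unit = {
    // Declare the options the configurable sync job accepts; names are illustrative.
    val options = new Options()
      .addOption("s", "source", true, "source table (e.g. a Hive or Mongo table)")
      .addOption("t", "target", true, "target ClickHouse table")
      .addOption("m", "mode", true, "sync mode: offline or realtime")
      .addOption("h", "help", false, "print usage")

    val cmd = new DefaultParser().parse(options, args)
    if (cmd.hasOption("h")) {
      new HelpFormatter().printHelp("sync-job", options)
      return
    }
    val source = cmd.getOptionValue("source")
    val target = cmd.getOptionValue("target")
    val mode = cmd.getOptionValue("mode", "offline")   // default when not provided
    println(s"syncing $source -> $target in $mode mode")
    // The parsed values would then drive the configuration-based Spark job.
  }
}
```

A job like this could be launched with arguments such as `-s ods.video -t video_report -m realtime`, so a new report only needs new parameters rather than new code.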

06

Value and Planning

With the current real-time data warehouse for iQiyi video production in place, we have essentially achieved zero-code development, and report development cycles have been shortened from days to hours, meeting the requirement for frequently updated reports that join real-time and offline data. Currently four offline report tasks and three real-time report tasks are supported, including one offline Join task and one real-time Join task, and more tasks are on the way.

In the future, we will provide page-based operation on the iQiyi video production platform and productize the synchronization tools: first connecting to Hive, HBase, ClickHouse, and other systems with automatic table creation, and then automating task creation, execution, and status monitoring through scheduling. We will continue to support new business scenarios through technological innovation and keep pushing iQiyi's data and products toward real time.
