
This article is adapted from the Flink Meetup held in Shenzhen on April 13, 2019. Zhang Jun is currently the head of big data platform R&D at OPPO and an Apache Flink contributor. The main contents of this article are as follows:

  • Evolution of OPPO real-time data warehouse;
  • Extension work based on Flink SQL;
  • The application case of constructing real-time data warehouse;
  • Thinking and prospect of future work.

I. Evolution of OPPO real-time data warehouse

1.1. OPPO business and data scale

Everyone knows that OPPO is a smartphone company, but not everyone knows how OPPO is related to the Internet and big data. The following figure gives an overview of OPPO’s business and data:

As a mobile phone manufacturer, OPPO has customized its own ColorOS system based on Android, which currently has more than 200 million daily active users. Around ColorOS, OPPO has built many Internet applications, such as the app store, the browser, and the news feed. Operating these applications has left OPPO with a large amount of data. The right side of the figure shows the evolution of the overall data scale: since 2012 it has grown two to three times every year; the total data volume now exceeds 100PB, and the daily increment exceeds 200TB. To support such a large volume of data, OPPO has developed a complete set of data systems and services and has gradually formed its own data center system.

1.2. OPPO data center

Everyone is talking about the data center this year, so how does OPPO understand the data center? We divide it into four levels:

  • The lowest level is the unified tool system, covering the “access – governance – development – consumption” full data link;
  • On top of the tool system the data warehouse is built, divided into “original layer – detail layer – summary layer – application layer”, which is the classical data warehouse architecture;
  • Above that is the global data system. What does “global” mean? It means all the company’s business data is integrated to form unified data assets, such as ID-mapping and user tags;
  • Ultimately, for data to be used by the business, scenario-driven data products and services are needed.

The above is OPPO’s overall data center system, in which the data warehouse occupies a fundamental and core position.

1.3. Building the OPPO offline data warehouse

Over the past two or three years, our focus has been on building the offline warehouse. The figure above roughly describes the whole construction process: first, the data sources are basically mobile phones, log files, and databases. We built a high-availability, high-throughput access system based on Apache NiFi and landed the data uniformly in HDFS to form the original layer. Then, hour-level ETL tasks and day-level summary tasks, both based on Hive, compute and generate the detail layer and the summary layer respectively. Finally, the application layer is built with data products developed in-house at OPPO, mainly for report analysis, user profiling, and interface services. In addition, the detail layer in the middle also supports Presto-based ad hoc queries and self-service data extraction. As the offline warehouse gradually matured, the demand for a real-time warehouse became more and more intense.

1.4. The appeal of real-time data warehouse

We usually look at the demand for a real-time warehouse from the business perspective, but in fact, from the platform’s perspective, real-time also brings tangible benefits. First, on the business side, reports, labels, and interfaces all have real-time application scenarios; see the cases on the left of the figure. Second, on the platform side, we can look at three cases: first, a large number of OPPO batch tasks start at midnight and process data in T+1 fashion, which concentrates the computing load and puts great pressure on the cluster; second, label import is also a T+1 batch task, and each full import takes a long time; third, data quality monitoring has to run at T+1, so some data problems cannot be discovered in time.

Since both the business side and the platform side have this demand for real-time, how does OPPO build its own real-time data warehouse?

1.5. Smooth migration from offline to real-time

Any platform or system has two layers: the upper layer is the API, the programming abstraction and interface exposed to users; the lower layer is the Runtime, the execution engine facing the kernel. What do we mean by wanting the migration from offline to real-time to be smooth? At the API layer, the abstraction of the data warehouse is the Table and the programming interface is SQL plus UDFs. Users got used to this API in the offline warehouse era, and it is best to keep it unchanged after moving to the real-time warehouse. At the Runtime layer, the compute engine evolves from Hive to Flink, and the storage engine from HDFS to Kafka.

Based on the above ideas, we only need to transform the offline warehouse pipeline mentioned before to get the real-time warehouse pipeline.

1.6. Building the OPPO real-time data warehouse

As you can see from the figure above, the whole pipeline is basically similar to the offline warehouse, except that Hive is replaced by Flink and HDFS by Kafka. In terms of the overall process, the basic model is unchanged: it is still composed of the cascaded computation of the original layer, detail layer, summary layer, and application layer.

Therefore, the core problem here is how to build this pipeline based on Flink. Here is a look at some of the work we did based on Flink SQL.

II. Extension work based on Flink SQL

2.1. Why Flink SQL

First, why Flink SQL? The figure below shows the basic structure of the Flink framework. At the bottom is the Runtime. We think this execution engine has four core advantages: first, low latency and high throughput; second, end-to-end exactly-once; third, fault-tolerant state management; fourth, support for windows and event time. On top of the Runtime there are three levels of API abstraction, with SQL at the top.

What are the advantages of the Flink SQL API? We also looked at it from four angles: first, support for the ANSI SQL standard; second, rich data types and built-in functions, including common arithmetic operations and statistical aggregations; third, customizable Source/Sink, on top of which the upstream and downstream can be flexibly extended; fourth, unified batch and stream processing: the same SQL can run both offline and in real time.

So how do you program based on the Flink SQL API? Here’s a simple demo:

The first step is to define and register the input/output tables. Here we create two Kafka tables, specifying the Kafka version and the topic each corresponds to. The next step is to register a UDF; its definition is not shown here for reasons of space. The last step is to execute the actual SQL. As you can see, a lot of coding surrounds the SQL execution, and this is not the interface we want to expose to users.
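Since the demo itself was shown only as a figure, the following is a minimal sketch of what such a program might look like, assuming the descriptor-style Java Table API of roughly Flink 1.9; all table, topic, and field names are invented for illustration:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.functions.ScalarFunction;

public class FlinkSqlDemo {

    // A trivial placeholder UDF; the real UDF in the talk is not shown either.
    public static class MyUdf extends ScalarFunction {
        public String eval(String s) {
            return s == null ? "" : s.toLowerCase();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 1: define and register the input and output Kafka tables.
        tableEnv.connect(new Kafka()
                    .version("universal")
                    .topic("src_topic")
                    .property("bootstrap.servers", "kafka:9092"))
                .withFormat(new Json())
                .withSchema(new Schema()
                    .field("user_id", DataTypes.STRING())
                    .field("event", DataTypes.STRING()))
                .inAppendMode()
                .registerTableSource("src_table");

        tableEnv.connect(new Kafka()
                    .version("universal")
                    .topic("dst_topic")
                    .property("bootstrap.servers", "kafka:9092"))
                .withFormat(new Json())
                .withSchema(new Schema()
                    .field("user_id", DataTypes.STRING())
                    .field("event", DataTypes.STRING()))
                .inAppendMode()
                .registerTableSink("dst_table");

        // Step 2: register the UDF so it can be referenced in SQL.
        tableEnv.registerFunction("myUdf", new MyUdf());

        // Step 3: execute the actual SQL.
        tableEnv.sqlUpdate(
            "INSERT INTO dst_table SELECT user_id, myUdf(event) FROM src_table");
        env.execute("flink-sql-demo");
    }
}
```

Even in this stripped-down form, the SQL is only a small fraction of the program, which is exactly the problem the platform has to hide from users.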

2.2. Web-based development IDE

As mentioned earlier, the abstraction of the data warehouse is the Table and the programming interface is SQL plus UDFs, so the programming interface the platform provides should look like the one in the figure above. Anyone who has used HUE for interactive queries should find this interface familiar: the menu on the left is the Table list, and the SQL editor on the right lets you write SQL and submit it for execution directly. To achieve this kind of interaction, Flink SQL alone is not enough; there is a gap in the middle, which comes down to two points: first, metadata management: how to create library tables and upload UDFs so that they can be referenced directly in SQL later; second, SQL job management: how to compile SQL and how to submit jobs. During our technical research, we found the AthenaX framework open-sourced by Uber in 2017.

2.3. AthenaX: REST-based SQL manager

AthenaX is a REST-based SQL manager. How does it manage SQL jobs and metadata?

For SQL job submission, AthenaX has a Job abstraction that encapsulates information such as the SQL to be executed and the job resources. All jobs are hosted by a JobStore, which periodically compares them with the applications running on YARN; whenever they are inconsistent, it submits the corresponding job to YARN.

For metadata management, the core problem is how to inject externally created library tables into Flink so that they can be recognized in SQL. Flink itself actually reserves the ability to interface with external metadata, providing two abstractions: ExternalCatalog and ExternalCatalogTable. AthenaX encapsulates a TableCatalog on top of these and extends the interface to some degree. At the SQL job submission stage, AthenaX automatically registers the TableCatalog with Flink, then calls the Flink SQL interface to compile the SQL into Flink’s executable unit, the JobGraph, and finally submits it to YARN to start a new application.

AthenaX defines the TableCatalog interface but does not provide a directly usable implementation. So how should we implement it in order to connect it with our existing metadata system?

2.4. Flink SQL table registration process

First, we need to figure out how the library tables are registered inside Flink SQL. The whole process involves three basic abstractions: TableDescriptor, TableFactory, and TableEnvironment.

A TableDescriptor, as the name implies, is a description of a table. It consists of three sub-descriptors: the first is the Connector, which describes the source of the data, such as Kafka or ES; the second is the Format, which describes the format of the data, such as CSV, JSON, or Avro; the third is the Schema, which describes the name and type of each field. TableDescriptor has two basic implementations: ConnectTableDescriptor describes internal tables, that is, tables created programmatically, while ExternalCatalogTable describes external tables.

Once you have a TableDescriptor, you need a TableFactory to instantiate the Table according to that description. Different descriptions require different TableFactories to handle them, so how does Flink find the matching TableFactory implementation? To keep the framework extensible, Flink uses the Java SPI mechanism to load all declared TableFactories and iterates over them to find the one matching the TableDescriptor. The TableDescriptor is converted to a map before being passed to the TableFactory, and all description information is expressed as key-value pairs. TableFactory defines two methods for matching: requiredContext() checks whether the values of certain keys match, for example that connector.type is kafka; supportedProperties() checks whether every key can be recognized; if any key is not recognized, the match fails.
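To make the matching mechanism concrete, here is a rough sketch of what a custom TableFactory might look like, assuming the 1.9-era factory interfaces; the class name and the property keys listed are illustrative, and a real factory would also be declared in META-INF/services/org.apache.flink.table.factories.TableFactory so the SPI loader can find it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class DemoKafkaSourceFactory implements StreamTableSourceFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // Only descriptors whose property map contains connector.type=kafka
        // will be routed to this factory.
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kafka");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // Every key the descriptor may carry must be listed here;
        // an unrecognized key causes the match to fail.
        List<String> properties = new ArrayList<>();
        properties.add("connector.topic");
        properties.add("format.type");
        properties.add("schema.#.name");
        properties.add("schema.#.type");
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // Build the actual table source from the flattened key-value properties.
        throw new UnsupportedOperationException("sketch only");
    }
}
```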

Once the correct TableFactory is matched, the next step is to create the actual Table and register it with the TableEnvironment. Only after a Table is successfully registered can it be referenced in SQL.

2.5. Flink SQL connects to external data sources

Figuring out the table registration process of Flink SQL gave us an idea: if the tables created in the external metadata center can also be converted into a map that a TableFactory can recognize, they can be seamlessly registered into the TableEnvironment. Based on this idea, we implemented the connection between Flink SQL and our existing metadata center; the general process is shown in the following figure:

All tables created through the metadata center have their metadata stored in MySQL: one table records the basic table information, and three more tables record the description information after the Connector, Format, and Schema are each converted into key-value form. The reason for splitting them into three tables is to be able to update the three kinds of description independently. The next step is a custom implementation of ExternalCatalog that reads these four MySQL tables and transforms them into the map structure.
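As a hedged sketch of the conversion step only, the three key-value tables might be merged back into the flat property map that a TableFactory expects roughly as follows; the MySQL schema (tables meta_connector, meta_format, meta_schema with columns table_id, prop_key, prop_value) is invented for illustration:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class MetaStoreReader {

    public Map<String, String> loadTableProperties(Connection conn, long tableId) throws SQLException {
        Map<String, String> props = new HashMap<>();
        // Connector, Format and Schema descriptions live in separate tables so they can be
        // updated independently, but Flink's TableFactory just needs one flat key-value map.
        for (String table : new String[]{"meta_connector", "meta_format", "meta_schema"}) {
            try (PreparedStatement ps = conn.prepareStatement(
                    "SELECT prop_key, prop_value FROM " + table + " WHERE table_id = ?")) {
                ps.setLong(1, tableId);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        props.put(rs.getString("prop_key"), rs.getString("prop_value"));
                    }
                }
            }
        }
        // e.g. connector.type=kafka, format.type=json, schema.0.name=user_id, ...
        return props;
    }
}
```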

2.6. Real-time table and dimension table association

So far, our platform has the capabilities of metadata management and SQL job management, but one fundamental feature is still missing before it can truly be opened to users: the way we build data warehouses makes the star model unavoidable. Here is a simple example: the fact table in the middle records the ad click stream, surrounded by dimension tables about users, ads, products, and channels.

Suppose an SQL analysis needs to join the click-stream table with the user dimension table. How would this be implemented in Flink SQL? We have two implementations: one based on UDFs and one based on SQL transformation.

2.7. UDF-based dimension table association

The first is the UDF-based implementation, which requires the user to rewrite the original SQL into SQL containing the UDF call, in this case userDimFunc, whose code is shown on the right of the figure above. userDimFunc extends the Flink SQL abstraction TableFunction, one of the UDF types, which can convert any row of data into one or more rows. To implement the dimension table association, the dimension table data is fully loaded from MySQL during UDF initialization and cached in memory. For each subsequent row, the TableFunction’s eval() is invoked and looks up the cache by user_id. This assumes, of course, that the dimension table is relatively small; if the data volume is large, full loading and caching is not suitable, and we will not expand on that here.
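The code on the right of the figure is not reproduced in the text, so here is a hedged sketch in the same spirit; the JDBC URL, table and field names are invented, and error handling is omitted:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class UserDimFunc extends TableFunction<Row> {

    private transient Map<String, Row> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Load the full dimension table into memory once per task
        // (only reasonable for small dimension tables, as noted above).
        cache = new HashMap<>();
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://dim-db/user_dim");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT user_id, age, city FROM user_dim")) {
            while (rs.next()) {
                cache.put(rs.getString("user_id"),
                          Row.of(rs.getString("age"), rs.getString("city")));
            }
        }
    }

    // Called for every input row: emits the matching dimension attributes, if any.
    public void eval(String userId) {
        Row dim = cache.get(userId);
        if (dim != null) {
            collect(dim);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING);
    }
}
```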

The UDF-based implementation is friendly to neither users nor the platform: users have to write strange-looking SQL, such as the LATERAL TABLE in the figure, and the platform has to develop a specific UDF for each association scenario, which is costly to maintain. Is there a better way? Let’s look at the implementation based on SQL transformation.

2.8. Dimension table association based on SQL transformation

We want to solve both problems of the UDF-based implementation: users should not have to rewrite the original SQL, and the platform should not have to develop a lot of UDFs. One idea is to add a layer of SQL parsing and rewriting before the SQL is compiled by Flink, to realize the dimension table association automatically. After some technical research and a POC, we found this to be feasible, so we call it the implementation based on SQL transformation. The idea is explained below.

First, the added SQL parsing step identifies whether predefined dimension tables appear in the SQL, such as user_dim in the figure above. Once a dimension table is identified, the SQL rewriting step is triggered, rewriting the join statement marked with the red box to refer to a new table. Where does this table come from? The concept of “stream-table duality” has developed in the stream computing field in recent years, and Flink is one of its practitioners, which means that a Stream and a Table can be converted into each other in Flink. We convert the table corresponding to ad_clicks into a Stream, call flatMap on it to form another Stream, and then convert that back into a Table to obtain ad_clicks_user. Finally, how does flatMap implement the dimension table association?

A flatMap operation on a Stream in Flink executes a RichFlatMapFunction, calling its flatMap() method for each row of data. So we can implement a custom RichFlatMapFunction that loads, caches, looks up, and joins the dimension table data, similar to the TableFunction in the UDF-based implementation.
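A rough sketch of what the rewritten pipeline might do under the hood is shown below; it is not the actual generated code. It assumes a StreamTableEnvironment like the one in the section 2.1 sketch, assumes field 0 of ad_clicks is user_id, and leaves the dimension cache empty where the real implementation would load it from MySQL as described above:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class DimJoinRewrite {

    static void rewriteAdClicksJoin(StreamTableEnvironment tableEnv) {
        // Table -> Stream: pull the fact table out as an append stream.
        DataStream<Row> clicks = tableEnv.toAppendStream(
            tableEnv.sqlQuery("SELECT user_id, ad_id FROM ad_clicks"), Row.class);

        // flatMap with a RichFlatMapFunction that performs the dimension lookup.
        DataStream<Row> joined = clicks
            .flatMap(new RichFlatMapFunction<Row, Row>() {
                private transient Map<String, Row> cache;

                @Override
                public void open(Configuration parameters) {
                    // In the real implementation the dimension table is loaded here,
                    // driven by a JoinContext (dimension table name, join key, fields).
                    cache = new HashMap<>();
                }

                @Override
                public void flatMap(Row click, Collector<Row> out) {
                    Row dim = cache.get((String) click.getField(0)); // field 0: user_id
                    if (dim != null) {
                        out.collect(Row.join(click, dim)); // click fields + dimension fields
                    }
                }
            })
            .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING));

        // Stream -> Table: register the result under the rewritten name.
        tableEnv.registerTable("ad_clicks_user", tableEnv.fromDataStream(joined));
    }
}
```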

Since the implementation logic of the RichFlatMapFunction is similar to that of the TableFunction, why is this approach more general than the UDF-based one? The core point is the extra layer of SQL parsing: the information about the dimension table (such as its name, the join fields, and the selected fields) can be extracted, encapsulated into a JoinContext, and passed to the RichFlatMapFunction, so that the expressive power becomes general.

III. Application cases of building the real-time data warehouse

Here are a few typical applications implemented on our platform using Flink SQL.

3.1. Real-time ETL splitting

Here is a typical real-time ETL link, which splits the large table into the small tables corresponding to each business:

OPPO’s largest data source is the tracking events (buried points) on mobile phones. Data from mobile APPs has one characteristic: it is all reported through a few unified channels, because it is impossible to upgrade the client and add a new channel every time a new tracking event is added. For example, we have an sdk_log channel to which all APP tracking events report data, which results in a huge original-layer table for this channel, tens of terabytes per day. In reality, however, each business only cares about its own slice of the data, which requires ETL splitting at the original layer.

This SQL logic is relatively simple: it does nothing more than filter by some business fields and insert the results into different business tables. Its characteristic is that multiple SQL statements are eventually merged into a single SQL job submitted to Flink for execution. A natural concern is: if there are four SQL statements, will the same data be read four times? In fact, Flink’s SQL compilation phase performs some optimization: because they all read the same Kafka topic, the data is read only once.
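As a hedged illustration of the pattern (business table names and the app_id filter field are invented, and tableEnv/env come from the section 2.1 sketch), the split can be expressed as several INSERT statements over the same source table, submitted together as one job:

```java
// Several business-specific INSERTs reading the same original-layer table; when they are
// compiled into one Flink job, the shared Kafka source is read only once.
String[] splitSqls = {
    "INSERT INTO appstore_log SELECT * FROM sdk_log WHERE app_id = 'appstore'",
    "INSERT INTO browser_log  SELECT * FROM sdk_log WHERE app_id = 'browser'",
    "INSERT INTO feeds_log    SELECT * FROM sdk_log WHERE app_id = 'feeds'",
    "INSERT INTO adclick_log  SELECT * FROM sdk_log WHERE app_id = 'ad_click'"
};
for (String sql : splitSqls) {
    tableEnv.sqlUpdate(sql);   // all statements go into the same job graph
}
env.execute("sdk-log-split"); // a single submission executes the whole split
```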

In addition, the same Flink SQL is used for the ETL splitting of both the offline and the real-time warehouse, landing the data in HDFS and Kafka respectively. Flink itself supports sinks that write to HDFS, such as the rolling file sink (e.g. StreamingFileSink).

3.2. Real-time indicator statistics

Here is a typical case of computing the CTR of the news feed: we count the impressions and clicks within a time window, divide them to get the click-through rate, import the result into MySQL, and then visualize it through our internal reporting system. The characteristics of this SQL are the use of windows and subqueries.
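Since the original SQL is only shown in the figure, here is a hedged sketch of one way such a query could be written, reusing the tableEnv from the section 2.1 sketch; table and field names are invented, and feed_events is assumed to be a registered table whose event_time column is an event-time attribute:

```java
// A windowed subquery counts impressions and clicks per 5-minute tumble window,
// and the outer query divides them to get the CTR before writing to the result table.
String ctrSql =
    "INSERT INTO feed_ctr " +
    "SELECT window_start, imp_cnt, clk_cnt, " +
    "       CASE WHEN imp_cnt = 0 THEN CAST(0 AS DOUBLE) " +
    "            ELSE CAST(clk_cnt AS DOUBLE) / imp_cnt END AS ctr " +
    "FROM ( " +
    "  SELECT TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start, " +
    "         SUM(CASE WHEN event_type = 'impression' THEN 1 ELSE 0 END) AS imp_cnt, " +
    "         SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clk_cnt " +
    "  FROM feed_events " +
    "  GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE) " +
    ") t";
tableEnv.sqlUpdate(ctrSql);
```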

3.3. Real-time label import

Here is an example of real-time label import. The mobile client senses the current user’s longitude and latitude in real time; we convert them into a specific POI, import it into ES, and finally perform user targeting in the label system.

The characteristic of this SQL is the use of an AggregateFunction: within a 5-minute window, we only care about the latest longitude and latitude the user reported. AggregateFunction is a UDF type usually used for aggregate statistics, such as computing a sum or an average. In this example, since we only care about the latest longitude and latitude, we simply replace the old data each time.
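A hedged sketch of such a “keep the latest value” aggregate is shown below; the class, accumulator, and field names are invented for illustration:

```java
import org.apache.flink.table.functions.AggregateFunction;

public class LatestLocation extends AggregateFunction<String, LatestLocation.Acc> {

    // Accumulator: the timestamp and value of the most recent report seen so far.
    public static class Acc {
        public long latestTs;
        public String location; // e.g. "lat,lng"
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called for every row in the window: keep only the most recent report.
    public void accumulate(Acc acc, String location, long ts) {
        if (ts >= acc.latestTs) {
            acc.latestTs = ts;
            acc.location = location;
        }
    }

    @Override
    public String getValue(Acc acc) {
        return acc.location;
    }
}
```

It would be registered like any other UDF (for example via tableEnv.registerFunction("latest_location", new LatestLocation())) and then used inside a 5-minute tumbling window GROUP BY per user.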

IV. Thoughts and prospects for future work

Finally, I would like to share some thoughts and plans about our future work. They are not yet mature, and we would like to discuss them with you.

4.1. End-to-end real-time stream processing

What is end-to-end? One end is the collected raw data, the other end is the presentation and use of that data as reports, labels, and interfaces, and the real-time stream connects the two ends in the middle. In our design, both the source table and the target table of the real-time stream are Kafka, and the computed results are then imported into the downstream data systems. The purpose of this design is to improve the stability and availability of the overall process: first, Kafka acts as a buffer for downstream systems, preventing anomalies in a downstream system from affecting the real-time stream computation (one system staying stable is more likely than multiple systems staying stable at the same time); second, for Kafka-to-Kafka real-time streams, the exactly-once semantics are relatively mature, so consistency is guaranteed.

However, this end-to-end process actually consists of three separate steps, each of which may need to be done by a different role: data processing by a data developer, data import by an engine developer, and turning the data into assets and products by a product developer.

Can our platform automate this end-to-end process, so that processing, import, and asset creation are all completed with a single SQL submission? In that case, what data development faces is no longer a Kafka table, but a scenario-oriented display table, label table, or interface table. For example, when creating a display table, the platform automatically imports the real-time stream data from Kafka into Druid, imports the Druid data source into the reporting system, and even generates the report template automatically.

4.2. Lineage analysis of real-time stream

As for lineage analysis, anyone who has worked on an offline data warehouse knows its importance well; it plays an indispensable role in data governance. The same is true for the real-time warehouse. We want to build the end-to-end lineage, from the access channels of the collection system, through the real-time tables and real-time jobs the data flows through, all the way to the products that consume the data. Based on lineage analysis, we can evaluate the application value of the data and account for its computation cost.

4.3. Offline and real-time data warehouse integration

The last direction is the integration of the offline and real-time data warehouses. We believe that in the short term the real-time warehouse cannot replace the offline warehouse, and their coexistence is the new normal. How can the tool system we accumulated in the offline warehouse era be adapted to the real-time warehouse, and how can the management of offline and real-time warehouses be integrated? In theory their data sources are identical and their upper-layer abstractions are both Table and SQL, but they also differ in essence, for example in time granularity and computation mode. We are still exploring and thinking about what it takes to achieve full integration of the data tools and products.