Author | Zhihu Data Engineering Team

Building a data warehouse is a necessary and fundamental step toward “data intelligence”, and it is also a basic service that a company inevitably has to provide once its data grows to a certain scale. From the perspective of an intelligent business, data results represent user feedback, so how quickly those results are obtained matters greatly. Fast data feedback helps a company make decisions sooner and iterate on its products better, and the real-time data warehouse plays an irreplaceable role in this process.

This article describes Zhihu’s real-time data warehouse practice and the evolution of its architecture, covering the following aspects:

Real-time data warehouse 1.0, topic: making ETL logic real-time, technical solution: Spark Streaming.

Real-time data warehouse 2.0, topic: data layering and real-time indicator calculation, technical solution: Flink Streaming.

Future outlook for the real-time data warehouse: a Streaming SQL platform, systematic metadata management, and automated acceptance of results.

Real-time data warehouse version 1.0

Version 1.0 of the real-time data warehouse mainly performs real-time ETL on traffic data. It does not calculate real-time indicators and does not establish a real-time data warehouse system. The real-time scenarios are relatively simple, and real-time data is processed mainly to improve the service capability of the data platform. Real-time processing depends on data collection upstream and feeds data query and visualization downstream. The figure below shows the overall data architecture of real-time data warehouse 1.0.

The first part is data collection: data is collected by the SDKs on the three client platforms and sent to Kafka through the Log Collector Server. The second part is data ETL, which cleans and processes the raw data and imports it into Druid through both a real-time path and an offline path. The third part is data visualization: Druid computes the metrics, and the Web Server and front end visualize the data.

For the first and third parts, please refer to “Zhihu client event tracking: process, model and platform technology” and “Druid and the Zhihu data analysis platform” respectively. Here we introduce the second part in detail. Because a real-time data flow is less stable than an offline data flow, problems in the real-time flow have to be repaired by backfilling historical data from offline data. We therefore adopted the Lambda architecture for real-time processing.

Lambda architecture is characterized by high fault tolerance, low latency and scalability. To achieve this design, we split the ETL effort into two parts: Streaming ETL and Batch ETL.

Streaming ETL

This part covers the choice of real-time computing framework, the guarantee of data correctness, some common ETL logic in Streaming, and finally our stability practice with Spark Streaming in real-time ETL.

Selection of computing framework

At the beginning of 2016, Storm and Spark Streaming were the most popular real-time computing frameworks in the industry. Storm is a pure streaming framework, whereas Spark Streaming simulates streaming computation with micro-batches. Storm offers lower latency, while Spark Streaming offers higher throughput and a more complete ecosystem. Considering Zhihu’s log volume and our initial real-time requirements, we selected Spark Streaming as the real-time data processing framework.

Data correctness assurance

End-to-end exactly-once in Spark Streaming requires idempotent writes downstream and replayable streams upstream. At the Spark Streaming layer we achieve at-least-once: under normal conditions no data is duplicated, but some data may be resent when the program restarts. To achieve global exactly-once, we deduplicate downstream, as described later.

Generic ETL logic

ETL logic is closely tied to the data structure of the event-tracking points (“buried points”). All of our tracking points share the same Protocol Buffers schema, roughly as shown below.

message LogEntry {
  optional BaseInfo base = 1;
  optional DetailInfo detail = 2;
  optional ExtraInfo extra = 3;
}

BaseInfo: the basic information in a log, including user information, client information, time information, network information, and other fields required when sending a log.

DetailInfo: the view information in a log, including the current view and the previous view, used to locate where the user is.

ExtraInfo: additional information in a log that is specific to a particular service.

For these three kinds of information, ETL logic falls into general and non-general types. General logic applies to every business and mainly processes Base and Detail information, while non-general logic is requested for a specific requirement and mainly processes Extra information. Here we introduce three pieces of general logic: dynamically configured Streaming, UTM parameter parsing, and new and old user identification.

Dynamic Configuration Streaming

A Streaming task has to run 24/7, yet some business logic changes over time. For example, there is a metadata information center, and when metadata changes, the change has to be reflected in the data stream so that downstream consumers can use the data conveniently. Such a change might require stopping the Streaming task to update the business logic, but metadata changes very frequently, and notifying the maintainer of the program after every change is difficult. Dynamically configured Streaming gives us a solution, as shown in the figure below.

Frequently changing metadata can be held in a Streaming Broadcast variable, which works like a read-only cache with a configurable TTL. When the cache expires, the Executor nodes request the latest value from the Driver again. This mechanism maps metadata changes onto the data stream naturally, without restarting the task or notifying the maintainer of the application.
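The TTL behaviour described above can be sketched in a few lines of plain Python. This is a minimal illustration only, not the Spark implementation: `TtlCachedValue`, its `loader` callback (standing in for "ask the Driver for the latest metadata") and the injectable `clock` are all hypothetical names introduced here.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class TtlCachedValue(Generic[T]):
    """Read-only cached value that is reloaded once its TTL has expired."""

    def __init__(self, loader: Callable[[], T], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader      # hypothetical: fetches the latest metadata
        self._ttl = ttl_seconds
        self._clock = clock        # injectable so the expiry can be tested
        self._value: Optional[T] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        expired = self._loaded_at is None or now - self._loaded_at >= self._ttl
        if expired:
            # Cache miss or TTL elapsed: pull the newest value.
            self._value = self._loader()
            self._loaded_at = now
        return self._value
```

The processing code only ever calls `get()`, so a metadata change becomes visible within one TTL without a restart, at the cost of serving data that may be up to one TTL old.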

UTM parameter parsing

Urchin Tracking Module (UTM) parameters are used to track the source of website traffic. For background on UTM, please refer to other online resources. Here is our complete logic for parsing UTM information.

After traffic data is parsed through UTM parameters, we can easily meet the following requirements:

See how much traffic comes from each search engine and which popular search terms bring that traffic.

Measure the traffic brought by a marketing campaign, such as page views and the number of unique visitors.

See how links shared from the site are browsed on the various sharing platforms (such as WeChat and Weibo).
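As a rough illustration of the query-string part of UTM parsing, the sketch below extracts the five conventional UTM parameters from a URL using only the Python standard library. It is an assumption-laden simplification, not the complete logic referred to above, which is shown only in the original figure; the function name `parse_utm` is hypothetical.

```python
from urllib.parse import parse_qs, urlparse

# The five conventional UTM query parameters.
UTM_KEYS = ("utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content")


def parse_utm(url: str) -> dict:
    """Return the UTM parameters present in the URL's query string."""
    query = parse_qs(urlparse(url).query)
    # parse_qs maps each key to a list of values; keep the first occurrence.
    return {key: query[key][0] for key in UTM_KEYS if key in query}
```

Grouping logs by the resulting `utm_source` or `utm_campaign` value is then enough to answer questions like the ones listed above.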

New and old user identification

For Internet companies, growth is a perennial topic, and knowing the number of new users in real time is very important for growth operations. For example, if n channels are launched at once and the number of new users from each channel is available in real time, we can quickly determine which channels are more valuable. The following figure illustrates how the Streaming ETL identifies new and old users.

The simplest way to determine whether a user is new is to maintain a pool of historical users and check, for each log, whether the user is already in the pool. Because the log volume is huge, and so as not to slow down the Streaming task, we designed two levels of cache, a Thread Local Cache and a Redis Cache, and use HBase as persistent storage for historical users. In terms of access speed: local memory > remote memory > remote disk. For this task, only 1% of requests reach HBase, so even at the peak log rate of 260,000 entries per second the real-time performance of the task is not affected. Of course, the capacity of the local LRUCache and the performance of Redis are also factors that affect real-time performance.
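The lookup order described above (local cache, then remote cache, then persistent store) can be sketched as follows. This is a simplified, single-threaded illustration under stated assumptions: plain Python sets stand in for Redis and HBase, and `LruSet` and `NewUserDetector` are hypothetical names, not the team's code.

```python
from collections import OrderedDict


class LruSet:
    """Tiny LRU set standing in for the thread-local cache."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._items = OrderedDict()

    def __contains__(self, key) -> bool:
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return True
        return False

    def add(self, key) -> None:
        self._items[key] = True
        self._items.move_to_end(key)
        if len(self._items) > self._capacity:
            self._items.popitem(last=False)  # evict least recently used


class NewUserDetector:
    def __init__(self, local_capacity: int, remote_cache: set, persistent_store: set):
        self._local = LruSet(local_capacity)
        self._remote = remote_cache      # assumption: stands in for Redis
        self._store = persistent_store   # assumption: stands in for HBase
        self.store_lookups = 0           # how often the slowest tier was hit

    def is_new_user(self, user_id: str) -> bool:
        if user_id in self._local:       # fastest: local memory
            return False
        if user_id in self._remote:      # slower: remote memory
            self._local.add(user_id)
            return False
        self.store_lookups += 1          # slowest: remote disk
        seen_before = user_id in self._store
        if not seen_before:
            self._store.add(user_id)     # record the user as historical
        self._remote.add(user_id)
        self._local.add(user_id)
        return not seen_before
```

Every miss warms the faster tiers, which is what keeps the share of requests reaching the persistent store small.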

Besides the general scenarios above, the Streaming ETL contains other logic as well: some of it makes the data more convenient for downstream consumers, and some of it repairs incorrectly instrumented tracking points. In short, the Streaming ETL sits upstream of indicator calculation in the whole real-time data warehouse and plays an irreplaceable role.

Stability practice of Spark Streaming in Real-time data warehouse 1.0

We recommend the Direct mode when Spark Streaming consumes Kafka data. In the early stage we used the High Level (Receiver) mode with the checkpoint function enabled. With that setup, the checkpoint has to be deleted whenever the program logic is updated; otherwise the new logic does not take effect. In addition, with checkpointing enabled the Streaming task communicates with HDFS continuously, so jitter in the NameNode can cause jitter in the Streaming task. For a detailed comparison between the Direct mode and the Receiver mode, please refer to the official documentation.

Ensure that the resources of the Spark Streaming task are stable. Take Yarn as an example: if the minimum resources allocated to the queue running the Streaming task are less than the resources the task requires, the task may lose Executors frequently. This slows the Streaming task down, because the data handled by a lost Executor has to be recomputed and the Executor has to be reallocated.

Rate-limit the data flow when Spark Streaming consumes Kafka. By default, Spark Streaming reads from the message queue as fast as it can. When a Streaming task is restarted after a long suspension, the volume of data it pulls may saturate the I/O of the upstream Kafka cluster and leave the cluster blocked for a long time. A Streaming configuration parameter can be used to cap the maximum pull rate per second.
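The article does not name the parameter. In Spark's Kafka direct API the setting that caps the read rate is `spark.streaming.kafka.maxRatePerPartition` (records per second per Kafka partition), optionally combined with `spark.streaming.backpressure.enabled`. The sketch below shows, under that assumption, how such a cap bounds the size of each micro-batch; the numeric values are illustrative only.

```python
def max_records_per_batch(max_rate_per_partition: int,
                          num_partitions: int,
                          batch_interval_seconds: int) -> int:
    """Upper bound on records one micro-batch can pull under the rate cap."""
    return max_rate_per_partition * num_partitions * batch_interval_seconds


# Illustrative values only (assumption); tune them to the cluster's capacity.
rate_limit_conf = {
    "spark.streaming.kafka.maxRatePerPartition": "1000",
    "spark.streaming.backpressure.enabled": "true",
}
```

With 1000 records per second per partition, 24 partitions and a 10-second batch interval, a restarted job pulls at most 240,000 records per batch instead of the whole backlog at once.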

Restart the Spark Streaming task automatically after it fails. Over a long enough period, a Spark Streaming task cannot be guaranteed to run stably 24/7. We use Supervisor to manage the Driver process: when the task dies, the Driver process no longer exists, and Supervisor then restarts the Streaming task.

Batch ETL

Next we introduce the second part of the Lambda architecture: Batch ETL. This part has to solve data persistence, offline ETL, and batch import of data into Druid. For data persistence we developed Batch Loader, a MapReduce task; for data repair we developed Repair ETL, an offline task whose repair logic shares the same ETL library as the real-time logic; and for batch importing ProtoParquet data into Druid we extended Druid’s import plugin.

Repair ETL

There are two Kafka clusters in the data architecture diagram. The first Kafka stores the original logs, and the second Kafka stores the logs after real-time ETL. We persist all data from both Kafka clusters to ensure the stability of the data link. The real-time ETL contains a lot of business logic, and logic added for some unforeseen requirement may put the whole traffic data at risk. The upstream Log Collect Server, by contrast, contains no business logic and is only responsible for receiving and forwarding logs, so the data in the first Kafka is much safer and more stable. Repair ETL is not always running; it is enabled only when real-time ETL data is lost or a logic error occurs.

Batch Load 2 HDFS

As mentioned above, all of our tracking points share the same Protocol Buffers schema, and all data is transmitted in binary format. We developed BatchLoader, a MapReduce task that persists Kafka PB data to HDFS. Besides persisting data, this task is also responsible for deduplication: the Streaming ETL stage achieves at-least-once, and BatchLoader is where we achieve global exactly-once. In addition to persistence and deduplication, BatchLoader supports multi-level directory partitioning (p_date/p_hour/p_platform/p_logtype), data replay, and its own dependency management (there was no unified scheduler in the early days). So far, BatchLoader has persisted data from more than 40 Kafka topics.
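The two responsibilities described above, partitioned layout and deduplication, can be sketched as follows. This is only an illustration under loud assumptions: the article does not say how BatchLoader identifies duplicates, so the `key` function is hypothetical, and the Hive-style `name=value` directory layout is likewise an assumption built on the partition names given in the text.

```python
from datetime import datetime
from typing import Callable, Hashable, Iterable, List


def partition_path(root: str, event_time: datetime, platform: str, logtype: str) -> str:
    """Build a p_date/p_hour/p_platform/p_logtype directory for one record."""
    return (f"{root}/p_date={event_time:%Y-%m-%d}/p_hour={event_time:%H}"
            f"/p_platform={platform}/p_logtype={logtype}")


def deduplicate(records: Iterable[dict], key: Callable[[dict], Hashable]) -> List[dict]:
    """Keep the first record for each key, turning at-least-once into exactly-once."""
    seen = set()
    unique = []
    for record in records:
        record_key = key(record)  # hypothetical unique identifier of a log
        if record_key not in seen:
            seen.add(record_key)
            unique.append(record)
    return unique
```

Deduplicating inside a batch job is simpler than in the stream because the whole partition is available at once, so no expiring state has to be maintained.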

Batch Load 2 Druid

Data is imported into Druid in real time through Tranquility. This mode imposes a mandatory time window: if upstream data is delayed beyond the window, the data outside the window is discarded. To correct the resulting gaps, we have Druid launch an offline MapReduce job that periodically re-imports the data of the previous period. The Batch import here and the real-time import above together implement the Lambda architecture of the real-time data warehouse.
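The effect of the time window can be illustrated with a deliberately simplified model that considers lateness only; the real window semantics in Tranquility are more involved. The function name and the `ts` field (event time in seconds) are assumptions made for this sketch.

```python
from typing import List, Tuple


def partition_by_window(events: List[dict], now_ts: float,
                        window_seconds: float) -> Tuple[List[dict], List[dict]]:
    """Split events into (accepted, late) relative to a lateness window."""
    accepted, late = [], []
    for event in events:
        if now_ts - event["ts"] <= window_seconds:
            accepted.append(event)
        else:
            late.append(event)  # dropped in real time; recovered by the batch import
    return accepted, late
```

The `late` list is exactly the data the periodic batch import exists to restore.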

Several shortcomings of real-time warehouse 1.0

So far we have covered several modules of the Lambda real-time data warehouse. The 1.0 real-time data warehouse has the following shortcomings:

All traffic data is stored in the same Kafka topic. If every downstream line of business consumes it, the full volume of data is consumed many times over, and Kafka’s outbound traffic becomes too high to support this.

Druid handles queries from both real-time and offline data sources. As the volume of data explodes, Druid’s stability deteriorates dramatically, resulting in unstable output of core reports for each business.

Because every service configures its reports on the same traffic data source, query efficiency is low, and data isolation and cost accounting per service are impossible.

Real-time data warehouse version 2.0

As the volume of data skyrocketed, the traffic data source in Druid frequently timed out and businesses began to consume more real-time data. Continuing with the real-time warehouse 1.0 architecture required significant additional costs. Therefore, on the basis of real-time data warehouse 1.0, we established real-time data warehouse 2.0, sorted out the new architecture design and started to establish the real-time data warehouse system. The new architecture is shown in the figure below.

The original layer

In version 2.0 we added processing of the change logs (Binlog) of business databases. In the original layer, Binlog is kept at the database level or the MySQL instance level; that is, the change logs of one database or instance are stored in the same Kafka topic. Meanwhile, as the company’s business has grown, new apps keep appearing, so the original layer collects not only Zhihu logs but also tracking data from Zhihu Extreme Edition and from internally incubated projects. Tracking data from the different apps still uses the same PB schema.

Detail layer

The detail layer is our ETL layer; its data is derived from the original layer through Streaming ETL. For Binlog, the processing mainly splits database-level or instance-level logs into table-level logs; for traffic logs, it mainly applies the general ETL processing. Because all apps use the same PB structure, the data processing code can be fully reused across apps, which greatly reduces our development cost.

Detail summary layer

The detail summary layer is derived from the detail layer through ETL and mainly takes the form of wide tables. Business detail summaries are obtained by joining business fact tables with dimension tables, and traffic detail summaries are obtained by splitting traffic logs by line of business and joining them with traffic dimensions. Splitting traffic by service meets each service’s real-time consumption needs, and the splitting is automated. The following figure shows the process of automatic traffic data splitting.

The Streaming Proxy is the traffic distribution module. It consumes the full data produced by the upstream ETL and periodically reads the tracking-point metadata. By “joining” the traffic data with the metadata, it splits traffic by service and then applies service-specific ETL to each service’s traffic. As soon as a new tracking point is added to the metadata, the data for that point is automatically routed to the Kafka of the corresponding service. The data in a service’s Kafka therefore belongs exclusively to that service and has already passed through both the common ETL and the service ETL, which greatly reduces the cost of using data for each service.
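The splitting step can be pictured as a lookup join between each log and the tracking-point metadata. The sketch below is a minimal in-memory illustration, not the Streaming Proxy itself: the `point_id` field, the `<service>_traffic` topic naming and the function name are all hypothetical.

```python
from typing import Dict, List


def route_by_service(logs: List[dict],
                     point_to_service: Dict[str, str]) -> Dict[str, List[dict]]:
    """Group logs into per-service topics using tracking-point metadata."""
    routed: Dict[str, List[dict]] = {}
    for log in logs:
        service = point_to_service.get(log["point_id"])
        if service is None:
            continue  # tracking point not registered in the metadata yet
        routed.setdefault(f"{service}_traffic", []).append(log)
    return routed
```

Because routing depends only on the metadata mapping, registering a new tracking point there is enough for its data to start flowing to the right service, with no change to the code.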

Indicator summary layer

The indicator summary layer is derived from the detail layer or the detail summary layer through aggregation. This layer produces the vast majority of the real-time data warehouse’s indicators, which is also the biggest difference from real-time data warehouse 1.0. Zhihu is a content production platform, so business indicators can be summarized from the perspectives of content and users. From the content perspective, we count in real time the likes, follows, and favorites of content (answers, questions, articles, videos, and ideas). From the user perspective, we count in real time a user’s followers, answers, questions, and other indicators. Traffic indicators are divided into per-service indicators and global indicators. For per-service indicators, we count in real time the card exposures, card clicks, CTR, and so on for services such as page, search, video, and ideas. Global indicators are built mainly around real-time sessions: within a session we count in real time the number of PVs, card exposures, clicks, browsing depth, and session duration.
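As a small worked example of one indicator named above, CTR is the number of card clicks divided by the number of card exposures. The sketch below aggregates it per service over a batch of events; the event fields `service` and `type` and the values `show` and `click` are hypothetical, and a production job would compute this incrementally over a stream rather than over a list.

```python
from collections import defaultdict
from typing import Dict, List


def card_ctr(events: List[dict]) -> Dict[str, float]:
    """Per-service click-through rate: card clicks / card exposures."""
    counts = defaultdict(lambda: {"show": 0, "click": 0})
    for event in events:
        if event["type"] in ("show", "click"):
            counts[event["service"]][event["type"]] += 1
    return {
        service: (c["click"] / c["show"] if c["show"] else 0.0)
        for service, c in counts.items()
    }
```

Four exposures and one click for a service therefore yield a CTR of 0.25; a service with clicks but no recorded exposures is reported as 0.0 rather than dividing by zero.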

Storage selection of indicator summary layer

Unlike the detail layer and the detail summary layer, the indicator summary layer has to store the indicators calculated in real time for the application layer to use. Depending on the scenario, we chose HBase and Redis as the storage engines for real-time indicators. Redis mainly serves scenarios with high-OPS Update operations. For example, when counting in real time the cumulative PV of all content on the site (questions, answers, articles, and so on), browsing generates a large number of PV logs, possibly tens or hundreds of thousands per second, and the PV of every piece of content has to be accumulated in real time. Redis is the better fit for this scenario. HBase mainly serves scenarios with high-frequency Append operations, low-frequency random reads, and a large number of indicator columns. For example, the likes, follows, and favorites of all content are aggregated every minute and the result rows are appended to HBase. HBase has no performance or capacity problem with this workload, whereas Redis might hit a storage bottleneck.

Connecting indicator calculation with the indicator system and the visualization system

Managing indicator definitions depends on the indicator system, and visualizing indicators depends on the visualization system. The requirement development process in the figure below shows how the three are connected.

  1. After preparing the requirement document, the requester submits the requirement to the data warehouse engineers and schedules a review meeting. The requirement document must contain the calculation definition of each indicator and the dimensions it is reported by.

  2. The data warehouse engineers review the requirement against the document. If it does not pass the review, it goes back to the requester, who refines it and submits it again.

  3. Once the requirement passes the review, the data warehouse engineers schedule the development.

  4. Create a data source in the visualization system for configuring real-time reports. Creating a data source means creating an HBase table in HBase.

  5. Create an indicator column for the data source, that is, create a column in the HBase column family. When creating an indicator column, the indicator information is input into the indicator management system.

  6. For this data source, bind the dimension table, which is used to select dimension values in the later configuration of multidimensional reports. If the dimension table to be bound already exists, bind it directly; otherwise, import the dimension table.

  7. After a complete data source is created, data warehouse engineers can develop real-time applications through which multidimensional indicators are written to the created data source in real time.

  8. The demander directly configures real-time reports based on created data sources.

The application layer

The application layer mainly uses summary-layer data to meet business requirements. It is divided into three parts: 1. Real-time visualization, which reads indicator summary data directly to serve fixed real-time report requirements and is provided by the real-time dashboard service; 2. Recommendation algorithms and other services, which consume detail summary data directly to make real-time recommendations; 3. Real-time ingestion of detail summary data into Druid through an ingestion program, to meet real-time multidimensional ad hoc analysis requirements.

Implementation of real-time data warehouse 2.0

Whereas real-time data warehouse 1.0 used Spark Streaming as its main implementation technology, real-time data warehouse 2.0 uses Flink as the main computing framework for the indicator summary layer. Compared with Spark Streaming, Flink has clear advantages, mainly: low latency, exactly-once semantics, Streaming SQL support, state management, rich time types and window calculations, and CEP support.

We mainly use Flink Streaming SQL as the implementation approach in real-time data warehouse 2.0. Streaming SQL has the following advantages: it is easy to build a platform around, development is efficient, and maintenance cost is low. At present, Streaming SQL also has some shortcomings: 1. Its syntax differs from Hive SQL in places. 2. Its built-in UDFs are not as rich as Hive’s, so UDFs have to be written more often than with Hive.

Progress made in Real-time warehouse 2.0

At the detail summary layer, traffic splitting meets each service’s need to consume logs in real time. More than 14 services have completed traffic splitting so far. Since each service now consumes only its own split traffic, Kafka’s outbound traffic has dropped by an order of magnitude.

The core real-time reports of each business are now produced stably. Because the core reports are calculated directly in the data warehouse and the visualization system simply reads the real-time results, the stability of the real-time reports is ensured. Many businesses now have real-time dashboards, and there are more than 40 real-time reports.

The stability of ad hoc queries has improved. Indicator calculation for core reports has moved to the data warehouse, so Druid is responsible only for ad hoc queries, which meets the need for multidimensional analysis.

Cost accounting requirements have been addressed. Since each service has its own data sources and each core dashboard is computed by a separate real-time program, the storage and computing resources used by each service can easily be measured.

Future prospects of real-time data warehouse

From real time warehouse 1.0 to 2.0, we have accumulated more depth and breadth in both data architecture and technical solutions. With the rapid development of the company’s business and the birth of new technology, real-time data warehouse will continue to iterate and optimize. In the short term, we will further improve the service capability of real-time data warehouse from the following aspects:

  1. A Streaming SQL platform. At present, a Streaming SQL task is developed as code, packaged with Maven, and submitted as a Jar, so the development cost is high. Once the Streaming SQL platform goes online, the development mode of the real-time data warehouse will change from Jar packages to SQL files.

  2. A metadata management system for the real-time data warehouse. Managing metadata for a data warehouse greatly reduces the cost of using its data. Metadata management for the offline data warehouse is basically complete, while metadata management for the real-time data warehouse has only just begun.

  3. Automated acceptance of real-time data warehouse results. Real-time results can only be accepted by comparing them with offline indicators. Taking Hive and Kafka data sources as an example, we execute Hive SQL and Flink SQL respectively and check whether the two sets of statistics agree, thereby automating the acceptance of real-time results.
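The comparison step of automated acceptance can be sketched as below, assuming both queries have already been run and their results loaded as mappings from indicator name to value. The function name and the optional relative tolerance `rel_tol` are assumptions added for illustration; the article itself only says the two sets of results are checked for consistency.

```python
from typing import Dict, Optional, Tuple


def compare_results(offline: Dict[str, float], realtime: Dict[str, float],
                    rel_tol: float = 0.0) -> Dict[str, Tuple[Optional[float], Optional[float]]]:
    """Return the indicators whose offline and real-time values disagree."""
    mismatches = {}
    for key in offline.keys() | realtime.keys():
        a, b = offline.get(key), realtime.get(key)
        if a is None or b is None:
            mismatches[key] = (a, b)  # indicator missing on one side
        elif abs(a - b) > rel_tol * max(abs(a), abs(b)):
            mismatches[key] = (a, b)  # values differ beyond the tolerance
    return mismatches
```

An empty result means the real-time job passes acceptance for that period; anything else lists exactly which indicators to investigate.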

About the authors

The Data Engineering team is one of the core teams of the Zhihu Technology Center. It consists mainly of 31 excellent engineers from four sub-teams: data platform, basic platform, data warehouse, and AB Testing.

For more information, please visit the Apache Flink Chinese community website