With the rapid development of Didi's business, the demand for data timeliness keeps growing. As real-time technologies have matured, Didi has made many attempts and accumulated practical experience in real-time construction. This article uses the Hitch (carpooling) business as its example to describe Didi's work and share its experience from the perspectives of engine, platform, and business.

1. Purpose of building a real-time data warehouse

As the development of the internet enters its second half, data timeliness matters more and more for the refined operation of enterprises. The marketplace is like a battlefield: massive amounts of data are produced every day, and the ability to dig out valuable information from them in real time is of great help to decision-making and to adjusting operational strategy.

Secondly, from the perspective of intelligent business operation, data results represent user feedback, so the timeliness of obtaining those results is particularly important. Getting data feedback quickly helps a company make faster decisions and iterate its products better, and the real-time data warehouse plays an irreplaceable role in this process.

1.1 Solving the problems of the traditional data warehouse

Judging from the current state of data warehouse construction, "real-time data warehouse" is an easily confused concept. By traditional experience, an important function of a data warehouse is to record history: a warehouse is typically expected to hold data from the first day of business up to the present. Real-time stream processing, by contrast, is a technology that emphasizes the current state of the data. Combining the construction experience of the major internet companies with the situation inside the company, we position the purpose of building a real-time data warehouse as follows: use data warehouse construction theory together with real-time technology to solve the problem that the current offline warehouse's data timeliness is too low.

The main reasons for building real-time data warehouse at this stage are as follows:

  • The company's businesses have an increasingly urgent need for real-time data to assist decision-making
  • Existing real-time data development is not standardized: data availability is poor, it does not form a warehouse system, and a large amount of resources is wasted
  • Data platform tools have matured enough to support real-time development as a whole, reducing development cost

1.2 Application scenarios of real-time data warehouse

  • Real-time OLAP analysis: OLAP analysis is itself a key problem in the data warehouse field. Using the Flink-based Stream SQL tool provided by the company's big data architecture team, message middleware such as Kafka and DDMQ (developed in-house at Didi), and OLAP databases such as Druid and ClickHouse, the timeliness of the warehouse is improved so that real-time data can be analyzed more effectively.
  • Real-time data dashboards: this is the main demand on the real-time side, for example the real-time large-screen curve display of orders and coupon spending on "National Carpooling Day", minute-level core order metrics on the launch day of newly opened cities, and the real-time display of resource input and revenue for growth projects.
  • Real-time business monitoring: Didi Chuxing needs to monitor a large number of core business metrics in real time, such as safety metrics, financial metrics, and complaint and inbound-call metrics.
  • Real-time data interface services: because there are many business barriers between the different lines of business, warehouse developers can hardly be familiar with every business line in the company. They therefore collaborate with the relevant departments of each business line on data processing and data acquisition, and the warehouse provides data support to the business side through real-time data interface services.

2. An example: building the Didi Hitch real-time data warehouse

Within the company, our data team had the opportunity to cooperate closely with the Hitch business line. While meeting the real-time data demands of the business side, we kept improving the real-time data warehouse. After multiple iterations, it basically satisfies the Hitch business's various real-time requirements: the Hitch real-time warehouse has been preliminarily set up, the overall data layering covering both detail data and summary data has been completed, the DWD layer has been unified, big data resource consumption has been reduced, data reuse has been improved, and rich data services can be provided externally.

The specific structure of the warehouse is shown in the figure below:

As the data architecture diagram shows, there are many similarities between the Hitch real-time data warehouse and the corresponding offline data warehouse, for example the layered structure: the ODS layer, the detail layer, the summary layer, and even the application layer may all be named in the same way. A careful comparison, however, reveals many differences between the two:

  • Compared with the offline warehouse, the real-time warehouse has fewer layers

  • From current offline warehouse construction experience, the detail layer of an offline data warehouse is very rich in content, and besides detail data it usually also includes a lightly aggregated layer; the application layer data also stay inside the offline warehouse. In the real-time warehouse, however, app-layer data already land in the storage media of the application systems, so the app layer can be separated from the warehouse's own table layering.

  • Advantage of fewer application-layer tables: when data are processed in real time, every additional layer adds latency.

  • Advantage of fewer summary-layer tables: in summary statistics, an artificial delay is often added to tolerate a certain amount of late data and guarantee accuracy. For example, when aggregating order events across a day boundary, the job may wait until 00:00:05 or 00:00:10 to make sure all data before 00:00 have been received. Too many summary layers therefore multiply this artificial delay (a hedged sketch of this kind of delay follows this list).

  • The data source storage of real-time data warehouse is different from that of offline data warehouse

  • In offline warehouse construction, Didi's entire offline warehouse is currently built on Hive tables. In real-time warehouse construction, however, the same table may be stored in different ways: detail data and summary data are generally stored in Kafka, while dimension data such as city and channel need to be stored in HBase, MySQL, or other key-value stores.
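The artificial delay described above is essentially a watermark delay. As a hedged illustration only (open-source Flink SQL syntax rather than Didi's internal StreamSQL, with hypothetical table and field names), a 10-second watermark delay makes a daily window close only after events stamped past 00:00:10 arrive; every additional aggregation layer would add such a delay again:

```sql
-- Hypothetical ODS order source; the 10s watermark delay mirrors the
-- "wait until 00:00:10" tolerance described above.
CREATE TABLE realtime_ods_binlog_order (
    order_id   BIGINT,
    gmv        DECIMAL(18, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'realtime_ods_binlog_order',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Daily summary: the window for day D only fires once the watermark passes
-- D+1 00:00:00, i.e. after events timestamped past 00:00:10 have been seen.
SELECT
    TUMBLE_START(event_time, INTERVAL '1' DAY) AS stat_day,
    COUNT(DISTINCT order_id)                   AS order_cnt,
    SUM(gmv)                                   AS gmv
FROM realtime_ods_binlog_order
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY);
```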

Next, following the Hitch real-time data warehouse architecture diagram, the construction of each layer is described in detail:

2.1 Construction of the ODS (source-mirroring) layer

Based on the specific scenarios of Hitch, the current data sources mainly include order-related binlogs, public logs related to bubbling and safety, and traffic-related event-tracking logs. Some of these data have already been collected and written into Kafka or DDMQ data channels; others need to be collected by internal, self-developed synchronization tools, and are finally written into Kafka following the ODS-layer construction specifications of the Hitch warehouse.

**Naming conventions:** ODS-layer real-time data sources mainly fall into two types.

  • One type is the DDMQ or Kafka topics automatically produced during offline collection. These are named cn-binlog-{database name}-{database name}, e.g. cn-binlog-ihap_fangyuan-ihap_fangyuan.
  • The other type follows the ODS-layer convention: realtime_ods_binlog_{source system database/table name} or ods_log_{log name}, e.g. realtime_ods_binlog_ihap_fangyuan.

2.2 DWD detail layer construction

Based on the characteristics of each specific business process, the finest-grained detail-layer fact tables are built, driven by the business processes of Hitch modeling. Combined with the offline data usage habits of Hitch analysts, some important dimension attributes are redundantly added to the detail fact tables to form wide tables. Then, based on the business side's current demand for real-time data, modules such as transaction, finance, experience, safety, and traffic are built. The data at this layer come from the ODS layer, and the ETL work is done with the Stream SQL provided by the big data architecture team. For binlogs, the processing mainly consists of simple data cleaning, handling data drift and out-of-order data, and possibly Stream Joins across multiple ODS tables. For traffic logs, it mainly consists of general ETL plus filtering for Hitch scenarios, turning unstructured data into structured data and splitting the streams. Besides being written into the Kafka message queue, the data at this layer are usually also written into the Druid database in real time, both to query detail data and to serve as a processing source for simple summary metrics.
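A minimal sketch of the DWD cleaning step, written in open-source Flink SQL as an approximation of the internal Stream SQL (the table and field names are hypothetical): it drops invalid binlog rows, keeps only Hitch orders, and redundantly attaches a couple of attributes from the bubbling stream to form the wide detail table.

```sql
-- Hypothetical DWD wide table for Hitch trade orders, written back to Kafka.
-- Assumes the ODS order source exposes the full order schema.
INSERT INTO realtime_dwd_trip_trd_order_base
SELECT
    o.order_id,
    o.passenger_id,
    o.driver_id,
    o.city_id,
    o.order_status,
    o.event_time,
    b.bubble_id,        -- redundant attribute taken from the bubbling stream
    b.estimate_price
FROM realtime_ods_binlog_order o
LEFT JOIN realtime_ods_log_bubble b      -- simple dual-stream join on ODS tables
    ON o.order_id = b.order_id
WHERE o.order_id IS NOT NULL             -- basic cleaning
  AND o.product_line = 'hitch';          -- keep only the Hitch scenario
```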

**Naming specification:** DWD-layer table names use lowercase letters, words are separated by underscores, the total length must not exceed 40 characters, and names follow the rule: realtime_dwd_{business/pub}_{data domain abbreviation}_[{business process abbreviation}]_[{custom table naming label abbreviation}]

  • {business/pub}: refer to the business name

  • {data domain abbreviation}: refer to the data domain division section

  • {custom table naming label abbreviation}: the entity name can be a business abstraction based on warehouse transformation and integration; the name should accurately express the business meaning of the entity

    **Example:** realtime_dwd_trip_trd_order_base

2.3 DIM layer

  • The common dimension layer is built on the idea of dimensional modeling; it establishes consistent dimensions across the whole business process and reduces the risk of inconsistent calculation calibers and algorithms.
  • The data of the DIM layer come from two sources: one part is produced by Flink programs processing ODS-layer data in real time, the other by offline tasks.
  • Three storage engines are mainly used for DIM dimension data: MySQL, HBase, and Fusion (Didi's in-house KV store). MySQL suits small dimension tables; Fusion suits cases where individual records are small and query QPS is high, and it reduces machine memory usage; HBase suits large data volumes that are not sensitive to changes in the dimension table (hedged declarations for two of these engines are sketched below).
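As a hedged sketch of how dimension tables might be declared with open-source Flink SQL connectors (Fusion is Didi-internal and has no open-source connector; all names and connection options are hypothetical):

```sql
-- Small, low-QPS dimension kept in MySQL and read through the JDBC connector.
CREATE TABLE dim_trip_city_base (
    city_id   INT,
    city_name STRING,
    PRIMARY KEY (city_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql-host:3306/dim',
    'table-name' = 'dim_trip_city_base'
);

-- Large, rarely changing dimension kept in HBase.
CREATE TABLE dim_trip_dri_base (
    rowkey STRING,
    cf ROW<driver_name STRING, driver_level STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'dim_trip_dri_base',
    'zookeeper.quorum' = 'zk-host:2181'
);
```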

**Naming specification:** DIM-layer table names use lowercase letters, words are separated by underscores, the total length must not exceed 30 characters, and names follow the rule: dim_{business/pub}_{dimension definition}_[{custom table naming label abbreviation}]:

  • {business/pub}: refer to the business name

  • {dimension definition}: refer to the dimension naming conventions

  • {custom table naming label abbreviation}: the entity name can be a business abstraction based on warehouse transformation and integration; the name should accurately express the business meaning of the entity

    **Example:** dim_trip_dri_base

2.4 DWM summary layer construction

The construction of the summary layer of the Hitch real-time data warehouse has many similarities with the Hitch offline warehouse, but the concrete technical implementation is quite different.

First, for common metrics such as PV, UV, and order business-process metrics, unified calculation is done in the summary layer so that metric calibers are fixed in one model. For individual metrics, a single time field should be chosen from the perspective of metric reusability and aligned with other metrics on the time dimension as far as possible; for example, the abnormal-order count needs to be aligned in event time with the metrics of the trade domain.

Second, building the Hitch summary layer requires multi-dimensional summarization by subject. Because the real-time warehouse itself is subject-oriented, and each subject may care about different dimensions, data need to be summarized under each subject along the dimensions that subject cares about, finally producing the summary metrics the business needs. Concretely, for PV-type metrics, Stream SQL is used to compute 1-minute summary metrics as the minimum summary unit, and metrics for coarser time periods are then accumulated on top of them along the time dimension. For UV-type metrics, the Druid database is used directly as the summary container, and exact or approximate de-duplication is applied according to the business side's timeliness and accuracy requirements for the metric.
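A hedged sketch of the 1-minute minimum summary unit, in open-source Flink SQL rather than the internal Stream SQL (the table and field names are hypothetical and follow the DWM naming rule below; it assumes the DWD table is registered with an event-time watermark):

```sql
-- DWM: 1-minute PV per city as the minimum summary unit; coarser periods
-- (1h, td, ...) are accumulated from this result, while UV de-duplication
-- is delegated to Druid as described above.
INSERT INTO realtime_dwm_trip_trd_city_pv_1min
SELECT
    city_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS stat_minute,
    COUNT(*)                                      AS pv
FROM realtime_dwd_trip_trd_order_base
GROUP BY
    city_id,
    TUMBLE(event_time, INTERVAL '1' MINUTE);
```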

Third, building the summary layer also involves handling derived dimensions. For the summary metrics related to Hitch coupons, we used HBase's version mechanism to build a zipper table of derived dimensions, and joined the event stream with the HBase dimension table to obtain the dimension values that were accurate for the real-time data at that moment.
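The version-based zipper table itself is built with internal tooling; as a hedged open-source approximation (hypothetical names, processing-time semantics rather than the exact event-time versioning described above), a lookup join against the HBase dimension table attaches the dimension value current at processing time:

```sql
-- Coupon-usage events joined with the HBase-backed derived dimension.
-- Assumes c.proc_time is declared AS PROCTIME() on the source table and the
-- dimension table is declared with the HBase connector as sketched in 2.3.
SELECT
    c.coupon_id,
    c.order_id,
    c.event_time,
    d.cf.batch_name   AS coupon_batch_name,   -- derived dimension attribute
    d.cf.subsidy_rule AS subsidy_rule
FROM realtime_dwd_trip_fin_coupon_used AS c
JOIN dim_trip_coupon_batch FOR SYSTEM_TIME AS OF c.proc_time AS d
    ON c.coupon_batch_id = d.rowkey;
```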

**Naming specification:** DWM-layer table names use lowercase letters, words are separated by underscores, the total length must not exceed 40 characters, and names follow the rule: realtime_dwm_{business/pub}_{data domain abbreviation}_{data primary granularity abbreviation}_[{custom table naming label abbreviation}]_{statistical time period range abbreviation}:

  • {business/pub}: refer to the business name

  • {data domain abbreviation}: refer to the data domain division section

  • {data primary granularity abbreviation}: abbreviation of the data's primary granularity or data domain, which is also the primary dimension in the composite primary key

  • {custom table naming label abbreviation}: the entity name can be a business abstraction based on warehouse transformation and integration; the name should accurately express the business meaning of the entity

  • {statistical time period range abbreviation}: 1d (daily increment), td (day-to-date total), 1h (hourly increment), th (hour-to-date total), 1min (minute increment), tmin (minute-to-date total)

    **Example:** realtime_dwm_trip_trd_pas_bus_accum_1min

2.5 APP Application Layer

The main work of this layer is to write real-time summary data into the databases of the application systems: the Druid database for large-screen display and real-time OLAP (both application data and detail data can be written into it to compute summary metrics), the HBase database for the real-time data interface service, and MySQL or Redis databases for real-time data products.
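A hedged sketch of the APP-layer write, again in open-source Flink SQL (connector options and table names are hypothetical placeholders): the DWM summary result is upserted into a MySQL table consumed by a real-time data product.

```sql
-- APP-layer MySQL sink read by a real-time data product; the primary key
-- makes the JDBC sink behave as an upsert.
CREATE TABLE app_trip_trd_city_pv (
    city_id     INT,
    stat_minute TIMESTAMP(3),
    pv          BIGINT,
    PRIMARY KEY (city_id, stat_minute) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql-host:3306/app',
    'table-name' = 'app_trip_trd_city_pv'
);

INSERT INTO app_trip_trd_city_pv
SELECT city_id, stat_minute, pv
FROM realtime_dwm_trip_trd_city_pv_1min;   -- 1-minute summary from the DWM layer
```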

**Naming specification:** given the particularities of the real-time warehouse, no hard requirements are imposed at this layer.

3. Results of the Hitch real-time data warehouse construction

So far, five modules have been built for the Hitch business line: growth, trade, experience, safety, and finance. They involve 40+ real-time dashboards, cover all core business processes, and keep the error between real-time and offline data below 0.5%. This is a useful supplement to data analysis for the Hitch business line, providing real-time data support for dynamic coupon strategy adjustment, passenger safety monitoring, and real-time order trend analysis, and improving the timeliness of decision-making.

At the same time, real-time metrics built on the warehouse model can promptly accommodate caliber changes and complete real-time/offline data consistency verification according to user requirements, which greatly improves the development efficiency of real-time metrics and the accuracy of real-time data, and also provides strong theoretical and practical support for large-scale real-time warehouse construction across the company.

4. Strong dependence of real-time data warehouse construction on data platform

At present, real-time data warehouse construction in the company needs to rely on the capabilities of the data platform to actually land, including the StreamSQL capability, the StreamSQL IDE environment and task operation and maintenance components of Data DreamWorks, and the real-time data source metadata functionality.

4.1 Real-time data requirement development based on StreamSQL

StreamSQL is a product developed by Didi’s Big Data Engine department based on Flink SQL.

Using StreamSQL has several advantages:

  • **Declarative language:** the business side does not need to care about the underlying implementation and only needs to describe the business logic.
  • **Interface stability:** as long as the SQL syntax does not change, the interface stays stable across Flink version iterations.
  • **Easy troubleshooting:** the logic is explicit; anyone who understands the SQL syntax can locate where an error occurs.
  • **Batch-stream unification:** batch processing is mainly HiveSQL and Spark SQL; if Flink tasks also use SQL, batch and stream tasks can share syntax and more, achieving a unified experience.

Improvements of StreamSQL over Flink SQL (pre-1.9):

  • **Complete DDL:** covers upstream message queues, downstream message queues, and various stores such as Druid and HBase; users only need to create tables to describe the upstream and downstream.
  • **Built-in message format parsing:** data must be parsed after consumption, but the formats are often complex, for example database binlogs, and implementing the parsing separately for every user is difficult. StreamSQL provides built-in functions to extract the database name, table name, and column values; the user only needs to create a binlog source, which also has built-in de-duplication. For business logs, StreamSQL has built-in capabilities to extract the log header and the business fields and assemble them into a map. For JSON data, no custom UDF is needed; the required fields are simply specified via a JSON path (a hedged open-source approximation is sketched after this list).
  • **Extended UDX:** the built-in UDXs are enriched, such as the JSON and MAP functions commonly used in Didi's business scenarios; custom UDXs are supported, so users can define UDFs and use them via JAR packages; and Hive UDXs are compatible, so a user migrating a Hive SQL task to a real-time task needs few changes, which helps batch-stream unification.
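Didi's binlog source and parsing functions are internal and not shown in the article; as a hedged approximation with open-source Flink SQL, the JSON format plus nested ROW columns (or, in newer versions, the SQL-standard JSON_VALUE function) covers the idea of "just declare the fields you need" without writing a UDF. All names are hypothetical.

```sql
-- Kafka source whose JSON payload is mapped directly onto nested columns,
-- so no custom UDF is needed to pull out business fields.
CREATE TABLE ods_log_safety_event (
    event_id   STRING,
    event_time TIMESTAMP(3),
    header     ROW<app_version STRING, platform STRING>,
    payload    ROW<order_id BIGINT, risk_level INT>
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_log_safety_event',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Select only the business fields needed downstream.
SELECT
    event_id,
    header.platform    AS platform,
    payload.order_id   AS order_id,
    payload.risk_level AS risk_level
FROM ods_log_safety_event;
```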

Join capability extensions:

  • TTL-based dual-stream join: in Didi's stream computing business, some joins span a long period of time; for example, in the Hitch business the span from order creation to order acceptance can reach about a week. It is not feasible to do such joins purely in memory, so the join data are kept in state, the window is realized through a TTL, and expired data are automatically cleared (see the hedged sketch after this list).

  • Dimension table join capability: dimension tables in HBase, KVStore, and MySQL are supported, with inner, left, right, and full join modes.
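Under the assumption that open-source Flink SQL is an acceptable stand-in for the internal StreamSQL, the week-long issue/accept join can be approximated with an interval join, where state outside the interval is cleaned up automatically and thus plays the role of the TTL; dimension-table joins use the FOR SYSTEM_TIME lookup syntax already sketched in section 2.4. Table and field names are hypothetical, and both time columns are assumed to be event-time attributes with watermarks.

```sql
-- Dual-stream join: match an order-accept event to its issue event within 7 days.
SELECT
    i.order_id,
    i.issue_time,
    a.accept_time,
    a.driver_id
FROM realtime_dwd_trip_trd_order_issue  AS i
JOIN realtime_dwd_trip_trd_order_accept AS a
    ON i.order_id = a.order_id
   AND a.accept_time BETWEEN i.issue_time
                         AND i.issue_time + INTERVAL '7' DAY;
```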

4.2 StreamSQL IDE and task operation and maintenance based on Data DreamWorks

**StreamSQL IDE:**

  • **Provides common SQL templates:** you do not need to start from scratch when developing streaming SQL; just pick an SQL template and modify it to get the desired result.
  • **Provides a UDF library:** if you do not know what a UDF means or how to use it, you can search the library in the IDE and find its documentation and usage examples; the IDE also provides syntax checking and intelligent hints.
  • **Provides online code debugging:** local test data can be uploaded, or a small sample can be drawn from a source such as Kafka for debugging, which is very important for computing tasks; the version management function allows tasks to be rolled back when a business version is upgraded.

**Task operation and maintenance:** task operation and maintenance mainly covers four aspects.

  • **Log retrieval:** the log query experience in the Flink UI is poor, so Didi collects Flink task logs into ES and exposes them through a web interface for retrieval, which makes investigation convenient.
  • **Metric monitoring:** Flink exposes many metrics, and viewing them through the Flink UI is also a poor experience, so Didi built an external reporting platform to monitor them.
  • **Alerting:** alerts need to be balanced; for example, restart alerts have multiple causes, such as machine failures and code errors, and they can be balanced by setting a threshold on the number of alerts per task per day. In addition there are liveness alerts (e.g. kill/start), latency alerts, restart alerts, and checkpoint failure alerts (e.g. when the checkpoint interval is misconfigured).
  • **Lineage tracking:** the pipeline of a real-time computing task is long, often 4-5 links from collection to message channel, stream computation, and downstream storage; without lineage tracking, catastrophic problems easily arise. For example, when the traffic of a stream task surges, one needs to check whether the consumed topic's traffic increased, whether the upstream collection increased, whether the source database experienced improper batch operations, or whether some business keeps writing more logs. Such problems require lineage tracing in both directions, downstream to upstream and upstream to downstream, to locate the cause.

4.3 Real-time data source metadata based on Data DreamWorks (meta tables)

Topics are registered as real-time tables, the MetaStore manages metadata uniformly, and the DDL process of real-time development is managed in one place. For the real-time data warehouse, this metadata lets the construction results be accumulated and makes it easier to land the warehouse modeling.

Currently, the real-time data sources supported by Data DreamWorks' metadata management include PostgreSQL, DDMQ, MySQL, Druid, ClickHouse, Kylin, and Kafka.

5. Thinking about challenges and solutions

Although the real-time data warehouse at Didi has begun to take shape, the problems it faces cannot be ignored.

5.1 R&D specifications for the real-time data warehouse

**Problem:** to respond quickly to business needs and standardize the warehouse's requirement development process, a white paper of standards for real-time data development is urgently needed, covering requirement intake, caliber clarification, data development, task release, task monitoring, and task assurance.

**Current status:** led by the data BP, a set of development specifications for real-time data metrics has been formed:

**Routine process:** the requester raises the requirement; the analyst takes it over, provides the calculation caliber, and writes the requirement document. The BP and the offline warehouse colleagues then review the caliber and provide the offline Hive tables to the real-time warehouse team. The real-time warehouse colleagues explore the data based on the offline Hive tables, develop the real-time requirement on the real-time warehouse model, and self-check the data against the offline caliber. Finally, the result is handed to the analyst for a second check before the metric goes online.

**Caliber change process:** when the business side initiates a caliber change, it is first determined whether real-time metrics are involved; the warehouse BP brings the parties together, and the new offline and real-time calibers and data tables are provided to the offline warehouse team and the real-time warehouse team; the real-time warehouse team verifies the change on a test dashboard first, and after acceptance it is switched to the official dashboard.

Existing deficiencies:

  • When building new real-time data for a business, there is a difficult initialization process, with heavy coupling to the offline side: the metric calibers, data sources, and a large amount of development and testing work all need to be determined.
  • When a metric caliber changes, a better notification mechanism is needed; at present this is still judged manually.

5.2 Ensure offline and real-time data consistency

**Current solution:** the business side, the BP, and the offline warehouse team work together to ensure that the data sources and calculation calibers are consistent with the offline data; during data processing, the results are compared layer by layer against the offline data and the metric results are tested in detail; after verification passes and the metric goes online, real-time and offline data continue to be verified on the offline cycle.

**Problems to be solved:** integrate with metric management tools to guarantee caliber consistency, extend Data DreamWorks with a real-time/offline comparison function during metric processing, and reduce the cost of data comparison.

6. Future outlook: batch-stream unification

Although Flink has batch-stream unification capability, Didi has not fully unified batch and stream yet and hopes to achieve the unification at the product level first. Through metadata construction, the whole company would have only one MetaStore: Hive tables, Kafka topics, and downstream stores such as HBase and ES would all be defined in it. All computing engines, including Hive, Spark, Presto, and Flink, would query the same MetaStore, so SQL development would look the same everywhere. Whether an SQL job is a batch task or a stream task would then be determined by whether the source it consumes is a table or a stream, realizing batch-stream unification at the product level.
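As a hedged illustration of what this product-level effect might look like (using recent open-source Flink SQL client syntax as an assumption, with hypothetical catalog and table names): the same query text runs against the shared catalog in either runtime mode, and only the nature of the source differs.

```sql
-- Same MetaStore/catalog, same query text; only the runtime mode differs.
SET 'execution.runtime-mode' = 'batch';      -- bounded source, e.g. a Hive table
SELECT city_id, COUNT(*) AS order_cnt
FROM trip_dw.dwd_order_base
GROUP BY city_id;

SET 'execution.runtime-mode' = 'streaming';  -- unbounded source, e.g. a Kafka topic
SELECT city_id, COUNT(*) AS order_cnt
FROM trip_rt.realtime_dwd_trip_trd_order_base
GROUP BY city_id;
```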