Author: Liu Dalong @ViPSHOP

As the Internet industry enters its second half, data timeliness matters more and more for the fine-grained operation of enterprises. Business is like a battlefield: massive amounts of data are produced every day, and mining valuable information from them effectively and in real time greatly helps enterprises make decisions and adjust their operating strategies. In addition, as 5G technology matures and spreads, industries with high data-timeliness requirements, such as the industrial Internet and the Internet of Things, need a complete and mature real-time data system to stay competitive.

Based on this situation and these real-time data requirements, and drawing on industry cases and the author's own real-time data development experience, this article summarizes an overall scheme for building a real-time data system. It is divided into three parts:

The first part introduces the main application scenarios of Flink, a real-time computing engine widely used in the industry, in building a real-time data system, together with the corresponding solutions. The second part discusses the construction scheme from four aspects: real-time data architecture, real-time data model layering, construction methods of the real-time data system, and the evolution toward a stream-batch integrated real-time data architecture. The third part uses a concrete case to show how Flink SQL can fulfil real-time statistics requirements.

I. Flink real-time application scenarios

At present, Flink's main application scenarios in real-time computing fall into four categories: real-time data synchronization, streaming ETL, real-time data analysis, and complex event processing. The specific business scenarios and corresponding solutions are shown in the figure below.

II. Real-time data architecture

The real-time data system can be divided into three types of scenarios: traffic, business, and feature. They differ from each other as follows.

  • Data model: traffic data uses flat wide tables, the business data warehouse is based on normal-form modeling, and feature data uses KV storage.
  • Data sources: the traffic data warehouse is generally fed by log data, the business data warehouse by business binlog data, and the feature data warehouse by a variety of sources.
  • Data volume: the traffic and feature data warehouses hold massive data, on the order of a billion or more per day, while the business data warehouse generally handles millions to tens of millions per day.
  • Update frequency: traffic data is rarely updated, while business and feature data are updated more often. Traffic data generally focuses on time series and trends, while business and feature data focus on state changes.
  • Data accuracy: traffic data has lower accuracy requirements, while business and feature data have higher requirements.

2.1 Overall architecture of real-time data system

The whole real-time data architecture is divided into five layers: the access layer, storage layer, computing layer, platform layer, and application layer. The figure above is only an overview of the overall architecture; the specific work of each layer is detailed below.

  • Access layer: this layer uses various data access tools to collect data from each system, including binlog, event-tracking logs, and back-end service logs, into Kafka. The data feeds both real-time and offline computing, which keeps the raw data of the two consistent.
  • Storage layer: this layer stores the raw data and the cleaned and joined detail data. Following the unified real-time data model layering, data for different application scenarios is stored in engines such as Kafka, HDFS, Kudu, ClickHouse, HBase, Redis, and MySQL. What each engine stores is described in the section on real-time data model layering.
  • Computing layer: this layer mainly uses four computing engines: Flink, Spark, Presto, and ClickHouse. Flink is mainly used for real-time data synchronization, streaming ETL, and second-level real-time indicator calculation for critical systems. Spark SQL is mainly used for quasi-real-time indicator calculation in complex multidimensional analysis scenarios. Presto and ClickHouse are mainly used for multidimensional self-service analysis where query response time requirements are not strict.
  • Platform layer: this layer provides the unified query service, metadata and indicator management, and data quality and lineage.
  • Application layer: on top of the unified query service, this layer supports the data scenarios of each line of business, including real-time dashboards, real-time data products, real-time OLAP, and real-time features.

The detailed work of the platform layer is as follows:

  1. Unified query service: supports querying both the underlying detail data and the aggregation-layer data, and supports SQL queries over data in KV stores such as Redis and HBase.
  2. Metadata and indicator management: metadata management mainly means managing the real-time Kafka tables, Kudu tables, ClickHouse tables, Hive tables, and so on in one place, naming tables according to the data warehouse model naming convention, and making the meaning and use of every field explicit. Indicator management means managing all real-time indicators through the indicator management system as far as possible, with clear calculation definitions, and providing them to the different business parties.
  3. Data quality and lineage analysis: data quality is divided into platform monitoring and data monitoring, while lineage analysis mainly analyzes the dependencies among real-time data and real-time tasks.

Platform monitoring has two parts. The first monitors the running status of tasks, raises alarms for abnormal tasks, and automatically restarts and recovers tasks according to configured parameters. The second, for Flink tasks, monitors Kafka consumption lag and raises alarms in real time.

Data monitoring also has two parts. First, streaming ETL is an important step in the whole real-time data flow, and it joins various dimension tables. Records that fail to join during a real-time join are reported to the monitoring platform periodically as abnormal logs, and an alarm is triggered when their number reaches a threshold. Second, some key real-time indicators use the Lambda architecture, so historical real-time indicators need to be compared periodically with the data computed offline in Hive. This monitors real-time data quality, and an alarm is raised for indicators that exceed the threshold.
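The periodic comparison between real-time indicators and offline Hive results can be sketched in plain Java as follows. This is only a minimal illustration: the class name `MetricReconciler`, the use of a relative-error threshold, and the rule that a metric missing on the real-time side is flagged are all assumptions, not the author's monitoring implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: compares real-time metric values with offline (batch) values. */
final class MetricReconciler {

    /**
     * Returns the names of metrics whose relative deviation from the offline value
     * exceeds maxRelativeError, in alphabetical order. A metric that exists offline
     * but is missing on the real-time side is also flagged.
     */
    static List<String> findDrifted(Map<String, Double> realtime,
                                    Map<String, Double> offline,
                                    double maxRelativeError) {
        List<String> drifted = new ArrayList<>();
        // TreeMap gives a deterministic, sorted iteration order.
        for (Map.Entry<String, Double> e : new TreeMap<>(offline).entrySet()) {
            Double actual = realtime.get(e.getKey());
            double expected = e.getValue();
            if (actual == null) {
                drifted.add(e.getKey());
                continue;
            }
            // Guard against division by zero when the offline value is 0.
            double denominator = Math.max(Math.abs(expected), 1e-9);
            if (Math.abs(actual - expected) / denominator > maxRelativeError) {
                drifted.add(e.getKey());
            }
        }
        return drifted;
    }
}
```

A scheduled job could call `findDrifted` with yesterday's real-time values and the Hive results and raise an alarm for every returned metric name.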

To support data monitoring, lineage also needs to be built for real-time data. This mainly means sorting out the dependencies among data and among real-time tasks in the real-time data system, from the underlying ODS to DW and then to DM, including which models the DM layer uses, and connecting the whole chain. With lineage in place, downstream parties can be notified proactively when data or tasks are changed, and problems can be located when an indicator is abnormal. Lineage analysis also makes it possible to evaluate the application value of data and to estimate its computing cost.
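Finding everyone who must be notified when a table changes amounts to a traversal of the lineage graph. The following is a minimal sketch, assuming lineage is available as a map from each table to its direct consumers; the class name `LineageGraph` and the table names in the usage note are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: walks a table-level lineage graph. */
final class LineageGraph {

    /**
     * @param edges   maps each table to the tables that read from it directly
     * @param changed the table being modified
     * @return all transitive downstream tables in breadth-first order, excluding changed
     */
    static Set<String> downstreamOf(Map<String, List<String>> edges, String changed) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(changed);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.emptyList())) {
                // visited.add returns false for tables already seen, which also stops cycles.
                if (!next.equals(changed) && visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return visited;
    }
}
```

With edges such as `ods_order -> dwd_order -> dm_gmv`, calling `downstreamOf(edges, "ods_order")` returns every table whose owners should be told about a change to the ODS table.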

2.2 Real-time data model layering

For efficiency, an offline data warehouse generally trades space for time and has more layers. For timeliness, a real-time data warehouse should have as few layers as possible, which also reduces the chance of errors in intermediate steps. The real-time data warehouse here is therefore divided into four layers.

ODS layer:

  • The operational data layer stores the raw data, structures unstructured data, applies only light cleaning, and almost never deletes raw data.
  • The data in this layer mainly comes from the binlog of business databases, event-tracking logs, and application logs.
  • Binlog is captured by Canal and written to the Kafka message queue. Event-tracking and application logs, that is, nginx and Tomcat logs, are collected by Filebeat and reported to Kafka.
  • Besides being stored in Kafka, the binlog of business databases is also written by Flink to storage engines such as HDFS and Kudu and landed in 5-minute Hive tables for detail queries; it is also provided to the offline data warehouse as its raw data. Event-tracking log data does not need to be landed, because it is unstructured at the ODS layer.
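Landing data in 5-minute Hive tables implies mapping each event time to a 5-minute bucket. A minimal sketch of that mapping is shown here; the class name, the `yyyyMMddHHmm` partition format, the epoch-millisecond input, and the UTC time zone are all assumptions for illustration, since the article does not specify the partition layout.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: derives a 5-minute partition value from an event time. */
final class FiveMinutePartition {
    private static final long BUCKET_MILLIS = 5 * 60 * 1000L;
    // Assumed partition format and time zone; adjust to the warehouse convention.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    /** Rounds an epoch-millisecond event time down to the start of its 5-minute bucket. */
    static String partitionFor(long eventTimeMillis) {
        long bucketStart = Math.floorDiv(eventTimeMillis, BUCKET_MILLIS) * BUCKET_MILLIS;
        return FORMAT.format(Instant.ofEpochMilli(bucketStart));
    }
}
```

All events between 12:25:00.000 and 12:29:59.999 map to the same partition value, so a writer can close a partition once no more events for that bucket are expected.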

DWD layer:

  • The real-time detail data layer is modeled around business processes. Based on the characteristics of each business process, it builds detail fact tables at the finest granularity. Depending on how the enterprise uses its data, some important dimension attributes can be made redundant in the detail fact tables, that is, the tables are widened.
  • The data of this layer comes from the ODS layer and is obtained through simple streaming ETL. Binlog processing mainly consists of simple data cleaning and handling of data drift, and possibly streaming joins across several ODS tables. Traffic logs go through general ETL processing, which structures the unstructured data and attaches common dimension fields.
  • Data at this layer is stored in the Kafka message queue. It is also written by Flink in real time to 5-minute Hive tables for detail queries and is provided to the offline data warehouse as its raw data.

DIM layer:

  • The common dimension layer is based on dimensional modeling. It establishes conformed dimensions across the whole business process, which reduces the risk of inconsistent calculation definitions and algorithms.
  • DIM data comes from two sources: Flink programs that process ODS data in real time, and offline tasks.
  • DIM data is mainly stored in MySQL, HBase, and Redis. MySQL can be used when a dimension table is small. Redis can be used when individual records are small and the query QPS is high, which keeps the memory footprint low. HBase is used when the data volume is large and the scenario is not sensitive to changes in the dimension data.

DM layer:

(1) Data mart layer

The data mart layer builds a common summary layer organized by data domain and business domain. The DM layer is more complex than the others: the storage method must be chosen by weighing the requirements for landing the data against the specific query engine. It is divided into a light summary layer and a high summary layer, which are produced at the same time. High summary layer data serves simple KV queries from the front end and improves query performance, for example for real-time dashboards and real-time reports, where data timeliness must be at the second level. The Kafka wide tables of the light summary layer are written in real time to OLAP storage engines and serve complex OLAP queries from front-end products, such as self-service analysis and complex reports, where minute-level timeliness is tolerable.

(2) Light summary layer
  • The light summary layer is obtained through streaming ETL and mainly takes the form of wide tables. Business detail summaries are obtained by joining business fact tables with dimension tables; traffic detail summaries are obtained by splitting traffic logs by business line and joining them with dimension tables.
  • Storage in the light summary layer is fairly diverse. First, Flink consumes the DWD-layer detail data in Kafka in real time, joins the dimension tables required by the business process, and writes the widened result to this layer's Kafka in JSON or PB format.
  • At the same time, multidimensional business detail summaries are written by Flink to Kudu in real time, for detail queries and more complex multidimensional analysis. Traffic data is written by Flink to HDFS and ClickHouse for complex multidimensional analysis. Real-time feature data is joined with dimension tables in Flink and written to HDFS in real time for downstream offline ETL.
  • Spark SQL can run minute-level precomputation over the wide tables in Kudu and HDFS. This meets the business side's complex analysis requirements with minute-level latency and also speeds up the offline ETL process. In addition, as the integration of Flink SQL with the Hive ecosystem keeps improving, Flink SQL can be tried for offline ETL and OLAP computing tasks (Flink's in-memory stream computing is quite similar to Presto, which lets it serve as an OLAP computing engine as well), so that one computing engine covers both real-time and offline requirements and stream and batch are unified.
  • The detail data in Kudu and ClickHouse can also meet the business side's ad hoc analysis requirements. With a powerful OLAP computing engine, detail data can be queried in real time with a response time of about 10 seconds. Such requirements are also real-time OLAP requirements and are highly flexible.
(3) High summary layer
  • The high summary layer is produced by aggregating data from the detail layer or the light summary layer and writing the result to a storage engine. It serves part of the real-time indicator requirements and is less flexible.
  • The computing engines are the Flink DataStream API and Flink SQL. Storage depends on the requirement: common, simple indicator summary models can be stored directly in MySQL, while models with many dimensions and heavy writes and updates are stored in HBase. Some requirements need sorting, have very high query QPS and response time requirements, and do not need persistent storage, such as the online TopN products during a promotion; these are stored directly in Redis.
  • Second-level indicator requirements call for a mix of the Lambda and Kappa architectures. Most real-time indicators are computed with the Kappa architecture, while a small number of key indicators (such as amount-related ones) are recomputed in batch with the Lambda architecture, which adds a reconciliation step.
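The TopN products requirement mentioned above can be illustrated with a small in-memory sketch. It only stands in for what a sorted structure in Redis would hold; the class name `TopNProducts`, the per-product sales map, and the tie-breaking by product id are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a TopN ranking kept in a KV store. */
final class TopNProducts {

    /** Returns the ids of the n best-selling products, highest sales first; ties are broken by id. */
    static List<String> topN(Map<String, Long> salesByProduct, int n) {
        Comparator<Map.Entry<String, Long>> bestFirst = (a, b) -> {
            int bySales = Long.compare(b.getValue(), a.getValue());
            return bySales != 0 ? bySales : a.getKey().compareTo(b.getKey());
        };
        List<Map.Entry<String, Long>> entries = new ArrayList<>(salesByProduct.entrySet());
        entries.sort(bestFirst);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

Because the ranking is rebuilt from running totals and has no value after the promotion ends, it does not need persistent storage, which matches the reason given for choosing Redis.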

In general, the DM layer provides data at three levels of timeliness:

First, second-level real-time indicators precomputed by real-time engines such as Flink. These requirements demand very timely data and serve real-time dashboards and reports with uncomplicated computing dimensions.

Second, quasi-real-time indicators computed by Spark SQL with minute-level delay. These serve more complex analysis scenarios that do not require high timeliness and may involve joins across several fact tables, such as sales attribution.

Third, complex multidimensional analysis through ad hoc queries without precomputation. These requirements are more individual and flexible; if the OLAP computing engine is powerful enough, it can also fully meet second-level requirements. The ratio of second-level real-time data provided externally to the other two kinds of quasi-real-time data is roughly 3:7. For most business requirements, quasi-real-time computing or the ad hoc approach is preferred, because it reduces resource usage, improves data accuracy, and supports complex business scenarios more flexibly.

2.3 Construction method of real-time data system

The real-time data system can be built in two ways, real-time and quasi-real-time. They are implemented with a stream computing engine and with an ETL or OLAP engine respectively, and their data timeliness is at the second level and the minute level respectively.

  • Scheduling overhead: quasi-real-time data is processed in batches, so it still needs a scheduling system and is scheduled frequently, while real-time data has no scheduling overhead.
  • Business flexibility: quasi-real-time data is implemented on ETL or OLAP engines, so it is more flexible than data produced by stream computing.
  • Tolerance of late data: quasi-real-time data can be fully recomputed from all data within a cycle, so it tolerates late data well, while real-time data is computed incrementally and tolerates late data less well.
  • Applicable scenarios: quasi-real-time data mainly suits scenarios with lower timeliness requirements, multi-table joins, and frequent business changes, such as transaction-related real-time analysis. Real-time data suits scenarios with high timeliness requirements and large data volumes, such as real-time features and traffic-related real-time analysis.
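The difference in late-data tolerance can be made concrete with a deliberately simplified model. In the sketch below, the batch path recounts every event whose event time falls in a window, while the incremental path processes events in arrival order and stops accepting events for the window once a watermark (largest event time seen minus an allowed lateness) has passed the window end. The class name, the single fixed window, and this watermark rule are assumptions for illustration and are not Flink's actual window implementation.

```java
/** Hypothetical model contrasting full recomputation with incremental counting. */
final class LateDataDemo {

    /** Quasi-real-time path: recount all events with event time in [start, end). */
    static long recomputeCount(long[] eventTimes, long start, long end) {
        long count = 0;
        for (long t : eventTimes) {
            if (t >= start && t < end) {
                count++;
            }
        }
        return count;
    }

    /** Real-time path: events arrive in array order; events for a closed window are dropped. */
    static long incrementalCount(long[] arrivals, long start, long end, long allowedLateness) {
        long maxSeen = Long.MIN_VALUE;
        long count = 0;
        for (long t : arrivals) {
            // The window is closed once the watermark has reached its end.
            boolean closed = maxSeen != Long.MIN_VALUE && maxSeen - allowedLateness >= end;
            if (t >= start && t < end && !closed) {
                count++;
            }
            maxSeen = Math.max(maxSeen, t);
        }
        return count;
    }
}
```

For arrivals `{1, 5, 12, 7}` and the window `[0, 10)`, the batch recount sees three events, while the incremental path with no allowed lateness drops the late event with time 7 and reports two.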

2.4 Development of stream-batch integrated real-time data architecture

From 1990, when Inmon proposed the concept of the data warehouse, until today, big data architecture has evolved from the original offline big data architecture through the Lambda and Kappa architectures to the stream-batch integrated architecture brought forward by the popularity of Flink. Data architecture keeps evolving, essentially toward the integration of stream and batch, so that users can do real-time computing in the most natural way and at the lowest cost.

  • Offline big data architecture: data sources are imported into the offline data warehouse. Downstream applications either read the DM layer directly or, depending on business requirements, go through an added data service such as MySQL or Redis. The storage engine is HDFS/Hive. In terms of model layers, the data warehouse is divided into ODS, DWD, and DM.
  • Lambda architecture: as big data applications developed, systems were gradually required to be real-time. To compute some real-time indicators, a real-time computing path was added alongside the original offline data warehouse, and the data sources were converted to streams, that is, the data is sent to a message queue. Real-time computing subscribes to the message queue, computes the indicator increments directly, and pushes them to the downstream data service; the data service layer merges the offline and real-time results.
  • Kappa architecture: the Lambda architecture meets real-time requirements but brings more development and operations work. It arose at a time when stream processing engines were immature and stream results served only as temporary, approximate reference values. Later, with the emergence of stream processing engines such as Flink, stream processing matured. To solve the problem of maintaining two sets of code, Jay Kreps of LinkedIn proposed the Kappa architecture.
  • Stream-batch integrated architecture: in this architecture, stream computing is responsible for the basic data processing, while the interactive analysis engine sits at the center. The stream computing engine performs real-time ETL, which reduces latency compared with offline ETL. The interactive analysis engine has its own storage and, through joint optimization of computing and storage, achieves high write TPS, high query QPS, and low query latency, so that the whole pipeline is real-time and SQL-based. Real-time analysis and on-demand analysis can then both be done in a batch-like way and can respond quickly to business changes; the two together achieve an effect of 1 + 1 > 2. This architecture places very high demands on the interactive analysis engine, which may be a focus and direction of future big data technology.
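The merge step that the data service layer performs in the Lambda architecture can be sketched as follows. The sketch assumes additive indicators (counts or amounts) where the offline view covers data up to some cutoff and the real-time view holds the increments since then; the class and parameter names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical serving-layer merge for additive indicators in a Lambda architecture. */
final class LambdaServingLayer {

    /**
     * Adds the real-time increments to the offline (batch) values per indicator.
     * Only valid for additive indicators; distinct counts such as UV cannot be
     * merged by simple addition.
     */
    static Map<String, Long> merge(Map<String, Long> batchView, Map<String, Long> realtimeIncrement) {
        Map<String, Long> merged = new TreeMap<>(batchView);
        realtimeIncrement.forEach((name, delta) -> merged.merge(name, delta, Long::sum));
        return merged;
    }
}
```

The need to keep this merge logic and two computation paths consistent is part of the extra development and operations work that motivated the Kappa architecture.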

To meet the business side's more complex multidimensional real-time analysis needs, the author has introduced Kudu, an OLAP storage engine, into data development and uses Presto + Kudu to compute business data such as orders, exploring the feasibility of the stream-batch integrated architecture for real-time data analysis. In addition, currently popular data lake technologies such as Delta Lake and Hudi support upsert updates on HDFS. As their streaming writes and SQL engine support mature, a single storage engine may be able to serve both real-time and offline data requirements in the future, reducing the development and operations cost of running multiple engines.

III. Flink SQL real-time calculation of UV indicators

The previous part described how to build a real-time data system at a macro level, which is rather abstract; a concrete case may be what is needed to understand how to actually do it. The following down-to-earth case therefore shows how to compute UV data in real time. In consumer-facing (ToC) Internet companies, UV is a very important indicator that strongly influences the timely decisions of management, business, and operations. The author works at an e-commerce company, mainly on computing real-time data such as UV and sales, and knows this first-hand. The simple demo below shows how to use Flink SQL to consume PV data from Kafka, compute the UV metric in real time, and write it to HBase.

3.1 Kafka source data parsing

The PV data comes from event-tracking data reported by Filebeat. After cleaning, it is written to the downstream Kafka in Protocol Buffers format. When consuming it, the first step is to deserialize the PB data into the Row type that Flink can recognize, which requires a custom implementation of the DeserializationSchema interface. The code is as follows. It extracts only the fields used in the calculation: the mid of the PV, the event time time_local, and the log_date field derived from it:

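import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Locale;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MobilePage is the project's Protobuf-generated class; import it from its own package.
public class PageViewDeserializationSchema implements DeserializationSchema<Row> {

    public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
    protected SimpleDateFormat dayFormatter;

    private final RowTypeInfo rowTypeInfo;

    public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo) {
        dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        this.rowTypeInfo = rowTypeInfo;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        Row row = new Row(rowTypeInfo.getArity());
        MobilePage mobilePage = null;
        try {
            mobilePage = MobilePage.parseFrom(message);
            // Field order: 0 = mid, 1 = log_date, 2 = time_local
            String mid = mobilePage.getMid();
            row.setField(0, mid);
            Long timeLocal = mobilePage.getTimeLocal();
            String logDate = dayFormatter.format(timeLocal);
            row.setField(1, logDate);
            row.setField(2, timeLocal);
            return row;
        } catch (Exception e) {
            String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
            LOG.error("error parse bytes payload is {}, pageview error is {}", Arrays.toString(message), mobilePageError, e);
            // Returning null lets the Kafka consumer skip the record that failed to parse.
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return rowTypeInfo;
    }
}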
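One detail worth checking in a deserializer like this is how log_date is derived. `SimpleDateFormat` formats in the JVM's default time zone and is not thread-safe, so the day boundary depends on where the job runs. The following sketch shows an alternative using `java.time`, assuming time_local is an epoch timestamp in milliseconds and that the reporting time zone is passed in explicitly; the class name `LogDates` is hypothetical and this is not the author's code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: derives a yyyyMMdd log_date from an epoch-millisecond timestamp. */
final class LogDates {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Formats the day on which timeLocalMillis falls in the given time zone. */
    static String logDate(long timeLocalMillis, ZoneId zone) {
        return DAY.format(Instant.ofEpochMilli(timeLocalMillis).atZone(zone));
    }
}
```

The same instant can fall on different days in different zones, so daily UV figures are only comparable with offline results if both sides use the same zone.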

3.2 Writing the main program of the Flink job

Once the PV data is parsed into Flink's Row type, the rest is simple: write the main function and the SQL that computes the UV indicator. The code is as follows:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
// Imports for Flink, the Kafka and HBase connectors, and the project's own helper
// classes (PropertiesUtil, TableSchemaUtil, MobilePage) are omitted here.

public class RealtimeUV {

    public static void main(String[] args) throws Exception {
        // step1: parse the Kafka, HBase, and checkpoint settings from the properties configuration file
        Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
        String topic = config.get("source.kafka.topic");
        String groupId = config.get("source.group.id");
        String sourceBootStrapServers = config.get("source.bootstrap.servers");
        String hbaseTable = config.get("hbase.table.name");
        String hbaseZkQuorum = config.get("hbase.zk.quorum");
        String hbaseZkParent = config.get("hbase.zk.parent");
        int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
        int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: set the checkpoint parameters, used for failover fault tolerance
        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class, ProtobufSerializer.class);
        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
        sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // step3: use the Blink planner, create the TableEnvironment, and set the idle state
        // retention time to keep the job from running out of memory
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
        sourceProperties.setProperty("auto.commit.interval.ms", "3000");
        sourceProperties.setProperty("group.id", groupId);

        // step4: initialize the KafkaTableSource schema. The source table is registered in Flink
        // with registerTableSource rather than by registering a DataStream, to show how a
        // KafkaTableSource is registered
        TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
        Optional<String> proctimeAttribute = Optional.empty();
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
        Map<String, String> fieldMapping = new HashMap<>();
        List<String> columnNames = new ArrayList<>();
        RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
        columnNames.addAll(Arrays.asList(schema.getFieldNames()));
        columnNames.forEach(name -> fieldMapping.put(name, name));
        PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(rowTypeInfo);
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                sourceProperties,
                deserializationSchema,
                StartupMode.EARLIEST,
                specificOffsets);
        tEnv.registerTableSource("pageview", kafkaTableSource);

        // step5: initialize the HBase table schema and write parameters, and register the sink in Flink
        HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
        hBaseTableSchema.setRowKey("log_date", String.class);
        hBaseTableSchema.addColumn("f", "UV", Long.class);
        HBaseOptions hBaseOptions = HBaseOptions.builder()
                .setTableName(hbaseTable)
                .setZkQuorum(hbaseZkQuorum)
                .setZkNodeParent(hbaseZkParent)
                .build();
        HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
                .setBufferFlushMaxRows(1000)
                .setBufferFlushIntervalMillis(1000)
                .build();
        HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
        tEnv.registerTableSink("uv_index", hBaseSink);

        // step6: SQL that computes the UV indicator of the current day in real time. The simplest
        // GROUP BY aggregation is used here, without mini-batch or windows; with large data
        // volumes it is best to optimize with one of those two
        String uvQuery = "insert into uv_index "
                + "select log_date,\n"
                + "ROW(count(distinct mid) as UV)\n"
                + "from pageview\n"
                + "group by log_date";
        tEnv.sqlUpdate(uvQuery);

        // step7: run the job
        sEnv.execute("UV Job");
    }
}

In this Flink SQL UV case the code is very simple: you only need to understand how to parse the Kafka data, how to initialize the table schema, and how to register the tables in Flink. Flink SQL can fulfil a wide variety of complex real-time statistics requirements, and its learning cost is much lower than that of the API approach. Note that this demo was developed for the author's current business scenario and runs in a production environment; it may not work out of the box, and you need to write the Kafka data parsing class for your own business scenario.
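To make clear what the `count(distinct mid) ... group by log_date` query maintains, here is a plain-Java model of the same aggregation. It is only a conceptual sketch with a hypothetical class name, not how Flink stores its state: for each log_date it keeps the set of mids seen so far and emits the set size as the updated UV after every page view.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of a per-day distinct count with upsert-style output. */
final class UvCounter {
    // One set of visitor ids per log_date; this is the state that grows over time.
    private final Map<String, Set<String>> midsByDate = new HashMap<>();

    /** Records one page view and returns the current UV for that log_date. */
    long onPageView(String logDate, String mid) {
        Set<String> mids = midsByDate.computeIfAbsent(logDate, d -> new HashSet<>());
        mids.add(mid);
        return mids.size();
    }
}
```

The state grows with the number of distinct visitors per day and is never needed once the day is over, which is why the job sets an idle state retention time.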
