A real-time data warehouse mainly addresses the poor timeliness of data in a traditional offline warehouse. Real-time warehouses are typically used for real-time OLAP analysis, real-time dashboards, real-time monitoring of business metrics, and similar scenarios. Although the architecture and technology choices of a real-time data warehouse differ from those of a traditional offline warehouse, the basic methodology of data warehouse construction is the same. This article walks through a demo of building a real-time data warehouse from 0 to 1 with Flink SQL, covering the whole pipeline of data collection, storage, computation, and visualization. From this article you can learn:

  • The basic architecture of a real-time data warehouse
  • The data processing flow of a real-time data warehouse
  • New SQL features in Flink 1.11
  • A bug in Flink 1.11
  • A complete, runnable example

The ancients spared no effort in their learning; skill begun in youth is perfected only in old age.

What comes from books will always feel shallow; to truly understand, you must practice it yourself.

Case description

This article takes an e-commerce business as the example to show the data processing flow of a real-time data warehouse. Since the goal is to illustrate the construction process rather than the computation itself, the calculations are kept deliberately simple. To keep the case complete and reproducible, detailed operation steps are given throughout. For demonstration purposes, all operations in this article are performed in the Flink SQL Cli.

Architecture design

The overall architecture is shown in the figure: first, the MySQL binlog is parsed by canal and the change data is written to Kafka. Flink SQL then cleans the raw data and writes the resulting detail wide table back to Kafka. Dimension table data is stored in MySQL; Flink SQL joins the detail wide table with the dimension tables and writes the aggregated results to MySQL. Finally, FineBI is used for visualization.

Business Data Preparation

  • Order table (order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'Consignee',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'Recipient phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'Total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'Order Status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'Payment Method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'Shipping address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'Order Notes',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'Order Transaction Number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'Order Description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'Operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'Dead time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'Logistics Number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'Parent Order Number',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Picture path',
  `province_id` int(20) DEFAULT NULL COMMENT 'region',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Order sheet';
  • Order details (order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'Order Number',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name (redundant)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'Purchase Price (SKU price at time of order)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'Number of purchases',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Order Details';
  • Commodity table (sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'Commodity Description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'Brand (Redundant)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'Level 3 Classification ID (redundant)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'Display pictures by default (redundant)',
  `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='List of Goods';
  • Commodity level-1 category table (base_category1)
CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `name` varchar(10) NOT NULL COMMENT 'Category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Primary Classification Table';
  • Commodity level-2 category table (base_category2)
CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `name` varchar(200) NOT NULL COMMENT 'Secondary Classification name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'Level-1 category number',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Secondary Classification Table';
  • Commodity level-3 category table (base_category3)
CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'number',
  `name` varchar(200) NOT NULL COMMENT 'Tertiary classification name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'Level-2 category number',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Tertiary Classification Table';
  • Province table (base_province)
CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'Province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'regional id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'Administrative location code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Region table (base_region)
CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'regional id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'Region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Note: the above DDL statements are executed in MySQL. For the complete table creation and mock data generation scripts, see:

Link: pan.baidu.com/s/1fcMgDHGK… Extraction code: zuqw

Data processing flow

ODS layer data synchronization

For data synchronization at the ODS layer, see my other article, Implementing Real-Time Incremental Data Synchronization Based on Canal and Flink (I). The MySQL binlog is parsed by Canal and written to the corresponding Kafka topics. Due to space constraints, the details are not repeated here. The result after synchronization is as follows:

DIM Layer dimension table data preparation

In this case, the dimension tables are stored in MySQL for simplicity; in actual production, HBase is commonly used to store dimension table data (a brief sketch of that setup is given at the end of this section). We mainly use two dimension tables: the region dimension table and the commodity dimension table. The processing procedure is as follows:

  • Regional dimension table

First, extract the data for mydw.base_province and mydw.base_region from Kafka into MySQL, using Flink SQL Kafka sources with the canal-json format. Note: before loading, you need to create the corresponding target tables in MySQL. The MySQL database used in this article is named dim, and it stores the dimension table data.
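A minimal sketch of those pre-created MySQL tables is shown below; this is an assumption on my part, the exact DDL ships with the scripts linked above. A primary key is added so that the Flink JDBC sink can write in upsert mode:

CREATE DATABASE IF NOT EXISTS dim;

CREATE TABLE IF NOT EXISTS dim.base_province (
  `id` int(20) NOT NULL,
  `name` varchar(20) DEFAULT NULL,
  `region_id` int(20) DEFAULT NULL,
  `area_code` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS dim.base_region (
  `id` int(20) NOT NULL,
  `region_name` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The Flink SQL that reads the canal-json data from Kafka and loads it into these tables is as follows: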

-- -------------------------
-- Province
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Province
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT,
    `area_code` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Province
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
-- Region
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Region
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Region
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;


After the above steps, the base tables for the province and region dimensions have been synchronized to MySQL. Next, we create a view, dim_province, in MySQL based on these base tables; it will be used as the region dimension table:

-- ---------------------------------
-- DIM layer, region dimension table,
-- Create a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id
;

The required dimension table dim_province is now created; when doing the dimension table join, we simply declare a JDBC table over it in Flink SQL. The commodity dimension table is created in the same way, as follows:

-- -------------------------
-- Table of primary categories
-- kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Table of primary categories
-- MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Table of primary categories
-- MySQL Sink Load Data
-- ------------------------- 

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
-- Table of secondary categories
-- kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Table of secondary categories
-- MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Table of secondary categories
-- MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
-- Table of tertiary categories
-- kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Table of tertiary categories
-- MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Table of tertiary categories
-- MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
-- Commodity table (sku_info)
-- Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Commodity table (sku_info)
-- MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info',  -- MySQL table to insert data into
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
-- Commodity table
-- MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;


After the above steps, the base tables for the commodity dimension have been synchronized to MySQL; as before, the corresponding target tables need to be created in the dim database in advance (a sketch of one of them is shown below). Next, we use these base tables to create a view, dim_sku_info, in MySQL's dim library, which will be used as the commodity dimension table.
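A minimal sketch of one of those pre-created tables (again an assumption; base_category2, base_category3 and sku_info follow the same pattern with their respective columns):

CREATE TABLE IF NOT EXISTS dim.base_category1 (
  `id` bigint(20) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The view is created as follows: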

-- ---------------------------------
-- DIM layer, merchandise dimension table,
-- Create a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

Now that we have the dimension table data we need, we can start processing the DWD layer data.
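One more note before moving on: as mentioned at the beginning of this section, production deployments usually keep dimension tables in HBase rather than MySQL. A minimal sketch of what the region dimension table could look like with the Flink HBase connector (the HBase table name, column family layout and ZooKeeper address are assumptions for illustration only):

CREATE TABLE dim_province_hbase (
  rowkey STRING,   -- province_id serialized as the HBase rowkey
  info ROW<province_name STRING, area_code STRING, region_id INT, region_name STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dim_province',
  'zookeeper.quorum' = 'kms-2:2181'
);

Such a table can be used in a temporal join (FOR SYSTEM_TIME AS OF) in exactly the same way as the JDBC dimension tables used later in this article.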

DWD layer data processing

After the above steps, the dimension tables are ready. Next, we process the raw ODS data into the DWD-layer detail wide table. The specific process is as follows:

-- -------------------------
-- Order Details
-- Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
-- Order Information
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- ---------------------------------
-- DWD layer, paid order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time TIMESTAMP(0),
  pay_time TIMESTAMP(0)
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD layer, list of paid orders
-- Load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = '2' -- Paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

ADS layer data

After the above steps, we have created the dwd_paid_order_detail detail wide table and stored it in Kafka. Next, we join this wide table with the dimension tables to obtain our ADS application-layer data.
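In the scripts below, the dimension tables (dim_province and dim_sku_info) are declared as plain JDBC tables and joined with a processing-time temporal join (FOR SYSTEM_TIME AS OF), so every lookup hits MySQL directly. For larger dimension tables it is usually worth enabling the JDBC lookup cache; a sketch of the dim_province declaration with caching turned on (the cache size and TTL are placeholder values, not tuned for this demo) could look like this:

CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'lookup.cache.max-rows' = '5000',  -- cache at most 5000 rows
    'lookup.cache.ttl' = '10min'       -- cached rows expire after 10 minutes
);

The trade-off is that cached rows can be up to the TTL out of date, which is usually acceptable for slowly changing dimensions such as provinces and SKUs.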

  • ads_province_index

First, create the ADS target table ads_province_index in MySQL:

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

Then, in the Flink SQL Cli, compute the metrics and load the results into the MySQL ADS table:

-- Flink SQL Cli operation
-- ---------------------------------
-- Create the ADS layer table with DDL
-- Metrics: 1. Number of orders per province per day
--          2. Order amount per province per day
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- Order summary temporary table
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,          -- Number of orders
    order_amount DECIMAL(10,2),  -- Order amount
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Load data into the order summary temporary table
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(DISTINCT order_id) order_count,      -- Number of orders
      sum(order_price * sku_num) order_amount, -- Order Amount
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time,'yyyy-MM-dd');

-- ---------------------------------
-- tmp_province_index_source
-- Use the temporary summary table as the data source
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,          -- Number of orders
    order_amount DECIMAL(10,2),  -- Order amount
    pay_date DATE,
    proctime AS PROCTIME()       -- Computed column generating a processing-time attribute
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM layer, region dimension table,
-- Create the region dimension table data source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- Load data to ads_province_index
-- Dimension table JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

After submitting the task, observe the Flink Web UI:

Query the ads_province_index table in MySQL to view the aggregated results:
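For a quick sanity check, a query along these lines (run against MySQL; the sorting is just for readability) shows the per-province daily metrics:

SELECT province_name, region_name, order_count, order_amount, dt
FROM ads.ads_province_index
ORDER BY dt, order_amount DESC;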

  • ads_sku_index

First, create the ADS target table ads_sku_index in MySQL:

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

Then, in the Flink SQL Cli, compute the metrics and load the results into the MySQL ADS table:

-- ---------------------------------
-- Create the ADS layer table with DDL
-- Metrics: 1. Number of orders per SKU per day
--          2. Order amount per SKU per day
--          3. Quantity sold per SKU per day
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt VARCHAR,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail: paid order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- Statistics of commodity indicators
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,          -- Number of orders
    order_amount DECIMAL(10,2),  -- Order amount
    order_sku_num BIGINT,
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Data loading
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(DISTINCT order_id) order_count,      -- Number of orders
      sum(order_price * sku_num) order_amount,   -- Order amount
      sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time,'yyyy-MM-dd');

-- ---------------------------------
-- tmp_sku_index_source
-- Use the temporary summary table as the data source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,          -- Number of orders
    order_amount DECIMAL(10,2),  -- Order amount
    order_sku_num BIGINT,
    pay_date DATE,
    proctime AS PROCTIME()       -- Computed column generating a processing-time attribute
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer, merchandise dimension table,
-- Create the commodity dimension table data source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data to ads_sku_index
-- Dimension table JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

After submitting the task, observe the Flink Web UI:

Query the ads_sku_index table in MySQL to view the aggregated results:

FineBI results display

Other points to note

A bug in Flink 1.11.0

When using Flink 1.11.0, inserting a changelog data source into an upsert sink raises the following exception:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

The bug has since been fixed; the fix is available in Flink 1.11.1.
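For clarity, a minimal sketch of the pattern that triggers the exception on 1.11.0 (the table and field names mirror those in the error message above and are hypothetical, not part of the case in this article): a changelog source, such as a canal-json Kafka table, written directly into an upsert sink, such as a JDBC table with a primary key:

-- Changelog source (hypothetical)
CREATE TABLE t_pick_order (
  order_no STRING,
  status STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.t_pick_order',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- Upsert sink (hypothetical)
CREATE TABLE t_pick_order_sink (
  order_no STRING,
  status STRING,
  PRIMARY KEY (order_no) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://kms-1:3306/dim',
 'table-name' = 't_pick_order',
 'username' = 'root',
 'password' = '123qwe'
);

-- Fails with the planner exception above on Flink 1.11.0; runs on 1.11.1 and later.
INSERT INTO t_pick_order_sink
SELECT order_no, status FROM t_pick_order;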

Conclusion

This article shares a demo case of building a real-time data warehouse. Through it, you can understand the data processing flow of a real-time warehouse and, on that basis, gain a deeper understanding of Flink SQL CDC. The article also provides a very detailed, hands-on example that you can run directly and use to explore the process of building a real-time warehouse in practice.

Follow the public account "Big Data Technology and Data Warehouse" and reply with "information" to receive a big data resource package.